Java 8 ETL toolkit without dependencies. Enhydrator reads table-like structures, filters, transforms and writes them back.
<dependency>
    <groupId>com.airhacks</groupId>
    <artifactId>enhydrator</artifactId>
    <version>[RECENT-VERSION]</version>
</dependency>
Enhydrator reads data from a Source, filters and transforms it, and writes it back to a Sink. The Source is responsible for converting external information into an Iterable of Rows.
@FunctionalInterface
public interface Source {

    Iterable<Row> query(String query, Object... params);

    default Iterable<Row> query() {
        return this.query(null);
    }
}
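Because Source is a @FunctionalInterface, an ad-hoc in-memory source can be supplied as a lambda. A minimal sketch, assuming the Row instances are built elsewhere (fetchRowsSomehow is a hypothetical helper, not part of the Enhydrator API):

// hypothetical helper returning a pre-built List<Row>
List<Row> rows = fetchRowsSomehow();
// a fixed in-memory source can simply ignore the query string and its parameters
Source inMemory = (query, params) -> rows;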
Enhydrator ships with CSVFileSource, CSVStreamSource, JDBCSource, ScriptableSource and VirtualSinkSource (an in-memory source and sink at the same time).
The essential data structure is Row. A row comprises Columns, accessible by index and/or name:
public class Row {

    private final Map<String, Column> columnByName;
    private final Map<Integer, Column> columnByIndex;
    //...
}
A Column holds an index, name and an optional value:
public class Column implements Cloneable {

    private int index;
    private String name;
    private Optional<Object> value;
    //...
}
Sink is the Source’s counterpart:
public abstract class Sink implements AutoCloseable {

    public abstract void processRow(Row entries);
}
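A custom sink only has to implement processRow. The following sketch assumes that the abstract Sink base class can be extended with a no-arg constructor and that close() is either provided by the base class or can be overridden as below; the class name ConsoleSink is made up for this example:

public class ConsoleSink extends Sink {

    // called once for every transformed row
    @Override
    public void processRow(Row entries) {
        System.out.println(entries);
    }

    // stdout does not hold any resources
    @Override
    public void close() {
    }
}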
Each transformed Row is passed to the Sink. Enhydrator ships with CSVFileSink, JDBCSink, LogSink, PojoSink (a Row to Object mapper), RowSink and VirtualSinkSource.
A filter expression is a JavaScript (Nashorn) snippet evaluated against the current row. The script has to return the Boolean true; anything else is interpreted as false and skips the processing of the current row.
The current Row instance is passed to the script as the variable $ROW. In addition, $MEMORY (a map-like structure available to the entire processing pipeline), $EMPTY (an empty row) and any programmatically passed variables are accessible.
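A filter is handed to the pipeline builder as plain text (see the test at the end of this README). A small sketch of a filter that keeps only the "java" rows:

// Nashorn snippet stored in a Java String; it has to evaluate to the Boolean true
String keepJavaOnly = "$ROW.getColumnValue('language') === 'java'";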
Each row passes through the following steps, in this order:

1. All configured filter expressions are evaluated against the current row and have to return true.
2. Pre-row transformations are executed. A row transformation is a function: Function<Row, Row>, "Row in, Row out" (see the sketch after this list).
3. Row expressions are executed against the current row with the same variables ($ROW, $EMPTY, etc.) as filters. A row expression does not have to return anything (it is void).
4. Column transformations are executed on the actual values: Function<Object, Object> of the Column.
5. Post-row transformations are executed as in step 2.
6. The remaining Row is passed to the Sink instance.
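A sketch of the pre-row transformation referenced in step 2: Function is java.util.function.Function, and the code only uses the getColumnValue accessor shown earlier. Whether startWith accepts an arbitrary Function<Row, Row> (as the DatatypeNameMapper in the test below suggests) is an assumption here:

// "Row in, Row out": log the language column, then hand the row on unchanged
Function<Row, Row> logAndPass = row -> {
    System.out.println("processing " + row.getColumnValue("language"));
    return row;
};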
The following languages.csv file is filtered for the language "java", and the corresponding column "rank" is converted to an Integer:
language;rank
java;1
c;2
cobol;3
esoteric;4
The following test should pass. See the original test FromJsonToCSVTest.java:
@Test
public void filterAndCastFromCSVFileToLog() {
    // INPUT is a test constant pointing to the folder that contains languages.csv
    Source source = new CSVFileSource(INPUT + "/languages.csv", ";", "utf-8", true);
    VirtualSinkSource sink = new VirtualSinkSource();
    Pump pump = new Pump.Engine().
            from(source).
            filter("$ROW.getColumnValue('language') === 'java'").
            startWith(new DatatypeNameMapper().addMapping("rank", Datatype.INTEGER)).
            to(sink).
            to(new LogSink()).
            build();
    Memory memory = pump.start();
    assertFalse(memory.areErrorsOccured());
    assertThat(memory.getProcessedRowCount(), is(5L));

    // expecting only the "java" language row in the sink
    assertThat(sink.getNumberOfRows(), is(1));
    String languageValue = (String) sink.getRow(0).getColumnValue("language");
    assertThat(languageValue, is("java"));

    // expecting "java" to have rank 1 as an Integer
    Object rankValue = sink.getRow(0).getColumnValue("rank");
    assertTrue(rankValue instanceof Integer);
    assertThat((Integer) rankValue, is(1));
}