Skip to content

Commit

Permalink
[BEAM-244] Add batching for write, use preparedStatementSetter
Browse files Browse the repository at this point in the history
  • Loading branch information
jbonofre committed Sep 25, 2016
1 parent d79d21d commit a75fcfd
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;

import javax.annotation.Nullable;
import javax.sql.DataSource;
Expand Down Expand Up @@ -57,14 +59,25 @@
*
* pipeline.apply(JdbcIO.read()
* .withDataSource(myDataSource)
* .withRowMapper(new JdbcIO.RowMapper<MyElement>() {
* public MyElement mapRow(ResultSet resultSet) {
* // use the resultSet to build the element of the PCollection
* // for instance:
* // return resultSet.getString(2);
* }
* })
*
* }
* </pre>
* <p>
* You can find an full {@link RowMapper} example in {@code JdbcIOTest}.
* </p>
* <h3>Writing to JDBC datasource</h3>
* <p>
* JDBC sink supports writing records into a database. It expects a
* {@code PCollection<T>}, converts the {@code T} elements as SQL statement
* and insert into the database. T is the type expected by the provided {@link ElementInserter}.
* and setParameters into the database. T is the type expected by the
* provided {@link PreparedStatementSetter}.
* </p>
* <p>
* Like the source, to configure JDBC sink, you have to provide a datasource. For instance:
Expand All @@ -74,10 +87,22 @@
*
* pipeline
* .apply(...)
* .apply(JdbcIO.write().withDataSource(myDataSource)
* .apply(JdbcIO.write()
* .withDataSource(myDataSource)
* .withQuery(query)
* .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<MyElement>() {
* public void setParameters(MyElement element, PreparedStatement statement) {
* // use the PCollection element to set parameters of the SQL statement used to insert
* // in the database
* // for instance: statement.setString(0, element.toString());
* }
* })
*
* }
* </pre>
* <p>
* You can find a full {@link PreparedStatementSetter} in {@code JdbcIOTest}.
* </p>
*/
public class JdbcIO {

Expand All @@ -98,7 +123,8 @@ public static Read<?> read() {
* @return a {@link Write} {@link PTransform}.
*/
public static Write<?> write() {
return new Write(new Write.JdbcWriter(null, null, null, null));
return new Write(new Write.JdbcWriter(null, null, null, null,
null, 1024L));
}

private JdbcIO() {
Expand All @@ -110,9 +136,7 @@ private JdbcIO() {
* object used in the {@link PCollection}.
*/
public interface RowMapper<T> extends Serializable {

T mapRow(ResultSet resultSet);

}

/**
Expand Down Expand Up @@ -261,9 +285,7 @@ public void processElement(ProcessContext context) throws Exception {
try (ResultSet resultSet = statement.executeQuery()) {
while (resultSet.next()) {
T record = rowMapper.mapRow(resultSet);
if (record != null) {
context.output(record);
}
}
}
}
Expand All @@ -273,15 +295,11 @@ public void processElement(ProcessContext context) throws Exception {
}

/**
* An interface used by the JdbcIO Write for mapping {@link PCollection} elements as a rows of a
* ResultSet on a per-row basis.
* Implementations of this interface perform the actual work of mapping each row to a result
* object used in the {@link PCollection}.
* An interface used by the JdbcIO Write to set the parameters of the {@link PreparedStatement}
* used to setParameters into the database.
*/
public interface ElementInserter<T> extends Serializable {

PreparedStatement insert(T element, Connection connection);

public interface PreparedStatementSetter<T> extends Serializable {
void setParameters(T element, PreparedStatement preparedStatement) throws Exception;
}

/**
Expand All @@ -293,6 +311,10 @@ public Write<T> withDataSource(DataSource dataSource) {
return new Write<>(writer.withDataSource(dataSource));
}

public Write<T> withQuery(String query) {
return new Write<>(writer.withQuery(query));
}

public Write<T> withUsername(String username) {
return new Write<>(writer.withUsername(username));
}
Expand All @@ -301,8 +323,13 @@ public Write<T> withPassword(String password) {
return new Write<>(writer.withPassword(password));
}

public <X> Write<X> withElementInserter(ElementInserter<X> elementInserter) {
return new Write<>(writer.withElementInserter(elementInserter));
public <X> Write<X> withPreparedStatementSetter(
PreparedStatementSetter<X> preparedStatementSetter) {
return new Write<>(writer.withPreparedStatementSetter(preparedStatementSetter));
}

public Write<T> withBatchSize(long batchSize) {
return new Write<>(writer.withBatchSize(batchSize));
}

private final JdbcWriter writer;
Expand All @@ -325,39 +352,60 @@ public void validate(PCollection<T> input) {
private static class JdbcWriter<T> extends DoFn<T, Void> {

private final DataSource dataSource;
private final String query;
private final String username;
private final String password;
private final ElementInserter<T> elementInserter;
private final PreparedStatementSetter<T> preparedStatementSetter;
private long batchSize;

private Connection connection;
private List<T> batch;

public JdbcWriter(DataSource dataSource, String username, String password,
ElementInserter<T> elementInserter) {
public JdbcWriter(DataSource dataSource, String query, String username, String password,
PreparedStatementSetter<T> preparedStatementSetter, long batchSize) {
this.dataSource = dataSource;
this.query = query;
this.username = username;
this.password = password;
this.elementInserter = elementInserter;
this.preparedStatementSetter = preparedStatementSetter;
this.batchSize = batchSize;
}

public JdbcWriter<T> withDataSource(DataSource dataSource) {
return new JdbcWriter<>(dataSource, username, password, elementInserter);
return new JdbcWriter<>(dataSource, query, username, password, preparedStatementSetter,
batchSize);
}

public JdbcWriter<T> withQuery(String query) {
return new JdbcWriter<>(dataSource, query, username, password, preparedStatementSetter,
batchSize);
}

public JdbcWriter<T> withUsername(String username) {
return new JdbcWriter<>(dataSource, username, password, elementInserter);
return new JdbcWriter<>(dataSource, query, username, password, preparedStatementSetter,
batchSize);
}

public JdbcWriter<T> withPassword(String password) {
return new JdbcWriter<>(dataSource, username, password, elementInserter);
return new JdbcWriter<>(dataSource, query, username, password, preparedStatementSetter,
batchSize);
}

public JdbcWriter<T> withPreparedStatementSetter(
PreparedStatementSetter<T> preparedStatementSetter) {
return new JdbcWriter<>(dataSource, query, username, password, preparedStatementSetter,
batchSize);
}

public JdbcWriter<T> withElementInserter(ElementInserter<T> elementInserter) {
return new JdbcWriter<>(dataSource, username, password, elementInserter);
public JdbcWriter<T> withBatchSize(long batchSize) {
return new JdbcWriter<>(dataSource, query, username, password, preparedStatementSetter,
batchSize);
}

public void validate() {
Preconditions.checkNotNull(dataSource, "dataSource");
Preconditions.checkNotNull(elementInserter, "elementInserter");
Preconditions.checkNotNull(query, "query");
Preconditions.checkNotNull(preparedStatementSetter, "preparedStatementSetter");
}

@Setup
Expand All @@ -367,29 +415,40 @@ public void connectToDatabase() throws Exception {
} else {
connection = dataSource.getConnection();
}
connection.setAutoCommit(false);
connection.setAutoCommit(true);
}

@StartBundle
public void startBundle(Context context) {
batch = new ArrayList<>();
}

@ProcessElement
public void processElement(ProcessContext context) {
public void processElement(ProcessContext context) throws Exception {
T record = context.element();
try {
PreparedStatement statement = elementInserter.insert(record, connection);
if (statement != null) {

batch.add(record);
if (batch.size() >= batchSize) {
finishBundle(context);
}
}

@FinishBundle
public void finishBundle(Context context) throws Exception {
for (T record : batch) {
try {
PreparedStatement statement = connection.prepareStatement(query);
preparedStatementSetter.setParameters(record, statement);
try {
statement.executeUpdate();
} finally {
statement.close();
}
} catch (Exception e) {
LOGGER.error("Can't insert into database", e);
}
} catch (Exception e) {
LOGGER.warn("Can't insert data into table", e);
}
}

@FinishBundle
public void finishBundle(Context context) throws Exception {
connection.commit();
batch.clear();
}

@Teardown
Expand Down

0 comments on commit a75fcfd

Please sign in to comment.