Skip to content

Commit

Permalink
[BEAM-11873] Add support for writes with returning values in JdbcIO
Browse files Browse the repository at this point in the history
  • Loading branch information
Aydar Zaynutdinov authored and calvinleungyk committed Sep 22, 2021
1 parent 30ca3fa commit f2b9a13
Show file tree
Hide file tree
Showing 6 changed files with 477 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@
*/
package org.apache.beam.sdk.io.common;

import static org.junit.Assert.assertEquals;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Optional;
import javax.sql.DataSource;
import org.apache.beam.sdk.values.KV;
import org.postgresql.ds.PGSimpleDataSource;

/** This class contains helper methods to ease database usage in tests. */
Expand Down Expand Up @@ -104,4 +108,26 @@ public static void createTableWithStatement(DataSource dataSource, String stmt)
}
}
}

public static ArrayList<KV<Integer, String>> getTestDataToWrite(long rowsToAdd) {
ArrayList<KV<Integer, String>> data = new ArrayList<>();
for (int i = 0; i < rowsToAdd; i++) {
KV<Integer, String> kv = KV.of(i, "Test");
data.add(kv);
}
return data;
}

public static void assertRowCount(DataSource dataSource, String tableName, int expectedRowCount)
throws SQLException {
try (Connection connection = dataSource.getConnection()) {
try (Statement statement = connection.createStatement()) {
try (ResultSet resultSet = statement.executeQuery("select count(*) from " + tableName)) {
resultSet.next();
int count = resultSet.getInt(1);
assertEquals(expectedRowCount, count);
}
}
}
}
}
275 changes: 265 additions & 10 deletions sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ public static <T> ReadWithPartitions<T> readWithPartitions() {
* @param <T> Type of the data to be written.
*/
public static <T> Write<T> write() {
return new Write();
return new Write<>();
}

public static <T> WriteVoid<T> writeVoid() {
Expand Down Expand Up @@ -1283,43 +1283,43 @@ public static class Write<T> extends PTransform<PCollection<T>, PDone> {

/** See {@link WriteVoid#withDataSourceConfiguration(DataSourceConfiguration)}. */
public Write<T> withDataSourceConfiguration(DataSourceConfiguration config) {
return new Write(inner.withDataSourceConfiguration(config));
return new Write<>(inner.withDataSourceConfiguration(config));
}

/** See {@link WriteVoid#withDataSourceProviderFn(SerializableFunction)}. */
public Write<T> withDataSourceProviderFn(
SerializableFunction<Void, DataSource> dataSourceProviderFn) {
return new Write(inner.withDataSourceProviderFn(dataSourceProviderFn));
return new Write<>(inner.withDataSourceProviderFn(dataSourceProviderFn));
}

/** See {@link WriteVoid#withStatement(String)}. */
public Write<T> withStatement(String statement) {
return new Write(inner.withStatement(statement));
return new Write<>(inner.withStatement(statement));
}

/** See {@link WriteVoid#withPreparedStatementSetter(PreparedStatementSetter)}. */
public Write<T> withPreparedStatementSetter(PreparedStatementSetter<T> setter) {
return new Write(inner.withPreparedStatementSetter(setter));
return new Write<>(inner.withPreparedStatementSetter(setter));
}

/** See {@link WriteVoid#withBatchSize(long)}. */
public Write<T> withBatchSize(long batchSize) {
return new Write(inner.withBatchSize(batchSize));
return new Write<>(inner.withBatchSize(batchSize));
}

/** See {@link WriteVoid#withRetryStrategy(RetryStrategy)}. */
public Write<T> withRetryStrategy(RetryStrategy retryStrategy) {
return new Write(inner.withRetryStrategy(retryStrategy));
return new Write<>(inner.withRetryStrategy(retryStrategy));
}

/** See {@link WriteVoid#withRetryConfiguration(RetryConfiguration)}. */
public Write<T> withRetryConfiguration(RetryConfiguration retryConfiguration) {
return new Write(inner.withRetryConfiguration(retryConfiguration));
return new Write<>(inner.withRetryConfiguration(retryConfiguration));
}

/** See {@link WriteVoid#withTable(String)}. */
public Write<T> withTable(String table) {
return new Write(inner.withTable(table));
return new Write<>(inner.withTable(table));
}

/**
Expand All @@ -1341,6 +1341,24 @@ public WriteVoid<T> withResults() {
return inner;
}

/**
* Returns {@link WriteWithResults} transform that could return a specific result.
*
* <p>See {@link WriteWithResults}
*/
public <V extends JdbcWriteResult> WriteWithResults<T, V> withWriteResults(
RowMapper<V> rowMapper) {
return new AutoValue_JdbcIO_WriteWithResults.Builder<T, V>()
.setRowMapper(rowMapper)
.setRetryStrategy(inner.getRetryStrategy())
.setRetryConfiguration(inner.getRetryConfiguration())
.setDataSourceProviderFn(inner.getDataSourceProviderFn())
.setPreparedStatementSetter(inner.getPreparedStatementSetter())
.setStatement(inner.getStatement())
.setTable(inner.getTable())
.build();
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
inner.populateDisplayData(builder);
Expand All @@ -1364,7 +1382,244 @@ void set(
throws SQLException;
}

/** A {@link PTransform} to write to a JDBC datasource. */
/**
* A {@link PTransform} to write to a JDBC datasource. Executes statements one by one.
*
* <p>The INSERT, UPDATE, and DELETE commands sometimes have an optional RETURNING clause that
* supports obtaining data from modified rows while they are being manipulated. Output {@link
* PCollection} of this transform is a collection of such returning results mapped by {@link
* RowMapper}.
*/
@AutoValue
public abstract static class WriteWithResults<T, V extends JdbcWriteResult>
extends PTransform<PCollection<T>, PCollection<V>> {
abstract @Nullable SerializableFunction<Void, DataSource> getDataSourceProviderFn();

abstract @Nullable ValueProvider<String> getStatement();

abstract @Nullable PreparedStatementSetter<T> getPreparedStatementSetter();

abstract @Nullable RetryStrategy getRetryStrategy();

abstract @Nullable RetryConfiguration getRetryConfiguration();

abstract @Nullable String getTable();

abstract @Nullable RowMapper<V> getRowMapper();

abstract Builder<T, V> toBuilder();

@AutoValue.Builder
abstract static class Builder<T, V extends JdbcWriteResult> {
abstract Builder<T, V> setDataSourceProviderFn(
SerializableFunction<Void, DataSource> dataSourceProviderFn);

abstract Builder<T, V> setStatement(ValueProvider<String> statement);

abstract Builder<T, V> setPreparedStatementSetter(PreparedStatementSetter<T> setter);

abstract Builder<T, V> setRetryStrategy(RetryStrategy deadlockPredicate);

abstract Builder<T, V> setRetryConfiguration(RetryConfiguration retryConfiguration);

abstract Builder<T, V> setTable(String table);

abstract Builder<T, V> setRowMapper(RowMapper<V> rowMapper);

abstract WriteWithResults<T, V> build();
}

public WriteWithResults<T, V> withDataSourceConfiguration(DataSourceConfiguration config) {
return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config));
}

public WriteWithResults<T, V> withDataSourceProviderFn(
SerializableFunction<Void, DataSource> dataSourceProviderFn) {
return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
}

public WriteWithResults<T, V> withStatement(String statement) {
return withStatement(ValueProvider.StaticValueProvider.of(statement));
}

public WriteWithResults<T, V> withStatement(ValueProvider<String> statement) {
return toBuilder().setStatement(statement).build();
}

public WriteWithResults<T, V> withPreparedStatementSetter(PreparedStatementSetter<T> setter) {
return toBuilder().setPreparedStatementSetter(setter).build();
}

/**
* When a SQL exception occurs, {@link Write} uses this {@link RetryStrategy} to determine if it
* will retry the statements. If {@link RetryStrategy#apply(SQLException)} returns {@code true},
* then {@link Write} retries the statements.
*/
public WriteWithResults<T, V> withRetryStrategy(RetryStrategy retryStrategy) {
checkArgument(retryStrategy != null, "retryStrategy can not be null");
return toBuilder().setRetryStrategy(retryStrategy).build();
}

/**
* When a SQL exception occurs, {@link Write} uses this {@link RetryConfiguration} to
* exponentially back off and retry the statements based on the {@link RetryConfiguration}
* mentioned.
*
* <p>Usage of RetryConfiguration -
*
* <pre>{@code
* pipeline.apply(JdbcIO.<T>write())
* .withReturningResults(...)
* .withDataSourceConfiguration(...)
* .withRetryStrategy(...)
* .withRetryConfiguration(JdbcIO.RetryConfiguration.
* create(5, Duration.standardSeconds(5), Duration.standardSeconds(1))
*
* }</pre>
*
* maxDuration and initialDuration are Nullable
*
* <pre>{@code
* pipeline.apply(JdbcIO.<T>write())
* .withReturningResults(...)
* .withDataSourceConfiguration(...)
* .withRetryStrategy(...)
* .withRetryConfiguration(JdbcIO.RetryConfiguration.
* create(5, null, null)
*
* }</pre>
*/
public WriteWithResults<T, V> withRetryConfiguration(RetryConfiguration retryConfiguration) {
checkArgument(retryConfiguration != null, "retryConfiguration can not be null");
return toBuilder().setRetryConfiguration(retryConfiguration).build();
}

public WriteWithResults<T, V> withTable(String table) {
checkArgument(table != null, "table name can not be null");
return toBuilder().setTable(table).build();
}

public WriteWithResults<T, V> withRowMapper(RowMapper<V> rowMapper) {
checkArgument(rowMapper != null, "result set getter can not be null");
return toBuilder().setRowMapper(rowMapper).build();
}

@Override
public PCollection<V> expand(PCollection<T> input) {
checkArgument(getStatement() != null, "withStatement() is required");
checkArgument(
getPreparedStatementSetter() != null, "withPreparedStatementSetter() is required");
checkArgument(
(getDataSourceProviderFn() != null),
"withDataSourceConfiguration() or withDataSourceProviderFn() is required");

return input.apply(ParDo.of(new WriteWithResultsFn<>(this)));
}

private static class WriteWithResultsFn<T, V extends JdbcWriteResult> extends DoFn<T, V> {

private final WriteWithResults<T, V> spec;
private DataSource dataSource;
private Connection connection;
private PreparedStatement preparedStatement;
private static FluentBackoff retryBackOff;

public WriteWithResultsFn(WriteWithResults<T, V> spec) {
this.spec = spec;
}

@Setup
public void setup() {
dataSource = spec.getDataSourceProviderFn().apply(null);
RetryConfiguration retryConfiguration = spec.getRetryConfiguration();

retryBackOff =
FluentBackoff.DEFAULT
.withInitialBackoff(retryConfiguration.getInitialDuration())
.withMaxCumulativeBackoff(retryConfiguration.getMaxDuration())
.withMaxRetries(retryConfiguration.getMaxAttempts());
}

@ProcessElement
public void processElement(ProcessContext context) throws Exception {
T record = context.element();

// Only acquire the connection if there is something to write.
if (connection == null) {
connection = dataSource.getConnection();
connection.setAutoCommit(false);
preparedStatement = connection.prepareStatement(spec.getStatement().get());
}
Sleeper sleeper = Sleeper.DEFAULT;
BackOff backoff = retryBackOff.backoff();
while (true) {
try (PreparedStatement preparedStatement =
connection.prepareStatement(spec.getStatement().get())) {
try {

try {
spec.getPreparedStatementSetter().setParameters(record, preparedStatement);
} catch (Exception e) {
throw new RuntimeException(e);
}

// execute the statement
preparedStatement.execute();
// commit the changes
connection.commit();
context.output(spec.getRowMapper().mapRow(preparedStatement.getResultSet()));
return;
} catch (SQLException exception) {
if (!spec.getRetryStrategy().apply(exception)) {
throw exception;
}
LOG.warn("Deadlock detected, retrying", exception);
connection.rollback();
if (!BackOffUtils.next(sleeper, backoff)) {
// we tried the max number of times
throw exception;
}
}
}
}
}

@FinishBundle
public void finishBundle() throws Exception {
cleanUpStatementAndConnection();
}

@Override
protected void finalize() throws Throwable {
cleanUpStatementAndConnection();
}

private void cleanUpStatementAndConnection() throws Exception {
try {
if (preparedStatement != null) {
try {
preparedStatement.close();
} finally {
preparedStatement = null;
}
}
} finally {
if (connection != null) {
try {
connection.close();
} finally {
connection = null;
}
}
}
}
}
}

/**
* A {@link PTransform} to write to a JDBC datasource. Executes statements in a batch, and returns
* a trivial result.
*/
@AutoValue
public abstract static class WriteVoid<T> extends PTransform<PCollection<T>, PCollection<Void>> {

Expand Down

0 comments on commit f2b9a13

Please sign in to comment.