From 51a0bd8f325767e55fccb7edaf10a78c621dd99c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?= Date: Tue, 13 Sep 2016 21:15:49 +0200 Subject: [PATCH] [BEAM-244] Refactor to use first simple composite DoFn instead of source --- .../org/apache/beam/sdk/io/jdbc/JdbcIO.java | 414 ++++++++---------- 1 file changed, 181 insertions(+), 233 deletions(-) diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java index 12ec706763d6d..ff3365f85b652 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import java.io.Serializable; import java.math.BigDecimal; import java.sql.Array; import java.sql.Blob; @@ -43,8 +44,11 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -109,7 +113,7 @@ public class JdbcIO { * @return a {@link Read} {@link PTransform}. 
*/ public static Read read() { - return new Read(new BoundedJdbcSource(null, null, null, null)); + return new Read(null, null, null, null); } /** @@ -130,70 +134,19 @@ private JdbcIO() { public static class Read extends PTransform> { public Read withDataSource(DataSource dataSource) { - return new Read(source.withDataSource(dataSource)); + return new Read(dataSource, query, username, password); } public Read withQuery(String query) { - return new Read(source.withQuery(query)); + return new Read(dataSource, query, username, password); } public Read withUsername(String username) { - return new Read(source.withUsername(username)); + return new Read(dataSource, query, username, password); } public Read withPassword(String password) { - return new Read(source.withPassword(password)); - } - - private final BoundedJdbcSource source; - - private Read(BoundedJdbcSource source) { - this.source = source; - } - - @Override - public PCollection apply(PBegin input) { - return input.apply(org.apache.beam.sdk.io.Read.from(getSource())); - } - - /** - * Creates a {@link BoundedSource} with the configuration in {@link Read}. 
- */ - @VisibleForTesting - BoundedSource getSource() { - return source; - } - - @Override - public void validate(PBegin input) { - source.validate(); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - - source.populateDisplayData(builder); - } - - } - - private static class BoundedJdbcSource extends BoundedSource { - - public BoundedJdbcSource withDataSource(DataSource dataSource) { - return new BoundedJdbcSource(dataSource, query, username, password); - } - - public BoundedJdbcSource withQuery(String query) { - return new BoundedJdbcSource(dataSource, query, username, password); - } - - public BoundedJdbcSource withUsername(String username) { - return new BoundedJdbcSource(dataSource, query, username, password); - } - - public BoundedJdbcSource withPassword(String password) { - return new BoundedJdbcSource(dataSource, query, username, password); + return new Read(dataSource, query, username, password); } private final DataSource dataSource; @@ -203,8 +156,8 @@ public BoundedJdbcSource withPassword(String password) { @Nullable private final String password; - public BoundedJdbcSource(DataSource dataSource, String query, String username, - String password) { + private Read(DataSource dataSource, String query, @Nullable String username, + @Nullable String password) { this.dataSource = dataSource; this.query = query; this.username = username; @@ -212,214 +165,209 @@ public BoundedJdbcSource(DataSource dataSource, String query, String username, } @Override - public Coder getDefaultOutputCoder() { - return SerializableCoder.of(JdbcDataRecord.class); + public PCollection apply(PBegin input) { + JdbcOptions jdbcOptions = JdbcOptions.from(dataSource, query, username, password); + + PCollection output = input.apply(Create.of(query)) + .apply(ParDo.of(new ReadFn(jdbcOptions))); + + return output; } @Override - public void validate() { - Preconditions.checkNotNull(dataSource, "dataSource"); - 
Preconditions.checkNotNull(query, "query"); + public void validate(PBegin input) { + Preconditions.checkNotNull(dataSource); + Preconditions.checkNotNull(query); } @Override public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("dataSource", dataSource.getClass().getName())); builder.add(DisplayData.item("query", query)); builder.addIfNotNull(DisplayData.item("username", username)); } - @Override - public boolean producesSortedKeys(PipelineOptions options) { - return false; - } + @VisibleForTesting + static class JdbcOptions implements Serializable { + private final DataSource dataSource; + private final String query; + @Nullable + private final String username; + @Nullable + private final String password; - @Override - public BoundedReader createReader(PipelineOptions options) { - return new BoundedJdbcReader(this); - } + private JdbcOptions(DataSource dataSource, String query, @Nullable String username, + @Nullable String password) { + this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); + this.query = Preconditions.checkNotNull(query, "query"); + this.username = username; + this.password = password; + } - @Override - public long getEstimatedSizeBytes(PipelineOptions options) { - // there's no actual way to get the estimated size of a query without performing the - // query. So it would be a bottleneck in the JdbcIO. - return 0L; - } + public static JdbcOptions from(DataSource dataSource, String query, String username, + String password) { + return new JdbcOptions(dataSource, query, username, password); + } - @Override - public List> splitIntoBundles(long desiredBundleSizeBytes, - PipelineOptions options) { - // related to getEstimatedSizeBytes, the only way to define the split is by hand (user - // defined). We return an unique source for JDBC. 
- List> sources = new ArrayList(); - sources.add(this); - return sources; - } + public DataSource getDataSource() { + return dataSource; + } - } + public String getQuery() { + return query; + } - private static class BoundedJdbcReader extends BoundedSource.BoundedReader { + @Nullable + public String getUsername() { + return username; + } - private final BoundedJdbcSource source; + @Nullable + public String getPassword() { + return password; + } + } - private Connection connection; - private PreparedStatement statement; - private ResultSet resultSet; - private ResultSetMetaData resultSetMetaData; + public static class ReadFn extends DoFn { - private JdbcDataRecord current; + private final JdbcOptions options; - public BoundedJdbcReader(BoundedJdbcSource source) { - this.source = source; - } + private Connection connection; + private PreparedStatement statement; + private ResultSet resultSet; + private ResultSetMetaData resultSetMetaData; - @Override - public boolean start() { - try { - if (source.username != null) { - connection = source.dataSource.getConnection(source.username, source.password); - } else { - connection = source.dataSource.getConnection(); - } - statement = connection.prepareCall(source.query); - resultSet = statement.executeQuery(); - resultSetMetaData = resultSet.getMetaData(); - - return advance(); - } catch (Exception e) { - LOGGER.error("Can't connect to the database", e); - return false; + private ReadFn(JdbcOptions options) { + this.options = options; } - } - @Override - public boolean advance() { - try { - resultSet.next(); - - JdbcDataRecord record = new JdbcDataRecord(resultSetMetaData.getColumnCount()); - - for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) { - String columnName = resultSetMetaData.getColumnName(i); - record.getColumnNames()[i - 1] = columnName; - String tableName = resultSetMetaData.getTableName(i); - record.getTableNames()[i - 1] = tableName; - int columnType = resultSetMetaData.getColumnType(i); - 
record.getColumnTypes()[i - 1] = columnType; - Object payload = null; - switch (columnType) { - case Types.ARRAY: - payload = resultSet.getArray(i); - break; - case Types.BIGINT: - payload = resultSet.getInt(i); - break; - case Types.BIT: - payload = resultSet.getInt(i); - break; - case Types.BLOB: - payload = resultSet.getBlob(i); - break; - case Types.BOOLEAN: - payload = resultSet.getBoolean(i); - break; - case Types.CHAR: - payload = resultSet.getString(i); - break; - case Types.CLOB: - payload = resultSet.getClob(i); - break; - case Types.DATE: - payload = resultSet.getDate(i); - break; - case Types.DECIMAL: - payload = resultSet.getBigDecimal(i); - break; - case Types.DOUBLE: - payload = resultSet.getDouble(i); - break; - case Types.FLOAT: - payload = resultSet.getFloat(i); - break; - case Types.INTEGER: - payload = resultSet.getInt(i); - break; - case Types.LONGNVARCHAR: - payload = resultSet.getString(i); - break; - case Types.LONGVARCHAR: - payload = resultSet.getString(i); - break; - case Types.NCHAR: - payload = resultSet.getNString(i); - break; - case Types.NCLOB: - payload = resultSet.getNClob(i); - break; - case Types.SMALLINT: - payload = resultSet.getInt(i); - break; - case Types.TIME: - payload = resultSet.getTime(i); - break; - case Types.TIMESTAMP: - payload = resultSet.getTimestamp(i); - break; - case Types.TINYINT: - payload = resultSet.getInt(i); - break; - case Types.VARCHAR: - payload = resultSet.getString(i); - break; - default: - payload = resultSet.getObject(i); - break; + @Setup + public void setup() throws Exception { + if (options.getUsername() != null) { + connection = options.getDataSource().getConnection( + options.getUsername(), options.getPassword()); + } else { + connection = options.getDataSource().getConnection(); } - record.getColumnValues()[i - 1] = payload; - } - - current = record; - - return true; - } catch (Exception e) { - LOGGER.warn("Can't read result set", e); + statement = 
connection.prepareStatement(options.getQuery()); + resultSet = statement.executeQuery(); + resultSetMetaData = resultSet.getMetaData(); } - return false; - } - @Override - public void close() { - if (resultSet != null) { - try { - resultSet.close(); - } catch (Exception e) { - // nothing to do + @ProcessElement + public void processElement(ProcessContext context) throws Exception { + while (resultSet.next()) { + JdbcDataRecord record = new JdbcDataRecord(resultSetMetaData.getColumnCount()); + + for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) { + String columnName = resultSetMetaData.getColumnName(i); + record.getColumnNames()[i - 1] = columnName; + String tableName = resultSetMetaData.getTableName(i); + record.getTableNames()[i - 1] = tableName; + int columnType = resultSetMetaData.getColumnType(i); + record.getColumnTypes()[i - 1] = columnType; + Object payload = null; + switch (columnType) { + case Types.ARRAY: + payload = resultSet.getArray(i); + break; + case Types.BIGINT: + payload = resultSet.getInt(i); + break; + case Types.BIT: + payload = resultSet.getInt(i); + break; + case Types.BLOB: + payload = resultSet.getBlob(i); + break; + case Types.BOOLEAN: + payload = resultSet.getBoolean(i); + break; + case Types.CHAR: + payload = resultSet.getString(i); + break; + case Types.CLOB: + payload = resultSet.getClob(i); + break; + case Types.DATE: + payload = resultSet.getDate(i); + break; + case Types.DECIMAL: + payload = resultSet.getBigDecimal(i); + break; + case Types.DOUBLE: + payload = resultSet.getDouble(i); + break; + case Types.FLOAT: + payload = resultSet.getFloat(i); + break; + case Types.INTEGER: + payload = resultSet.getInt(i); + break; + case Types.LONGNVARCHAR: + payload = resultSet.getString(i); + break; + case Types.LONGVARCHAR: + payload = resultSet.getString(i); + break; + case Types.NCHAR: + payload = resultSet.getNString(i); + break; + case Types.NCLOB: + payload = resultSet.getNClob(i); + break; + case Types.SMALLINT: + payload = 
resultSet.getInt(i); + break; + case Types.TIME: + payload = resultSet.getTime(i); + break; + case Types.TIMESTAMP: + payload = resultSet.getTimestamp(i); + break; + case Types.TINYINT: + payload = resultSet.getInt(i); + break; + case Types.VARCHAR: + payload = resultSet.getString(i); + break; + default: + payload = resultSet.getObject(i); + break; + } + record.getColumnValues()[i - 1] = payload; + } + context.output(record); } } - if (statement != null) { - try { - statement.close(); - } catch (Exception e) { - // nothing to do + + @Teardown + public void tearDown() { + if (resultSet != null) { + try { + resultSet.close(); + } catch (Exception e) { + LOGGER.warn("Can't close resultSet", e); + } } - } - if (connection != null) { - try { - connection.close(); - } catch (Exception e) { - // nothing to do + if (statement != null) { + try { + statement.close(); + } catch (Exception e) { + LOGGER.warn("Can't close statement", e); + } + } + if (connection != null) { + try { + connection.close(); + } catch (Exception e) { + LOGGER.warn("Can't close connection", e); + } } } - } - - @Override - public BoundedSource getCurrentSource() { - return source; - } - @Override - public Object getCurrent() { - return current; } }