[BEAM-244] Add JDBC IO #942
Conversation
Force-pushed 6d77b13 to 12674a8
 */
public class JdbcDataRecord implements Serializable {

  private String[] tableNames;
Hi, why does the field tableNames in class JdbcDataRecord use String[]?
Each record only belongs to one table; maybe it would be better to declare it as:
private String tableName;
If the user query is like this:
select t1.id, t2.name from table1 t1, table2 t2 where t1.id = t2.id
then each JdbcDataRecord can reference a different table.
Thanks! Will take a deeper look later today, but first comment: the Source API is unnecessary here since the connector uses none of its features (ignores desiredBundleSizeBytes, doesn't provide estimated size, progress reporting from the reader, or liquid sharding); please refactor the source to be a simpler composite PTransform made of ParDo's.
Thanks! Yes, it's what I'm doing now (this initial code was on an old local branch ;)).
OK, one more high-level comment that needs to be resolved before continuing. I would recommend a slightly different approach, the one that e.g. Spring JDBC uses (RowMapper and BatchPreparedStatementSetter, see http://docs.spring.io/spring/docs/current/spring-framework-reference/html/jdbc.html): make this source and sink generic and let the user provide callbacks for extracting their type from a ResultSet/ResultSetMetaData, and for filling in a PreparedStatement from their type. I don't necessarily mean using Spring JDBC's classes directly (I don't know if we can or want to have that as a dependency, though if the answer were "yes" then it'd be nice). Additionally, in the sink:
I'll also make a pass now over the code and comment on things that will likely stay relevant after such a refactoring.
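To make the suggested shape concrete, here is a rough sketch of a generic, ParDo-based read with a Spring-style RowMapper callback. All class and method names below are illustrative assumptions written in the style of current Beam SDKs, not the final JdbcIO API, and serializability of the DataSource is glossed over (it comes up again later in this thread).

import java.io.Serializable;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

import javax.sql.DataSource;

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;

/** User callback that turns one ResultSet row into one PCollection element. */
interface RowMapper<T> extends Serializable {
  T mapRow(ResultSet resultSet) throws Exception;
}

/** A JDBC read expressed as a plain composite of ParDo's instead of a BoundedSource. */
class JdbcRead<T> extends PTransform<PBegin, PCollection<T>> {
  private final DataSource dataSource;  // assumed serializable here; see the later discussion
  private final String query;
  private final RowMapper<T> rowMapper;
  private final Coder<T> coder;

  JdbcRead(DataSource dataSource, String query, RowMapper<T> rowMapper, Coder<T> coder) {
    this.dataSource = dataSource;
    this.query = query;
    this.rowMapper = rowMapper;
    this.coder = coder;
  }

  @Override
  public PCollection<T> expand(PBegin input) {
    return input
        // A single element carrying the query; the actual reading happens inside a ParDo.
        .apply(Create.of(query))
        .apply(ParDo.of(new DoFn<String, T>() {
          @ProcessElement
          public void processElement(ProcessContext context) throws Exception {
            try (Connection connection = dataSource.getConnection();
                PreparedStatement statement = connection.prepareStatement(context.element());
                ResultSet resultSet = statement.executeQuery()) {
              while (resultSet.next()) {
                // The user callback builds each element from the current row.
                context.output(rowMapper.mapRow(resultSet));
              }
            }
          }
        }))
        .setCoder(coder);
  }
}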
    return advance();
  } catch (Exception e) {
    LOGGER.error("Can't connect to the database", e);
    return false;
From the point of view of the runner, this is simply a "return false" - meaning, the runner will happily declare that the bundle is complete because we reached EOF. From the point of view of the user, this is silent data loss signaled only by an error message in worker logs. I don't think that's what you intended, so it's better to rethrow the exception, both here and in advance().
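A minimal sketch of what rethrowing could look like here (illustrative only, assuming the reader fields visible in the snippet above: connection, statement, resultSet, dataSource, query):

@Override
public boolean start() throws IOException {
  try {
    connection = dataSource.getConnection();
    statement = connection.prepareStatement(query);
    resultSet = statement.executeQuery();
    return advance();
  } catch (Exception e) {
    // Propagate the failure so the runner can retry or fail the bundle,
    // instead of silently treating the error as end-of-input.
    throw new IOException("Can't connect to the database", e);
  }
}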
Thanks for your comments. I will update the PR accordingly.
Force-pushed 12674a8 to 51a0bd8
First refactoring to use …
@@ -203,223 +156,218 @@ public BoundedJdbcSource withPassword(String password) {
  @Nullable
  private final String password;

  public BoundedJdbcSource(DataSource dataSource, String query, String username,
      String password) {
  private Read(DataSource dataSource, String query, @Nullable String username,
Don't duplicate variables between the Read class and the JdbcOptions class. Instead make Read have a private variable of type JdbcOptions.
Force-pushed 51a0bd8 to dcd89f4
Fix checkstyle and use …
Resuming work on this PR.
Force-pushed dcd89f4 to 2049571
Updated with …
I'm fixing the Javadoc issue.
Thanks! A bunch of high-level comments; haven't looked at the tests yet.
 * {@code
 *
 * pipeline.apply(JdbcIO.read()
 *   .withDataSource(myDataSource)
Add the RowMapper to this example?
Done
 *
 * pipeline
 *   .apply(...)
 *   .apply(JdbcIO.write().withDataSource(myDataSource)
Likewise
 * object used in the {@link PCollection}.
 */
public interface RowMapper<T> extends Serializable {

Remove the unnecessary blank lines
try (ResultSet resultSet = statement.executeQuery()) {
  while (resultSet.next()) {
    T record = rowMapper.mapRow(resultSet);
    if (record != null) {
Why this non-null check? If it's allowed to return nulls, as a user I'd be surprised that nulls aren't making it into my PCollection. If it's not allowed to return nulls, I'd be surprised that the error is silently swallowed.
T record = context.element();
try {
  PreparedStatement statement = elementInserter.insert(record, connection);
  if (statement != null) {
Same question - why the non-null check.
    }
  }
} catch (Exception e) {
  LOGGER.warn("Can't insert data into table", e);
Rethrow the exception; otherwise in case of failures the user's pipeline will succeed but experience data loss with no apparent indication of that. A pipeline should not succeed if it was unable to do the processing requested.
public void processElement(ProcessContext context) {
  T record = context.element();
  try {
    PreparedStatement statement = elementInserter.insert(record, connection);
Seems like there's no batching - we're doing 1 database transaction per record?

@FinishBundle
public void finishBundle(Context context) throws Exception {
  connection.commit();
Bundles can be arbitrarily large - e.g. it is entirely possible that the bundle will live for 3 days and accumulate a billion elements. You need to commit every time the transaction reaches a certain size, as well as when the bundle finishes.
This also means that the user should be aware that some data can be committed multiple times, in case the bundle commits something but then fails, is retried, and commits the same thing again. I believe this is currently not easily avoidable (since a DoFn cannot force the runner to commit the bundle); it's just something that should be documented, so that the user knows to use idempotent PreparedStatements if possible, or to be okay with duplicates.
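A rough sketch of that periodic commit, assuming the WriteFn keeps a batchSize setting and a running batchCount (the field names are assumptions based on the discussion, not the PR's exact code):

@ProcessElement
public void processElement(ProcessContext context) throws Exception {
  PreparedStatement statement = elementInserter.insert(context.element(), connection);
  statement.executeUpdate();
  batchCount++;
  if (batchCount >= batchSize) {
    // Commit before the transaction grows without bound; the bundle may still be
    // retried, so the writes should be idempotent or duplicates must be acceptable.
    connection.commit();
    batchCount = 0;
  }
}

@FinishBundle
public void finishBundle(Context context) throws Exception {
  connection.commit();  // flush whatever is left at the end of the bundle
}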
}

/**
 * An interface used by the JdbcIO Write for mapping {@link PCollection} elements as a rows of a
This comment appears copy-pasted from RowMapper, please update.
 */
public interface ElementInserter<T> extends Serializable {

  PreparedStatement insert(T element, Connection connection);
Perhaps it'd be better to make the API only slightly less generic but slightly more clear to use / harder to mis-use, by making the following change (a rough sketch follows this list):
- Pass the SQL string to the Write transform
- Make the WriteFn create the prepared statement itself, in the Setup method
- Make ElementInserter's signature be: void setParameters(T element, PreparedStatement statement) (and rename it to PreparedStatementSetter?)
This way:
- the PreparedStatement object can be reused, which will give much better performance
- the user doesn't have to call .prepareStatement
- the user can not accidentally mess up the connection by doing something to it that the WriteFn doesn't expect (e.g. calling commit, or close, or doing queries)
- it is easier to understand the semantics of the transform because it always uses the same SQL just with different parameters
- an ElementInserter is easier to test without a Connection
- it becomes easier to write composable ElementInserter classes, e.g. you could imagine expressing something like { st.setString(1, t.getKey()); st.setInteger(2, t.getValue()); } as "allOf(stringAt(1, key()), integerAt(2, value()))" where allOf, stringAt, integerAt, key, value are static utility functions. Not suggesting to do this in the current PR, but I once developed a library of composable row mappers and prepared statement setters like this at one of my previous companies and it was very handy to use.
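A sketch of the proposed interface plus the kind of composable helpers described in the last point, written with Java 8 lambdas for brevity. allOf, stringAt, intAt, key and value are purely illustrative utilities, not part of this PR (note also that the JDBC setter is setInt, not setInteger):

import java.io.Serializable;
import java.sql.PreparedStatement;

import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;

/** Fills in the parameters of a reusable PreparedStatement from one element. */
interface PreparedStatementSetter<T> extends Serializable {
  void setParameters(T element, PreparedStatement statement) throws Exception;
}

/** Illustrative composable setters, in the spirit of the comment above. */
class Setters {
  /** Runs several setters against the same statement. */
  @SafeVarargs
  static <T> PreparedStatementSetter<T> allOf(final PreparedStatementSetter<T>... setters) {
    return (element, statement) -> {
      for (PreparedStatementSetter<T> setter : setters) {
        setter.setParameters(element, statement);
      }
    };
  }

  /** Binds a derived String value at the given parameter index. */
  static <T> PreparedStatementSetter<T> stringAt(int index, SerializableFunction<T, String> fn) {
    return (element, statement) -> statement.setString(index, fn.apply(element));
  }

  /** Binds a derived int value at the given parameter index. */
  static <T> PreparedStatementSetter<T> intAt(int index, SerializableFunction<T, Integer> fn) {
    return (element, statement) -> statement.setInt(index, fn.apply(element));
  }

  static <K, V> SerializableFunction<KV<K, V>, K> key() {
    return KV::getKey;
  }

  static <K, V> SerializableFunction<KV<K, V>, V> value() {
    return KV::getValue;
  }
}

With helpers like these, { st.setString(1, t.getKey()); st.setInt(2, t.getValue()); } could indeed be spelled roughly as allOf(stringAt(1, key()), intAt(2, value())).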
Force-pushed 8a44cb4 to a75fcfd
Added batching for Write, and use of a PreparedStatementSetter with a query for write.
@@ -57,14 +59,25 @@
 *
 * pipeline.apply(JdbcIO.read()
 *   .withDataSource(myDataSource)
Where does the query go?
 *   public MyElement mapRow(ResultSet resultSet) {
 *     // use the resultSet to build the element of the PCollection
 *     // for instance:
 *     // return resultSet.getString(2);
This line should not be commented out; and it should return a MyElement.
E.g.: return new MyElement(resultSet.getInt(1), resultSet.getString(2)) or something
(it should also correspond to the query - internal inconsistency within an example can be confusing to readers of the documentation)
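For instance, the javadoc example could read something like the following (MyElement and its constructor are hypothetical, and the snippet assumes the read also takes a query and that mapRow is allowed to throw):

pipeline.apply(JdbcIO.read()
    .withDataSource(myDataSource)
    .withQuery("select id, name from MyTable")
    .withRowMapper(new JdbcIO.RowMapper<MyElement>() {
      public MyElement mapRow(ResultSet resultSet) throws Exception {
        // Build a PCollection element from the current row, matching the query above.
        return new MyElement(resultSet.getInt(1), resultSet.getString(2));
      }
    }));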
 *
 * }
 * </pre>
 * <p>
 * You can find an full {@link RowMapper} example in {@code JdbcIOTest}.
a full
 * <h3>Writing to JDBC datasource</h3>
 * <p>
 * JDBC sink supports writing records into a database. It expects a
 * {@code PCollection<T>}, converts the {@code T} elements as SQL statement
 * and insert into the database. T is the type expected by the provided {@link ElementInserter}.
 * and setParameters into the database. T is the type expected by the
The grammar seems off - "converts the T elements as SQL statement and setParameters".
Perhaps:
It writes a PCollection to the database by converting each T into a PreparedStatement via a user-provided PreparedStatementSetter?
 * .apply(JdbcIO.write().withDataSource(myDataSource)
 * .apply(JdbcIO.write()
 *   .withDataSource(myDataSource)
 *   .withQuery(query)
This should be called a statement, rather than a query
}

/**
 * Example of {@link PCollection} element that a
This example is unnecessarily complicated, it does not reflect the kind of code a user will write (users typically know the schema of the data they're reading/writing), and it does not increase coverage of the code under test (the code under test doesn't care about the schema, it just passes it to your RowMapper/Setter, so there's no additional coverage gained from having a very sophisticated row mapper or setter). Please use a simpler example - e.g. a simple POJO with a couple of fields.
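For example, the test could use a small POJO like this instead of JdbcDataRecord (the class and its fields are hypothetical, purely to illustrate the suggestion):

/** Simple two-field POJO matching the columns the test reads and writes. */
static class TestRow implements Serializable {
  private final int id;
  private final String name;

  TestRow(int id, String name) {
    this.id = id;
    this.name = name;
  }

  int getId() {
    return id;
  }

  String getName() {
    return name;
  }
}

The corresponding row mapper then simply does return new TestRow(resultSet.getInt(1), resultSet.getString(2)).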
PCollection<JdbcDataRecord> output = pipeline.apply(
    JdbcIO.read()
        .withDataSource(dataSource)
        .withQuery("select * from BEAM")
Using "select *" would be poor practice in an actual pipeline because it will break if fields in the table are reordered; or it will start performing poorly if a bulky column is added to the database, etc. I think "select *" is generally considered a code smell in database code and should be effectively never used in production code.
It's better to supply a query that explicitly lists the columns you're reading.
public KV<String, Void> apply(JdbcDataRecord input) {
  // find NAME column id
  int index = -1;
  for (int i = 0; i < input.getColumnNames().length; i++) {
Again: if you make the example simpler by deleting JdbcDataRecord and using a simple POJO, this loop will be unnecessary and you'll just do regular field access on the POJO.

private class InsertRecord {

  private int columnType;
This class is unused.
Connection connection = dataSource.getConnection();

Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery("select * from TEST");
Just use select count(*).
Agreed with your comments. My concern is about the data source: even if the interface is not serializable, most implementations are. I think it's better to let the user define their own data source in order to deal with connection pooling, etc. Thoughts?
It might be the case that most DataSources are serializable; but the user will be unable to use this connector with those that aren't; and there's no fundamental reason why this has to be the case.
Let me think about it and experiment. I don't think ConnectionFactory is a good name, as it's confusing with the JMS ConnectionFactory. Thanks. I'm preparing updates.
Implemented a large part of the comments. TODO:
I'm evaluating the use of DBCP2, letting the user provide the driver class name, JDBC URL, and optionally username, password, and pooling size.
Thanks! Here's one more round, just on the latest commit. Please let me know after addressing all comments from this and the previous round and I'll do one more pass on the full code.
 *     // use the resultSet to build the element of the PCollection
 *     // for instance:
 *     // return resultSet.getString(2);
 *   .withStatement("select id,name from Person")
For the read, it should be query. I'm using common terminology: query is something read-only; statement is something that modifies things.
Understood, and it makes sense.
 *   .withStatement("select id,name from Person")
 *   .withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
 *     public KV<Integer, String> mapRow(ResultSet resultSet) {
 *       KV<Integer, String> kv = KV.of(resultSet.getInt(1), resultSet.getString(2));
return statement forgotten?
 * and setParameters into the database. T is the type expected by the
 * provided {@link PreparedStatementSetter}.
 * JDBC sink supports writing records into a database. It writes a {@link PCollection} to the
 * database by converting each T into a {@link PreparedStatement} via an user-provided
a user-provided
 *   .withQuery(query)
 *   .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<MyElement>() {
 *   .withStatement("insert into Person values(?, ?)")
 *   .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, String>>() {
This line uses KV[Integer,String] but the next line uses MyElement - please fix.
 *   }
 * })
 *
 * }
 * </pre>
 * <p>
 * You can find a full {@link PreparedStatementSetter} in {@code JdbcIOTest}.
 * You can find a full {@link PreparedStatementSetter} in {@code JdbcIOTest}.
a full ... example?
}
// TODO clear only record actually inserted in the database (remove from batch
Per offline discussion, this is moot.
} catch (Exception e) {
  LOGGER.error("Can't map row", e);
  return null;
This code says "if can't parse record, silently drop it and return null instead" - this is silent data loss and it's a poor example to show to users, especially given that the documentation of JdbcIO delegates to the test for a complete example. As a user, I would be very surprised to find nulls in my PCollection with no indication of how they got there. I'd much prefer if the read failed, than if it silently produced nulls.
Please remove the error handling and let it fail. I think this is a general recommendation that I've made several times in these PRs: the runner does error handling for you. Unless you are dealing with a rare case where the error is transient/recoverable AND your logic does a better job of recovering from it than the runner would have (by retrying your bundle), you should not have error handling code at all (I can think of only one case: retrying individual RPCs in case of transient RPC errors, but usually various client libraries do that for you).
And most importantly, it is absolutely never OK to have error handling code that masks a data-loss error (e.g. failure to write data or failure to parse data).
+1 (my bad)
 * }
 * </pre>
 * <p>
 * You can find a full {@link RowMapper} example in {@code JdbcIOTest}.
The example in JdbcIOTest seems not particularly more complete than the one in this doc; maybe it's not worth mentioning here. Likewise for the write.
}
new SimpleFunction<KV<Integer, String>, KV<String, Void>>() {
  public KV<String, Void> apply(KV<Integer, String> input) {
    return KV.of(input.getValue(), null);
Why the KV[String, Void]? If all you're doing is Count.perKey(), you can apply it to the "output" collection directly, without this MapElements.
I have a KV<Integer, String>, and the key is non-deterministic. So, at minimum, I have to define the String as the key (or change the query/rowMapper).
Oh, I see, you're flipping the Integer, String into String, Void.
Instead of that, how about using Values.create() and Count.perElement() to count the String's directly?
I updated in the last commit to directly use the name as the key (more straightforward).
LOGGER.error("Can't insert into database", e); | ||
} | ||
preparedStatementSetter.setParameters(record, preparedStatement); | ||
preparedStatement.executeUpdate(); |
Just noticed you're doing executeUpdate rather than executeBatch. This still executes the statements one-by-one, which is inefficient. You need to do addBatch (you can do it in processElement), and do executeBatch in finishBundle.
In that case, I don't need the batch list anymore: just batchSize. In the processElement method, I can do:

    T record = context.element();
    preparedStatement.clearParameters();
    preparedStatementSetter.setParameters(record, preparedStatement);
    preparedStatement.addBatch();
    batchCount++;
    if (batchCount >= batchSize) {
      finishBundle(context);
    }

Then, in finishBundle, I do:

    preparedStatement.executeBatch();

Agreed?
Seems reasonable. Also set batchCount back to 0 after executing the batch. And make sure to commit the batch as a single transaction, for efficiency (http://stackoverflow.com/questions/6892105/bulk-insert-in-java-using-prepared-statements-batch-update recommends not using autocommit when using this pattern)
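Putting those pieces together, the write DoFn could end up looking roughly like this (dataSource, statement, preparedStatementSetter, batchCount and batchSize are assumed field names based on the discussion, not the exact code of the PR):

@Setup
public void setup() throws Exception {
  connection = dataSource.getConnection();
  connection.setAutoCommit(false);  // commit each batch explicitly, as a single transaction
  // 'statement' is the SQL string passed via withStatement(...).
  preparedStatement = connection.prepareStatement(statement);
}

@ProcessElement
public void processElement(ProcessContext context) throws Exception {
  preparedStatement.clearParameters();
  preparedStatementSetter.setParameters(context.element(), preparedStatement);
  preparedStatement.addBatch();
  batchCount++;
  if (batchCount >= batchSize) {
    executeBatch();
  }
}

@FinishBundle
public void finishBundle(Context context) throws Exception {
  executeBatch();  // flush the tail of the bundle
}

private void executeBatch() throws Exception {
  if (batchCount > 0) {  // avoid calling executeBatch() on an empty batch
    preparedStatement.executeBatch();
    connection.commit();  // one transaction per batch
    batchCount = 0;       // reset the counter, as suggested above
  }
}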
…the "parsing" of ResultSet/PCollection element
…se an unique PreparedStatement for write, simplify tests, fix indentation and comments
…urce provided by user or created by DBCP2 using configuration, DataSourceConfiguration is used by both Read and Write
…esource, improve main javadoc
…taSourceConfiguration, improve tests and some cleanups
Force-pushed 557a80a to 759650f
Merged @jkff's improvements, added connection pool configuration. What about preventing fusion in the read?
…f sense for users to change default values
Add random key generation, followed by GroupByKey and then ungroup, to prevent fusion.
@Setup
public void setup() {
  Random random = new Random();
  randInt = random.nextInt();
This uses the same key for the whole bundle, which is equivalent to not doing this at all. The random key should be generated in processElement so that different elements get distributed onto different keys.
Ideally (but not required): can you try running an example pipeline against, e.g., a local spark or flink cluster with a local database and confirm that it speeds up as you add more workers? (I wish we had some infrastructure for performance testing...) I'd suggest a scenario where the database contains a small number of records (e.g. 100), but processing of every record is, e.g., a 10-second sleep. It should take less than 1000 seconds to complete and should scale linearly with number of workers; but it'd be sufficient to test with, e.g., 10 workers and confirm that it takes about 100 seconds (modulo per-worker startup time).
My bad, I wanted to do new Random() in @Setup, but nextInt in @ProcessElement. I'm fixing that.
And yes for the test, I have my dockerized cluster. I will test on it.
}
@ProcessElement
public void processElement(ProcessContext context) {
  T record = context.element();
Not necessary to extract all the subexpressions into variables, this can be a one-liner:
context.output(KV.of(random.nextInt(), context.element()));
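In other words, a fusion-breaking key assignment along these lines (a sketch; the GroupByKey and ungrouping steps that follow it are as described earlier in the thread):

import java.util.Random;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

/** Assigns a random key per element so that a following GroupByKey redistributes the data. */
class AssignRandomKeyFn<T> extends DoFn<T, KV<Integer, T>> {
  private transient Random random;

  @Setup
  public void setup() {
    random = new Random();  // one Random per DoFn instance
  }

  @ProcessElement
  public void processElement(ProcessContext context) {
    // A new key per element (not per bundle), so elements spread across many keys.
    context.output(KV.of(random.nextInt(), context.element()));
  }
}

The GroupByKey that follows then redistributes elements across workers, and something like Values.create() followed by Flatten.iterables() can ungroup them again.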
LGTM assuming you fix the empty executeBatch() thing. Thanks for bearing with me!
Updated: prevented doing executeBatch() when the batch is empty.
LGTM.
Thanks @jkff. I will merge later today!
@jbonofre I found this PR after googling for how to configure connection pooling for Beam. I'm not too familiar with JDBC, but I saw your commit removing the options to specify pooling; I'm curious what your thoughts were behind not exposing those options?
I suggested to not expose those options, for a couple of reasons.
Do you have a use case where you can reliably achieve much better performance by specifying values different from the defaults? In that case, how are you choosing the values and what assumptions are you making when choosing them? I'm not 100% opposed to adding these knobs if they are necessary, but I want to understand whether they really are. Also, perhaps the overload of JdbcIO.ConnectionConfiguration.create(DataSource) works for you, where you specify your parameters on the DataSource manually?
Thank you for these very elaborate explanations. I don't think I have a specific use case where it would be different from the defaults. As said, I'm not very familiar with Beam and JDBC in general. From my background (Web and Ruby), I know that sometimes tuning the connection pool size can greatly improve throughput, and I was surprised to find no options. That being said, your explanation totally makes sense to me, so I will report back if I find such a use case (but probably won't!). Also thanks for the manual override of the DataSource, I will consult that first!
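For reference, a manually configured pooled DataSource along those lines might look like the following sketch using commons-dbcp2's BasicDataSource (the driver class, URL, credentials and pool sizes are placeholders); it would then be handed to the create(DataSource) overload mentioned above:

BasicDataSource dataSource = new BasicDataSource();  // org.apache.commons.dbcp2.BasicDataSource
dataSource.setDriverClassName("org.apache.derby.jdbc.ClientDriver");  // placeholder driver
dataSource.setUrl("jdbc:derby://localhost:1527/beam");                // placeholder JDBC URL
dataSource.setUsername("user");
dataSource.setPassword("password");
dataSource.setInitialSize(4);   // pool tuning owned by the user, not by JdbcIO
dataSource.setMaxTotal(16);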
One thing I did just notice: running my Beam pipeline on Spark with 8 workers gives me a … EDIT: It also seems like I'm missing some rows that haven't been inserted?
To resolve this issue, I'd recommend posting a question on StackOverflow with more details under the tag apache-beam (http://stackoverflow.com/questions/tagged/apache-beam), e.g. the full stack trace of the error, and relevant parts of the code if you're comfortable sharing them.
@nambrot Hey, can you post (on the user mailing list) a pipeline to reproduce the issue? By the way, I remember having seen a deadlock as well (I think I created a Jira about that). Do you use a DBCP datasource?
Be sure to do all of the following to help us incorporate your contribution quickly and easily:
- Make sure the PR title is formatted like: [BEAM-<Jira issue #>] Description of pull request.
- Make sure tests pass via mvn clean verify. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes.)
- Replace <Jira issue #> in the title with the actual Jira issue number, if there is one.
- If this contribution is large, please file an Apache Individual Contributor License Agreement.