New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add jdbc connector #931
Add jdbc connector #931
Conversation
hazelcast-jet-core/pom.xml
Outdated
<dependency> | ||
<groupId>com.h2database</groupId> | ||
<artifactId>h2</artifactId> | ||
<version>${h2.version}</version> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should have test scope, you can also embed the version like with the other test libraries
*/ | ||
public static <T> ProcessorMetaSupplier readJdbcP( | ||
@Nonnull DistributedSupplier<java.sql.Connection> connectionSupplier, | ||
@Nonnull DistributedFunction<java.sql.Connection, Statement> statementFn, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how come sink works with PreparedStatement
and source only with Statement
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PreparedStatement
extends Statement
, I'll use Statement
for both source and sink
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it makes sense how it was. PreparedStatement
extends Statement
and allows it to bind variables. For source, we don't need to bind variables, because the statement is executed only once.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, I thought it would get repeatedly executed, but we get a cursor so don't need to deal with pagination.
* projection, use {@link Util#mapPutEvents} to pass only {@link | ||
* com.hazelcast.core.EntryEventType#ADDED ADDED} and {@link | ||
* com.hazelcast.core.EntryEventType#UPDATED UPDATED} events. | ||
* projection, use {@link com.hazelcast.jet.Util#mapPutEvents} to pass |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no need for explicit imports here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've converted these to explicit imports so that I can usecom.hazelcast.jet.impl.util.Util
with import
@Override | ||
public void close(@Nullable Throwable error) throws Exception { | ||
if (statement != null) { | ||
statement.close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this can throw exception, which must be caught
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Closing the connection closes all the statements. No need to catch, exception in close
is just logged and ignored.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant that if statement.close() throws connection.close() will not be called
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know, I meant connection.close()
is enough, no need to close the statement.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Checking google, people tend to recommend closing all three: connection, statement and resultset. Even though after connection is closed there's nothing to do for the other two, they say that it's not specified and that some drivers misbehave...
@Nonnull | ||
public static <T> Sink<T> jdbc(@Nonnull String connectionUrl, | ||
@Nonnull String updateQuery, | ||
@Nonnull DistributedBiConsumer<PreparedStatement, T> updateFn) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be better if this is just bindFn
: a function expected to bind parameters in the statement, but not expected to call addBatch
. The flushFn
does executeBatch
, i'm not sure if executeBatch
wouldn't fail if nothing was added, for example if the user used executeUpdate
instead of addBatch
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is the convenience method, for the full version user may choose to execute the query directly in the updateFn
rather than add to batch. I know batching is better for performance wise but this brings flexibility to the user. If you want batch, use addBatch
and executeBatch
. If you don't want batch just execute the query and don't do anything on flush function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any scenario where we would not want to batch?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
from javadoc SQLException if a database access error occurs, this method is called on a closed <code>Statement</code> or the driver does not support batch statements.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can still have bindFn
expected only to bind parameters. We can query the driver if batch is supported. If it is, we'll do addBatch/executeBatch
. If it's not, we'll do executeQuery
only. I think it makes sense for this simple version.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, we can do that
@Nonnull | ||
public static <T> Sink<T> jdbc(@Nonnull String connectionUrl, | ||
@Nonnull String updateQuery, | ||
@Nonnull DistributedBiConsumer<PreparedStatement, T> updateFn) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here we also require the connection to be in auto-commit mode because we don't commit. We can add
if (!con.getAutoCommit()) {
con.commit();
}
to the flushFn
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
from javadoc By default a <code>Connection</code> object is in auto-commit mode...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, but the user can change it. We depend on auto-commiting, we should document it. But i still prefer not depending on it if you add the code above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
user cannot change it, this is convenience method, we do create the connection using the connectionURL.
() -> uncheckCall(() -> DriverManager.getConnection(connectionUrl)),
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah, true. Then i'll disable auto-commit and i'll commit in flushFn. It's much more performant.
What about error handling? I think we should try to reconnect, afaik JDBC drivers don't support this transparently. |
* DistributedFunction, DistributedBiConsumer, DistributedBiConsumer)}. | ||
*/ | ||
@Nonnull | ||
public static <T> Sink<T> jdbc(@Nonnull String connectionUrl, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We also need driver class here and do Class.forName(driverClass)
before calling DriverManager.getConnection
. JDBC drivers register themselves in a static initializer in the driver class, if we don't do this, the connection will fail. The test doesn't fail because the DeleteDbFiles.execute
likely loads the driver.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In previous versions of JDBC, to obtain a connection, you first had to initialize your JDBC driver by calling the method Class.forName. This methods required an object of type java.sql.Driver. Each JDBC driver contains one or more classes that implements the interface java.sql.Driver. ... Any JDBC 4.0 drivers that are found in your class path are automatically loaded. (However, you must manually load any drivers prior to JDBC 4.0 with the method Class.forName.)
JDBC 4.0 is introduced with Java6
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, didn't know that. Maybe you should test whether it works with the dynamic class loading when the driver is submitted in job resources.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll test it
return null; | ||
} | ||
|
||
private static ResultSetForPartitionFunction resultSetFunction(String query) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function should be inlined or it's name should be more specific, something like singlePartitionResultSetFn
|
||
private static ResultSetForPartitionFunction resultSetFunction(String query) { | ||
return (connection, parallelism, index) -> { | ||
PreparedStatement statement = uncheckCall(() -> connection.prepareStatement(query)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can assert parallelism == 1
, otherwise it would emit duplicates.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we use ProcessorMetaSupplier.forceTotalParallelismOne
on SourceProcessors.readJdbcP
, isn't it enough ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it is, but it's in different place. It's safety if anything goes wrong...
Btw, why do we return PS from the supplier and then wrap it in another place? It requires to be wrapped, why not return PMS right away?
flushFn.accept(connection, statement); | ||
itemList.clear(); | ||
} catch (Exception e) { | ||
if (e.getCause() instanceof SQLNonTransientException) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is inverted
@Override | ||
public void init(@Nonnull Outbox outbox, @Nonnull Context context) { | ||
logger = context.logger(); | ||
connection = connectionSupplier.get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should not fail on connection error here too. We can assign idleCount=1
and call reconnectIfNecessary
public final class WriteJdbcP<T> implements Processor { | ||
|
||
private static final IdleStrategy IDLER = | ||
new BackoffIdleStrategy(0, 0, MILLISECONDS.toNanos(1), SECONDS.toNanos(10)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shortest idle can be 1 second
private List<T> itemList = new ArrayList<>(); | ||
private int idleCount; | ||
|
||
public WriteJdbcP( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can be made private
idleCount++; | ||
} | ||
} | ||
idleCount = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We miss test for reconnection, i think it doesn't work.
This line will always assign idleCount=0 in case of a recoverable exception
statement.addBatch(); | ||
} | ||
statement.executeBatch(); | ||
itemList.clear(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We miss commit
here
@Override | ||
public void close() throws Exception { | ||
Exception stmtException = close(statement); | ||
Exception connectionException = close(connection); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should check for null connection
and statement
Exception stmtException = close(statement); | ||
Exception connectionException = close(connection); | ||
if (stmtException != null) { | ||
throw stmtException; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's also no need to throw, we can just do
closeWithLogging(logger, statement);
closeWithLogging(logger, connection);
and add null-check inside of closeWithLogging
try { | ||
for (T item : itemList) { | ||
bindFn.accept(statement, item); | ||
statement.addBatch(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can also support drivers which don't support batch updates, it's easy to.
* processor. For example: | ||
* <pre> {@code | ||
* (connection, parallelism, index) -> | ||
* PreparedStatement stmt = connection.prepareStatement("select * from TABLE where mod(id,%d)=%d) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's more typical that SELECT is capitalized and table is lowercase
* }, | ||
* (con, parallelism, index) -> { | ||
* try { | ||
* return con.prepareStatement("select * from TABLE where mod(id, ?) = ?); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this code is repeated twice in the javadoc
|
||
/** | ||
* Represents a function that accepts a JDBC connection to the database, | ||
* a total parallelism and a processor index as arguments and |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should say "the total processor" and "processor index"
* @param parallelism the total parallelism for the processor | ||
* @param index the global processor index | ||
*/ | ||
ResultSet createResultSet(Connection connection, int parallelism, int index); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the function name says partition, but it's named index here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
method should also throw SQLException
} | ||
|
||
@Test | ||
public void testPartitionedQuery() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tests should use the when_
convention..
* limitations under the License. | ||
*/ | ||
|
||
package com.hazelcast.jet.pipeline; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be under pipeline.jdbc
package if there will be several such functions
import static com.hazelcast.jet.impl.util.Util.uncheckRun; | ||
|
||
/** | ||
* Private API, use {@link SourceProcessors#readJdbcP}. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is not an API, but impl class so the comment doesn't make sense. You can just refer to the processor static method and it's enough I think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are 12 matches for "Private API" in code...
|
||
@Override | ||
protected void init(@Nonnull Context context) { | ||
connection = connectionSupplier.get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor but inconsistent use of this
import static java.util.concurrent.TimeUnit.SECONDS; | ||
|
||
/** | ||
* Private API, use {@link SinkProcessors#writeJdbcP}. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
see comments on ReadJdbcP
|
||
@Override | ||
public void close() { | ||
closeWithLogging(statement); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks like different closing approach with ReadJdbcP
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we are already logging while reconnecting instead of throwing the exception
Fixes #869