Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Ali Gurbuz committed Jun 29, 2018
1 parent e91c4ac commit f8fdd4b
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 39 deletions.
Expand Up @@ -411,6 +411,7 @@ public static <T> ProcessorMetaSupplier readJdbcP(
@Nonnull String query,
@Nonnull DistributedFunction<ResultSet, T> mapOutputFn
) {
checkSerializable(mapOutputFn, "mapOutputFn");
return ReadJdbcP.supplier(connectionURL, query, mapOutputFn);
}

Expand Down
Expand Up @@ -23,7 +23,6 @@
import com.hazelcast.jet.core.processor.SourceProcessors;
import com.hazelcast.jet.function.DistributedFunction;
import com.hazelcast.jet.function.DistributedSupplier;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.pipeline.ResultSetForPartitionFunction;

import javax.annotation.Nonnull;
Expand All @@ -35,10 +34,9 @@
import java.sql.Statement;

import static com.hazelcast.jet.impl.util.Util.uncheckCall;
import static com.hazelcast.jet.impl.util.Util.uncheckRun;

/**
* Private API, use {@link SourceProcessors#readJdbcP}.
* Use {@link SourceProcessors#readJdbcP}.
*/
public final class ReadJdbcP<T> extends AbstractProcessor {

Expand All @@ -64,7 +62,7 @@ private ReadJdbcP(
}

/**
* Private API, use {@link SourceProcessors#readJdbcP}.
* Use {@link SourceProcessors#readJdbcP}.
*/
public static <T> ProcessorMetaSupplier supplier(
@Nonnull DistributedSupplier<Connection> connectionSupplier,
Expand All @@ -84,12 +82,12 @@ public static <T> ProcessorMetaSupplier supplier(
new ReadJdbcP<>(
() -> uncheckCall(() -> DriverManager.getConnection(connectionURL)),
(connection, parallelism, index) -> {
PreparedStatement statement = uncheckCall(() -> connection.prepareStatement(query));
PreparedStatement statement = connection.prepareStatement(query);
try {
return statement.executeQuery();
} catch (SQLException e) {
uncheckRun(statement::close);
throw ExceptionUtil.rethrow(e);
statement.close();
throw e;
}
},
mapOutputFn)
Expand All @@ -98,15 +96,15 @@ public static <T> ProcessorMetaSupplier supplier(

@Override
protected void init(@Nonnull Context context) {
connection = connectionSupplier.get();
this.connection = connectionSupplier.get();
this.parallelism = context.totalParallelism();
this.index = context.globalProcessorIndex();
}

@Override
public boolean complete() {
if (traverser == null) {
resultSet = resultSetFn.createResultSet(connection, parallelism, index);
resultSet = uncheckCall(() -> resultSetFn.createResultSet(connection, parallelism, index));
traverser = ((Traverser<ResultSet>) () -> uncheckCall(() -> resultSet.next() ? resultSet : null))
.map(mapOutputFn);
}
Expand Down
Expand Up @@ -39,7 +39,7 @@
import static java.util.concurrent.TimeUnit.SECONDS;

/**
* Private API, use {@link SinkProcessors#writeJdbcP}.
* Use {@link SinkProcessors#writeJdbcP}.
*/
public final class WriteJdbcP<T> implements Processor {

Expand Down Expand Up @@ -70,7 +70,7 @@ private WriteJdbcP(
}

/**
* Private API, use {@link SinkProcessors#writeJdbcP}.
* Use {@link SinkProcessors#writeJdbcP}.
*/
public static <T> ProcessorMetaSupplier metaSupplier(
@Nonnull String updateQuery,
Expand Down
Expand Up @@ -19,12 +19,13 @@
import java.io.Serializable;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;

/**
* Represents a function that accepts a JDBC connection to the database,
* a total parallelism and a processor index as arguments and
* produces a result set. This result set should return a part of the whole
* result set specific to this processor.
* the total parallelism and processor index as arguments and produces a
* result set. This result set should return a part of the whole result
* set specific to this processor.
*/
@FunctionalInterface
public interface ResultSetForPartitionFunction extends Serializable {
Expand All @@ -37,5 +38,5 @@ public interface ResultSetForPartitionFunction extends Serializable {
* @param parallelism the total parallelism for the processor
* @param index the global processor index
*/
ResultSet createResultSet(Connection connection, int parallelism, int index);
ResultSet createResultSet(Connection connection, int parallelism, int index) throws SQLException;
}
Expand Up @@ -888,18 +888,10 @@ public static JmsSourceBuilder jmsTopicBuilder(DistributedSupplier<ConnectionFac
* parallelism * member count) and global processor index as arguments and
* produces a result set. The parallelism and processor index arguments
* should be used to fetch a part of the whole result set specific to the
* processor. For example:
* <pre> {@code
* (connection, parallelism, index) ->
* PreparedStatement stmt = connection.prepareStatement("select * from TABLE where mod(id,%d)=%d)
* stmt.setInt(1, parallelism);
* stmt.setInt(2, index);
* return stmt.executeQuery();
* }</pre>
* If the table itself isn't partitioned by the same key, then running
* multiple queries might not really be faster than using the {@linkplain
* #jdbc(String, String, DistributedFunction) simpler version} of this
* method, do your own testing.
* processor. If the table itself isn't partitioned by the same key, then
* running multiple queries might not really be faster than using the
* {@linkplain #jdbc(String, String, DistributedFunction) simpler
* version} of this method, do your own testing.
* <p>
* {@code createOutputFn} gets the {@link ResultSet} and creates desired
* output object. The function is called for each row of the result set,
Expand All @@ -917,7 +909,7 @@ public static JmsSourceBuilder jmsTopicBuilder(DistributedSupplier<ConnectionFac
* },
* (con, parallelism, index) -> {
* try {

This comment has been minimized.

Copy link
@viliam-durina

viliam-durina Jul 2, 2018

Contributor

This try-catch should be removed

* return con.prepareStatement("select * from TABLE where mod(id, ?) = ?);
* return con.prepareStatement("SELECT * FROM TABLE WHERE MOD(id, ?) = ?);
* stmt.setInt(1, parallelism);
* stmt.setInt(2, index);
* return stmt.executeQuery();
Expand Down
Expand Up @@ -16,7 +16,6 @@

package com.hazelcast.jet.impl.connector;

import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.pipeline.PipelineTestSupport;
import com.hazelcast.jet.pipeline.Sources;
import org.h2.tools.DeleteDbFiles;
Expand Down Expand Up @@ -47,18 +46,14 @@ public static void setupClass() throws SQLException {
}

@Test
public void testPartitionedQuery() {
public void test_whenPartitionedQuery() {
p.drawFrom(Sources.jdbc(
() -> uncheckCall(() -> DriverManager.getConnection(DB_CONNECTION_URL)),
(con, parallelism, index) -> {
try {
PreparedStatement statement = con.prepareStatement("select * from PERSON where mod(id,?)=?");
statement.setInt(1, parallelism);
statement.setInt(2, index);
return statement.executeQuery();
} catch (SQLException e) {
throw ExceptionUtil.rethrow(e);
}
PreparedStatement statement = con.prepareStatement("select * from PERSON where mod(id,?)=?");
statement.setInt(1, parallelism);
statement.setInt(2, index);
return statement.executeQuery();
},
resultSet -> uncheckCall(() -> new Person(resultSet.getInt(1), resultSet.getString(2)))))
.drainTo(sink);
Expand All @@ -69,7 +64,7 @@ public void testPartitionedQuery() {
}

@Test
public void testTotalParallelismOne() {
public void test_whenTotalParallelismOne() {
p.drawFrom(Sources.jdbc(DB_CONNECTION_URL, "select * from PERSON",
resultSet -> uncheckCall(() -> new Person(resultSet.getInt(1), resultSet.getString(2)))))
.drainTo(sink);
Expand Down

0 comments on commit f8fdd4b

Please sign in to comment.