Skip to content

Commit

Permalink
refactor test
Browse files Browse the repository at this point in the history
  • Loading branch information
Ali Gurbuz committed Jun 29, 2018
1 parent 842c7ad commit e91c4ac
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void process(int ordinal, @Nonnull Inbox inbox) {
inbox.drainTo(itemList);
while (!itemList.isEmpty()) {
if (!reconnectIfNecessary()) {
return;
continue;
}
try {
for (T item : itemList) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package com.hazelcast.jet.impl.connector;

import com.hazelcast.jet.function.DistributedBiConsumer;
import com.hazelcast.jet.function.DistributedSupplier;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.pipeline.PipelineTestSupport;
import com.hazelcast.jet.pipeline.Sinks;
import org.h2.tools.DeleteDbFiles;
Expand All @@ -28,6 +31,7 @@
import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLNonTransientException;
Expand Down Expand Up @@ -85,18 +89,8 @@ public void testReconnect() throws SQLException {
addToSrcList(sequence(PERSON_COUNT));
p.drawFrom(source)
.map(item -> new Person((Integer) item, item.toString()))
.drainTo(Sinks.jdbc("INSERT INTO " + tableName + "(id, name) VALUES(?, ?)", DB_CONNECTION_URL,
(stmt, item) -> {
try {
if (stmt.hashCode() % 2 == 0) {
throw new SQLException();
}
stmt.setInt(1, item.id);
stmt.setString(2, item.name);
} catch (SQLException e) {
throw rethrow(e);
}
}
.drainTo(Sinks.jdbc("INSERT INTO " + tableName + "(id, name) VALUES(?, ?)",
failOnceConnectionSupplier(), failOnceBindFn()
));

execute();
Expand Down Expand Up @@ -132,6 +126,43 @@ private int rowCount() throws SQLException {
}
}

private static DistributedSupplier<Connection> failOnceConnectionSupplier() {
return new DistributedSupplier<Connection>() {
boolean exceptionThrown;
@Override
public Connection get() {
try {
if (exceptionThrown) {
exceptionThrown = true;
throw new SQLException();
}
return DriverManager.getConnection(DB_CONNECTION_URL);
} catch (SQLException e) {
throw ExceptionUtil.rethrow(e);
}
}
};
}

private static DistributedBiConsumer<PreparedStatement, Person> failOnceBindFn() {
return new DistributedBiConsumer<PreparedStatement, Person>() {
boolean exceptionThrown;
@Override
public void accept(PreparedStatement stmt, Person item) {
try {
if (exceptionThrown) {
exceptionThrown = true;
throw new SQLException();
}
stmt.setInt(1, item.id);
stmt.setString(2, item.name);
} catch (SQLException e) {
throw rethrow(e);
}
}
};
}

private static final class Person implements Serializable {

private final int id;
Expand Down

0 comments on commit e91c4ac

Please sign in to comment.