From 40c56bc6a8ac20f5327768bc6c7f7ce64b092b78 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Gaw=C4=99da?= Date: Tue, 7 Jun 2022 16:24:32 +0200 Subject: [PATCH] Test simplification, cleanup --- .../jet/cdc/impl/ChangeRecordImpl.java | 36 +++------ .../jet/cdc/impl/RecordPartImpl.java | 4 +- .../com/hazelcast/jet/cdc/CdcSinksTest.java | 79 ++++++++++--------- 3 files changed, 56 insertions(+), 63 deletions(-) diff --git a/extensions/cdc-debezium/src/main/java/com/hazelcast/jet/cdc/impl/ChangeRecordImpl.java b/extensions/cdc-debezium/src/main/java/com/hazelcast/jet/cdc/impl/ChangeRecordImpl.java index 4d831264ece7f..dc082c52ce951 100644 --- a/extensions/cdc-debezium/src/main/java/com/hazelcast/jet/cdc/impl/ChangeRecordImpl.java +++ b/extensions/cdc-debezium/src/main/java/com/hazelcast/jet/cdc/impl/ChangeRecordImpl.java @@ -23,21 +23,23 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.io.Serializable; import java.util.Map; import static java.util.Objects.requireNonNull; -public class ChangeRecordImpl implements ChangeRecord { +public class ChangeRecordImpl implements ChangeRecord, Serializable { + private static final long serialVersionUID = 1L; private final long sequenceSource; private final long sequenceValue; private final String keyJson; - private Long timestamp; + private final Long timestamp; private final Operation operation; - private String database; - private String schema; - private String table; + private final String database; + private final String schema; + private final String table; private RecordPart key; private final RecordPart oldValue; private final RecordPart newValue; @@ -79,37 +81,19 @@ public Operation operation() throws ParsingException { @Nonnull @Override - public String database() throws ParsingException { - if (database == null) { - database = get(value().toMap(), "__db", String.class); - if (database == null) { - throw new ParsingException("No parsable database name field found"); - } - } + public String database() { return database; } @Nonnull @Override - public String schema() throws ParsingException { - if (schema == null) { - schema = get(value().toMap(), "__schema", String.class); - if (schema == null) { - throw new ParsingException("No parsable schema name field found"); - } - } + public String schema() { return schema; } @Nonnull @Override - public String table() throws ParsingException { - if (table == null) { - table = get(value().toMap(), "__table", String.class); - if (table == null) { - throw new ParsingException("No parsable table name field found"); - } - } + public String table() { return table; } diff --git a/extensions/cdc-debezium/src/main/java/com/hazelcast/jet/cdc/impl/RecordPartImpl.java b/extensions/cdc-debezium/src/main/java/com/hazelcast/jet/cdc/impl/RecordPartImpl.java index 9399f2d4918cf..44da0d5971d21 100644 --- a/extensions/cdc-debezium/src/main/java/com/hazelcast/jet/cdc/impl/RecordPartImpl.java +++ b/extensions/cdc-debezium/src/main/java/com/hazelcast/jet/cdc/impl/RecordPartImpl.java @@ -22,10 +22,12 @@ import javax.annotation.Nonnull; import java.io.IOException; +import java.io.Serializable; import java.util.Map; import java.util.Objects; -class RecordPartImpl implements RecordPart { +class RecordPartImpl implements RecordPart, Serializable { + private static final long serialVersionUID = 1L; private final String json; diff --git a/extensions/cdc-debezium/src/test/java/com/hazelcast/jet/cdc/CdcSinksTest.java b/extensions/cdc-debezium/src/test/java/com/hazelcast/jet/cdc/CdcSinksTest.java index 91c0b42a3713b..76672ec879a33 100644 --- a/extensions/cdc-debezium/src/test/java/com/hazelcast/jet/cdc/CdcSinksTest.java +++ b/extensions/cdc-debezium/src/test/java/com/hazelcast/jet/cdc/CdcSinksTest.java @@ -32,18 +32,18 @@ import org.junit.experimental.categories.Category; import javax.annotation.Nonnull; -import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Objects; import java.util.concurrent.Callable; import java.util.stream.Collectors; -import static com.hazelcast.jet.cdc.Operation.*; +import static com.hazelcast.jet.cdc.Operation.DELETE; +import static com.hazelcast.jet.cdc.Operation.INSERT; +import static com.hazelcast.jet.cdc.Operation.SYNC; +import static com.hazelcast.jet.cdc.Operation.UPDATE; +import static java.util.Arrays.asList; @Category(QuickTest.class) public class CdcSinksTest extends PipelineTestSupport { @@ -84,7 +84,7 @@ public void after() { @Test public void insertIntoLocalMap() { - p.readFrom(items(() -> Arrays.asList(SYNC1, INSERT2).iterator())) + p.readFrom(items(SYNC1, INSERT2)) .writeTo(localSync()); execute().join(); @@ -100,7 +100,7 @@ public void insertIntoRemoteMap() { ClientConfig clientConfig = getClientConfigForRemoteCluster(remoteInstance); - p.readFrom(items(() -> Arrays.asList(SYNC1, INSERT2).iterator())) + p.readFrom(items(SYNC1, INSERT2)) .writeTo(remoteSync(clientConfig)); execute().join(); @@ -111,7 +111,7 @@ public void insertIntoRemoteMap() { @Test public void updateLocalMap() { - p.readFrom(items(() -> Arrays.asList(SYNC1, INSERT2, UPDATE1).iterator())) + p.readFrom(items(SYNC1, INSERT2, UPDATE1)) .writeTo(localSync()); execute().join(); @@ -127,7 +127,7 @@ public void updateRemoteMap() { ClientConfig clientConfig = getClientConfigForRemoteCluster(remoteInstance); - p.readFrom(items(() -> Arrays.asList(SYNC1, INSERT2, UPDATE1).iterator())) + p.readFrom(items(SYNC1, INSERT2, UPDATE1)) .writeTo(remoteSync(clientConfig)); execute().join(); @@ -138,7 +138,7 @@ public void updateRemoteMap() { @Test public void deleteFromLocalMap() { - p.readFrom(items(() -> Arrays.asList(SYNC1, INSERT2, DELETE2).iterator())) + p.readFrom(items(SYNC1, INSERT2, DELETE2)) .writeTo(localSync()); execute().join(); @@ -154,7 +154,7 @@ public void deleteFromRemoteMap() { ClientConfig clientConfig = getClientConfigForRemoteCluster(remoteInstance); - p.readFrom(items(() -> Arrays.asList(SYNC1, INSERT2, DELETE2).iterator())) + p.readFrom(items(SYNC1, INSERT2, DELETE2)) .writeTo(remoteSync(clientConfig)); execute().join(); @@ -165,12 +165,12 @@ public void deleteFromRemoteMap() { @Test public void deleteFromLocalMap_ViaValueProjection() { - p.readFrom(items(() -> Arrays.asList(SYNC1, INSERT2).iterator())) + p.readFrom(items(SYNC1, INSERT2)) .writeTo(localSync()); execute().join(); p = Pipeline.create(); - p.readFrom(items(() -> Collections.singletonList(UPDATE1).iterator())) + p.readFrom(items(UPDATE1)) .writeTo(CdcSinks.map(MAP, r -> (Integer) r.key().toMap().get(ID), r -> null @@ -189,12 +189,12 @@ public void deleteFromRemoteMap_ViaValueProjection() { ClientConfig clientConfig = getClientConfigForRemoteCluster(remoteInstance); - p.readFrom(items(() -> Arrays.asList(SYNC1, INSERT2).iterator())) + p.readFrom(items(SYNC1, INSERT2)) .writeTo(remoteSync(clientConfig)); execute().join(); p = Pipeline.create(); - p.readFrom(items(() -> Collections.singletonList(UPDATE1).iterator())) + p.readFrom(items(UPDATE1)) .writeTo(CdcSinks.remoteMap(MAP, clientConfig, r -> (Integer) r.key().toMap().get(ID), r -> null @@ -208,7 +208,7 @@ public void deleteFromRemoteMap_ViaValueProjection() { @Test public void reordering() { - SupplierEx> supplier = () -> Arrays.asList( + ChangeRecord[] records = new ChangeRecord[] { SYNC1, UPDATE1, changeRecord(10, UPDATE, UPDATE1.key().toJson(), null, @@ -219,9 +219,9 @@ public void reordering() { UPDATE1.value().toJson().replace("sthomas@acme.com", "sthomas4@acme.com")), changeRecord(13, UPDATE, UPDATE1.key().toJson(), null, UPDATE1.value().toJson().replace("sthomas@acme.com", "sthomas5@acme.com")) - ).iterator(); - Util.checkSerializable(supplier, "kaka"); - p.readFrom(items(supplier)) + }; + Util.checkSerializable(records, "kaka"); + p.readFrom(items(records)) .rebalance() .map(r -> r) .writeTo(localSync()); @@ -234,7 +234,7 @@ public void reordering() { @Test public void reordering_syncUpdate() { - p.readFrom(items(() -> Arrays.asList(UPDATE1, SYNC1).iterator())) + p.readFrom(items(UPDATE1, SYNC1)) .writeTo(localSync()); execute().join(); @@ -245,7 +245,7 @@ public void reordering_syncUpdate() { @Test public void reordering_insertDelete() { - p.readFrom(items(() -> Arrays.asList(DELETE2, INSERT2).iterator())) + p.readFrom(items(DELETE2, INSERT2)) .writeTo(localSync()); execute().join(); @@ -256,7 +256,7 @@ public void reordering_insertDelete() { @Test public void reordering_differentIds() { - p.readFrom(items(() -> Arrays.asList(DELETE2, UPDATE1, INSERT2, SYNC1).iterator())) + p.readFrom(items(DELETE2, UPDATE1, INSERT2, SYNC1)) .writeTo(localSync()); execute().join(); @@ -267,7 +267,7 @@ public void reordering_differentIds() { @Test public void deleteWithoutInsertNorUpdate() { - p.readFrom(items(() -> Arrays.asList(SYNC1, DELETE2).iterator())) + p.readFrom(items(SYNC1, DELETE2)) .writeTo(localSync()); execute().join(); @@ -278,12 +278,13 @@ public void deleteWithoutInsertNorUpdate() { @Test public void sourceSwitch() { - p.readFrom(items(() -> Arrays.asList( - UPDATE1, INSERT2, - changeRecord( 0, UPDATE, UPDATE1.key().toJson(), null, - UPDATE1.value().toJson().replace("sthomas@acme.com", "sthomas2@acme.com"))) - .iterator())) - .writeTo(localSync()); + String updatedKey = UPDATE1.key().toJson(); + String updatedJson = UPDATE1.value().toJson().replace("sthomas@acme.com", "sthomas2@acme.com"); + ChangeRecord changedRecord = changeRecord(4, UPDATE, updatedKey, null, updatedJson); + + p.readFrom(items(UPDATE1, INSERT2, changedRecord)) + .writeTo(localSync()) + .setLocalParallelism(1); execute().join(); @@ -323,19 +324,25 @@ private Sink remoteSync(ClientConfig clientConfig) { ); } - private static BatchSource items(@Nonnull SupplierEx> supplier) { - Objects.requireNonNull(supplier, "supplier"); + @SafeVarargs + private static BatchSource items(@Nonnull T... items) { + SupplierEx> listSupplier = () -> asList(items); return SourceBuilder.batch("items", ctx -> null) .fillBufferFn((ignored, buf) -> { - Iterator iterator = supplier.get(); - while (iterator.hasNext()) { - buf.add(iterator.next()); + List list = listSupplier.get(); + for (T item : list) { + buf.add(item); } buf.close(); - }).build(); + }) + .distributed(1) + .build(); } - private static ChangeRecord changeRecord(int sequenceValue, Operation operation, String keyJson, String oldValueJson, String newValueJson) { + private static ChangeRecord changeRecord( + int sequenceValue, Operation operation, + String keyJson, String oldValueJson, String newValueJson + ) { return new ChangeRecordImpl(0, 0, sequenceValue, operation, keyJson, oldValueJson, newValueJson, "t", "s", "d"); }