Skip to content

Commit

Permalink
Test simplification, cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
TomaszGaweda committed Jun 7, 2022
1 parent 46e804a commit 40c56bc
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 63 deletions.
Expand Up @@ -23,21 +23,23 @@

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.Map;

import static java.util.Objects.requireNonNull;

public class ChangeRecordImpl implements ChangeRecord {
public class ChangeRecordImpl implements ChangeRecord, Serializable {
private static final long serialVersionUID = 1L;

private final long sequenceSource;
private final long sequenceValue;
private final String keyJson;

private Long timestamp;
private final Long timestamp;
private final Operation operation;
private String database;
private String schema;
private String table;
private final String database;
private final String schema;
private final String table;
private RecordPart key;
private final RecordPart oldValue;
private final RecordPart newValue;
Expand Down Expand Up @@ -79,37 +81,19 @@ public Operation operation() throws ParsingException {

@Nonnull
@Override
public String database() throws ParsingException {
if (database == null) {
database = get(value().toMap(), "__db", String.class);
if (database == null) {
throw new ParsingException("No parsable database name field found");
}
}
public String database() {
return database;
}

@Nonnull
@Override
public String schema() throws ParsingException {
if (schema == null) {
schema = get(value().toMap(), "__schema", String.class);
if (schema == null) {
throw new ParsingException("No parsable schema name field found");
}
}
public String schema() {
return schema;
}

@Nonnull
@Override
public String table() throws ParsingException {
if (table == null) {
table = get(value().toMap(), "__table", String.class);
if (table == null) {
throw new ParsingException("No parsable table name field found");
}
}
public String table() {
return table;
}

Expand Down
Expand Up @@ -22,10 +22,12 @@

import javax.annotation.Nonnull;
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import java.util.Objects;

class RecordPartImpl implements RecordPart {
class RecordPartImpl implements RecordPart, Serializable {
private static final long serialVersionUID = 1L;

private final String json;

Expand Down
Expand Up @@ -32,18 +32,18 @@
import org.junit.experimental.categories.Category;

import javax.annotation.Nonnull;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;

import static com.hazelcast.jet.cdc.Operation.*;
import static com.hazelcast.jet.cdc.Operation.DELETE;
import static com.hazelcast.jet.cdc.Operation.INSERT;
import static com.hazelcast.jet.cdc.Operation.SYNC;
import static com.hazelcast.jet.cdc.Operation.UPDATE;
import static java.util.Arrays.asList;

@Category(QuickTest.class)
public class CdcSinksTest extends PipelineTestSupport {
Expand Down Expand Up @@ -84,7 +84,7 @@ public void after() {

@Test
public void insertIntoLocalMap() {
p.readFrom(items(() -> Arrays.asList(SYNC1, INSERT2).iterator()))
p.readFrom(items(SYNC1, INSERT2))
.writeTo(localSync());
execute().join();

Expand All @@ -100,7 +100,7 @@ public void insertIntoRemoteMap() {

ClientConfig clientConfig = getClientConfigForRemoteCluster(remoteInstance);

p.readFrom(items(() -> Arrays.asList(SYNC1, INSERT2).iterator()))
p.readFrom(items(SYNC1, INSERT2))
.writeTo(remoteSync(clientConfig));
execute().join();

Expand All @@ -111,7 +111,7 @@ public void insertIntoRemoteMap() {

@Test
public void updateLocalMap() {
p.readFrom(items(() -> Arrays.asList(SYNC1, INSERT2, UPDATE1).iterator()))
p.readFrom(items(SYNC1, INSERT2, UPDATE1))
.writeTo(localSync());
execute().join();

Expand All @@ -127,7 +127,7 @@ public void updateRemoteMap() {

ClientConfig clientConfig = getClientConfigForRemoteCluster(remoteInstance);

p.readFrom(items(() -> Arrays.asList(SYNC1, INSERT2, UPDATE1).iterator()))
p.readFrom(items(SYNC1, INSERT2, UPDATE1))
.writeTo(remoteSync(clientConfig));
execute().join();

Expand All @@ -138,7 +138,7 @@ public void updateRemoteMap() {

@Test
public void deleteFromLocalMap() {
p.readFrom(items(() -> Arrays.asList(SYNC1, INSERT2, DELETE2).iterator()))
p.readFrom(items(SYNC1, INSERT2, DELETE2))
.writeTo(localSync());
execute().join();

Expand All @@ -154,7 +154,7 @@ public void deleteFromRemoteMap() {

ClientConfig clientConfig = getClientConfigForRemoteCluster(remoteInstance);

p.readFrom(items(() -> Arrays.asList(SYNC1, INSERT2, DELETE2).iterator()))
p.readFrom(items(SYNC1, INSERT2, DELETE2))
.writeTo(remoteSync(clientConfig));
execute().join();

Expand All @@ -165,12 +165,12 @@ public void deleteFromRemoteMap() {

@Test
public void deleteFromLocalMap_ViaValueProjection() {
p.readFrom(items(() -> Arrays.asList(SYNC1, INSERT2).iterator()))
p.readFrom(items(SYNC1, INSERT2))
.writeTo(localSync());
execute().join();

p = Pipeline.create();
p.readFrom(items(() -> Collections.singletonList(UPDATE1).iterator()))
p.readFrom(items(UPDATE1))
.writeTo(CdcSinks.map(MAP,
r -> (Integer) r.key().toMap().get(ID),
r -> null
Expand All @@ -189,12 +189,12 @@ public void deleteFromRemoteMap_ViaValueProjection() {

ClientConfig clientConfig = getClientConfigForRemoteCluster(remoteInstance);

p.readFrom(items(() -> Arrays.asList(SYNC1, INSERT2).iterator()))
p.readFrom(items(SYNC1, INSERT2))
.writeTo(remoteSync(clientConfig));
execute().join();

p = Pipeline.create();
p.readFrom(items(() -> Collections.singletonList(UPDATE1).iterator()))
p.readFrom(items(UPDATE1))
.writeTo(CdcSinks.remoteMap(MAP, clientConfig,
r -> (Integer) r.key().toMap().get(ID),
r -> null
Expand All @@ -208,7 +208,7 @@ public void deleteFromRemoteMap_ViaValueProjection() {

@Test
public void reordering() {
SupplierEx<Iterator<? extends ChangeRecord>> supplier = () -> Arrays.asList(
ChangeRecord[] records = new ChangeRecord[] {
SYNC1,
UPDATE1,
changeRecord(10, UPDATE, UPDATE1.key().toJson(), null,
Expand All @@ -219,9 +219,9 @@ public void reordering() {
UPDATE1.value().toJson().replace("sthomas@acme.com", "sthomas4@acme.com")),
changeRecord(13, UPDATE, UPDATE1.key().toJson(), null,
UPDATE1.value().toJson().replace("sthomas@acme.com", "sthomas5@acme.com"))
).iterator();
Util.checkSerializable(supplier, "kaka");
p.readFrom(items(supplier))
};
Util.checkSerializable(records, "kaka");
p.readFrom(items(records))
.rebalance()
.map(r -> r)
.writeTo(localSync());
Expand All @@ -234,7 +234,7 @@ public void reordering() {

@Test
public void reordering_syncUpdate() {
p.readFrom(items(() -> Arrays.asList(UPDATE1, SYNC1).iterator()))
p.readFrom(items(UPDATE1, SYNC1))
.writeTo(localSync());
execute().join();

Expand All @@ -245,7 +245,7 @@ public void reordering_syncUpdate() {

@Test
public void reordering_insertDelete() {
p.readFrom(items(() -> Arrays.asList(DELETE2, INSERT2).iterator()))
p.readFrom(items(DELETE2, INSERT2))
.writeTo(localSync());
execute().join();

Expand All @@ -256,7 +256,7 @@ public void reordering_insertDelete() {

@Test
public void reordering_differentIds() {
p.readFrom(items(() -> Arrays.asList(DELETE2, UPDATE1, INSERT2, SYNC1).iterator()))
p.readFrom(items(DELETE2, UPDATE1, INSERT2, SYNC1))
.writeTo(localSync());
execute().join();

Expand All @@ -267,7 +267,7 @@ public void reordering_differentIds() {

@Test
public void deleteWithoutInsertNorUpdate() {
p.readFrom(items(() -> Arrays.asList(SYNC1, DELETE2).iterator()))
p.readFrom(items(SYNC1, DELETE2))
.writeTo(localSync());
execute().join();

Expand All @@ -278,12 +278,13 @@ public void deleteWithoutInsertNorUpdate() {

@Test
public void sourceSwitch() {
p.readFrom(items(() -> Arrays.asList(
UPDATE1, INSERT2,
changeRecord( 0, UPDATE, UPDATE1.key().toJson(), null,
UPDATE1.value().toJson().replace("sthomas@acme.com", "sthomas2@acme.com")))
.iterator()))
.writeTo(localSync());
String updatedKey = UPDATE1.key().toJson();
String updatedJson = UPDATE1.value().toJson().replace("sthomas@acme.com", "sthomas2@acme.com");
ChangeRecord changedRecord = changeRecord(4, UPDATE, updatedKey, null, updatedJson);

p.readFrom(items(UPDATE1, INSERT2, changedRecord))
.writeTo(localSync())
.setLocalParallelism(1);

execute().join();

Expand Down Expand Up @@ -323,19 +324,25 @@ private Sink<ChangeRecord> remoteSync(ClientConfig clientConfig) {
);
}

private static <T> BatchSource<T> items(@Nonnull SupplierEx<Iterator<? extends T>> supplier) {
Objects.requireNonNull(supplier, "supplier");
@SafeVarargs
private static <T> BatchSource<T> items(@Nonnull T... items) {
SupplierEx<List<T>> listSupplier = () -> asList(items);
return SourceBuilder.batch("items", ctx -> null)
.<T>fillBufferFn((ignored, buf) -> {
Iterator<? extends T> iterator = supplier.get();
while (iterator.hasNext()) {
buf.add(iterator.next());
List<T> list = listSupplier.get();
for (T item : list) {
buf.add(item);
}
buf.close();
}).build();
})
.distributed(1)
.build();
}

private static ChangeRecord changeRecord(int sequenceValue, Operation operation, String keyJson, String oldValueJson, String newValueJson) {
private static ChangeRecord changeRecord(
int sequenceValue, Operation operation,
String keyJson, String oldValueJson, String newValueJson
) {
return new ChangeRecordImpl(0, 0, sequenceValue, operation,
keyJson, oldValueJson, newValueJson, "t", "s", "d");
}
Expand Down

0 comments on commit 40c56bc

Please sign in to comment.