Upgrade Debezium to v.1.5.4 (last buildable with Java 8)
dlg99 authored and eolivelli committed Jul 14, 2021
1 parent 246174a commit f3e0c92
Showing 9 changed files with 88 additions and 99 deletions.
7 changes: 4 additions & 3 deletions pom.xml
@@ -140,7 +140,7 @@ flexible messaging model and an intuitive client API.</description>
     <hbc-core.version>2.2.0</hbc-core.version>
     <cassandra-driver-core.version>3.6.0</cassandra-driver-core.version>
     <aerospike-client.version>4.4.8</aerospike-client.version>
-    <kafka-client.version>2.3.0</kafka-client.version>
+    <kafka-client.version>2.7.0</kafka-client.version>
     <rabbitmq-client.version>5.1.1</rabbitmq-client.version>
     <aws-sdk.version>1.11.774</aws-sdk.version>
     <avro.version>1.10.2</avro.version>
@@ -154,9 +154,10 @@ flexible messaging model and an intuitive client API.</description>
     <hdfs-offload-version3>3.3.0</hdfs-offload-version3>
     <elasticsearch.version>7.9.1</elasticsearch.version>
     <presto.version>334</presto.version>
-    <scala.binary.version>2.11</scala.binary.version>
-    <scala-library.version>2.11.12</scala-library.version>
-    <debezium.version>1.0.0.Final</debezium.version>
+    <scala.binary.version>2.13</scala.binary.version>
+    <scala-library.version>2.13.6</scala-library.version>
+    <debezium.version>1.5.4.Final</debezium.version>
     <jsonwebtoken.version>0.11.1</jsonwebtoken.version>
     <opencensus.version>0.18.0</opencensus.version>
     <hbase.version>2.3.0</hbase.version>
7 changes: 7 additions & 0 deletions pulsar-io/debezium/core/pom.xml
@@ -90,6 +90,13 @@
       <type>test-jar</type>
     </dependency>
 
+    <dependency>
+      <groupId>io.debezium</groupId>
+      <artifactId>debezium-connector-mysql</artifactId>
+      <version>${debezium.version}</version>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
 </project>
@@ -266,6 +266,11 @@ public boolean exists() {
         }
     }
 
+    @Override
+    public boolean storageExists() {
+        return true;
+    }
+
     @Override
     public String toString() {
         if (topicName != null) {
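
Note: with Debezium 1.5, the DatabaseHistory contract expects storageExists() ("is the backing store present?") alongside the existing exists() ("does the store already hold history?"). A minimal sketch of how the two checks fit together; the helper class and method names are illustrative, not part of this commit:

    import io.debezium.relational.history.DatabaseHistory;

    final class HistoryBootstrapSketch {
        // Sketch: make sure the history store is usable before a connector starts.
        static void ensureUsable(DatabaseHistory history) {
            if (!history.storageExists()) {
                // Create the backing store first, e.g. the Pulsar topic behind
                // the implementation changed above.
                history.initializeStorage();
            }
            if (!history.exists()) {
                // Storage is present but empty: no DDL has been recorded yet.
            }
        }
    }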
@@ -23,9 +23,9 @@
 import static org.testng.Assert.assertTrue;
 
 import io.debezium.config.Configuration;
+import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
 import io.debezium.relational.Tables;
-import io.debezium.relational.ddl.DdlParserSql2003;
-import io.debezium.relational.ddl.LegacyDdlParser;
+import io.debezium.relational.ddl.DdlParser;
 import io.debezium.relational.history.DatabaseHistory;
 import io.debezium.relational.history.DatabaseHistoryListener;
 import io.debezium.text.ParsingException;
@@ -86,8 +86,8 @@ private void testHistoryTopicContent(boolean skipUnparseableDDL) {
         // Calling it another time to ensure we can work with the DB history topic already existing
         history.initializeStorage();
 
-        LegacyDdlParser recoveryParser = new DdlParserSql2003();
-        LegacyDdlParser ddlParser = new DdlParserSql2003();
+        DdlParser recoveryParser = new MySqlAntlrDdlParser();
+        DdlParser ddlParser = new MySqlAntlrDdlParser();
         ddlParser.setCurrentSchema("db1"); // recover does this, so we need to as well
         Tables tables1 = new Tables();
         Tables tables2 = new Tables();
@@ -102,9 +102,9 @@

         // Now record schema changes, which writes out to kafka but doesn't actually change the Tables ...
         setLogPosition(10);
-        ddl = "CREATE TABLE foo ( name VARCHAR(255) NOT NULL PRIMARY KEY); \n" +
+        ddl = "CREATE TABLE foo ( first VARCHAR(22) NOT NULL ); \n" +
               "CREATE TABLE customers ( id INTEGER NOT NULL PRIMARY KEY, name VARCHAR(100) NOT NULL ); \n" +
-              "CREATE TABLE products ( productId INTEGER NOT NULL PRIMARY KEY, desc VARCHAR(255) NOT NULL); \n";
+              "CREATE TABLE products ( productId INTEGER NOT NULL PRIMARY KEY, description VARCHAR(255) NOT NULL ); \n";
         history.record(source, position, "db1", ddl);
 
         // Parse the DDL statement 3x and each time update a different Tables object ...
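
Note: Debezium 1.5 drops the legacy LegacyDdlParser/DdlParserSql2003 classes, so the test switches to the ANTLR-based MySqlAntlrDdlParser; the test DDL is reworded as well, presumably because the real MySQL grammar enforced by ANTLR rejects the reserved word desc as a column name. A minimal sketch of the replacement parser API (the DdlParserSketch class is illustrative):

    import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
    import io.debezium.relational.Table;
    import io.debezium.relational.TableId;
    import io.debezium.relational.Tables;
    import io.debezium.relational.ddl.DdlParser;

    final class DdlParserSketch {
        public static void main(String[] args) {
            DdlParser parser = new MySqlAntlrDdlParser();
            parser.setCurrentSchema("db1"); // unqualified table names resolve against this database
            Tables tables = new Tables();
            parser.parse("CREATE TABLE customers ( id INTEGER NOT NULL PRIMARY KEY, name VARCHAR(100) NOT NULL );", tables);
            // Assumption: the MySQL connector carries the database in the catalog position of TableId.
            Table customers = tables.forTable(new TableId("db1", null, "customers"));
            System.out.println(customers.columns()); // column metadata parsed from the DDL
        }
    }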
@@ -97,30 +97,21 @@ private Long currentOffset(TopicPartition topicPartition) {
         List<ByteBuffer> req = Lists.newLinkedList();
         ByteBuffer key = topicPartitionAsKey(topicPartition);
         req.add(key);
-        CompletableFuture<Long> offsetFuture = new CompletableFuture<>();
-        offsetStore.get(req, (Throwable ex, Map<ByteBuffer, ByteBuffer> result) -> {
-            if (ex == null) {
-                if (result != null && result.size() != 0) {
-                    Optional<ByteBuffer> val = result.entrySet().stream()
-                            .filter(entry -> entry.getKey().equals(key))
-                            .findFirst().map(entry -> entry.getValue());
-                    if (val.isPresent()) {
-                        long received = val.get().getLong();
-                        if (log.isDebugEnabled()) {
-                            log.debug("read initial offset for {} == {}", topicPartition, received);
-                        }
-                        offsetFuture.complete(received);
-                        return;
-                    }
-                    offsetFuture.complete(-1L);
-                } else {
-                    offsetFuture.completeExceptionally(ex);
-                }
-            }
-        });
-
-        try {
-            return offsetFuture.get();
+        try {
+            Map<ByteBuffer, ByteBuffer> result = offsetStore.get(req).get();
+            if (result != null && result.size() != 0) {
+                Optional<ByteBuffer> val = result.entrySet().stream()
+                        .filter(entry -> entry.getKey().equals(key))
+                        .findFirst().map(entry -> entry.getValue());
+                if (val.isPresent()) {
+                    long received = val.get().getLong();
+                    if (log.isDebugEnabled()) {
+                        log.debug("read initial offset for {} == {}", topicPartition, received);
+                    }
+                    return received;
+                }
+            }
+            return -1L;
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             log.error("error getting initial state of {}", topicPartition, e);
@@ -192,8 +192,7 @@ public void stop() {
     }
 
     @Override
-    public Future<Map<ByteBuffer, ByteBuffer>> get(Collection<ByteBuffer> keys,
-                                                   Callback<Map<ByteBuffer, ByteBuffer>> callback) {
+    public Future<Map<ByteBuffer, ByteBuffer>> get(Collection<ByteBuffer> keys) {
         CompletableFuture<Void> endFuture = new CompletableFuture<>();
         readToEnd(endFuture);
         return endFuture.thenApply(ignored -> {
@@ -207,14 +206,7 @@ public Future<Map<ByteBuffer, ByteBuffer>> get(Collection<ByteBuffer> keys,
                     values.put(key, value);
                 }
             }
-            if (null != callback) {
-                callback.onCompletion(null, values);
-            }
             return values;
-        }).whenComplete((ignored, cause) -> {
-            if (null != cause && null != callback) {
-                callback.onCompletion(cause, null);
-            }
         });
     }

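
Note: the kafka-client 2.7 bump removes the Callback parameter from OffsetBackingStore.get(), so callers now block on the returned Future, as in the currentOffset() rewrite above. A minimal sketch of the new call pattern (the OffsetReadSketch class and readLongOffset helper are illustrative):

    import java.nio.ByteBuffer;
    import java.util.Collections;
    import java.util.Map;
    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.connect.storage.OffsetBackingStore;

    final class OffsetReadSketch {
        // Sketch: fetch one stored value via the Future-only get() and decode it as a long.
        static long readLongOffset(OffsetBackingStore store, ByteBuffer key)
                throws InterruptedException, ExecutionException {
            Map<ByteBuffer, ByteBuffer> result = store.get(Collections.singletonList(key)).get();
            ByteBuffer value = (result == null) ? null : result.get(key);
            return (value == null) ? -1L : value.getLong();
        }
    }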
@@ -32,6 +32,7 @@
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.connect.util.Callback;
@@ -80,27 +81,10 @@ protected void cleanup() throws Exception {
     @Test
     public void testGetFromEmpty() throws Exception {
         assertTrue(offsetBackingStore.get(
-                Arrays.asList(ByteBuffer.wrap("empty-key".getBytes(UTF_8))),
-                null
+                Arrays.asList(ByteBuffer.wrap("empty-key".getBytes(UTF_8)))
         ).get().isEmpty());
     }
 
-    @Test
-    public void testGetFromEmptyCallback() throws Exception {
-        CompletableFuture<Map<ByteBuffer, ByteBuffer>> callbackFuture = new CompletableFuture<>();
-        assertTrue(offsetBackingStore.get(
-                Arrays.asList(ByteBuffer.wrap("empty-key".getBytes(UTF_8))),
-                (error, result) -> {
-                    if (null != error) {
-                        callbackFuture.completeExceptionally(error);
-                    } else {
-                        callbackFuture.complete(result);
-                    }
-                }
-        ).get().isEmpty());
-        assertTrue(callbackFuture.get().isEmpty());
-    }
-
     @Test
     public void testGetSet() throws Exception {
         testGetSet(false);
@@ -138,7 +122,7 @@ private void testGetSet(boolean testCallback) throws Exception {
         }
 
         Map<ByteBuffer, ByteBuffer> result =
-                offsetBackingStore.get(keys, null).get();
+                offsetBackingStore.get(keys).get();
         assertEquals(numKeys, result.size());
         AtomicInteger count = new AtomicInteger();
         new TreeMap<>(result).forEach((key, value) -> {
@@ -58,6 +58,7 @@ public abstract class SourceTester<ServiceContainerT extends GenericContainer> {
add("source");
add("op");
add("ts_ms");
add("transaction");
}};

protected SourceTester(String sourceType) {
@@ -145,7 +146,8 @@ public void validateSourceResultAvro(Consumer<KeyValue<GenericRecord, GenericRecord>>
         Assert.assertNotNull(valueRecord.getFields());
         Assert.assertTrue(valueRecord.getFields().size() > 0);
         for (Field field : valueRecord.getFields()) {
-            Assert.assertTrue(DEBEZIUM_FIELD_SET.contains(field.getName()));
+            log.info("field {} expected {}", field, DEBEZIUM_FIELD_SET);
+            Assert.assertTrue(DEBEZIUM_FIELD_SET.contains(field.getName()), "unexpected field "+field.getName()+" only "+DEBEZIUM_FIELD_SET);
         }
 
         if (eventType != null) {
@@ -72,48 +72,55 @@ public <T extends GenericContainer> void testSource(SourceTester<T> sourceTester
         // get source info
         getSourceInfoSuccess(sourceTester, tenant, namespace, sourceName);
 
-        // get source status
-        Failsafe.with(statusRetryPolicy).run(() -> getSourceStatus(tenant, namespace, sourceName));
-
-        // wait for source to process messages
-        Failsafe.with(statusRetryPolicy).run(() ->
-                waitForProcessingSourceMessages(tenant, namespace, sourceName, numMessages));
-
-        @Cleanup
-        Consumer consumer = client.newConsumer(getSchema(jsonWithEnvelope))
-                .topic(consumeTopicName)
-                .subscriptionName("debezium-source-tester")
-                .subscriptionType(SubscriptionType.Exclusive)
-                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
-                .subscribe();
-        log.info("[debezium mysql test] create consumer finish. converterName: {}", converterClassName);
-
-        // validate the source result
-        sourceTester.validateSourceResult(consumer, 9, null, converterClassName);
-
-        final int numEntriesToInsert = sourceTester.getNumEntriesToInsert();
-        Preconditions.checkArgument(numEntriesToInsert >= 1);
-
-        for (int i = 1; i <= numEntriesToInsert; i++) {
-            // prepare insert event
-            sourceTester.prepareInsertEvent();
-            log.info("inserted entry {} of {}", i, numEntriesToInsert);
-            // validate the source insert event
-            sourceTester.validateSourceResult(consumer, 1, SourceTester.INSERT, converterClassName);
-        }
-
-        // prepare update event
-        sourceTester.prepareUpdateEvent();
-
-        // validate the source update event
-        sourceTester.validateSourceResult(consumer, numEntriesToInsert, SourceTester.UPDATE, converterClassName);
-
-        // prepare delete event
-        sourceTester.prepareDeleteEvent();
-
-        // validate the source delete event
-        sourceTester.validateSourceResult(consumer, numEntriesToInsert, SourceTester.DELETE, converterClassName);
+        try
+        {
+
+            // get source status
+            Failsafe.with(statusRetryPolicy).run(() -> getSourceStatus(tenant, namespace, sourceName));
+
+            // wait for source to process messages
+            Failsafe.with(statusRetryPolicy).run(() ->
+                    waitForProcessingSourceMessages(tenant, namespace, sourceName, numMessages));
+
+            @Cleanup
+            Consumer consumer = client.newConsumer(getSchema(jsonWithEnvelope))
+                    .topic(consumeTopicName)
+                    .subscriptionName("debezium-source-tester")
+                    .subscriptionType(SubscriptionType.Exclusive)
+                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                    .subscribe();
+            log.info("[debezium mysql test] create consumer finish. converterName: {}", converterClassName);
+
+            // validate the source result
+            sourceTester.validateSourceResult(consumer, 9, null, converterClassName);
+
+            final int numEntriesToInsert = sourceTester.getNumEntriesToInsert();
+            Preconditions.checkArgument(numEntriesToInsert >= 1);
+
+            for (int i = 1; i <= numEntriesToInsert; i++)
+            {
+                // prepare insert event
+                sourceTester.prepareInsertEvent();
+                log.info("inserted entry {} of {}", i, numEntriesToInsert);
+                // validate the source insert event
+                sourceTester.validateSourceResult(consumer, 1, SourceTester.INSERT, converterClassName);
+            }
+
+            // prepare update event
+            sourceTester.prepareUpdateEvent();
+
+            // validate the source update event
+            sourceTester.validateSourceResult(consumer, numEntriesToInsert, SourceTester.UPDATE, converterClassName);
+
+            // prepare delete event
+            sourceTester.prepareDeleteEvent();
+
+            // validate the source delete event
+            sourceTester.validateSourceResult(consumer, numEntriesToInsert, SourceTester.DELETE, converterClassName);
+        } finally {
+            pulsarCluster.dumpFunctionLogs(sourceName);
+        }
 
         // delete the source
         deleteSource(tenant, namespace, sourceName);
 