From f3e0c9295603b00793d4bcb815dd36fd29a7a8e9 Mon Sep 17 00:00:00 2001
From: Andrey Yegorov
Date: Fri, 2 Jul 2021 16:20:00 -0700
Subject: [PATCH] Upgrade Debezium to v.1.5.4 (last buildable with Java 8)

---
 pom.xml                                        |  7 +-
 pulsar-io/debezium/core/pom.xml                |  7 ++
 .../io/debezium/PulsarDatabaseHistory.java     |  5 ++
 .../debezium/PulsarDatabaseHistoryTest.java    | 12 +--
 .../connect/PulsarKafkaSinkTaskContext.java    | 33 +++----
 .../connect/PulsarOffsetBackingStore.java      | 10 +--
 .../connect/PulsarOffsetBackingStoreTest.java  | 22 +----
 .../integration/io/sources/SourceTester.java   |  4 +-
 .../PulsarIODebeziumSourceRunner.java          | 87 ++++++++++---------
 9 files changed, 88 insertions(+), 99 deletions(-)

diff --git a/pom.xml b/pom.xml
index b24e9e3bb5bf3..76d2829746213 100644
--- a/pom.xml
+++ b/pom.xml
@@ -140,7 +140,7 @@ flexible messaging model and an intuitive client API.
     2.2.0
     3.6.0
     4.4.8
-    2.3.0
+    2.7.0
     5.1.1
     1.11.774
     1.10.2
@@ -154,9 +154,10 @@ flexible messaging model and an intuitive client API.
     3.3.0
     7.9.1
     334
-    2.11
-    2.11.12
+    2.13
+    2.13.6
     1.0.0.Final
+    1.5.4.Final
     0.11.1
     0.18.0
     2.3.0
diff --git a/pulsar-io/debezium/core/pom.xml b/pulsar-io/debezium/core/pom.xml
index 7b3a5b0f5db3f..916b340330883 100644
--- a/pulsar-io/debezium/core/pom.xml
+++ b/pulsar-io/debezium/core/pom.xml
@@ -90,6 +90,13 @@
       test-jar
 
+    <dependency>
+      <groupId>io.debezium</groupId>
+      <artifactId>debezium-connector-mysql</artifactId>
+      <version>${debezium.version}</version>
+      <scope>test</scope>
+    </dependency>
+
diff --git a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
index 46878c150a19a..6960af2db99c2 100644
--- a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
+++ b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
@@ -266,6 +266,11 @@ public boolean exists() {
         }
     }
 
+    @Override
+    public boolean storageExists() {
+        return true;
+    }
+
     @Override
     public String toString() {
         if (topicName != null) {
diff --git a/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java b/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
index 6a21812d171bf..ba3bc6a1951d0 100644
--- a/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
+++ b/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
@@ -23,9 +23,9 @@
 import static org.testng.Assert.assertTrue;
 
 import io.debezium.config.Configuration;
+import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
 import io.debezium.relational.Tables;
-import io.debezium.relational.ddl.DdlParserSql2003;
-import io.debezium.relational.ddl.LegacyDdlParser;
+import io.debezium.relational.ddl.DdlParser;
 import io.debezium.relational.history.DatabaseHistory;
 import io.debezium.relational.history.DatabaseHistoryListener;
 import io.debezium.text.ParsingException;
@@ -86,8 +86,8 @@ private void testHistoryTopicContent(boolean skipUnparseableDDL) {
         // Calling it another time to ensure we can work with the DB history topic already existing
         history.initializeStorage();
 
-        LegacyDdlParser recoveryParser = new DdlParserSql2003();
-        LegacyDdlParser ddlParser = new DdlParserSql2003();
+        DdlParser recoveryParser = new MySqlAntlrDdlParser();
+        DdlParser ddlParser = new MySqlAntlrDdlParser();
         ddlParser.setCurrentSchema("db1"); // recover does this, so we need to as well
         Tables tables1 = new Tables();
         Tables tables2 = new Tables();
@@ -102,9 +102,9 @@ private void testHistoryTopicContent(boolean skipUnparseableDDL) {
 
         // Now record schema changes, which writes out to kafka but doesn't actually change the Tables ...
         setLogPosition(10);
-        ddl = "CREATE TABLE foo ( name VARCHAR(255) NOT NULL PRIMARY KEY); \n" +
+        ddl = "CREATE TABLE foo ( first VARCHAR(22) NOT NULL ); \n" +
                 "CREATE TABLE customers ( id INTEGER NOT NULL PRIMARY KEY, name VARCHAR(100) NOT NULL ); \n" +
-                "CREATE TABLE products ( productId INTEGER NOT NULL PRIMARY KEY, desc VARCHAR(255) NOT NULL); \n";
+                "CREATE TABLE products ( productId INTEGER NOT NULL PRIMARY KEY, description VARCHAR(255) NOT NULL ); \n";
         history.record(source, position, "db1", ddl);
 
         // Parse the DDL statement 3x and each time update a different Tables object ...
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
index f06329f6eecc9..b1dcf35a752b2 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
@@ -97,30 +97,21 @@ private Long currentOffset(TopicPartition topicPartition) {
         List<ByteBuffer> req = Lists.newLinkedList();
         ByteBuffer key = topicPartitionAsKey(topicPartition);
         req.add(key);
-        CompletableFuture<Long> offsetFuture = new CompletableFuture<>();
-        offsetStore.get(req, (Throwable ex, Map<ByteBuffer, ByteBuffer> result) -> {
-            if (ex == null) {
-                if (result != null && result.size() != 0) {
-                    Optional<ByteBuffer> val = result.entrySet().stream()
-                            .filter(entry -> entry.getKey().equals(key))
-                            .findFirst().map(entry -> entry.getValue());
-                    if (val.isPresent()) {
-                        long received = val.get().getLong();
-                        if (log.isDebugEnabled()) {
-                            log.debug("read initial offset for {} == {}", topicPartition, received);
-                        }
-                        offsetFuture.complete(received);
-                        return;
+        try {
+            Map<ByteBuffer, ByteBuffer> result = offsetStore.get(req).get();
+            if (result != null && result.size() != 0) {
+                Optional<ByteBuffer> val = result.entrySet().stream()
+                        .filter(entry -> entry.getKey().equals(key))
+                        .findFirst().map(entry -> entry.getValue());
+                if (val.isPresent()) {
+                    long received = val.get().getLong();
+                    if (log.isDebugEnabled()) {
+                        log.debug("read initial offset for {} == {}", topicPartition, received);
                     }
+                    return received;
                 }
-                offsetFuture.complete(-1L);
-            } else {
-                offsetFuture.completeExceptionally(ex);
             }
-        });
-
-        try {
-            return offsetFuture.get();
+            return -1L;
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             log.error("error getting initial state of {}", topicPartition, e);
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
index 74b0a6a2eccf8..6a0d29f761f92 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
@@ -192,8 +192,7 @@ public void stop() {
     }
 
     @Override
-    public Future<Map<ByteBuffer, ByteBuffer>> get(Collection<ByteBuffer> keys,
-                                                   Callback<Map<ByteBuffer, ByteBuffer>> callback) {
+    public Future<Map<ByteBuffer, ByteBuffer>> get(Collection<ByteBuffer> keys) {
         CompletableFuture<Void> endFuture = new CompletableFuture<>();
         readToEnd(endFuture);
         return endFuture.thenApply(ignored -> {
@@ -207,14 +206,7 @@ public Future<Map<ByteBuffer, ByteBuffer>> get(Collection<ByteBuffer> keys,
                     values.put(key, value);
                 }
             }
-            if (null != callback) {
-                callback.onCompletion(null, values);
-            }
             return values;
-        }).whenComplete((ignored, cause) -> {
-            if (null != cause && null != callback) {
-                callback.onCompletion(cause, null);
-            }
         });
     }
 
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java
index fc338c7b81591..825430697cad1 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java
@@ -32,6 +32,7 @@
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.connect.util.Callback;
@@ -80,27 +81,10 @@ protected void cleanup() throws Exception {
     @Test
     public void testGetFromEmpty() throws Exception {
         assertTrue(offsetBackingStore.get(
-            Arrays.asList(ByteBuffer.wrap("empty-key".getBytes(UTF_8))),
-            null
+            Arrays.asList(ByteBuffer.wrap("empty-key".getBytes(UTF_8)))
         ).get().isEmpty());
     }
 
-    @Test
-    public void testGetFromEmptyCallback() throws Exception {
-        CompletableFuture<Map<ByteBuffer, ByteBuffer>> callbackFuture = new CompletableFuture<>();
-        assertTrue(offsetBackingStore.get(
-            Arrays.asList(ByteBuffer.wrap("empty-key".getBytes(UTF_8))),
-            (error, result) -> {
-                if (null != error) {
-                    callbackFuture.completeExceptionally(error);
-                } else {
-                    callbackFuture.complete(result);
-                }
-            }
-        ).get().isEmpty());
-        assertTrue(callbackFuture.get().isEmpty());
-    }
-
     @Test
     public void testGetSet() throws Exception {
         testGetSet(false);
@@ -138,7 +122,7 @@ private void testGetSet(boolean testCallback) throws Exception {
         }
 
         Map<ByteBuffer, ByteBuffer> result =
-            offsetBackingStore.get(keys, null).get();
+            offsetBackingStore.get(keys).get();
         assertEquals(numKeys, result.size());
         AtomicInteger count = new AtomicInteger();
         new TreeMap<>(result).forEach((key, value) -> {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java
index b74f52f9158ae..d028dd5ad700f 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java
@@ -58,6 +58,7 @@ public abstract class SourceTester {
         add("source");
         add("op");
         add("ts_ms");
+        add("transaction");
     }};
 
     protected SourceTester(String sourceType) {
@@ -145,7 +146,8 @@ public void validateSourceResultAvro(Consumer
 0);
         for (Field field : valueRecord.getFields()) {
-            Assert.assertTrue(DEBEZIUM_FIELD_SET.contains(field.getName()));
+            log.info("field {} expected {}", field, DEBEZIUM_FIELD_SET);
+            Assert.assertTrue(DEBEZIUM_FIELD_SET.contains(field.getName()), "unexpected field "+field.getName()+" only "+DEBEZIUM_FIELD_SET);
         }
 
         if (eventType != null) {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarIODebeziumSourceRunner.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarIODebeziumSourceRunner.java
index 6f0bbfd5cef7d..5c2846bbd0d30 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarIODebeziumSourceRunner.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarIODebeziumSourceRunner.java
@@ -72,48 +72,55 @@ public void testSource(SourceTester sourceTester
         // get source info
         getSourceInfoSuccess(sourceTester, tenant, namespace, sourceName);
 
-        // get source status
-        Failsafe.with(statusRetryPolicy).run(() -> getSourceStatus(tenant, namespace, sourceName));
-
-        // wait for source to process messages
-        Failsafe.with(statusRetryPolicy).run(() ->
-                waitForProcessingSourceMessages(tenant, namespace, sourceName, numMessages));
-
-        @Cleanup
-        Consumer consumer = client.newConsumer(getSchema(jsonWithEnvelope))
-                .topic(consumeTopicName)
-                .subscriptionName("debezium-source-tester")
-                .subscriptionType(SubscriptionType.Exclusive)
-                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
-                .subscribe();
-        log.info("[debezium mysql test] create consumer finish. converterName: {}", converterClassName);
-
-        // validate the source result
-        sourceTester.validateSourceResult(consumer, 9, null, converterClassName);
-
-        final int numEntriesToInsert = sourceTester.getNumEntriesToInsert();
-        Preconditions.checkArgument(numEntriesToInsert >= 1);
-
-        for (int i = 1; i <= numEntriesToInsert; i++) {
-            // prepare insert event
-            sourceTester.prepareInsertEvent();
-            log.info("inserted entry {} of {}", i, numEntriesToInsert);
-            // validate the source insert event
-            sourceTester.validateSourceResult(consumer, 1, SourceTester.INSERT, converterClassName);
+        try
+        {
+
+            // get source status
+            Failsafe.with(statusRetryPolicy).run(() -> getSourceStatus(tenant, namespace, sourceName));
+
+            // wait for source to process messages
+            Failsafe.with(statusRetryPolicy).run(() ->
+                    waitForProcessingSourceMessages(tenant, namespace, sourceName, numMessages));
+
+            @Cleanup
+            Consumer consumer = client.newConsumer(getSchema(jsonWithEnvelope))
+                    .topic(consumeTopicName)
+                    .subscriptionName("debezium-source-tester")
+                    .subscriptionType(SubscriptionType.Exclusive)
+                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                    .subscribe();
converterName: {}", converterClassName); + + // validate the source result + sourceTester.validateSourceResult(consumer, 9, null, converterClassName); + + final int numEntriesToInsert = sourceTester.getNumEntriesToInsert(); + Preconditions.checkArgument(numEntriesToInsert >= 1); + + for (int i = 1; i <= numEntriesToInsert; i++) + { + // prepare insert event + sourceTester.prepareInsertEvent(); + log.info("inserted entry {} of {}", i, numEntriesToInsert); + // validate the source insert event + sourceTester.validateSourceResult(consumer, 1, SourceTester.INSERT, converterClassName); + } + + // prepare update event + sourceTester.prepareUpdateEvent(); + + // validate the source update event + sourceTester.validateSourceResult(consumer, numEntriesToInsert, SourceTester.UPDATE, converterClassName); + + // prepare delete event + sourceTester.prepareDeleteEvent(); + + // validate the source delete event + sourceTester.validateSourceResult(consumer, numEntriesToInsert, SourceTester.DELETE, converterClassName); + } finally { + pulsarCluster.dumpFunctionLogs(sourceName); } - // prepare update event - sourceTester.prepareUpdateEvent(); - - // validate the source update event - sourceTester.validateSourceResult(consumer, numEntriesToInsert, SourceTester.UPDATE, converterClassName); - - // prepare delete event - sourceTester.prepareDeleteEvent(); - - // validate the source delete event - sourceTester.validateSourceResult(consumer, numEntriesToInsert, SourceTester.DELETE, converterClassName); - // delete the source deleteSource(tenant, namespace, sourceName);