From 250d982f2243e6c583eccfaa927d4a8c059a9582 Mon Sep 17 00:00:00 2001 From: sagarkapare Date: Wed, 17 Mar 2021 08:41:11 -0700 Subject: [PATCH] CDAP-17583 Use instance id for app name and store in-memory offset under the appname key. --- .../delta/mysql/MySqlConstantOffsetBackingStore.java | 11 +++++++---- .../java/io/cdap/delta/mysql/MySqlEventReader.java | 7 ++++--- .../SqlServerConstantOffsetBackingStore.java | 8 +++++--- .../io/cdap/delta/sqlserver/SqlServerEventReader.java | 7 ++++--- 4 files changed, 20 insertions(+), 13 deletions(-) diff --git a/mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlConstantOffsetBackingStore.java b/mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlConstantOffsetBackingStore.java index 454c8a24..c79d284a 100644 --- a/mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlConstantOffsetBackingStore.java +++ b/mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlConstantOffsetBackingStore.java @@ -42,11 +42,9 @@ public class MySqlConstantOffsetBackingStore extends MemoryOffsetBackingStore { static final String ROW = "row"; static final String EVENT = "event"; static final String GTID_SET = "gtids"; + static final String REPLICATION_CONNECTOR_NAME = "replication.connector.name"; private static final Gson GSON = new Gson(); - // The key is hardcoded here - private static final ByteBuffer KEY = - StandardCharsets.UTF_8.encode("{\"schema\":null,\"payload\":[\"delta\",{\"server\":\"dummy\"}]}"); @Override public void configure(WorkerConfig config) { @@ -57,7 +55,12 @@ public void configure(WorkerConfig config) { String rowStr = originalConfig.get(ROW); String eventStr = originalConfig.get(EVENT); String gtidSetStr = originalConfig.get(GTID_SET); + String replicationConnectorName = originalConfig.get(REPLICATION_CONNECTOR_NAME); + ByteBuffer key = + StandardCharsets.UTF_8.encode("{\"schema\":null,\"payload\":[\"" + + replicationConnectorName + + "\",{\"server\":\"dummy\"}]}"); Map offset = new HashMap<>(); if (!Strings.isNullOrEmpty(fileStr)) { offset.put(FILE, fileStr); @@ -82,6 +85,6 @@ public void configure(WorkerConfig config) { return; } - data.put(KEY, StandardCharsets.UTF_8.encode(GSON.toJson(offset))); + data.put(key, StandardCharsets.UTF_8.encode(GSON.toJson(offset))); } } diff --git a/mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlEventReader.java b/mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlEventReader.java index ac50904e..1b2e9172 100644 --- a/mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlEventReader.java +++ b/mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlEventReader.java @@ -49,7 +49,6 @@ import java.time.temporal.Temporal; import java.util.Map; import java.util.Set; -import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -99,6 +98,7 @@ public void start(Offset offset) { Collectors.toMap(t -> config.getDatabase() + "." + t.getTable(), t -> t)); Map state = offset.get(); // state map is always not null String isSnapshot = state.getOrDefault(MySqlConstantOffsetBackingStore.SNAPSHOT, ""); + String replicationConnectorName = "delta" + context.getInstanceId(); Configuration.Builder configBuilder = Configuration.create() .with("connector.class", MySqlConnector.class.getName()) .with("offset.storage", MySqlConstantOffsetBackingStore.class.getName()) @@ -111,7 +111,7 @@ public void start(Offset offset) { .with("event", state.getOrDefault(MySqlConstantOffsetBackingStore.EVENT, "")) .with("gtids", state.getOrDefault(MySqlConstantOffsetBackingStore.GTID_SET, "")) /* begin connector properties */ - .with("name", "delta" + UUID.randomUUID().toString().replace("-", "")) + .with("name", replicationConnectorName) .with("database.hostname", config.getHost()) .with("database.port", config.getPort()) .with("database.user", config.getUser()) @@ -121,7 +121,8 @@ public void start(Offset offset) { .with("database.server.name", "dummy") // this is the kafka topic for hosted debezium - it doesn't matter .with("database.serverTimezone", config.getServerTimezone()) .with("table.whitelist", String.join(",", sourceTableMap.keySet())) - .with("snapshot.mode", config.getReplicateExistingData() ? "initial" : "schema_only"); + .with("snapshot.mode", config.getReplicateExistingData() ? "initial" : "schema_only") + .with(MySqlConstantOffsetBackingStore.REPLICATION_CONNECTOR_NAME, replicationConnectorName);; if (config.getConsumerID() != null) { // If not provided debezium will randomly pick integer between 5400 and 6400. diff --git a/sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerConstantOffsetBackingStore.java b/sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerConstantOffsetBackingStore.java index d5ba4939..9d9d3868 100644 --- a/sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerConstantOffsetBackingStore.java +++ b/sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerConstantOffsetBackingStore.java @@ -33,8 +33,9 @@ */ public class SqlServerConstantOffsetBackingStore extends MemoryOffsetBackingStore { private static final Gson GSON = new Gson(); - private static final String KEY = "{\"schema\":null,\"payload\":[\"delta\",{\"server\":\"dummy\"}]}"; + static final String SNAPSHOT_COMPLETED = "snapshot_completed"; + static final String REPLICATION_CONNECTOR_NAME = "replication.connector.name"; @Override public void configure(WorkerConfig config) { @@ -45,7 +46,8 @@ public void configure(WorkerConfig config) { String commitStr = originalConfig.get(SourceInfo.COMMIT_LSN_KEY); String snapshot = originalConfig.get(SourceInfo.SNAPSHOT_KEY); String snapshotCompleted = originalConfig.get(SNAPSHOT_COMPLETED); - + String replicationConnectorName = originalConfig.get(REPLICATION_CONNECTOR_NAME); + String key = "{\"schema\":null,\"payload\":[\"" + replicationConnectorName + "\",{\"server\":\"dummy\"}]}"; Map offset = new HashMap<>(); if (!changeStr.isEmpty()) { offset.put(SourceInfo.CHANGE_LSN_KEY, changeStr); @@ -67,6 +69,6 @@ public void configure(WorkerConfig config) { } byte[] offsetBytes = Bytes.toBytes(GSON.toJson(offset)); - data.put(ByteBuffer.wrap(Bytes.toBytes(KEY)), ByteBuffer.wrap(offsetBytes)); + data.put(ByteBuffer.wrap(Bytes.toBytes(key)), ByteBuffer.wrap(offsetBytes)); } } diff --git a/sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerEventReader.java b/sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerEventReader.java index 1d845059..c21ddcfd 100644 --- a/sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerEventReader.java +++ b/sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerEventReader.java @@ -43,7 +43,6 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; -import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -107,6 +106,7 @@ public void start(Offset offset) { Map state = offset.get(); // this will never be null // offset config String isSnapshotCompleted = state.getOrDefault(SqlServerConstantOffsetBackingStore.SNAPSHOT_COMPLETED, ""); + String replicationConnectorName = "delta" + context.getInstanceId(); Configuration.Builder configBuilder = Configuration.create() .with("connector.class", SqlServerConnector.class.getName()) .with("offset.storage", SqlServerConstantOffsetBackingStore.class.getName()) @@ -117,7 +117,7 @@ public void start(Offset offset) { .with("snapshot", state.getOrDefault(SourceInfo.SNAPSHOT_KEY, "")) .with("snapshot_completed", isSnapshotCompleted) /* begin connector properties */ - .with("name", "delta" + UUID.randomUUID().toString().replace("-", "")) + .with("name", replicationConnectorName) .with("database.hostname", config.getHost()) .with("database.port", config.getPort()) .with("database.user", config.getUser()) @@ -127,7 +127,8 @@ public void start(Offset offset) { .with("table.whitelist", String.join(",", sourceTableMap.keySet())) .with("database.server.name", "dummy") // this is the kafka topic for hosted debezium - it doesn't matter .with("database.serverTimezone", config.getServerTimezone()) - .with("snapshot.mode", config.getReplicateExistingData() ? "initial" : "schema_only"); + .with("snapshot.mode", config.getReplicateExistingData() ? "initial" : "schema_only") + .with(SqlServerConstantOffsetBackingStore.REPLICATION_CONNECTOR_NAME, replicationConnectorName); LOG.info("Overriding sql server connector configs with arguments {}", debeziumConnectorConfigs); for (Map.Entry entry: debeziumConnectorConfigs.entrySet()) {