From 36f236c41f6bc437ec8469e7bdb3d0a13044d825 Mon Sep 17 00:00:00 2001 From: Yash Mayya Date: Wed, 12 Jul 2023 23:09:29 +0530 Subject: [PATCH 1/4] KAFKA-15182: Normalize source connector offsets before invoking SourceConnector::alterOffsets --- .../apache/kafka/connect/runtime/Worker.java | 38 +++++++++++++++++-- .../kafka/connect/runtime/WorkerTest.java | 28 ++++++++++++++ 2 files changed, 62 insertions(+), 4 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index bda9ecbaa854..ae9a5c846bca 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -83,8 +83,10 @@ import org.apache.kafka.connect.storage.HeaderConverter; import org.apache.kafka.connect.storage.KafkaOffsetBackingStore; import org.apache.kafka.connect.storage.OffsetBackingStore; +import org.apache.kafka.connect.storage.OffsetStorageReader; import org.apache.kafka.connect.storage.OffsetStorageReaderImpl; import org.apache.kafka.connect.storage.OffsetStorageWriter; +import org.apache.kafka.connect.storage.OffsetUtils; import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.ConnectorTaskId; @@ -1547,9 +1549,11 @@ void modifySourceConnectorOffsets(String connName, Connector connector, Map, Map> normalizedOffsets = normalizeOffsets(offsetsToWrite); + boolean alterOffsetsResult; try { - alterOffsetsResult = ((SourceConnector) connector).alterOffsets(connectorConfig, offsetsToWrite); + alterOffsetsResult = ((SourceConnector) connector).alterOffsets(connectorConfig, normalizedOffsets); } catch (UnsupportedOperationException e) { log.error("Failed to modify offsets for connector {} because it doesn't support external modification of offsets", connName, e); @@ -1561,7 +1565,7 @@ void modifySourceConnectorOffsets(String connName, Connector connector, Map offsetWriterCallback = new FutureCallback<>(); offsetWriter.doFlush(offsetWriterCallback); if (config.exactlyOnceSourceEnabled()) { @@ -1608,6 +1612,32 @@ void modifySourceConnectorOffsets(String connName, Connector connector, Map + * Visible for testing. + * + * @param originalOffsets the offsets that are to be normalized + * @return the normalized offsets + */ + @SuppressWarnings("unchecked") + Map, Map> normalizeOffsets(Map, Map> originalOffsets) { + Map, Map> normalizedOffsets = new HashMap<>(); + for (Map.Entry, Map> entry : originalOffsets.entrySet()) { + OffsetUtils.validateFormat(entry.getKey()); + OffsetUtils.validateFormat(entry.getValue()); + byte[] serializedKey = internalKeyConverter.fromConnectData("", null, entry.getKey()); + byte[] serializedValue = internalKeyConverter.fromConnectData("", null, entry.getValue()); + Object deserializedKey = internalKeyConverter.toConnectData("", serializedKey).value(); + Object deserializedValue = internalKeyConverter.toConnectData("", serializedValue).value(); + normalizedOffsets.put((Map) deserializedKey, (Map) deserializedValue); + } + + return normalizedOffsets; + } + /** * Update the provided timer, check if it's expired and throw a {@link ConnectException} with the provided error * message if it is. diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index 646ced752bec..268e1539b12e 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -124,6 +124,7 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_CONFIG; +import static org.apache.kafka.connect.json.JsonConverterConfig.SCHEMAS_ENABLE_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX; import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX; @@ -2015,6 +2016,33 @@ public void testAlterOffsetsSourceConnectorError() throws Exception { verifyKafkaClusterId(); } + @Test + public void testNormalizeOffsets() throws Exception { + Map, Map> offsets = Collections.singletonMap( + Collections.singletonMap("filename", "/path/to/filename"), + Collections.singletonMap("position", 20) + ); + + assertTrue(offsets.values().iterator().next().get("position") instanceof Integer); + + JsonConverter jsonConverter = new JsonConverter(); + jsonConverter.configure(Collections.singletonMap(SCHEMAS_ENABLE_CONFIG, false), false); + when(plugins.newInternalConverter(eq(true), anyString(), anyMap())) + .thenReturn(jsonConverter); + when(plugins.newInternalConverter(eq(false), anyString(), anyMap())) + .thenReturn(jsonConverter); + + mockKafkaClusterId(); + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService, + noneConnectorClientConfigOverridePolicy, null); + + Map, Map> normalizedOffsets = worker.normalizeOffsets(offsets); + assertEquals(1, normalizedOffsets.size()); + + // The integer value 20 gets deserialized as a long value by the JsonConverter + assertTrue(normalizedOffsets.values().iterator().next().get("position") instanceof Long); + } + @Test public void testAlterOffsetsSinkConnectorNoDeletes() throws Exception { @SuppressWarnings("unchecked") From 54b8b6f845e69614f654280f2af55bca27bb5e12 Mon Sep 17 00:00:00 2001 From: Yash Mayya Date: Wed, 12 Jul 2023 23:26:36 +0530 Subject: [PATCH 2/4] Fix worker tests --- .../kafka/connect/runtime/WorkerTest.java | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index 268e1539b12e..0302a571a8b4 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -1919,6 +1919,7 @@ public void testGetSourceConnectorOffsetsError() { public void testAlterOffsetsConnectorDoesNotSupportOffsetAlteration() { mockKafkaClusterId(); + mockInternalConverters(); worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, Executors.newSingleThreadExecutor(), allConnectorClientConfigOverridePolicy, null); worker.start(); @@ -1947,6 +1948,7 @@ public void testAlterOffsetsConnectorDoesNotSupportOffsetAlteration() { @SuppressWarnings("unchecked") public void testAlterOffsetsSourceConnector() throws Exception { mockKafkaClusterId(); + mockInternalConverters(); worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, Executors.newSingleThreadExecutor(), allConnectorClientConfigOverridePolicy, null); worker.start(); @@ -1983,6 +1985,7 @@ public void testAlterOffsetsSourceConnector() throws Exception { @SuppressWarnings("unchecked") public void testAlterOffsetsSourceConnectorError() throws Exception { mockKafkaClusterId(); + mockInternalConverters(); worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, Executors.newSingleThreadExecutor(), allConnectorClientConfigOverridePolicy, null); worker.start(); @@ -2294,6 +2297,7 @@ public void testResetOffsetsSourceConnectorExactlyOnceSupportEnabled() throws Ex workerProps.put("status.storage.topic", "connect-statuses"); config = new DistributedConfig(workerProps); mockKafkaClusterId(); + mockInternalConverters(); worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, Executors.newSingleThreadExecutor(), allConnectorClientConfigOverridePolicy, null); worker.start(); @@ -2305,8 +2309,8 @@ public void testResetOffsetsSourceConnectorExactlyOnceSupportEnabled() throws Ex OffsetStorageWriter offsetWriter = mock(OffsetStorageWriter.class); Set> connectorPartitions = new HashSet<>(); - connectorPartitions.add(Collections.singletonMap("partitionKey1", new Object())); - connectorPartitions.add(Collections.singletonMap("partitionKey2", new Object())); + connectorPartitions.add(Collections.singletonMap("partitionKey", "partitionValue1")); + connectorPartitions.add(Collections.singletonMap("partitionKey", "partitionValue2")); when(offsetStore.connectorPartitions(eq(CONNECTOR_ID))).thenReturn(connectorPartitions); when(offsetWriter.doFlush(any())).thenAnswer(invocation -> { invocation.getArgument(0, Callback.class).onCompletion(null, null); @@ -2410,6 +2414,7 @@ public void testResetOffsetsSinkConnectorDeleteConsumerGroupError() throws Excep public void testModifySourceConnectorOffsetsTimeout() throws Exception { mockKafkaClusterId(); Time time = new MockTime(); + mockInternalConverters(); worker = new Worker(WORKER_ID, time, plugins, config, offsetBackingStore, Executors.newSingleThreadExecutor(), allConnectorClientConfigOverridePolicy, null); worker.start(); @@ -2526,14 +2531,14 @@ private void verifyStorage() { } private void mockInternalConverters() { - Converter internalKeyConverter = mock(JsonConverter.class); - Converter internalValueConverter = mock(JsonConverter.class); + JsonConverter jsonConverter = new JsonConverter(); + jsonConverter.configure(Collections.singletonMap(SCHEMAS_ENABLE_CONFIG, false), false); when(plugins.newInternalConverter(eq(true), anyString(), anyMap())) - .thenReturn(internalKeyConverter); + .thenReturn(jsonConverter); when(plugins.newInternalConverter(eq(false), anyString(), anyMap())) - .thenReturn(internalValueConverter); + .thenReturn(jsonConverter); } private void verifyConverters() { From 8da5398906d5d67cabb487ed24af1020f5f9e257 Mon Sep 17 00:00:00 2001 From: Yash Mayya Date: Thu, 13 Jul 2023 09:55:55 +0530 Subject: [PATCH 3/4] Rename normalize method to indicate applicability only to source connector offsets --- .../main/java/org/apache/kafka/connect/runtime/Worker.java | 4 ++-- .../java/org/apache/kafka/connect/runtime/WorkerTest.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index ae9a5c846bca..cda8f909b746 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -1549,7 +1549,7 @@ void modifySourceConnectorOffsets(String connName, Connector connector, Map, Map> normalizedOffsets = normalizeOffsets(offsetsToWrite); + Map, Map> normalizedOffsets = normalizeSourceConnectorOffsets(offsetsToWrite); boolean alterOffsetsResult; try { @@ -1623,7 +1623,7 @@ void modifySourceConnectorOffsets(String connName, Connector connector, Map, Map> normalizeOffsets(Map, Map> originalOffsets) { + Map, Map> normalizeSourceConnectorOffsets(Map, Map> originalOffsets) { Map, Map> normalizedOffsets = new HashMap<>(); for (Map.Entry, Map> entry : originalOffsets.entrySet()) { OffsetUtils.validateFormat(entry.getKey()); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index 0302a571a8b4..a7b48704dd32 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -2020,7 +2020,7 @@ public void testAlterOffsetsSourceConnectorError() throws Exception { } @Test - public void testNormalizeOffsets() throws Exception { + public void testNormalizeSourceConnectorOffsets() throws Exception { Map, Map> offsets = Collections.singletonMap( Collections.singletonMap("filename", "/path/to/filename"), Collections.singletonMap("position", 20) @@ -2039,7 +2039,7 @@ public void testNormalizeOffsets() throws Exception { worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService, noneConnectorClientConfigOverridePolicy, null); - Map, Map> normalizedOffsets = worker.normalizeOffsets(offsets); + Map, Map> normalizedOffsets = worker.normalizeSourceConnectorOffsets(offsets); assertEquals(1, normalizedOffsets.size()); // The integer value 20 gets deserialized as a long value by the JsonConverter From d1b8b186a8e85d49f70126eefafc886aae3210af Mon Sep 17 00:00:00 2001 From: Yash Mayya Date: Fri, 14 Jul 2023 09:27:42 +0530 Subject: [PATCH 4/4] Use internal value converter for normalizing offset values; minor test refactor --- .../main/java/org/apache/kafka/connect/runtime/Worker.java | 4 ++-- .../java/org/apache/kafka/connect/runtime/WorkerTest.java | 7 +------ 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index cda8f909b746..d0e410c386cc 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -1629,9 +1629,9 @@ void modifySourceConnectorOffsets(String connName, Connector connector, Map) deserializedKey, (Map) deserializedValue); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index a7b48704dd32..0de444be2025 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -2028,12 +2028,7 @@ public void testNormalizeSourceConnectorOffsets() throws Exception { assertTrue(offsets.values().iterator().next().get("position") instanceof Integer); - JsonConverter jsonConverter = new JsonConverter(); - jsonConverter.configure(Collections.singletonMap(SCHEMAS_ENABLE_CONFIG, false), false); - when(plugins.newInternalConverter(eq(true), anyString(), anyMap())) - .thenReturn(jsonConverter); - when(plugins.newInternalConverter(eq(false), anyString(), anyMap())) - .thenReturn(jsonConverter); + mockInternalConverters(); mockKafkaClusterId(); worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService,