KAFKA-15182: Normalize source connector offsets before invoking SourceConnector::alterOffsets #14003

Merged
merged 4 commits on Jul 14, 2023
@@ -83,8 +83,10 @@
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.storage.OffsetUtils;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
@@ -1547,9 +1549,11 @@ void modifySourceConnectorOffsets(String connName, Connector connector, Map<Stri
offsetsToWrite = offsets;
}

Map<Map<String, ?>, Map<String, ?>> normalizedOffsets = normalizeSourceConnectorOffsets(offsetsToWrite);

boolean alterOffsetsResult;
try {
alterOffsetsResult = ((SourceConnector) connector).alterOffsets(connectorConfig, offsetsToWrite);
alterOffsetsResult = ((SourceConnector) connector).alterOffsets(connectorConfig, normalizedOffsets);
} catch (UnsupportedOperationException e) {
log.error("Failed to modify offsets for connector {} because it doesn't support external modification of offsets",
connName, e);
@@ -1561,7 +1565,7 @@ void modifySourceConnectorOffsets(String connName, Connector connector, Map<Stri
// This should only occur for an offsets reset request when there are no source partitions found for the source connector in the
// offset store - either because there was a prior attempt to reset offsets or if there are no offsets committed by this source
// connector so far
if (offsetsToWrite.isEmpty()) {
if (normalizedOffsets.isEmpty()) {
log.info("No offsets found for source connector {} - this can occur due to a prior attempt to reset offsets or if the " +
"source connector hasn't committed any offsets yet", connName);
completeModifyOffsetsCallback(alterOffsetsResult, isReset, cb);
@@ -1570,7 +1574,7 @@ void modifySourceConnectorOffsets(String connName, Connector connector, Map<Stri

// The modifySourceConnectorOffsets method should only be called after all the connector's tasks have been stopped, and it's
// safe to write offsets via an offset writer
offsetsToWrite.forEach(offsetWriter::offset);
normalizedOffsets.forEach(offsetWriter::offset);

// We can call begin flush without a timeout because this newly created single-purpose offset writer can't do concurrent
// offset writes. We can also ignore the return value since it returns false if and only if there is no data to be flushed,
@@ -1581,7 +1585,7 @@ void modifySourceConnectorOffsets(String connName, Connector connector, Map<Stri
producer.initTransactions();
producer.beginTransaction();
}
log.debug("Committing the following offsets for source connector {}: {}", connName, offsetsToWrite);
log.debug("Committing the following offsets for source connector {}: {}", connName, normalizedOffsets);
FutureCallback<Void> offsetWriterCallback = new FutureCallback<>();
offsetWriter.doFlush(offsetWriterCallback);
if (config.exactlyOnceSourceEnabled()) {
@@ -1608,6 +1612,32 @@ void modifySourceConnectorOffsets(String connName, Connector connector, Map<Stri
}));
}

/**
* "Normalize" source connector offsets by serializing and deserializing them using the internal {@link JsonConverter}.
* This is done in order to prevent type mismatches between the offsets passed to {@link SourceConnector#alterOffsets(Map, Map)}
* and the offsets that connectors and tasks retrieve via an instance of {@link OffsetStorageReader}.
* <p>
* Visible for testing.
*
* @param originalOffsets the offsets that are to be normalized
* @return the normalized offsets
*/
@SuppressWarnings("unchecked")
Map<Map<String, ?>, Map<String, ?>> normalizeSourceConnectorOffsets(Map<Map<String, ?>, Map<String, ?>> originalOffsets) {
Map<Map<String, ?>, Map<String, ?>> normalizedOffsets = new HashMap<>();
for (Map.Entry<Map<String, ?>, Map<String, ?>> entry : originalOffsets.entrySet()) {
OffsetUtils.validateFormat(entry.getKey());
OffsetUtils.validateFormat(entry.getValue());
byte[] serializedKey = internalKeyConverter.fromConnectData("", null, entry.getKey());
Review comment from the PR author:

This should be safe to do because the OffsetStorageReaderImpl also serializes the source partition specified by the connector / task before retrieving its corresponding source offset. The only difference here is an extra ser / deser hop, which shouldn't cause any issues. For instance:

Map<String, Object> p1 = Collections.singletonMap("partition_key", 10);
Map<String, Object> p2 = Collections.singletonMap("partition_key", 10L);

ByteBuffer serializedP1 = ByteBuffer.wrap(converter.fromConnectData("", null, p1));
ByteBuffer serializedP2 = ByteBuffer.wrap(converter.fromConnectData("", null, p2));

assertTrue(serializedP1.equals(serializedP2));
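
Following the same example through the deserialization side (a small follow-up sketch, assuming the converter above is the worker's internal JsonConverter with schemas disabled, as configured by mockInternalConverters in the test changes below), both partitions come back with the value as a Long — the same type a connector or task would later see from an OffsetStorageReader:

// Deserialize the bytes produced above; the JsonConverter reads the integer 10 back as a Long
Map<String, Object> roundTrippedP1 =
        (Map<String, Object>) converter.toConnectData("", serializedP1.array()).value();

assertTrue(roundTrippedP1.get("partition_key") instanceof Long);
assertEquals(roundTrippedP1, converter.toConnectData("", serializedP2.array()).value());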

byte[] serializedValue = internalValueConverter.fromConnectData("", null, entry.getValue());
Object deserializedKey = internalKeyConverter.toConnectData("", serializedKey).value();
Object deserializedValue = internalValueConverter.toConnectData("", serializedValue).value();
normalizedOffsets.put((Map<String, ?>) deserializedKey, (Map<String, ?>) deserializedValue);
}

return normalizedOffsets;
}

/**
* Update the provided timer, check if it's expired and throw a {@link ConnectException} with the provided error
* message if it is.
@@ -124,6 +124,7 @@
import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_CONFIG;
import static org.apache.kafka.connect.json.JsonConverterConfig.SCHEMAS_ENABLE_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX;
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX;
@@ -1918,6 +1919,7 @@ public void testGetSourceConnectorOffsetsError() {
public void testAlterOffsetsConnectorDoesNotSupportOffsetAlteration() {
mockKafkaClusterId();

mockInternalConverters();
worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, Executors.newSingleThreadExecutor(),
allConnectorClientConfigOverridePolicy, null);
worker.start();
@@ -1946,6 +1948,7 @@ public void testAlterOffsetsConnectorDoesNotSupportOffsetAlteration() {
@SuppressWarnings("unchecked")
public void testAlterOffsetsSourceConnector() throws Exception {
mockKafkaClusterId();
mockInternalConverters();
worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, Executors.newSingleThreadExecutor(),
allConnectorClientConfigOverridePolicy, null);
worker.start();
@@ -1982,6 +1985,7 @@ public void testAlterOffsetsSourceConnector() throws Exception {
@SuppressWarnings("unchecked")
public void testAlterOffsetsSourceConnectorError() throws Exception {
mockKafkaClusterId();
mockInternalConverters();
worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, Executors.newSingleThreadExecutor(),
allConnectorClientConfigOverridePolicy, null);
worker.start();
@@ -2015,6 +2019,28 @@ public void testAlterOffsetsSourceConnectorError() throws Exception {
verifyKafkaClusterId();
}

@Test
public void testNormalizeSourceConnectorOffsets() throws Exception {
Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap(
Collections.singletonMap("filename", "/path/to/filename"),
Collections.singletonMap("position", 20)
);

assertTrue(offsets.values().iterator().next().get("position") instanceof Integer);

mockInternalConverters();

mockKafkaClusterId();
worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService,
noneConnectorClientConfigOverridePolicy, null);

Map<Map<String, ?>, Map<String, ?>> normalizedOffsets = worker.normalizeSourceConnectorOffsets(offsets);
assertEquals(1, normalizedOffsets.size());

// The integer value 20 gets deserialized as a long value by the JsonConverter
assertTrue(normalizedOffsets.values().iterator().next().get("position") instanceof Long);
}

@Test
public void testAlterOffsetsSinkConnectorNoDeletes() throws Exception {
@SuppressWarnings("unchecked")
@@ -2266,6 +2292,7 @@ public void testResetOffsetsSourceConnectorExactlyOnceSupportEnabled() throws Ex
workerProps.put("status.storage.topic", "connect-statuses");
config = new DistributedConfig(workerProps);
mockKafkaClusterId();
mockInternalConverters();
worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, Executors.newSingleThreadExecutor(),
allConnectorClientConfigOverridePolicy, null);
worker.start();
@@ -2277,8 +2304,8 @@ public void testResetOffsetsSourceConnectorExactlyOnceSupportEnabled() throws Ex
OffsetStorageWriter offsetWriter = mock(OffsetStorageWriter.class);

Set<Map<String, Object>> connectorPartitions = new HashSet<>();
connectorPartitions.add(Collections.singletonMap("partitionKey1", new Object()));
connectorPartitions.add(Collections.singletonMap("partitionKey2", new Object()));
connectorPartitions.add(Collections.singletonMap("partitionKey", "partitionValue1"));
connectorPartitions.add(Collections.singletonMap("partitionKey", "partitionValue2"));
when(offsetStore.connectorPartitions(eq(CONNECTOR_ID))).thenReturn(connectorPartitions);
when(offsetWriter.doFlush(any())).thenAnswer(invocation -> {
invocation.getArgument(0, Callback.class).onCompletion(null, null);
@@ -2382,6 +2409,7 @@ public void testResetOffsetsSinkConnectorDeleteConsumerGroupError() throws Excep
public void testModifySourceConnectorOffsetsTimeout() throws Exception {
mockKafkaClusterId();
Time time = new MockTime();
mockInternalConverters();
worker = new Worker(WORKER_ID, time, plugins, config, offsetBackingStore, Executors.newSingleThreadExecutor(),
allConnectorClientConfigOverridePolicy, null);
worker.start();
@@ -2498,14 +2526,14 @@ private void verifyStorage() {
}

private void mockInternalConverters() {
Converter internalKeyConverter = mock(JsonConverter.class);
Converter internalValueConverter = mock(JsonConverter.class);
JsonConverter jsonConverter = new JsonConverter();
jsonConverter.configure(Collections.singletonMap(SCHEMAS_ENABLE_CONFIG, false), false);

when(plugins.newInternalConverter(eq(true), anyString(), anyMap()))
.thenReturn(internalKeyConverter);
.thenReturn(jsonConverter);

when(plugins.newInternalConverter(eq(false), anyString(), anyMap()))
.thenReturn(internalValueConverter);
.thenReturn(jsonConverter);
}

private void verifyConverters() {