New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-15182: Normalize source connector offsets before invoking SourceConnector::alterOffsets #14003
Conversation
…eConnector::alterOffsets
for (Map.Entry<Map<String, ?>, Map<String, ?>> entry : originalOffsets.entrySet()) { | ||
OffsetUtils.validateFormat(entry.getKey()); | ||
OffsetUtils.validateFormat(entry.getValue()); | ||
byte[] serializedKey = internalKeyConverter.fromConnectData("", null, entry.getKey()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be safe to do because the OffsetStorageReaderImpl
also serializes the connector / task specified source partition before retrieving its corresponding source offset. The difference here is that there is an extra ser / deser hop although that shouldn't cause issues. So, for instance:
Map<String, Object> p1 = Collections.singletonMap("partition_key", 10);
Map<String, Object> p2 = Collections.singletonMap("partition_key", 10L);
ByteBuffer serializedP1 = ByteBuffer.wrap(converter.fromConnectData("", null, p1));
ByteBuffer serializedP2 = ByteBuffer.wrap(converter.fromConnectData("", null, p2));
assertTrue(serializedP1.equals(serializedP2));
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks Yash! Looks great, some minor thoughts and then this should be good to go.
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks Yash!
…eConnector::alterOffsets (apache#14003) Reviewers: Chris Egerton <chrise@aiven.io>
FileStreamSourceConnector
.Committer Checklist (excluded from commit message)