Introduce optional external.version.header config #697
Conversation
request.version((Long) record.headers().lastWithName(
    config.externalVersionHeader()).value());
When we put this in service, we noticed that even though we used an SMT to write a long-typed header, it didn't come out that way in this code. We got it to work by using org.apache.kafka.connect.storage.SimpleHeaderConverter#fromConnectHeader
and then reading the result as a String and parsing it to a Long. Does that sound correct, or did we misunderstand something about Kafka Connect headers?
Hey @rjosal-indeed
Apologies, I missed this before.
Would it be possible for you to elaborate? I am not sure I quite understood how you used SimpleHeaderConverter.
Thanks!
Yes, the code for these lines using the Connect header converter looks like:
final Header versionHeader = record.headers().lastWithName(config.externalVersionHeader());
final byte[] versionValue = HEADER_CONVERTER.fromConnectHeader(
record.topic(),
versionHeader.key(),
versionHeader.schema(),
versionHeader.value()
);
request.version(Long.parseLong(new String(versionValue)));
where HEADER_CONVERTER is a constant new SimpleHeaderConverter().
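For readers following along, the value-type ambiguity discussed above can be isolated into a small normalization helper. This is a minimal, self-contained sketch (the HeaderVersion class and parseVersion method are hypothetical illustrations, not part of the connector), showing how a header value that arrives as a Long, a numeric String, or raw bytes could all be coerced to the long that request.version() needs:

```java
import java.nio.charset.StandardCharsets;

public class HeaderVersion {

    // Hypothetical helper: normalize a header value that may surface as a
    // Long, a numeric String, or raw bytes, depending on which converter
    // and SMT produced it.
    static long parseVersion(Object headerValue) {
        if (headerValue instanceof Long) {
            return (Long) headerValue;
        }
        if (headerValue instanceof byte[]) {
            // Bytes as produced by e.g. SimpleHeaderConverter#fromConnectHeader
            return Long.parseLong(new String((byte[]) headerValue, StandardCharsets.UTF_8));
        }
        // Fall back to the string representation
        return Long.parseLong(headerValue.toString());
    }

    public static void main(String[] args) {
        System.out.println(parseVersion(42L));
        System.out.println(parseVersion("42"));
        System.out.println(parseVersion("42".getBytes(StandardCharsets.UTF_8)));
    }
}
```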
Hey @rjosal-indeed
Great! Thank you so much, this makes sense.
I think we can go ahead with this change. Also, could you address the suggestions mentioned here (ignoring the 2nd point)? After that, we can go ahead and merge the changes quickly.
@sp-gupta ready for review, thanks!
Thanks @rjosal-indeed
There was one import missing, due to which the build was failing; I have corrected it.
Again, thank you so much for your contribution. Really appreciate it!
src/main/java/io/confluent/connect/elasticsearch/DataConverter.java
Force-pushed "…nal version instead of kafka offset for non-datastream indices" from ee17b6e to 6efa59d
Introduce optional external.version.header config to use for ES external version instead of kafka offset for non-datastream indices
Problem
After changing an index mapping, we need to reindex. To do this, we create a new index and run a batch job over all the existing input documents while simultaneously streaming realtime updates, ensuring we do not miss any documents. Today both can't run at the same time, because a batch index might overwrite a newer streamed update.
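The conflict-resolution rule that makes the batch/stream race safe can be sketched as a toy model of Elasticsearch's "external" versioning: a write wins only if its version is strictly greater than the stored one. This is an illustrative simulation only (the ExternalVersionStore class is hypothetical, not connector or Elasticsearch code):

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of Elasticsearch external versioning: a write is applied only if
// its version is strictly greater than the stored version, so a stale batch
// write cannot clobber a newer streamed update.
public class ExternalVersionStore {
    private final Map<String, Long> versions = new HashMap<>();
    private final Map<String, String> docs = new HashMap<>();

    boolean index(String id, long version, String doc) {
        Long current = versions.get(id);
        if (current != null && version <= current) {
            return false; // version conflict: stale write rejected
        }
        versions.put(id, version);
        docs.put(id, doc);
        return true;
    }

    public static void main(String[] args) {
        ExternalVersionStore store = new ExternalVersionStore();
        // Streamed update with version 2 lands first...
        System.out.println(store.index("doc1", 2L, "stream update"));
        // ...then the batch job replays the older version 1 and is rejected.
        System.out.println(store.index("doc1", 1L, "batch reindex"));
    }
}
```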
Solution
ES external versioning can be used to compare document versions for idempotency. We will be able to pull a version out of the document into a Kafka header and use the same version in the batch job. We chose a header so that deletes with null values are also idempotent.
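A sink configuration for this flow might look like the sketch below. The connector class name and the new external.version.header property come from this repository and PR; the topic, header, and field names are illustrative, and the HeaderFrom SMT (which copies a record field into a header, available in Apache Kafka 3.0+) is one plausible way to populate the header, under the assumption that the version already exists as a field in the document:

```properties
# Illustrative sink config sketch; topic, field, and header names are hypothetical.
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
topics=documents

# New optional config from this PR: header whose value is used as the
# ES external version instead of the Kafka offset.
external.version.header=doc_version

# Copy the document's "version" field into that header (Kafka 3.0+ HeaderFrom SMT).
transforms=versionHeader
transforms.versionHeader.type=org.apache.kafka.connect.transforms.HeaderFrom$Value
transforms.versionHeader.fields=version
transforms.versionHeader.headers=doc_version
transforms.versionHeader.operation=copy
```

The batch reindex job would then attach the same version header to the records it produces, so both writers converge on the same external version per document.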
Does this solution apply anywhere else?
If yes, where?
There are a couple of other use cases that may be solved by this control.
https://github.com/confluentinc/kafka-connect-elasticsearch/issues?q=is%3Aissue+is%3Aopen+external+version
Test Strategy
Testing done:
Release Plan
If merged, we will run this connector in Confluent Cloud; otherwise, we will run our fork. As an optional feature with a default that does not affect existing usages, it is backwards compatible.
master
n/a
Thank you, and please pick this apart!