Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove kafka lookup records when a record is tombstoned #12819

Merged

Conversation

hwball
Copy link
Contributor

@hwball hwball commented Jul 26, 2022

Description

Lookups made using the druid-kafka-extraction-namespace extension do not currently handle records that have been deleted from a kafka topic. Deleting a record from a kafka topic involves sending a record with a null payload and the same key as the record you want to delete to the topic. This is also known as sending a tombstone. This PR aims to handle key value records from kafka that have keys but also null values.

On receiving a key value record that has a key and a null value, the corresponding record stored in the factory map will be removed.


Key changed/added classes in this PR
  • KafkaLookupExtractorFactory.java

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@abhishekagarwal87
Copy link
Contributor

Travis didn't trigger for some reason. I closed and re-opened the PR. changes LGTM. I will merge once CI passes.

Copy link
Contributor

@paul-rogers paul-rogers left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great fix, thanks!

Should we document this behavior in the docs? The only reference I could find is here, but that doesn't address streaming updates.

@cloventt
Copy link
Contributor

cloventt commented Aug 1, 2022

The docs are actually buried in the extensions area. I've updated them to mention this new behaviour.


long start = System.currentTimeMillis();
while (events == factory.getCompletedEventCount()) {
Thread.sleep(100);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not use 10 here as well?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hwball @cloventt - can you address this comment? Last thing to do before we can merge this PR and #12842

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @abhishekagarwal87 I've pushed that change, sorry for the delay.

Copy link
Contributor Author

@hwball hwball Aug 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like it fails on CI with a lower sleep time. It was tested locally a few times and passed with the lower value but I will revert the change so the CI can pass. Never mind it looks like an unrelated test caused the failure

@abhishekagarwal87
Copy link
Contributor

thank you. I will merge this once CI is green.

@abhishekagarwal87 abhishekagarwal87 merged commit abd7a97 into apache:master Aug 9, 2022
@abhishekagarwal87
Copy link
Contributor

@hwball @cloventt - Thank you for your contribution. I have merged the PR.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants