
[HUDI-6297] Fixed issue in consuming transactional topic #9059

Draft · wants to merge 1 commit into base: master
Conversation

ad1happy2go (Contributor)

Change Logs

Fixed an issue in consuming a transactional Kafka topic.

Impact

none

Risk level (write none, low, medium or high below)

none

Documentation Update

none

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@ad1happy2go changed the title from "Fixed issue in consuming transactional topic" to "[HUDI-6297] Fixed issue in consuming transactional topic" on Jun 27, 2023
@hudi-bot

CI report:


try {
  genRec = genericRecordIterator.next();
} catch (IllegalArgumentException e) {
  // Swallow the spurious failure seen at the end of a transactional topic
  // and stop consuming the current batch.
  LOG.warn("Handling exception for transaction topic - " + e.getMessage());
  break;
}
@danny0405 (Contributor), Jun 28, 2023:

What does the break indicate here? Does that mean all the subsequent records are invalid?

@ad1happy2go (Contributor, Author):

@danny0405 For transactional (exactly-once) topics there is an issue where, even after the consumer reaches the end of the topic, hasNext() still returns true but next() throws this exception. Only in that case do we ignore the exception and break out of the loop.

For normal Kafka topics the catch block never runs, because next() does not fail.
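
A minimal sketch of the loop described above, assuming a plain Iterator of Avro GenericRecord; the TransactionalBatchDrain class, the drainBatch helper and the records list are illustrative names, not the actual DeltaSync code:

    import java.util.Iterator;
    import java.util.List;
    import org.apache.avro.generic.GenericRecord;

    // Sketch only: consume one micro-batch, treating the spurious failure at the
    // end of a transactional (exactly-once) topic as the end of the batch.
    final class TransactionalBatchDrain {
      static void drainBatch(Iterator<GenericRecord> genericRecordIterator,
                             List<GenericRecord> records) {
        while (genericRecordIterator.hasNext()) {
          GenericRecord genRec;
          try {
            genRec = genericRecordIterator.next();
          } catch (IllegalArgumentException e) {
            // hasNext() reported true, but next() failed: stop this batch here.
            break;
          }
          records.add(genRec);
        }
      }
    }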

@nsivabalan (Contributor):

Is there any Kafka config we can rely on to deduce whether a topic is transactional or not? We can't do this in all cases; this special handling should apply only to transactional Kafka topics.

@ad1happy2go (Contributor, Author):

Thanks for the feedback @nsivabalan. Makes sense. Will check more on this.

Contributor:

@ad1happy2go Quick question: since the loop breaks here on the exception, how will the next records be read in continuous mode (streaming jobs)? Is there any mechanism to retry on failure, or does this only work for batch jobs?

@WarFox:

@nsivabalan I don't think there is a very reliable Kafka configuration that we can use to deduce this. It is up to the Kafka producer to initiate a transaction and to mark its beginning and end. If Kafka Streams is used, there is a configuration, processing.guarantee, which can be set to exactly_once for transactional topics, but I am not sure how Hudi could get hold of that setting.

Also, note that a single topic may contain both transactional and non-transactional messages, depending on the application logic.

A team using Hudi Deltastreamer will know whether their topic has transactional messages, so I would say the safer bet is to introduce a Hudi configuration for this. That gives the team the flexibility to mark their topic as transactional or not.
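
A rough illustration of what such an opt-in flag could look like; the property name and the TransactionalTopicConfig class are hypothetical, not an existing Hudi configuration:

    import java.util.Properties;

    // Hypothetical helper; the property key below is NOT an existing Hudi config.
    final class TransactionalTopicConfig {
      static final String KAFKA_TRANSACTIONAL_TOPIC_PROP =
          "hoodie.deltastreamer.source.kafka.transactional";

      // Defaults to false so normal topics keep the current strict behaviour;
      // only topics explicitly marked transactional would get the lenient handling.
      static boolean isTransactionalTopic(Properties props) {
        return Boolean.parseBoolean(
            props.getProperty(KAFKA_TRANSACTIONAL_TOPIC_PROP, "false"));
      }
    }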

@ad1happy2go (Contributor, Author):

@danielfordfc Thanks for testing. I did test multiple rounds of producing records transactionally, but I kept the delta streamer running in continuous mode without stopping and rerunning the job. I will try to reproduce that scenario as well. If that turns out to be the case, fixing the transactional topic may not be so straightforward. We may need to explore alternative approaches and potentially rewrite the code to handle it similar to how Spark Structured Streaming handles it.

@WarFox, I also couldn't find a reliable way to detect this on the consumer side. So, if the fix works, I believe adding the configuration would be a good approach to handle this special case.

@ad1happy2go (Contributor, Author):

@danielfordfc The iterator covers one micro-batch, i.e. one KafkaRDD. The next batch will come in a different KafkaRDD.
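
A schematic of that point for continuous mode; nextBatch stands in for "build the next KafkaRDD and its iterator", and ContinuousModeSketch is not Hudi code:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.function.Supplier;
    import org.apache.avro.generic.GenericRecord;

    // Schematic only: each sync round obtains a fresh iterator (a new KafkaRDD),
    // so breaking out of one batch ends that micro-batch, not the job.
    final class ContinuousModeSketch {
      static List<GenericRecord> run(Supplier<Iterator<GenericRecord>> nextBatch, int rounds) {
        List<GenericRecord> sink = new ArrayList<>();
        for (int i = 0; i < rounds; i++) {
          Iterator<GenericRecord> it = nextBatch.get();
          while (it.hasNext()) {
            try {
              sink.add(it.next());
            } catch (IllegalArgumentException e) {
              break; // ends only the current micro-batch
            }
          }
        }
        return sink;
      }
    }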


@aditiwari01 I see. I guess for most Deltastreamer usages we would need it to handle both cases: handling it during Deltastreamer record production, and the case above that we tested, since running the tool in both batch and continuous mode is very common.


I will also admit I can't quite understand what the difference between the two cases would be. Surely if it's just skipping a record inside the micro-batch, it shouldn't matter where that bad record is?

We also tried this with a continue instead of a break and noticed the same behaviour.

@codope added the priority:major (degraded perf; unable to move forward; potential bugs) and release-0.14.0 labels on Jun 28, 2023
@nsivabalan added the pr:wip (Work in Progress/PRs) label on Jul 3, 2023
@ad1happy2go marked this pull request as a draft on August 30, 2023
@bvaradar (Contributor):

@ad1happy2go: Can you introduce the change in KafkaSource (behind a config) so that it wraps the RDD and handles this behavior there, instead of making the change in DeltaSync?
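
One way such a wrapper could look, purely as a sketch of this suggestion; TransactionalAwareIterator is hypothetical, and a real version would only be installed when an opt-in config such as the one discussed above is enabled:

    import java.util.Iterator;
    import java.util.NoSuchElementException;
    import org.apache.avro.generic.GenericRecord;

    // Hypothetical wrapper illustrating the suggestion: the source hands out an
    // iterator that treats the spurious failure at the end of a transactional
    // batch as normal exhaustion, keeping the workaround out of DeltaSync.
    final class TransactionalAwareIterator implements Iterator<GenericRecord> {
      private final Iterator<GenericRecord> delegate;
      private GenericRecord buffered;
      private boolean exhausted;

      TransactionalAwareIterator(Iterator<GenericRecord> delegate) {
        this.delegate = delegate;
      }

      @Override
      public boolean hasNext() {
        if (buffered != null) {
          return true;
        }
        if (exhausted || !delegate.hasNext()) {
          return false;
        }
        try {
          buffered = delegate.next();
          return true;
        } catch (IllegalArgumentException e) {
          // End of the transactional batch: report exhaustion instead of failing.
          exhausted = true;
          return false;
        }
      }

      @Override
      public GenericRecord next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        GenericRecord next = buffered;
        buffered = null;
        return next;
      }
    }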

@nsivabalan (Contributor):

Hey @bvaradar, can you help with this issue? Do you have pointers on exactly-once from Kafka? It looks like reading it out of the box might result in data consistency issues, as reported above.

@github-actions bot added the size:XS (PR with lines of changes in <= 10) label on Feb 26, 2024
@danielfordfc:

Hey @nsivabalan, @bvaradar, @ad1happy2go: is this absolutely not going to happen anytime soon? It's preventing us from directly ingesting a large majority of the Kafka topics in our organisation, and I'm very surprised it isn't a more widely experienced issue, given that it's a common feature of topics produced by Kafka Streams applications.

Labels: pr:wip (Work in Progress/PRs), priority:major (degraded perf; unable to move forward; potential bugs), release-0.14.1, size:XS (PR with lines of changes in <= 10)

9 participants