
[SUPPORT] Deltastreamer errors ingesting kafka-streams topics (transactional / exactly_once producers) #8258

Open
danielfordfc opened this issue Mar 21, 2023 · 7 comments
Labels
hudistreamer: issues related to Hudi Streamer (formerly Deltastreamer) · priority:critical: production down; pipelines stalled; need help asap.

Comments

@danielfordfc

Describe the problem you faced

Deltastreamer is unable to read a transactional topic (one being produced to by a transactional producer, such as a kafka-streams application).

Caused by: java.lang.IllegalArgumentException: requirement failed:
 Failed to get records for compacted spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 after polling for 1000
 
 Or, when using AllowNonConsecutiveOffsets=false:
 
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 4) (192.168.1.240 executor driver): java.lang.IllegalArgumentException: requirement failed: Failed to get records for spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 5 after polling for 1000
        at scala.Predef$.require(Predef.scala:281)
        at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:143)

Full stack traces will be linked below.

Our configuration is as follows:

Note: this works fine on non-transactional topics, and on compacted topics (cleanup.policy=compact) when setting AllowNonConsecutiveOffsets=true.
Note: this isn't a networking or timeout issue, as we'll discuss below.

hoodie.datasource.write.recordkey.field=viewtime
hoodie.datasource.write.partitionpath.field=pageid
hoodie.deltastreamer.source.kafka.topic=${topic}
hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/${topic}-value/versions/latest
schema.registry.url=http://localhost:8081/
# Kafka Consumer props
bootstrap.servers=localhost:9092
auto.offset.reset=earliest
# Consumer Group
group.id=hudi-deltastreamer-${topic}
#isolation.level=read_committed <-- tried adjusting this with no effect
#enable.auto.commit=false <-- tried adjusting this with no effect

# spark.properties
spark.streaming.kafka.allowNonConsecutiveOffsets=true <--  so we use this by default as some of our topics are compacted
spark.streaming.kafka.consumer.poll.ms=1000 <-- To make it fail faster, from default of 120,000
spark.executor.cores=1

spark-submit \
  --master local[1] \
  --num-executors=1 \
  --executor-cores=1 \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  --conf "spark.driver.extraJavaOptions=${LOG4J_SETTING}" \
  --conf "spark.executor.extraJavaOptions=${LOG4J_SETTING}" \
  --properties-file ~/Sandbox/spark-sandbox/src/main/resources/spark.properties \
 ~/Workspace/github.com/apache/hudi/target/hudi-utilities-bundle_2.12-0.12.1.jar \
 --op INSERT \
 --props /tmp/hoodie-conf-${topic}.properties \
 --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
 --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
 --source-ordering-field viewtime  \
 --table-type COPY_ON_WRITE \
 --target-base-path file://${target_base_path} \
 --target-table $target_table

We've identified that this is because the get() / compactedNext() methods used by Spark's InternalKafkaConsumer (in KafkaDataConsumer.scala) fail to poll records/batches for these transactional topics.

If we create a simple non-compacted topic that we'll be writing to non-transactionally, and one transactionally:

kafka-topics --bootstrap-server localhost:9092 --create --topic my-transactional-topic --partitions 5 
kafka-topics --bootstrap-server localhost:9092 --create --topic pageviews --partitions 5
# The following messages can be consumed when produced without a transactional producer, but not with one:
[{:viewtime 100 :userid "User_0" :pageid "Page_0"}
               {:viewtime 101 :userid "User_1" :pageid "Page_1"}
               {:viewtime 102 :userid "User_2" :pageid "Page_2"}
               ...etc...
               {:viewtime 115 :userid "User_15" :pageid "Page_15"}]

Produced 16 messages non-transactionally -- as you can see, the end/"next available" offset is the one after the last offset containing data in each partition.
(Screenshot 2023-03-21 at 12:36:09)

Produced 16 messages transactionally -- as you can see, the end/"next available" offset is 2 more than the last offset containing data in each partition, because committing that batch of writes placed a commit marker/offset in each partition.
(Screenshot 2023-03-21 at 12:36:37)
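For context, the layout above is exactly what a transactional produce creates. Below is a minimal sketch (Scala, kafka-clients) of such a producer; the topic name matches the example, but the keys/values and transactional.id are illustrative, not taken from our real kafka-streams application (which does the equivalent internally when processing.guarantee is set to exactly_once / exactly_once_v2):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object TransactionalProduce extends App {
  val props = new Properties()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
  // Setting a transactional.id switches the producer into transactional (exactly-once) mode.
  props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-tx-producer")

  val producer = new KafkaProducer[String, String](props)
  producer.initTransactions()
  producer.beginTransaction()
  (0 until 16).foreach { i =>
    producer.send(new ProducerRecord("my-transactional-topic", s"User_$i", s"""{"viewtime": ${100 + i}, "pageid": "Page_$i"}"""))
  }
  // commitTransaction() writes a control record (the commit marker) into every partition that
  // received data in this transaction. The marker occupies the offset after the last data record,
  // so the "next available" offset ends up two past the last data offset in each partition.
  producer.commitTransaction()
  producer.close()
}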

And we see the stack traces mentioned at the bottom:
hoodie-allow-consecutive-off-false.log
hoodie-allow-consecutive-off-true.log


Extra Information gathered from running this locally

Below is a dive into our local example showing how we get a poll of [topic-partition] returning 5 records, followed by a poll of [] returning 0 records, followed by the crash when AllowNonConsecutiveOffsets=true.

Interestingly, in the log below, when setting AllowNonConsecutiveOffsets=false, we see that the initial poll for partition 0 (which, per the screenshot above, has offsets 0-4 as valid messages and offset 5 as the commit marker) returns those first 5 messages, then fails on the next poll.

23/03/21 12:48:57 INFO org.apache.spark.streaming.kafka010.KafkaRDD: Computing topic my-transactional-topic, partition 0 offsets 0 -> 6
23/03/21 12:48:57 DEBUG org.apache.spark.streaming.kafka010.KafkaDataConsumer: Not used cached consumer found, re-using it InternalKafkaConsumer(hash=511066e5, groupId=spark-executor-hudi-deltastreamer-my-transactional-topic, topicPartition=my-transactional-topic-0)
23/03/21 12:48:57 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Get spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 nextOffset 1 requested 0
23/03/21 12:48:57 INFO org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Initial fetch for spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 0
23/03/21 12:48:57 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Seeking to my-transactional-topic-0 0
23/03/21 12:48:57 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Polled [my-transactional-topic-0]  5
23/03/21 12:48:57 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Get spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 nextOffset 1 requested 1
23/03/21 12:48:57 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Get spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 nextOffset 2 requested 2
23/03/21 12:48:57 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Get spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 nextOffset 3 requested 3
23/03/21 12:48:57 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Get spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 nextOffset 4 requested 4
23/03/21 12:48:57 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Get spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 nextOffset 5 requested 5
23/03/21 12:48:58 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Polled []  0
23/03/21 12:48:59 WARN org.apache.spark.storage.BlockManager: Putting block rdd_2_0 failed due to exception java.lang.IllegalArgumentException: requirement failed: Failed to get records for spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 5 after polling for 1000.
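To illustrate why that last poll returns nothing, here is a minimal consumer sketch (Scala, kafka-clients; the group id is made up) that seeks straight to the commit-marker offset from the example above. The marker is a control record: it occupies an offset but is never handed back to the application, so the poll returns zero records, and the require(...) in Spark's InternalKafkaConsumer (see the stack trace above) then fails:

import java.time.Duration
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

object SeekToMarker extends App {
  val props = new Properties()
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "marker-probe")
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
  props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")

  val tp = new TopicPartition("my-transactional-topic", 0)
  val consumer = new KafkaConsumer[String, String](props)
  consumer.assign(Collections.singletonList(tp))
  // Offset 5 is the commit marker in partition 0 of the example above.
  consumer.seek(tp, 5L)
  val records = consumer.poll(Duration.ofSeconds(1))
  // Prints 0: the marker is not consumable, and there is no data after it.
  println(s"records returned: ${records.count()}")
  consumer.close()
}

If there is more data after the marker, the same seek instead returns records starting at the next data offset, which lines up with the "Got wrong record ... even after seeking to offset 16 got offset 17 instead" error further down.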

If we create another topic with one partition and write a single batch of 16 records transactionally (so 0->15 is data, 16 is commit marker, end of topic is 17), we see similar behaviour.

my-transactional-topic-single-partition.log

If we remove the possibility that it might be crashing because the endOffset is the "invisible" marker that it can't read, by adding another 16 records (putting 17->32 as data, 33 as the marker and 34 as endOffset), we see a similar issue with the following:

requirement failed: Got wrong record for spark-executor-hudi-deltastreamer-my-transactional-topic-single-partition my-transactional-topic-single-partition-0 even after seeking to offset 16 got offset 17 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets

my-transactional-topic-single-partition-32-msgs.log

Changing to AllowNonConsecutiveOffsets=true on the above topic yields the following:

23/03/21 13:24:10 INFO org.apache.spark.streaming.kafka010.KafkaRDD: Computing topic my-transactional-topic-single-partition, partition 0 offsets 0 -> 34
23/03/21 13:24:10 DEBUG org.apache.spark.streaming.kafka010.KafkaDataConsumer: Not used cached consumer found, re-using it InternalKafkaConsumer(hash=9903e40, groupId=spark-executor-hudi-deltastreamer-my-transactional-topic-single-partition, topicPartition=my-transactional-topic-single-partition-0)
23/03/21 13:24:10 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: compacted start spark-executor-hudi-deltastreamer-my-transactional-topic-single-partition my-transactional-topic-single-partition-0 starting 0
23/03/21 13:24:10 INFO org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Initial fetch for compacted spark-executor-hudi-deltastreamer-my-transactional-topic-single-partition my-transactional-topic-single-partition-0 0
23/03/21 13:24:10 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Seeking to my-transactional-topic-single-partition-0 0
23/03/21 13:24:10 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Polled [my-transactional-topic-single-partition-0]  32
23/03/21 13:24:10 INFO org.apache.spark.storage.BlockManager: Removing RDD 6
23/03/21 13:24:11 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Polled []  0
23/03/21 13:24:11 WARN org.apache.spark.storage.BlockManager: Putting block rdd_2_0 failed due to exception java.lang.IllegalArgumentException: requirement failed: Failed to get records for compacted spark-executor-hudi-deltastreamer-my-transactional-topic-single-partition my-transactional-topic-single-partition-0 after polling for 1000.
23/03/21 13:24:11 WARN org.apache.spark.storage.BlockManager: Block rdd_2_0 could not be removed as it was not found on disk or in memory
23/03/21 13:24:11 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 4.0 (TID 4)
java.lang.IllegalArgumentException: requirement failed: Failed to get records for compacted spark-executor-hudi-deltastreamer-my-transactional-topic-single-partition my-transactional-topic-single-partition-0 after polling for 1000

Stack trace for the above:
my-transactional-topic-single-partition-allownonconsecutiveoffsetsTrue.log

Answers Required

So we know what the problem is; we are just unsure how to fix it.
We've taken this to the Hudi office hours before, and the host suggested asking @yihua for advice.

Usual environment in production (all of this has been reproduced locally):

Hudi version : Deltastreamer on EMR 6.8.0 running Hudi 0.11.1-amzn-0
Spark version : 3.3.0
Hive version : 3.1.3
Hadoop version : Amazon 3.2.1
Storage (HDFS/S3/GCS..) : S3
Running on Docker? (yes/no) : No

Additional context

Hudi 0.12.1 used for local testing
Can add more details if required.

Stacktrace
Stack traces have been linked throughout, but are listed here again:

my-transactional-topic-single-partition-32-msgs.log
my-transactional-topic-single-partition-allownonconsecutiveoffsetsTrue.log
hoodie-allow-consecutive-off-false.log
hoodie-allow-consecutive-off-true.log
my-transactional-topic-single-partition.log

@WarFox

WarFox commented Mar 23, 2023

More observations on this

  • The problem occurs when the transaction marker is at the last offset available for consumption
  • Ingestion is successful when there are non-transactional messages after the transaction-marker offset

As seen in the logs attached above, we suspect that the underlying Spark KafkaRDD library is confused by the transaction marker when it is the last record in the topic.

@ad1happy2go
Contributor

ad1happy2go commented Mar 26, 2023

I am able to reproduce your issue with HoodieDeltaStreamer. We are looking into this issue.

Meanwhile, you can use Spark Structured Streaming to read the data from the transactional topic and write to Hudi. I am not seeing any error when using Structured Streaming. Below is the code you can refer to:
code.txt
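For anyone landing here later, a rough sketch of that workaround follows (this is not the attached code.txt; it assumes a JSON-encoded topic for brevity, whereas an Avro/Schema Registry topic would additionally need Confluent wire-format deserialization, and all paths, names and options are illustrative):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{LongType, StringType, StructType}

object KafkaToHudiStreaming extends App {
  val spark = SparkSession.builder().appName("kafka-to-hudi").getOrCreate()

  // Schema of the pageviews-style payload used in the examples above.
  val schema = new StructType()
    .add("viewtime", LongType)
    .add("userid", StringType)
    .add("pageid", StringType)

  val df = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "my-transactional-topic")
    .option("startingOffsets", "earliest")
    // Only return committed records; per the comment above, the Structured Streaming
    // Kafka source did not hit the commit-marker issue on transactional topics.
    .option("kafka.isolation.level", "read_committed")
    .load()
    .select(from_json(col("value").cast("string"), schema).as("data"))
    .select("data.*")

  df.writeStream
    .format("hudi")
    .option("hoodie.table.name", "pageviews_cow")
    .option("hoodie.datasource.write.recordkey.field", "viewtime")
    .option("hoodie.datasource.write.partitionpath.field", "pageid")
    .option("hoodie.datasource.write.precombine.field", "viewtime")
    .option("checkpointLocation", "/tmp/checkpoints/pageviews_cow")
    .outputMode("append")
    .start("/tmp/hudi/pageviews_cow")
    .awaitTermination()
}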

@WarFox

WarFox commented Mar 26, 2023

Thanks @ad1happy2go

We are also leaning towards using either Spark Structured Streaming or batch processing as a workaround. We have some working examples for both cases now; thanks again for sharing your example code.

Could you confirm whether we need a separate process for compaction when using Spark Structured Streaming to write to Hudi?

@ad1happy2go
Contributor

@WarFox
We can either enable async compaction or use an offline compaction job.
With streaming, we normally don't use inline compaction, as that would increase latency.
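Building on the Structured Streaming sketch earlier in the thread, the async-compaction route for a MERGE_ON_READ streaming write might look roughly like this (compaction only applies to MERGE_ON_READ tables; the COPY_ON_WRITE table in the original setup would not need it, and the config values here are illustrative assumptions):

// `df` is the streaming DataFrame built in the Kafka-source sketch above.
df.writeStream
  .format("hudi")
  .option("hoodie.table.name", "pageviews_mor")
  .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
  .option("hoodie.datasource.write.recordkey.field", "viewtime")
  .option("hoodie.datasource.write.partitionpath.field", "pageid")
  .option("hoodie.datasource.write.precombine.field", "viewtime")
  // Compact asynchronously inside the streaming job instead of inline,
  // so micro-batches are not blocked waiting for compaction.
  .option("hoodie.compact.inline", "false")
  .option("hoodie.datasource.compaction.async.enable", "true")
  .option("hoodie.compact.inline.max.delta.commits", "5")
  .option("checkpointLocation", "/tmp/checkpoints/pageviews_mor")
  .outputMode("append")
  .start("/tmp/hudi/pageviews_mor")

The offline alternative would be to leave compaction disabled in the writer and run a separate compaction job from hudi-utilities (org.apache.hudi.utilities.HoodieCompactor) on a schedule.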

@yihua yihua added the priority:critical (production down; pipelines stalled; need help asap) and hudistreamer (issues related to Hudi Streamer, formerly Deltastreamer) labels Mar 29, 2023
@ad1happy2go
Contributor

@danielfordfc JIRA created for tracking the fix: https://issues.apache.org/jira/browse/HUDI-6297

@codope codope added the release-0.14.2 (patches/issue fixes targeted for the 0.14.2 release) label and removed the release-0.14.1 label Jan 3, 2024
@codope codope removed the release-0.14.2 (patches/issue fixes targeted for the 0.14.2 release) label Feb 29, 2024
@danielfordfc
Author

@ad1happy2go @nsivabalan is this absolutely not going to happen anytime soon? It's preventing us from directly ingesting a large majority of our Kafka topics in our organisation, and I'm very surprised it's not a more widely experienced issue, given it's a common feature of topics produced by kafka-streams applications.

@ad1happy2go
Copy link
Contributor

Currently I don't have any viable solution to fix this. Not sure if anybody is looking into it at the moment, so it is not going to happen anytime soon.
@bvaradar Do you have any insights on how we can handle transactional topics? PR - #9059
