
[SUPPORT] Issue with Repartition on Kafka Input DataFrame and Same Precombine Value Rows In One Batch #10995

Closed
brightwon opened this issue Apr 11, 2024 · 6 comments
Labels
on-call-triaged, priority:minor, spark


@brightwon
Contributor

brightwon commented Apr 11, 2024

Describe the problem you faced

I'm operating a typical Hudi workload that involves using spark structured streaming to read CDC events from Kafka and perform Upserts into S3.

I've encountered an issue where, when rows with the same precombine value are present in the same batch, applying repartition to the Kafka input DataFrame results in saving a row that is not the last one.

I faced slow performance during the tagging stage. To address this, I applied repartition to the input DataFrame, which did improve performance. Unfortunately, it also caused data with the wrong ordering to be saved.

To Reproduce

Steps to reproduce the behavior:

  1. Sent test data to Kafka using a Kafka producer. Here's an example of the data:

Offset 1 :

{
   "pk": 1,
   "c1": 1,
   "c2": 1
}

Offset 2 :

{
   "pk": 1,
   "c1": 1,
   "c2": 2
}

I produced 1000 records, incrementing the value of the c2 field by 1 for each record (1 to 1000). c1 is the precombine field, and c2 is the field used to distinguish each row.
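
For completeness, here is a minimal sketch of such a producer. The serializer settings, the constant message key, and the plain JSON string values are illustrative assumptions; the real pipeline uses Avro-encoded values, as the deserialization step in the batch code below implies.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object TestDataProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "broker1,broker2,broker3")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    // 1000 records: pk and c1 stay constant, c2 increments from 1 to 1000
    (1 to 1000).foreach { i =>
      val value = s"""{"pk": 1, "c1": 1, "c2": $i}"""
      // keying the messages by pk is an assumption; the fix discussed later
      // repartitions on this "key" column of the Kafka source
      producer.send(new ProducerRecord[String, String]("hudi_inorder_test", "1", value))
    }

    producer.flush()
    producer.close()
  }
}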

  2. Run the example Spark application to save the data to S3.

code sample

    val cdcDF = ss
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1,broker2,broker3")
      .option("topic", "hudi_inorder_test")
      .option("maxOffsetsPerTrigger", "1000000")
      .load()

    val stream = cdcDF
      .repartition(100)  // apply repartition for tagging stage parallelism
      .writeStream
      .foreachBatch { (batchDF: DataFrame, batchId: Long) => saveBatch(batchDF, batchId) }
      .trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS))
      .start()

    stream.awaitTermination()
    
    def saveBatch(batchDF: DataFrame, batchId: Long): Unit = {
      batchDF.persist()
      
      // ...deserialize the avro offset value...
      val valueDF = batchDF.select(...)

      // save hudi dataset
      valueDF
        .write
        .format("hudi")
        .options(hudiOptions)
        .mode("append")
        .save("s3://mybucket/test/")

      batchDF.unpersist()
    }

write config (hudiOptions)

{
    "hoodie.table.name": "hudi_inorder_test",
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.datasource.write.recordkey.field": "pk",
    "hoodie.datasource.write.partitionpath.field": "",
    "hoodie.datasource.write.precombine.field": "c1",
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.CustomKeyGenerator",
    "hoodie.compact.inline": "true",
    "hoodie.datasource.compaction.async.enable": "false",
    "hoodie.compact.inline.max.delta.commits": "1"
}
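
For reference, the same write config expressed as the Scala Map that .options(hudiOptions) in the code sample expects:

val hudiOptions: Map[String, String] = Map(
  "hoodie.table.name" -> "hudi_inorder_test",
  "hoodie.datasource.write.table.type" -> "MERGE_ON_READ",
  "hoodie.datasource.write.recordkey.field" -> "pk",
  "hoodie.datasource.write.partitionpath.field" -> "",
  "hoodie.datasource.write.precombine.field" -> "c1",
  "hoodie.datasource.write.keygenerator.class" -> "org.apache.hudi.keygen.CustomKeyGenerator",
  "hoodie.compact.inline" -> "true",
  "hoodie.datasource.compaction.async.enable" -> "false",
  "hoodie.compact.inline.max.delta.commits" -> "1"
)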
  3. Check the saved results. (I used AWS Athena.)

  4. Repeat steps 1 and 3 to verify whether the value of c2 changes between runs.

The c2 value changes with each repetition. Here are the saved results from two runs:

{
    "_hoodie_commit_time": "20240405065000817",
    "_hoodie_commit_seqno": "20240405065000817_0_1",
    "_hoodie_record_key": "1",
    "_hoodie_partition_path": "",
    "_hoodie_file_name": "1cee1e0b-2d25-434a-adb7-3d7e0ae4a275-0_0-58-9709_20240405065000817.parquet",
    "pk": "1",
    "c1": "1",
    "c2": "496"
}
{
    "_hoodie_commit_time": "20240405065319747",
    "_hoodie_commit_seqno": "20240405065319747_0_1",
    "_hoodie_record_key": "1",
    "_hoodie_partition_path": "",
    "_hoodie_file_name": "1cee1e0b-2d25-434a-adb7-3d7e0ae4a275-0_0-115-15330_20240405065319747.parquet",
    "pk": "1",
    "c1": "1",
    "c2": "316"
}

If repartition() is removed from the code, the c2 value is correctly saved as the last offset value, 1000.

Since I'll be using a single Kafka partition, I considered using the Kafka offset number as the precombine field. However, Hudi does not support changes to the precombine field.

I want to improve the performance of the tagging stage through increased parallelism without causing issues with data ordering. How can I resolve this?

Environment Description

  • Hudi version : 0.10.1-amzn-0 (EMR 6.6.0)

  • Spark version : 3.2.0 (EMR 6.6.0)

  • Hive version : 3.1.2 (EMR 6.6.0)

  • Hadoop version : 3.2.1 (EMR 6.6.0)

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : no

@ad1happy2go
Contributor

@brightwon I do understand your issue. The precombine key is meant to be an ordering field and ideally should contain different values for the same record key. In your case, if the precombine key is the same, Hudi just has to pick one record out of all of them, normally the last processed record of the RDD partition. When you do repartition, the data gets shuffled and the pick can become random, so multiple retries might give random results.
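
A minimal, self-contained sketch (plain Spark, not Hudi internals) of why this happens: with a tied ordering value, "keep the latest" degenerates into "keep whichever row is seen last per key", and repartition() makes that order arbitrary.

import org.apache.spark.sql.SparkSession

object TieBreakDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[4]").appName("tie-break-demo").getOrCreate()
    import spark.implicits._

    // 1000 rows with the same record key (pk) and the same precombine value (c1)
    val df = (1 to 1000).map(i => (1, 1, i)).toDF("pk", "c1", "c2")

    // Deduplicate by (pk, c1), keeping the last value encountered per key.
    // Without repartition() the Kafka order would make that c2 = 1000;
    // after repartition() the encounter order is arbitrary.
    val kept = df.repartition(100)
      .rdd
      .map(r => ((r.getInt(0), r.getInt(1)), r.getInt(2)))
      .reduceByKey((_, last) => last) // keep whichever value arrives last

    kept.collect().foreach(println) // the surviving c2 varies between runs

    spark.stop()
  }
}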

@brightwon
Contributor Author

@ad1happy2go Thank you for your reply.

What I want is to speed up the tagging stage. Could you suggest a solution? I can achieve this by using repartition with a completely unique precombine field (such as the Kafka offset), but Hudi does not allow changing the precombine field of an existing table.

@ad1happy2go
Contributor

@brightwon Yes, changing the precombine key is not allowed. I do understand you are trying to repartition to scale the tagging stage. You can try repartitioning on the record key and see if that gives consistent results.

@ad1happy2go
Contributor

@brightwon Do you have any more questions? Feel free to close this issue if you are all set. Thanks.

@brightwon
Contributor Author

@ad1happy2go The problem has been resolved! Following your advice, I used the Kafka key column in repartition. Now I can parallelize the tagging stage with repartition, and there are no data ordering issues even when the precombine field values are the same. Thank you!

val stream = cdcDF
      .repartition(20, col("key"))
      .writeStream
      ...
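
Putting the pieces from this thread together, the resolved pipeline looks roughly like this (ss is the SparkSession and saveBatch is the method from the code sample above; broker, topic, and trigger settings are copied from the original snippets):

import java.util.concurrent.TimeUnit

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.Trigger

val cdcDF = ss
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1,broker2,broker3")
  .option("subscribe", "hudi_inorder_test")
  .option("maxOffsetsPerTrigger", "1000000")
  .load()

val stream = cdcDF
  // hash-partition on the Kafka message key so all records sharing a key
  // (here, the primary key) end up in the same task during tagging
  .repartition(20, col("key"))
  .writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) => saveBatch(batchDF, batchId) }
  .trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS))
  .start()

stream.awaitTermination()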

@codope added the priority:minor, spark, and on-call-triaged labels on Apr 16, 2024
@ad1happy2go
Contributor

@brightwon Thanks for the update.
