[SUPPORT] AWSDmsAvroPayload does not work correctly with any version above 0.10.0 #6552

Closed
joao-miranda opened this issue Aug 31, 2022 · 8 comments
Labels: aws-support, on-call-triaged, priority:critical, writer-core

@joao-miranda

Describe the problem you faced
We ingest Full Load + CDC data from an RDBMS into an S3 bucket using AWS Database Migration Service (DMS). A Scala Glue job then uses Hudi to merge those files into an up-to-date representation of the source database. DMS adds two columns to the data: Op (with values null, I, U or D) and ts (the timestamp of the operation). We are not using Hive or Avro.
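
For illustration, a minimal sketch of the read side of the Glue job (the S3 path and the Parquet output format are assumptions, not our exact setup); mappedDF is the frame written out further down:

    // Sketch only: DMS lands full-load and CDC files with the extra Op/ts columns.
    val mappedDF = sparkSession.read
      .parquet("s3://<dms-target-bucket>/<schema>/<table>/")   // placeholder path
    // Op is null for full-load rows and "I"/"U"/"D" for CDC rows;
    // ts is the operation timestamp and serves as the precombine field below.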

This works fine with Hudi 0.9.0 and Hudi 0.10.0. Once we try to upgrade to Hudi 0.11.0, 0.11.1 or 0.12.0, AWSDmsAvroPayload fails with the following error:

33061 [consumer-thread-1] ERROR org.apache.hudi.io.HoodieWriteHandle  - Error writing record HoodieRecord{key=HoodieKey { recordKey=id:3 partitionPath=}, currentLocation='null', newLocation='null'}
java.util.NoSuchElementException: No value present in Option
        at org.apache.hudi.common.util.Option.get(Option.java:89)
        at org.apache.hudi.common.model.AWSDmsAvroPayload.getInsertValue(AWSDmsAvroPayload.java:72)
        at org.apache.hudi.execution.HoodieLazyInsertIterable$HoodieInsertValueGenResult.<init>(HoodieLazyInsertIterable.java:90)
        at org.apache.hudi.execution.HoodieLazyInsertIterable.lambda$getTransformFunction$0(HoodieLazyInsertIterable.java:103)
        at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:190)
        at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:46)
        at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:106)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

If we remove the PAYLOAD_CLASS_OPT_KEY option from the config, the job no longer fails, but delete operations are not applied. No other payload class seems to work with the DMS format.

Steps to reproduce the behavior
Dependencies:

"org.apache.hudi" %% "hudi-spark-bundle" % "2.12-0.12.0"
"org.apache.hudi" %% "hudi-utilities-bundle" % "2.12-0.12.0"

Configuration used:

var hudiOptions = scala.collection.mutable.Map[String, String](
      HoodieWriteConfig.TABLE_NAME -> "hudiTableName",
      HoodieWriteConfig.COMBINE_BEFORE_INSERT.key() -> "true",
      DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
      DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
      DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "primaryKeyField",
      DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY ->  "ts",
      DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY -> classOf[AWSDmsAvroPayload].getName,
      DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> classOf[CustomKeyGenerator].getName,
      DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, ""
    )

The following options are added if a partition key is defined:

      hudiOptions.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partitionKeyField")
      hudiOptions.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true")
      hudiOptions.put(HoodieIndexConfig.INDEX_TYPE.key(), "GLOBAL_BLOOM")
      hudiOptions.put(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE.key(), "true")
      hudiOptions.put(DataSourceWriteOptions.DROP_PARTITION_COLUMNS.key(), "true")

Finally, the DataFrame is written as a Hudi dataset:

    // Write the DataFrame as a Hudi dataset
    mappedDF
      .dropDuplicates()
      .write
      .format("org.apache.hudi")
      .options(hudiOptions)
      .mode(SaveMode.Append)
      .save("targetDirectory")

Expected behavior
Data read back through Hudi reflects the current state of the source database.

Environment Description

  • Hudi version : 0.12.0
  • Spark version : 3.1.1
  • Scala version: 2.12.15
  • AWS Glue version : 3.0.0
@codope (Member) commented Sep 1, 2022

Configuration looks fine to me, except you don't really need to set HoodieWriteConfig.COMBINE_BEFORE_INSERT.key() -> "true".
Looking at the implementation of getInsertValue, an empty Option can result from two scenarios:

  1. The incoming record has a _hoodie_is_deleted field and its value is set to true.
  2. The incoming record is null/empty, so the record byte length is 0.

My hunch is it may be the latter. For the failing batch, is it possible to log the dataframe and inspect the records?
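
For example, something along these lines (an untested sketch; column names are taken from the issue description) might help narrow it down:

    import org.apache.spark.sql.functions.col

    // Distribution of DMS operation types in the failing batch (null = full load, I/U/D = CDC).
    mappedDF.groupBy("Op").count().show(false)

    // Any records with a missing record key or precombine field (scenario 2 above)?
    mappedDF.filter(col("id").isNull || col("ts").isNull).show(false)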

codope added the priority:major and writer-core labels on Sep 1, 2022
@joao-miranda (Author) commented Sep 1, 2022

Thank you for your quick reply. It doesn't seem to be either of those scenarios in our case. I created a unit test and included the full code so you can understand our use case better:

test("test_hudi_upgrade") {
    println("=============test_hudi_upgrade=============")

    val yesterday_date = 1654520683000L
    val today_date = 1654607083000L
    val tomorrow_date = 1654608083000L

    val testDF_upgrade: DataFrame = sparkSession.createDataFrame(
      List(
        (null, yesterday_date, 1, "Clark Kent", "Superman"),
        (null, yesterday_date, 2, "Bruce Wayne", "Batman"),
        (null, yesterday_date, 3, "Diana Prince", "Wonder Woman"),
        (null, yesterday_date, 4, "Hal Jordan", "Green Lantern"),
        ("I", today_date, 5, "Barry Allen", "The Flash"),
        ("I", today_date, 6, "Arthur Curry", "Aquaman"),
        ("D", today_date, 2, "Bruce Wayne", "Detective Comics"),
        ("U", today_date, 4, "John Stewart", "Green Lantern"),
        ("U", tomorrow_date, 4, "Guy Gardner", "Green Lantern")
      )
    ).toDF("Op", "ts", "id", "name", "alias")

    val expectedDF_upgrade = sparkSession.createDataFrame(
      List(
        (yesterday_date, 1, "Clark Kent", "Superman"),
        (yesterday_date, 3, "Diana Prince", "Wonder Woman"),
        (tomorrow_date, 4, "Guy Gardner", "Green Lantern"),
        (today_date, 5, "Barry Allen", "The Flash"),
        (today_date, 6, "Arthur Curry", "Aquaman")
      )
    )
      .toDF("ts", "id", "name", "alias")
      .orderBy("id")

    val testDirectory = "file:/tmp/hudi/"
    val testDatabase = "testDatabase"
    val testTable = "testTable"
    val tableName = "upgrade_table"


    var hudiOptions = scala.collection.mutable.Map[String, String](
      HoodieWriteConfig.TABLE_NAME -> tableName,
      DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
      DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
      DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
      DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY ->  "ts",
      DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY -> classOf[AWSDmsAvroPayload].getName,
      DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> classOf[CustomKeyGenerator].getName,
      DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> ""
    )

    // Write the DataFrame as a Hudi dataset
    testDF_upgrade
      .dropDuplicates()
      .write
      .format("org.apache.hudi")
      .options(hudiOptions)
      .mode(SaveMode.Append)
      .save(s"$testDirectory/$testDatabase/$tableName")



    val actualDF_upgrade = sparkSession.read.format("org.apache.hudi").load(
      s"$testDirectory/$testDatabase/$tableName"
    )
      .orderBy("id")

    println("=============Expected DF=============")
    expectedDF_upgrade.show(false)
    println("=============Actual DF=============")
    actualDF_upgrade.show(false)

    assertSmallDatasetEquality(actualDF_upgrade.select("ts", "id", "name", "alias"), expectedDF_upgrade, true, false, true)
  }

This runs successfully with Hudi 0.10.0:

=============test_hudi_upgrade=============
=============Expected DF=============
+-------------+---+------------+-------------+
|ts           |id |name        |alias        |
+-------------+---+------------+-------------+
|1654520683000|1  |Clark Kent  |Superman     |
|1654520683000|3  |Diana Prince|Wonder Woman |
|1654608083000|4  |Guy Gardner |Green Lantern|
|1654607083000|5  |Barry Allen |The Flash    |
|1654607083000|6  |Arthur Curry|Aquaman      |
+-------------+---+------------+-------------+

=============Actual DF=============
+-------------------+---------------------+------------------+----------------------+--------------------------------------------------------------------------+----+-------------+---+------------+-------------+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name                                                         |Op  |ts           |id |name        |alias        |
+-------------------+---------------------+------------------+----------------------+--------------------------------------------------------------------------+----+-------------+---+------------+-------------+
|20220901100706768  |20220901100706768_0_3|1                 |                      |431a7e13-bc66-404a-8193-b1c8a1adb194-0_0-32-2205_20220901100706768.parquet|null|1654520683000|1  |Clark Kent  |Superman     |
|20220901100706768  |20220901100706768_0_4|3                 |                      |431a7e13-bc66-404a-8193-b1c8a1adb194-0_0-32-2205_20220901100706768.parquet|null|1654520683000|3  |Diana Prince|Wonder Woman |
|20220901100706768  |20220901100706768_0_5|4                 |                      |431a7e13-bc66-404a-8193-b1c8a1adb194-0_0-32-2205_20220901100706768.parquet|U   |1654608083000|4  |Guy Gardner |Green Lantern|
|20220901100706768  |20220901100706768_0_1|5                 |                      |431a7e13-bc66-404a-8193-b1c8a1adb194-0_0-32-2205_20220901100706768.parquet|I   |1654607083000|5  |Barry Allen |The Flash    |
|20220901100706768  |20220901100706768_0_2|6                 |                      |431a7e13-bc66-404a-8193-b1c8a1adb194-0_0-32-2205_20220901100706768.parquet|I   |1654607083000|6  |Arthur Curry|Aquaman      |
+-------------------+---------------------+------------------+----------------------+--------------------------------------------------------------------------+----+-------------+---+------------+-------------+

and fails with Hudi 0.12.0:

=============test_hudi_upgrade=============
32131 [consumer-thread-1] ERROR org.apache.hudi.io.HoodieWriteHandle  - Error writing record HoodieRecord{key=HoodieKey { recordKey=2 partitionPath=}, currentLocation='null', newLocation='null'}
java.util.NoSuchElementException: No value present in Option
        at org.apache.hudi.common.util.Option.get(Option.java:89)
        at org.apache.hudi.common.model.AWSDmsAvroPayload.getInsertValue(AWSDmsAvroPayload.java:72)
        at org.apache.hudi.execution.HoodieLazyInsertIterable$HoodieInsertValueGenResult.<init>(HoodieLazyInsertIterable.java:90)
        at org.apache.hudi.execution.HoodieLazyInsertIterable.lambda$getTransformFunction$0(HoodieLazyInsertIterable.java:103)
        at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:190)
        at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:46)
        at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:106)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
32345 [pool-6-thread-3-ScalaTest-running-RawCDCMergeTest] ERROR org.apache.hudi.HoodieSparkSqlWriter$  - UPSERT failed with errors
[info] RawCDCMergeTest:
[info] - test_hudi_upgrade *** FAILED ***
[info]   org.apache.spark.sql.AnalysisException: cannot resolve '`id`' given input columns: [];
[info] 'Sort ['id ASC NULLS FIRST], true
[info] +- Relation[] org.apache.hudi.EmptyRelation@5144bb56
[info]   at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
[info]   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:155)
[info]   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:152)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:341)
[info]   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:73)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:341)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:338)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:407)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:243)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:405)
[info]   ...

Please let me know if you need any other information.

nsivabalan added the priority:critical label and removed the priority:major label on Sep 1, 2022
@umehrot2 (Contributor) commented Sep 1, 2022

This appears to be a genuine issue introduced with ba4e732.

@rahil-c (Contributor) commented Sep 7, 2022

Currently investigating this issue

yihua assigned rahil-c and unassigned codope on Sep 7, 2022
@nsivabalan (Contributor)

Yeah, Udit pointed out the right commit.
Here is the fix that worked for me locally:

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java
index 20a20fb629..a3c6dde99e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java
@@ -69,21 +69,21 @@ public class AWSDmsAvroPayload extends OverwriteWithLatestAvroPayload {
 
   @Override
   public Option<IndexedRecord> getInsertValue(Schema schema, Properties properties) throws IOException {
-    IndexedRecord insertValue = super.getInsertValue(schema, properties).get();
-    return handleDeleteOperation(insertValue);
+    Option<IndexedRecord> insertValue = super.getInsertValue(schema, properties);
+    return insertValue.isPresent() ? handleDeleteOperation(insertValue.get()) : insertValue;
   }
 
   @Override
   public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
-    IndexedRecord insertValue = super.getInsertValue(schema).get();
-    return handleDeleteOperation(insertValue);
+    Option<IndexedRecord> insertValue = super.getInsertValue(schema);
+    return insertValue.isPresent() ? handleDeleteOperation(insertValue.get()) : insertValue;
   }
 
   @Override
   public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties)
       throws IOException {
-    IndexedRecord insertValue = super.getInsertValue(schema, properties).get();
-    return handleDeleteOperation(insertValue);
+    Option<IndexedRecord> insertValue = super.getInsertValue(schema, properties);
+    return insertValue.isPresent() ? handleDeleteOperation(insertValue.get()) : insertValue;
   }
 

@yihua (Contributor) commented Sep 8, 2022

@rahil-c and I discussed this today. The proper fix is to have the Properties overloads call the corresponding existing API instead of repeating the handleDeleteOperation invocation:

Fixed version:

  @Override
  public Option<IndexedRecord> getInsertValue(Schema schema, Properties properties) throws IOException {
    return getInsertValue(schema);
  }

  @Override
  public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
    IndexedRecord insertValue = super.getInsertValue(schema).get();
    return handleDeleteOperation(insertValue);
  }

  @Override
  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties)
      throws IOException {
    return combineAndGetUpdateValue(currentValue, schema);
  }

  @Override
  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema)
      throws IOException {
    IndexedRecord insertValue = super.getInsertValue(schema).get();
    return handleDeleteOperation(insertValue);
  }

@rahil-c will put up a fix.

@rahil-c (Contributor) commented Sep 8, 2022

Draft PR: #6637

@nsivabalan (Contributor)

Closing this as we have a fix. Thanks for reporting.
