[SUPPORT] Exception when using MERGE INTO #9469

Open
praneethh opened this issue Aug 17, 2023 · 17 comments
Labels
data-consistency (phantoms, duplicates, write skew, inconsistent snapshot), on-call-triaged, priority:critical (production down; pipelines stalled; need help asap)

Comments

@praneethh

praneethh commented Aug 17, 2023

I'm trying to use MERGE INTO to perform a partial update on the target data, but I'm getting the following error:

java.lang.UnsupportedOperationException: MERGE INTO TABLE is not supported temporarily.
  at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:718)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
  at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
  at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:67)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)

Steps to reproduce:

  1. Load the target table

import org.apache.spark.sql.functions.col
import spark.implicits._

val df = Seq(("1","neo","2023-08-04 12:00:00","2023-08-04 12:00:00","2023-08-04")).toDF("emp_id", "emp_name", "log_ts", "load_ts", "log_dt")

val df1 = df.select(col("emp_id").cast("int"),col("emp_name").cast("string"),col("log_ts").cast("timestamp"),col("load_ts").cast("timestamp"),col("log_dt").cast("date"))

df1.write.format("hudi")
.option("hoodie.payload.ordering.field", "load_ts")
.option("hoodie.datasource.write.recordkey.field", "emp_id")
.option("hoodie.datasource.write.partitionpath.field", "log_dt")
.option("hoodie.index.type","GLOBAL_SIMPLE")
.option("hoodie.table.name", "hudi_test")
.option("hoodie.simple.index.update.partition.path", "false")
.option("hoodie.datasource.write.precombine.field", "load_ts")
.option("hoodie.datasource.write.payload.class","org.apache.hudi.common.model.PartialUpdateAvroPayload")
.option("hoodie.datasource.write.reconcile.schema","true")
.option("hoodie.schema.on.read.enable","true")
.option("hoodie.datasource.write.hive_style_partitioning", "true")
.option("hoodie.datasource.write.row.writer.enable","false")
.option("hoodie.datasource.hive_sync.enable","true")
.option("hoodie.datasource.hive_sync.database","pharpan")
.option("hoodie.datasource.hive_sync.table", "hudi_test")
.option("hoodie.datasource.hive_sync.partition_fields", "log_dt")
.option("hoodie.datasource.hive_sync.ignore_exceptions", "true")
.option("hoodie.datasource.hive_sync.mode", "hms")
.option("hoodie.datasource.hive_sync.use_jdbc", "false")
.option("hoodie.datasource.write.operation","upsert")
.mode("append")
.save("gs://sample_bucket/hudi_sample_output_data") 
  2. Load the incremental data

val df2 = Seq(("1","neo","2023-08-05 14:00:00","2023-08-04 12:00:00","2023-08-05"),("2","trinity","2023-08-05 14:00:00","2023-08-05 15:00:00","2023-08-05")).toDF("emp_id", "emp_name", "log_ts","load_ts","log_dt")

val df3 = df2.select(col("emp_id").cast("int"),col("emp_name").cast("string"),col("log_ts").cast("timestamp"),col("load_ts").cast("timestamp"),col("log_dt").cast("date"))

df3.createOrReplaceTempView("incremental_data")

  3. Perform the merge
val sqlPartialUpdate =
  s"""
     | merge into pharpan.hudi_test as target
     | using (
     |   select * from incremental_data
     | ) source
     | on target.emp_id = source.emp_id
     | when matched then
     |   update set target.log_ts = source.log_ts, target.log_dt = source.log_dt
     | when not matched then insert *
     """.stripMargin

spark.sql(sqlPartialUpdate)

Hudi version: 0.13.1
Using "org.apache.hudi.common.model.PartialUpdateAvroPayload" for partial updates.

Can someone please help resolve this error? Also, please share documentation on using MERGE INTO in case I'm using it incorrectly.

@jonvex
Contributor

jonvex commented Aug 18, 2023

Hi, one thing I notice is that you are updating the partition path field (target.log_dt = source.log_dt). This may not be the cause of the exception, but combined with "hoodie.simple.index.update.partition.path" set to "false" it could result in unexpected behavior.
How are you running Spark? Which Spark version are you using?

@praneethh
Author

praneethh commented Aug 18, 2023

@jonvex As of now, I'm using spark-shell for testing, but I will eventually move to spark-submit on Dataproc. The Spark version is 3.1.2.

@jonvex
Contributor

jonvex commented Aug 18, 2023

Can you check that the configs:

spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension

are set and spelled correctly?
Also, can you make sure that the Spark jar you are using is built for Scala 2.12?
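For reference, here is a minimal sketch of setting both configs when building the session in code yourself (the app name is just a placeholder; in spark-shell the same values are normally passed as --conf flags):

import org.apache.spark.sql.SparkSession

// Minimal sketch: a SparkSession with the two configs above set explicitly.
// Equivalent spark-shell flags:
//   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
//   --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension
val spark = SparkSession.builder()
  .appName("hudi-merge-into-test") // placeholder app name
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
  .getOrCreate()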

@praneethh
Author

praneethh commented Aug 19, 2023

@jonvex By adding the HoodieSparkSessionExtension config, the merge is working. However, I'm seeing three records instead of two even though I changed "hoodie.simple.index.update.partition.path" to "true". Can you please suggest what else might be the issue?

+-------------------+--------------------+------------------+----------------------+--------------------+------+--------+-------------------+-------------------+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|emp_id|emp_name|             log_ts|            load_ts|    log_dt|
+-------------------+--------------------+------------------+----------------------+--------------------+------+--------+-------------------+-------------------+----------+
|  20230819020902715|20230819020902715...|                 1|     log_dt=2023-08-05|9f6919fe-a51a-454...|     1|     neo|2023-08-05 14:00:00|2023-08-04 12:00:00|2023-08-05|
|  20230819020902715|20230819020902715...|                 2|     log_dt=2023-08-05|9f6919fe-a51a-454...|     2| trinity|2023-08-05 14:00:00|2023-08-05 15:00:00|2023-08-05|
|  20230819020809307|20230819020809307...|                 1|     log_dt=2023-08-04|5e9cfb58-760d-421...|     1|     neo|2023-08-04 12:00:00|2023-08-04 12:00:00|2023-08-04|
+-------------------+--------------------+------------------+----------------------+--------------------+------+--------+-------------------+-------------------+----------+

@ad1happy2go
Collaborator

ad1happy2go commented Aug 21, 2023

@praneethh I tried a similar flow using spark-sql, and it gave me only two records. Below is the code I used:

CREATE TABLE issue_9469_42 USING HUDI
PARTITIONED BY(log_dt)
tblproperties (
  "hoodie.payload.ordering.field"="load_ts",
  "hoodie.datasource.write.recordkey.field"="emp_id",
  "hoodie.datasource.write.partitionpath.field"="log_dt",
  "hoodie.datasource.write.precombine.field"="load_ts",
  "hoodie.index.type"="GLOBAL_SIMPLE",
  "hoodie.simple.index.update.partition.path"="true",
  "hoodie.datasource.write.payload.class"="org.apache.hudi.common.model.PartialUpdateAvroPayload",
  "hoodie.datasource.write.reconcile.schema"="true",
  "hoodie.schema.on.read.enable"="true",
  "hoodie.datasource.write.hive_style_partitioning"="true",
  "hoodie.datasource.write.row.writer.enable"="false"
) AS SELECT 1 as emp_id, 'neo' as emp_name, cast('2023-08-04 12:00:00' as timestamp) as log_ts, cast('2023-08-04 12:00:00' as timestamp) as load_ts, cast('2023-08-04' as date) as log_dt;


merge into issue_9469_42 as target
using (
  SELECT 1 as emp_id, 'neo_1' as emp_name, cast('2023-08-05 12:00:00' as timestamp) as log_ts, cast('2023-08-05 12:00:00' as timestamp) as load_ts, cast('2023-08-05' as date) as log_dt
  UNION
  SELECT 2 as emp_id, 'trinity' as emp_name, cast('2023-08-05 14:00:00' as timestamp) as log_ts, cast('2023-08-05 15:00:00' as timestamp) as load_ts, cast('2023-08-05' as date) as log_dt
) source
on target.emp_id = source.emp_id
when matched then
  update set target.log_ts = source.log_ts, target.log_dt = source.log_dt, target.load_ts = source.load_ts
when not matched then insert *;
  
select emp_name from issue_9469_42;

@codope added the priority:critical and data-consistency labels on Aug 21, 2023
@ad1happy2go
Collaborator

@praneethh Also, when we use MERGE INTO, we can achieve a partial update with the regular payload; there is no need to use org.apache.hudi.common.model.PartialUpdateAvroPayload.
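For illustration only, a sketch of such a partial update with the default payload, reusing the pharpan.hudi_test table and incremental_data temp view from this thread (the column subset is arbitrary, and the partition column log_dt is deliberately left out of the update):

// Sketch: partial update through MERGE INTO with the regular (default) payload.
// Only log_ts and load_ts are listed in UPDATE SET; emp_name and the partition
// column log_dt keep their existing values for matched records.
val partialUpdateSql =
  s"""
     | merge into pharpan.hudi_test as target
     | using (select * from incremental_data) source
     | on target.emp_id = source.emp_id
     | when matched then
     |   update set target.log_ts = source.log_ts, target.load_ts = source.load_ts
     | when not matched then insert *
     """.stripMargin

spark.sql(partialUpdateSql)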

I tried the code you shared with spark-shell, but it was giving me an exception. I am figuring that out and will update soon. Please share your latest code in case you made other changes to get it working.

@praneethh
Author

@ad1happy2go I'm still getting three records as output. I'm using the same properties you used, plus the hive_sync properties. Also, I'm not creating the table manually; it is created by Hudi on the first run.

@ad1happy2go
Collaborator

@praneethh I am able to reproduce the issue. Below is the code I used:

import org.apache.spark.sql.functions.col

val tableName = "issue_9469_53"
val path = s"file:///tmp/${tableName}"
val df = Seq(("1","neo","1","1","2023-08-04")).toDF("emp_id", "emp_name", "log_ts", "load_ts", "log_dt")

df.select(col("emp_id").cast("int"),col("emp_name").cast("string"),col("log_ts").cast("int"),col("load_ts").cast("int"),col("log_dt").cast("date")).write.format("hudi")
.option("hoodie.datasource.write.recordkey.field", "emp_id")
.option("hoodie.datasource.write.partitionpath.field", "log_dt")
.option("hoodie.index.type","GLOBAL_SIMPLE")
.option("hoodie.table.name", tableName)
.option("hoodie.simple.index.update.partition.path", "false")
.option("hoodie.datasource.write.precombine.field", "load_ts")
.option("hoodie.datasource.hive_sync.enable","true")
.option("hoodie.datasource.hive_sync.database","default")
.option("hoodie.datasource.hive_sync.table", tableName)
.option("hoodie.datasource.hive_sync.partition_fields", "log_dt")
.option("hoodie.datasource.hive_sync.ignore_exceptions", "true")
.option("hoodie.datasource.hive_sync.mode", "hms")
.option("hoodie.datasource.hive_sync.use_jdbc", "false")
.option("hoodie.datasource.write.operation","upsert")
.mode("append")
.save(path)

val df2 = Seq(("1","neo","2","2","2023-08-05"),("2","trinity","2","2","2023-08-05")).toDF("emp_id", "emp_name", "log_ts","load_ts","log_dt")

val df3 = df2.select(col("emp_id").cast("int"),col("emp_name").cast("string"),col("log_ts").cast("int"),col("load_ts").cast("int"),col("log_dt").cast("date"))

df3.createOrReplaceTempView("incremental_data")

val sqlPartialUpdate =
  s"""
     | merge into ${tableName} as target
     | using (
     |   select * from incremental_data
     | ) source
     | on target.emp_id = source.emp_id
     | when matched then
     |   update set target.log_ts = source.log_ts, target.log_dt = source.log_dt, target.load_ts = source.load_ts
     | when not matched then insert *
     """.stripMargin

spark.sql(sqlPartialUpdate)
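One way to inspect the outcome afterwards (a plain snapshot read, nothing specific to this issue) is to read the table back and look at the partition paths; when the issue reproduces, emp_id 1 shows up under both log_dt partitions:

// Snapshot-read the table and check how many copies of each record key exist.
val snapshot = spark.read.format("hudi").load(path)
snapshot
  .select("_hoodie_partition_path", "emp_id", "emp_name", "log_ts", "load_ts", "log_dt")
  .orderBy("emp_id")
  .show(false)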

I have created the JIRA (https://issues.apache.org/jira/browse/HUDI-6737) for this and will be working on it.

@praneethh
Author

Hi @ad1happy2go, I wanted to check if there is any update on the above JIRA. Thanks.

@ad1happy2go
Collaborator

@praneethh We are working on this. We will update soon.

@ad1happy2go
Collaborator

@jonvex Can you post the latest update on this?

@ad1happy2go
Collaborator

@praneethh I have checked, and it appears that this was placed on the back burner. We will make it a priority this week. I apologize for any delay.

@praneethh
Author

@ad1happy2go Thanks for the update.

@praneethh
Author

@jonvex @ad1happy2go Circling back on this issue to check if there is any update. Sorry to bother you about this multiple times.

@jonvex
Contributor

jonvex commented Oct 4, 2023

Hi, really sorry we keep delaying this.

@ad1happy2go
Collaborator

@praneethh We were facing some issues while trying to fix it. The current code does not support partition path updates with MERGE INTO. We will provide more updates on this soon. Sorry for all the delay.
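Until that is supported, one possible workaround (sketched below, not a MERGE INTO fix) is to route the partition-changing rows through a plain DataFrame upsert with the global index and hoodie.simple.index.update.partition.path set to true, reusing df3, tableName and path from the reproduction code above:

// Workaround sketch: upsert the incremental rows through the DataFrame API.
// With a global index and update.partition.path=true, Hudi removes the record
// from its old partition and re-inserts it under the new log_dt value.
df3.write.format("hudi")
  .option("hoodie.table.name", tableName)
  .option("hoodie.datasource.write.recordkey.field", "emp_id")
  .option("hoodie.datasource.write.partitionpath.field", "log_dt")
  .option("hoodie.datasource.write.precombine.field", "load_ts")
  .option("hoodie.index.type", "GLOBAL_SIMPLE")
  .option("hoodie.simple.index.update.partition.path", "true")
  .option("hoodie.datasource.write.operation", "upsert")
  .mode("append")
  .save(path)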

@praneethh
Author

@codope @ad1happy2go @jonvex Hi team, is there any update on the above issue, or is there an alternative solution?
