[SUPPORT] reconcile schema failing to inject default values for missing fields #4914

@TarunMootala

Description

Tips before filing an issue

  • Have you gone through our FAQs?

  • Join the mailing list to engage in conversations and get faster support at dev-subscribe@hudi.apache.org.

  • If you have triaged this as a bug, then file an issue directly.

Describe the problem you faced

My use case is to populate default values for missing fields. I'm using the property hoodie.datasource.write.reconcile.schema to inject default values for the missing fields, but the write fails with the error "org.apache.avro.UnresolvedUnionException: Not in union ["null","string"]: 1".
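For context, every data column in this table is a nullable string, i.e. an Avro union ["null","string"], which is the union named in the error. A quick check I use to confirm the table's resolved schema (table_path as defined in the repro steps below):

# Confirm the table's current schema: each data column should show as a
# nullable string, i.e. the Avro union ["null","string"] from the error.
spark.read.format("hudi").load(table_path).printSchema()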

To Reproduce

Steps to reproduce the behavior:

  1. Create a Hudi COW table and insert some sample data.
inputDF = spark.createDataFrame(
    [
        ("100", "AAA", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
        ("101", "BBB", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
        ("102", "CCC", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
        ("103", "DDD", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
        ("104", "EEE", "2015-01-01", "2015-01-01T12:15:00.512679Z"),
        ("105", "FFF", "2015-01-01", "2015-01-01T13:51:42.248818Z")
    ],
    ["id", "name", "creation_date", "last_update_time"]
)

table_name = "first_hudi_table"
table_path =  f"s3://<bucket_name>/Hudi/{table_name}"

hudiOptions = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.precombine.field': 'last_update_time',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.database': 'streaming_dev',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor'
}

print(table_name, table_path)

inputDF.write \
    .format('hudi') \
    .option('hoodie.datasource.write.operation', 'insert') \
    .options(**hudiOptions) \
    .mode('overwrite') \
    .save(table_path)

  2. Create a Spark dataframe with fewer fields than the table schema. All the mandatory fields (record key, precombine, and partition path fields) must still be present in the dataframe.
  3. Enable the property hoodie.datasource.write.reconcile.schema and upsert the Spark dataframe into the Hudi table.
inputDF = spark.createDataFrame(
    [
        ("110", '2015-01-01', "2015-01-02T13:51:39.340396Z"),
    ],
    ["id", "creation_date", "last_update_time"]
)

hudiOptions = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.precombine.field': 'last_update_time',
    'hoodie.datasource.write.reconcile.schema': 'true',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.database': 'streaming_dev',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor'
}

inputDF.write \
    .format('hudi') \
    .option('hoodie.datasource.write.operation', 'upsert') \
    .options(**hudiOptions) \
    .mode('append') \
    .save(table_path)
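
As a quick diagnostic (reusing inputDF and table_path from the steps above), this confirms which table columns are absent from the incoming dataframe; only name should be missing:

# Diagnostic: compare the incoming dataframe against the current table schema
# and list the data columns that reconcile.schema would have to back-fill.
existing = spark.read.format("hudi").load(table_path)
data_cols = [c for c in existing.columns if not c.startswith("_hoodie")]
missing_cols = [c for c in data_cols if c not in inputDF.columns]
print(missing_cols)  # expected: ['name']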

Expected behavior
The upsert should succeed, with default values injected for the missing fields.
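
In the meantime, a possible workaround I'm considering (sketch only, not verified on Hudi 0.9) is to pad the incoming dataframe with the table's missing columns as typed nulls before the upsert, instead of relying on hoodie.datasource.write.reconcile.schema. It reuses inputDF, hudiOptions, and table_path from the steps above:

from pyspark.sql import functions as F

# Workaround sketch: add each missing table column to the incoming dataframe
# as a null of the matching type, then upsert with the full column set.
# With the columns padded, reconcile.schema should no longer be needed.
existing = spark.read.format("hudi").load(table_path)
target_cols = [c for c in existing.columns if not c.startswith("_hoodie")]
target_types = dict(existing.dtypes)

padded = inputDF
for c in target_cols:
    if c not in padded.columns:
        padded = padded.withColumn(c, F.lit(None).cast(target_types[c]))

padded.select(target_cols).write \
    .format('hudi') \
    .option('hoodie.datasource.write.operation', 'upsert') \
    .options(**hudiOptions) \
    .mode('append') \
    .save(table_path)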

Environment Description
Using a Jupyter notebook on AWS EMR 6.5.0, with the Glue Data Catalog as the Hive metastore.

  • Hudi version : 0.9

  • Spark version : 3.1.2

  • Hive version : 3.1.2

  • Hadoop version : 3.2.1

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : No

Stacktrace

An error was encountered:
An error occurred while calling o205.save.
: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20220225180756
	at org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:62)
	at org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:46)
	at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:98)
	at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:88)
	at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:157)
	at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:214)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:265)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:169)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:194)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:190)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 12.0 failed 4 times, most recent failure: Lost task 3.3 in stage 12.0 (TID 34) (ip-172-31-0-85.ec2.internal executor 4): java.io.IOException: Could not create payload for class: org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
	at org.apache.hudi.DataSourceUtils.createPayload(DataSourceUtils.java:133)
	at org.apache.hudi.DataSourceUtils.createHoodieRecord(DataSourceUtils.java:236)
	at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$write$7(HoodieSparkSqlWriter.scala:237)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieException: Unable to instantiate class org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
	at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:91)
	at org.apache.hudi.DataSourceUtils.createPayload(DataSourceUtils.java:130)
	... 16 more
Caused by: java.lang.reflect.InvocationTargetException
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:89)
	... 17 more
Caused by: org.apache.avro.UnresolvedUnionException: Not in union ["null","string"]: 1
	at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:740)
	at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:205)
	at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:123)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
	at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
	at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
	at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
	at org.apache.hudi.avro.HoodieAvroUtils.indexedRecordToBytes(HoodieAvroUtils.java:102)
	at org.apache.hudi.avro.HoodieAvroUtils.avroToBytes(HoodieAvroUtils.java:94)
	at org.apache.hudi.common.model.BaseAvroPayload.<init>(BaseAvroPayload.java:49)
	at org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.<init>(OverwriteWithLatestAvroPayload.java:42)
	... 22 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2470)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2419)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2418)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2418)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1125)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1125)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1125)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2684)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2626)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2615)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:914)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2241)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2262)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2281)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2306)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$countByKey$1(PairRDDFunctions.scala:366)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:366)
	at org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:314)
	at org.apache.hudi.index.bloom.SparkHoodieBloomIndex.lookupIndex(SparkHoodieBloomIndex.java:114)
	at org.apache.hudi.index.bloom.SparkHoodieBloomIndex.tagLocation(SparkHoodieBloomIndex.java:84)
	at org.apache.hudi.index.bloom.SparkHoodieBloomIndex.tagLocation(SparkHoodieBloomIndex.java:60)
	at org.apache.hudi.table.action.commit.AbstractWriteHelper.tag(AbstractWriteHelper.java:69)
	at org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:51)
	... 45 more

Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 1109, in save
    self._jwrite.save(path)
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)

Labels

area:writer (Write client and core write operations), engine:spark (Spark integration), priority:high (Significant impact; potential bugs)
