
[SUPPORT] RecordKey set as nested field isn't getting picked up correctly - org.apache.hudi.exception.HoodieException: Cannot find a record at part value :metadata at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:513) #6797

@gtwuser

Description


Describe the problem you faced
When the record key is set to a nested field, the Hudi job fails to resolve it:

org.apache.hudi.exception.HoodieException: Cannot find a record at part value :metadata
	at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:513)

The DataFrame is created from the following JSON body:

{
    "version": "1.0",
    "metadata": {
        "topic": "ASA.telemetry",
        "contentType": "application/json",
        "msgID": "44162"                  ## this msgID is expected to be set as recordKey
    }
}
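
For reference, a minimal sketch of building such a DataFrame in PySpark (names are illustrative, not taken from the original Glue job). Note that spark.read.json infers metadata as a nested struct, so msgID is not a top-level column:

import json
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical sample payload matching the structure above
sample = {
    "version": "1.0",
    "metadata": {
        "topic": "ASA.telemetry",
        "contentType": "application/json",
        "msgID": "44162",
    },
}

# Reading the JSON string infers a nested struct schema,
# so metadata.msgID is a struct field rather than a top-level column.
inputDf = spark.read.json(spark.sparkContext.parallelize([json.dumps(sample)]))
inputDf.printSchema()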

commonConfig used:

commonConfig = {
                'className': 'org.apache.hudi',
                'hoodie.datasource.hive_sync.use_jdbc': 'false',
                'hoodie.datasource.write.precombine.field': 'payload.recordedAt', ## this is set for a nested field using dot notation
                'hoodie.datasource.write.recordkey.field': 'metadata.msgID', ## this is set for a nested field using dot notation
                'hoodie.table.name': 'sse',
....
......
}
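
One sanity check worth running before the write (a hedged sketch, not part of the original report) is to confirm that the dotted path actually resolves against the DataFrame schema:

# Dot notation resolves against a struct column in Spark;
# if this select fails, the nested field is missing or named differently.
inputDf.select("metadata.msgID").show(truncate=False)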

But while saving to the Hudi table, it throws the following error:

org.apache.hudi.exception.HoodieException: Cannot find a record at part value :metadata
	at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:513)
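
For context on the message: HoodieAvroUtils.getNestedFieldVal splits the configured key on '.' and descends the Avro record one part at a time, so the exception above means the lookup already failed at the first part, metadata. A rough Python illustration of that traversal (our paraphrase for clarity, not Hudi source):

def get_nested_field_val(record, field_name):
    # Walk a dot-separated path through nested dict-like records.
    val = record
    for part in field_name.split("."):
        if not isinstance(val, dict) or part not in val:
            # Mirrors Hudi's "Cannot find a record at part value :<part>"
            raise KeyError(f"Cannot find a record at part value :{part}")
        val = val[part]
    return val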

To Reproduce

Steps to reproduce the behavior:

  1. Create a DataFrame from this JSON body:
{
    "version": "1.0",
    "metadata": {
        "topic": "ASA.telemetry",
        "contentType": "application/json",
        "msgID": "44162"
    }
}
  2. Configure Hudi with this commonConfig:
commonConfig = {
                'className': 'org.apache.hudi',
                'hoodie.datasource.hive_sync.use_jdbc': 'false',
                'hoodie.datasource.write.precombine.field': 'payload.recordedAt',     ## this is set for a nested field using dot notation
                'hoodie.datasource.write.recordkey.field': 'metadata.msgID',      ## this is set for a nested field using dot notation
                'hoodie.table.name': 'sse',
                # 'hoodie.consistency.check.enabled': 'true',
                'hoodie.datasource.hive_sync.database': args['database_name'],
                'hoodie.datasource.write.reconcile.schema': 'true',
                'hoodie.datasource.hive_sync.table': f'sse_{"_".join(prefix.split("/")[-7:-5])}'.lower(),
                'hoodie.datasource.hive_sync.enable': 'true',
                'path': 's3://' + args['curated_bucket'] + '/merged/sse-native',
                # 1,024 * 1,024 * 128 = 134,217,728 (134 MB)
                'hoodie.parquet.small.file.limit': '307200',
                'hoodie.parquet.max.file.size': '128000000'
            }

initLoadConfig = {'hoodie.bulkinsert.shuffle.parallelism': 68,
                  'hoodie.datasource.write.operation': 'bulk_insert'}

partitionDataConfig = {
                'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
                'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.CustomKeyGenerator',
                'hoodie.datasource.write.partitionpath.field': 'year:SIMPLE, month:SIMPLE, day:SIMPLE, hour:SIMPLE, device_id:SIMPLE',
                'hoodie.datasource.hive_sync.partition_fields': 'year, month, day, hour, device_id',
                'hoodie.datasource.write.hive_style_partitioning': 'true'
            }
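
Since the SIMPLE partition fields in this config are expected as plain top-level columns, a quick precheck (a hypothetical helper, not from the original job) can catch missing columns before the write; note from the stack trace below that CustomKeyGenerator delegates a single record key to SimpleAvroKeyGenerator, which is where the nested metadata.msgID lookup fails:

# All SIMPLE partition fields used above should exist as top-level
# columns so the key generator can resolve them.
expected = ["year", "month", "day", "hour", "device_id"]
missing = [c for c in expected if c not in inputDf.columns]
assert not missing, f"missing partition columns: {missing}"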

combinedConf = {**commonConfig, **partitionDataConfig, **initLoadConfig}
  3. Save the input DataFrame using the combined configuration:
inputDf.write \
    .format('org.apache.hudi') \
    .option("spark.hadoop.parquet.avro.write-old-list-structure", "false") \
    .option("parquet.avro.write-old-list-structure", "false") \
    .option("spark.hadoop.parquet.avro.add-list-element-records", "false") \
    .option("parquet.avro.add-list-element-records", "false") \
    .option("hoodie.parquet.avro.write-old-list-structure", "false") \
    .option("hoodie.datasource.write.reconcile.schema", "true") \
    .options(**combinedConf) \
    .mode('append') \
    .save()
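
A common workaround sketch (our assumption; not verified in this report): copy the nested key into a top-level column and key on that instead of the dotted path, so the key generator sees a simple field. The column name msg_id is hypothetical:

from pyspark.sql import functions as F

# Materialize the nested field as a top-level column and point
# the record key config at it instead of the dotted path.
flatDf = inputDf.withColumn("msg_id", F.col("metadata.msgID"))
combinedConf["hoodie.datasource.write.recordkey.field"] = "msg_id"

flatDf.write \
    .format("org.apache.hudi") \
    .options(**combinedConf) \
    .mode("append") \
    .save()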

Expected behavior

The job fails while resolving the record key as a nested value; we expect the write to succeed using the configured record key (metadata.msgID).

Environment Description

  • Hudi version : 0.11.1

  • Spark version : 3.1

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : no

Additional context

We are running this Hudi script as an AWS Glue job (PySpark) under a Glue context.

Stacktrace


2022-09-26 11:29:24,934 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(73)): Error from Python:Traceback (most recent call last):
  File "/tmp/second-delete-upsert.py", line 297, in <module>
    startMerging(df_prefix_map_list)
  File "/tmp/second-delete-upsert.py", line 234, in startMerging
    .mode('append') \
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 1107, in save
    self._jwrite.save()
  File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o10765.save.
: org.apache.spark.SparkException: Writing job aborted.
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:388)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:336)
	at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:218)
	at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:225)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:40)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:40)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.doExecute(V2CommandExec.scala:55)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:370)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
	at org.apache.hudi.HoodieSparkSqlWriter$.bulkInsertAsRow(HoodieSparkSqlWriter.scala:586)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:178)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:184)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 324.0 failed 4 times, most recent failure: Lost task 0.3 in stage 324.0 (TID 326) (172.34.28.240 executor 1): org.apache.hudi.exception.HoodieException: Cannot find a record at part value :metadata
	at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:513)
	at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldValAsString(HoodieAvroUtils.java:487)
	at org.apache.hudi.keygen.KeyGenUtils.getRecordKey(KeyGenUtils.java:140)
	at org.apache.hudi.keygen.SimpleAvroKeyGenerator.getRecordKey(SimpleAvroKeyGenerator.java:50)
	at org.apache.hudi.keygen.CustomAvroKeyGenerator.getRecordKey(CustomAvroKeyGenerator.java:107)
	at org.apache.hudi.keygen.CustomKeyGenerator.getRecordKey(CustomKeyGenerator.java:78)
	at org.apache.hudi.keygen.BuiltinKeyGenerator.getRecordKey(BuiltinKeyGenerator.java:103)
	at org.apache.hudi.HoodieDatasetBulkInsertHelper$.$anonfun$prepareForBulkInsert$2(HoodieDatasetBulkInsertHelper.scala:70)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:455)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:484)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:413)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:452)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:360)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2465)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2414)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2413)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2413)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2679)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2621)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2610)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:914)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:357)
	... 69 more
Caused by: org.apache.hudi.exception.HoodieException: Cannot find a record at part value :metadata
	at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:513)
	at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldValAsString(HoodieAvroUtils.java:487)
	at org.apache.hudi.keygen.KeyGenUtils.getRecordKey(KeyGenUtils.java:140)
	at org.apache.hudi.keygen.SimpleAvroKeyGenerator.getRecordKey(SimpleAvroKeyGenerator.java:50)
	at org.apache.hudi.keygen.CustomAvroKeyGenerator.getRecordKey(CustomAvroKeyGenerator.java:107)
	at org.apache.hudi.keygen.CustomKeyGenerator.getRecordKey(CustomKeyGenerator.java:78)
	at org.apache.hudi.keygen.BuiltinKeyGenerator.getRecordKey(BuiltinKeyGenerator.java:103)
	at org.apache.hudi.HoodieDatasetBulkInsertHelper$.$anonfun$prepareForBulkInsert$2(HoodieDatasetBulkInsertHelper.scala:70)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:455)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:484)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:413)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:452)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:360)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more

Metadata

Labels

area:writer (Write client and core write operations), priority:high (Significant impact; potential bugs)

Status

✅ Done