[SUPPORT] INSERT_OVERWRITE_TABLE on subsequent runs fails with a metadata file not found error (v0.14.0) #10445

Open
howardcho opened this issue Jan 4, 2024 · 2 comments
Labels: on-call-triaged · priority:critical (production down; pipelines stalled; need help ASAP) · writer-core (issues relating to core transactions/write actions)

Comments


howardcho commented Jan 4, 2024

Describe the problem you faced

I recently upgraded some of my AWS Glue processes to Hudi v0.14.0. One of these processes overwrites an entire table on every run; the initial load runs fine, but every subsequent run fails with a metadata error like the one below:

java.io.FileNotFoundException: No such file or directory 's3://bucket-name/5eda3c2b-38c0-4f3f-8163-8a9a2c88f8a4-0_0-22-75_20240104030715270.parquet'

The only settings changed relative to the older version used in Glue (v0.12.1) are:

  • added "hoodie.spark.sql.insert.into.operation": "bulk_insert"
  • added "hoodie.metadata.record.index.enable": True

Full Hoodie settings:

'hoodie.table.name': 'table_name',
'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
'hoodie.datasource.write.operation': 'INSERT_OVERWRITE_TABLE',
'hoodie.datasource.write.recordkey.field': 'id',
'hoodie.datasource.write.precombine.field': 'updated',
'hoodie.datasource.write.hive_style_partitioning': True,
'hoodie.metadata.record.index.enable': True,
'hoodie.index.type': 'RECORD_INDEX',
'hoodie.parquet.max.file.size': 536870912,
'hoodie.parquet.small.file.limit': 104857600,
'hoodie.clean.automatic': 'true',
'hoodie.clean.async': 'true',
'hoodie.cleaner.policy': 'KEEP_LATEST_FILE_VERSIONS',
'hoodie.cleaner.fileversions.retained': '3',
'hoodie.cleaner.parallelism': '200',
'hoodie.cleaner.commits.retained': 5,
'hoodie.parquet.compression.codec': 'gzip',
'hoodie.datasource.hive_sync.enable': 'true',
'hoodie.datasource.hive_sync.database': 'db_name',
'hoodie.datasource.hive_sync.table': 'table_name',
'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
'hoodie.datasource.hive_sync.use_jdbc': 'false',
'hoodie.datasource.hive_sync.mode': 'hms',
'hoodie.datasource.hive_sync.support_timestamp': True,
'hive_sync.support_timestamp': True,
'hoodie.spark.sql.insert.into.operation': 'bulk_insert'
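
The write itself is a plain DataFrame save, roughly like the sketch below (simplified; bucket and table names anonymized as in the error above):

# Simplified sketch of the write call; hudi_options is the settings dict above.
# Append mode lets Hudi apply the configured INSERT_OVERWRITE_TABLE operation
# instead of erroring out because the table path already exists.
df.write.format("org.apache.hudi") \
    .options(**hudi_options) \
    .mode("append") \
    .save("s3://bucket-name/table_name")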

To Reproduce

Steps to reproduce the behavior:

  1. Create a table using INSERT_OVERWRITE_TABLE and the Hudi settings above
  2. The initial run succeeds
  3. Overwrite the existing table by running the same job again
  4. The second run fails on save with a FileNotFoundException

Expected behavior

The table data should be overwritten.

Environment Description

  • Hudi version : 0.14.0

  • Spark version : 3.3.0 (AWS Glue 4.0)

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : No, on AWS


Stacktrace

An error occurred while calling o154.save.
: org.apache.hudi.exception.HoodieException: Failed to update metadata
	at org.apache.hudi.client.BaseHoodieWriteClient.writeTableMetadata(BaseHoodieWriteClient.java:367)
	at org.apache.hudi.client.BaseHoodieWriteClient.commit(BaseHoodieWriteClient.java:285)
	at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:236)
	at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:104)
	at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:1059)
	at org.apache.hudi.HoodieSparkSqlWriter$.writeInternal(HoodieSparkSqlWriter.scala:441)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:132)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
	at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:124)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 30.0 failed 4 times, most recent failure: Lost task 0.3 in stage 30.0 (TID 122) (172.36.232.135 executor 1): java.io.FileNotFoundException: No such file or directory 's3://bucket-name/5eda3c2b-38c0-4f3f-8163-8a9a2c88f8a4-0_0-22-75_20240104030715270.parquet'
	at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:524)
	at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.getFileStatus(EmrFileSystem.java:617)
	at org.apache.parquet.hadoop.ParquetReader$Builder.build(ParquetReader.java:337)
	at org.apache.hudi.io.storage.HoodieAvroParquetReader.getIndexedRecordIteratorInternal(HoodieAvroParquetReader.java:168)
	at org.apache.hudi.io.storage.HoodieAvroParquetReader.getIndexedRecordIterator(HoodieAvroParquetReader.java:94)
	at org.apache.hudi.io.storage.HoodieAvroParquetReader.getRecordKeyIterator(HoodieAvroParquetReader.java:176)
	at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.lambda$readRecordKeysFromBaseFiles$69dfe3a9$1(HoodieBackedTableMetadataWriter.java:550)
	at org.apache.hudi.data.HoodieJavaRDD.lambda$flatMap$a6598fcb$1(HoodieJavaRDD.java:137)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:142)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:138)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2863)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2799)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2798)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2798)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1239)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1239)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3051)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2993)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1009)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2229)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2250)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2269)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2294)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$countByKey$1(PairRDDFunctions.scala:367)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:367)
	at org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:314)
	at org.apache.hudi.data.HoodieJavaPairRDD.countByKey(HoodieJavaPairRDD.java:105)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.buildProfile(BaseSparkCommitActionExecutor.java:200)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:174)
	at org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor.execute(SparkUpsertPreppedDeltaCommitActionExecutor.java:44)
	at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsertPrepped(HoodieSparkMergeOnReadTable.java:126)
	at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsertPrepped(HoodieSparkMergeOnReadTable.java:88)
	at org.apache.hudi.client.SparkRDDWriteClient.upsertPreppedRecords(SparkRDDWriteClient.java:156)
	at org.apache.hudi.client.SparkRDDWriteClient.upsertPreppedRecords(SparkRDDWriteClient.java:63)
	at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.commitInternal(HoodieBackedTableMetadataWriter.java:1132)
	at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.commit(SparkHoodieBackedTableMetadataWriter.java:117)
	at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.processAndCommit(HoodieBackedTableMetadataWriter.java:855)
	at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.update(HoodieBackedTableMetadataWriter.java:910)
	at org.apache.hudi.client.BaseHoodieWriteClient.writeTableMetadata(BaseHoodieWriteClient.java:362)
	... 54 more
Caused by: java.io.FileNotFoundException: No such file or directory 's3://bucket-name/5eda3c2b-38c0-4f3f-8163-8a9a2c88f8a4-0_0-22-75_20240104030715270.parquet'
	at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:524)
	at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.getFileStatus(EmrFileSystem.java:617)
	at org.apache.parquet.hadoop.ParquetReader$Builder.build(ParquetReader.java:337)
	at org.apache.hudi.io.storage.HoodieAvroParquetReader.getIndexedRecordIteratorInternal(HoodieAvroParquetReader.java:168)
	at org.apache.hudi.io.storage.HoodieAvroParquetReader.getIndexedRecordIterator(HoodieAvroParquetReader.java:94)
	at org.apache.hudi.io.storage.HoodieAvroParquetReader.getRecordKeyIterator(HoodieAvroParquetReader.java:176)
	at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.lambda$readRecordKeysFromBaseFiles$69dfe3a9$1(HoodieBackedTableMetadataWriter.java:550)
	at org.apache.hudi.data.HoodieJavaRDD.lambda$flatMap$a6598fcb$1(HoodieJavaRDD.java:137)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:142)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:138)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
@ad1happy2go (Contributor) commented
Thanks for raising this, @howardcho. The record index does not work with INSERT_OVERWRITE_TABLE; the column stats and bloom indexes work fine. Below is a minimal reproduction:

from pyspark.sql import SparkSession

# Spark session -- on Glue/EMR the Hudi jars are already on the classpath;
# Hudi requires the Kryo serializer
spark = SparkSession.builder \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .getOrCreate()

PATH = "s3://bucket/tmp/hudi_repro"  # placeholder target base path

# Define your DataFrame
data = [(1, "John", "2022-01-01"), (2, "Jane", "2022-01-02")]
columns = ["id", "name", "updated"]
df = spark.createDataFrame(data, columns)

# Define the Hudi write options
hudi_options = {
    'hoodie.table.name': 'table_name',
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.operation': 'INSERT_OVERWRITE_TABLE',
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.precombine.field': 'updated',
    'hoodie.metadata.record.index.enable': True,
}

# Initial write to a fresh path succeeds
df.write.format("org.apache.hudi").options(**hudi_options).save(PATH)

# Second write to the same path fails with the FileNotFoundException
df.write.format("org.apache.hudi").mode("append").options(**hudi_options).save(PATH)

Raised a JIRA for the fix: https://issues.apache.org/jira/browse/HUDI-7273

@codope added the priority:critical (production down; pipelines stalled; need help ASAP), writer-core (issues relating to core transactions/write actions), and on-call-triaged labels on Jan 8, 2024
@nsivabalan (Contributor) commented

Just to get past the issue until we have a proper fix: you can completely delete the table and rewrite it, or use overwrite mode with Spark, as in the sketch below.
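
A minimal sketch of the overwrite-mode workaround, reusing df, hudi_options, and PATH from the reproduction above:

# Workaround until HUDI-7273 lands: Spark's "overwrite" save mode deletes and
# recreates the Hudi table, sidestepping the INSERT_OVERWRITE_TABLE operation
# (and the stale base-file listing that trips the record index).
workaround_options = {k: v for k, v in hudi_options.items()
                      if k != 'hoodie.datasource.write.operation'}

df.write.format("org.apache.hudi") \
    .mode("overwrite") \
    .options(**workaround_options) \
    .save(PATH)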
