
[SUPPORT] Async Clustering failing for MoR in 0.13.0 #8153

@haripriyarhp

Description



Describe the problem you faced

I have a Spark Structured Streaming job reading from Kafka and writing to S3 as a Hudi table. Following https://hudi.apache.org/docs/clustering/#spark-structured-streaming, I set up asynchronous clustering in the job. The table is MoR, and async clustering fails with the error below. I verified the same setup with a CoW table, and there async clustering works fine. My writer options, and a sketch of how they are wired in, are under Additional context below.

To Reproduce

Steps to reproduce the behavior:

Expected behavior

I expected async clustering to also succeed for the MoR table.

Environment Description

  • Hudi version : 0.13.0

  • Spark version : 3.1.2

  • Hive version :

  • Hadoop version :

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : no (running via the Spark operator on Kubernetes)

Additional context

Map[String,String](
"hoodie.table.name" -> tableName,
"path" -> "s3a://path to table/tableName,
"hoodie.datasource.write.table.name" -> tableName,
"hoodie.datasource.write.table.type" -> MERGE_ON_READ,
"hoodie.datasource.write.operation" -> "upsert",
"hoodie.datasource.write.recordkey.field" -> "col4,col5,col6,col7",
"hoodie.datasource.write.partitionpath.field" -> "col1,col2,col3",
"hoodie.datasource.write.keygenerator.class" -> "org.apache.hudi.keygen.ComplexKeyGenerator",
"hoodie.datasource.write.hive_style_partitioning" -> "true",
// Cleaning options
"hoodie.clean.automatic" -> "true",
"hoodie.clean.async" -> "true",
// Hive sync options
"hoodie.datasource.hive_sync.partition_fields" -> "col1,col2,col3",
"hoodie.datasource.hive_sync.database" -> dbName,
"hoodie.datasource.hive_sync.table" -> tableName,
"hoodie.datasource.hive_sync.enable" -> "true",
"hoodie.datasource.hive_sync.mode" -> "hms",
"hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
// Clustering options
"hoodie.write.lock.provider" -> "org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider",
"hoodie.write.lock.dynamodb.table" -> "hudi_occ",
"hoodie.clustering.plan.strategy.target.file.max.bytes" -> "52428800",
"hoodie.clustering.async.enabled" -> "true",
"hoodie.clustering.async.max.commits" -> "4",
"hoodie.clustering.execution.strategy.class" -> "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy",
"hoodie.upsert.shuffle.parallelism" -> "200",
"hoodie.insert.shuffle.parallelism" -> "200",
"hoodie.datasource.compaction.async.enable" -> "false",
"hoodie.compact.inline.max.delta.commits" -> "5",
"hoodie.index.type" -> "BLOOM"
)

Stacktrace

23/03/08 16:43:15 INFO AbstractTableFileSystemView: addFilesToView: NumFiles=1, NumFileGroups=1, FileGroupsCreationTime=0, StoreTimeTaken=0
23/03/08 16:43:15 INFO ClusteringUtils: Found 30 files in pending clustering operations
23/03/08 16:43:15 INFO AbstractTableFileSystemView: addFilesToView: NumFiles=1, NumFileGroups=1, FileGroupsCreationTime=0, StoreTimeTaken=0
23/03/08 16:43:15 ERROR AsyncClusteringService: Clustering executor failed
java.util.concurrent.CompletionException: java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.PartitionedFile.&lt;init&gt;(Lorg/apache/spark/sql/catalyst/InternalRow;Ljava/lang/String;JJ[Ljava/lang/String;)V
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.PartitionedFile.&lt;init&gt;(Lorg/apache/spark/sql/catalyst/InternalRow;Ljava/lang/String;JJ[Ljava/lang/String;)V
at org.apache.hudi.BaseMergeOnReadSnapshotRelation.$anonfun$buildSplits$2(MergeOnReadSnapshotRelation.scala:238)
at scala.Option.map(Option.scala:230)
at org.apache.hudi.BaseMergeOnReadSnapshotRelation.$anonfun$buildSplits$1(MergeOnReadSnapshotRelation.scala:236)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.hudi.BaseMergeOnReadSnapshotRelation.buildSplits(MergeOnReadSnapshotRelation.scala:232)
at org.apache.hudi.BaseMergeOnReadSnapshotRelation.collectFileSplits(MergeOnReadSnapshotRelation.scala:227)
at org.apache.hudi.BaseMergeOnReadSnapshotRelation.collectFileSplits(MergeOnReadSnapshotRelation.scala:64)
at org.apache.hudi.HoodieBaseRelation.buildScan(HoodieBaseRelation.scala:346)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$apply$4(DataSourceStrategy.scala:351)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$pruneFilterProject$1(DataSourceStrategy.scala:384)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:439)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:383)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:351)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:162)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:162)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:162)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:162)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:162)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:162)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:431)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:104)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:163)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:163)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:104)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:97)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:117)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:163)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:163)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:114)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:110)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
at org.apache.hudi.HoodieDatasetBulkInsertHelper$.bulkInsert(HoodieDatasetBulkInsertHelper.scala:142)
at org.apache.hudi.HoodieDatasetBulkInsertHelper.bulkInsert(HoodieDatasetBulkInsertHelper.scala)
at org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy.performClusteringWithRecordsAsRow(SparkSortAndSizeExecutionStrategy.java:72)
at org.apache.hudi.client.clustering.run.strategy.MultipleSparkJobExecutionStrategy.lambda$runClusteringForGroupAsyncAsRow$6(MultipleSparkJobExecutionStrategy.java:249)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
... 5 more
23/03/08 16:43:15 INFO HoodieStreamingSink: Async clustering service shutdown. Errored ? true
