-
Notifications
You must be signed in to change notification settings - Fork 2.5k
Description
The Spark job fails when it reads the entire input DFS root path while Delta Streamer is run in upsert continuous mode or bulk insert mode. The job succeeds when a specific partition path is provided instead of the DFS root.
i.e., if we try to read from the root path s3://itz-ds-hudi-data/firehose/output/wk-dev/, the Spark job fails,
but when we try to read each partition day-wise, e.g. s3://itz-ds-hudi-data/firehose/output/wk-dev/2023-02-21/, we are able to run the job successfully.
Expected behavior
- The Spark job should succeed when Delta Streamer is run in bulk insert mode against the DFS root path.
Environment Description
-
Hudi version: 0.11.1
-
Spark version: 3.3.0
-
Storage (HDFS/S3/GCS..) : S3
-
Running on Docker? (yes/no) : No
Additional context
Add any other context about the problem here.
Stacktrace
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2863)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2799)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2798)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2798)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1239)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1239)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1239)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3051)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2993)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1009)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2229)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2250)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2269)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2294)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
at org.apache.spark.RangePartitioner$.sketch(Partitioner.scala:304)
at org.apache.spark.RangePartitioner.(Partitioner.scala:171)
at org.apache.spark.RangePartitioner.(Partitioner.scala:151)
at org.apache.spark.rdd.OrderedRDDFunctions.$anonfun$sortByKey$1(OrderedRDDFunctions.scala:64)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
at org.apache.spark.rdd.OrderedRDDFunctions.sortByKey(OrderedRDDFunctions.scala:63)
at org.apache.spark.rdd.RDD.$anonfun$sortBy$1(RDD.scala:677)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
at org.apache.spark.rdd.RDD.sortBy(RDD.scala:678)
at org.apache.spark.api.java.JavaRDD.sortBy(JavaRDD.scala:224)
at org.apache.hudi.execution.bulkinsert.GlobalSortPartitioner.repartitionRecords(GlobalSortPartitioner.java:41)
at org.apache.hudi.execution.bulkinsert.GlobalSortPartitioner.repartitionRecords(GlobalSortPartitioner.java:34)
at org.apache.hudi.table.action.commit.SparkBulkInsertHelper.bulkInsert(SparkBulkInsertHelper.java:111)
at org.apache.hudi.table.action.commit.SparkBulkInsertHelper.bulkInsert(SparkBulkInsertHelper.java:81)
at org.apache.hudi.table.action.commit.SparkBulkInsertCommitActionExecutor.execute(SparkBulkInsertCommitActionExecutor.java:59)
... 22 more
Caused by: org.apache.hudi.exception.SchemaCompatibilityException: Unable to validate the rewritten record {"taxonomy": null, "email": null, "slack": null, "gen": null, "description": null, "value": null, "dimensions": null, "guid": null, "category": null, "action": null, "version": null, "timestamp": null, "event_date": null, "event_month": null, "environment": null} against schema {"type":"record","name":"hoodie_source","namespace":"hoodie.source","fields":[{"name":"category","type":["null","string"],"default":null},{"name":"action","type":["null","string"],"default":null},{"name":"version","type":["null","string"],"default":null},{"name":"taxonomy","type":["null","string"],"default":null},{"name":"email","type":["null","string"],"default":null},{"name":"slack","type":["null","string"],"default":null},{"name":"gen","type":["null","string"],"default":null},{"name":"description","type":["null","string"],"default":null},{"name":"value","type":["null","long"],"default":null},{"name":"dimensions","type":["null",{"type":"map","values":"string"}],"default":null},{"name":"environment","type":["null","string"],"default":null},{"name":"guid","type":["null","string"],"default":null},{"name":"timestamp","type":{"type":"long","logicalType":"timestamp-micros"}},{"name":"event_date","type":{"type":"int","logicalType":"date"}},{"name":"event_month","type":"string"}]}
at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecord(HoodieAvroUtils.java:378)
at org.apache.hudi.HoodieSparkUtils$.$anonfun$createRdd$4(HoodieSparkUtils.scala:175)
at org.apache.hudi.HoodieSparkUtils$.$anonfun$createRdd$5(HoodieSparkUtils.scala:183)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at org.apache.spark.util.random.SamplingUtils$.reservoirSampleAndCount(SamplingUtils.scala:42)
at org.apache.spark.RangePartitioner$.$anonfun$sketch$1(Partitioner.scala:306)
at org.apache.spark.RangePartitioner$.$anonfun$sketch$1$adapted(Partitioner.scala:304)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:907)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:907)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:138)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
23/03/24 13:04:20 INFO ShutdownHookManager: Shutdown hook called
23/03/24 13:04:20 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-7ac1dc48-66c0-4798-9af7-5a4154737729
23/03/24 13:04:20 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-94c30104-49dd-4ff0-a6a8-edc2563330a7
23/03/24 13:04:20 INFO MetricsSystemImpl: Stopping s3a-file-system metrics system...
23/03/24 13:04:20 INFO MetricsSystemImpl: s3a-file-system metrics system stopped.
23/03/24 13:04:20 INFO MetricsSystemImpl: s3a-file-system metrics system shutdown complete.
Metadata
Metadata
Assignees
Labels
Type
Projects
Status