Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SUPPORT]hudi how to upsert a non null array data to a existing column with array of nulls,optional binary. java.lang.ClassCastException: optional binary element (UTF8) is not a group #5701

Closed
gtwuser opened this issue May 27, 2022 · 9 comments
Assignees
Labels
priority:critical production down; pipelines stalled; Need help asap. schema-and-data-types

Comments

@gtwuser
Copy link

gtwuser commented May 27, 2022

Describe the problem you faced
We are trying to update an existing column col1 which has schema of a empty array, which is by default taken as array<string>. Perhaps the issue is that the new upcoming records has data in this existing column col1
that is it's an array of not null values. While upserting it throws error of •••binary Utf8 optional element of not group ••••. We don't have any predefined schema for these records, it's all inferred by default. Hence during insert this column col1 schema becomes array by default. But since the new upcoming records have non null or non empty array values while upserting them to tu his column it fails the upsert operation.

In short this issue comes whenever we are trying to update the schema of a column from array<string> to array<struct<>> or array<array<>>. Kindly let me know if there is a work around or solution for it.

A clear and concise description of the problem.

To Reproduce

Steps to reproduce the behavior:

  1. Insert records which has a column with only empty array as value
  2. Upsert records with atleast one entry of non empty array as value in that column which previously had only empty array.

Expected behavior
Expected behaviour would be to upgrade schema of columns which had a default schema for an empty array(i.e array) to the new recieved non empty array value schema.
That is upgrade a array based column schema from default array to a more complex schema of the data which the non empty array holds.

Environment Description

  • AWS glue 3.0

  • Hudi version : 0.10.1

  • Spark version : 3.1.2

  • Running on Docker? (yes/no) : no, we are running glue jobs using pyspark

Additional context

Add any other context about the problem here.

Stacktrace

Add the stacktrace of the error.
java.lang.ClassCastException: optional binary element (UTF8) is not a group

@gtwuser gtwuser changed the title [SUPPORT]hudi how to upsert a non null array data to a existing column with array of nulls [SUPPORT]hudi how to upsert a non null array data to a existing column with array of nulls,optional binary. java.lang.ClassCastException: optional binary element (UTF8) is not a group May 27, 2022
@gtwuser
Copy link
Author

gtwuser commented May 27, 2022

@nsivabalan @n3nash @umehrot2 @ Kindly suggest what should be done in this use case, we are stuck with this issue for 1 month now.
Existing column schema in the hudi table created via bulk-insert.
Here the value for this array column was like:

.....
{ "id":1, "NWDepStatus": [] }
{ "id":2, "NWDepStatus": null }
....

This resulted in below schema for this column in hudi table during bulk insert

root
 |-- NWDepStatus: array (nullable = true)
 |    |-- element: string (containsNull = true)

New incoming record schema for the same column is as below. This record is meant to be saved via upsert

with value as

{
    "id": 1,
    "NWDepCount": 0,
    "NWDepStatus": [
        {
            "ClassId": "metric.DepStatus",
            "Id": 21,
            "Name": "MyNW_3",
            "ObjectType": "metric.DepStatus",
            "Status": "NA"
        },
        {
            "ClassId": "metric.DepStatus",
            "Id": 22,
            "Name": "MyNW2",
            "ObjectType": "metric.DepStatus",
            "Status": "NA"
        }
    ]
}

Resulting in schema as below which is different from the existing schema saved in hudi

root
 |-- NWDepStatus: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- ClassId: string (nullable = true)
 |    |    |-- Id: long (nullable = true)
 |    |    |-- Name: string (nullable = true)
 |    |    |-- ObjectType: string (nullable = true)
 |    |    |-- Status: string (nullable = true)

I even tried altering the existing column schema before writing the new records by making the schema similar to new records with non empty array and retaining nulls in it but with no success.

+------------------------+
|NWDepStatus|
+------------------------+
|null                   |
|null                   |
+------------------------+

Configs are as follows:

          commonConfig = {
            'className': 'org.apache.hudi', 'hoodie.datasource.hive_sync.use_jdbc': 'false',
            'hoodie.datasource.write.precombine.field': 'MdTimestamp',
            'hoodie.datasource.write.recordkey.field': 'id',
            'hoodie.table.name': 'hudi-table', 
            'hoodie.consistency.check.enabled': 'true',
            'hoodie.datasource.hive_sync.database': args['database_name'],
            'hoodie.datasource.write.reconcile.schema': 'true',
            'hoodie.datasource.hive_sync.table': 'hudi + prefix.replace("/", "_").lower(),
            'hoodie.datasource.hive_sync.enable': 'true', 'path': 's3://' + args['curated_bucket'] + '/hudi' + prefix,
            'hoodie.parquet.small.file.limit': '134217728' # 1,024 * 1,024 * 128 = 134,217,728 (128 MB)
        }
        unpartitionDataConfig = {
            'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor', 
            'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator'
        }
        initLoadConfig = {
                         'hoodie.bulkinsert.shuffle.parallelism': 68,
                          'hoodie.datasource.write.operation': 'bulk_insert'
        }
      incrementalConfig = {
            'hoodie.upsert.shuffle.parallelism': 68, 
            'hoodie.datasource.write.operation': 'upsert',
            'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS', 
            'hoodie.cleaner.commits.retained': 10
        }

Checked this issue #2265 and the fix #2927. But even with the configs given as solution its not working and failing with the same error

inputDf.write
.format('org.apache.hudi')
.option('hoodie.datasource.write.operation', 'upsert')
.option("spark.hadoop.parquet.avro.write-old-list-structure", "false")
.option("parquet.avro.write-old-list-structure", "false")
.option("hoodie.parquet.avro.write-old-list-structure", "false")
.option("hoodie.datasource.write.reconcile.schema", "true")
.options(**combinedConf)
.mode('append')
.save()

2022-05-27 18:22:12,568 WARN [task-result-getter-0] scheduler.TaskSetManager (Logging.scala:logWarning(69)): Lost task 0.0 in stage 363.0 (TID 8061) (172.36.166.181 executor 24): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$execute$ecf5068c$1(BaseSparkCommitActionExecutor.java:174)
at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
at org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:102)
at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:351)
at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:342)
at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:315)
... 28 more
Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:147)
at org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:100)
... 31 more
Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:141)
... 32 more
Caused by: org.apache.hudi.exception.HoodieException: operation has failed
at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:248)
at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:226)
at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:52)
at org.apache.hudi.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:278)
at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:36)
at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
... 3 more
Caused by: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:54)
at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
... 4 more
Caused by: java.lang.ClassCastException: optional binary element (UTF8) is not a group
at org.apache.parquet.schema.Type.asGroupType(Type.java:207)
at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:279)
at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:232)
at org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:78)
at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter$ElementConverter.(AvroRecordConverter.java:536)
at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.(AvroRecordConverter.java:486)
at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:289)
at org.apache.parquet.avro.AvroRecordConverter.(AvroRecordConverter.java:141)
at org.apache.parquet.avro.AvroRecordConverter.(AvroRecordConverter.java:95)
at org.apache.parquet.avro.AvroRecordMaterializer.(AvroRecordMaterializer.java:33)
at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:138)
at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:183)
at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
... 8 more
2022-05-27 18:22:12,571 INFO [dispatcher-CoarseGrainedScheduler] scheduler.TaskSetManager (Logging.scala:logInfo(57)): Starting task 0.1 in stage 363.0 (TID 8062) (172.36.140.28, executor 15, partition 0, PROCESS_LOCAL, 4444 bytes) taskResourceAssignments Map()

@gtwuser
Copy link
Author

gtwuser commented May 28, 2022

Small update i tried to drop the column with nulls during upsert.
Scenario:

  1. drop the column with all empty array during upsert and update table with same column name and non empty array data/
    but this fails:

Please suggest/correct whats am I doing wrong here.

"error during schema update:" An error occurred while calling o615.save.
: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20220528225756057
at org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:63)
at org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:46)
at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:119)
at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:103)
at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:160)
at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:217)
at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:277)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:164)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 253.0 failed 4 times, most recent failure: Lost task 1.3 in stage 253.0 (TID 5698) (172.34.21.0 executor 24): java.lang.ClassCastException: java.lang.String cannot be cast to scala.collection.Seq
at org.apache.hudi.AvroConversionHelper$.$anonfun$createConverterToAvro$10(AvroConversionHelper.scala:327)
at org.apache.hudi.AvroConversionHelper$.$anonfun$createConverterToAvro$14(AvroConversionHelper.scala:373)
at org.apache.hudi.HoodieSparkUtils$.$anonfun$createRddInternal$3(HoodieSparkUtils.scala:157)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2465)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2414)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2413)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2413)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2679)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2621)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2610)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:914)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2278)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
at org.apache.spark.rdd.PairRDDFunctions.$anonfun$countByKey$1(PairRDDFunctions.scala:366)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:366)
at org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:314)
at org.apache.hudi.data.HoodieJavaPairRDD.countByKey(HoodieJavaPairRDD.java:103)
at org.apache.hudi.index.bloom.HoodieBloomIndex.lookupIndex(HoodieBloomIndex.java:115)
at org.apache.hudi.index.bloom.HoodieBloomIndex.tagLocation(HoodieBloomIndex.java:85)
at org.apache.hudi.table.action.commit.SparkWriteHelper.tag(SparkWriteHelper.java:56)
at org.apache.hudi.table.action.commit.SparkWriteHelper.tag(SparkWriteHelper.java:39)
at org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:52)
... 45 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to scala.collection.Seq
at org.apache.hudi.AvroConversionHelper$.$anonfun$createConverterToAvro$10(AvroConversionHelper.scala:327)
at org.apache.hudi.AvroConversionHelper$.$anonfun$createConverterToAvro$14(AvroConversionHelper.scala:373)
at org.apache.hudi.HoodieSparkUtils$.$anonfun$createRddInternal$3(HoodieSparkUtils.scala:157)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more

@xushiyan
Copy link
Member

are you able to try spark 3.2 which has major parquet upgrade to 1.12 ?

@xushiyan xushiyan added schema-and-data-types priority:critical production down; pipelines stalled; Need help asap. labels May 30, 2022
@xushiyan xushiyan added this to Awaiting Triage in GI Tracker Board via automation May 30, 2022
@gtwuser
Copy link
Author

gtwuser commented May 30, 2022

Thanks for getting back @xushiyan AWS glue supports Spark 3.1, but i suppose with Hudi 0.11.0 bundle we get the parquet upgraded to 1.12, unfortunately we are not able to upgrade to the latest version of Hudi. Issue #5636 and i havent yet raised the aws ticket. Also wondering how is it working for others if it is the case.
From aws console:

Glue 3.0 – Supports Spark 3.1.1 and Python 3.7. Also includes new AWS Glue Spark runtime optimizations for performance and reliability. Upgraded several dependencies that were required for the new Spark version.

@minihippo
Copy link
Contributor

@gtwuser For the first bulk insert, are values of NWDepStatus empty? If it is, u could try to only write other columns into the table

@phillycoder
Copy link

phillycoder commented Jun 28, 2022

I have opened similar issue throwing same exception during update. I have a spark-shell example in it.
In my example schema have a array of structs with one field causing this issue.
#5985

@minihippo
Copy link
Contributor

@phillycoder It looks like we have a workaround in the other issue.

@codope
Copy link
Member

codope commented Aug 1, 2022

Looks like it is an open issue of Parquet format iself that has not yet been resolved https://issues.apache.org/jira/browse/PARQUET-1681
This seems to affect even the latest version 1.12.0 and the parquet community is still discussing the solution https://issues.apache.org/jira/browse/PARQUET-2069
Let's keep an eye on the above issues. There isn't much that can be done from Hudi.

@codope codope moved this from Awaiting Triage to Awaiting Ack Triaged in GI Tracker Board Aug 1, 2022
@nsivabalan nsivabalan assigned codope and unassigned codope Aug 9, 2022
@codope
Copy link
Member

codope commented Sep 7, 2022

We need to upgrade parquet-avro once the above issues are fixed.
Closing this as it is not related to Hudi. Created HUDI-4798 to track parquet upgrade.

@codope codope closed this as completed Sep 7, 2022
GI Tracker Board automation moved this from Awaiting Ack Triaged to Done Sep 7, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
priority:critical production down; pipelines stalled; Need help asap. schema-and-data-types
Projects
Development

No branches or pull requests

5 participants