[SUPPORT] Clustering a bulk-inserted table can cause: Can't redefine: list #5083

@boneanxs

Description

Tips before filing an issue

  • Have you gone through our FAQs?

  • Join the mailing list to engage in conversations and get faster support at dev-subscribe@hudi.apache.org.

  • If you have triaged this as a bug, then file an issue directly.

Describe the problem you faced

A clear and concise description of the problem.

To Reproduce

Steps to reproduce the behavior:

  1. Bulk insert records from a source table into a Hudi table (a plain insert does not trigger this exception).
  2. Run clustering on that table; it can fail with the exception shown below (a minimal reproduction sketch follows the stack trace).
  3. Spark itself can read the produced base files without any exception.
org.apache.hudi.exception.HoodieException: unable to read next record from parquet file 
	at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
	at org.apache.hudi.client.utils.ConcatenatingIterator.hasNext(ConcatenatingIterator.java:45)
	at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$SliceIterator.hasNext(Iterator.scala:266)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
	at scala.collection.AbstractIterator.to(Iterator.scala:1429)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1429)
	at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1449)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2244)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.avro.SchemaParseException: Can't redefine: list
	at org.apache.avro.Schema$Names.put(Schema.java:1128)
	at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:562)
	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:690)
	at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:805)
	at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
	at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
	at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
	at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
	at org.apache.avro.Schema.toString(Schema.java:324)
	at org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility(SchemaCompatibility.java:68)
	at org.apache.parquet.avro.AvroRecordConverter.isElementType(AvroRecordConverter.java:866)
	at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:475)
	at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:289)
	at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:141)
	at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:95)
	at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
	at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:138)
	at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:183)
	at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
	at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
	at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
	... 31 more
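
For reference, a write path roughly along these lines reproduces it for me. This is only an illustrative sketch (table name, path and columns are made up, and "spark" is an existing SparkSession), not the exact job I ran; the key ingredients are array-typed columns in the schema, bulk_insert as the write operation (so the row-writer path is used), and inline clustering so clustering runs right after the write:

  // Illustrative reproduction sketch only -- table name, path and schema are made up.
  Dataset<Row> source = spark.range(0, 100000).selectExpr(
      "cast(id as string) as uuid",
      "cast(id % 10 as string) as part",
      "array(id, id + 1) as int_list",          // array-typed column (written as a parquet list)
      "array(cast(id as string)) as str_list",  // a second array-typed column
      "current_timestamp() as ts");

  source.write().format("hudi")
      .option("hoodie.table.name", "bulk_insert_clustering_repro")
      .option("hoodie.datasource.write.recordkey.field", "uuid")
      .option("hoodie.datasource.write.partitionpath.field", "part")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .option("hoodie.datasource.write.operation", "bulk_insert")
      .option("hoodie.clustering.inline", "true")
      .option("hoodie.clustering.inline.max.commits", "1")
      .mode("append")
      .save("/tmp/hudi/bulk_insert_clustering_repro");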

I think this might be a bug in parquet-avro, related issue: apache/parquet-java#560 (fixed by 1.12). Bulk insert uses HoodieInternalRowParquetWriter to write InternalRows, while insert uses HoodieParquetWriter to write IndexedRecords, which is why insert does not hit the exception.
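
To illustrate where it breaks, reading one of the bulk-inserted base files directly with parquet-avro (roughly what the clustering read path boils down to; this is a simplified sketch, not Hudi's actual code) hits the same SchemaParseException:

  // Simplified sketch of the failing read path; "baseFilePath" is assumed to point at one of the
  // parquet base files produced by bulk insert. Uses org.apache.parquet.avro.AvroParquetReader,
  // org.apache.hadoop.fs.Path and org.apache.hadoop.conf.Configuration.
  Configuration conf = new Configuration();
  try (ParquetReader<GenericRecord> reader = AvroParquetReader
      .<GenericRecord>builder(new Path(baseFilePath))
      .withConf(conf)
      .build()) {
    GenericRecord record;
    while ((record = reader.read()) != null) {  // throws SchemaParseException: Can't redefine: list
      // consume record ...
    }
  }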

Also, Spark doesn't use parquet-avro to read parquet files, so reading the same files through Spark does not cause any problem either.

To test, I simply changed the clustering logic to read the base files through a Spark DataFrame, which avoids this exception. It works well for COW tables, but it cannot handle MOR tables, since those also need to read log files. So I'm raising it here to seek help from the community.

My change:

  private JavaRDD<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContext jsc,
                                                                List<ClusteringOperation> clusteringOps) {
    SerializableConfiguration hadoopConf = new SerializableConfiguration(getHoodieTable().getHadoopConf());
    HoodieWriteConfig writeConfig = getWriteConfig();

    // NOTE: It's crucial to make sure that we don't capture whole "this" object into the
    //       closure, as this might lead to issues attempting to serialize its nested fields
    String[] dataFilePaths = clusteringOps.stream()
            .map(ClusteringOperation::getDataFilePath)
            .toArray(String[]::new);
    SQLContext sqlContext = new SQLContext(jsc.sc());

    Dataset<Row> inputFrame;

    final String extension = FSUtils.getFileExtension(dataFilePaths[0]);
    if (PARQUET.getFileExtension().equals(extension)) {
      inputFrame = sqlContext.read().format("parquet").load(dataFilePaths);
    } else if (ORC.getFileExtension().equals(extension)) {
      inputFrame = sqlContext.read().format("orc").load(dataFilePaths);
    } else {
      throw new IllegalArgumentException("Unsupported base file format: " + extension);
    }

    Tuple2<String, String> avroRecordNameAndNamespace =
            AvroConversionUtils.getAvroRecordNameAndNamespace(writeConfig.getTableName());
    return HoodieSparkUtils.createRdd(inputFrame, avroRecordNameAndNamespace._1, avroRecordNameAndNamespace._2,
            scala.Option.apply(HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(writeConfig.getSchema()))))
            .toJavaRDD().map(record -> transform(record, writeConfig));
  }
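
Here transform is a small helper that converts the Avro record back into a HoodieRecord, roughly like the following sketch (simplified; it relies on the Hudi metadata columns already populated by bulk insert and uses the default payload class for brevity):

    // Hypothetical sketch of the transform(...) helper referenced above; a real version
    // should instantiate the payload class configured in the write config instead of
    // hard-coding OverwriteWithLatestAvroPayload.
    @SuppressWarnings("unchecked")
    private HoodieRecord<T> transform(GenericRecord record, HoodieWriteConfig writeConfig) {
      String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
      String partitionPath = record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
      HoodieRecordPayload payload = new OverwriteWithLatestAvroPayload(Option.of(record));
      return new HoodieRecord<>(new HoodieKey(recordKey, partitionPath), (T) payload);
    }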

Expected behavior

A clear and concise description of what you expected to happen.

Environment Description

  • Hudi version : 0.11-snapshot

  • Spark version :

  • Hive version :

  • Hadoop version :

  • Storage (HDFS/S3/GCS..) :

  • Running on Docker? (yes/no) :

Additional context

Add any other context about the problem here.

Stacktrace

Add the stacktrace of the error.
