Skip to content

Deltastreamer/SparkDatasource ingestion breaks when changing target schema provider options #14796

@hudi-bot

Description

@hudi-bot

Currently, there are few different options to the user to provide target schemas such as file based, schema registry. At a high level, there are 2 main flows 

Target Schema is provided by the user

Target schema is not provided by the user (which is then inferred from the incoming data)

 
||Schema post processor enabled||Transformers||User provided target schema||Cur behavior||
|yes|No|Yes|table schema's has no namespace. matches user provided schema|
|yes|yes|No|had to make minor fix in post processor for NPE. with the fix, table schema has namespace in it.|
|yes|yes|yes|table schema has namespace|
|no|no|yes|table schema's has no namespace. matches user provided schema|
|no|yes|yes|table schema's has no namespace. matches user provided schema|
|no|yes|no|table's schema has namespace.|

 

Source -> [https://github.com//pull/2937]

As you can see above, if one switches from a non-user-provided schema flow to a user-provided-schema flow, we switch from namespace in schema to no namespace in schema. 

Parquet does not store the namespace, so when moving across avro schemas with and without namespace, the parquet-avro writer or reader does not complain since parquet itself does not store namespace. 

However, for MergeOnRead tables, we serialize data and schema in the log blocks. The GenericDatumReader that takes a reader & writer schema to translate breaks when one schema has namespace while the other doesn't. 

 

The following exception is thrown 
{noformat}
51511 [Executor task launch worker for task 502] ERROR org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner - Got exception when reading log file
org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting fixed
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.ValidatingDecoder.checkFixed(ValidatingDecoder.java:135)
at org.apache.avro.io.ValidatingDecoder.readFixed(ValidatingDecoder.java:146)
at org.apache.avro.generic.GenericDatumReader.readFixed(GenericDatumReader.java:342)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
at org.apache.hudi.common.table.log.block.HoodieAvroDataBlock.deserializeRecords(HoodieAvroDataBlock.java:157)
at org.apache.hudi.common.table.log.block.HoodieDataBlock.createRecordsFromContentBytes(HoodieDataBlock.java:128)
at org.apache.hudi.common.table.log.block.HoodieDataBlock.getRecords(HoodieDataBlock.java:106)
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processDataBlock(AbstractHoodieLogRecordScanner.java:275)
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processQueuedBlocksForInstant(AbstractHoodieLogRecordScanner.java:308)
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:241)
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.(HoodieMergedLogRecordScanner.java:81)
at org.apache.hudi.HoodieMergeOnReadRDD$.scanLog(HoodieMergeOnReadRDD.scala:259)
at org.apache.hudi.HoodieMergeOnReadRDD$$anon$2.(HoodieMergeOnReadRDD.scala:164)
at org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:154)
at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:67)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748{noformat}
 This seems like an AVRO shortcoming. We need a way to avoid breaking the decoding of avro data in log files if the user moved around provider options. One way is to implement a custom GenericDatumReader. 

JIRA info

Metadata

Metadata

Assignees

No one assigned

    Labels

    area:ingestIngestion into Hudiarea:schemaSchema evolution and data typesfrom-jirapriority:highSignificant impact; potential bugspriority:mediumModerate impact; usability gapstype:improvementImprovements to existing functionality

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions