Description
I am getting the error below when using DeltaStreamer with a transformation (--transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer) against a Kafka source.
ERROR io.HoodieWriteHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=123 partitionPath=CA}, currentLocation='null', newLocation='null'}
java.lang.ArrayIndexOutOfBoundsException: 25
at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424)
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:119)
at org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:70)
at org.apache.hudi.execution.LazyInsertIterable$HoodieInsertValueGenResult.<init>(LazyInsertIterable.java:92)
at org.apache.hudi.execution.LazyInsertIterable.lambda$getTransformFunction$0(LazyInsertIterable.java:105)
at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:170)
at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:46)
at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
I see this is caused by an incompatibility between the schema used to serialize Avro records to bytes (HoodieAvroUtils.avroToBytes) and the schema used to deserialize those bytes back to Avro (HoodieAvroUtils.bytesToAvro).
For example, below is the schema used to convert Avro to bytes. This schema is created in this method from the Spark data type. That method uses SchemaConverters, which always seems to add null as the second type in the union: Schema.createUnion(schema, nullSchema)
{
  "type": "record",
  "name": "hoodie_source",
  "namespace": "hoodie.source",
  "fields": [
    {
      "name": "field1",
      "type": ["double", "null"]
    },
    {
      "name": "field2",
      "type": ["double", "null"]
    }
  ]
}
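To make the ordering concrete, here is a small illustrative Python sketch (not Hudi or Spark code, and to_avro_union is a hypothetical helper) that mimics how a Schema.createUnion(schema, nullSchema)-style conversion produces a union with null as the second branch:

```python
# Hypothetical sketch of the union-ordering behavior described above.
# A conversion shaped like Schema.createUnion(schema, nullSchema) appends
# "null" AFTER the field's own type, yielding [type, "null"].
def to_avro_union(avro_type, nullable=True):
    # Mirrors the (type, null) ordering; many hand-written Avro schemas
    # instead use ["null", type] with "default": null.
    return [avro_type, "null"] if nullable else avro_type

fields = [{"name": n, "type": to_avro_union("double")}
          for n in ("field1", "field2")]
schema = {
    "type": "record",
    "name": "hoodie_source",
    "namespace": "hoodie.source",
    "fields": fields,
}
```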
Below is the schema used to convert bytes back to Avro. This schema is retrieved from hoodie.deltastreamer.schemaprovider.registry.url.
{
  "type": "record",
  "name": "MyRecord",
  "namespace": "com.xyz.abc",
  "fields": [
    {
      "name": "field1",
      "type": ["null", "double"],
      "default": null
    },
    {
      "name": "field2",
      "type": ["null", "double"],
      "default": null
    }
  ]
}
As we can see, the first schema declares the union as [double, null], but the second declares it as [null, double]. In Avro, the union branches must be in the same order for the writer and reader schemas to line up here. Since they are not, Hudi is unable to convert the bytes back to Avro, which surfaces as the ArrayIndexOutOfBoundsException above.
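The order sensitivity can be illustrated without the Avro library at all. Avro's binary encoding writes the zero-based index of the chosen union branch before the value, so a decoder that interprets that index against a differently ordered union resolves the wrong type. This is a pure-Python sketch under that assumption, not Hudi's actual decode path:

```python
# Illustration of why union branch order matters in Avro binary encoding.
# The writer emits the index of the union branch it used; a reader that
# applies that index to a reordered union picks the wrong branch.

WRITER_UNION = ["double", "null"]  # union from the avroToBytes schema
READER_UNION = ["null", "double"]  # union from the registry schema

def encode_branch(union, value):
    """Return the branch index Avro would write before the value."""
    branch = "null" if value is None else "double"
    return union.index(branch)

def decode_branch(union, index):
    """Return the type a decoder would resolve for that index."""
    return union[index]

idx = encode_branch(WRITER_UNION, 1.5)      # writer picks branch 0 ("double")
decoded = decode_branch(READER_UNION, idx)  # reader resolves branch 0 as "null"
```

With the mismatched unions, a non-null double is read back as the null branch; with more complex unions the index can even fall outside the reader's resolved branches, which matches the ArrayIndexOutOfBoundsException in Symbol$Alternative.getSymbol.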