-
Notifications
You must be signed in to change notification settings - Fork 2.5k
Description
Describe the problem you faced
source.avsc
{
"type": "record",
"name": "t3_app_td_ad_info",
"fields": [{
"name": "dataId",
"type": "string"
},
{
"name": "collectTime",
"type": "string"
},
{
"name": "clickTime",
"type": "string"
},
{
"name": "spreadUrl",
"type": ["null", "string"],
"default": null
},{
"name": "spreadName",
"type": ["null", "string"],
"default": null
},
{
"name": "ua",
"type": ["null", "string"],
"default": null
},
{
"name": "adnetName",
"type": ["null", "string"],
"default": null
},
{
"name": "adnetDesc",
"type": ["null", "string"],
"default": null
}
]
}
target.avsc
{
"type": "record",
"name": "t3_app_td_ad_info",
"fields": [{
"name": "dataId",
"type": "string"
},
{
"name": "collectTime",
"type": "string"
},
{
"name": "clickTime",
"type": "string"
},
{
"name": "ds",
"type": ["null", "string"],
"default": null
},
{
"name": "spreadUrl",
"type": ["null", "string"],
"default": null
},{
"name": "spreadName",
"type": ["null", "string"],
"default": null
},
{
"name": "ua",
"type": ["null", "string"],
"default": null
},
{
"name": "adnetName",
"type": ["null", "string"],
"default": null
},
{
"name": "adnetDesc",
"type": ["null", "string"],
"default": null
}
]
}
I use DeltaStreamer to ingest data from Kafka, with the source and target schemas defined as above. A custom transform adds a new field named `ds` of type string. I found that an exception is thrown and records cannot be written normally.
Stacktrace
20/10/10 17:36:32 ERROR io.HoodieWriteHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=915FABFAD0A796B26B0E40E7602460C3 partitionPath=2020/10/10}, currentLocation='null', newLocation='null'}
java.lang.ArrayIndexOutOfBoundsException: 10
at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:436)
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:144)
at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:131)
at org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:70)
at org.apache.hudi.execution.LazyInsertIterable$HoodieInsertValueGenResult.&lt;init&gt;(LazyInsertIterable.java:92)
at org.apache.hudi.execution.LazyInsertIterable.lambda$getTransformFunction$0(LazyInsertIterable.java:105)
at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:190)
at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:46)
at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)