[SUPPORT] Flink job failing with Avro ClassCastException #9596

@raghunittala

Description

Hi Team,

I have a Flink job in which I'm trying to consume Protobuf messages from Kafka and save them to a Hudi table in S3 object storage. Here are a few issues I'm facing while trying to do so:

  1. The job runs for some time, but when it tries to compact the files it throws a ClassCastException. Here is the complete stack trace:
Caused by: org.apache.hudi.exception.HoodieException: Executor executes action [commits the instant 20230824070324708] error
    ... 6 more
Caused by: java.lang.ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class org.apache.avro.specific.SpecificRecordBase (org.apache.avro.generic.GenericData$Record and org.apache.avro.specific.SpecificRecordBase are in unnamed module of loader 'app')
    at org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeAvroMetadata(TimelineMetadataUtils.java:209)
    at org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeCompactionPlan(TimelineMetadataUtils.java:169)
    at org.apache.hudi.common.util.CompactionUtils.getCompactionPlan(CompactionUtils.java:191)
    at org.apache.hudi.common.util.CompactionUtils.lambda$getCompactionPlansByTimeline$4(CompactionUtils.java:163)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(Unknown Source)
    at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(Unknown Source)
    at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
    at java.base/java.util.stream.ReferencePipeline.collect(Unknown Source)
    at org.apache.hudi.common.util.CompactionUtils.getCompactionPlansByTimeline(CompactionUtils.java:164)
    at org.apache.hudi.common.util.CompactionUtils.getAllPendingCompactionPlans(CompactionUtils.java:133)
    at org.apache.hudi.common.util.CompactionUtils.getAllPendingCompactionOperations(CompactionUtils.java:207)
    at org.apache.hudi.common.table.view.AbstractTableFileSystemView.init(AbstractTableFileSystemView.java:120)
    at org.apache.hudi.common.table.view.HoodieTableFileSystemView.init(HoodieTableFileSystemView.java:113)
    at org.apache.hudi.common.table.view.HoodieTableFileSystemView.<init>(HoodieTableFileSystemView.java:107)
    at org.apache.hudi.common.table.view.FileSystemViewManager.createInMemoryFileSystemView(FileSystemViewManager.java:177)
    at org.apache.hudi.common.table.view.FileSystemViewManager.lambda$createViewManager$5fcdabfe$1(FileSystemViewManager.java:272)
    at org.apache.hudi.common.table.view.FileSystemViewManager.lambda$getFileSystemView$1(FileSystemViewManager.java:115)
    at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(Unknown Source)
    at org.apache.hudi.common.table.view.FileSystemViewManager.getFileSystemView(FileSystemViewManager.java:114)
    at org.apache.hudi.table.HoodieTable.getSliceView(HoodieTable.java:320)
    at org.apache.hudi.table.action.compact.plan.generators.BaseHoodieCompactionPlanGenerator.generateCompactionPlan(BaseHoodieCompactionPlanGenerator.java:92)
    at org.apache.hudi.table.action.compact.ScheduleCompactionActionExecutor.scheduleCompaction(ScheduleCompactionActionExecutor.java:147)
    at org.apache.hudi.table.action.compact.ScheduleCompactionActionExecutor.execute(ScheduleCompactionActionExecutor.java:113)
    at org.apache.hudi.table.HoodieFlinkMergeOnReadTable.scheduleCompaction(HoodieFlinkMergeOnReadTable.java:105)
    at org.apache.hudi.client.BaseHoodieTableServiceClient.scheduleTableServiceInternal(BaseHoodieTableServiceClient.java:421)
    at org.apache.hudi.client.BaseHoodieTableServiceClient.scheduleTableService(BaseHoodieTableServiceClient.java:393)
    at org.apache.hudi.client.BaseHoodieWriteClient.scheduleTableService(BaseHoodieWriteClient.java:1097)
    at org.apache.hudi.client.BaseHoodieWriteClient.scheduleCompactionAtInstant(BaseHoodieWriteClient.java:876)
    at org.apache.hudi.client.BaseHoodieWriteClient.scheduleCompaction(BaseHoodieWriteClient.java:867)
    at org.apache.hudi.util.CompactionUtil.scheduleCompaction(CompactionUtil.java:65)
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$notifyCheckpointComplete$2(StreamWriteOperatorCoordinator.java:250)
    at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130)

I'm also copying the hudi-flink1.16-bundle under the flink/plugins/hudi folder.
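
For context, the sink table is wired up roughly along these lines through the Flink Table API. This is only a minimal sketch: the S3 path is a placeholder, the Kafka/Protobuf source and the INSERT are omitted, columns beyond the key/precombine/partition fields from hoodie.properties are left out, and the connector option names are as I understand them from the Hudi Flink quickstart:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class HudiSinkSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // MERGE_ON_READ Hudi sink keyed on record_key, deduplicated on
        // start_time_unix_nano, partitioned by event_id and event_type
        // (matching the hoodie.properties further down).
        tableEnv.executeSql(
            "CREATE TABLE ad_results_sink_table (\n"
          + "  record_key STRING PRIMARY KEY NOT ENFORCED,\n"
          + "  start_time_unix_nano BIGINT,\n"
          + "  event_id STRING,\n"
          + "  event_type STRING\n"
          + ") PARTITIONED BY (event_id, event_type) WITH (\n"
          + "  'connector' = 'hudi',\n"
          + "  'path' = 's3a://<bucket>/<prefix>/ad_results_sink_table',\n" // placeholder path
          + "  'table.type' = 'MERGE_ON_READ',\n"
          + "  'precombine.field' = 'start_time_unix_nano'\n"
          + ")");

        // The real job then does roughly:
        // INSERT INTO ad_results_sink_table SELECT ... FROM <kafka_protobuf_source>;
    }
}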

I'm not seeing any parquet files being created in S3; I only see .log files. I suspect that, because of the above exception, compaction couldn't run and so the parquet files are never created.
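
For reference, these are the compaction-related connector options that, as far as I understand from the Hudi Flink configuration docs, control when parquet base files get written for a MERGE_ON_READ table. In the real job they would sit in the WITH clause of the DDL above; I haven't overridden them, so the values shown are just the documented defaults as I understand them:

import java.util.Map;

// Hypothetical holder class just to list the options; with async compaction enabled,
// base parquet files should only appear after a compaction commit completes.
public class CompactionOptionsSketch {
    static final Map<String, String> COMPACTION_OPTIONS = Map.of(
        "compaction.schedule.enabled", "true",        // schedule compaction plans from the write pipeline
        "compaction.async.enabled", "true",           // run the compaction inside the streaming job
        "compaction.trigger.strategy", "num_commits", // trigger based on number of delta commits
        "compaction.delta_commits", "5"               // compact after every 5 delta commits
    );
}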

Here is the hoodie.properties:

hoodie.table.type=MERGE_ON_READ
hoodie.table.precombine.field=start_time_unix_nano
hoodie.table.partition.fields=event_id,event_type
hoodie.table.cdc.enabled=false
hoodie.archivelog.folder=archived
hoodie.timeline.layout.version=1
hoodie.table.checksum=3942898242
hoodie.datasource.write.drop.partition.columns=false
hoodie.table.recordkey.fields=record_key
hoodie.table.name=ad_results_sink_table
hoodie.compaction.record.merger.strategy=eeb8d96f-b1e4-49fd-bbf8-28ac514178e5
hoodie.datasource.write.hive_style_partitioning=false
hoodie.table.keygenerator.class=org.apache.hudi.keygen.ComplexAvroKeyGenerator
hoodie.datasource.write.partitionpath.urlencode=false
hoodie.table.version=5

Environment

  • Hudi version: 0.13.1
  • Flink version: 1.17.1
  • Hadoop version: 3.3.5
  • Storage: S3
  • Running on Docker?: yes
