Bug Description
I was trying to use hudi 1.2 streaming source to read a hudi append-only table, but was seeing an error during hudi schema parsing (logs are in below session). Seems like we hit the limitations mentioned here:
https://github.com/apache/hudi/blob/master/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java#L477
Environment
**Hudi version: 1.2
Query engine: Flink
Relevant configs:
Logs and Stack Trace
Caused by: org.apache.flink.client.deployment.application.UnsuccessfulExecutionException: Application Status: FAILED
at org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:71)
... 56 common frames omitted
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:60)
... 56 common frames omitted
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=600000,backoffTimeMS=10000,maxFailuresPerInterval=1)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:176)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107)
at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:291)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:282)
at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:275)
at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:769)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:746)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)
at jdk.internal.reflect.GeneratedMethodAccessor100.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:569)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
... 5 common frames omitted
Caused by: java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:263)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:185)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:147)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
... 1 common frames omitted
Caused by: java.lang.ClassCastException: optional binary source_uuid (STRING) is not a group
at org.apache.hudi.org.apache.parquet.schema.Type.asGroupType(Type.java:248)
at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:631)
at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:584)
at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:660)
at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:668)
at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:511)
at org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader.createWritableVectors(ParquetColumnarRowSplitReader.java:222)
at org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader.(ParquetColumnarRowSplitReader.java:161)
at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.genPartColumnarRowReader(ParquetSplitReaderUtil.java:160)
at org.apache.hudi.table.format.RecordIterators.getParquetRecordIterator(RecordIterators.java:120)
at org.apache.hudi.table.format.RecordIterators.getParquetRecordIterator(RecordIterators.java:78)
at org.apache.hudi.table.format.HoodieRowDataParquetReader.getRowDataIterator(HoodieRowDataParquetReader.java:103)
at org.apache.hudi.table.format.FlinkRowDataReaderContext.getFileRecordIterator(FlinkRowDataReaderContext.java:110)
at org.apache.hudi.common.table.read.HoodieFileGroupReader.makeBaseFileIterator(HoodieFileGroupReader.java:162)
at org.apache.hudi.common.table.read.HoodieFileGroupReader.initRecordIterators(HoodieFileGroupReader.java:129)
at org.apache.hudi.common.table.read.HoodieFileGroupReader.getBufferedRecordIterator(HoodieFileGroupReader.java:292)
at org.apache.hudi.common.table.read.HoodieFileGroupReader.getClosableIterator(HoodieFileGroupReader.java:301)
at org.apache.hudi.source.reader.function.HoodieSplitReaderFunction.read(HoodieSplitReaderFunction.java:95)
at org.apache.hudi.source.reader.HoodieSourceSplitReader.fetch(HoodieSourceSplitReader.java:77)
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
... 6 common frames omitted
Bug Description
I was trying to use hudi 1.2 streaming source to read a hudi append-only table, but was seeing an error during hudi schema parsing (logs are in below session). Seems like we hit the limitations mentioned here:
https://github.com/apache/hudi/blob/master/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java#L477
Environment
**Hudi version: 1.2
Query engine: Flink
Relevant configs:
Logs and Stack Trace
Caused by: org.apache.flink.client.deployment.application.UnsuccessfulExecutionException: Application Status: FAILED
at org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:71)
... 56 common frames omitted
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:60)
... 56 common frames omitted
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=600000,backoffTimeMS=10000,maxFailuresPerInterval=1)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:176)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107)
at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:291)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:282)
at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:275)
at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:769)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:746)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)
at jdk.internal.reflect.GeneratedMethodAccessor100.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:569)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
... 5 common frames omitted
Caused by: java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:263)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:185)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:147)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
... 1 common frames omitted
Caused by: java.lang.ClassCastException: optional binary source_uuid (STRING) is not a group
at org.apache.hudi.org.apache.parquet.schema.Type.asGroupType(Type.java:248)
at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:631)
at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:584)
at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:660)
at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:668)
at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:511)
at org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader.createWritableVectors(ParquetColumnarRowSplitReader.java:222)
at org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader.(ParquetColumnarRowSplitReader.java:161)
at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.genPartColumnarRowReader(ParquetSplitReaderUtil.java:160)
at org.apache.hudi.table.format.RecordIterators.getParquetRecordIterator(RecordIterators.java:120)
at org.apache.hudi.table.format.RecordIterators.getParquetRecordIterator(RecordIterators.java:78)
at org.apache.hudi.table.format.HoodieRowDataParquetReader.getRowDataIterator(HoodieRowDataParquetReader.java:103)
at org.apache.hudi.table.format.FlinkRowDataReaderContext.getFileRecordIterator(FlinkRowDataReaderContext.java:110)
at org.apache.hudi.common.table.read.HoodieFileGroupReader.makeBaseFileIterator(HoodieFileGroupReader.java:162)
at org.apache.hudi.common.table.read.HoodieFileGroupReader.initRecordIterators(HoodieFileGroupReader.java:129)
at org.apache.hudi.common.table.read.HoodieFileGroupReader.getBufferedRecordIterator(HoodieFileGroupReader.java:292)
at org.apache.hudi.common.table.read.HoodieFileGroupReader.getClosableIterator(HoodieFileGroupReader.java:301)
at org.apache.hudi.source.reader.function.HoodieSplitReaderFunction.read(HoodieSplitReaderFunction.java:95)
at org.apache.hudi.source.reader.HoodieSourceSplitReader.fetch(HoodieSourceSplitReader.java:77)
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
... 6 common frames omitted