chunjun getting-started example throws an out-of-bounds exception, and the quick start documentation is incorrect #932

@huaxiapanda

Describe the bug
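When I ran the chunjun getting-started example by following quickstart.md, an index-out-of-bounds exception was thrown. The dirty-data plugin caught it, and the program then aborted. The exception output is as follows: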
2022-06-09 16:55:56.096 [Sink: streamsinkfactory (1/1)#0] INFO o.a.f.runtime.io.network.partition.consumer.SingleInputGate - Converting recovered input channels (1 channels)
2022-06-09 16:56:05.885 [dirty-consumer-pool-1-thread-1] WARN com.dtstack.chunjun.dirty.log.LogDirtyDataCollector -
====================Dirty Data=====================
DirtyDataEntry[jobId='643cf45c8df12f201904fd28acbac367', jobName='Flink_Job', operatorName='Sink: streamsinkfactory', dirtyContent='{"arity":3,"nullBitsSizeInBytes":8,"segments":[{"allowWrap":true,"heapMemory":[0,0,0,0,0,0,0,0,-92,27,0,0,0,0,0,0,110,78,120,122,107,0,0,-123,87,106,-83,71,-127,1,0,0],"address":16,"addressLimit":48,"size":32}],"offset":0,"sizeInBytes":32}', errorMessage='com.dtstack.chunjun.throwable.WriteRecordException:
java.lang.ArrayIndexOutOfBoundsException: 12
at com.dtstack.chunjun.connector.stream.sink.StreamOutputFormat.writeSingleRecordInternal(StreamOutputFormat.java:65)
at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeSingleRecord(BaseRichOutputFormat.java:466)
at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeRecord(BaseRichOutputFormat.java:272)
at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeRecord(BaseRichOutputFormat.java:92)
at com.dtstack.chunjun.sink.DtOutputFormatSinkFunction.invoke(DtOutputFormatSinkFunction.java:117)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:398)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:619)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:583)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 12
at org.apache.flink.table.data.binary.BinarySegmentUtils.getLongMultiSegments(BinarySegmentUtils.java:736)
at org.apache.flink.table.data.binary.BinarySegmentUtils.getLong(BinarySegmentUtils.java:726)
at org.apache.flink.table.data.binary.BinarySegmentUtils.readTimestampData(BinarySegmentUtils.java:1022)
at org.apache.flink.table.data.binary.BinaryRowData.getTimestamp(BinaryRowData.java:355)
at com.dtstack.chunjun.connector.stream.converter.StreamRowConverter.lambda$createExternalConverter$ac55cfac$1(StreamRowConverter.java:212)
at com.dtstack.chunjun.connector.stream.converter.StreamRowConverter.lambda$wrapIntoNullableExternalConverter$9859bb03$1(StreamRowConverter.java:79)
at com.dtstack.chunjun.connector.stream.converter.StreamRowConverter.toExternal(StreamRowConverter.java:96)
at com.dtstack.chunjun.connector.stream.converter.StreamRowConverter.toExternal(StreamRowConverter.java:53)
at com.dtstack.chunjun.connector.stream.sink.StreamOutputFormat.writeSingleRecordInternal(StreamOutputFormat.java:57)
... 16 more
', fieldName='null', createTime=2022-06-09 16:56:05.673]

===================================================
2022-06-09 16:56:05.891 [Sink: streamsinkfactory (1/1)#0] INFO com.dtstack.chunjun.connector.stream.sink.StreamOutputFormat - taskNumber[0] close()
2022-06-09 16:56:25.892 [Sink: streamsinkfactory (1/1)#0] INFO com.dtstack.chunjun.dirty.log.LogDirtyDataCollector - Print consumer closed.
2022-06-09 16:56:25.893 [Sink: streamsinkfactory (1/1)#0] INFO com.dtstack.chunjun.connector.stream.sink.StreamOutputFormat - subtask[0}] close() finished
2022-06-09 16:56:25.899 [Sink: streamsinkfactory (1/1)#0] WARN org.apache.flink.runtime.taskmanager.Task - Sink: streamsinkfactory (1/1)#0 (4dbc793ad6703b5c77bb7dc87884d55f) switched from RUNNING to FAILED.
com.dtstack.chunjun.throwable.NoRestartException: The dirty consumer shutdown, due to the consumed count exceed the max-consumed [0]
at com.dtstack.chunjun.dirty.consumer.DirtyDataCollector.addConsumed(DirtyDataCollector.java:105)
at com.dtstack.chunjun.dirty.consumer.DirtyDataCollector.offer(DirtyDataCollector.java:79)
at com.dtstack.chunjun.dirty.manager.DirtyManager.collect(DirtyManager.java:140)
at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeSingleRecord(BaseRichOutputFormat.java:469)
at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeRecord(BaseRichOutputFormat.java:272)
at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeRecord(BaseRichOutputFormat.java:92)
at com.dtstack.chunjun.sink.DtOutputFormatSinkFunction.invoke(DtOutputFormatSinkFunction.java:117)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:398)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:619)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:583)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
at java.lang.Thread.run(Thread.java:748)
2022-06-09 16:56:25.900 [Sink: streamsinkfactory (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Sink: streamsinkfactory (1/1)#0 (4dbc793ad6703b5c77bb7dc87884d55f).
2022-06-09 16:56:25.909 [flink-akka.actor.default-dispatcher-9] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Sink: streamsinkfactory (1/1)#0 4dbc793ad6703b5c77bb7dc87884d55f.
2022-06-09 16:56:25.913 [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: streamsinkfactory (1/1) (4dbc793ad6703b5c77bb7dc87884d55f) switched from RUNNING to FAILED on cd9aa117-851c-4d18-86da-b161ff3a5e9f @ 127.0.0.1 (dataPort=-1).
com.dtstack.chunjun.throwable.NoRestartException: The dirty consumer shutdown, due to the consumed count exceed the max-consumed [0]
at com.dtstack.chunjun.dirty.consumer.DirtyDataCollector.addConsumed(DirtyDataCollector.java:105)
at com.dtstack.chunjun.dirty.consumer.DirtyDataCollector.offer(DirtyDataCollector.java:79)
at com.dtstack.chunjun.dirty.manager.DirtyManager.collect(DirtyManager.java:140)
at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeSingleRecord(BaseRichOutputFormat.java:469)
at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeRecord(BaseRichOutputFormat.java:272)
at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeRecord(BaseRichOutputFormat.java:92)
at com.dtstack.chunjun.sink.DtOutputFormatSinkFunction.invoke(DtOutputFormatSinkFunction.java:117)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:398)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:619)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:583)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
at java.lang.Thread.run(Thread.java:748)
2022-06-09 16:56:25.922 [flink-akka.actor.default-dispatcher-2] INFO o.a.f.r.e.f.flip1.RestartPipelinedRegionFailoverStrategy - Calculating tasks to restart to recover the failed task ddb598ad156ed281023ba4eebbe487e3_0.
2022-06-09 16:56:25.923 [flink-akka.actor.default-dispatcher-2] INFO o.a.f.r.e.f.flip1.RestartPipelinedRegionFailoverStrategy - 5 tasks should be restarted to recover the failed task ddb598ad156ed281023ba4eebbe487e3_0.
2022-06-09 16:56:25.928 [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink_Job (643cf45c8df12f201904fd28acbac367) switched from state RUNNING to FAILING.
org.apache.flink.runtime.JobException: The failure is not recoverable
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:107)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:666)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:446)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage$$$capture(ActorCell.scala:592)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: com.dtstack.chunjun.throwable.NoRestartException: The dirty consumer shutdown, due to the consumed count exceed the max-consumed [0]
at com.dtstack.chunjun.dirty.consumer.DirtyDataCollector.addConsumed(DirtyDataCollector.java:105)
at com.dtstack.chunjun.dirty.consumer.DirtyDataCollector.offer(DirtyDataCollector.java:79)
at com.dtstack.chunjun.dirty.manager.DirtyManager.collect(DirtyManager.java:140)
at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeSingleRecord(BaseRichOutputFormat.java:469)
at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeRecord(BaseRichOutputFormat.java:272)
at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeRecord(BaseRichOutputFormat.java:92)
at com.dtstack.chunjun.sink.DtOutputFormatSinkFunction.invoke(DtOutputFormatSinkFunction.java:117)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:398)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:619)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:583)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
at java.lang.Thread.run(Thread.java:748)
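
For readers following the log, the job does not fail on the `ArrayIndexOutOfBoundsException` itself: the record is first handed to the dirty-data collector, and the `NoRestartException` message says the consumed count exceeded a max-consumed value of `[0]`. The sketch below only illustrates that kind of threshold check. `SketchDirtyCollector`, its fields, and the use of `IllegalStateException` are hypothetical stand-ins, not ChunJun's actual `DirtyDataCollector` code.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a dirty-data collector; NOT ChunJun's real class. */
class SketchDirtyCollector {
    // Assumed meaning: how many dirty records are tolerated before the job fails.
    private final long maxConsumed;
    private final AtomicLong consumed = new AtomicLong();

    SketchDirtyCollector(long maxConsumed) {
        this.maxConsumed = maxConsumed;
    }

    /** Counts one dirty record and fails once the count exceeds the limit. */
    void offer(String dirtyContent) {
        long count = consumed.incrementAndGet();
        if (count > maxConsumed) {
            // The real job reports a NoRestartException; a plain unchecked
            // exception is used here so the sketch has no dependencies.
            throw new IllegalStateException(
                    "consumed count exceeds the max-consumed [" + maxConsumed + "]");
        }
    }

    long getConsumed() {
        return consumed.get();
    }

    public static void main(String[] args) {
        // With a limit of 0, the first dirty record already exceeds it.
        SketchDirtyCollector collector = new SketchDirtyCollector(0);
        try {
            collector.offer("{\"arity\":3}");
        } catch (IllegalStateException e) {
            System.out.println("job would fail: " + e.getMessage());
        }
    }
}
```

Under this assumption, a limit of 0 means a single bad record is enough to stop the job, which matches the sequence in the log: one dirty record is printed, then the sink switches from RUNNING to FAILED.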

Labels: bug (Something isn't working)