[Bug] When Flink uses DorisStreamLoad, an exception "Encountered unqualified data" occurs, but the error url is not returned and the data cannot be located.
#36506
Open
2 of 3 tasks
Mou-arch opened this issue
Jun 19, 2024
· 1 comment
I had searched in the issues and found no similar issues.
Version
version: 2.1.4
What's Wrong?
When I use Flink CDC to import MongoDB data into Doris, the job starts fine and runs for a while. Then Stream Load returns an error but does not return an error log URL, so the specific rows that failed cannot be located. My guess is that there is a problem with the metadata.
09:21:43,729 WARN org.apache.flink.runtime.taskmanager.Task [] - ChangelogNormalize[3] -> xxx_stats_cdc[4]: Writer -> xxx_stats_cdc[4]: Committer (1/20)#0 (d2e691a388e0657aeee23073662b7adc_20ba6b65f97481d5570070de90e4e791_0_0) switched from RUNNING to FAILED with failure cause:
java.io.IOException: Could not perform checkpoint 7 for operator ChangelogNormalize[3] -> xxx_stats_cdc[4]: Writer -> xxx_stats_cdc[4]: Committer (1/20)#0.
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1326) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:122) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-runtime-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) [flink-runtime-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) [flink-runtime-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) [flink-runtime-1.19.0.jar:1.19.0]
    at java.lang.Thread.run(Thread.java:833) [?:?]
Caused by: org.apache.doris.flink.exception.DorisRuntimeException: table xxxdb.xxx_stats_cdc stream load error: [CANCELLED]Encountered unqualified data, stop processing, see more in null
    at org.apache.doris.flink.sink.writer.DorisWriter.prepareCommit(DorisWriter.java:274) ~[flink-doris-connector-1.19-1.6.1.jar:1.6.1]
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emitCommittables(SinkWriterOperator.java:198) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:168) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:323) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$16(StreamTask.java:1369) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1357) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1314) ~[flink-streaming-java-1.19.0.jar:1.19.0]
    ... 22 more
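For anyone triaging similar logs, the relevant part of the trace above is the final "Caused by" entry, whose message ends in "see more in null". The following is a minimal Python sketch, not part of Flink or the Doris connector, that pulls that entry out of a trace and reports whether an error URL was attached. The function name `root_cause` and the shortened `SAMPLE_TRACE` are illustrative assumptions; it expects at least one stack frame after the "Caused by" line.

```python
import re

# One "Caused by:" entry: the exception class, then the message up to the
# first stack frame (" at package.Class.method(").
CAUSED_BY = re.compile(
    r"Caused by: (?P<exc>[\w.$]+): (?P<msg>.*?)(?=\s+at\s+[\w.$]+\()",
    re.S,
)
# The connector message in this issue ends with "see more in <url or null>".
SEE_MORE = re.compile(r"see more in (?P<url>\S+)\s*$")


def root_cause(trace):
    """Return the last 'Caused by' entry of a Java stack trace, or None."""
    matches = list(CAUSED_BY.finditer(trace))
    if not matches:
        return None
    last = matches[-1]
    message = last.group("msg").strip()
    found = SEE_MORE.search(message)
    url = found.group("url") if found else None
    if url in ("null", "N/A"):
        url = None  # the load failed without an error URL
    return {"exception": last.group("exc"), "message": message, "error_url": url}


# Shortened copy of the trace from this issue (hypothetical sample input).
SAMPLE_TRACE = (
    "java.io.IOException: Could not perform checkpoint 7 for operator X. "
    "at org.apache.flink.streaming.runtime.tasks.StreamTask."
    "triggerCheckpointOnBarrier(StreamTask.java:1326) "
    "Caused by: org.apache.doris.flink.exception.DorisRuntimeException: "
    "table xxxdb.xxx_stats_cdc stream load error: [CANCELLED]Encountered "
    "unqualified data, stop processing, see more in null "
    "at org.apache.doris.flink.sink.writer.DorisWriter."
    "prepareCommit(DorisWriter.java:274)"
)

if __name__ == "__main__":
    print(root_cause(SAMPLE_TRACE))
```

On the trace in this issue, `error_url` comes back as `None`, which is exactly the symptom being reported.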
What You Expected?
Return the error URL so that developers can locate the bad data. It would also help to provide a query statement for the error log. I ran SHOW STREAM LOAD WHERE Label = "xxx" and found that the URL was N/A. I then checked be.WARNING.log.20240613-164831, but the record contains only a brief description and no specific error message, so it does not help with troubleshooting. It looks like the following:
W20240619 09:21:43.803699 663722 stream_load_executor.cpp:100] fragment execute failed, err_msg=[CANCELLED]Encountered unqualified data, stop processing, id=3643c7416dac2a8c-0d768e7fd6259ab9, job_id=-1, txn_id=65132, label=shop_stats_cdc_9eWyMeuCC1DnzZoUOvWk079weHsiso5N_xxxx_stats_cdc_5_7_0562b13f-5419-4e08-9c12-4fc494be8802, elapse(s)=31
With a large data set there is no way to locate the bad rows, which means that Doris cannot continue to work. We hope Doris can provide a convenient and stable way to query load errors.
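Until an error URL is available, the BE warning line is the only place the failed load's label and transaction id appear. Below is a rough Python sketch that extracts those fields from a line in the format quoted in this issue and builds the SHOW STREAM LOAD statement mentioned above. The helper names `parse_load_failure` and `show_stream_load_sql` are hypothetical, and the pattern is derived only from the single log line in this report, so other Doris versions may format it differently.

```python
import re

# Field layout taken from the one BE log line quoted in this issue.
BE_LOAD_FAILURE = re.compile(
    r"err_msg=(?P<err_msg>.*?), id=(?P<id>[0-9a-f]+-[0-9a-f]+), "
    r"job_id=(?P<job_id>-?\d+), txn_id=(?P<txn_id>-?\d+), "
    r"label=(?P<label>[^,\s]+), elapse\(s\)=(?P<elapse_s>\d+)"
)


def parse_load_failure(line):
    """Return the fields of a 'fragment execute failed' line, or None."""
    found = BE_LOAD_FAILURE.search(line)
    if found is None:
        return None
    fields = found.groupdict()
    for key in ("job_id", "txn_id", "elapse_s"):
        fields[key] = int(fields[key])
    return fields


def show_stream_load_sql(label):
    """Build the SHOW STREAM LOAD query for one label."""
    # Refuse anything that could break out of the quoted string.
    if not re.fullmatch(r"[\w.:-]+", label):
        raise ValueError("unexpected characters in label: %r" % label)
    return 'SHOW STREAM LOAD WHERE LABEL = "%s"' % label


# The log line from this issue, used as sample input.
SAMPLE_LINE = (
    "W20240619 09:21:43.803699 663722 stream_load_executor.cpp:100] "
    "fragment execute failed, err_msg=[CANCELLED]Encountered unqualified data, "
    "stop processing, id=3643c7416dac2a8c-0d768e7fd6259ab9, job_id=-1, "
    "txn_id=65132, label=shop_stats_cdc_9eWyMeuCC1DnzZoUOvWk079weHsiso5N_"
    "xxxx_stats_cdc_5_7_0562b13f-5419-4e08-9c12-4fc494be8802, elapse(s)=31"
)

if __name__ == "__main__":
    info = parse_load_failure(SAMPLE_LINE)
    print(info["txn_id"], info["label"])
    print(show_stream_load_sql(info["label"]))
```

This only makes the failed load easier to look up; as described above, the query result still shows N/A for the URL in this case.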
How to Reproduce?
Because I couldn't query the error, I don't actually know which record caused the problem. The job is a simple CDC synchronization that reads from MongoDB and inserts into Doris.
Temporary solution
1. Try increasing the VARCHAR field length to 65533
2. Try setting sink.properties.column_separator to '|' and sink.properties.line_delimiter to '//n//r'
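The first workaround suggests that over-long string values may be one cause of the "unqualified data" rejection. As a way to test that guess before the data reaches Doris, here is a small Python sketch that flags values whose UTF-8 byte length exceeds the declared VARCHAR length (Doris measures VARCHAR length in bytes, up to 65533). The function name, column names, and limits are hypothetical, and this check is an assumption about one possible cause, not a confirmed diagnosis of this issue.

```python
def oversized_fields(row, varchar_limits):
    """Return (column, byte_length, limit) for every string that is too long.

    row            -- one record as a dict, e.g. a decoded MongoDB document
    varchar_limits -- declared VARCHAR length in bytes per column (assumed
                      to mirror the Doris table definition)
    """
    problems = []
    for column, limit in varchar_limits.items():
        value = row.get(column)
        if isinstance(value, str):
            size = len(value.encode("utf-8"))  # bytes, not characters
            if size > limit:
                problems.append((column, size, limit))
    return problems


if __name__ == "__main__":
    # Hypothetical record and limits, for illustration only.
    record = {"name": "\u00e9\u00e9\u00e9", "note": "ok"}
    limits = {"name": 5, "note": 10}
    print(oversized_fields(record, limits))
```

Running a check like this over a sample of source documents can narrow down which column to widen, instead of raising every VARCHAR to 65533.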
Anything Else?
No response
Are you willing to submit PR?
Code of Conduct