[fix](flink) Fix FetchResult and MemoryScratchSink stuck#42216
xinyiZzz merged 3 commits into apache:master
Before each get from the queue, the sink task dependency is set ready.
So if the sink task puts into the queue faster than the fetch result gets from it,
the queue size will always be 10.
The sink dependency must be set ready before getting from the queue.
Otherwise, if the queue is emptied after the sink task puts into it and before
the dependency is blocked, the get will be stuck on the empty queue and will
never set the sink dependency ready.
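
The ordering problem described above can be illustrated with a toy, single-threaded model. This is only a sketch under stated assumptions, not the Doris BE implementation: `ScratchQueueModel`, `sink_put`, `sink_block`, `get_ready_after`, `get_ready_first` and `run_interleaving` are hypothetical names, a boolean stands in for the sink task dependency, and a get that would block is modeled as returning `None`.

```python
from collections import deque


class ScratchQueueModel:
    """Toy model of a bounded result queue guarded by a sink dependency flag."""

    def __init__(self, capacity=10):
        self.capacity = capacity
        self.items = deque()
        self.sink_ready = True  # stands in for the sink task dependency

    # Sink side, split into two steps so the consumer can run in between.
    def sink_put(self, item):
        assert self.sink_ready, "a blocked sink task is never scheduled"
        self.items.append(item)

    def sink_block(self):
        self.sink_ready = False

    # Consumer side, two orderings.
    def get_ready_after(self):
        """Dequeue first, then set the sink ready (the problematic order)."""
        if not self.items:
            return None  # would block here, so the next lines never run
        item = self.items.popleft()
        self.sink_ready = True
        return item

    def get_ready_first(self):
        """Set the sink ready first, then dequeue (the order described above)."""
        self.sink_ready = True
        if not self.items:
            return None  # would block, but the sink is runnable and can refill
        return self.items.popleft()


def run_interleaving(get):
    """Replay: sink fills the queue, consumer drains it, sink blocks, consumer gets."""
    q = ScratchQueueModel(capacity=10)
    for i in range(q.capacity):
        q.sink_put(i)
    drained = [get(q) for _ in range(q.capacity)]  # queue emptied before the block
    q.sink_block()                                 # sink blocks its dependency late
    get_would_block = get(q) is None
    stuck = get_would_block and not q.sink_ready   # nobody can make progress
    return len(drained), stuck


if __name__ == "__main__":
    print(run_interleaving(ScratchQueueModel.get_ready_after))
    print(run_interleaving(ScratchQueueModel.get_ready_first))
```

In this model, the dequeue-first ordering ends with the consumer waiting on an empty queue while the sink stays blocked. Setting the dependency ready first leaves the sink runnable, so it can refill the queue.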
Fixes the following read timeout reported by the Flink connector:
```
WARN org.apache.doris.flink.backend.BackendClient [] - Get next from Doris BE{host='', port=9060} failed.
org.apache.doris.shaded.org.apache.thrift.transport.TTransportException: java.net.SocketTimeoutException: Read timed out
at org.apache.doris.shaded.org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:179) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
at org.apache.doris.shaded.org.apache.thrift.transport.TTransport.readAll(TTransport.java:109) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
at org.apache.doris.shaded.org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:464) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
at org.apache.doris.shaded.org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:362) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
at org.apache.doris.shaded.org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:245) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
at org.apache.doris.shaded.org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
at org.apache.doris.sdk.thrift.TDorisExternalService$Client.recvGetNext(TDorisExternalService.java:92) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
at org.apache.doris.sdk.thrift.TDorisExternalService$Client.getNext(TDorisExternalService.java:79) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
at org.apache.doris.flink.backend.BackendClient.getNext(BackendClient.java:185) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
at org.apache.doris.flink.source.reader.DorisValueReader.hasNext(DorisValueReader.java:243) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
at org.apache.doris.flink.source.split.DorisSplitRecords.nextRecordFromSplit(DorisSplitRecords.java:71) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
at org.apache.doris.flink.source.split.DorisSplitRecords.nextRecordFromSplit(DorisSplitRecords.java:34) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:140) ~[flink-connector-files-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:417) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) [flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) [flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) [flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [flink-dist-1.17.1.jar:1.17.1]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_191]
Caused by: java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method) ~[?:1.8.0_191]
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) ~[?:1.8.0_191]
at java.net.SocketInputStream.read(SocketInputStream.java:171) ~[?:1.8.0_191]
at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[?:1.8.0_191]
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) ~[?:1.8.0_191]
at java.io.BufferedInputStream.read1(BufferedInputStream.java:286) ~[?:1.8.0_191]
at java.io.BufferedInputStream.read(BufferedInputStream.java:345) ~[?:1.8.0_191]
at org.apache.doris.shaded.org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:177) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
... 24 more
```