Skip to content

Commit

Permalink
[Fix][Connector-V2] Fix doris sink can not be closed when stream load…
Browse files Browse the repository at this point in the history
… not read any data (#6570)
  • Loading branch information
Hisoka-X committed Mar 27, 2024
1 parent 5fd98ea commit 341615f
Showing 1 changed file with 7 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ public class DorisSinkWriter
private final int intervalTime;
private final DorisSerializer serializer;
private final CatalogTable catalogTable;
private final transient ScheduledExecutorService scheduledExecutorService;
private transient Thread executorThread;
private transient volatile Exception loadException = null;
private final ScheduledExecutorService scheduledExecutorService;
private Thread executorThread;
private volatile Exception loadException = null;

public DorisSinkWriter(
SinkWriter.Context context,
Expand Down Expand Up @@ -114,8 +114,6 @@ private void initializeLoad() {
} catch (Exception e) {
throw new DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, e);
}
// get main work thread.
executorThread = Thread.currentThread();
startLoad(labelGenerator.generateLabel(lastCheckpointId + 1));
// when uploading data in streaming mode, we need to regularly detect whether there are
// exceptions.
Expand All @@ -125,7 +123,7 @@ private void initializeLoad() {

@Override
public void write(SeaTunnelRow element) throws IOException {
checkLoadException();
checkLoadExceptionAndResetThread();
byte[] serialize =
serializer.serialize(
dorisConfig.isNeedsUnsupportedTypeCasting()
Expand Down Expand Up @@ -222,9 +220,11 @@ private void checkDone() {
}
}

private void checkLoadException() {
private void checkLoadExceptionAndResetThread() {
if (loadException != null) {
throw new RuntimeException("error while loading data.", loadException);
} else {
executorThread = Thread.currentThread();
}
}

Expand Down

0 comments on commit 341615f

Please sign in to comment.