From 70a24dfca0b051aced7f9914c7c1004143fc3025 Mon Sep 17 00:00:00 2001
From: voonhous
Date: Fri, 23 Sep 2022 18:14:01 +0800
Subject: [PATCH] Prevent single commit multi instant issue [short term fix]

A pending instant is now rejected when it compares less than or equal to
the previously recorded instant, instead of only when it is identical.

The recorded instant is null-checked before the comparison: the equals()
checks being replaced returned false for an unset recorded instant, and
this keeps that behavior (assumes the timestamp comparison does not accept
a null operand - confirm against HoodieTimeline).

Co-authored-by: TengHuo
---
 .../apache/hudi/sink/bulk/BulkInsertWriteFunction.java | 14 +++++++++++++-
 .../hudi/sink/common/AbstractStreamWriteFunction.java  |  5 ++++-
 2 files changed, 17 insertions(+), 2 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
index 06d9fcd851c22..5d40c047e33cd 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
@@ -21,6 +21,7 @@
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
 import org.apache.hudi.sink.common.AbstractWriteFunction;
@@ -198,7 +199,7 @@ private String instantToWrite() {
         .timeout(config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT))
         .action("instant initialize")
         .build();
-    while (instant == null || instant.equals(this.initInstant)) {
+    while (instant == null || invalidInstant(instant)) {
       // wait condition:
       // 1. there is no inflight instant
       // 2. the inflight instant does not change
@@ -209,4 +210,15 @@ private String instantToWrite() {
     }
     return instant;
   }
+
+  /**
+   * Returns whether the pending instant is invalid to write with.
+   *
+   * An instant is invalid when it compares less than or equal to {@code initInstant}.
+   * An unset {@code initInstant} never invalidates, as with the equals check this replaces.
+   */
+  private boolean invalidInstant(String instant) {
+    return this.initInstant != null
+        && HoodieTimeline.compareTimestamps(instant, HoodieTimeline.LESSER_THAN_OR_EQUALS, this.initInstant);
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
index 674cd3588aaf2..d51e283c632d5 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
@@ -21,6 +21,7 @@
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
@@ -279,6 +280,8 @@ protected String instantToWrite(boolean hasData) {
    * Returns whether the pending instant is invalid to write with.
    */
   private boolean invalidInstant(String instant, boolean hasData) {
-    return instant.equals(this.currentInstant) && hasData;
+    // an unset current instant never invalidates, as with the equals check this replaces
+    return hasData && this.currentInstant != null
+        && HoodieTimeline.compareTimestamps(instant, HoodieTimeline.LESSER_THAN_OR_EQUALS, this.currentInstant);
   }
 }