Skip to content

Commit

Permalink
Prevent single commit multi instant issue [short term fix]
Browse files Browse the repository at this point in the history
Co-authored-by: TengHuo <teng_huo@outlook.com>
  • Loading branch information
voonhous and TengHuo committed Sep 23, 2022
1 parent a47dec7 commit 70a24df
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.common.AbstractWriteFunction;
Expand Down Expand Up @@ -198,7 +199,7 @@ private String instantToWrite() {
.timeout(config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT))
.action("instant initialize")
.build();
while (instant == null || instant.equals(this.initInstant)) {
while (instant == null || invalidInstant(instant)) {
// wait condition:
// 1. there is no inflight instant
// 2. the inflight instant does not change
Expand All @@ -209,4 +210,11 @@ private String instantToWrite() {
}
return instant;
}

/**
* Returns whether the pending instant is invalid to write with.
*/
private boolean invalidInstant(String instant) {
return HoodieTimeline.compareTimestamps(instant, HoodieTimeline.LESSER_THAN_OR_EQUALS, this.initInstant);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
Expand Down Expand Up @@ -279,6 +280,7 @@ protected String instantToWrite(boolean hasData) {
* Returns whether the pending instant is invalid to write with.
*/
private boolean invalidInstant(String instant, boolean hasData) {
return instant.equals(this.currentInstant) && hasData;
return HoodieTimeline.compareTimestamps(instant, HoodieTimeline.LESSER_THAN_OR_EQUALS, this.currentInstant)
&& hasData;
}
}

0 comments on commit 70a24df

Please sign in to comment.