From 2d37665871e2af710a830aae6d7c933752c14bbe Mon Sep 17 00:00:00 2001 From: "yuzhao.cyz" Date: Fri, 5 Nov 2021 12:03:40 +0800 Subject: [PATCH] [HUDI-2696] Remove the aborted checkpoint notification from coordinator --- .../sink/StreamWriteOperatorCoordinator.java | 14 +++------- .../sink/bulk/BulkInsertWriteFunction.java | 12 ++++++--- .../common/AbstractStreamWriteFunction.java | 24 +++-------------- .../org/apache/hudi/sink/utils/TimeWait.java | 26 ++++--------------- .../org/apache/hudi/util/StreamerUtil.java | 2 +- .../hudi/sink/TestWriteCopyOnWrite.java | 7 ++--- .../apache/hudi/sink/utils/TestWriteBase.java | 13 +++++----- 7 files changed, 32 insertions(+), 66 deletions(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index feb348fe39b5..0af38c41fbc5 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -235,16 +235,6 @@ public void notifyCheckpointComplete(long checkpointId) { ); } - @Override - public void notifyCheckpointAborted(long checkpointId) { - // once the checkpoint was aborted, unblock the writer tasks to - // reuse the last instant. - if (!WriteMetadataEvent.BOOTSTRAP_INSTANT.equals(this.instant)) { - executor.execute(() -> sendCommitAckEvents(checkpointId), - "unblock data write with aborted checkpoint %s", checkpointId); - } - } - @Override public void resetToCheckpoint(long checkpointID, byte[] checkpointData) { // no operation @@ -334,8 +324,10 @@ private void addEventToBuffer(WriteMetadataEvent event) { private void startInstant() { final String instant = HoodieActiveTimeline.createNewInstantTime(); - this.writeClient.startCommitWithTime(instant, tableState.commitAction); + // put the assignment in front of metadata generation, + // because the instant request from write task is asynchronous. this.instant = instant; + this.writeClient.startCommitWithTime(instant, tableState.commitAction); this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction, this.instant); this.writeClient.upgradeDowngrade(this.instant); LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant, diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java index f3cfbae66735..4089907243c8 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java @@ -183,14 +183,20 @@ private void sendBootstrapEvent() { LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID); } + /** + * Returns the last pending instant time. + */ + protected String lastPendingInstant() { + return StreamerUtil.getLastPendingInstant(this.metaClient); + } + private String instantToWrite() { - String instant = StreamerUtil.getLastPendingInstant(this.metaClient); + String instant = lastPendingInstant(); // if exactly-once semantics turns on, // waits for the checkpoint notification until the checkpoint timeout threshold hits. TimeWait timeWait = TimeWait.builder() .timeout(config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT)) .action("instant initialize") - .throwsT(true) .build(); while (instant == null || instant.equals(this.initInstant)) { // wait condition: @@ -199,7 +205,7 @@ private String instantToWrite() { // sleep for a while timeWait.waitFor(); // refresh the inflight instant - instant = StreamerUtil.getLastPendingInstant(this.metaClient); + instant = lastPendingInstant(); } return instant; } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java index 5ad2935e2ca8..0e7300591286 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java @@ -114,11 +114,6 @@ public abstract class AbstractStreamWriteFunction */ protected List writeStatuses; - /** - * Current checkpoint id. - */ - private long checkpointId = -1; - /** * Constructs a StreamWriteFunctionBase. * @@ -152,7 +147,6 @@ public void initializeState(FunctionInitializationContext context) throws Except @Override public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception { - this.checkpointId = functionSnapshotContext.getCheckpointId(); snapshotState(); // Reload the snapshot state as the current state. reloadWriteMetaState(); @@ -216,10 +210,7 @@ private void reloadWriteMetaState() throws Exception { public void handleOperatorEvent(OperatorEvent event) { ValidationUtils.checkArgument(event instanceof CommitAckEvent, "The write function can only handle CommitAckEvent"); - long checkpointId = ((CommitAckEvent) event).getCheckpointId(); - if (checkpointId == -1 || checkpointId == this.checkpointId) { - this.confirming = false; - } + this.confirming = false; } /** @@ -249,16 +240,9 @@ protected String instantToWrite(boolean hasData) { // 2. the inflight instant does not change and the checkpoint has buffering data if (instant == null || (instant.equals(this.currentInstant) && hasData)) { // sleep for a while - boolean timeout = timeWait.waitFor(); - if (timeout && instant != null) { - // if the timeout threshold hits but the last instant still not commit, - // and the task does not receive commit ask event(no data or aborted checkpoint), - // assumes the checkpoint was canceled silently and unblock the data flushing - confirming = false; - } else { - // refresh the inflight instant - instant = lastPendingInstant(); - } + timeWait.waitFor(); + // refresh the inflight instant + instant = lastPendingInstant(); } else { // the pending instant changed, that means the last instant was committed // successfully. diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/TimeWait.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/TimeWait.java index 453c2314d523..0441673c33d1 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/TimeWait.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/TimeWait.java @@ -35,14 +35,13 @@ public class TimeWait { private final long timeout; // timeout in SECONDS private final long interval; // interval in MILLISECONDS private final String action; // action to report error message - private final boolean throwsT; // whether to throw when timeout + private long waitingTime = 0L; - private TimeWait(long timeout, long interval, String action, boolean throwsT) { + private TimeWait(long timeout, long interval, String action) { this.timeout = timeout; this.interval = interval; this.action = action; - this.throwsT = throwsT; } public static Builder builder() { @@ -51,23 +50,14 @@ public static Builder builder() { /** * Wait for an interval time. - * - * @return true if is timed out */ - public boolean waitFor() { + public void waitFor() { try { if (waitingTime > timeout) { - final String msg = "Timeout(" + waitingTime + "ms) while waiting for " + action; - if (this.throwsT) { - throw new HoodieException(msg); - } else { - LOG.warn(msg); - return true; - } + throw new HoodieException("Timeout(" + waitingTime + "ms) while waiting for " + action); } TimeUnit.MILLISECONDS.sleep(interval); waitingTime += interval; - return false; } catch (InterruptedException e) { throw new HoodieException("Error while waiting for " + action, e); } @@ -80,7 +70,6 @@ public static class Builder { private long timeout = 5 * 60 * 1000L; // default 5 minutes private long interval = 1000; private String action; - private boolean throwsT = false; private Builder() { } @@ -102,14 +91,9 @@ public Builder action(String action) { return this; } - public Builder throwsT(boolean throwsT) { - this.throwsT = throwsT; - return this; - } - public TimeWait build() { Objects.requireNonNull(this.action); - return new TimeWait(this.timeout, this.interval, this.action, this.throwsT); + return new TimeWait(this.timeout, this.interval, this.action); } } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 7aa023acd425..5aab5cb05882 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -468,7 +468,7 @@ public static String getLastPendingInstant(HoodieTableMetaClient metaClient, boo if (reloadTimeline) { metaClient.reloadActiveTimeline(); } - return metaClient.getCommitsTimeline().filterInflightsAndRequested() + return metaClient.getCommitsTimeline().filterInflights() .lastInstant() .map(HoodieInstant::getTimestamp) .orElse(null); diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java index d8588f8cf83f..a91f45263ff2 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java @@ -95,8 +95,8 @@ public void testCheckpointFails() throws Exception { .assertEmptyEvent() .checkpointFails(1) .consume(TestData.DATA_SET_INSERT) - .checkpointNotThrow(2, - "The stream writer reuse the last instant time when waiting for the last instant commit timeout") + .checkpointThrows(2, + "Timeout(1000ms) while waiting for instant initialize") // do not send the write event and fails the checkpoint, // behaves like the last checkpoint is successful. .checkpointFails(2) @@ -390,7 +390,8 @@ public void testWriteExactlyOnce() throws Exception { .consume(TestData.DATA_SET_INSERT) .assertNotConfirming() .checkpoint(2) - .assertConsumeDoesNotThrow(TestData.DATA_SET_INSERT) + .assertConsumeThrows(TestData.DATA_SET_INSERT, + "Timeout(1000ms) while waiting for instant initialize") .end(); } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java index e3b1226a6ac7..a03f870296db 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.OptionsResolver; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.sink.event.WriteMetadataEvent; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestData; @@ -51,11 +52,11 @@ import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; /** @@ -151,10 +152,8 @@ public TestHarness consume(List inputs) throws Exception { return this; } - public TestHarness assertConsumeDoesNotThrow(List inputs) { - assertDoesNotThrow(() -> { - consume(inputs); - }, "The stream writer reuse the last instant time when waiting for the last instant commit timeout"); + public TestHarness assertConsumeThrows(List inputs, String message) { + assertThrows(HoodieException.class, () -> consume(inputs), message); return this; } @@ -294,9 +293,9 @@ public TestHarness checkpointFails(long checkpointId) { return this; } - public TestHarness checkpointNotThrow(long checkpointId, String message) { + public TestHarness checkpointThrows(long checkpointId, String message) { // this returns early because there is no inflight instant - assertDoesNotThrow(() -> checkpoint(checkpointId), message); + assertThrows(HoodieException.class, () -> checkpoint(checkpointId), message); return this; }