Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-2696] Remove the aborted checkpoint notification from coordinator #3926

Merged
merged 1 commit into from Nov 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -235,16 +235,6 @@ public void notifyCheckpointComplete(long checkpointId) {
);
}

/**
 * Reacts to an aborted checkpoint by scheduling {@code sendCommitAckEvents(checkpointId)}
 * through {@code executor}, so that writer tasks waiting for a commit ack are unblocked.
 *
 * <p>Nothing is scheduled while the current instant still equals
 * {@code WriteMetadataEvent.BOOTSTRAP_INSTANT}.
 *
 * @param checkpointId id of the aborted checkpoint; forwarded to {@code sendCommitAckEvents}
 *                     and used as the argument of the action description
 */
@Override
public void notifyCheckpointAborted(long checkpointId) {
// once the checkpoint was aborted, unblock the writer tasks to
// reuse the last instant.
// the null-safe comparison starts from the constant, so an unset instant does not throw here.
if (!WriteMetadataEvent.BOOTSTRAP_INSTANT.equals(this.instant)) {
// the ack is sent via the executor rather than inline; the trailing arguments look like
// an action description format plus its argument -- NOTE(review): confirm against the executor API.
executor.execute(() -> sendCommitAckEvents(checkpointId),
"unblock data write with aborted checkpoint %s", checkpointId);
}
}

@Override
public void resetToCheckpoint(long checkpointID, byte[] checkpointData) {
// no operation
Expand Down Expand Up @@ -334,8 +324,10 @@ private void addEventToBuffer(WriteMetadataEvent event) {

private void startInstant() {
final String instant = HoodieActiveTimeline.createNewInstantTime();
this.writeClient.startCommitWithTime(instant, tableState.commitAction);
// put the assignment in front of metadata generation,
// because the instant request from write task is asynchronous.
this.instant = instant;
this.writeClient.startCommitWithTime(instant, tableState.commitAction);
this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction, this.instant);
this.writeClient.upgradeDowngrade(this.instant);
LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant,
Expand Down
Expand Up @@ -183,14 +183,20 @@ private void sendBootstrapEvent() {
LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID);
}

/**
 * Returns the last pending instant time.
 *
 * <p>Delegates to {@code StreamerUtil.getLastPendingInstant} with this function's
 * {@code metaClient}. Callers in this class treat a {@code null} result as
 * "no pending instant yet" and keep polling.
 *
 * <p>NOTE(review): the method is {@code protected}, presumably so that subclasses or tests
 * can substitute the lookup -- confirm the intended extension point.
 *
 * @return the timestamp of the last pending instant, possibly {@code null}
 */
protected String lastPendingInstant() {
return StreamerUtil.getLastPendingInstant(this.metaClient);
}

private String instantToWrite() {
String instant = StreamerUtil.getLastPendingInstant(this.metaClient);
String instant = lastPendingInstant();
// if exactly-once semantics turns on,
// waits for the checkpoint notification until the checkpoint timeout threshold hits.
TimeWait timeWait = TimeWait.builder()
.timeout(config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT))
.action("instant initialize")
.throwsT(true)
.build();
while (instant == null || instant.equals(this.initInstant)) {
// wait condition:
Expand All @@ -199,7 +205,7 @@ private String instantToWrite() {
// sleep for a while
timeWait.waitFor();
// refresh the inflight instant
instant = StreamerUtil.getLastPendingInstant(this.metaClient);
instant = lastPendingInstant();
}
return instant;
}
Expand Down
Expand Up @@ -114,11 +114,6 @@ public abstract class AbstractStreamWriteFunction<I>
*/
protected List<WriteStatus> writeStatuses;

/**
* Current checkpoint id.
*/
private long checkpointId = -1;

/**
* Constructs a StreamWriteFunctionBase.
*
Expand Down Expand Up @@ -152,7 +147,6 @@ public void initializeState(FunctionInitializationContext context) throws Except

@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
this.checkpointId = functionSnapshotContext.getCheckpointId();
snapshotState();
// Reload the snapshot state as the current state.
reloadWriteMetaState();
Expand Down Expand Up @@ -216,10 +210,7 @@ private void reloadWriteMetaState() throws Exception {
public void handleOperatorEvent(OperatorEvent event) {
ValidationUtils.checkArgument(event instanceof CommitAckEvent,
"The write function can only handle CommitAckEvent");
long checkpointId = ((CommitAckEvent) event).getCheckpointId();
if (checkpointId == -1 || checkpointId == this.checkpointId) {
this.confirming = false;
}
this.confirming = false;
}

/**
Expand Down Expand Up @@ -249,16 +240,9 @@ protected String instantToWrite(boolean hasData) {
// 2. the inflight instant does not change and the checkpoint has buffering data
if (instant == null || (instant.equals(this.currentInstant) && hasData)) {
// sleep for a while
boolean timeout = timeWait.waitFor();
if (timeout && instant != null) {
// if the timeout threshold is hit but the last instant is still not committed,
// and the task has not received a commit ack event (no data or aborted checkpoint),
// assume the checkpoint was canceled silently and unblock the data flushing
confirming = false;
} else {
// refresh the inflight instant
instant = lastPendingInstant();
}
timeWait.waitFor();
// refresh the inflight instant
instant = lastPendingInstant();
} else {
// the pending instant changed, that means the last instant was committed
// successfully.
Expand Down
26 changes: 5 additions & 21 deletions hudi-flink/src/main/java/org/apache/hudi/sink/utils/TimeWait.java
Expand Up @@ -35,14 +35,13 @@ public class TimeWait {
private final long timeout; // timeout in SECONDS
private final long interval; // interval in MILLISECONDS
private final String action; // action to report error message
private final boolean throwsT; // whether to throw when timeout

private long waitingTime = 0L;

private TimeWait(long timeout, long interval, String action, boolean throwsT) {
private TimeWait(long timeout, long interval, String action) {
this.timeout = timeout;
this.interval = interval;
this.action = action;
this.throwsT = throwsT;
}

public static Builder builder() {
Expand All @@ -51,23 +50,14 @@ public static Builder builder() {

/**
* Wait for an interval time.
*
* @return true if is timed out
*/
public boolean waitFor() {
public void waitFor() {
try {
if (waitingTime > timeout) {
final String msg = "Timeout(" + waitingTime + "ms) while waiting for " + action;
if (this.throwsT) {
throw new HoodieException(msg);
} else {
LOG.warn(msg);
return true;
}
throw new HoodieException("Timeout(" + waitingTime + "ms) while waiting for " + action);
}
TimeUnit.MILLISECONDS.sleep(interval);
waitingTime += interval;
return false;
} catch (InterruptedException e) {
throw new HoodieException("Error while waiting for " + action, e);
}
Expand All @@ -80,7 +70,6 @@ public static class Builder {
private long timeout = 5 * 60 * 1000L; // default 5 minutes
private long interval = 1000;
private String action;
private boolean throwsT = false;

private Builder() {
}
Expand All @@ -102,14 +91,9 @@ public Builder action(String action) {
return this;
}

/**
 * Sets whether the built {@code TimeWait} throws when the timeout is reached.
 *
 * @param throwsT value stored in the builder and passed to the {@code TimeWait} constructor
 * @return this builder, to allow call chaining
 */
public Builder throwsT(boolean throwsT) {
this.throwsT = throwsT;
return this;
}

public TimeWait build() {
Objects.requireNonNull(this.action);
return new TimeWait(this.timeout, this.interval, this.action, this.throwsT);
return new TimeWait(this.timeout, this.interval, this.action);
}
}
}
Expand Up @@ -468,7 +468,7 @@ public static String getLastPendingInstant(HoodieTableMetaClient metaClient, boo
if (reloadTimeline) {
metaClient.reloadActiveTimeline();
}
return metaClient.getCommitsTimeline().filterInflightsAndRequested()
return metaClient.getCommitsTimeline().filterInflights()
.lastInstant()
.map(HoodieInstant::getTimestamp)
.orElse(null);
Expand Down
Expand Up @@ -95,8 +95,8 @@ public void testCheckpointFails() throws Exception {
.assertEmptyEvent()
.checkpointFails(1)
.consume(TestData.DATA_SET_INSERT)
.checkpointNotThrow(2,
"The stream writer reuse the last instant time when waiting for the last instant commit timeout")
.checkpointThrows(2,
"Timeout(1000ms) while waiting for instant initialize")
// do not send the write event and fails the checkpoint,
// behaves like the last checkpoint is successful.
.checkpointFails(2)
Expand Down Expand Up @@ -390,7 +390,8 @@ public void testWriteExactlyOnce() throws Exception {
.consume(TestData.DATA_SET_INSERT)
.assertNotConfirming()
.checkpoint(2)
.assertConsumeDoesNotThrow(TestData.DATA_SET_INSERT)
.assertConsumeThrows(TestData.DATA_SET_INSERT,
"Timeout(1000ms) while waiting for instant initialize")
.end();
}

Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestData;
Expand All @@ -51,11 +52,11 @@
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
Expand Down Expand Up @@ -151,10 +152,8 @@ public TestHarness consume(List<RowData> inputs) throws Exception {
return this;
}

public TestHarness assertConsumeDoesNotThrow(List<RowData> inputs) {
assertDoesNotThrow(() -> {
consume(inputs);
}, "The stream writer reuse the last instant time when waiting for the last instant commit timeout");
public TestHarness assertConsumeThrows(List<RowData> inputs, String message) {
assertThrows(HoodieException.class, () -> consume(inputs), message);
return this;
}

Expand Down Expand Up @@ -294,9 +293,9 @@ public TestHarness checkpointFails(long checkpointId) {
return this;
}

public TestHarness checkpointNotThrow(long checkpointId, String message) {
public TestHarness checkpointThrows(long checkpointId, String message) {
// this returns early because there is no inflight instant
assertDoesNotThrow(() -> checkpoint(checkpointId), message);
assertThrows(HoodieException.class, () -> checkpoint(checkpointId), message);
return this;
}

Expand Down