Skip to content
Permalink
Browse files
[FLINK-27251][checkpoint] Timeout aligned to unaligned checkpoint bar…
…rier in the output buffers
  • Loading branch information
1996fanrui authored and pnowojski committed May 25, 2022
1 parent 10b7afa commit dd8f4e2603309493300099396568ffc681e76e80
Showing 25 changed files with 766 additions and 42 deletions.
@@ -186,6 +186,10 @@ public boolean isUnalignedCheckpoint() {
return alignmentType == AlignmentType.UNALIGNED;
}

public boolean needsChannelState() {
return isUnalignedCheckpoint() || isTimeoutable();
}

public CheckpointOptions withUnalignedSupported() {
if (alignmentType == AlignmentType.FORCED_ALIGNED) {
return alignedCheckpointTimeout != NO_ALIGNED_CHECKPOINT_TIME_OUT
@@ -196,7 +200,7 @@ public CheckpointOptions withUnalignedSupported() {
}

public CheckpointOptions withUnalignedUnsupported() {
if (isUnalignedCheckpoint() || isTimeoutable()) {
if (needsChannelState()) {
return forceAligned(checkpointType, targetLocation, alignedCheckpointTimeout);
}
return this;
@@ -24,6 +24,9 @@
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

@@ -72,6 +75,48 @@ static ChannelStateWriteRequest write(
(writer, buffer) -> writer.writeOutput(info, buffer));
}

static ChannelStateWriteRequest write(
long checkpointId,
ResultSubpartitionInfo info,
CompletableFuture<List<Buffer>> dataFuture) {
return buildFutureWriteRequest(
checkpointId,
"writeOutputFuture",
dataFuture,
(writer, buffer) -> writer.writeOutput(info, buffer));
}

static ChannelStateWriteRequest buildFutureWriteRequest(
long checkpointId,
String name,
CompletableFuture<List<Buffer>> dataFuture,
BiConsumer<ChannelStateCheckpointWriter, Buffer> bufferConsumer) {
return new CheckpointInProgressRequest(
name,
checkpointId,
writer -> {
List<Buffer> buffers;
try {
buffers = dataFuture.get();
} catch (ExecutionException e) {
// If dataFuture fails, fail only the single related writer
writer.fail(e);
return;
}
for (Buffer buffer : buffers) {
checkBufferIsBuffer(buffer);
bufferConsumer.accept(writer, buffer);
}
},
throwable -> {
try {
CloseableIterator.fromList(dataFuture.get(), Buffer::recycleBuffer).close();
} catch (ExecutionException ignored) {
}
},
false);
}

static ChannelStateWriteRequest buildWriteRequest(
long checkpointId,
String name,
@@ -83,19 +128,23 @@ static ChannelStateWriteRequest buildWriteRequest(
writer -> {
while (iterator.hasNext()) {
Buffer buffer = iterator.next();
try {
checkArgument(buffer.isBuffer());
} catch (Exception e) {
buffer.recycleBuffer();
throw e;
}
checkBufferIsBuffer(buffer);
bufferConsumer.accept(writer, buffer);
}
},
throwable -> iterator.close(),
false);
}

static void checkBufferIsBuffer(Buffer buffer) {
try {
checkArgument(buffer.isBuffer());
} catch (Exception e) {
buffer.recycleBuffer();
throw e;
}
}

static ChannelStateWriteRequest start(
long checkpointId,
ChannelStateWriteResult targetResult,
@@ -27,6 +27,7 @@
import java.io.Closeable;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Writes channel state during checkpoint/savepoint. */
@@ -128,6 +129,22 @@ void addOutputData(
long checkpointId, ResultSubpartitionInfo info, int startSeqNum, Buffer... data)
throws IllegalArgumentException;

/**
* Add in-flight bufferFuture from the {@link
* org.apache.flink.runtime.io.network.partition.ResultSubpartition ResultSubpartition}. Must be
* called after {@link #start} and before {@link #finishOutput(long)}. Buffers are recycled
* after they are written or exception occurs.
*
* <p>The method will be called when the unaligned checkpoint is enabled and received an aligned
* barrier.
*/
void addOutputDataFuture(
long checkpointId,
ResultSubpartitionInfo info,
int startSeqNum,
CompletableFuture<List<Buffer>> data)
throws IllegalArgumentException;

/**
* Finalize write of channel state data for the given checkpoint id. Must be called after {@link
* #start(long, CheckpointOptions)} and all of the input data of the given checkpoint added.
@@ -178,6 +195,13 @@ public void addInputData(
public void addOutputData(
long checkpointId, ResultSubpartitionInfo info, int startSeqNum, Buffer... data) {}

@Override
public void addOutputDataFuture(
long checkpointId,
ResultSubpartitionInfo info,
int startSeqNum,
CompletableFuture<List<Buffer>> data) {}

@Override
public void finishInput(long checkpointId) {}

@@ -31,6 +31,8 @@
import javax.annotation.concurrent.ThreadSafe;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

@@ -171,6 +173,22 @@ public void addOutputData(
enqueue(write(checkpointId, info, data), false);
}

@Override
public void addOutputDataFuture(
long checkpointId,
ResultSubpartitionInfo info,
int startSeqNum,
CompletableFuture<List<Buffer>> dataFuture)
throws IllegalArgumentException {
LOG.trace(
"{} adding output data future, checkpoint {}, channel: {}, startSeqNum: {}",
taskName,
checkpointId,
info,
startSeqNum);
enqueue(write(checkpointId, info, dataFuture), false);
}

@Override
public void finishInput(long checkpointId) {
LOG.debug("{} finishing input data, checkpoint {}", taskName, checkpointId);
@@ -22,6 +22,7 @@
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
@@ -122,6 +123,14 @@ public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws
}
}

public void alignedBarrierTimeout(long checkpointId) throws IOException {
targetPartition.alignedBarrierTimeout(checkpointId);
}

public void abortCheckpoint(long checkpointId, CheckpointException cause) {
targetPartition.abortCheckpoint(checkpointId, cause);
}

@VisibleForTesting
public static ByteBuffer serializeRecord(
DataOutputSerializer serializer, IOReadableWritable record) throws IOException {
@@ -18,6 +18,7 @@

package org.apache.flink.runtime.io.network.api.writer;

import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.io.network.api.StopMode;
@@ -67,6 +68,12 @@ public interface ResultPartitionWriter extends AutoCloseable, AvailabilityProvid
/** Writes the given {@link AbstractEvent} to all channels. */
void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException;

/** Timeout the aligned barrier to unaligned barrier. */
void alignedBarrierTimeout(long checkpointId) throws IOException;

/** Abort the checkpoint. */
void abortCheckpoint(long checkpointId, CheckpointException cause);

/**
* Notifies the downstream tasks that this {@code ResultPartitionWriter} have emitted all the
* user records.
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.network.partition;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -291,6 +292,16 @@ protected long getTotalNumberOfBytesUnsafe() {
return data.getSize();
}

@Override
public void alignedBarrierTimeout(long checkpointId) {
// Nothing to do.
}

@Override
public void abortCheckpoint(long checkpointId, CheckpointException cause) {
// Nothing to do.
}

int getBuffersInBacklogUnsafe() {
return numDataBuffersWritten;
}
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.network.partition;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
@@ -206,6 +207,20 @@ public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws
}
}

@Override
public void alignedBarrierTimeout(long checkpointId) throws IOException {
for (ResultSubpartition subpartition : subpartitions) {
subpartition.alignedBarrierTimeout(checkpointId);
}
}

@Override
public void abortCheckpoint(long checkpointId, CheckpointException cause) {
for (ResultSubpartition subpartition : subpartitions) {
subpartition.abortCheckpoint(checkpointId, cause);
}
}

@Override
public void setMetricGroup(TaskIOMetricGroup metrics) {
super.setMetricGroup(metrics);

0 comments on commit dd8f4e2

Please sign in to comment.