Skip to content

Commit

Permalink
[FLINK-33932][checkpointing] Add retry mechanism in RocksDBStateDataT…
Browse files Browse the repository at this point in the history
…ransfer
  • Loading branch information
xiangyuf committed Jan 7, 2024
1 parent 793a66b commit 7ca00fd
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,16 @@
package org.apache.flink.contrib.streaming.state;

import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FixedRetryStrategy;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;

import java.io.Closeable;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;

import static org.apache.flink.util.concurrent.Executors.newDirectExecutorService;

Expand All @@ -30,6 +36,12 @@ class RocksDBStateDataTransfer implements Closeable {

/**
 * Executor that subclasses hand their transfer tasks to. The constructor assigns a direct
 * executor service when the requested thread number is not greater than one.
 */
protected final ExecutorService executorService;

/**
 * Single-threaded scheduled executor, created in the constructor and passed (wrapped in a
 * {@link ScheduledExecutorServiceAdapter}) to {@code FutureUtils.retryWithDelay} by {@code
 * retry}. It is shut down in {@link #close()}.
 */
protected final ScheduledExecutorService retryExecutorService;

/** Retry count handed to the {@link FixedRetryStrategy} that {@code retry} builds. */
private static final int DEFAULT_RETRY_TIMES = 3;
/** Delay of one second handed to the {@link FixedRetryStrategy} that {@code retry} builds. */
private static final Duration DEFAULT_RETRY_DELAY = Duration.ofSeconds(1L);

RocksDBStateDataTransfer(int threadNum) {
if (threadNum > 1) {
executorService =
Expand All @@ -38,10 +50,23 @@ class RocksDBStateDataTransfer implements Closeable {
} else {
executorService = newDirectExecutorService();
}

retryExecutorService =
Executors.newSingleThreadScheduledExecutor(
new ExecutorThreadFactory("Flink-RocksDBStateDataTransfer-Retry"));
}

/**
 * Shuts down both executors owned by this instance via {@code shutdownNow()}: first the
 * transfer executor, then the retry scheduler.
 */
@Override
public void close() {
    // Order is kept: transfer executor first, retry scheduler second.
    for (ExecutorService service :
            new ExecutorService[] {executorService, retryExecutorService}) {
        service.shutdownNow();
    }
}

/**
 * Wraps an already started data transfer operation in the fixed retry strategy.
 *
 * <p>NOTE: the argument is a future that has already been created, so every retry attempt is
 * handed the very same future. If that future has failed, the transfer itself is NOT executed
 * again; the failure is only reported after the retry attempts are exhausted. Use {@link
 * #retry(Supplier)} when the operation must really be re-run on failure. This overload is kept
 * so that existing callers continue to compile and behave as before.
 *
 * @param dataTransferOperation the already started transfer
 * @param <T> result type of the transfer
 * @return a future that completes with the result of {@code dataTransferOperation}, or
 *     exceptionally once the retry strategy is exhausted
 */
protected <T> CompletableFuture<T> retry(CompletableFuture<T> dataTransferOperation) {
    return retry(() -> dataTransferOperation);
}

/**
 * Runs a data transfer operation with the fixed retry strategy ({@code DEFAULT_RETRY_TIMES}
 * retries, {@code DEFAULT_RETRY_DELAY} delay), retrying on any {@link Throwable}.
 *
 * <p>The supplier is what {@code FutureUtils.retryWithDelay} asks for the operation's future,
 * so it should start a fresh transfer each time it is called (for example by calling {@code
 * CompletableFuture.supplyAsync(...)} inside the supplier) rather than returning a cached
 * future.
 *
 * @param dataTransferOperationSupplier creates the future of one transfer attempt
 * @param <T> result type of the transfer
 * @return a future that completes with the result of the first successful attempt, or
 *     exceptionally once the retry strategy is exhausted
 */
protected <T> CompletableFuture<T> retry(
        Supplier<CompletableFuture<T>> dataTransferOperationSupplier) {
    return FutureUtils.retryWithDelay(
            dataTransferOperationSupplier,
            new FixedRetryStrategy(DEFAULT_RETRY_TIMES, DEFAULT_RETRY_DELAY),
            // Every failure is considered retryable.
            throwable -> true,
            new ScheduledExecutorServiceAdapter(retryExecutorService));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@

import org.apache.flink.shaded.guava31.com.google.common.collect.Streams;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
Expand All @@ -37,10 +40,12 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Helper class for downloading RocksDB state files. */
public class RocksDBStateDownloader extends RocksDBStateDataTransfer {

private static final Logger LOG = LoggerFactory.getLogger(RocksDBStateDownloader.class);

/**
 * Creates a downloader by forwarding the thread number unchanged to the {@link
 * RocksDBStateDataTransfer} constructor.
 *
 * @param restoringThreadNum thread number passed to the parent constructor, which falls back
 *     to a direct executor service when the value is not greater than one
 */
public RocksDBStateDownloader(int restoringThreadNum) {
super(restoringThreadNum);
}
Expand All @@ -62,8 +67,7 @@ public void transferAllStateDataToDirectory(
closeableRegistry.registerCloseable(internalCloser);
try {
List<CompletableFuture<Void>> futures =
transferAllStateDataToDirectoryAsync(downloadRequests, internalCloser)
.collect(Collectors.toList());
transferAllStateDataToDirectoryAsync(downloadRequests, internalCloser);
// Wait until either all futures completed successfully or one failed exceptionally.
FutureUtils.completeAll(futures).get();
} catch (Exception e) {
Expand All @@ -88,7 +92,7 @@ public void transferAllStateDataToDirectory(
}

/** Asynchronously runs the specified download requests on executorService. */
private Stream<CompletableFuture<Void>> transferAllStateDataToDirectoryAsync(
private List<CompletableFuture<Void>> transferAllStateDataToDirectoryAsync(
Collection<StateHandleDownloadSpec> handleWithPaths,
CloseableRegistry closeableRegistry) {
return handleWithPaths.stream()
Expand Down Expand Up @@ -117,7 +121,8 @@ private Stream<CompletableFuture<Void>> transferAllStateDataToDirectoryAsync(
remoteFileHandle,
closeableRegistry));
}))
.map(runnable -> CompletableFuture.runAsync(runnable, executorService));
.map(runnable -> retry(CompletableFuture.runAsync(runnable, executorService)))
.collect(Collectors.toList());
}

/** Copies the file from a single state handle to the given path. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,16 @@

/** Helper class for uploading RocksDB state files. */
public class RocksDBStateUploader extends RocksDBStateDataTransfer {

private static final int READ_BUFFER_SIZE = 16 * 1024;

/**
 * Creates an uploader by forwarding the thread number unchanged to the {@link
 * RocksDBStateDataTransfer} constructor.
 *
 * @param numberOfSnapshottingThreads thread number passed to the parent constructor, which
 *     falls back to a direct executor service when the value is not greater than one
 */
public RocksDBStateUploader(int numberOfSnapshottingThreads) {
super(numberOfSnapshottingThreads);
}

/**
* Upload all the files to checkpoint fileSystem using specified number of threads.
* Upload all the files to checkpoint fileSystem using specified number of threads. When an
* upload runs into problems, it is retried a fixed number of times at a fixed interval.
*
* @param files The files will be uploaded to checkpoint filesystem.
* @param checkpointStreamFactory The checkpoint streamFactory used to create outputstream.
Expand Down Expand Up @@ -105,16 +107,17 @@ private List<CompletableFuture<HandleAndLocalPath>> createUploadFutures(
return files.stream()
.map(
e ->
CompletableFuture.supplyAsync(
CheckedSupplier.unchecked(
() ->
uploadLocalFileToCheckpointFs(
e,
checkpointStreamFactory,
stateScope,
closeableRegistry,
tmpResourcesRegistry)),
executorService))
retry(
CompletableFuture.supplyAsync(
CheckedSupplier.unchecked(
() ->
uploadLocalFileToCheckpointFs(
e,
checkpointStreamFactory,
stateScope,
closeableRegistry,
tmpResourcesRegistry)),
executorService)))
.collect(Collectors.toList());
}

Expand Down

0 comments on commit 7ca00fd

Please sign in to comment.