Sending operations concurrently in peer recovery (#58018)
Today, we send operations in phase 2 of peer recoveries batch by batch, sequentially. Normally that's okay, as phase 2 should have a fairly small number of operations thanks to the file-based threshold. However, if phase 1 takes a long time and we are actively indexing, then phase 2 can have a lot of operations to replay.

With this change, we send multiple batches concurrently (the number of concurrent batches is controlled by a new setting, which defaults to 1) to reduce the recovery time.
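
As an illustration of the idea only (this is not the RecoverySourceHandler code in this commit), here is a minimal, self-contained sketch of bounded-concurrency sending: at most `maxConcurrentOps` batch requests are in flight, and the next batch is sent as soon as a response frees a slot. The names `OpsSender` and `sendBatch` are invented for the sketch.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

// Invented for illustration: at most maxConcurrentOps batches are outstanding
// at any time; completing a send frees a permit so the next batch can go out.
public final class OpsSender {
    private final Semaphore inFlight;
    private final ExecutorService executor = Executors.newCachedThreadPool();

    public OpsSender(int maxConcurrentOps) {
        this.inFlight = new Semaphore(maxConcurrentOps);
    }

    public void sendAll(Iterator<List<String>> batches) throws InterruptedException {
        while (batches.hasNext()) {
            final List<String> batch = batches.next();
            inFlight.acquire(); // block until fewer than maxConcurrentOps batches are outstanding
            CompletableFuture.runAsync(() -> sendBatch(batch), executor)
                .whenComplete((ignored, error) -> inFlight.release()); // free the slot on response
        }
        executor.shutdown(); // previously submitted batches still complete
    }

    private void sendBatch(List<String> batch) {
        // placeholder for the network call that replays this batch on the recovery target
    }
}
```

With a window of 1 the sender stays strictly sequential, which is why the default preserves today's behaviour.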

Backport of #58018
dnhatn committed Jul 8, 2020
1 parent 01fb727 commit ef5c397
Showing 11 changed files with 311 additions and 168 deletions.
10 changes: 10 additions & 0 deletions docs/reference/modules/indices/recovery.asciidoc
@@ -48,3 +48,13 @@ sent in parallel for each recovery. Defaults to `2`.
+
You can increase the value of this setting when the recovery of a single shard
is not reaching the traffic limit set by `indices.recovery.max_bytes_per_sec`.

`indices.recovery.max_concurrent_operations`::
(<<cluster-update-settings,Dynamic>>, Expert) Number of operations sent
in parallel for each recovery. Defaults to `1`.
+
Concurrently replaying operations during recovery can be very resource-intensive
and may interfere with indexing, search, and other activities in your cluster.
Do not increase this setting without carefully verifying that your cluster has
the resources available to handle the extra load that will result.
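
For completeness, a hedged sketch of raising this setting from Java, assuming an `org.elasticsearch.client.Client` handle; the helper class `RecoveryConcurrencyTuner` is invented here, and the same transient cluster settings update can be issued through whichever client or API you normally use.

```java
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;

// Invented helper: bumps indices.recovery.max_concurrent_operations as a
// transient cluster setting. Only raise it after confirming the cluster can
// absorb the extra recovery load.
public final class RecoveryConcurrencyTuner {
    public static void setMaxConcurrentOperations(Client client, int value) {
        ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest()
            .transientSettings(Settings.builder()
                .put("indices.recovery.max_concurrent_operations", value)
                .build());
        client.admin().cluster().updateSettings(request).actionGet();
    }
}
```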

@@ -236,6 +236,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING,
RecoverySettings.INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING,
@@ -28,7 +28,6 @@
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.store.StoreFileMetadata;

import java.io.Closeable;
import java.io.IOException;
@@ -57,64 +56,64 @@
* one of the networking threads which receive/handle the responses of the current pending file chunk requests. This process will continue
* until all chunk requests are sent/responded.
*/
public abstract class MultiFileTransfer<Request extends MultiFileTransfer.ChunkRequest> implements Closeable {
public abstract class MultiChunkTransfer<Source, Request extends MultiChunkTransfer.ChunkRequest> implements Closeable {
private Status status = Status.PROCESSING;
private final Logger logger;
private final ActionListener<Void> listener;
private final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
private final AsyncIOProcessor<FileChunkResponseItem> processor;
private final int maxConcurrentFileChunks;
private StoreFileMetadata currentFile = null;
private final Iterator<StoreFileMetadata> remainingFiles;
private Tuple<StoreFileMetadata, Request> readAheadRequest = null;

protected MultiFileTransfer(Logger logger, ThreadContext threadContext, ActionListener<Void> listener,
int maxConcurrentFileChunks, List<StoreFileMetadata> files) {
private final AsyncIOProcessor<FileChunkResponseItem<Source>> processor;
private final int maxConcurrentChunks;
private Source currentSource = null;
private final Iterator<Source> remainingSources;
private Tuple<Source, Request> readAheadRequest = null;

protected MultiChunkTransfer(Logger logger, ThreadContext threadContext, ActionListener<Void> listener,
int maxConcurrentChunks, List<Source> sources) {
this.logger = logger;
this.maxConcurrentFileChunks = maxConcurrentFileChunks;
this.maxConcurrentChunks = maxConcurrentChunks;
this.listener = listener;
this.processor = new AsyncIOProcessor<FileChunkResponseItem>(logger, maxConcurrentFileChunks, threadContext) {
this.processor = new AsyncIOProcessor<FileChunkResponseItem<Source>>(logger, maxConcurrentChunks, threadContext) {
@Override
protected void write(List<Tuple<FileChunkResponseItem, Consumer<Exception>>> items) {
protected void write(List<Tuple<FileChunkResponseItem<Source>, Consumer<Exception>>> items) throws IOException {
handleItems(items);
}
};
this.remainingFiles = files.iterator();
this.remainingSources = sources.iterator();
}

public final void start() {
addItem(UNASSIGNED_SEQ_NO, null, null); // put a dummy item to start the processor
}

private void addItem(long requestSeqId, StoreFileMetadata md, Exception failure) {
processor.put(new FileChunkResponseItem(requestSeqId, md, failure), e -> { assert e == null : e; });
private void addItem(long requestSeqId, Source resource, Exception failure) {
processor.put(new FileChunkResponseItem<>(requestSeqId, resource, failure), e -> { assert e == null : e; });
}

private void handleItems(List<Tuple<FileChunkResponseItem, Consumer<Exception>>> items) {
private void handleItems(List<Tuple<FileChunkResponseItem<Source>, Consumer<Exception>>> items) {
if (status != Status.PROCESSING) {
assert status == Status.FAILED : "must not receive any response after the transfer was completed";
// These exceptions will be ignored as we record only the first failure, log them for debugging purpose.
items.stream().filter(item -> item.v1().failure != null).forEach(item ->
logger.debug(new ParameterizedMessage("failed to transfer a file chunk request {}", item.v1().md), item.v1().failure));
logger.debug(new ParameterizedMessage("failed to transfer a chunk request {}", item.v1().source), item.v1().failure));
return;
}
try {
for (Tuple<FileChunkResponseItem, Consumer<Exception>> item : items) {
final FileChunkResponseItem resp = item.v1();
for (Tuple<FileChunkResponseItem<Source>, Consumer<Exception>> item : items) {
final FileChunkResponseItem<Source> resp = item.v1();
if (resp.requestSeqId == UNASSIGNED_SEQ_NO) {
continue; // not an actual item
}
requestSeqIdTracker.markSeqNoAsProcessed(resp.requestSeqId);
if (resp.failure != null) {
handleError(resp.md, resp.failure);
handleError(resp.source, resp.failure);
throw resp.failure;
}
}
while (requestSeqIdTracker.getMaxSeqNo() - requestSeqIdTracker.getProcessedCheckpoint() < maxConcurrentFileChunks) {
final Tuple<StoreFileMetadata, Request> request = readAheadRequest != null ? readAheadRequest : getNextRequest();
while (requestSeqIdTracker.getMaxSeqNo() - requestSeqIdTracker.getProcessedCheckpoint() < maxConcurrentChunks) {
final Tuple<Source, Request> request = readAheadRequest != null ? readAheadRequest : getNextRequest();
readAheadRequest = null;
if (request == null) {
assert currentFile == null && remainingFiles.hasNext() == false;
assert currentSource == null && remainingSources.hasNext() == false;
if (requestSeqIdTracker.getMaxSeqNo() == requestSeqIdTracker.getProcessedCheckpoint()) {
onCompleted(null);
}
@@ -149,48 +148,50 @@ private void onCompleted(Exception failure) {
listener.onResponse(null);
}

private Tuple<StoreFileMetadata, Request> getNextRequest() throws Exception {
private Tuple<Source, Request> getNextRequest() throws Exception {
try {
if (currentFile == null) {
if (remainingFiles.hasNext()) {
currentFile = remainingFiles.next();
onNewFile(currentFile);
if (currentSource == null) {
if (remainingSources.hasNext()) {
currentSource = remainingSources.next();
onNewResource(currentSource);
} else {
return null;
}
}
final StoreFileMetadata md = currentFile;
final Source md = currentSource;
final Request request = nextChunkRequest(md);
if (request.lastChunk()) {
currentFile = null;
currentSource = null;
}
return Tuple.tuple(md, request);
} catch (Exception e) {
handleError(currentFile, e);
handleError(currentSource, e);
throw e;
}
}

/**
* This method is called when starting sending/requesting a new file. Subclasses should override
* This method is called when starting sending/requesting a new source. Subclasses should override
* this method to reset the file offset or close the previous file and open a new file if needed.
*/
protected abstract void onNewFile(StoreFileMetadata md) throws IOException;
protected void onNewResource(Source resource) throws IOException {

protected abstract Request nextChunkRequest(StoreFileMetadata md) throws IOException;
}

protected abstract Request nextChunkRequest(Source resource) throws IOException;

protected abstract void executeChunkRequest(Request request, ActionListener<Void> listener);

protected abstract void handleError(StoreFileMetadata md, Exception e) throws Exception;
protected abstract void handleError(Source resource, Exception e) throws Exception;

private static class FileChunkResponseItem {
private static class FileChunkResponseItem<Source> {
final long requestSeqId;
final StoreFileMetadata md;
final Source source;
final Exception failure;

FileChunkResponseItem(long requestSeqId, StoreFileMetadata md, Exception failure) {
FileChunkResponseItem(long requestSeqId, Source source, Exception failure) {
this.requestSeqId = requestSeqId;
this.md = md;
this.source = source;
this.failure = failure;
}
}
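To make the generalization from files to arbitrary sources concrete, here is a hypothetical skeleton of a MultiChunkTransfer subclass. Everything prefixed `Example` is invented for illustration (the real subclasses are the file and operation senders in RecoverySourceHandler); the sketch assumes it sits in the same org.elasticsearch.indices.recovery package and that the class declares no further abstract members beyond those visible in this diff.

```java
package org.elasticsearch.indices.recovery;

import java.util.List;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.util.concurrent.ThreadContext;

// Hypothetical subclass showing the hooks exposed by the refactored class.
// Each String source is sent as a single chunk to keep the sketch short.
class ExampleChunkTransfer extends MultiChunkTransfer<String, ExampleChunkTransfer.ExampleRequest> {

    static final class ExampleRequest implements MultiChunkTransfer.ChunkRequest {
        final String payload;
        ExampleRequest(String payload) {
            this.payload = payload;
        }
        @Override
        public boolean lastChunk() {
            return true; // one chunk per source in this sketch
        }
    }

    ExampleChunkTransfer(Logger logger, ThreadContext threadContext, ActionListener<Void> listener,
                         int maxConcurrentChunks, List<String> sources) {
        super(logger, threadContext, listener, maxConcurrentChunks, sources);
    }

    @Override
    protected ExampleRequest nextChunkRequest(String resource) {
        return new ExampleRequest(resource); // build the next chunk of the current source
    }

    @Override
    protected void executeChunkRequest(ExampleRequest request, ActionListener<Void> listener) {
        listener.onResponse(null); // a real implementation sends the chunk over the wire
    }

    @Override
    protected void handleError(String resource, Exception e) {
        // record the failure for this source; the base class aborts the transfer
    }

    @Override
    public void close() {
        // release per-source resources (readers, store references) if any
    }
}
```

The inherited onNewResource hook is left at its default here; subclasses that keep per-source state (such as an open file and its offset) override it to reset that state when a new source starts.
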
@@ -320,7 +320,9 @@ private Tuple<RecoverySourceHandler, RemoteRecoveryTargetHandler> createRecovery
new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), transportService,
request.targetNode(), recoverySettings, throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime));
handler = new RecoverySourceHandler(shard, recoveryTarget, shard.getThreadPool(), request,
Math.toIntExact(recoverySettings.getChunkSize().getBytes()), recoverySettings.getMaxConcurrentFileChunks());
Math.toIntExact(recoverySettings.getChunkSize().getBytes()),
recoverySettings.getMaxConcurrentFileChunks(),
recoverySettings.getMaxConcurrentOperations());
return Tuple.tuple(handler, recoveryTarget);
}
}
@@ -45,6 +45,12 @@ public class RecoverySettings {
public static final Setting<Integer> INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING =
Setting.intSetting("indices.recovery.max_concurrent_file_chunks", 2, 1, 5, Property.Dynamic, Property.NodeScope);

/**
* Controls the maximum number of operation chunk requests that can be sent concurrently from the source node to the target node.
*/
public static final Setting<Integer> INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING =
Setting.intSetting("indices.recovery.max_concurrent_operations", 1, 1, 4, Property.Dynamic, Property.NodeScope);

/**
* how long to wait before retrying after issues caused by cluster state syncing between nodes
* i.e., local node is not yet known on remote node, remote shard not yet started etc.
@@ -91,6 +97,7 @@ public class RecoverySettings {

private volatile ByteSizeValue maxBytesPerSec;
private volatile int maxConcurrentFileChunks;
private volatile int maxConcurrentOperations;
private volatile SimpleRateLimiter rateLimiter;
private volatile TimeValue retryDelayStateSync;
private volatile TimeValue retryDelayNetwork;
@@ -104,6 +111,7 @@ public class RecoverySettings {
public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
this.retryDelayStateSync = INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.get(settings);
this.maxConcurrentFileChunks = INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING.get(settings);
this.maxConcurrentOperations = INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING.get(settings);
// doesn't have to be fast as nodes are reconnected every 10s by default (see InternalClusterService.ReconnectToNodes)
// and we want to give the master time to remove a faulty node
this.retryDelayNetwork = INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.get(settings);
@@ -125,6 +133,8 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {

clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, this::setMaxConcurrentFileChunks);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING,
this::setMaxConcurrentOperations);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING, this::setRetryDelayStateSync);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING, this::setRetryDelayNetwork);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING, this::setInternalActionTimeout);
@@ -209,4 +219,12 @@ public int getMaxConcurrentFileChunks() {
private void setMaxConcurrentFileChunks(int maxConcurrentFileChunks) {
this.maxConcurrentFileChunks = maxConcurrentFileChunks;
}

public int getMaxConcurrentOperations() {
return maxConcurrentOperations;
}

private void setMaxConcurrentOperations(int maxConcurrentOperations) {
this.maxConcurrentOperations = maxConcurrentOperations;
}
}
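
A small usage sketch (not part of this commit) of the setting registered above: the default is 1, and the bounds given to Setting.intSetting confine accepted values to the range 1 to 4.

```java
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.recovery.RecoverySettings;

public final class MaxConcurrentOperationsExample {
    public static void main(String[] args) {
        // Default when the key is absent.
        int defaultValue = RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING
            .get(Settings.EMPTY); // 1

        // Explicit value inside the allowed range [1, 4].
        int tuned = RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING
            .get(Settings.builder().put("indices.recovery.max_concurrent_operations", 3).build()); // 3

        // Values outside the range (for example 0 or 5) are rejected with an IllegalArgumentException.
        System.out.println(defaultValue + " " + tuned);
    }
}
```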
