Reduce recovery time with compress or secure transport (#36981)
Today, file chunks are sent sequentially, one by one, during peer recovery. This is a
reasonable choice since the implementation is straightforward and recovery is
network-bound most of the time. However, if the connection is encrypted, we
might not be able to saturate the network pipe because encryption and decryption
are CPU-bound rather than network-bound.

With this commit, a source node can send multiple file chunks (default: 2)
without waiting for acknowledgments from the target.
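
The idea is to keep a bounded window of unacknowledged chunk requests in flight instead of blocking on every round trip. Below is a minimal sketch of that windowing using a semaphore; the actual implementation in this commit uses a `LocalCheckpointTracker` in `RecoverySourceHandler` (see further down), and `PipelinedChunkSender`/`sendChunkAsync` are made-up names for illustration.

```java
import java.util.concurrent.Semaphore;
import java.util.function.BiConsumer;

/** Minimal sketch of a bounded window of unacknowledged file-chunk requests. */
class PipelinedChunkSender {
    private final int maxConcurrentFileChunks;
    private final Semaphore window;
    private final BiConsumer<byte[], Runnable> sendChunkAsync; // hypothetical async transport call; invokes the Runnable on ack

    PipelinedChunkSender(int maxConcurrentFileChunks, BiConsumer<byte[], Runnable> sendChunkAsync) {
        this.maxConcurrentFileChunks = maxConcurrentFileChunks;
        this.window = new Semaphore(maxConcurrentFileChunks);
        this.sendChunkAsync = sendChunkAsync;
    }

    void sendFile(Iterable<byte[]> chunks) throws InterruptedException {
        for (byte[] chunk : chunks) {
            window.acquire();                              // block only when maxConcurrentFileChunks requests are unacknowledged
            sendChunkAsync.accept(chunk, window::release); // the target's ack frees a slot in the window
        }
        window.acquire(maxConcurrentFileChunks);           // drain: wait for the remaining in-flight chunks to be acknowledged
        window.release(maxConcurrentFileChunks);
    }
}
```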

Below are the benchmark results for PMC and NYC_taxis.

- PMC (20.2 GB)

| Transport | Baseline | chunks=1 | chunks=2 | chunks=3 | chunks=4 |
| ----------| ---------| -------- | -------- | -------- | -------- |
| Plain     | 184s     | 137s     | 106s     | 105s     | 106s     |
| TLS       | 346s     | 294s     | 176s     | 153s     | 117s     |
| Compress  | 1556s    | 1407s    | 1193s    | 1183s    | 1211s    |

- NYC_taxis (38.6 GB)

| Transport | Baseline | chunks=1 | chunks=2 | chunks=3 | chunks=4 |
| ----------| ---------| ---------| ---------| ---------| -------- |
| Plain     | 321s     | 249s     | 191s     |  *       | *        |
| TLS       | 618s     | 539s     | 323s     | 290s     | 213s     |
| Compress  | 2622s    | 2421s    | 2018s    | 2029s    | n/a      |

Relates #33844
dnhatn committed Jan 15, 2019
1 parent ddd7878 commit 8803161
Showing 14 changed files with 559 additions and 144 deletions.
9 changes: 9 additions & 0 deletions docs/reference/modules/indices/recovery.asciidoc
@@ -20,5 +20,14 @@ peer recoveries:
consume an excess of bandwidth (or other resources) which could destabilize
the cluster. Defaults to `40mb`.

`indices.recovery.max_concurrent_file_chunks`::
Controls the number of file chunk requests that can be sent in parallel per recovery.
As multiple recoveries are already running in parallel (controlled by
`cluster.routing.allocation.node_concurrent_recoveries`), increasing this expert-level
setting might only help in situations where peer recovery of a single shard is not
reaching the total inbound and outbound peer recovery traffic as configured by
`indices.recovery.max_bytes_per_sec`, but is CPU-bound instead, typically when using
transport-level security or compression. Defaults to `1`.

This setting can be dynamically updated on a live cluster with the
<<cluster-update-settings,cluster-update-settings>> API.
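
For instance, raising the limit on a live cluster can be done with a transient cluster-settings update. A minimal sketch using the Java high-level REST client (the client setup and `localhost:9200` endpoint are assumptions, not part of this change; a plain REST `PUT _cluster/settings` call is equivalent):

```java
import org.apache.http.HttpHost;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;

public class BumpMaxConcurrentFileChunks {
    public static void main(String[] args) throws Exception {
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
            ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
            // Raise the per-recovery file-chunk window from the default of 1 to 3 (allowed range is 1-5).
            request.transientSettings(Settings.builder()
                .put("indices.recovery.max_concurrent_file_chunks", 3)
                .build());
            ClusterUpdateSettingsResponse response = client.cluster().putSettings(request, RequestOptions.DEFAULT);
            System.out.println("acknowledged: " + response.isAcknowledged());
        }
    }
}
```
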
server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
@@ -203,6 +203,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING,
RecoverySettings.INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING,
server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java
@@ -175,7 +175,8 @@ private RecoverySourceHandler createRecoverySourceHandler(StartRecoveryRequest r
final RemoteRecoveryTargetHandler recoveryTarget =
new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), transportService,
request.targetNode(), recoverySettings, throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime));
handler = new RecoverySourceHandler(shard, recoveryTarget, request, recoverySettings.getChunkSize().bytesAsInt());
handler = new RecoverySourceHandler(shard, recoveryTarget, request,
Math.toIntExact(recoverySettings.getChunkSize().getBytes()), recoverySettings.getMaxConcurrentFileChunks());
return handler;
}
}
server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
@@ -29,6 +29,8 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
@@ -598,8 +600,7 @@ class FileChunkTransportRequestHandler implements TransportRequestHandler<Recove

@Override
public void messageReceived(final RecoveryFileChunkRequest request, TransportChannel channel) throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
)) {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
final RecoveryTarget recoveryTarget = recoveryRef.target();
final RecoveryState.Index indexState = recoveryTarget.state().getIndex();
if (request.sourceThrottleTimeInNanos() != RecoveryState.Index.UNKNOWN) {
@@ -617,12 +618,12 @@ public void messageReceived(final RecoveryFileChunkRequest request, TransportCha
recoveryTarget.indexShard().recoveryStats().addThrottleTime(throttleTimeInNanos);
}
}

recoveryTarget.writeFileChunk(request.metadata(), request.position(), request.content(),
request.lastChunk(), request.totalTranslogOps()
);
final ActionListener<TransportResponse> listener =
new HandledTransportAction.ChannelActionListener<>(channel, Actions.FILE_CHUNK, request);
recoveryTarget.writeFileChunk(request.metadata(), request.position(), request.content(), request.lastChunk(),
request.totalTranslogOps(),
ActionListener.wrap(nullVal -> listener.onResponse(TransportResponse.Empty.INSTANCE), listener::onFailure));
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java
@@ -39,6 +39,12 @@ public class RecoverySettings {
Setting.byteSizeSetting("indices.recovery.max_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB),
Property.Dynamic, Property.NodeScope);

/**
* Controls the maximum number of file chunk requests that can be sent concurrently from the source node to the target node.
*/
public static final Setting<Integer> INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING =
Setting.intSetting("indices.recovery.max_concurrent_file_chunks", 1, 1, 5, Property.Dynamic, Property.NodeScope);

/**
* how long to wait before retrying after issues caused by cluster state syncing between nodes
* i.e., local node is not yet known on remote node, remote shard not yet started etc.
@@ -78,6 +84,7 @@ public class RecoverySettings {
public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB);

private volatile ByteSizeValue maxBytesPerSec;
private volatile int maxConcurrentFileChunks;
private volatile SimpleRateLimiter rateLimiter;
private volatile TimeValue retryDelayStateSync;
private volatile TimeValue retryDelayNetwork;
@@ -89,6 +96,7 @@ public class RecoverySettings {

public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
this.retryDelayStateSync = INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.get(settings);
this.maxConcurrentFileChunks = INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING.get(settings);
// doesn't have to be fast as nodes are reconnected every 10s by default (see InternalClusterService.ReconnectToNodes)
// and we want to give the master time to remove a faulty node
this.retryDelayNetwork = INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.get(settings);
@@ -108,6 +116,7 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
logger.debug("using max_bytes_per_sec[{}]", maxBytesPerSec);

clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, this::setMaxConcurrentFileChunks);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING, this::setRetryDelayStateSync);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING, this::setRetryDelayNetwork);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING, this::setInternalActionTimeout);
@@ -180,4 +189,12 @@ private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) {
rateLimiter = new SimpleRateLimiter(maxBytesPerSec.getMbFrac());
}
}

public int getMaxConcurrentFileChunks() {
return maxConcurrentFileChunks;
}

private void setMaxConcurrentFileChunks(int maxConcurrentFileChunks) {
this.maxConcurrentFileChunks = maxConcurrentFileChunks;
}
}
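
Since the new setting is registered as `Property.Dynamic` and wired up via `addSettingsUpdateConsumer` above, an update flows straight through to the live `RecoverySettings` instance. A small sketch of that flow in the style of a unit test (constructing `ClusterSettings` directly like this is for illustration only):

```java
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.recovery.RecoverySettings;

public class MaxConcurrentFileChunksUpdateDemo {
    public static void main(String[] args) {
        Settings initial = Settings.builder()
            .put("indices.recovery.max_concurrent_file_chunks", 2)
            .build();
        ClusterSettings clusterSettings = new ClusterSettings(initial, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
        RecoverySettings recoverySettings = new RecoverySettings(initial, clusterSettings);
        assert recoverySettings.getMaxConcurrentFileChunks() == 2;

        // Simulate a dynamic cluster-settings update; the consumer registered in the constructor
        // (setMaxConcurrentFileChunks) is invoked and the new value takes effect immediately.
        clusterSettings.applySettings(Settings.builder()
            .put("indices.recovery.max_concurrent_file_chunks", 4)
            .build());
        assert recoverySettings.getMaxConcurrentFileChunks() == 4;
    }
}
```
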
server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -36,6 +36,7 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
@@ -44,7 +45,6 @@
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.core.internal.io.Streams;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
@@ -59,10 +59,9 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteTransportException;

import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -71,10 +70,12 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.StreamSupport;

import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;

/**
* RecoverySourceHandler handles the three phases of shard recovery, which is
* everything relating to copying the segment files as well as sending translog
@@ -96,17 +97,19 @@ public class RecoverySourceHandler {
private final StartRecoveryRequest request;
private final int chunkSizeInBytes;
private final RecoveryTargetHandler recoveryTarget;
private final int maxConcurrentFileChunks;
private final CancellableThreads cancellableThreads = new CancellableThreads();

public RecoverySourceHandler(final IndexShard shard, RecoveryTargetHandler recoveryTarget,
final StartRecoveryRequest request,
final int fileChunkSizeInBytes) {
public RecoverySourceHandler(final IndexShard shard, RecoveryTargetHandler recoveryTarget, final StartRecoveryRequest request,
final int fileChunkSizeInBytes, final int maxConcurrentFileChunks) {
this.shard = shard;
this.recoveryTarget = recoveryTarget;
this.request = request;
this.shardId = this.request.shardId().id();
this.logger = Loggers.getLogger(getClass(), request.shardId(), "recover to " + request.targetNode().getName());
this.chunkSizeInBytes = fileChunkSizeInBytes;
// if the target is on an old version, it won't be able to handle out-of-order file chunks.
this.maxConcurrentFileChunks = request.targetNode().getVersion().onOrAfter(Version.V_6_7_0) ? maxConcurrentFileChunks : 1;
}

public StartRecoveryRequest getRequest() {
@@ -407,10 +410,7 @@ public SendFileResult phase1(final IndexCommit snapshot, final Supplier<Integer>
phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize));
cancellableThreads.execute(() -> recoveryTarget.receiveFileInfo(
phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, translogOps.get()));
// How many bytes we've copied since we last called RateLimiter.pause
final Function<StoreFileMetaData, OutputStream> outputStreamFactories =
md -> new BufferedOutputStream(new RecoveryOutputStream(md, translogOps), chunkSizeInBytes);
sendFiles(store, phase1Files.toArray(new StoreFileMetaData[phase1Files.size()]), outputStreamFactories);
sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps);
// Send the CLEAN_FILES request, which takes all of the files that
// were transferred and renames them from their temporary file
// names to the actual file names. It also writes checksums for
@@ -649,73 +649,72 @@ public String toString() {
'}';
}


final class RecoveryOutputStream extends OutputStream {
private final StoreFileMetaData md;
private final Supplier<Integer> translogOps;
private long position = 0;

RecoveryOutputStream(StoreFileMetaData md, Supplier<Integer> translogOps) {
this.md = md;
this.translogOps = translogOps;
}

@Override
public void write(int b) throws IOException {
throw new UnsupportedOperationException("we can't send single bytes over the wire");
void sendFiles(Store store, StoreFileMetaData[] files, Supplier<Integer> translogOps) throws Exception {
ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetaData::length)); // send smallest first
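// requestSeqIdTracker hands out a sequence number per chunk request and records completions; it is used below to cap the number of in-flight requests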
final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
final AtomicReference<Tuple<StoreFileMetaData, Exception>> error = new AtomicReference<>();
final byte[] buffer = new byte[chunkSizeInBytes];
for (final StoreFileMetaData md : files) {
if (error.get() != null) {
break;
}
try (IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE);
InputStream in = new InputStreamIndexInput(indexInput, md.length())) {
long position = 0;
int bytesRead;
while ((bytesRead = in.read(buffer, 0, buffer.length)) != -1) {
final BytesArray content = new BytesArray(buffer, 0, bytesRead);
final boolean lastChunk = position + content.length() == md.length();
final long requestSeqId = requestSeqIdTracker.generateSeqNo();
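// back-pressure: wait until the request maxConcurrentFileChunks behind this one has completed, so at most maxConcurrentFileChunks chunk requests are outstanding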
cancellableThreads.execute(() -> requestSeqIdTracker.waitForOpsToComplete(requestSeqId - maxConcurrentFileChunks));
cancellableThreads.checkForCancel();
if (error.get() != null) {
break;
}
final long requestFilePosition = position;
cancellableThreads.executeIO(() ->
recoveryTarget.writeFileChunk(md, requestFilePosition, content, lastChunk, translogOps.get(),
ActionListener.wrap(
r -> requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId),
e -> {
error.compareAndSet(null, Tuple.tuple(md, e));
requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
}
)));
position += content.length();
}
} catch (Exception e) {
error.compareAndSet(null, Tuple.tuple(md, e));
break;
}
}

@Override
public void write(byte[] b, int offset, int length) throws IOException {
sendNextChunk(position, new BytesArray(b, offset, length), md.length() == position + length);
position += length;
assert md.length() >= position : "length: " + md.length() + " but positions was: " + position;
// When we terminate exceptionally, we don't wait for the outstanding requests as we don't use their results anyway.
// This allows us to end quickly and eliminate the complexity of handling requestSeqIds in case of error.
if (error.get() == null) {
cancellableThreads.execute(() -> requestSeqIdTracker.waitForOpsToComplete(requestSeqIdTracker.getMaxSeqNo()));
}

private void sendNextChunk(long position, BytesArray content, boolean lastChunk) throws IOException {
// Actually send the file chunk to the target node, waiting for it to complete
cancellableThreads.executeIO(() ->
recoveryTarget.writeFileChunk(md, position, content, lastChunk, translogOps.get())
);
if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us
throw new IndexShardClosedException(request.shardId());
}
if (error.get() != null) {
handleErrorOnSendFiles(store, error.get().v1(), error.get().v2());
}
}

void sendFiles(Store store, StoreFileMetaData[] files, Function<StoreFileMetaData, OutputStream> outputStreamFactory) throws Exception {
store.incRef();
try {
ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetaData::length)); // send smallest first
for (int i = 0; i < files.length; i++) {
final StoreFileMetaData md = files[i];
try (IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE)) {
// it's fine that we are only having the indexInput in the try/with block. The copy methods handles
// exceptions during close correctly and doesn't hide the original exception.
Streams.copy(new InputStreamIndexInput(indexInput, md.length()), outputStreamFactory.apply(md));
} catch (Exception e) {
final IOException corruptIndexException;
if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
logger.warn("{} Corrupted file detected {} checksum mismatch", shardId, md);
failEngine(corruptIndexException);
throw corruptIndexException;
} else { // corruption has happened on the way to replica
RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but " +
"checksums are ok", null);
exception.addSuppressed(e);
logger.warn(() -> new ParameterizedMessage(
"{} Remote file corruption on node {}, recovering {}. local checksum OK",
shardId, request.targetNode(), md), corruptIndexException);
throw exception;
}
} else {
throw e;
}
}
private void handleErrorOnSendFiles(Store store, StoreFileMetaData md, Exception e) throws Exception {
final IOException corruptIndexException;
if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
logger.warn("{} Corrupted file detected {} checksum mismatch", shardId, md);
failEngine(corruptIndexException);
throw corruptIndexException;
} else { // corruption has happened on the way to replica
RemoteTransportException exception = new RemoteTransportException(
"File corruption occurred on recovery but checksums are ok", null);
exception.addSuppressed(e);
logger.warn(() -> new ParameterizedMessage("{} Remote file corruption on node {}, recovering {}. local checksum OK",
shardId, request.targetNode(), md), corruptIndexException);
throw exception;
}
} finally {
store.decRef();
} else {
throw e;
}
}

