Skip to content

Commit

Permalink
Make finalize step of recovery source non-blocking (#37388)
Browse files Browse the repository at this point in the history
Relates #37291
  • Loading branch information
dnhatn committed Jan 15, 2019
1 parent 8803161 commit 821bb24
Show file tree
Hide file tree
Showing 11 changed files with 103 additions and 49 deletions.
13 changes: 13 additions & 0 deletions server/src/main/java/org/elasticsearch/action/ActionListener.java
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.CheckedSupplier;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -186,4 +187,16 @@ protected void innerOnFailure(Exception e) {
}
};
}

/**
 * Completes the given listener with the outcome of running the provided supplier: the value returned by the
 * supplier is passed to {@code listener.onResponse}, and an exception thrown by the supplier is passed to
 * {@code listener.onFailure}. This method is mainly used to complete a listener with a block of synchronous code.
 * <p>
 * The listener is notified exactly once. An exception thrown by the listener's own {@code onResponse} is not
 * redirected to {@code onFailure}, because that would complete the same listener a second time; it propagates
 * to the caller instead.
 *
 * @param listener the listener to complete
 * @param supplier the synchronous computation that produces the response or throws
 */
static <Response> void completeWith(ActionListener<Response> listener, CheckedSupplier<Response, ? extends Exception> supplier) {
    final Response response;
    try {
        // only the supplier runs inside the try block, so only its failures reach onFailure
        response = supplier.get();
    } catch (Exception e) {
        listener.onFailure(e);
        return;
    }
    // delivered outside the try block so that a throwing onResponse cannot trigger a second notification
    listener.onResponse(response);
}
}
Expand Up @@ -37,10 +37,16 @@ public class ActionListenerResponseHandler<Response extends TransportResponse> i

private final ActionListener<? super Response> listener;
private final Writeable.Reader<Response> reader;
private final String executor;

public ActionListenerResponseHandler(ActionListener<? super Response> listener, Writeable.Reader<Response> reader) {
public ActionListenerResponseHandler(ActionListener<? super Response> listener, Writeable.Reader<Response> reader, String executor) {
this.listener = Objects.requireNonNull(listener);
this.reader = Objects.requireNonNull(reader);
this.executor = Objects.requireNonNull(executor);
}

public ActionListenerResponseHandler(ActionListener<? super Response> listener, Writeable.Reader<Response> reader) {
this(listener, reader, ThreadPool.Names.SAME);
}

@Override
Expand All @@ -55,7 +61,7 @@ public void handleException(TransportException e) {

@Override
public String executor() {
return ThreadPool.Names.SAME;
return executor;
}

@Override
Expand Down
Expand Up @@ -443,11 +443,12 @@ class FinalizeRecoveryRequestHandler implements TransportRequestHandler<Recovery

@Override
public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel) throws Exception {
try (RecoveryRef recoveryRef =
onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
recoveryRef.target().finalizeRecovery(request.globalCheckpoint());
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
final ActionListener<TransportResponse> listener =
new HandledTransportAction.ChannelActionListener<>(channel, Actions.FINALIZE, request);
recoveryRef.target().finalizeRecovery(request.globalCheckpoint(),
ActionListener.wrap(nullVal -> listener.onResponse(TransportResponse.Empty.INSTANCE), listener::onFailure));
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}

Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.StopWatch;
Expand Down Expand Up @@ -71,6 +72,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -137,6 +139,9 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
throw e;
});
final Consumer<Exception> onFailure = e ->
IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));

runUnderPrimaryPermit(() -> {
final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable();
ShardRouting targetShardRouting = routingTable.getByAllocationId(request.targetAllocationId());
Expand Down Expand Up @@ -235,16 +240,21 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e);
}

finalizeRecovery(sendSnapshotResult.targetLocalCheckpoint);
final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time
assert resources.isEmpty() : "not every resource is released [" + resources + "]";
IOUtils.close(resources);
wrappedListener.onResponse(
new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes,
final StepListener<Void> finalizeStep = new StepListener<>();
finalizeRecovery(sendSnapshotResult.targetLocalCheckpoint, finalizeStep);
finalizeStep.whenComplete(r -> {
assert resources.isEmpty() : "not every resource is released [" + resources + "]";
final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time
final RecoveryResponse response = new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes,
sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize,
sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime, prepareEngineTime.millis(),
sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis())
);
sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime,
prepareEngineTime.millis(), sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis());
try {
wrappedListener.onResponse(response);
} finally {
IOUtils.close(resources);
}
}, onFailure);
} catch (Exception e) {
IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
}
Expand Down Expand Up @@ -585,10 +595,7 @@ SendSnapshotResult phase2(long startingSeqNo, long requiredSeqNoRangeStart, long
return new SendSnapshotResult(targetLocalCheckpoint.get(), totalSentOps, tookTime);
}

/*
* finalizes the recovery process
*/
public void finalizeRecovery(final long targetLocalCheckpoint) throws IOException {
void finalizeRecovery(final long targetLocalCheckpoint, final ActionListener<Void> listener) throws IOException {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
Expand All @@ -604,21 +611,26 @@ public void finalizeRecovery(final long targetLocalCheckpoint) throws IOExceptio
runUnderPrimaryPermit(() -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint),
shardId + " marking " + request.targetAllocationId() + " as in sync", shard, cancellableThreads, logger);
final long globalCheckpoint = shard.getGlobalCheckpoint();
cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint));
runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint),
shardId + " updating " + request.targetAllocationId() + "'s global checkpoint", shard, cancellableThreads, logger);

if (request.isPrimaryRelocation()) {
logger.trace("performing relocation hand-off");
// this acquires all IndexShard operation permits and will thus delay new recoveries until it is done
cancellableThreads.execute(() -> shard.relocated(recoveryTarget::handoffPrimaryContext));
/*
* if the recovery process fails after disabling primary mode on the source shard, both relocation source and
* target are failed (see {@link IndexShard#updateRoutingEntry}).
*/
}
stopWatch.stop();
logger.trace("finalizing recovery took [{}]", stopWatch.totalTime());
final StepListener<Void> finalizeListener = new StepListener<>();
cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint, finalizeListener));
finalizeListener.whenComplete(r -> {
runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint),
shardId + " updating " + request.targetAllocationId() + "'s global checkpoint", shard, cancellableThreads, logger);

if (request.isPrimaryRelocation()) {
logger.trace("performing relocation hand-off");
// TODO: make relocated async
// this acquires all IndexShard operation permits and will thus delay new recoveries until it is done
cancellableThreads.execute(() -> shard.relocated(recoveryTarget::handoffPrimaryContext));
/*
* if the recovery process fails after disabling primary mode on the source shard, both relocation source and
* target are failed (see {@link IndexShard#updateRoutingEntry}).
*/
}
stopWatch.stop();
logger.trace("finalizing recovery took [{}]", stopWatch.totalTime());
listener.onResponse(null);
}, listener::onFailure);
}

static final class SendSnapshotResult {
Expand Down
Expand Up @@ -375,12 +375,15 @@ public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTra
}

@Override
public void finalizeRecovery(final long globalCheckpoint) throws IOException {
final IndexShard indexShard = indexShard();
indexShard.updateGlobalCheckpointOnReplica(globalCheckpoint, "finalizing recovery");
// Persist the global checkpoint.
indexShard.sync();
indexShard.finalizeRecovery();
public void finalizeRecovery(final long globalCheckpoint, ActionListener<Void> listener) {
ActionListener.completeWith(listener, () -> {
final IndexShard indexShard = indexShard();
indexShard.updateGlobalCheckpointOnReplica(globalCheckpoint, "finalizing recovery");
// Persist the global checkpoint.
indexShard.sync();
indexShard.finalizeRecovery();
return null;
});
}

@Override
Expand Down
Expand Up @@ -43,8 +43,9 @@ public interface RecoveryTargetHandler {
* updates the global checkpoint.
*
* @param globalCheckpoint the global checkpoint on the recovery source
* @param listener the listener which will be notified when this method is completed
*/
void finalizeRecovery(long globalCheckpoint) throws IOException;
void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener);

/**
* Blockingly waits for cluster state with at least clusterStateVersion to be available
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportFuture;
import org.elasticsearch.transport.TransportRequestOptions;
Expand Down Expand Up @@ -85,11 +86,12 @@ public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTra
}

@Override
public void finalizeRecovery(final long globalCheckpoint) {
public void finalizeRecovery(final long globalCheckpoint, final ActionListener<Void> listener) {
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.FINALIZE,
new RecoveryFinalizeRecoveryRequest(recoveryId, shardId, globalCheckpoint),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
new ActionListenerResponseHandler<>(ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure),
in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC));
}

@Override
Expand Down
Expand Up @@ -18,16 +18,20 @@
*/
package org.elasticsearch.action;

import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;

public class ActionListenerTests extends ESTestCase {

Expand Down Expand Up @@ -201,4 +205,16 @@ public void onFailure(Exception e) {
assertThat(onFailureTimes.get(), equalTo(1));
}
}

public void testCompleteWith() {
    // a supplier that returns normally completes the listener with the supplied value
    final PlainActionFuture<Integer> successFuture = new PlainActionFuture<>();
    ActionListener.completeWith(successFuture, () -> 100);
    assertThat(successFuture.isDone(), equalTo(true));
    assertThat(successFuture.actionGet(), equalTo(100));

    // a supplier that throws completes the listener with that exception
    final PlainActionFuture<Integer> failureFuture = new PlainActionFuture<>();
    ActionListener.completeWith(failureFuture, () -> { throw new IOException("not found"); });
    assertThat(failureFuture.isDone(), equalTo(true));
    final ExecutionException wrapped = expectThrows(ExecutionException.class, failureFuture::get);
    assertThat(wrapped.getCause(), instanceOf(IOException.class));
}
}
Expand Up @@ -24,6 +24,7 @@
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.delete.DeleteRequest;
Expand Down Expand Up @@ -848,13 +849,13 @@ public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaDa
}

@Override
public void finalizeRecovery(long globalCheckpoint) throws IOException {
public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
if (hasBlocked() == false) {
// it may be that no ops have been transferred, block now
blockIfNeeded(RecoveryState.Stage.TRANSLOG);
}
blockIfNeeded(RecoveryState.Stage.FINALIZE);
super.finalizeRecovery(globalCheckpoint);
super.finalizeRecovery(globalCheckpoint, listener);
}

}
Expand Down
Expand Up @@ -2585,9 +2585,8 @@ public long indexTranslogOperations(List<Translog.Operation> operations, int tot
}

@Override
public void finalizeRecovery(long globalCheckpoint) throws IOException {
super.finalizeRecovery(globalCheckpoint);
assertListenerCalled.accept(replica);
public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
super.finalizeRecovery(globalCheckpoint, ActionListener.runAfter(listener, () -> assertListenerCalled.accept(replica)));
}
}, false, true);

Expand Down
Expand Up @@ -686,7 +686,7 @@ public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTra
}

@Override
public void finalizeRecovery(long globalCheckpoint) {
public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
}

@Override
Expand Down

0 comments on commit 821bb24

Please sign in to comment.