Skip to content

Commit

Permalink
Snapshot/Restore: restore with wait_for_completion=true should wait f…
Browse files Browse the repository at this point in the history
…or succesfully restored shards to get started

This commit ensures that restore operation with wait_for_completion=true doesn't return until all successfully restored shards are started. Before it was returning as soon as restore operation was over, which cause some shards to be unavailable immediately after restore completion.

Fixes elastic#8340
  • Loading branch information
imotov committed Nov 25, 2014
1 parent 065098a commit 060a371
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 90 deletions.
Expand Up @@ -31,7 +31,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.RestoreService.RestoreSnapshotListener;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand Down Expand Up @@ -75,25 +74,26 @@ protected void masterOperation(final RestoreSnapshotRequest request, ClusterStat
request.indices(), request.indicesOptions(), request.renamePattern(), request.renameReplacement(),
request.settings(), request.masterNodeTimeout(), request.includeGlobalState(), request.partial(), request.includeAliases());

restoreService.restoreSnapshot(restoreRequest, new RestoreSnapshotListener() {
restoreService.restoreSnapshot(restoreRequest, new ActionListener<RestoreInfo>() {
@Override
public void onResponse(RestoreInfo restoreInfo) {
if (restoreInfo == null) {
if (request.waitForCompletion()) {
restoreService.addListener(new RestoreService.RestoreCompletionListener() {
SnapshotId snapshotId = new SnapshotId(request.repository(), request.snapshot());
if (restoreInfo == null && request.waitForCompletion()) {
restoreService.addListener(new ActionListener<RestoreService.RestoreCompletionResponse>() {
SnapshotId snapshotId = new SnapshotId(request.repository(), request.snapshot());

@Override
public void onRestoreCompletion(SnapshotId snapshotId, RestoreInfo snapshot) {
if (this.snapshotId.equals(snapshotId)) {
listener.onResponse(new RestoreSnapshotResponse(snapshot));
restoreService.removeListener(this);
}
@Override
public void onResponse(RestoreService.RestoreCompletionResponse restoreCompletionResponse) {
if (this.snapshotId.equals(restoreCompletionResponse.getSnapshotId())) {
listener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.getRestoreInfo()));
restoreService.removeListener(this);
}
});
} else {
listener.onResponse(new RestoreSnapshotResponse(null));
}
}

@Override
public void onFailure(Throwable e) {
listener.onFailure(e);
}
});
} else {
listener.onResponse(new RestoreSnapshotResponse(restoreInfo));
}
Expand Down
139 changes: 88 additions & 51 deletions src/main/java/org/elasticsearch/snapshots/RestoreService.java
Expand Up @@ -25,6 +25,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.block.ClusterBlocks;
Expand All @@ -33,6 +34,7 @@
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -46,10 +48,7 @@
import org.elasticsearch.transport.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;

import static com.google.common.collect.Lists.newArrayList;
Expand All @@ -61,7 +60,7 @@
* <p/>
* Restore operation is performed in several stages.
* <p/>
* First {@link #restoreSnapshot(RestoreRequest, RestoreSnapshotListener)}
* First {@link #restoreSnapshot(RestoreRequest, org.elasticsearch.action.ActionListener))}
* method reads information about snapshot and metadata from repository. In update cluster state task it checks restore
* preconditions, restores global state if needed, creates {@link RestoreMetaData} record with list of shards that needs
* to be restored and adds this shard to the routing table using {@link RoutingTable.Builder#addAsRestore(IndexMetaData, RestoreSource)}
Expand Down Expand Up @@ -90,7 +89,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis

private final MetaDataCreateIndexService createIndexService;

private final CopyOnWriteArrayList<RestoreCompletionListener> listeners = new CopyOnWriteArrayList<>();
private final CopyOnWriteArrayList<ActionListener<RestoreCompletionResponse>> listeners = new CopyOnWriteArrayList<>();

@Inject
public RestoreService(Settings settings, ClusterService clusterService, RepositoriesService repositoriesService, TransportService transportService, AllocationService allocationService, MetaDataCreateIndexService createIndexService) {
Expand All @@ -110,7 +109,7 @@ public RestoreService(Settings settings, ClusterService clusterService, Reposito
* @param request restore request
* @param listener restore listener
*/
public void restoreSnapshot(final RestoreRequest request, final RestoreSnapshotListener listener) {
public void restoreSnapshot(final RestoreRequest request, final ActionListener<RestoreInfo> listener) {
try {
// Read snapshot info and metadata from the repository
Repository repository = repositoriesService.repository(request.repository());
Expand Down Expand Up @@ -326,6 +325,24 @@ public void indexShardRestoreCompleted(SnapshotId snapshotId, ShardId shardId) {
}
}

public final static class RestoreCompletionResponse {
private final SnapshotId snapshotId;
private final RestoreInfo restoreInfo;

private RestoreCompletionResponse(SnapshotId snapshotId, RestoreInfo restoreInfo) {
this.snapshotId = snapshotId;
this.restoreInfo = restoreInfo;
}

public SnapshotId getSnapshotId() {
return snapshotId;
}

public RestoreInfo getRestoreInfo() {
return restoreInfo;
}
}

/**
* Updates shard restore record in the cluster state.
*
Expand All @@ -335,6 +352,7 @@ private void innerUpdateRestoreState(final UpdateIndexShardRestoreStatusRequest
clusterService.submitStateUpdateTask("update snapshot state", new ProcessedClusterStateUpdateTask() {

private RestoreInfo restoreInfo = null;
private HashMap<ShardId, ShardRestoreStatus> shards = null;

@Override
public ClusterState execute(ClusterState currentState) {
Expand All @@ -343,9 +361,12 @@ public ClusterState execute(ClusterState currentState) {
RestoreMetaData restore = metaData.custom(RestoreMetaData.TYPE);
if (restore != null) {
boolean changed = false;
boolean found = false;
ArrayList<RestoreMetaData.Entry> entries = newArrayList();
for (RestoreMetaData.Entry entry : restore.entries()) {
if (entry.snapshotId().equals(request.snapshotId())) {
assert !found;
found = true;
HashMap<ShardId, ShardRestoreStatus> shards = newHashMap(entry.shards());
logger.trace("[{}] Updating shard [{}] with status [{}]", request.snapshotId(), request.shardId(), request.status().state());
shards.put(request.shardId(), request.status());
Expand All @@ -354,6 +375,7 @@ public ClusterState execute(ClusterState currentState) {
} else {
logger.info("restore [{}] is done", request.snapshotId());
restoreInfo = new RestoreInfo(entry.snapshotId().getSnapshot(), entry.indices(), shards.size(), shards.size() - failedShards(shards));
this.shards = shards;
}
changed = true;
} else {
Expand All @@ -370,20 +392,71 @@ public ClusterState execute(ClusterState currentState) {
}

@Override
public void onFailure(String source, Throwable t) {
public void onFailure(String source, @Nullable Throwable t) {
logger.warn("[{}][{}] failed to update snapshot status to [{}]", t, request.snapshotId(), request.shardId(), request.status());
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (restoreInfo != null) {
for (RestoreCompletionListener listener : listeners) {
try {
listener.onRestoreCompletion(request.snapshotId, restoreInfo);
} catch (Throwable e) {
logger.warn("failed to update snapshot status for [{}]", e, listener);
RoutingTable routingTable = newState.getRoutingTable();
final List<ShardId> waitForStarted = newArrayList();
for (HashMap.Entry<ShardId, ShardRestoreStatus> shard : shards.entrySet()) {
if (shard.getValue().state() == RestoreMetaData.State.SUCCESS ) {
ShardId shardId = shard.getKey();
ShardRouting shardRouting = findPrimaryShard(routingTable, shardId);
if (shardRouting != null && !shardRouting.active()) {
logger.trace("[{}][{}] waiting for the shard to start", request.snapshotId(), shardId);
waitForStarted.add(shardId);
}
}
}
if (waitForStarted.isEmpty()) {
notifyListeners();
} else {
clusterService.addLast(new ClusterStateListener() {
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.routingTableChanged()) {
RoutingTable routingTable = event.state().getRoutingTable();
for (Iterator<ShardId> iterator = waitForStarted.iterator(); iterator.hasNext();) {
ShardId shardId = iterator.next();
ShardRouting shardRouting = findPrimaryShard(routingTable, shardId);
// Shard disappeared (index deleted) or became active
if (shardRouting == null || shardRouting.active()) {
iterator.remove();
logger.trace("[{}][{}] shard disappeared or started - removing", request.snapshotId(), shardId);
}
}
}
if (waitForStarted.isEmpty()) {
notifyListeners();
clusterService.remove(this);
}
}
});
}
}
}

private ShardRouting findPrimaryShard(RoutingTable routingTable, ShardId shardId) {
IndexRoutingTable indexRoutingTable = routingTable.index(shardId.getIndex());
if (indexRoutingTable != null) {
IndexShardRoutingTable indexShardRoutingTable = indexRoutingTable.shard(shardId.id());
if (indexShardRoutingTable != null) {
return indexShardRoutingTable.primaryShard();
}
}
return null;
}

private void notifyListeners() {
for (ActionListener<RestoreCompletionResponse> listener : listeners) {
try {
listener.onResponse(new RestoreCompletionResponse(request.snapshotId, restoreInfo));
} catch (Throwable e) {
logger.warn("failed to update snapshot status for [{}]", e, listener);
}
}
}
});
Expand Down Expand Up @@ -509,7 +582,7 @@ private boolean failed(Snapshot snapshot, String index) {
*
* @param listener restore completion listener
*/
public void addListener(RestoreCompletionListener listener) {
public void addListener(ActionListener<RestoreCompletionResponse> listener) {
this.listeners.add(listener);
}

Expand All @@ -520,7 +593,7 @@ public void addListener(RestoreCompletionListener listener) {
*
* @param listener restore completion listener
*/
public void removeListener(RestoreCompletionListener listener) {
public void removeListener(ActionListener<RestoreCompletionResponse> listener) {
this.listeners.remove(listener);
}

Expand Down Expand Up @@ -726,42 +799,6 @@ public TimeValue masterNodeTimeout() {

}


/**
* This listener is called as soon as restore operation starts in the cluster.
* <p/>
* To receive notifications about when operation ends in the cluster use {@link RestoreCompletionListener}
*/
public static interface RestoreSnapshotListener {
/**
* Called when restore operations successfully starts in the cluster. Not null value of {@code snapshot} parameter
* means that restore operation didn't involve any shards and therefore has already completed.
*
* @param restoreInfo if restore operation finished, contains information about restore operation, null otherwise
*/
void onResponse(RestoreInfo restoreInfo);

/**
* Called when restore operation failed to start
*
* @param t exception that prevented the restore operation to start
*/
void onFailure(Throwable t);
}

/**
* This listener is called every time a snapshot is restored in the cluster
*/
public static interface RestoreCompletionListener {
/**
* Called for every snapshot that is completed in the cluster
*
* @param snapshotId snapshot id
* @param restoreInfo restore completion information
*/
void onRestoreCompletion(SnapshotId snapshotId, RestoreInfo restoreInfo);
}

/**
* Internal class that is used to send notifications about finished shard restore operations to master node
*/
Expand Down
Expand Up @@ -299,8 +299,6 @@ public boolean apply(Object o) {
assertThat(restoreSnapshotResponse.getRestoreInfo().successfulShards(), equalTo(6));
assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0));

ensureGreen("test-idx-all");

assertThat(client().prepareCount("test-idx-all").get().getCount(), equalTo(100L));

logger.info("--> restore snapshot for the partial index");
Expand All @@ -312,7 +310,6 @@ public boolean apply(Object o) {
assertThat(restoreSnapshotResponse.getRestoreInfo().successfulShards(), allOf(greaterThan(0), lessThan(6)));
assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), greaterThan(0));

ensureGreen("test-idx-some");
assertThat(client().prepareCount("test-idx-some").get().getCount(), allOf(greaterThan(0L), lessThan(100L)));

logger.info("--> restore snapshot for the index that didn't have any shards snapshotted successfully");
Expand All @@ -324,7 +321,6 @@ public boolean apply(Object o) {
assertThat(restoreSnapshotResponse.getRestoreInfo().successfulShards(), equalTo(0));
assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(6));

ensureGreen("test-idx-some");
assertThat(client().prepareCount("test-idx-some").get().getCount(), allOf(greaterThan(0L), lessThan(100L)));
}

Expand Down

0 comments on commit 060a371

Please sign in to comment.