diff --git a/blackbox/docs/appendices/release-notes/unreleased.rst b/blackbox/docs/appendices/release-notes/unreleased.rst index 15f9c1bc6d3c..5bbad8b28c62 100644 --- a/blackbox/docs/appendices/release-notes/unreleased.rst +++ b/blackbox/docs/appendices/release-notes/unreleased.rst @@ -150,6 +150,8 @@ Deprecations Changes ======= +- Improved resiliency of the :ref:`ref-create-snapshot` operation. + - Added support for `SQL Standard Timestamp Format `_ to the :ref:`date-time-types`. diff --git a/es/es-server/src/main/java/org/elasticsearch/action/ActionListener.java b/es/es-server/src/main/java/org/elasticsearch/action/ActionListener.java index c3bff85ab0cd..40e2858e2da4 100644 --- a/es/es-server/src/main/java/org/elasticsearch/action/ActionListener.java +++ b/es/es-server/src/main/java/org/elasticsearch/action/ActionListener.java @@ -19,8 +19,12 @@ package org.elasticsearch.action; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.common.CheckedFunction; +import java.util.ArrayList; +import java.util.List; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -49,8 +53,8 @@ public interface ActionListener { * @return a listener that listens for responses and invokes the consumer when received */ static ActionListener wrap(CheckedConsumer onResponse, - Consumer onFailure) { - return new ActionListener() { + Consumer onFailure) { + return new ActionListener<>() { @Override public void onResponse(Response response) { try { @@ -79,6 +83,57 @@ static ActionListener wrap(Runnable runnable) { return wrap(r -> runnable.run(), e -> runnable.run()); } + /** + * Notifies every given listener with the response passed to {@link #onResponse(Object)}. If a listener itself throws an exception + * the exception is forwarded to {@link #onFailure(Exception)}. If in turn {@link #onFailure(Exception)} fails all remaining + * listeners will be processed and the caught exception will be re-thrown. + */ + static void onResponse(Iterable> listeners, Response response) { + List exceptionList = new ArrayList<>(); + for (ActionListener listener : listeners) { + try { + listener.onResponse(response); + } catch (Exception ex) { + try { + listener.onFailure(ex); + } catch (Exception ex1) { + exceptionList.add(ex1); + } + } + } + ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptionList); + } + + /** + * Notifies every given listener with the failure passed to {@link #onFailure(Exception)}. If a listener itself throws an exception + * all remaining listeners will be processed and the caught exception will be re-thrown. + */ + static void onFailure(Iterable> listeners, Exception failure) { + List exceptionList = new ArrayList<>(); + for (ActionListener listener : listeners) { + try { + listener.onFailure(failure); + } catch (Exception ex) { + exceptionList.add(ex); + } + } + ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptionList); + } + + /** + * Creates a listener that wraps another listener, mapping response values via the given mapping function and passing along + * exceptions to the delegate. + * + * @param listener Listener to delegate to + * @param fn Function to apply to listener response + * @param Response type of the new listener + * @param Response type of the wrapped listener + * @return a listener that maps the received response and then passes it to its delegate listener + */ + static ActionListener map(ActionListener listener, CheckedFunction fn) { + return wrap(r -> listener.onResponse(fn.apply(r)), listener::onFailure); + } + /** * Converts a listener to a {@link BiConsumer} for compatibility with the {@link java.util.concurrent.CompletableFuture} * api. diff --git a/es/es-server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotRequest.java b/es/es-server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotRequest.java index fda45eed2f31..007046fc45a8 100644 --- a/es/es-server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotRequest.java +++ b/es/es-server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotRequest.java @@ -62,7 +62,7 @@ * */ public class CreateSnapshotRequest extends MasterNodeRequest - implements IndicesRequest.Replaceable, ToXContentObject { + implements IndicesRequest.Replaceable, ToXContentObject { private String snapshot; @@ -94,6 +94,31 @@ public CreateSnapshotRequest(String repository, String snapshot) { this.repository = repository; } + public CreateSnapshotRequest(StreamInput in) throws IOException { + super(in); + snapshot = in.readString(); + repository = in.readString(); + indices = in.readStringArray(); + indicesOptions = IndicesOptions.readIndicesOptions(in); + settings = readSettingsFromStream(in); + includeGlobalState = in.readBoolean(); + waitForCompletion = in.readBoolean(); + partial = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(snapshot); + out.writeString(repository); + out.writeStringArray(indices); + indicesOptions.writeIndicesOptions(out); + writeSettingsToStream(settings, out); + out.writeBoolean(includeGlobalState); + out.writeBoolean(waitForCompletion); + out.writeBoolean(partial); + } + @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = null; @@ -414,28 +439,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - snapshot = in.readString(); - repository = in.readString(); - indices = in.readStringArray(); - indicesOptions = IndicesOptions.readIndicesOptions(in); - settings = readSettingsFromStream(in); - includeGlobalState = in.readBoolean(); - waitForCompletion = in.readBoolean(); - partial = in.readBoolean(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeString(snapshot); - out.writeString(repository); - out.writeStringArray(indices); - indicesOptions.writeIndicesOptions(out); - writeSettingsToStream(settings, out); - out.writeBoolean(includeGlobalState); - out.writeBoolean(waitForCompletion); - out.writeBoolean(partial); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override diff --git a/es/es-server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java b/es/es-server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java index 84e6ebcff71a..6373b23575a8 100644 --- a/es/es-server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java +++ b/es/es-server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java @@ -27,8 +27,6 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.snapshots.Snapshot; -import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -40,12 +38,11 @@ public class TransportCreateSnapshotAction extends TransportMasterNodeAction listener) { - final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot()); - SnapshotsService.SnapshotRequest snapshotRequest = - new SnapshotsService.SnapshotRequest(request.repository(), snapshotName, "create_snapshot [" + snapshotName + "]") - .indices(request.indices()) - .indicesOptions(request.indicesOptions()) - .partial(request.partial()) - .settings(request.settings()) - .includeGlobalState(request.includeGlobalState()) - .masterNodeTimeout(request.masterNodeTimeout()); - snapshotsService.createSnapshot(snapshotRequest, new SnapshotsService.CreateSnapshotListener() { - @Override - public void onResponse() { - if (request.waitForCompletion()) { - snapshotsService.addListener(new SnapshotsService.SnapshotCompletionListener() { - @Override - public void onSnapshotCompletion(Snapshot snapshot, SnapshotInfo snapshotInfo) { - if (snapshot.getRepository().equals(request.repository()) && - snapshot.getSnapshotId().getName().equals(snapshotName)) { - listener.onResponse(new CreateSnapshotResponse(snapshotInfo)); - snapshotsService.removeListener(this); - } - } - - @Override - public void onSnapshotFailure(Snapshot snapshot, Exception e) { - if (snapshot.getRepository().equals(request.repository()) && - snapshot.getSnapshotId().getName().equals(snapshotName)) { - listener.onFailure(e); - snapshotsService.removeListener(this); - } - } - }); - } else { - listener.onResponse(new CreateSnapshotResponse()); - } - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + protected void masterOperation(final CreateSnapshotRequest request, ClusterState state, + final ActionListener listener) { + if (request.waitForCompletion()) { + snapshotsService.executeSnapshot(request, ActionListener.map(listener, CreateSnapshotResponse::new)); + } else { + snapshotsService.createSnapshot(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse())); + } } } diff --git a/es/es-server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/DeleteSnapshotRequest.java b/es/es-server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/DeleteSnapshotRequest.java index 246d3662f814..5f81857d8d2c 100644 --- a/es/es-server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/DeleteSnapshotRequest.java +++ b/es/es-server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/DeleteSnapshotRequest.java @@ -58,13 +58,17 @@ public DeleteSnapshotRequest(String repository, String snapshot) { this.snapshot = snapshot; } - /** - * Constructs a new delete snapshots request with repository name - * - * @param repository repository name - */ - public DeleteSnapshotRequest(String repository) { - this.repository = repository; + public DeleteSnapshotRequest(StreamInput in) throws IOException { + super(in); + repository = in.readString(); + snapshot = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(repository); + out.writeString(snapshot); } @Override @@ -115,15 +119,6 @@ public DeleteSnapshotRequest snapshot(String snapshot) { @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - repository = in.readString(); - snapshot = in.readString(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeString(repository); - out.writeString(snapshot); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } } diff --git a/es/es-server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/TransportDeleteSnapshotAction.java b/es/es-server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/TransportDeleteSnapshotAction.java index 838dce774585..ae3cbe444b4e 100644 --- a/es/es-server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/TransportDeleteSnapshotAction.java +++ b/es/es-server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/TransportDeleteSnapshotAction.java @@ -39,12 +39,11 @@ public class TransportDeleteSnapshotAction extends TransportMasterNodeAction listener) { - snapshotsService.deleteSnapshot(request.repository(), request.snapshot(), new SnapshotsService.DeleteSnapshotListener() { - @Override - public void onResponse() { - listener.onResponse(new AcknowledgedResponse(true)); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }, false); + protected void masterOperation(final DeleteSnapshotRequest request, ClusterState state, + final ActionListener listener) { + snapshotsService.deleteSnapshot(request.repository(), request.snapshot(), + ActionListener.map(listener, v -> new AcknowledgedResponse(true)), false); } } diff --git a/es/es-server/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java b/es/es-server/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java index 138788251c90..f72aa1c957c4 100644 --- a/es/es-server/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java +++ b/es/es-server/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java @@ -93,6 +93,10 @@ public String toString() { return builder.append("]").toString(); } + public boolean isEmpty() { + return entries.isEmpty(); + } + /** * Restore metadata */ diff --git a/es/es-server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/es/es-server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index ddec958a08ac..c5d5a5a8b7ea 100644 --- a/es/es-server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/es/es-server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -24,6 +24,7 @@ import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState.Custom; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -48,10 +49,6 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implements Custom { public static final String TYPE = "snapshots"; - // denotes an undefined repository state id, which will happen when receiving a cluster state with - // a snapshot in progress from a pre 5.2.x node - public static final long UNDEFINED_REPOSITORY_STATE_ID = -2L; - @Override public boolean equals(Object o) { if (this == o) return true; @@ -91,9 +88,11 @@ public static class Entry { private final ImmutableOpenMap> waitingIndices; private final long startTime; private final long repositoryStateId; + @Nullable private final String failure; public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List indices, - long startTime, long repositoryStateId, ImmutableOpenMap shards) { + long startTime, long repositoryStateId, ImmutableOpenMap shards, + String failure) { this.state = state; this.snapshot = snapshot; this.includeGlobalState = includeGlobalState; @@ -108,15 +107,26 @@ public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, Sta this.waitingIndices = findWaitingIndices(shards); } this.repositoryStateId = repositoryStateId; + this.failure = failure; + } + + public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List indices, + long startTime, long repositoryStateId, ImmutableOpenMap shards) { + this(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards, null); } public Entry(Entry entry, State state, ImmutableOpenMap shards) { this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime, - entry.repositoryStateId, shards); + entry.repositoryStateId, shards, entry.failure); + } + + public Entry(Entry entry, State state, ImmutableOpenMap shards, String failure) { + this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime, + entry.repositoryStateId, shards, failure); } public Entry(Entry entry, ImmutableOpenMap shards) { - this(entry, entry.state, shards); + this(entry, entry.state, shards, entry.failure); } public Snapshot snapshot() { @@ -155,6 +165,10 @@ public long getRepositoryStateId() { return repositoryStateId; } + public String failure() { + return failure; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -422,14 +436,16 @@ public SnapshotsInProgress(StreamInput in) throws IOException { builder.put(shardId, new ShardSnapshotStatus(in)); } long repositoryStateId = in.readLong(); + final String failure = in.readOptionalString(); entries[i] = new Entry(snapshot, - includeGlobalState, - partial, - state, - Collections.unmodifiableList(indexBuilder), - startTime, - repositoryStateId, - builder.build()); + includeGlobalState, + partial, + state, + Collections.unmodifiableList(indexBuilder), + startTime, + repositoryStateId, + builder.build(), + failure); } this.entries = Arrays.asList(entries); } @@ -453,6 +469,7 @@ public void writeTo(StreamOutput out) throws IOException { shardEntry.value.writeTo(out); } out.writeLong(entry.repositoryStateId); + out.writeOptionalString(entry.failure); } } diff --git a/es/es-server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/es/es-server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 23053ffed9c4..1eb40ce9d310 100644 --- a/es/es-server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/es/es-server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -721,7 +721,7 @@ public boolean isReadOnly() { protected void writeIndexGen(final RepositoryData repositoryData, final long repositoryStateId) throws IOException { assert isReadOnly() == false; // can not write to a read only repository final long currentGen = latestIndexBlobId(); - if (repositoryStateId != SnapshotsInProgress.UNDEFINED_REPOSITORY_STATE_ID && currentGen != repositoryStateId) { + if (currentGen != repositoryStateId) { // the index file was updated by a concurrent operation, so we were operating on stale // repository data throw new RepositoryException(metadata.name(), "concurrent modification of the index-N file, expected current generation [" + diff --git a/es/es-server/src/main/java/org/elasticsearch/snapshots/SnapshotInProgressException.java b/es/es-server/src/main/java/org/elasticsearch/snapshots/SnapshotInProgressException.java new file mode 100644 index 000000000000..f93b648067ae --- /dev/null +++ b/es/es-server/src/main/java/org/elasticsearch/snapshots/SnapshotInProgressException.java @@ -0,0 +1,46 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.snapshots; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.rest.RestStatus; + +import java.io.IOException; + +/** + * Thrown on the attempt to execute an action that requires + * that no snapshot is in progress. + */ +public class SnapshotInProgressException extends ElasticsearchException { + + public SnapshotInProgressException(String msg) { + super(msg); + } + + public SnapshotInProgressException(StreamInput in) throws IOException { + super(in); + } + + @Override + public RestStatus status() { + return RestStatus.BAD_REQUEST; + } +} diff --git a/es/es-server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/es/es-server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 289a7263e826..afc179cb2aa9 100644 --- a/es/es-server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/es/es-server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -66,41 +66,33 @@ import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportChannel; -import org.elasticsearch.transport.TransportRequest; -import org.elasticsearch.transport.TransportRequestHandler; -import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequestDeduplicator; +import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import java.util.stream.Collectors; import static java.util.Collections.emptyMap; -import static java.util.Collections.unmodifiableMap; +import static java.util.Collections.unmodifiableList; import static org.elasticsearch.cluster.SnapshotsInProgress.completed; -import static org.elasticsearch.transport.EmptyTransportResponseHandler.INSTANCE_SAME; /** * This service runs on data and master nodes and controls currently snapshotted shards on these nodes. It is responsible for * starting and stopping shard level snapshots */ public class SnapshotShardsService extends AbstractLifecycleComponent implements ClusterStateListener, IndexEventListener { - private static final Logger logger = LogManager.getLogger(SnapshotShardsService.class); - public static final String UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6 = "internal:cluster/snapshot/update_snapshot"; - public static final String UPDATE_SNAPSHOT_STATUS_ACTION_NAME = "internal:cluster/snapshot/update_snapshot_status"; - + private static final String UPDATE_SNAPSHOT_STATUS_ACTION_NAME = "internal:cluster/snapshot/update_snapshot_status"; private final ClusterService clusterService; @@ -112,21 +104,19 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements private final ThreadPool threadPool; - private final Lock shutdownLock = new ReentrantLock(); - - private final Condition shutdownCondition = shutdownLock.newCondition(); - private final Settings settings; + private final Map> shardSnapshots = new HashMap<>(); - private volatile Map shardSnapshots = emptyMap(); + // A map of snapshots to the shardIds that we already reported to the master as failed + private final TransportRequestDeduplicator remoteFailedRequestDeduplicator = + new TransportRequestDeduplicator<>(); private final SnapshotStateExecutor snapshotStateExecutor = new SnapshotStateExecutor(); - private UpdateSnapshotStatusAction updateSnapshotStatusHandler; + private final UpdateSnapshotStatusAction updateSnapshotStatusHandler; @Inject - public SnapshotShardsService(Settings settings, ClusterService clusterService, SnapshotsService snapshotsService, ThreadPool threadPool, - TransportService transportService, IndicesService indicesService, + public SnapshotShardsService(Settings settings, ClusterService clusterService, SnapshotsService snapshotsService, + ThreadPool threadPool, TransportService transportService, IndicesService indicesService, IndexNameExpressionResolver indexNameExpressionResolver) { - this.settings = settings; this.indicesService = indicesService; this.snapshotsService = snapshotsService; this.transportService = transportService; @@ -138,37 +128,18 @@ public SnapshotShardsService(Settings settings, ClusterService clusterService, S } // The constructor of UpdateSnapshotStatusAction will register itself to the TransportService. - this.updateSnapshotStatusHandler = new UpdateSnapshotStatusAction( - UPDATE_SNAPSHOT_STATUS_ACTION_NAME, transportService, clusterService, threadPool, indexNameExpressionResolver); - - if (DiscoveryNode.isMasterNode(settings)) { - // This needs to run only on nodes that can become masters - transportService.registerRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6, UpdateSnapshotStatusRequestV6::new, ThreadPool.Names.SAME, new UpdateSnapshotStateRequestHandlerV6()); - } - + this.updateSnapshotStatusHandler = + new UpdateSnapshotStatusAction(transportService, clusterService, threadPool, indexNameExpressionResolver); } @Override protected void doStart() { assert this.updateSnapshotStatusHandler != null; assert transportService.getRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME) != null; - if (DiscoveryNode.isMasterNode(settings)) { - assert transportService.getRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6) != null; - } } @Override protected void doStop() { - shutdownLock.lock(); - try { - while(!shardSnapshots.isEmpty() && shutdownCondition.await(5, TimeUnit.SECONDS)) { - // Wait for at most 5 second for locally running snapshots to finish - } - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } finally { - shutdownLock.unlock(); - } } @Override @@ -183,7 +154,9 @@ public void clusterChanged(ClusterChangedEvent event) { SnapshotsInProgress currentSnapshots = event.state().custom(SnapshotsInProgress.TYPE); if ((previousSnapshots == null && currentSnapshots != null) || (previousSnapshots != null && previousSnapshots.equals(currentSnapshots) == false)) { - processIndexShardSnapshots(event); + synchronized (shardSnapshots) { + processIndexShardSnapshots(currentSnapshots); + } } String previousMasterNodeId = event.previousState().nodes().getMasterNodeId(); @@ -200,12 +173,14 @@ public void clusterChanged(ClusterChangedEvent event) { @Override public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) { // abort any snapshots occurring on the soon-to-be closed shard - Map snapshotShardsMap = shardSnapshots; - for (Map.Entry snapshotShards : snapshotShardsMap.entrySet()) { - Map shards = snapshotShards.getValue().shards; - if (shards.containsKey(shardId)) { - logger.debug("[{}] shard closing, abort snapshotting for snapshot [{}]", shardId, snapshotShards.getKey().getSnapshotId()); - shards.get(shardId).abortIfNotCompleted("shard is closing, aborting"); + synchronized (shardSnapshots) { + for (Map.Entry> snapshotShards : shardSnapshots.entrySet()) { + Map shards = snapshotShards.getValue(); + if (shards.containsKey(shardId)) { + logger.debug("[{}] shard closing, abort snapshotting for snapshot [{}]", + shardId, snapshotShards.getKey().getSnapshotId()); + shards.get(shardId).abortIfNotCompleted("shard is closing, aborting"); + } } } } @@ -220,167 +195,145 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh * @return map of shard id to snapshot status */ public Map currentSnapshotShards(Snapshot snapshot) { - SnapshotShards snapshotShards = shardSnapshots.get(snapshot); - if (snapshotShards == null) { - return null; - } else { - return snapshotShards.shards; + synchronized (shardSnapshots) { + final Map current = shardSnapshots.get(snapshot); + return current == null ? null : new HashMap<>(current); } } /** * Checks if any new shards should be snapshotted on this node * - * @param event cluster state changed event + * @param snapshotsInProgress Current snapshots in progress in cluster state */ - private void processIndexShardSnapshots(ClusterChangedEvent event) { - SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE); - Map survivors = new HashMap<>(); + private void processIndexShardSnapshots(SnapshotsInProgress snapshotsInProgress) { + cancelRemoved(snapshotsInProgress); + if (snapshotsInProgress != null) { + startNewSnapshots(snapshotsInProgress); + } + } + + private void cancelRemoved(@Nullable SnapshotsInProgress snapshotsInProgress) { // First, remove snapshots that are no longer there - for (Map.Entry entry : shardSnapshots.entrySet()) { + Iterator>> it = shardSnapshots.entrySet().iterator(); + while (it.hasNext()) { + final Map.Entry> entry = it.next(); final Snapshot snapshot = entry.getKey(); - if (snapshotsInProgress != null && snapshotsInProgress.snapshot(snapshot) != null) { - survivors.put(entry.getKey(), entry.getValue()); - } else { + if (snapshotsInProgress == null || snapshotsInProgress.snapshot(snapshot) == null) { // abort any running snapshots of shards for the removed entry; // this could happen if for some reason the cluster state update for aborting // running shards is missed, then the snapshot is removed is a subsequent cluster // state update, which is being processed here - for (IndexShardSnapshotStatus snapshotStatus : entry.getValue().shards.values()) { + it.remove(); + for (IndexShardSnapshotStatus snapshotStatus : entry.getValue().values()) { snapshotStatus.abortIfNotCompleted("snapshot has been removed in cluster state, aborting"); } } } + } + private void startNewSnapshots(SnapshotsInProgress snapshotsInProgress) { // For now we will be mostly dealing with a single snapshot at a time but might have multiple simultaneously running // snapshots in the future - Map> newSnapshots = new HashMap<>(); // Now go through all snapshots and update existing or create missing - final String localNodeId = event.state().nodes().getLocalNodeId(); - final DiscoveryNode masterNode = event.state().nodes().getMasterNode(); - final Map> snapshotIndices = new HashMap<>(); - if (snapshotsInProgress != null) { - for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) { - snapshotIndices.put(entry.snapshot(), - entry.indices().stream().collect(Collectors.toMap(IndexId::getName, Function.identity()))); - if (entry.state() == State.STARTED) { - Map startedShards = new HashMap<>(); - SnapshotShards snapshotShards = shardSnapshots.get(entry.snapshot()); - for (ObjectObjectCursor shard : entry.shards()) { - // Add all new shards to start processing on - if (localNodeId.equals(shard.value.nodeId())) { - if (shard.value.state() == State.INIT && (snapshotShards == null || !snapshotShards.shards.containsKey(shard.key))) { - logger.trace("[{}] - Adding shard to the queue", shard.key); - startedShards.put(shard.key, IndexShardSnapshotStatus.newInitializing()); - } + final String localNodeId = clusterService.localNode().getId(); + for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) { + final State entryState = entry.state(); + if (entryState == State.STARTED) { + Map startedShards = null; + final Snapshot snapshot = entry.snapshot(); + Map snapshotShards = shardSnapshots.getOrDefault(snapshot, emptyMap()); + for (ObjectObjectCursor shard : entry.shards()) { + // Add all new shards to start processing on + final ShardId shardId = shard.key; + final ShardSnapshotStatus shardSnapshotStatus = shard.value; + if (localNodeId.equals(shardSnapshotStatus.nodeId()) && shardSnapshotStatus.state() == State.INIT + && snapshotShards.containsKey(shardId) == false) { + logger.trace("[{}] - Adding shard to the queue", shardId); + if (startedShards == null) { + startedShards = new HashMap<>(); } + startedShards.put(shardId, IndexShardSnapshotStatus.newInitializing()); } - if (!startedShards.isEmpty()) { - newSnapshots.put(entry.snapshot(), startedShards); - if (snapshotShards != null) { - // We already saw this snapshot but we need to add more started shards - Map shards = new HashMap<>(); - // Put all shards that were already running on this node - shards.putAll(snapshotShards.shards); - // Put all newly started shards - shards.putAll(startedShards); - survivors.put(entry.snapshot(), new SnapshotShards(unmodifiableMap(shards))); - } else { - // Brand new snapshot that we haven't seen before - survivors.put(entry.snapshot(), new SnapshotShards(unmodifiableMap(startedShards))); + } + if (startedShards != null && startedShards.isEmpty() == false) { + shardSnapshots.computeIfAbsent(snapshot, s -> new HashMap<>()).putAll(startedShards); + startNewShards(entry, startedShards); + } + } else if (entryState == State.ABORTED) { + // Abort all running shards for this snapshot + final Snapshot snapshot = entry.snapshot(); + Map snapshotShards = shardSnapshots.getOrDefault(snapshot, emptyMap()); + for (ObjectObjectCursor shard : entry.shards()) { + final IndexShardSnapshotStatus snapshotStatus = snapshotShards.get(shard.key); + if (snapshotStatus != null) { + final IndexShardSnapshotStatus.Copy lastSnapshotStatus = + snapshotStatus.abortIfNotCompleted("snapshot has been aborted"); + final Stage stage = lastSnapshotStatus.getStage(); + if (stage == Stage.FINALIZE) { + logger.debug("[{}] trying to cancel snapshot on shard [{}] that is finalizing, " + + "letting it finish", snapshot, shard.key); + } else if (stage == Stage.DONE) { + logger.debug("[{}] trying to cancel snapshot on the shard [{}] that is already done, " + + "updating status on the master", snapshot, shard.key); + notifySuccessfulSnapshotShard(snapshot, shard.key); + } else if (stage == Stage.FAILURE) { + logger.debug("[{}] trying to cancel snapshot on the shard [{}] that has already failed, " + + "updating status on the master", snapshot, shard.key); + notifyFailedSnapshotShard(snapshot, shard.key, lastSnapshotStatus.getFailure()); } - } - } else if (entry.state() == State.ABORTED) { - // Abort all running shards for this snapshot - SnapshotShards snapshotShards = shardSnapshots.get(entry.snapshot()); - if (snapshotShards != null) { - final String failure = "snapshot has been aborted"; - for (ObjectObjectCursor shard : entry.shards()) { - IndexShardSnapshotStatus snapshotStatus = snapshotShards.shards.get(shard.key); - if (snapshotStatus != null) { - final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.abortIfNotCompleted(failure); - final Stage stage = lastSnapshotStatus.getStage(); - if (stage == Stage.FINALIZE) { - logger.debug("[{}] trying to cancel snapshot on shard [{}] that is finalizing, " + - "letting it finish", entry.snapshot(), shard.key); - - } else if (stage == Stage.DONE) { - logger.debug("[{}] trying to cancel snapshot on the shard [{}] that is already done, " + - "updating status on the master", entry.snapshot(), shard.key); - notifySuccessfulSnapshotShard(entry.snapshot(), shard.key, localNodeId, masterNode); - - } else if (stage == Stage.FAILURE) { - logger.debug("[{}] trying to cancel snapshot on the shard [{}] that has already failed, " + - "updating status on the master", entry.snapshot(), shard.key); - final String snapshotFailure = lastSnapshotStatus.getFailure(); - notifyFailedSnapshotShard(entry.snapshot(), shard.key, localNodeId, snapshotFailure, masterNode); - } - } + } else { + // due to CS batching we might have missed the INIT state and straight went into ABORTED + // notify master that abort has completed by moving to FAILED + if (shard.value.state() == State.ABORTED) { + notifyFailedSnapshotShard(snapshot, shard.key, shard.value.reason()); } } } } } + } - // Update the list of snapshots that we saw and tried to started - // If startup of these shards fails later, we don't want to try starting these shards again - shutdownLock.lock(); - try { - shardSnapshots = unmodifiableMap(survivors); - if (shardSnapshots.isEmpty()) { - // Notify all waiting threads that no more snapshots - shutdownCondition.signalAll(); - } - } finally { - shutdownLock.unlock(); - } + private void startNewShards(SnapshotsInProgress.Entry entry, Map startedShards) { + final Snapshot snapshot = entry.snapshot(); + final Map indicesMap = entry.indices().stream().collect(Collectors.toMap(IndexId::getName, Function.identity())); + final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); + for (final Map.Entry shardEntry : startedShards.entrySet()) { + final ShardId shardId = shardEntry.getKey(); + final IndexId indexId = indicesMap.get(shardId.getIndexName()); + assert indexId != null; + executor.execute(new AbstractRunnable() { - // We have new shards to starts - if (newSnapshots.isEmpty() == false) { - Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); - for (final Map.Entry> entry : newSnapshots.entrySet()) { - final Snapshot snapshot = entry.getKey(); - final Map indicesMap = snapshotIndices.get(snapshot); - assert indicesMap != null; - - for (final Map.Entry shardEntry : entry.getValue().entrySet()) { - final ShardId shardId = shardEntry.getKey(); - final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id()); - final IndexId indexId = indicesMap.get(shardId.getIndexName()); - assert indexId != null; - executor.execute(new AbstractRunnable() { - - final SetOnce failure = new SetOnce<>(); - - @Override - public void doRun() { - snapshot(indexShard, snapshot, indexId, shardEntry.getValue()); - } + private final SetOnce failure = new SetOnce<>(); - @Override - public void onFailure(Exception e) { - logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e); - failure.set(e); - } + @Override + public void doRun() { + final IndexShard indexShard = + indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id()); + snapshot(indexShard, snapshot, indexId, shardEntry.getValue()); + } - @Override - public void onRejection(Exception e) { - failure.set(e); - } + @Override + public void onFailure(Exception e) { + logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e); + failure.set(e); + } - @Override - public void onAfter() { - final Exception exception = failure.get(); - if (exception != null) { - final String failure = ExceptionsHelper.detailedMessage(exception); - notifyFailedSnapshotShard(snapshot, shardId, localNodeId, failure, masterNode); - } else { - notifySuccessfulSnapshotShard(snapshot, shardId, localNodeId, masterNode); - } - } - }); + @Override + public void onRejection(Exception e) { + failure.set(e); } - } + + @Override + public void onAfter() { + final Exception exception = failure.get(); + if (exception != null) { + notifyFailedSnapshotShard(snapshot, shardId, ExceptionsHelper.detailedMessage(exception)); + } else { + notifySuccessfulSnapshotShard(snapshot, shardId); + } + } + }); } } @@ -390,7 +343,8 @@ public void onAfter() { * @param snapshot snapshot * @param snapshotStatus snapshot status */ - private void snapshot(final IndexShard indexShard, final Snapshot snapshot, final IndexId indexId, final IndexShardSnapshotStatus snapshotStatus) { + private void snapshot(final IndexShard indexShard, final Snapshot snapshot, final IndexId indexId, + final IndexShardSnapshotStatus snapshotStatus) { final ShardId shardId = indexShard.shardId(); if (indexShard.routingEntry().primary() == false) { throw new IndexShardSnapshotFailedException(shardId, "snapshot should be performed only on primary"); @@ -433,8 +387,6 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) { return; } - final String localNodeId = event.state().nodes().getLocalNodeId(); - final DiscoveryNode masterNode = event.state().nodes().getMasterNode(); for (SnapshotsInProgress.Entry snapshot : snapshotsInProgress.entries()) { if (snapshot.state() == State.STARTED || snapshot.state() == State.ABORTED) { Map localShards = currentSnapshotShards(snapshot.snapshot()); @@ -442,7 +394,6 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) { ImmutableOpenMap masterShards = snapshot.shards(); for(Map.Entry localShard : localShards.entrySet()) { ShardId shardId = localShard.getKey(); - IndexShardSnapshotStatus localShardStatus = localShard.getValue(); ShardSnapshotStatus masterShard = masterShards.get(shardId); if (masterShard != null && masterShard.state().completed() == false) { final IndexShardSnapshotStatus.Copy indexShardSnapshotStatus = localShard.getValue().asCopy(); @@ -451,15 +402,14 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) { if (stage == Stage.DONE) { // but we think the shard is done - we need to make new master know that the shard is done logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard is done locally, " + - "updating status on the master", snapshot.snapshot(), shardId); - notifySuccessfulSnapshotShard(snapshot.snapshot(), shardId, localNodeId, masterNode); + "updating status on the master", snapshot.snapshot(), shardId); + notifySuccessfulSnapshotShard(snapshot.snapshot(), shardId); } else if (stage == Stage.FAILURE) { // but we think the shard failed - we need to make new master know that the shard failed logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard failed locally, " + - "updating status on master", snapshot.snapshot(), shardId); - final String failure = indexShardSnapshotStatus.getFailure(); - notifyFailedSnapshotShard(snapshot.snapshot(), shardId, localNodeId, failure, masterNode); + "updating status on master", snapshot.snapshot(), shardId); + notifyFailedSnapshotShard(snapshot.snapshot(), shardId, indexShardSnapshotStatus.getFailure()); } } } @@ -468,17 +418,6 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) { } } - /** - * Stores the list of shards that has to be snapshotted on this node - */ - private static class SnapshotShards { - private final Map shards; - - private SnapshotShards(Map shards) { - this.shards = shards; - } - } - /** * Internal request that is used to send changes in snapshot status to master */ @@ -539,33 +478,56 @@ public String toString() { } /** Notify the master node that the given shard has been successfully snapshotted **/ - void notifySuccessfulSnapshotShard(final Snapshot snapshot, - final ShardId shardId, - final String localNodeId, - final DiscoveryNode masterNode) { - sendSnapshotShardUpdate(snapshot, shardId, new ShardSnapshotStatus(localNodeId, State.SUCCESS), masterNode); + private void notifySuccessfulSnapshotShard(final Snapshot snapshot, final ShardId shardId) { + sendSnapshotShardUpdate(snapshot, shardId, new ShardSnapshotStatus(clusterService.localNode().getId(), State.SUCCESS)); } /** Notify the master node that the given shard failed to be snapshotted **/ - void notifyFailedSnapshotShard(final Snapshot snapshot, - final ShardId shardId, - final String localNodeId, - final String failure, - final DiscoveryNode masterNode) { - sendSnapshotShardUpdate(snapshot, shardId, new ShardSnapshotStatus(localNodeId, State.FAILED, failure), masterNode); + private void notifyFailedSnapshotShard(final Snapshot snapshot, final ShardId shardId, final String failure) { + sendSnapshotShardUpdate(snapshot, shardId, new ShardSnapshotStatus(clusterService.localNode().getId(), State.FAILED, failure)); } /** Updates the shard snapshot status by sending a {@link UpdateIndexShardSnapshotStatusRequest} to the master node */ - void sendSnapshotShardUpdate(final Snapshot snapshot, - final ShardId shardId, - final ShardSnapshotStatus status, - final DiscoveryNode masterNode) { - try { - UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status); - transportService.sendRequest(transportService.getLocalNode(), UPDATE_SNAPSHOT_STATUS_ACTION_NAME, request, INSTANCE_SAME); - } catch (Exception e) { - logger.warn(() -> new ParameterizedMessage("[{}] [{}] failed to update snapshot state", snapshot, status), e); - } + private void sendSnapshotShardUpdate(final Snapshot snapshot, final ShardId shardId, final ShardSnapshotStatus status) { + remoteFailedRequestDeduplicator.executeOnce( + new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status), + new ActionListener<>() { + @Override + public void onResponse(Void aVoid) { + logger.trace("[{}] [{}] updated snapshot state", snapshot, status); + } + + @Override + public void onFailure(Exception e) { + logger.warn( + () -> new ParameterizedMessage("[{}] [{}] failed to update snapshot state", snapshot, status), e); + } + }, + (req, reqListener) -> transportService.sendRequest(transportService.getLocalNode(), UPDATE_SNAPSHOT_STATUS_ACTION_NAME, req, + new TransportResponseHandler() { + @Override + public UpdateIndexShardSnapshotStatusResponse read(StreamInput in) throws IOException { + final UpdateIndexShardSnapshotStatusResponse response = new UpdateIndexShardSnapshotStatusResponse(); + response.readFrom(in); + return response; + } + + @Override + public void handleResponse(UpdateIndexShardSnapshotStatusResponse response) { + reqListener.onResponse(null); + } + + @Override + public void handleException(TransportException exp) { + reqListener.onFailure(exp); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }) + ); } /** @@ -573,7 +535,8 @@ void sendSnapshotShardUpdate(final Snapshot snapshot, * * @param request update shard status request */ - private void innerUpdateSnapshotState(final UpdateIndexShardSnapshotStatusRequest request, ActionListener listener) { + private void innerUpdateSnapshotState(final UpdateIndexShardSnapshotStatusRequest request, + ActionListener listener) { logger.trace("received updated snapshot restore state [{}]", request); clusterService.submitStateUpdateTask( "update snapshot state", @@ -593,10 +556,11 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS }); } - class SnapshotStateExecutor implements ClusterStateTaskExecutor { + private static class SnapshotStateExecutor implements ClusterStateTaskExecutor { @Override - public ClusterTasksResult execute(ClusterState currentState, List tasks) throws Exception { + public ClusterTasksResult + execute(ClusterState currentState, List tasks) { final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); if (snapshots != null) { int changedCount = 0; @@ -607,7 +571,8 @@ public ClusterTasksResult execute(Cluster for (UpdateIndexShardSnapshotStatusRequest updateSnapshotState : tasks) { if (entry.snapshot().equals(updateSnapshotState.snapshot())) { - logger.trace("[{}] Updating shard [{}] with status [{}]", updateSnapshotState.snapshot(), updateSnapshotState.shardId(), updateSnapshotState.status().state()); + logger.trace("[{}] Updating shard [{}] with status [{}]", updateSnapshotState.snapshot(), + updateSnapshotState.shardId(), updateSnapshotState.status().state()); if (updated == false) { shards.putAll(entry.shards()); updated = true; @@ -625,8 +590,6 @@ public ClusterTasksResult execute(Cluster // TODO: Add PARTIAL_SUCCESS status? SnapshotsInProgress.Entry updatedEntry = new SnapshotsInProgress.Entry(entry, State.SUCCESS, shards.build()); entries.add(updatedEntry); - // Finalize snapshot in the repository - snapshotsService.endSnapshot(updatedEntry); } } else { entries.add(entry); @@ -634,10 +597,9 @@ public ClusterTasksResult execute(Cluster } if (changedCount > 0) { logger.trace("changed cluster state triggered by {} snapshot state updates", changedCount); - - final SnapshotsInProgress updatedSnapshots = new SnapshotsInProgress(entries.toArray(new SnapshotsInProgress.Entry[entries.size()])); - return ClusterTasksResult.builder().successes(tasks).build( - ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, updatedSnapshots).build()); + return ClusterTasksResult.builder().successes(tasks) + .build(ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, + new SnapshotsInProgress(unmodifiableList(entries))).build()); } } return ClusterTasksResult.builder().successes(tasks).build(currentState); @@ -648,14 +610,14 @@ static class UpdateIndexShardSnapshotStatusResponse extends ActionResponse { } - class UpdateSnapshotStatusAction extends TransportMasterNodeAction { - - UpdateSnapshotStatusAction(String actionName, - TransportService transportService, - ClusterService clusterService, - ThreadPool threadPool, - IndexNameExpressionResolver indexNameExpressionResolver) { - super(actionName, transportService, clusterService, threadPool, indexNameExpressionResolver, UpdateIndexShardSnapshotStatusRequest::new); + private class UpdateSnapshotStatusAction + extends TransportMasterNodeAction { + UpdateSnapshotStatusAction(TransportService transportService, ClusterService clusterService, + ThreadPool threadPool, IndexNameExpressionResolver indexNameExpressionResolver) { + super( + SnapshotShardsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME, transportService, clusterService, threadPool, + indexNameExpressionResolver, UpdateIndexShardSnapshotStatusRequest::new + ); } @Override @@ -669,7 +631,8 @@ protected UpdateIndexShardSnapshotStatusResponse newResponse() { } @Override - protected void masterOperation(UpdateIndexShardSnapshotStatusRequest request, ClusterState state, ActionListener listener) throws Exception { + protected void masterOperation(UpdateIndexShardSnapshotStatusRequest request, ClusterState state, + ActionListener listener) { innerUpdateSnapshotState(request, listener); } @@ -678,79 +641,4 @@ protected ClusterBlockException checkBlock(UpdateIndexShardSnapshotStatusRequest return null; } } - - /** - * A BWC version of {@link UpdateIndexShardSnapshotStatusRequest} - */ - static class UpdateSnapshotStatusRequestV6 extends TransportRequest { - private Snapshot snapshot; - private ShardId shardId; - private ShardSnapshotStatus status; - - UpdateSnapshotStatusRequestV6() { - - } - - UpdateSnapshotStatusRequestV6(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status) { - this.snapshot = snapshot; - this.shardId = shardId; - this.status = status; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - snapshot = new Snapshot(in); - shardId = ShardId.readShardId(in); - status = new ShardSnapshotStatus(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - snapshot.writeTo(out); - shardId.writeTo(out); - status.writeTo(out); - } - - Snapshot snapshot() { - return snapshot; - } - - ShardId shardId() { - return shardId; - } - - ShardSnapshotStatus status() { - return status; - } - - @Override - public String toString() { - return snapshot + ", shardId [" + shardId + "], status [" + status.state() + "]"; - } - } - - /** - * A BWC version of {@link UpdateSnapshotStatusAction} - */ - class UpdateSnapshotStateRequestHandlerV6 implements TransportRequestHandler { - @Override - public void messageReceived(UpdateSnapshotStatusRequestV6 requestV6, final TransportChannel channel) throws Exception { - final UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(requestV6.snapshot(), requestV6.shardId(), requestV6.status()); - innerUpdateSnapshotState(request, new ActionListener() { - @Override - public void onResponse(UpdateIndexShardSnapshotStatusResponse updateSnapshotStatusResponse) { - - } - - @Override - public void onFailure(Exception e) { - logger.warn("Failed to update snapshot status", e); - } - }); - channel.sendResponse(TransportResponse.Empty.INSTANCE); - } - } - } diff --git a/es/es-server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/es/es-server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 49a1a3e578e4..6526f5780907 100644 --- a/es/es-server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/es/es-server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -27,7 +27,7 @@ import org.apache.lucene.util.CollectionUtil; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; @@ -58,6 +58,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; @@ -77,12 +78,14 @@ import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import static java.util.Collections.unmodifiableList; import static java.util.Collections.unmodifiableMap; import static org.elasticsearch.cluster.SnapshotsInProgress.completed; @@ -91,20 +94,23 @@ *

* A typical snapshot creating process looks like this: *

    - *
  • On the master node the {@link #createSnapshot(SnapshotRequest, CreateSnapshotListener)} is called and makes sure that no snapshots is currently running - * and registers the new snapshot in cluster state
  • - *
  • When cluster state is updated the {@link #beginSnapshot(ClusterState, SnapshotsInProgress.Entry, boolean, CreateSnapshotListener)} method - * kicks in and initializes the snapshot in the repository and then populates list of shards that needs to be snapshotted in cluster state
  • + *
  • On the master node the {@link #createSnapshot(CreateSnapshotRequest, ActionListener)} is called and makes sure that + * no snapshot is currently running and registers the new snapshot in cluster state
  • + *
  • When cluster state is updated + * the {@link #beginSnapshot(ClusterState, SnapshotsInProgress.Entry, boolean, ActionListener)} method kicks in and initializes + * the snapshot in the repository and then populates list of shards that needs to be snapshotted in cluster state
  • *
  • Each data node is watching for these shards and when new shards scheduled for snapshotting appear in the cluster state, data nodes - * start processing them through {@link SnapshotShardsService#processIndexShardSnapshots(ClusterChangedEvent)} method
  • - *
  • Once shard snapshot is created data node updates state of the shard in the cluster state using the {@link SnapshotShardsService#sendSnapshotShardUpdate(Snapshot, ShardId, ShardSnapshotStatus, DiscoveryNode)} method
  • - *
  • When last shard is completed master node in {@link SnapshotShardsService#innerUpdateSnapshotState} method marks the snapshot as completed
  • + * start processing them through {@link SnapshotShardsService#processIndexShardSnapshots(SnapshotsInProgress)} method + *
  • Once shard snapshot is created data node updates state of the shard in the cluster state using + * the {@link SnapshotShardsService#sendSnapshotShardUpdate(Snapshot, ShardId, ShardSnapshotStatus)} method
  • + *
  • When last shard is completed master node in {@link SnapshotShardsService#innerUpdateSnapshotState} method marks the snapshot + * as completed
  • *
  • After cluster state is updated, the {@link #endSnapshot(SnapshotsInProgress.Entry)} finalizes snapshot in the repository, - * notifies all {@link #snapshotCompletionListeners} that snapshot is completed, and finally calls {@link #removeSnapshotFromClusterState(Snapshot, SnapshotInfo, Exception)} to remove snapshot from cluster state
  • + * notifies all {@link #snapshotCompletionListeners} that snapshot is completed, and finally calls + * {@link #removeSnapshotFromClusterState(Snapshot, SnapshotInfo, Exception)} to remove snapshot from cluster state *
*/ public class SnapshotsService extends AbstractLifecycleComponent implements ClusterStateApplier { - private static final Logger logger = LogManager.getLogger(SnapshotsService.class); private final ClusterService clusterService; @@ -115,10 +121,17 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus private final ThreadPool threadPool; - private final CopyOnWriteArrayList snapshotCompletionListeners = new CopyOnWriteArrayList<>(); + private final Map>> snapshotCompletionListeners = new ConcurrentHashMap<>(); + + // Set of snapshots that are currently being initialized by this node + private final Set initializingSnapshots = Collections.synchronizedSet(new HashSet<>()); + + // Set of snapshots that are currently being ended by this node + private final Set endingSnapshots = Collections.synchronizedSet(new HashSet<>()); @Inject - public SnapshotsService(Settings settings, ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, RepositoriesService repositoriesService, ThreadPool threadPool) { + public SnapshotsService(Settings settings, ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, + RepositoriesService repositoriesService, ThreadPool threadPool) { this.clusterService = clusterService; this.indexNameExpressionResolver = indexNameExpressionResolver; this.repositoriesService = repositoriesService; @@ -202,7 +215,7 @@ public List snapshots(final String repositoryName, } final ArrayList snapshotList = new ArrayList<>(snapshotSet); CollectionUtil.timSort(snapshotList); - return Collections.unmodifiableList(snapshotList); + return unmodifiableList(snapshotList); } /** @@ -218,7 +231,18 @@ public List currentSnapshots(final String repositoryName) { snapshotList.add(inProgressSnapshot(entry)); } CollectionUtil.timSort(snapshotList); - return Collections.unmodifiableList(snapshotList); + return unmodifiableList(snapshotList); + } + + /** + * Same as {@link #createSnapshot(CreateSnapshotRequest, ActionListener)} but invokes its callback on completion of + * the snapshot. + * + * @param request snapshot request + * @param listener snapshot completion listener + */ + public void executeSnapshot(final CreateSnapshotRequest request, final ActionListener listener) { + createSnapshot(request, ActionListener.wrap(snapshot -> addListener(snapshot, listener), listener::onFailure)); } /** @@ -230,20 +254,20 @@ public List currentSnapshots(final String repositoryName) { * @param request snapshot request * @param listener snapshot creation listener */ - public void createSnapshot(final SnapshotRequest request, final CreateSnapshotListener listener) { - final String repositoryName = request.repositoryName; - final String snapshotName = request.snapshotName; + public void createSnapshot(final CreateSnapshotRequest request, final ActionListener listener) { + final String repositoryName = request.repository(); + final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot()); validate(repositoryName, snapshotName); final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot final RepositoryData repositoryData = repositoriesService.repository(repositoryName).getRepositoryData(); - clusterService.submitStateUpdateTask(request.cause(), new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("create_snapshot [" + snapshotName + ']', new ClusterStateUpdateTask() { private SnapshotsInProgress.Entry newSnapshot = null; @Override public ClusterState execute(ClusterState currentState) { - validate(request, currentState); + validate(repositoryName, snapshotName, currentState); SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE); if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) { throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, @@ -252,17 +276,19 @@ public ClusterState execute(ClusterState currentState) { SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); if (snapshots == null || snapshots.entries().isEmpty()) { // Store newSnapshot here to be processed in clusterStateProcessed - List indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, request.indicesOptions(), request.indices())); + List indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, + request.indicesOptions(), request.indices())); logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices); List snapshotIndices = repositoryData.resolveNewIndices(indices); newSnapshot = new SnapshotsInProgress.Entry(new Snapshot(repositoryName, snapshotId), - request.includeGlobalState(), - request.partial(), - State.INIT, - snapshotIndices, - System.currentTimeMillis(), - repositoryData.getGenId(), - null); + request.includeGlobalState(), + request.partial(), + State.INIT, + snapshotIndices, + System.currentTimeMillis(), + repositoryData.getGenId(), + null); + initializingSnapshots.add(newSnapshot.snapshot()); snapshots = new SnapshotsInProgress(newSnapshot); } else { throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running"); @@ -273,6 +299,9 @@ public ClusterState execute(ClusterState currentState) { @Override public void onFailure(String source, Exception e) { logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to create snapshot", repositoryName, snapshotName), e); + if (newSnapshot != null) { + initializingSnapshots.remove(newSnapshot.snapshot()); + } newSnapshot = null; listener.onFailure(e); } @@ -280,9 +309,21 @@ public void onFailure(String source, Exception e) { @Override public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) { if (newSnapshot != null) { - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> - beginSnapshot(newState, newSnapshot, request.partial(), listener) - ); + final Snapshot current = newSnapshot.snapshot(); + assert initializingSnapshots.contains(current); + beginSnapshot(newState, newSnapshot, request.partial(), new ActionListener() { + @Override + public void onResponse(final Snapshot snapshot) { + initializingSnapshots.remove(snapshot); + listener.onResponse(snapshot); + } + + @Override + public void onFailure(final Exception e) { + initializingSnapshots.remove(current); + listener.onFailure(e); + } + }); } } @@ -290,23 +331,22 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl public TimeValue timeout() { return request.masterNodeTimeout(); } - }); } /** * Validates snapshot request * - * @param request snapshot request + * @param repositoryName repository name + * @param snapshotName snapshot name * @param state current cluster state */ - private void validate(SnapshotRequest request, ClusterState state) { + private void validate(String repositoryName, String snapshotName, ClusterState state) { RepositoriesMetaData repositoriesMetaData = state.getMetaData().custom(RepositoriesMetaData.TYPE); - final String repository = request.repositoryName; - if (repositoriesMetaData == null || repositoriesMetaData.repository(repository) == null) { - throw new RepositoryMissingException(repository); + if (repositoriesMetaData == null || repositoriesMetaData.repository(repositoryName) == null) { + throw new RepositoryMissingException(repositoryName); } - validate(repository, request.snapshotName); + validate(repositoryName, snapshotName); } private static void validate(final String repositoryName, final String snapshotName) { @@ -330,8 +370,8 @@ private static void validate(final String repositoryName, final String snapshotN } if (Strings.validFileName(snapshotName) == false) { throw new InvalidSnapshotNameException(repositoryName, - snapshotName, - "must not contain the following characters " + Strings.INVALID_FILENAME_CHARS); + snapshotName, + "must not contain the following characters " + Strings.INVALID_FILENAME_CHARS); } } @@ -348,136 +388,143 @@ private static void validate(final String repositoryName, final String snapshotN private void beginSnapshot(final ClusterState clusterState, final SnapshotsInProgress.Entry snapshot, final boolean partial, - final CreateSnapshotListener userCreateSnapshotListener) { - boolean snapshotCreated = false; - try { - Repository repository = repositoriesService.repository(snapshot.snapshot().getRepository()); - - MetaData metaData = clusterState.metaData(); - if (!snapshot.includeGlobalState()) { - // Remove global state from the cluster state - MetaData.Builder builder = MetaData.builder(); - for (IndexId index : snapshot.indices()) { - builder.put(metaData.index(index.getName()), false); - } - metaData = builder.build(); - } + final ActionListener userCreateSnapshotListener) { + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() { - repository.initializeSnapshot(snapshot.snapshot().getSnapshotId(), snapshot.indices(), metaData); - snapshotCreated = true; + boolean snapshotCreated; - logger.info("snapshot [{}] started", snapshot.snapshot()); - if (snapshot.indices().isEmpty()) { - // No indices in this snapshot - we are done - userCreateSnapshotListener.onResponse(); - endSnapshot(snapshot); - return; - } - clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() { + boolean hadAbortedInitializations; + + @Override + protected void doRun() { + assert initializingSnapshots.contains(snapshot.snapshot()); + Repository repository = repositoriesService.repository(snapshot.snapshot().getRepository()); + + MetaData metaData = clusterState.metaData(); + if (!snapshot.includeGlobalState()) { + // Remove global state from the cluster state + MetaData.Builder builder = MetaData.builder(); + for (IndexId index : snapshot.indices()) { + builder.put(metaData.index(index.getName()), false); + } + metaData = builder.build(); + } - SnapshotsInProgress.Entry endSnapshot; - String failure = null; + repository.initializeSnapshot(snapshot.snapshot().getSnapshotId(), snapshot.indices(), metaData); + snapshotCreated = true; - @Override - public ClusterState execute(ClusterState currentState) { - SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); - List entries = new ArrayList<>(); - for (SnapshotsInProgress.Entry entry : snapshots.entries()) { - if (entry.snapshot().equals(snapshot.snapshot()) == false) { - entries.add(entry); - continue; - } + logger.info("snapshot [{}] started", snapshot.snapshot()); + if (snapshot.indices().isEmpty()) { + // No indices in this snapshot - we are done + userCreateSnapshotListener.onResponse(snapshot.snapshot()); + endSnapshot(snapshot); + return; + } + clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() { + + @Override + public ClusterState execute(ClusterState currentState) { + SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); + List entries = new ArrayList<>(); + for (SnapshotsInProgress.Entry entry : snapshots.entries()) { + if (entry.snapshot().equals(snapshot.snapshot()) == false) { + entries.add(entry); + continue; + } - if (entry.state() != State.ABORTED) { - // Replace the snapshot that was just intialized - ImmutableOpenMap shards = shards(currentState, entry.indices()); - if (!partial) { - Tuple, Set> indicesWithMissingShards = indicesWithMissingShards(shards, currentState.metaData()); - Set missing = indicesWithMissingShards.v1(); - Set closed = indicesWithMissingShards.v2(); - if (missing.isEmpty() == false || closed.isEmpty() == false) { - endSnapshot = new SnapshotsInProgress.Entry(entry, State.FAILED, shards); - entries.add(endSnapshot); - - final StringBuilder failureMessage = new StringBuilder(); - if (missing.isEmpty() == false) { - failureMessage.append("Indices don't have primary shards "); - failureMessage.append(missing); - } - if (closed.isEmpty() == false) { - if (failureMessage.length() > 0) { - failureMessage.append("; "); + if (entry.state() == State.ABORTED) { + entries.add(entry); + assert entry.shards().isEmpty(); + hadAbortedInitializations = true; + } else { + // Replace the snapshot that was just initialized + ImmutableOpenMap shards = + shards(currentState, entry.indices()); + if (!partial) { + Tuple, Set> indicesWithMissingShards = indicesWithMissingShards(shards, + currentState.metaData()); + Set missing = indicesWithMissingShards.v1(); + Set closed = indicesWithMissingShards.v2(); + if (missing.isEmpty() == false || closed.isEmpty() == false) { + final StringBuilder failureMessage = new StringBuilder(); + if (missing.isEmpty() == false) { + failureMessage.append("Indices don't have primary shards "); + failureMessage.append(missing); } - failureMessage.append("Indices are closed "); - failureMessage.append(closed); + if (closed.isEmpty() == false) { + if (failureMessage.length() > 0) { + failureMessage.append("; "); + } + failureMessage.append("Indices are closed "); + failureMessage.append(closed); + } + entries.add(new SnapshotsInProgress.Entry(entry, State.FAILED, shards, failureMessage.toString())); + continue; } - failure = failureMessage.toString(); - continue; } + entries.add(new SnapshotsInProgress.Entry(entry, State.STARTED, shards)); } - SnapshotsInProgress.Entry updatedSnapshot = new SnapshotsInProgress.Entry(entry, State.STARTED, shards); - entries.add(updatedSnapshot); - if (completed(shards.values())) { - endSnapshot = updatedSnapshot; - } - } else { - assert entry.state() == State.ABORTED : "expecting snapshot to be aborted during initialization"; - failure = "snapshot was aborted during initialization"; - endSnapshot = entry; - entries.add(endSnapshot); } + return ClusterState.builder(currentState) + .putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(unmodifiableList(entries))) + .build(); } - return ClusterState.builder(currentState) - .putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(Collections.unmodifiableList(entries))) - .build(); - } - @Override - public void onFailure(String source, Exception e) { - logger.warn(() -> new ParameterizedMessage("[{}] failed to create snapshot", snapshot.snapshot().getSnapshotId()), e); - removeSnapshotFromClusterState(snapshot.snapshot(), null, e, new CleanupAfterErrorListener(snapshot, true, userCreateSnapshotListener, e)); - } + @Override + public void onFailure(String source, Exception e) { + logger.warn(() -> new ParameterizedMessage("[{}] failed to create snapshot", + snapshot.snapshot().getSnapshotId()), e); + removeSnapshotFromClusterState(snapshot.snapshot(), null, e, + new CleanupAfterErrorListener(snapshot, true, userCreateSnapshotListener, e)); + } - @Override - public void onNoLongerMaster(String source) { - // We are not longer a master - we shouldn't try to do any cleanup - // The new master will take care of it - logger.warn("[{}] failed to create snapshot - no longer a master", snapshot.snapshot().getSnapshotId()); - userCreateSnapshotListener.onFailure( - new SnapshotException(snapshot.snapshot(), "master changed during snapshot initialization")); - } + @Override + public void onNoLongerMaster(String source) { + // We are not longer a master - we shouldn't try to do any cleanup + // The new master will take care of it + logger.warn("[{}] failed to create snapshot - no longer a master", snapshot.snapshot().getSnapshotId()); + userCreateSnapshotListener.onFailure( + new SnapshotException(snapshot.snapshot(), "master changed during snapshot initialization")); + } - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - // The userCreateSnapshotListener.onResponse() notifies caller that the snapshot was accepted - // for processing. If client wants to wait for the snapshot completion, it can register snapshot - // completion listener in this method. For the snapshot completion to work properly, the snapshot - // should still exist when listener is registered. - userCreateSnapshotListener.onResponse(); - - // Now that snapshot completion listener is registered we can end the snapshot if needed - // We should end snapshot only if 1) we didn't accept it for processing (which happens when there - // is nothing to do) and 2) there was a snapshot in metadata that we should end. Otherwise we should - // go ahead and continue working on this snapshot rather then end here. - if (endSnapshot != null) { - endSnapshot(endSnapshot, failure); + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + // The userCreateSnapshotListener.onResponse() notifies caller that the snapshot was accepted + // for processing. If client wants to wait for the snapshot completion, it can register snapshot + // completion listener in this method. For the snapshot completion to work properly, the snapshot + // should still exist when listener is registered. + userCreateSnapshotListener.onResponse(snapshot.snapshot()); + + if (hadAbortedInitializations) { + final SnapshotsInProgress snapshotsInProgress = newState.custom(SnapshotsInProgress.TYPE); + assert snapshotsInProgress != null; + final SnapshotsInProgress.Entry entry = snapshotsInProgress.snapshot(snapshot.snapshot()); + assert entry != null; + endSnapshot(entry); + } } - } - }); - } catch (Exception e) { - logger.warn(() -> new ParameterizedMessage("failed to create snapshot [{}]", snapshot.snapshot().getSnapshotId()), e); - removeSnapshotFromClusterState(snapshot.snapshot(), null, e, new CleanupAfterErrorListener(snapshot, snapshotCreated, userCreateSnapshotListener, e)); - } + }); + } + + @Override + public void onFailure(Exception e) { + logger.warn(() -> new ParameterizedMessage("failed to create snapshot [{}]", + snapshot.snapshot().getSnapshotId()), e); + removeSnapshotFromClusterState(snapshot.snapshot(), null, e, + new CleanupAfterErrorListener(snapshot, snapshotCreated, userCreateSnapshotListener, e)); + } + }); } private class CleanupAfterErrorListener implements ActionListener { private final SnapshotsInProgress.Entry snapshot; private final boolean snapshotCreated; - private final CreateSnapshotListener userCreateSnapshotListener; + private final ActionListener userCreateSnapshotListener; private final Exception e; - CleanupAfterErrorListener(SnapshotsInProgress.Entry snapshot, boolean snapshotCreated, CreateSnapshotListener userCreateSnapshotListener, Exception e) { + CleanupAfterErrorListener(SnapshotsInProgress.Entry snapshot, boolean snapshotCreated, + ActionListener userCreateSnapshotListener, Exception e) { this.snapshot = snapshot; this.snapshotCreated = snapshotCreated; this.userCreateSnapshotListener = userCreateSnapshotListener; @@ -495,7 +542,7 @@ public void onFailure(Exception e) { cleanupAfterError(e); } - public void onNoLongerMaster(String source) { + public void onNoLongerMaster() { userCreateSnapshotListener.onFailure(e); } @@ -503,17 +550,18 @@ private void cleanupAfterError(Exception exception) { if(snapshotCreated) { try { repositoriesService.repository(snapshot.snapshot().getRepository()) - .finalizeSnapshot(snapshot.snapshot().getSnapshotId(), - snapshot.indices(), - snapshot.startTime(), - ExceptionsHelper.detailedMessage(exception), - 0, - Collections.emptyList(), - snapshot.getRepositoryStateId(), - snapshot.includeGlobalState()); + .finalizeSnapshot(snapshot.snapshot().getSnapshotId(), + snapshot.indices(), + snapshot.startTime(), + ExceptionsHelper.detailedMessage(exception), + 0, + Collections.emptyList(), + snapshot.getRepositoryStateId(), + snapshot.includeGlobalState()); } catch (Exception inner) { inner.addSuppressed(exception); - logger.warn(() -> new ParameterizedMessage("[{}] failed to close snapshot in repository", snapshot.snapshot()), inner); + logger.warn(() -> new ParameterizedMessage("[{}] failed to close snapshot in repository", + snapshot.snapshot()), inner); } } userCreateSnapshotListener.onFailure(e); @@ -521,10 +569,10 @@ private void cleanupAfterError(Exception exception) { } - private SnapshotInfo inProgressSnapshot(SnapshotsInProgress.Entry entry) { + private static SnapshotInfo inProgressSnapshot(SnapshotsInProgress.Entry entry) { return new SnapshotInfo(entry.snapshot().getSnapshotId(), - entry.indices().stream().map(IndexId::getName).collect(Collectors.toList()), - entry.startTime(), entry.includeGlobalState()); + entry.indices().stream().map(IndexId::getName).collect(Collectors.toList()), + entry.startTime(), entry.includeGlobalState()); } /** @@ -579,7 +627,7 @@ public List currentSnapshots(final String repository, builder.add(entry); } } - return Collections.unmodifiableList(builder); + return unmodifiableList(builder); } /** @@ -635,7 +683,7 @@ public Map snapshotShards(final String reposi return unmodifiableMap(shardStatus); } - private SnapshotShardFailure findShardFailure(List shardFailures, ShardId shardId) { + private static SnapshotShardFailure findShardFailure(List shardFailures, ShardId shardId) { for (SnapshotShardFailure shardFailure : shardFailures) { if (shardId.getIndexName().equals(shardFailure.index()) && shardId.getId() == shardFailure.shardId()) { return shardFailure; @@ -649,14 +697,28 @@ public void applyClusterState(ClusterChangedEvent event) { try { if (event.localNodeMaster()) { // We don't remove old master when master flips anymore. So, we need to check for change in master - if (event.nodesRemoved() || event.previousState().nodes().isLocalNodeElectedMaster() == false) { - processSnapshotsOnRemovedNodes(event); + final SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE); + final boolean newMaster = event.previousState().nodes().isLocalNodeElectedMaster() == false; + if (snapshotsInProgress != null) { + if (newMaster || removedNodesCleanupNeeded(snapshotsInProgress, event.nodesDelta().removedNodes())) { + processSnapshotsOnRemovedNodes(); + } + if (event.routingTableChanged() && waitingShardsStartedOrUnassigned(snapshotsInProgress, event)) { + processStartedShards(); + } + // Cleanup all snapshots that have no more work left: + // 1. Completed snapshots + // 2. Snapshots in state INIT that the previous master failed to start + // 3. Snapshots in any other state that have all their shard tasks completed + snapshotsInProgress.entries().stream().filter( + entry -> entry.state().completed() + || initializingSnapshots.contains(entry.snapshot()) == false + && (entry.state() == State.INIT || completed(entry.shards().values())) + ).forEach(this::endSnapshot); } - if (event.routingTableChanged()) { - processStartedShards(event); + if (newMaster) { + finalizeSnapshotDeletionFromPreviousMaster(event); } - removeFinishedSnapshotFromClusterState(event); - finalizeSnapshotDeletionFromPreviousMaster(event); } } catch (Exception e) { logger.warn("Failed to update snapshot state ", e); @@ -675,163 +737,135 @@ public void applyClusterState(ClusterChangedEvent event) { * snapshot was deleted and a call to GET snapshots would reveal that the snapshot no longer exists. */ private void finalizeSnapshotDeletionFromPreviousMaster(ClusterChangedEvent event) { - if (event.localNodeMaster() && event.previousState().nodes().isLocalNodeElectedMaster() == false) { - SnapshotDeletionsInProgress deletionsInProgress = event.state().custom(SnapshotDeletionsInProgress.TYPE); - if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) { - assert deletionsInProgress.getEntries().size() == 1 : "only one in-progress deletion allowed per cluster"; - SnapshotDeletionsInProgress.Entry entry = deletionsInProgress.getEntries().get(0); - deleteSnapshotFromRepository(entry.getSnapshot(), null, entry.getRepositoryStateId()); - } + SnapshotDeletionsInProgress deletionsInProgress = event.state().custom(SnapshotDeletionsInProgress.TYPE); + if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) { + assert deletionsInProgress.getEntries().size() == 1 : "only one in-progress deletion allowed per cluster"; + SnapshotDeletionsInProgress.Entry entry = deletionsInProgress.getEntries().get(0); + deleteSnapshotFromRepository(entry.getSnapshot(), null, entry.getRepositoryStateId()); } } /** - * Removes a finished snapshot from the cluster state. This can happen if the previous - * master node processed a cluster state update that marked the snapshot as finished, - * but the previous master node died before removing the snapshot in progress from the - * cluster state. It is then the responsibility of the new master node to end the - * snapshot and remove it from the cluster state. + * Cleans up shard snapshots that were running on removed nodes */ - private void removeFinishedSnapshotFromClusterState(ClusterChangedEvent event) { - if (event.localNodeMaster() && !event.previousState().nodes().isLocalNodeElectedMaster()) { - SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE); - if (snapshotsInProgress != null && !snapshotsInProgress.entries().isEmpty()) { - for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) { - if (entry.state().completed()) { - endSnapshot(entry); + private void processSnapshotsOnRemovedNodes() { + clusterService.submitStateUpdateTask("update snapshot state after node removal", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + DiscoveryNodes nodes = currentState.nodes(); + SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); + if (snapshots == null) { + return currentState; + } + boolean changed = false; + ArrayList entries = new ArrayList<>(); + for (final SnapshotsInProgress.Entry snapshot : snapshots.entries()) { + SnapshotsInProgress.Entry updatedSnapshot = snapshot; + if (snapshot.state() == State.STARTED || snapshot.state() == State.ABORTED) { + ImmutableOpenMap.Builder shards = ImmutableOpenMap.builder(); + boolean snapshotChanged = false; + for (ObjectObjectCursor shardEntry : snapshot.shards()) { + ShardSnapshotStatus shardStatus = shardEntry.value; + if (!shardStatus.state().completed() && shardStatus.nodeId() != null) { + if (nodes.nodeExists(shardStatus.nodeId())) { + shards.put(shardEntry.key, shardEntry.value); + } else { + // TODO: Restart snapshot on another node? + snapshotChanged = true; + logger.warn("failing snapshot of shard [{}] on closed node [{}]", + shardEntry.key, shardStatus.nodeId()); + shards.put(shardEntry.key, + new ShardSnapshotStatus(shardStatus.nodeId(), State.FAILED, "node shutdown")); + } + } + } + if (snapshotChanged) { + changed = true; + ImmutableOpenMap shardsMap = shards.build(); + if (!snapshot.state().completed() && completed(shardsMap.values())) { + updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.SUCCESS, shardsMap); + } else { + updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, snapshot.state(), shardsMap); + } + } + entries.add(updatedSnapshot); + } else if (snapshot.state() == State.INIT && initializingSnapshots.contains(snapshot.snapshot()) == false) { + changed = true; + // Mark the snapshot as aborted as it failed to start from the previous master + updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.ABORTED, snapshot.shards()); + entries.add(updatedSnapshot); + + // Clean up the snapshot that failed to start from the old master + deleteSnapshot(snapshot.snapshot(), new ActionListener() { + @Override + public void onResponse(Void aVoid) { + logger.debug("cleaned up abandoned snapshot {} in INIT state", snapshot.snapshot()); + } + + @Override + public void onFailure(Exception e) { + logger.warn("failed to clean up abandoned snapshot {} in INIT state", snapshot.snapshot()); + } + }, updatedSnapshot.getRepositoryStateId(), false); } } + if (changed) { + return ClusterState.builder(currentState) + .putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(unmodifiableList(entries))).build(); + } + return currentState; } - } + + @Override + public void onFailure(String source, Exception e) { + logger.warn("failed to update snapshot state after node removal"); + } + }); } - /** - * Cleans up shard snapshots that were running on removed nodes - * - * @param event cluster changed event - */ - private void processSnapshotsOnRemovedNodes(ClusterChangedEvent event) { - if (removedNodesCleanupNeeded(event)) { - // Check if we just became the master - final boolean newMaster = !event.previousState().nodes().isLocalNodeElectedMaster(); - clusterService.submitStateUpdateTask("update snapshot state after node removal", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - DiscoveryNodes nodes = currentState.nodes(); - SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); - if (snapshots == null) { - return currentState; - } + private void processStartedShards() { + clusterService.submitStateUpdateTask("update snapshot state after shards started", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + RoutingTable routingTable = currentState.routingTable(); + SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); + if (snapshots != null) { boolean changed = false; ArrayList entries = new ArrayList<>(); for (final SnapshotsInProgress.Entry snapshot : snapshots.entries()) { SnapshotsInProgress.Entry updatedSnapshot = snapshot; - boolean snapshotChanged = false; - if (snapshot.state() == State.STARTED || snapshot.state() == State.ABORTED) { - ImmutableOpenMap.Builder shards = ImmutableOpenMap.builder(); - for (ObjectObjectCursor shardEntry : snapshot.shards()) { - ShardSnapshotStatus shardStatus = shardEntry.value; - if (!shardStatus.state().completed() && shardStatus.nodeId() != null) { - if (nodes.nodeExists(shardStatus.nodeId())) { - shards.put(shardEntry.key, shardEntry.value); - } else { - // TODO: Restart snapshot on another node? - snapshotChanged = true; - logger.warn("failing snapshot of shard [{}] on closed node [{}]", shardEntry.key, shardStatus.nodeId()); - shards.put(shardEntry.key, new ShardSnapshotStatus(shardStatus.nodeId(), State.FAILED, "node shutdown")); - } - } - } - if (snapshotChanged) { + if (snapshot.state() == State.STARTED) { + ImmutableOpenMap shards = processWaitingShards(snapshot.shards(), + routingTable); + if (shards != null) { changed = true; - ImmutableOpenMap shardsMap = shards.build(); - if (!snapshot.state().completed() && completed(shardsMap.values())) { - updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.SUCCESS, shardsMap); - endSnapshot(updatedSnapshot); + if (!snapshot.state().completed() && completed(shards.values())) { + updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.SUCCESS, shards); } else { - updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, snapshot.state(), shardsMap); + updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, shards); } } entries.add(updatedSnapshot); - } else if (snapshot.state() == State.INIT && newMaster) { - changed = true; - // Mark the snapshot as aborted as it failed to start from the previous master - updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.ABORTED, snapshot.shards()); - entries.add(updatedSnapshot); - - // Clean up the snapshot that failed to start from the old master - deleteSnapshot(snapshot.snapshot(), new DeleteSnapshotListener() { - @Override - public void onResponse() { - logger.debug("cleaned up abandoned snapshot {} in INIT state", snapshot.snapshot()); - } - - @Override - public void onFailure(Exception e) { - logger.warn("failed to clean up abandoned snapshot {} in INIT state", snapshot.snapshot()); - } - }, updatedSnapshot.getRepositoryStateId(), false); } } if (changed) { - snapshots = new SnapshotsInProgress(entries.toArray(new SnapshotsInProgress.Entry[entries.size()])); - return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build(); + return ClusterState.builder(currentState) + .putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(unmodifiableList(entries))).build(); } - return currentState; - } - - @Override - public void onFailure(String source, Exception e) { - logger.warn("failed to update snapshot state after node removal"); - } - }); - } - } - - private void processStartedShards(ClusterChangedEvent event) { - if (waitingShardsStartedOrUnassigned(event)) { - clusterService.submitStateUpdateTask("update snapshot state after shards started", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - RoutingTable routingTable = currentState.routingTable(); - SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); - if (snapshots != null) { - boolean changed = false; - ArrayList entries = new ArrayList<>(); - for (final SnapshotsInProgress.Entry snapshot : snapshots.entries()) { - SnapshotsInProgress.Entry updatedSnapshot = snapshot; - if (snapshot.state() == State.STARTED) { - ImmutableOpenMap shards = processWaitingShards(snapshot.shards(), routingTable); - if (shards != null) { - changed = true; - if (!snapshot.state().completed() && completed(shards.values())) { - updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.SUCCESS, shards); - endSnapshot(updatedSnapshot); - } else { - updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, shards); - } - } - entries.add(updatedSnapshot); - } - } - if (changed) { - snapshots = new SnapshotsInProgress(entries.toArray(new SnapshotsInProgress.Entry[entries.size()])); - return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build(); - } - } - return currentState; } + return currentState; + } - @Override - public void onFailure(String source, Exception e) { - logger.warn(() -> new ParameterizedMessage("failed to update snapshot state after shards started from [{}] ", source), e); - } - }); - } + @Override + public void onFailure(String source, Exception e) { + logger.warn(() -> + new ParameterizedMessage("failed to update snapshot state after shards started from [{}] ", source), e); + } + }); } - private ImmutableOpenMap processWaitingShards( - ImmutableOpenMap snapshotShards, RoutingTable routingTable) { + private static ImmutableOpenMap processWaitingShards( + ImmutableOpenMap snapshotShards, RoutingTable routingTable) { boolean snapshotChanged = false; ImmutableOpenMap.Builder shards = ImmutableOpenMap.builder(); for (ObjectObjectCursor shardEntry : snapshotShards) { @@ -870,19 +904,16 @@ private ImmutableOpenMap processWaitingShards( } } - private boolean waitingShardsStartedOrUnassigned(ClusterChangedEvent event) { - SnapshotsInProgress curr = event.state().custom(SnapshotsInProgress.TYPE); - if (curr != null) { - for (SnapshotsInProgress.Entry entry : curr.entries()) { - if (entry.state() == State.STARTED && !entry.waitingIndices().isEmpty()) { - for (ObjectCursor index : entry.waitingIndices().keys()) { - if (event.indexRoutingTableChanged(index.value)) { - IndexRoutingTable indexShardRoutingTable = event.state().getRoutingTable().index(index.value); - for (ShardId shardId : entry.waitingIndices().get(index.value)) { - ShardRouting shardRouting = indexShardRoutingTable.shard(shardId.id()).primaryShard(); - if (shardRouting != null && (shardRouting.started() || shardRouting.unassigned())) { - return true; - } + private static boolean waitingShardsStartedOrUnassigned(SnapshotsInProgress snapshotsInProgress, ClusterChangedEvent event) { + for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) { + if (entry.state() == State.STARTED) { + for (ObjectCursor index : entry.waitingIndices().keys()) { + if (event.indexRoutingTableChanged(index.value)) { + IndexRoutingTable indexShardRoutingTable = event.state().getRoutingTable().index(index.value); + for (ShardId shardId : entry.waitingIndices().get(index.value)) { + ShardRouting shardRouting = indexShardRoutingTable.shard(shardId.id()).primaryShard(); + if (shardRouting != null && (shardRouting.started() || shardRouting.unassigned())) { + return true; } } } @@ -892,28 +923,12 @@ private boolean waitingShardsStartedOrUnassigned(ClusterChangedEvent event) { return false; } - private boolean removedNodesCleanupNeeded(ClusterChangedEvent event) { - SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE); - if (snapshotsInProgress == null) { - return false; - } - // Check if we just became the master - boolean newMaster = !event.previousState().nodes().isLocalNodeElectedMaster(); - for (SnapshotsInProgress.Entry snapshot : snapshotsInProgress.entries()) { - if (newMaster && (snapshot.state() == State.SUCCESS || snapshot.state() == State.INIT)) { - // We just replaced old master and snapshots in intermediate states needs to be cleaned - return true; - } - for (DiscoveryNode node : event.nodesDelta().removedNodes()) { - for (ObjectCursor shardStatus : snapshot.shards().values()) { - if (!shardStatus.value.state().completed() && node.getId().equals(shardStatus.value.nodeId())) { - // At least one shard was running on the removed node - we need to fail it - return true; - } - } - } - } - return false; + private static boolean removedNodesCleanupNeeded(SnapshotsInProgress snapshotsInProgress, List removedNodes) { + // If at least one shard was running on a removed node - we need to fail it + return removedNodes.isEmpty() == false && snapshotsInProgress.entries().stream().flatMap(snapshot -> + StreamSupport.stream(((Iterable) () -> snapshot.shards().valuesIt()).spliterator(), false) + .filter(s -> s.state().completed() == false).map(ShardSnapshotStatus::nodeId)) + .anyMatch(removedNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet())::contains); } /** @@ -922,12 +937,14 @@ private boolean removedNodesCleanupNeeded(ClusterChangedEvent event) { * @param shards list of shard statuses * @return list of failed and closed indices */ - private Tuple, Set> indicesWithMissingShards(ImmutableOpenMap shards, MetaData metaData) { + private Tuple, Set> indicesWithMissingShards( + ImmutableOpenMap shards, MetaData metaData) { Set missing = new HashSet<>(); Set closed = new HashSet<>(); for (ObjectObjectCursor entry : shards) { if (entry.value.state() == State.MISSING) { - if (metaData.hasIndex(entry.key.getIndex().getName()) && metaData.getIndexSafe(entry.key.getIndex()).getState() == IndexMetaData.State.CLOSE) { + if (metaData.hasIndex(entry.key.getIndex().getName()) && + metaData.getIndexSafe(entry.key.getIndex()).getState() == IndexMetaData.State.CLOSE) { closed.add(entry.key.getIndex().getName()); } else { missing.add(entry.key.getIndex().getName()); @@ -944,24 +961,16 @@ private Tuple, Set> indicesWithMissingShards(ImmutableOpenMa * * @param entry snapshot */ - void endSnapshot(final SnapshotsInProgress.Entry entry) { - endSnapshot(entry, null); - } - - - /** - * Finalizes the shard in repository and then removes it from cluster state - *

- * This is non-blocking method that runs on a thread from SNAPSHOT thread pool - * - * @param entry snapshot - * @param failure failure reason or null if snapshot was successful - */ - private void endSnapshot(final SnapshotsInProgress.Entry entry, final String failure) { - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { - final Snapshot snapshot = entry.snapshot(); - try { + private void endSnapshot(final SnapshotsInProgress.Entry entry) { + if (endingSnapshots.add(entry.snapshot()) == false) { + return; + } + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() { + @Override + protected void doRun() { + final Snapshot snapshot = entry.snapshot(); final Repository repository = repositoriesService.repository(snapshot.getRepository()); + final String failure = entry.failure(); logger.trace("[{}] finalizing snapshot in repository, state: [{}], failure[{}]", snapshot, entry.state(), failure); ArrayList shardFailures = new ArrayList<>(); for (ObjectObjectCursor shardStatus : entry.shards()) { @@ -977,12 +986,16 @@ private void endSnapshot(final SnapshotsInProgress.Entry entry, final String fai entry.startTime(), failure, entry.shards().size(), - Collections.unmodifiableList(shardFailures), + unmodifiableList(shardFailures), entry.getRepositoryStateId(), entry.includeGlobalState()); removeSnapshotFromClusterState(snapshot, snapshotInfo, null); logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state()); - } catch (Exception e) { + } + + @Override + public void onFailure(final Exception e) { + Snapshot snapshot = entry.snapshot(); logger.warn(() -> new ParameterizedMessage("[{}] failed to finalize snapshot", snapshot), e); removeSnapshotFromClusterState(snapshot, null, e); } @@ -991,7 +1004,7 @@ private void endSnapshot(final SnapshotsInProgress.Entry entry, final String fai /** * Removes record of running snapshot from cluster state - * @param snapshot snapshot + * @param snapshot snapshot * @param snapshotInfo snapshot info if snapshot was successful * @param e exception if snapshot failed */ @@ -1001,11 +1014,11 @@ private void removeSnapshotFromClusterState(final Snapshot snapshot, final Snaps /** * Removes record of running snapshot from cluster state and notifies the listener when this action is complete - * @param snapshot snapshot + * @param snapshot snapshot * @param failure exception if snapshot failed * @param listener listener to notify when snapshot information is removed from the cluster state */ - private void removeSnapshotFromClusterState(final Snapshot snapshot, final SnapshotInfo snapshotInfo, final Exception failure, + private void removeSnapshotFromClusterState(final Snapshot snapshot, @Nullable SnapshotInfo snapshotInfo, final Exception failure, @Nullable CleanupAfterErrorListener listener) { clusterService.submitStateUpdateTask("remove snapshot metadata", new ClusterStateUpdateTask() { @@ -1023,8 +1036,8 @@ public ClusterState execute(ClusterState currentState) { } } if (changed) { - snapshots = new SnapshotsInProgress(entries.toArray(new SnapshotsInProgress.Entry[entries.size()])); - return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build(); + return ClusterState.builder(currentState) + .putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(unmodifiableList(entries))).build(); } } return currentState; @@ -1033,6 +1046,7 @@ public ClusterState execute(ClusterState currentState) { @Override public void onFailure(String source, Exception e) { logger.warn(() -> new ParameterizedMessage("[{}] failed to remove snapshot metadata", snapshot), e); + endingSnapshots.remove(snapshot); if (listener != null) { listener.onFailure(e); } @@ -1040,24 +1054,27 @@ public void onFailure(String source, Exception e) { @Override public void onNoLongerMaster(String source) { + endingSnapshots.remove(snapshot); if (listener != null) { - listener.onNoLongerMaster(source); + listener.onNoLongerMaster(); } } @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - for (SnapshotCompletionListener listener : snapshotCompletionListeners) { + final List> completionListeners = snapshotCompletionListeners.remove(snapshot); + if (completionListeners != null) { try { - if (snapshotInfo != null) { - listener.onSnapshotCompletion(snapshot, snapshotInfo); + if (snapshotInfo == null) { + ActionListener.onFailure(completionListeners, failure); } else { - listener.onSnapshotFailure(snapshot, failure); + ActionListener.onResponse(completionListeners, snapshotInfo); } - } catch (Exception t) { - logger.warn(() -> new ParameterizedMessage("failed to notify listener [{}]", listener), t); + } catch (Exception e) { + logger.warn("Failed to notify listeners", e); } } + endingSnapshots.remove(snapshot); if (listener != null) { listener.onResponse(snapshotInfo); } @@ -1073,7 +1090,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS * @param snapshotName snapshotName * @param listener listener */ - public void deleteSnapshot(final String repositoryName, final String snapshotName, final DeleteSnapshotListener listener, + public void deleteSnapshot(final String repositoryName, final String snapshotName, final ActionListener listener, final boolean immediatePriority) { // First, look for the snapshot in the repository final Repository repository = repositoriesService.repository(repositoryName); @@ -1084,18 +1101,24 @@ public void deleteSnapshot(final String repositoryName, final String snapshotNam throw new SnapshotException(repositoryName, snapshotName, "cannot delete incompatible snapshot"); } Optional matchedEntry = repositoryData.getSnapshotIds() - .stream() - .filter(s -> s.getName().equals(snapshotName)) - .findFirst(); + .stream() + .filter(s -> s.getName().equals(snapshotName)) + .findFirst(); // if nothing found by the same name, then look in the cluster state for current in progress snapshots + long repoGenId = repositoryData.getGenId(); if (matchedEntry.isPresent() == false) { - matchedEntry = currentSnapshots(repositoryName, Collections.emptyList()).stream() - .map(e -> e.snapshot().getSnapshotId()).filter(s -> s.getName().equals(snapshotName)).findFirst(); + Optional matchedInProgress = currentSnapshots(repositoryName, Collections.emptyList()).stream() + .filter(s -> s.snapshot().getSnapshotId().getName().equals(snapshotName)).findFirst(); + if (matchedInProgress.isPresent()) { + matchedEntry = matchedInProgress.map(s -> s.snapshot().getSnapshotId()); + // Derive repository generation if a snapshot is in progress because it will increment the generation when it finishes + repoGenId = matchedInProgress.get().getRepositoryStateId() + 1L; + } } if (matchedEntry.isPresent() == false) { throw new SnapshotMissingException(repositoryName, snapshotName); } - deleteSnapshot(new Snapshot(repositoryName, matchedEntry.get()), listener, repositoryData.getGenId(), immediatePriority); + deleteSnapshot(new Snapshot(repositoryName, matchedEntry.get()), listener, repoGenId, immediatePriority); } /** @@ -1107,7 +1130,7 @@ public void deleteSnapshot(final String repositoryName, final String snapshotNam * @param listener listener * @param repositoryStateId the unique id for the state of the repository */ - private void deleteSnapshot(final Snapshot snapshot, final DeleteSnapshotListener listener, final long repositoryStateId, + private void deleteSnapshot(final Snapshot snapshot, final ActionListener listener, final long repositoryStateId, final boolean immediatePriority) { Priority priority = immediatePriority ? Priority.IMMEDIATE : Priority.NORMAL; clusterService.submitStateUpdateTask("delete snapshot", new ClusterStateUpdateTask(priority) { @@ -1118,14 +1141,15 @@ private void deleteSnapshot(final Snapshot snapshot, final DeleteSnapshotListene public ClusterState execute(ClusterState currentState) throws Exception { SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE); if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) { - throw new ConcurrentSnapshotExecutionException(snapshot, "cannot delete - another snapshot is currently being deleted"); + throw new ConcurrentSnapshotExecutionException(snapshot, + "cannot delete - another snapshot is currently being deleted"); } RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE); if (restoreInProgress != null) { // don't allow snapshot deletions while a restore is taking place, // otherwise we could end up deleting a snapshot that is being restored // and the files the restore depends on would all be gone - if (restoreInProgress.entries().isEmpty() == false) { + if (restoreInProgress.isEmpty() == false) { throw new ConcurrentSnapshotExecutionException(snapshot, "cannot delete snapshot during a restore"); } } @@ -1157,10 +1181,12 @@ public ClusterState execute(ClusterState currentState) throws Exception { final ImmutableOpenMap shards; final State state = snapshotEntry.state(); + final String failure; if (state == State.INIT) { // snapshot is still initializing, mark it as aborted shards = snapshotEntry.shards(); - + assert shards.isEmpty(); + failure = "Snapshot was aborted during initialization"; } else if (state == State.STARTED) { // snapshot is started - mark every non completed shard as aborted final ImmutableOpenMap.Builder shardsBuilder = ImmutableOpenMap.builder(); @@ -1172,14 +1198,14 @@ public ClusterState execute(ClusterState currentState) throws Exception { shardsBuilder.put(shardEntry.key, status); } shards = shardsBuilder.build(); - + failure = "Snapshot was aborted by deletion"; } else { boolean hasUncompletedShards = false; // Cleanup in case a node gone missing and snapshot wasn't updated for some reason for (ObjectCursor shardStatus : snapshotEntry.shards().values()) { // Check if we still have shard running on existing nodes if (shardStatus.value.state().completed() == false && shardStatus.value.nodeId() != null - && currentState.nodes().get(shardStatus.value.nodeId()) != null) { + && currentState.nodes().get(shardStatus.value.nodeId()) != null) { hasUncompletedShards = true; break; } @@ -1193,12 +1219,11 @@ public ClusterState execute(ClusterState currentState) throws Exception { // where we force to finish the snapshot logger.debug("trying to delete completed snapshot with no finalizing shards - can delete immediately"); shards = snapshotEntry.shards(); - endSnapshot(snapshotEntry); } + failure = snapshotEntry.failure(); } - SnapshotsInProgress.Entry newSnapshot = new SnapshotsInProgress.Entry(snapshotEntry, State.ABORTED, shards); - snapshots = new SnapshotsInProgress(newSnapshot); - clusterStateBuilder.putCustom(SnapshotsInProgress.TYPE, snapshots); + SnapshotsInProgress.Entry newSnapshot = new SnapshotsInProgress.Entry(snapshotEntry, State.ABORTED, shards, failure); + clusterStateBuilder.putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(newSnapshot)); } return clusterStateBuilder.build(); } @@ -1212,49 +1237,34 @@ public void onFailure(String source, Exception e) { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { if (waitForSnapshot) { logger.trace("adding snapshot completion listener to wait for deleted snapshot to finish"); - addListener(new SnapshotCompletionListener() { - @Override - public void onSnapshotCompletion(Snapshot completedSnapshot, SnapshotInfo snapshotInfo) { - if (completedSnapshot.equals(snapshot)) { - logger.debug("deleted snapshot completed - deleting files"); - removeListener(this); - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { - try { - deleteSnapshot(completedSnapshot.getRepository(), completedSnapshot.getSnapshotId().getName(), - listener, true); - - } catch (Exception ex) { - logger.warn(() -> new ParameterizedMessage("[{}] failed to delete snapshot", snapshot), ex); - } - } - ); - } - } - - @Override - public void onSnapshotFailure(Snapshot failedSnapshot, Exception e) { - if (failedSnapshot.equals(snapshot)) { - logger.warn("deleted snapshot failed - deleting files", e); - removeListener(this); - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { + addListener(snapshot, ActionListener.wrap( + snapshotInfo -> { + logger.debug("deleted snapshot completed - deleting files"); + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { try { - deleteSnapshot(failedSnapshot.getRepository(), - failedSnapshot.getSnapshotId().getName(), - listener, - true); - } catch (SnapshotMissingException smex) { - logger.info(() -> new ParameterizedMessage( - "Tried deleting in-progress snapshot [{}], but it " + - "could not be found after failing to abort.", - smex.getSnapshotName()), e); - listener.onFailure(new SnapshotException(snapshot, - "Tried deleting in-progress snapshot [" + smex.getSnapshotName() + "], but it " + - "could not be found after failing to abort.", smex)); + deleteSnapshot(snapshot.getRepository(), snapshot.getSnapshotId().getName(), listener, true); + } catch (Exception ex) { + logger.warn(() -> new ParameterizedMessage("[{}] failed to delete snapshot", snapshot), ex); } - }); - } + } + ); + }, + e -> { + logger.warn("deleted snapshot failed - deleting files", e); + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { + try { + deleteSnapshot(snapshot.getRepository(), snapshot.getSnapshotId().getName(), listener, true); + } catch (SnapshotMissingException smex) { + logger.info(() -> new ParameterizedMessage( + "Tried deleting in-progress snapshot [{}], but it could not be found after failing to abort.", + smex.getSnapshotName()), e); + listener.onFailure(new SnapshotException(snapshot, + "Tried deleting in-progress snapshot [" + smex.getSnapshotName() + "], but it " + + "could not be found after failing to abort.", smex)); + } + }); } - }); + )); } else { logger.debug("deleted snapshot is not running - deleting files"); deleteSnapshotFromRepository(snapshot, listener, repositoryStateId); @@ -1297,8 +1307,7 @@ public static boolean isRepositoryInUse(ClusterState clusterState, String reposi * @param listener listener * @param repositoryStateId the unique id representing the state of the repository at the time the deletion began */ - private void deleteSnapshotFromRepository(final Snapshot snapshot, @Nullable final DeleteSnapshotListener listener, - long repositoryStateId) { + private void deleteSnapshotFromRepository(Snapshot snapshot, @Nullable ActionListener listener, long repositoryStateId) { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { try { Repository repository = repositoriesService.repository(snapshot.getRepository()); @@ -1316,7 +1325,7 @@ private void deleteSnapshotFromRepository(final Snapshot snapshot, @Nullable fin * Removes the snapshot deletion from {@link SnapshotDeletionsInProgress} in the cluster state. */ private void removeSnapshotDeletionFromClusterState(final Snapshot snapshot, @Nullable final Exception failure, - @Nullable final DeleteSnapshotListener listener) { + @Nullable final ActionListener listener) { clusterService.submitStateUpdateTask("remove snapshot deletion metadata", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { @@ -1350,7 +1359,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS if (failure != null) { listener.onFailure(failure); } else { - listener.onResponse(); + listener.onResponse(null); } } } @@ -1364,7 +1373,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS * @param indices list of indices to be snapshotted * @return list of shard to be included into current snapshot */ - private ImmutableOpenMap shards(ClusterState clusterState, List indices) { + private static ImmutableOpenMap shards(ClusterState clusterState, + List indices) { ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); MetaData metaData = clusterState.metaData(); for (IndexId index : indices) { @@ -1372,7 +1382,8 @@ private ImmutableOpenMap shard IndexMetaData indexMetaData = metaData.index(indexName); if (indexMetaData == null) { // The index was deleted before we managed to start the snapshot - mark it as missing. - builder.put(new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, 0), new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "missing index")); + builder.put(new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, 0), + new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "missing index")); } else if (indexMetaData.getState() == IndexMetaData.State.CLOSE) { for (int i = 0; i < indexMetaData.getNumberOfShards(); i++) { ShardId shardId = new ShardId(indexMetaData.getIndex(), i); @@ -1385,17 +1396,20 @@ private ImmutableOpenMap shard if (indexRoutingTable != null) { ShardRouting primary = indexRoutingTable.shard(i).primaryShard(); if (primary == null || !primary.assignedToNode()) { - builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "primary shard is not allocated")); + builder.put(shardId, + new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "primary shard is not allocated")); } else if (primary.relocating() || primary.initializing()) { - // The WAITING state was introduced in V1.2.0 - don't use it if there are nodes with older version in the cluster builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), State.WAITING)); } else if (!primary.started()) { - builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), State.MISSING, "primary shard hasn't been started yet")); + builder.put(shardId, + new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), State.MISSING, + "primary shard hasn't been started yet")); } else { builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId())); } } else { - builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "missing routing table")); + builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, + "missing routing table")); } } } @@ -1411,7 +1425,7 @@ private ImmutableOpenMap shard public static void checkIndexDeletion(ClusterState currentState, Set indices) { Set indicesToFail = indicesToFailForCloseOrDeletion(currentState, indices); if (indicesToFail != null) { - throw new IllegalArgumentException("Cannot delete indices that are being snapshotted: " + indicesToFail + + throw new SnapshotInProgressException("Cannot delete indices that are being snapshotted: " + indicesToFail + ". Try again after snapshot finishes or cancel the currently running snapshot."); } } @@ -1423,7 +1437,7 @@ public static void checkIndexDeletion(ClusterState currentState, Set indices) { Set indicesToFail = indicesToFailForCloseOrDeletion(currentState, indices); if (indicesToFail != null) { - throw new IllegalArgumentException("Cannot close indices that are being snapshotted: " + indicesToFail + + throw new SnapshotInProgressException("Cannot close indices that are being snapshotted: " + indicesToFail + ". Try again after snapshot finishes or cancel the currently running snapshot."); } } @@ -1466,19 +1480,11 @@ private static Set indicesToFailForCloseOrDeletion(ClusterState currentSt /** * Adds snapshot completion listener * + * @param snapshot Snapshot to listen for * @param listener listener */ - public void addListener(SnapshotCompletionListener listener) { - this.snapshotCompletionListeners.add(listener); - } - - /** - * Removes snapshot completion listener - * - * @param listener listener - */ - public void removeListener(SnapshotCompletionListener listener) { - this.snapshotCompletionListeners.remove(listener); + private void addListener(Snapshot snapshot, ActionListener listener) { + snapshotCompletionListeners.computeIfAbsent(snapshot, k -> new CopyOnWriteArrayList<>()).add(listener); } @Override @@ -1499,225 +1505,4 @@ protected void doClose() { public RepositoriesService getRepositoriesService() { return repositoriesService; } - - /** - * Listener for create snapshot operation - */ - public interface CreateSnapshotListener { - - /** - * Called when snapshot has successfully started - */ - void onResponse(); - - /** - * Called if a snapshot operation couldn't start - */ - void onFailure(Exception e); - } - - /** - * Listener for delete snapshot operation - */ - public interface DeleteSnapshotListener { - - /** - * Called if delete operation was successful - */ - void onResponse(); - - /** - * Called if delete operation failed - */ - void onFailure(Exception e); - } - - public interface SnapshotCompletionListener { - - void onSnapshotCompletion(Snapshot snapshot, SnapshotInfo snapshotInfo); - - void onSnapshotFailure(Snapshot snapshot, Exception e); - } - - /** - * Snapshot creation request - */ - public static class SnapshotRequest { - - private final String cause; - - private final String repositoryName; - - private final String snapshotName; - - private String[] indices; - - private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen(); - - private boolean partial; - - private Settings settings; - - private boolean includeGlobalState; - - private TimeValue masterNodeTimeout; - - /** - * Constructs new snapshot creation request - * - * @param repositoryName repository name - * @param snapshotName snapshot name - * @param cause cause for snapshot operation - */ - public SnapshotRequest(final String repositoryName, final String snapshotName, final String cause) { - this.repositoryName = Objects.requireNonNull(repositoryName); - this.snapshotName = Objects.requireNonNull(snapshotName); - this.cause = Objects.requireNonNull(cause); - } - - /** - * Sets the list of indices to be snapshotted - * - * @param indices list of indices - * @return this request - */ - public SnapshotRequest indices(String[] indices) { - this.indices = indices; - return this; - } - - /** - * Sets repository-specific snapshot settings - * - * @param settings snapshot settings - * @return this request - */ - public SnapshotRequest settings(Settings settings) { - this.settings = settings; - return this; - } - - /** - * Set to true if global state should be stored as part of the snapshot - * - * @param includeGlobalState true if global state should be stored as part of the snapshot - * @return this request - */ - public SnapshotRequest includeGlobalState(boolean includeGlobalState) { - this.includeGlobalState = includeGlobalState; - return this; - } - - /** - * Sets master node timeout - * - * @param masterNodeTimeout master node timeout - * @return this request - */ - public SnapshotRequest masterNodeTimeout(TimeValue masterNodeTimeout) { - this.masterNodeTimeout = masterNodeTimeout; - return this; - } - - /** - * Sets the indices options - * - * @param indicesOptions indices options - * @return this request - */ - public SnapshotRequest indicesOptions(IndicesOptions indicesOptions) { - this.indicesOptions = indicesOptions; - return this; - } - - /** - * Set to true if partial snapshot should be allowed - * - * @param partial true if partial snapshots should be allowed - * @return this request - */ - public SnapshotRequest partial(boolean partial) { - this.partial = partial; - return this; - } - - /** - * Returns cause for snapshot operation - * - * @return cause for snapshot operation - */ - public String cause() { - return cause; - } - - /** - * Returns the repository name - */ - public String repositoryName() { - return repositoryName; - } - - /** - * Returns the snapshot name - */ - public String snapshotName() { - return snapshotName; - } - - /** - * Returns the list of indices to be snapshotted - * - * @return the list of indices - */ - public String[] indices() { - return indices; - } - - /** - * Returns indices options - * - * @return indices options - */ - public IndicesOptions indicesOptions() { - return indicesOptions; - } - - /** - * Returns repository-specific settings for the snapshot operation - * - * @return repository-specific settings - */ - public Settings settings() { - return settings; - } - - /** - * Returns true if global state should be stored as part of the snapshot - * - * @return true if global state should be stored as part of the snapshot - */ - public boolean includeGlobalState() { - return includeGlobalState; - } - - /** - * Returns true if partial snapshot should be allowed - * - * @return true if partial snapshot should be allowed - */ - public boolean partial() { - return partial; - } - - /** - * Returns master node timeout - * - * @return master node timeout - */ - public TimeValue masterNodeTimeout() { - return masterNodeTimeout; - } - - } } - diff --git a/es/es-server/src/main/java/org/elasticsearch/transport/TransportRequestDeduplicator.java b/es/es-server/src/main/java/org/elasticsearch/transport/TransportRequestDeduplicator.java new file mode 100644 index 000000000000..d929ef34ce2c --- /dev/null +++ b/es/es-server/src/main/java/org/elasticsearch/transport/TransportRequestDeduplicator.java @@ -0,0 +1,114 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentMap; +import java.util.function.BiConsumer; + +/** + * Deduplicator for {@link TransportRequest}s that keeps track of {@link TransportRequest}s that should + * not be sent in parallel. + * @param Transport Request Class + */ +public final class TransportRequestDeduplicator { + + private final ConcurrentMap requests = ConcurrentCollections.newConcurrentMap(); + + /** + * Ensures a given request not executed multiple times when another equal request is already in-flight. + * If the request is not yet known to the deduplicator it will invoke the passed callback with an {@link ActionListener} + * that must be completed by the caller when the request completes. Once that listener is completed the request will be removed from + * the deduplicator's internal state. If the request is already known to the deduplicator it will keep + * track of the given listener and invoke it when the listener passed to the callback on first invocation is completed. + * @param request Request to deduplicate + * @param listener Listener to invoke on request completion + * @param callback Callback to be invoked with request and completion listener the first time the request is added to the deduplicator + */ + public void executeOnce(T request, ActionListener listener, BiConsumer> callback) { + ActionListener completionListener = requests.computeIfAbsent(request, CompositeListener::new).addListener(listener); + if (completionListener != null) { + callback.accept(request, completionListener); + } + } + + public int size() { + return requests.size(); + } + + private final class CompositeListener implements ActionListener { + + private final List> listeners = new ArrayList<>(); + + private final T request; + + private boolean isNotified; + private Exception failure; + + CompositeListener(T request) { + this.request = request; + } + + CompositeListener addListener(ActionListener listener) { + synchronized (this) { + if (this.isNotified == false) { + listeners.add(listener); + return listeners.size() == 1 ? this : null; + } + } + if (failure != null) { + listener.onFailure(failure); + } else { + listener.onResponse(null); + } + return null; + } + + private void onCompleted(Exception failure) { + synchronized (this) { + this.failure = failure; + this.isNotified = true; + } + try { + if (failure == null) { + ActionListener.onResponse(listeners, null); + } else { + ActionListener.onFailure(listeners, failure); + } + } finally { + requests.remove(request); + } + } + + @Override + public void onResponse(final Void aVoid) { + onCompleted(null); + } + + @Override + public void onFailure(Exception failure) { + onCompleted(failure); + } + } +}