Wait for snapshot completion in SLM snapshot invocation (#47051)
* Wait for snapshot completion in SLM snapshot invocation

This changes the snapshots internally invoked by SLM to wait for
completion. This allows us to capture more snapshotting failure
scenarios.

For example, previously a snapshot would be created and then registered
as a "success" even though it may have been aborted or may have had a
subset of its shards fail. These cases are now handled by inspecting the
response to the `CreateSnapshotRequest` and ensuring that there are no
failures. If any failures are present, the history store now records the
action as a failure instead of a success.

Relates to #38461 and #43663
dakrone committed Sep 25, 2019
1 parent 287d96d commit a267df3
Showing 6 changed files with 119 additions and 15 deletions.
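
Before the per-file diffs, here is a minimal, self-contained sketch of the failure-handling pattern the commit message describes: with wait-for-completion enabled, the create-snapshot response carries the final shard counts, so a partial failure can be converted into an exception instead of being recorded as a success. The names below (SnapshotResult, ShardFailure, SnapshotFailedException, recordOutcome) are illustrative stand-ins, not the Elasticsearch classes touched by this commit.

// Hypothetical sketch of the "inspect the response, fail on failed shards" pattern.
// The real change (see the SnapshotLifecycleTask diff below) uses CreateSnapshotResponse,
// SnapshotInfo, and SnapshotException; these types are simplified stand-ins.
import java.util.List;
import java.util.function.Consumer;

public class SnapshotCompletionCheck {

    // One failed shard, carrying the underlying cause for later inspection.
    record ShardFailure(String index, int shardId, String reason, Exception cause) {}

    // The subset of snapshot-completion info the check needs.
    record SnapshotResult(String repository, String snapshot, int totalShards,
                          List<ShardFailure> shardFailures) {
        int failedShards() { return shardFailures.size(); }
    }

    static class SnapshotFailedException extends RuntimeException {
        SnapshotFailedException(String message) { super(message); }
    }

    // Only record a success when no shard failed; otherwise build an exception that
    // summarizes the failure and keeps each shard's cause as a suppressed exception.
    static void recordOutcome(SnapshotResult result, Runnable onSuccess, Consumer<Exception> onFailure) {
        if (result.failedShards() == 0) {
            onSuccess.run();
        } else {
            SnapshotFailedException e = new SnapshotFailedException(
                "failed to create snapshot successfully, " + result.failedShards()
                    + " out of " + result.totalShards() + " total shards failed");
            result.shardFailures().forEach(f -> e.addSuppressed(f.cause()));
            onFailure.accept(e);
        }
    }

    public static void main(String[] args) {
        SnapshotResult partial = new SnapshotResult("repo", "daily-snap", 3,
            List.of(new ShardFailure("index", 0, "forced failure", new RuntimeException("forced failure"))));
        recordOutcome(partial,
            () -> System.out.println("recorded success"),
            e -> System.out.println("recorded failure: " + e.getMessage()));
    }
}

The production code in the diff below follows the same shape, but builds an Elasticsearch SnapshotException and hands it to the existing onFailure handler so the failure is persisted to the SLM history store.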
@@ -53,7 +53,7 @@ public class CreateSnapshotResponse extends ActionResponse implements ToXContent
 
     CreateSnapshotResponse() {}
 
-    CreateSnapshotResponse(@Nullable SnapshotInfo snapshotInfo) {
+    public CreateSnapshotResponse(@Nullable SnapshotInfo snapshotInfo) {
         this.snapshotInfo = snapshotInfo;
     }
@@ -260,7 +260,7 @@ public CreateSnapshotRequest toRequest() {
         Map<String, Object> mergedConfiguration = new HashMap<>(configuration);
         mergedConfiguration.put("metadata", metadataWithAddedPolicyName);
         req.source(mergedConfiguration);
-        req.waitForCompletion(false);
+        req.waitForCompletion(true);
         return req;
     }
 
@@ -158,7 +158,8 @@ public List<Setting<?>> getSettings() {
             LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE_SETTING,
             RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING,
             LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING,
-            LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING);
+            LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING,
+            LifecycleSettings.SLM_RETENTION_DURATION_SETTING);
     }
 
     @Override
@@ -23,6 +23,8 @@
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
+import org.elasticsearch.snapshots.SnapshotException;
+import org.elasticsearch.snapshots.SnapshotInfo;
 import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
 import org.elasticsearch.xpack.core.slm.SnapshotInvocationRecord;
@@ -91,16 +93,32 @@ public static Optional<String> maybeTakeSnapshot(final String jobId, final Clien
             public void onResponse(CreateSnapshotResponse createSnapshotResponse) {
                 logger.debug("snapshot response for [{}]: {}",
                     policyMetadata.getPolicy().getId(), Strings.toString(createSnapshotResponse));
-                final long timestamp = Instant.now().toEpochMilli();
-                clusterService.submitStateUpdateTask("slm-record-success-" + policyMetadata.getPolicy().getId(),
-                    WriteJobStatus.success(policyMetadata.getPolicy().getId(), request.snapshot(), timestamp));
-                historyStore.putAsync(SnapshotHistoryItem.creationSuccessRecord(timestamp, policyMetadata.getPolicy(),
-                    request.snapshot()));
+                final SnapshotInfo snapInfo = createSnapshotResponse.getSnapshotInfo();
+
+                // Check that there are no failed shards, since the request may not entirely
+                // fail, but may still have failures (such as in the case of an aborted snapshot)
+                if (snapInfo.failedShards() == 0) {
+                    final long timestamp = Instant.now().toEpochMilli();
+                    clusterService.submitStateUpdateTask("slm-record-success-" + policyMetadata.getPolicy().getId(),
+                        WriteJobStatus.success(policyMetadata.getPolicy().getId(), request.snapshot(), timestamp));
+                    historyStore.putAsync(SnapshotHistoryItem.creationSuccessRecord(timestamp, policyMetadata.getPolicy(),
+                        request.snapshot()));
+                } else {
+                    int failures = snapInfo.failedShards();
+                    int total = snapInfo.totalShards();
+                    final SnapshotException e = new SnapshotException(request.repository(), request.snapshot(),
+                        "failed to create snapshot successfully, " + failures + " out of " + total + " total shards failed");
+                    // Add each failed shard's exception as suppressed, the exception contains
+                    // information about which shard failed
+                    snapInfo.shardFailures().forEach(failure -> e.addSuppressed(failure.getCause()));
+                    // Call the failure handler to register this as a failure and persist it
+                    onFailure(e);
+                }
             }
 
             @Override
             public void onFailure(Exception e) {
-                logger.error("failed to issue create snapshot request for snapshot lifecycle policy [{}]: {}",
+                logger.error("failed to create snapshot for snapshot lifecycle policy [{}]: {}",
                     policyMetadata.getPolicy().getId(), e);
                 final long timestamp = Instant.now().toEpochMilli();
                 clusterService.submitStateUpdateTask("slm-record-failure-" + policyMetadata.getPolicy().getId(),
@@ -339,6 +339,7 @@ void deleteSnapshots(Map<String, List<SnapshotInfo>> snapshotsToDelete,
             List<SnapshotInfo> snapshots = entry.getValue();
             for (SnapshotInfo info : snapshots) {
                 final String policyId = getPolicyId(info);
+                final long deleteStartTime = nowNanoSupplier.getAsLong();
                 deleteSnapshot(policyId, repo, info.snapshotId(), slmStats, ActionListener.wrap(acknowledgedResponse -> {
                     deleted.incrementAndGet();
                     if (acknowledgedResponse.isAcknowledged()) {
@@ -364,13 +365,15 @@ void deleteSnapshots(Map<String, List<SnapshotInfo>> snapshotsToDelete,
                 }));
                 // Check whether we have exceeded the maximum time allowed to spend deleting
                 // snapshots, if we have, short-circuit the rest of the deletions
-                TimeValue elapsedDeletionTime = TimeValue.timeValueNanos(nowNanoSupplier.getAsLong() - startTime);
-                logger.debug("elapsed time for deletion of [{}] snapshot: {}", info.snapshotId(), elapsedDeletionTime);
-                if (elapsedDeletionTime.compareTo(maximumTime) > 0) {
+                long finishTime = nowNanoSupplier.getAsLong();
+                TimeValue deletionTime = TimeValue.timeValueNanos(finishTime - deleteStartTime);
+                logger.debug("elapsed time for deletion of [{}] snapshot: {}", info.snapshotId(), deletionTime);
+                TimeValue totalDeletionTime = TimeValue.timeValueNanos(finishTime - startTime);
+                if (totalDeletionTime.compareTo(maximumTime) > 0) {
                     logger.info("maximum snapshot retention deletion time reached, time spent: [{}]," +
                         " maximum allowed time: [{}], deleted [{}] out of [{}] snapshots scheduled for deletion, failed to delete [{}]",
-                        elapsedDeletionTime, maximumTime, deleted, count, failed);
-                    slmStats.deletionTime(elapsedDeletionTime);
+                        totalDeletionTime, maximumTime, deleted, count, failed);
+                    slmStats.deletionTime(totalDeletionTime);
                     slmStats.retentionTimedOut();
                     return;
                 }
@@ -402,8 +405,8 @@ public void onResponse(AcknowledgedResponse acknowledgedResponse) {
                 } else {
                     logger.warn("[{}] snapshot [{}] delete issued but the request was not acknowledged", repo, snapshot);
                 }
-                listener.onResponse(acknowledgedResponse);
                 slmStats.snapshotDeleted(slmPolicy);
+                listener.onResponse(acknowledgedResponse);
             }
 
             @Override
@@ -23,6 +23,10 @@
 import org.elasticsearch.common.TriFunction;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.snapshots.SnapshotId;
+import org.elasticsearch.snapshots.SnapshotInfo;
+import org.elasticsearch.snapshots.SnapshotShardFailure;
 import org.elasticsearch.test.ClusterServiceUtils;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.client.NoOpClient;
@@ -47,6 +51,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.startsWith;
@@ -196,6 +201,83 @@ public void testCreateSnapshotOnTrigger() {
         threadPool.shutdownNow();
     }
 
+    public void testPartialFailureSnapshot() throws Exception {
+        final String id = randomAlphaOfLength(4);
+        final SnapshotLifecyclePolicyMetadata slpm = makePolicyMeta(id);
+        final SnapshotLifecycleMetadata meta =
+            new SnapshotLifecycleMetadata(Collections.singletonMap(id, slpm), OperationMode.RUNNING, new SnapshotLifecycleStats());
+
+        final ClusterState state = ClusterState.builder(new ClusterName("test"))
+            .metaData(MetaData.builder()
+                .putCustom(SnapshotLifecycleMetadata.TYPE, meta)
+                .build())
+            .build();
+
+        final ThreadPool threadPool = new TestThreadPool("test");
+        final AtomicBoolean clientCalled = new AtomicBoolean(false);
+        final SetOnce<String> snapshotName = new SetOnce<>();
+        try (ClusterService clusterService = ClusterServiceUtils.createClusterService(state, threadPool);
+             VerifyingClient client = new VerifyingClient(threadPool,
+                 (action, request, listener) -> {
+                     assertFalse(clientCalled.getAndSet(true));
+                     assertThat(action, instanceOf(CreateSnapshotAction.class));
+                     assertThat(request, instanceOf(CreateSnapshotRequest.class));
+
+                     CreateSnapshotRequest req = (CreateSnapshotRequest) request;
+
+                     SnapshotLifecyclePolicy policy = slpm.getPolicy();
+                     assertThat(req.snapshot(), startsWith(policy.getName() + "-"));
+                     assertThat(req.repository(), equalTo(policy.getRepository()));
+                     snapshotName.set(req.snapshot());
+                     if (req.indices().length > 0) {
+                         assertThat(Arrays.asList(req.indices()), equalTo(policy.getConfig().get("indices")));
+                     }
+                     boolean globalState = policy.getConfig().get("include_global_state") == null ||
+                         Boolean.parseBoolean((String) policy.getConfig().get("include_global_state"));
+                     assertThat(req.includeGlobalState(), equalTo(globalState));
+
+                     return new CreateSnapshotResponse(
+                         new SnapshotInfo(
+                             new SnapshotId(req.snapshot(), "uuid"),
+                             Arrays.asList(req.indices()),
+                             randomNonNegativeLong(),
+                             "snapshot started",
+                             randomNonNegativeLong(),
+                             3,
+                             Collections.singletonList(
+                                 new SnapshotShardFailure("nodeId", new ShardId("index", "uuid", 0), "forced failure")),
+                             req.includeGlobalState(),
+                             req.userMetadata()
+                         ));
+                 })) {
+            final AtomicBoolean historyStoreCalled = new AtomicBoolean(false);
+            SnapshotHistoryStore historyStore = new VerifyingHistoryStore(null, ZoneOffset.UTC,
+                item -> {
+                    assertFalse(historyStoreCalled.getAndSet(true));
+                    final SnapshotLifecyclePolicy policy = slpm.getPolicy();
+                    assertEquals(policy.getId(), item.getPolicyId());
+                    assertEquals(policy.getRepository(), item.getRepository());
+                    assertEquals(policy.getConfig(), item.getSnapshotConfiguration());
+                    assertEquals(snapshotName.get(), item.getSnapshotName());
+                    assertFalse("item should be a failure", item.isSuccess());
+                    assertThat(item.getErrorDetails(),
+                        containsString("failed to create snapshot successfully, 1 out of 3 total shards failed"));
+                    assertThat(item.getErrorDetails(),
+                        containsString("forced failure"));
+                });
+
+            SnapshotLifecycleTask task = new SnapshotLifecycleTask(client, clusterService, historyStore);
+            // Trigger the event with a matching job name for the policy
+            task.triggered(new SchedulerEngine.Event(SnapshotLifecycleService.getJobId(slpm),
+                System.currentTimeMillis(), System.currentTimeMillis()));
+
+            assertTrue("snapshot should be triggered once", clientCalled.get());
+            assertTrue("history store should be called once", historyStoreCalled.get());
+        }
+
+        threadPool.shutdownNow();
+    }
+
     /**
      * A client that delegates to a verifying function for action/request/listener
      */
