Skip to content

Commit

Permalink
Associate restore snapshot task to parent mount task (#108705)
Browse files Browse the repository at this point in the history
* Associate restore snapshot task to parent mount task

Closes #105830
  • Loading branch information
nicktindall committed May 21, 2024
1 parent c300653 commit deb3ef9
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 30 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/108705.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 108705
summary: Associate restore snapshot task to parent mount task
area: Distributed
type: bug
issues:
- 105830
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -2154,15 +2155,19 @@ public static void safeAcquire(Semaphore semaphore) {
public static <T> T safeAwait(SubscribableListener<T> listener) {
final var future = new PlainActionFuture<T>();
listener.addListener(future);
return safeGet(future);
}

public static <T> T safeGet(Future<T> future) {
try {
return future.get(SAFE_AWAIT_TIMEOUT.millis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError("safeAwait: interrupted waiting for SubscribableListener", e);
throw new AssertionError("safeGet: interrupted waiting for SubscribableListener", e);
} catch (ExecutionException e) {
throw new AssertionError("safeAwait: listener was completed exceptionally", e);
throw new AssertionError("safeGet: listener was completed exceptionally", e);
} catch (TimeoutException e) {
throw new AssertionError("safeAwait: listener was not completed within the timeout", e);
throw new AssertionError("safeGet: listener was not completed within the timeout", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,13 @@
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplanation;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.TransportRestoreSnapshotAction;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStats;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
Expand All @@ -21,6 +26,7 @@
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
Expand All @@ -32,6 +38,7 @@
import org.elasticsearch.cluster.routing.allocation.DataTier;
import org.elasticsearch.cluster.routing.allocation.NodeAllocationResult;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -50,6 +57,7 @@
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest;
Expand Down Expand Up @@ -1155,6 +1163,75 @@ private void assertSearchableSnapshotStats(String indexName, boolean cacheEnable
}
}

public void testMountingSnapshotLinksRefreshTaskAsChild() throws Exception {
final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createAndPopulateIndex(indexName, indexSettingsNoReplicas(1).put(INDEX_SOFT_DELETES_SETTING.getKey(), true));

final String repositoryName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createRepository(repositoryName, "fs");

final SnapshotId snapshotOne = createSnapshot(repositoryName, "snapshot-1", List.of(indexName)).snapshotId();
assertAcked(indicesAdmin().prepareDelete(indexName));

// block the cluster state update thread, so we have the opportunity to inspect the mount/restore tasks' metadata
ClusterService clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
clusterService.submitUnbatchedStateUpdateTask("block master service", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
safeAwait(cyclicBarrier); // notify test that we're blocked
safeAwait(cyclicBarrier); // wait to be unblocked by test
return currentState;
}

@Override
public void onFailure(Exception e) {
throw new AssertionError("block master service", e);
}
});

// wait for master thread to be blocked
safeAwait(cyclicBarrier);

ActionFuture<RestoreSnapshotResponse> response = null;
try {
final MountSearchableSnapshotRequest mountRequest = new MountSearchableSnapshotRequest(
indexName,
repositoryName,
snapshotOne.getName(),
indexName,
Settings.EMPTY,
Strings.EMPTY_ARRAY,
true,
MountSearchableSnapshotRequest.Storage.FULL_COPY
);

response = client().execute(MountSearchableSnapshotAction.INSTANCE, mountRequest);
assertBusy(() -> {
TaskInfo restoreSnapshotTask = getTaskForActionFromMaster(TransportRestoreSnapshotAction.TYPE.name());
TaskInfo mountSnapshotTask = getTaskForActionFromMaster(MountSearchableSnapshotAction.NAME);
assertEquals(mountSnapshotTask.taskId(), restoreSnapshotTask.parentTaskId());
});
} finally {
// Unblock the master thread
safeAwait(cyclicBarrier);
// If we started the mount, wait for it to complete, to prevent a race between the mount and the test cleanup
if (response != null) {
safeGet(response);
}
}
}

private TaskInfo getTaskForActionFromMaster(String action) {
ListTasksResponse response = client().execute(
TransportListTasksAction.TYPE,
new ListTasksRequest().setDetailed(true).setNodes(internalCluster().getMasterName()).setActions(action)
).actionGet();
assertThat(response.getTasks(), hasSize(1));
return response.getTasks().get(0);
}

private static long max(long... values) {
return Arrays.stream(values).max().orElseThrow(() -> new AssertionError("no values"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,33 +237,32 @@ protected void masterOperation(
dataTierAllocationSetting.get(indexSettings);
}

client.admin()
.cluster()
.restoreSnapshot(
new RestoreSnapshotRequest(repoName, snapName)
// Restore the single index specified
.indices(indexName)
// Always rename it to the desired mounted index name
.renamePattern(".+")
.renameReplacement(mountedIndexName)
// Pass through index settings, adding the index-level settings required to use searchable snapshots
.indexSettings(indexSettings)
// Pass through ignored index settings
.ignoreIndexSettings(ignoreIndexSettings.toArray(new String[0]))
// Don't include global state
.includeGlobalState(false)
// Don't include aliases
.includeAliases(false)
// Pass through the wait-for-completion flag
.waitForCompletion(request.waitForCompletion())
// Pass through the master-node timeout
.masterNodeTimeout(request.masterNodeTimeout())
// Fail the restore if the snapshot found above is swapped out from under us before the restore happens
.snapshotUuid(snapshotId.getUUID())
// Log snapshot restore at the DEBUG log level
.quiet(true),
delegate
);
RestoreSnapshotRequest restoreSnapshotRequest = new RestoreSnapshotRequest(repoName, snapName)
// Restore the single index specified
.indices(indexName)
// Always rename it to the desired mounted index name
.renamePattern(".+")
.renameReplacement(mountedIndexName)
// Pass through index settings, adding the index-level settings required to use searchable snapshots
.indexSettings(indexSettings)
// Pass through ignored index settings
.ignoreIndexSettings(ignoreIndexSettings.toArray(new String[0]))
// Don't include global state
.includeGlobalState(false)
// Don't include aliases
.includeAliases(false)
// Pass through the wait-for-completion flag
.waitForCompletion(request.waitForCompletion())
// Pass through the master-node timeout
.masterNodeTimeout(request.masterNodeTimeout())
// Fail the restore if the snapshot found above is swapped out from under us before the restore happens
.snapshotUuid(snapshotId.getUUID())
// Log snapshot restore at the DEBUG log level
.quiet(true);
// Specify the mount task as the parent of the refresh task
restoreSnapshotRequest.setParentTask(clusterService.localNode().getId(), task.getId());

client.admin().cluster().restoreSnapshot(restoreSnapshotRequest, delegate);
}), threadPool.executor(ThreadPool.Names.SNAPSHOT_META), null);
}
}

0 comments on commit deb3ef9

Please sign in to comment.