Delay searchable snapshot allocation during shutdown (#86153)
During a node shutdown, searchable snapshot shards should not be reallocated to new nodes,
since that forces the new nodes to download the snapshot data again. Instead, wait for the
shutdown reallocation delay to elapse before reallocating the shards.

Fixes #85052
henningandersen committed Apr 29, 2022
1 parent 4c69202 commit ace5edd
Showing 5 changed files with 186 additions and 16 deletions.
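
In outline, the change makes the searchable snapshot allocator treat a shard whose previous node is restarting (a RESTART-type node shutdown) the way the replica allocator treats a delayed node-left event: report a delayed decision instead of assigning the shard to a fresh node, which would re-download the snapshot data. The snippet below is a self-contained toy model of that check, not the Elasticsearch code; every type in it is an illustrative stand-in for the classes that appear in the diffs further down.

import java.util.Map;

/** Toy model of the shutdown-aware allocation delay. All types are illustrative stand-ins. */
public class ShutdownDelayModel {

    enum ShutdownType { RESTART, REMOVE }

    record UnassignedShard(String lastAllocatedNodeId, boolean delayed, long remainingDelayMillis) {}

    /** Mirrors the isDelayedDueToNodeRestart check added below: delay only for RESTART shutdowns. */
    static boolean isDelayedDueToNodeRestart(UnassignedShard shard, Map<String, ShutdownType> nodeShutdowns) {
        if (shard.delayed() && shard.lastAllocatedNodeId() != null) {
            return nodeShutdowns.get(shard.lastAllocatedNodeId()) == ShutdownType.RESTART;
        }
        return false;
    }

    public static void main(String[] args) {
        UnassignedShard shard = new UnassignedShard("node-1", true, 30_000L);
        Map<String, ShutdownType> shutdowns = Map.of("node-1", ShutdownType.RESTART);
        if (isDelayedDueToNodeRestart(shard, shutdowns)) {
            // The real allocator returns AllocateUnassignedDecision.delayed(...) here, reporting the
            // remaining and total delay, instead of allocating the shard to another node.
            System.out.println("delay reallocation for " + shard.remainingDelayMillis() + " ms");
        }
    }
}

The production code reuses ReplicaShardAllocator.delayedDecision, extracted as a public static helper in the diff below, to fill in the remaining and total delay when the decision is explained.
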
6 changes: 6 additions & 0 deletions docs/changelog/86153.yaml
@@ -0,0 +1,6 @@
pr: 86153
summary: Delay searchable snapshot allocation during shutdown
area: "Snapshot/Restore"
type: bug
issues:
- 85052
@@ -250,27 +250,40 @@ public AllocateUnassignedDecision makeAllocationDecision(
// if we didn't manage to find *any* data (regardless of matching sizes), and the replica is
// unassigned due to a node leaving, so we delay allocation of this replica to see if the
// node with the shard copy will rejoin so we can re-use the copy it has
logger.debug("{}: allocation of [{}] is delayed", unassignedShard.shardId(), unassignedShard);
long remainingDelayMillis = 0L;
long totalDelayMillis = 0L;
if (explain) {
UnassignedInfo unassignedInfo = unassignedShard.unassignedInfo();
Metadata metadata = allocation.metadata();
IndexMetadata indexMetadata = metadata.index(unassignedShard.index());
totalDelayMillis = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetadata.getSettings()).getMillis();
long remainingDelayNanos = unassignedInfo.getRemainingDelay(
System.nanoTime(),
indexMetadata.getSettings(),
metadata.nodeShutdowns()
);
remainingDelayMillis = TimeValue.timeValueNanos(remainingDelayNanos).millis();
}
return AllocateUnassignedDecision.delayed(remainingDelayMillis, totalDelayMillis, nodeDecisions);
return delayedDecision(unassignedShard, allocation, logger, nodeDecisions);
}

return AllocateUnassignedDecision.NOT_TAKEN;
}

/**
* Return a delayed decision, filling in the right amount of remaining time if decisions are debugged/explained.
*/
public static AllocateUnassignedDecision delayedDecision(
ShardRouting unassignedShard,
RoutingAllocation allocation,
Logger logger,
List<NodeAllocationResult> nodeDecisions
) {
boolean explain = allocation.debugDecision();
logger.debug("{}: allocation of [{}] is delayed", unassignedShard.shardId(), unassignedShard);
long remainingDelayMillis = 0L;
long totalDelayMillis = 0L;
if (explain) {
UnassignedInfo unassignedInfo = unassignedShard.unassignedInfo();
Metadata metadata = allocation.metadata();
IndexMetadata indexMetadata = metadata.index(unassignedShard.index());
totalDelayMillis = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetadata.getSettings()).getMillis();
long remainingDelayNanos = unassignedInfo.getRemainingDelay(
System.nanoTime(),
indexMetadata.getSettings(),
metadata.nodeShutdowns()
);
remainingDelayMillis = TimeValue.timeValueNanos(remainingDelayNanos).millis();
}
return AllocateUnassignedDecision.delayed(remainingDelayMillis, totalDelayMillis, nodeDecisions);
}

/**
* Determines if the shard can be allocated on at least one node based on the allocation deciders.
*
1 change: 1 addition & 0 deletions x-pack/plugin/searchable-snapshots/build.gradle
@@ -14,6 +14,7 @@ dependencies {
compileOnly project(path: xpackModule('core'))
implementation project(path: 'preallocate')
internalClusterTestImplementation(testArtifact(project(xpackModule('core'))))
internalClusterTestImplementation(project(path: xpackModule('shutdown')))
internalClusterTestImplementation(project(path: ':modules:reindex'))
}

@@ -0,0 +1,134 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.searchablesnapshots.allocation;

import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.internal.Requests;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.xpack.searchablesnapshots.BaseSearchableSnapshotsIntegTestCase;
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots;
import org.elasticsearch.xpack.searchablesnapshots.cache.full.CacheService;
import org.elasticsearch.xpack.shutdown.DeleteShutdownNodeAction;
import org.elasticsearch.xpack.shutdown.PutShutdownNodeAction;
import org.elasticsearch.xpack.shutdown.ShutdownPlugin;
import org.hamcrest.Matchers;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, minNumDataNodes = 2)
public class SearchableSnapshotShutdownIntegTests extends BaseSearchableSnapshotsIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.appendToCopy(super.nodePlugins(), ShutdownPlugin.class);
}

@Override
protected int numberOfShards() {
// use 1 shard per index and rely on multiple indices to cover the multi-shard case.
return 1;
}

public void testAllocationDisabledDuringShutdown() throws Exception {
final List<String> restoredIndexNames = setupMountedIndices();
final String[] restoredIndexNamesArray = restoredIndexNames.toArray(String[]::new);
final Set<String> indexNodes = restoredIndexNames.stream()
.flatMap(index -> internalCluster().nodesInclude(index).stream())
.collect(Collectors.toSet());
final ClusterState state = client().admin().cluster().prepareState().clear().setRoutingTable(true).setNodes(true).get().getState();
final Map<String, String> nodeNameToId = state.getNodes()
.stream()
.collect(Collectors.toMap(DiscoveryNode::getName, DiscoveryNode::getId));

for (String indexNode : indexNodes) {
final String indexNodeId = nodeNameToId.get(indexNode);
putShutdown(indexNodeId);
final int shards = (int) StreamSupport.stream(state.routingTable().allShards(restoredIndexNamesArray).spliterator(), false)
.filter(s -> indexNodeId.equals(s.currentNodeId()))
.count();
assert shards > 0;

assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME);
assertExecutorIsIdle(SearchableSnapshots.CACHE_PREWARMING_THREAD_POOL_NAME);
waitForBlobCacheFillsToComplete();
final CacheService cacheService = internalCluster().getInstance(CacheService.class, indexNode);
cacheService.synchronizeCache();

logger.info("--> Restarting [{}/{}]", indexNodeId, indexNode);
internalCluster().restartNode(indexNode, new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
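// While the node is down and marked as restarting, its shards must remain unassigned
// rather than be reallocated (and re-downloaded) onto the remaining nodes.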
assertBusy(() -> {
ClusterHealthResponse response = client().admin()
.cluster()
.health(Requests.clusterHealthRequest(restoredIndexNamesArray))
.actionGet();
assertThat(response.getUnassignedShards(), Matchers.equalTo(shards));
});
return super.onNodeStopped(nodeName);
}
});
// leave shutdown in place for some nodes to check that shards get assigned anyway.
if (randomBoolean()) {
removeShutdown(indexNodeId);
}
}

ensureGreen(restoredIndexNamesArray);
}

private List<String> setupMountedIndices() throws Exception {
int count = between(1, 10);
List<String> restoredIndices = new ArrayList<>();
final String repositoryName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createRepository(repositoryName, "mock");

for (int i = 0; i < count; ++i) {
final String indexName = "index_" + i;
createAndPopulateIndex(indexName, Settings.builder());

final SnapshotId snapshotId = createSnapshot(repositoryName, "snapshot-" + i, List.of(indexName)).snapshotId();
assertAcked(client().admin().indices().prepareDelete(indexName));
restoredIndices.add(mountSnapshot(repositoryName, snapshotId.getName(), indexName, Settings.EMPTY));
}
return restoredIndices;
}

private void putShutdown(String nodeToRestartId) throws InterruptedException, ExecutionException {
PutShutdownNodeAction.Request putShutdownRequest = new PutShutdownNodeAction.Request(
nodeToRestartId,
SingleNodeShutdownMetadata.Type.RESTART,
this.getTestName(),
null,
null
);
assertTrue(client().execute(PutShutdownNodeAction.INSTANCE, putShutdownRequest).get().isAcknowledged());
}

private void removeShutdown(String node) throws ExecutionException, InterruptedException {
assertTrue(client().execute(DeleteShutdownNodeAction.INSTANCE, new DeleteShutdownNodeAction.Request(node)).get().isAcknowledged());
}
}
@@ -17,6 +17,7 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RecoverySource;
@@ -341,11 +342,26 @@ private AllocateUnassignedDecision decideAllocation(RoutingAllocation allocation
);
return AllocateUnassignedDecision.yes(nodeWithHighestMatch.node(), null, nodeDecisions, true);
}
} else if (isDelayedDueToNodeRestart(allocation, shardRouting)) {
return ReplicaShardAllocator.delayedDecision(shardRouting, allocation, logger, nodeDecisions);
}

// TODO: do we need handling of delayed allocation for leaving replicas here?
return AllocateUnassignedDecision.NOT_TAKEN;
}

/**
 * Returns true if the shard is delayed and its previously assigned node is registered for a
 * RESTART-type shutdown, in which case allocation should wait for that node to come back
 * rather than move the shard to a new node.
 */
private boolean isDelayedDueToNodeRestart(RoutingAllocation allocation, ShardRouting shardRouting) {
if (shardRouting.unassignedInfo().isDelayed()) {
String lastAllocatedNodeId = shardRouting.unassignedInfo().getLastAllocatedNodeId();
if (lastAllocatedNodeId != null) {
SingleNodeShutdownMetadata nodeShutdownMetadata = allocation.nodeShutdowns().get(lastAllocatedNodeId);
return nodeShutdownMetadata != null && nodeShutdownMetadata.getType() == SingleNodeShutdownMetadata.Type.RESTART;
}
}

return false;
}

@Override
public AllocateUnassignedDecision explainUnassignedShardAllocation(ShardRouting shardRouting, RoutingAllocation routingAllocation) {
assert shardRouting.unassigned();