Skip to content

Commit

Permalink
Determine shard size before allocating shards recovering from snapsho…
Browse files Browse the repository at this point in the history
…ts (#61906) (#63337)

Determines the shard size of shards before allocating shards that are
recovering from snapshots. It ensures during shard allocation that the
target node that is selected as recovery target will have enough free
disk space for the recovery event. This applies to regular restores,
CCR bootstrap from remote, as well as mounting searchable snapshots.

The InternalSnapshotInfoService is responsible for fetching snapshot
shard sizes from repositories. It provides a getShardSize() method
to other components of the system that can be used to retrieve the
latest known shard size. If the latest snapshot shard size retrieval
failed, the getShardSize() returns
ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE. While
we'd like a better way to handle such failures, returning this value
allows to keep the existing behavior for now.

Note that this PR does not address an issues (we already have today)
where a replica is being allocated without knowing how much disk
space is being used by the primary.

Co-authored-by: Yannick Welsch <yannick@welsch.lu>
  • Loading branch information
tlrx and ywelsch committed Oct 6, 2020
1 parent 733e89d commit 87076c3
Show file tree
Hide file tree
Showing 54 changed files with 1,329 additions and 180 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.snapshots.EmptySnapshotsInfoService;

import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -79,7 +80,8 @@ public static AllocationService createAllocationService(Settings settings, Clust
defaultAllocationDeciders(settings, clusterSettings),
NoopGatewayAllocator.INSTANCE,
new BalancedShardsAllocator(settings),
EmptyClusterInfoService.INSTANCE
EmptyClusterInfoService.INSTANCE,
EmptySnapshotsInfoService.INSTANCE
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.lucene.mockfile.FilterFileSystemProvider;
import org.apache.lucene.mockfile.FilterPath;
import org.apache.lucene.util.Constants;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.cluster.ClusterInfoService;
Expand All @@ -32,6 +34,7 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.Rebalance;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.io.PathUtils;
Expand All @@ -44,6 +47,10 @@
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.monitor.fs.FsService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.junit.After;
Expand All @@ -62,6 +69,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -141,29 +149,95 @@ public void testHighWatermarkNotExceeded() throws Exception {
final String dataNode0Id = internalCluster().getInstance(NodeEnvironment.class, dataNodeName).nodeId();
final Path dataNode0Path = internalCluster().getInstance(Environment.class, dataNodeName).dataFiles()[0];

createIndex("test", Settings.builder()
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createIndex(indexName, Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 6)
.put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms")
.build());
final long minShardSize = createReasonableSizedShards();
final long minShardSize = createReasonableSizedShards(indexName);

// reduce disk size of node 0 so that no shards fit below the high watermark, forcing all shards onto the other data node
// (subtract the translog size since the disk threshold decider ignores this and may therefore move the shard back again)
fileSystemProvider.getTestFileStore(dataNode0Path).setTotalSpace(minShardSize + WATERMARK_BYTES - 1L);
refreshDiskUsage();
assertBusy(() -> assertThat(getShardRoutings(dataNode0Id), empty()));
assertBusy(() -> assertThat(getShardRoutings(dataNode0Id, indexName), empty()));

// increase disk size of node 0 to allow just enough room for one shard, and check that it's rebalanced back
fileSystemProvider.getTestFileStore(dataNode0Path).setTotalSpace(minShardSize + WATERMARK_BYTES + 1L);
refreshDiskUsage();
assertBusy(() -> assertThat(getShardRoutings(dataNode0Id), hasSize(1)));
assertBusy(() -> assertThat(getShardRoutings(dataNode0Id, indexName), hasSize(1)));
}

private Set<ShardRouting> getShardRoutings(String nodeId) {
public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Exception {
internalCluster().startMasterOnlyNode();
internalCluster().startDataOnlyNode();
final String dataNodeName = internalCluster().startDataOnlyNode();
ensureStableCluster(3);

assertAcked(client().admin().cluster().preparePutRepository("repo")
.setType(FsRepository.TYPE)
.setSettings(Settings.builder()
.put("location", randomRepoPath())
.put("compress", randomBoolean())));

final InternalClusterInfoService clusterInfoService
= (InternalClusterInfoService) internalCluster().getMasterNodeInstance(ClusterInfoService.class);
internalCluster().getMasterNodeInstance(ClusterService.class).addListener(event -> clusterInfoService.refresh());

final String dataNode0Id = internalCluster().getInstance(NodeEnvironment.class, dataNodeName).nodeId();
final Path dataNode0Path = internalCluster().getInstance(Environment.class, dataNodeName).dataFiles()[0];

final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createIndex(indexName, Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 6)
.put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms")
.build());
final long minShardSize = createReasonableSizedShards(indexName);

final CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("repo", "snap")
.setWaitForCompletion(true).get();
final SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo();
assertThat(snapshotInfo.successfulShards(), is(snapshotInfo.totalShards()));
assertThat(snapshotInfo.state(), is(SnapshotState.SUCCESS));

assertAcked(client().admin().indices().prepareDelete(indexName).get());

// reduce disk size of node 0 so that no shards fit below the low watermark, forcing shards to be assigned to the other data node
fileSystemProvider.getTestFileStore(dataNode0Path).setTotalSpace(minShardSize + WATERMARK_BYTES - 1L);
refreshDiskUsage();

assertAcked(client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder()
.put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), Rebalance.NONE.toString())
.build())
.get());

final RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("repo", "snap")
.setWaitForCompletion(true).get();
final RestoreInfo restoreInfo = restoreSnapshotResponse.getRestoreInfo();
assertThat(restoreInfo.successfulShards(), is(snapshotInfo.totalShards()));
assertThat(restoreInfo.failedShards(), is(0));

assertBusy(() -> assertThat(getShardRoutings(dataNode0Id, indexName), empty()));

assertAcked(client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder()
.putNull(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey())
.build())
.get());

// increase disk size of node 0 to allow just enough room for one shard, and check that it's rebalanced back
fileSystemProvider.getTestFileStore(dataNode0Path).setTotalSpace(minShardSize + WATERMARK_BYTES + 1L);
refreshDiskUsage();
assertBusy(() -> assertThat(getShardRoutings(dataNode0Id, indexName), hasSize(1)));
}

private Set<ShardRouting> getShardRoutings(final String nodeId, final String indexName) {
final Set<ShardRouting> shardRoutings = new HashSet<>();
for (IndexShardRoutingTable indexShardRoutingTable : client().admin().cluster().prepareState().clear().setRoutingTable(true)
.get().getState().getRoutingTable().index("test")) {
.get().getState().getRoutingTable().index(indexName)) {
for (ShardRouting shard : indexShardRoutingTable.shards()) {
assertThat(shard.state(), equalTo(ShardRoutingState.STARTED));
if (shard.currentNodeId().equals(nodeId)) {
Expand All @@ -177,17 +251,17 @@ private Set<ShardRouting> getShardRoutings(String nodeId) {
/**
* Index documents until all the shards are at least WATERMARK_BYTES in size, and return the size of the smallest shard
*/
private long createReasonableSizedShards() throws InterruptedException {
private long createReasonableSizedShards(final String indexName) throws InterruptedException {
while (true) {
final IndexRequestBuilder[] indexRequestBuilders = new IndexRequestBuilder[scaledRandomIntBetween(100, 10000)];
for (int i = 0; i < indexRequestBuilders.length; i++) {
indexRequestBuilders[i] = client().prepareIndex("test", "_doc").setSource("field", randomAlphaOfLength(10));
indexRequestBuilders[i] = client().prepareIndex(indexName, "_doc").setSource("field", randomAlphaOfLength(10));
}
indexRandom(true, indexRequestBuilders);
forceMerge();
refresh();

final ShardStats[] shardStatses = client().admin().indices().prepareStats("test")
final ShardStats[] shardStatses = client().admin().indices().prepareStats(indexName)
.clear().setStore(true).setTranslog(true).get().getShards();
final long[] shardSizes = new long[shardStatses.length];
for (ShardStats shardStats : shardStatses) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.snapshots.SnapshotsInfoService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand All @@ -58,6 +59,7 @@ public class TransportClusterAllocationExplainAction
private static final Logger logger = LogManager.getLogger(TransportClusterAllocationExplainAction.class);

private final ClusterInfoService clusterInfoService;
private final SnapshotsInfoService snapshotsInfoService;
private final AllocationDeciders allocationDeciders;
private final ShardsAllocator shardAllocator;
private final AllocationService allocationService;
Expand All @@ -66,11 +68,13 @@ public class TransportClusterAllocationExplainAction
public TransportClusterAllocationExplainAction(TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
ClusterInfoService clusterInfoService, AllocationDeciders allocationDeciders,
ClusterInfoService clusterInfoService, SnapshotsInfoService snapshotsInfoService,
AllocationDeciders allocationDeciders,
ShardsAllocator shardAllocator, AllocationService allocationService) {
super(ClusterAllocationExplainAction.NAME, transportService, clusterService, threadPool, actionFilters,
ClusterAllocationExplainRequest::new, indexNameExpressionResolver);
this.clusterInfoService = clusterInfoService;
this.snapshotsInfoService = snapshotsInfoService;
this.allocationDeciders = allocationDeciders;
this.shardAllocator = shardAllocator;
this.allocationService = allocationService;
Expand All @@ -97,7 +101,7 @@ protected void masterOperation(final ClusterAllocationExplainRequest request, fi
final RoutingNodes routingNodes = state.getRoutingNodes();
final ClusterInfo clusterInfo = clusterInfoService.getClusterInfo();
final RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, state,
clusterInfo, System.nanoTime());
clusterInfo, snapshotsInfoService.snapshotShardSizes(), System.nanoTime());

ShardRouting shardRouting = findShardToExplain(request, allocation);
logger.debug("explaining the allocation for [{}], found shard [{}]", request, shardRouting);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.carrotsearch.hppc.ObjectHashSet;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -33,7 +32,6 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.StoreStats;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.elasticsearch.persistent.PersistentTasksNodeService;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.script.ScriptMetadata;
import org.elasticsearch.snapshots.SnapshotsInfoService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskResultsService;

Expand Down Expand Up @@ -110,14 +111,14 @@ public class ClusterModule extends AbstractModule {
final ShardsAllocator shardsAllocator;

public ClusterModule(Settings settings, ClusterService clusterService, List<ClusterPlugin> clusterPlugins,
ClusterInfoService clusterInfoService) {
ClusterInfoService clusterInfoService, SnapshotsInfoService snapshotsInfoService) {
this.clusterPlugins = clusterPlugins;
this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
this.allocationDeciders = new AllocationDeciders(deciderList);
this.shardsAllocator = createShardsAllocator(settings, clusterService.getClusterSettings(), clusterPlugins);
this.clusterService = clusterService;
this.indexNameExpressionResolver = new IndexNameExpressionResolver();
this.allocationService = new AllocationService(allocationDeciders, shardsAllocator, clusterInfoService);
this.allocationService = new AllocationService(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService);
}

public static List<Entry> getNamedWriteables() {
Expand Down

0 comments on commit 87076c3

Please sign in to comment.