Skip to content

Commit

Permalink
Async creation of IndexShard instances (#95121)
Browse files Browse the repository at this point in the history
Today when applying a new cluster state we block the cluster applier thread for
up to 5s while waiting to acquire each shard lock. Failure to acquire the shard
lock is treated as an allocation failure, so after 5 retries (by default) we
give up on the allocation.

The shard lock may be held by some other actor, typically the previous
incarnation of the shard which is still shutting down, but it will eventually
be released. Yet, 5 retries of 5s each is sometimes not enough time to wait.
Knowing that the shard lock will eventually be released, we can retry much more
tenaciously.

Moreover there's no reason why we have to create the `IndexShard` while
applying the cluster state, because the shard remains in state `INITIALIZING`,
and therefore unused, while it coordinates its own recovery.

With this commit we try and acquire the shard lock during cluster state
application, but do not wait if the lock is unavailable. Instead, we schedule a
retry (also executed on the cluster state applier thread) and proceed with the
rest of the cluster state application process.

Relates #24530
Backport of #94545 and #94623 (and a little bit of #94417) to 8.7
  • Loading branch information
DaveCTurner committed Apr 11, 2023
1 parent 4a1eb73 commit 1153a7d
Show file tree
Hide file tree
Showing 8 changed files with 397 additions and 47 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/94545.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 94545
summary: Async creation of `IndexShard` instances
area: Recovery
type: bug
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.indices.cluster;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.LogEvent;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.junit.annotations.TestLogging;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;

public class ShardLockFailureIT extends ESIntegTestCase {

@TestLogging(reason = "checking DEBUG logs from ICSS", value = "org.elasticsearch.indices.cluster.IndicesClusterStateService:DEBUG")
public void testShardLockFailure() throws Exception {
final var node = internalCluster().startDataOnlyNode(
Settings.builder()
.put(IndicesClusterStateService.SHARD_LOCK_RETRY_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(10))
.put(IndicesClusterStateService.SHARD_LOCK_RETRY_TIMEOUT_SETTING.getKey(), TimeValue.timeValueDays(10))
.build()
);

final var indexName = "testindex";
createIndex(
indexName,
Settings.builder()
.put(IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + "._name", node)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-all")
.build()
);
ensureGreen(indexName);

final var shardId = new ShardId(resolveIndex(indexName), 0);

internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> {
try {
assertTrue(
event.state()
.routingTable()
.shardRoutingTable(shardId)
.allShards()
.noneMatch(sr -> sr.unassigned() && sr.unassignedInfo().getNumFailedAllocations() > 0)
);
} catch (IndexNotFoundException e) {
// ok
}
});

var mockLogAppender = new MockLogAppender();
try (
var ignored1 = internalCluster().getInstance(NodeEnvironment.class, node).shardLock(shardId, "blocked for test");
var ignored2 = mockLogAppender.capturing(IndicesClusterStateService.class);
) {
final CountDownLatch countDownLatch = new CountDownLatch(1);

mockLogAppender.addExpectation(new MockLogAppender.LoggingExpectation() {
int debugMessagesSeen = 0;
int warnMessagesSeen = 0;

@Override
public synchronized void match(LogEvent event) {
try {
assertEquals("org.elasticsearch.indices.cluster.IndicesClusterStateService", event.getLoggerName());
if (event.getMessage().getFormattedMessage().matches("shard lock for .* has been unavailable for at least .*")) {
if (event.getLevel() == Level.WARN) {
warnMessagesSeen += 1;
assertEquals(29L * warnMessagesSeen - 24, debugMessagesSeen);
if (warnMessagesSeen == 3) {
countDownLatch.countDown();
}
} else if (event.getLevel() == Level.DEBUG) {
debugMessagesSeen += 1;
} else {
fail("unexpected log level: " + event.getLevel());
}
}
} catch (Throwable t) {
ExceptionsHelper.maybeDieOnAnotherThread(new AssertionError("unexpected", t));
}
}

@Override
public void assertMatched() {}
});

updateIndexSettings(indexName, Settings.builder().putNull(IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + "._name"));
ensureYellow(indexName);
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
assertEquals(ClusterHealthStatus.YELLOW, client().admin().cluster().prepareHealth(indexName).get().getStatus());
mockLogAppender.assertAllExpectationsMatched();
}

ensureGreen(indexName);
}

@TestLogging(reason = "checking WARN logs from ICSS", value = "org.elasticsearch.indices.cluster.IndicesClusterStateService:WARN")
public void testShardLockTimeout() throws Exception {
final var node = internalCluster().startDataOnlyNode(
Settings.builder()
.put(IndicesClusterStateService.SHARD_LOCK_RETRY_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(10))
.put(IndicesClusterStateService.SHARD_LOCK_RETRY_TIMEOUT_SETTING.getKey(), TimeValue.timeValueMillis(100))
.build()
);

final var indexName = "testindex";
createIndex(
indexName,
Settings.builder()
.put(IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + "._name", node)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-all")
.put(MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY.getKey(), 1)
.build()
);
ensureGreen(indexName);

final var shardId = new ShardId(resolveIndex(indexName), 0);

var mockLogAppender = new MockLogAppender();
try (
var ignored1 = internalCluster().getInstance(NodeEnvironment.class, node).shardLock(shardId, "blocked for test");
var ignored2 = mockLogAppender.capturing(IndicesClusterStateService.class);
) {
mockLogAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"timeout message",
"org.elasticsearch.indices.cluster.IndicesClusterStateService",
Level.WARN,
"""
timed out after [indices.store.shard_lock_retry.timeout=100ms/100ms] \
while waiting to acquire shard lock for [testindex][0]"""
)
);

updateIndexSettings(indexName, Settings.builder().putNull(IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + "._name"));
assertBusy(mockLogAppender::assertAllExpectationsMatched);
final var clusterHealthResponse = client().admin()
.cluster()
.prepareHealth(indexName)
.setWaitForEvents(Priority.LANGUID)
.setTimeout(TimeValue.timeValueSeconds(10))
.setWaitForNoInitializingShards(true)
.setWaitForNoRelocatingShards(true)
.get();
assertFalse(clusterHealthResponse.isTimedOut());
assertEquals(ClusterHealthStatus.YELLOW, clusterHealthResponse.getStatus());
assertEquals(1, clusterHealthResponse.getUnassignedShards());
}

assertAcked(client().admin().cluster().prepareReroute().setRetryFailed(true));
ensureGreen(indexName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;

/**
* {@link IndexShardRoutingTable} encapsulates all instances of a single shard.
Expand Down Expand Up @@ -135,6 +136,10 @@ public ShardRouting shard(int idx) {
return shards[idx];
}

public Stream<ShardRouting> allShards() {
return Stream.of(shards);
}

/**
* Returns a {@link List} of active shards
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -282,12 +283,10 @@ public void run() {
}

/**
* Run the given clusterStateConsumer on the applier thread. Should only be used in tests and by {@link IndicesStore} when it's deleting
* the data behind a shard that moved away from a node.
*
* @param priority {@link Priority#HIGH} unless in tests.
* Run the given {@code clusterStateConsumer} on the applier thread. Should only be used in tests, by {@link IndicesClusterStateService}
* when trying to acquire shard locks and create shards, and by {@link IndicesStore} when it's deleting the data behind a shard that
* moved away from a node.
*/
// TODO get rid of this, make it so that shard data can be deleted without blocking the applier thread.
public void runOnApplierThread(
String source,
Priority priority,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.elasticsearch.indices.analysis.HunspellService;
import org.elasticsearch.indices.breaker.BreakerSettings;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore;
Expand Down Expand Up @@ -565,7 +566,9 @@ public void apply(Settings value, Settings current, Settings previous) {
TcpTransport.isUntrustedRemoteClusterEnabled() ? RemoteClusterPortSettings.TCP_NO_DELAY : null,
TcpTransport.isUntrustedRemoteClusterEnabled() ? RemoteClusterPortSettings.TCP_REUSE_ADDRESS : null,
TcpTransport.isUntrustedRemoteClusterEnabled() ? RemoteClusterPortSettings.TCP_SEND_BUFFER_SIZE : null,
StatelessSecureSettings.STATELESS_SECURE_SETTINGS
StatelessSecureSettings.STATELESS_SECURE_SETTINGS,
IndicesClusterStateService.SHARD_LOCK_RETRY_INTERVAL_SETTING,
IndicesClusterStateService.SHARD_LOCK_RETRY_TIMEOUT_SETTING
).filter(Objects::nonNull).collect(Collectors.toSet());

static List<SettingUpgrader<?>> BUILT_IN_SETTING_UPGRADERS = Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.env.ShardLockObtainFailedException;
import org.elasticsearch.gateway.MetadataStateFormat;
import org.elasticsearch.gateway.WriteStateException;
import org.elasticsearch.index.analysis.IndexAnalyzers;
Expand Down Expand Up @@ -434,7 +433,9 @@ public synchronized IndexShard createShard(
ShardLock lock = null;
eventListener.beforeIndexShardCreated(routing, indexSettings);
try {
lock = nodeEnv.shardLock(shardId, "starting shard", TimeUnit.SECONDS.toMillis(5));
// Try and acquire the shard lock, but we are on the cluster applier thread so we do not wait if it is unavailable; in that
// case, the IndicesClusterStateService will try again (in the background)
lock = nodeEnv.shardLock(shardId, "starting shard");
ShardPath path;
try {
path = ShardPath.loadShardPath(logger, nodeEnv, shardId, this.indexSettings.customDataPath());
Expand Down Expand Up @@ -533,8 +534,6 @@ public synchronized IndexShard createShard(
shards = Maps.copyMapWithAddedEntry(shards, shardId.id(), indexShard);
success = true;
return indexShard;
} catch (ShardLockObtainFailedException e) {
throw new IOException("failed to obtain in-memory shard lock", e);
} finally {
if (success == false) {
if (lock != null) {
Expand Down

0 comments on commit 1153a7d

Please sign in to comment.