Skip to content

Commit

Permalink
Avoid async cache-size fetch on partial shards (#68644)
Browse files Browse the repository at this point in the history
Today we perform an async fetch for every searchable snapshot shard
while allocating it, so that we can prefer to allocate it to the node
that holds the warmest cache for that shard. For partial shards, there
is no persistently-cached data to reuse, so we can skip the async fetch.
  • Loading branch information
DaveCTurner committed Feb 8, 2021
1 parent ed6de0c commit 138cf89
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 5 deletions.
Expand Up @@ -56,6 +56,7 @@

import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_PARTIAL_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_REPOSITORY_NAME_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING;
Expand Down Expand Up @@ -262,6 +263,12 @@ public int getNumberOfInFlightFetches() {
private AsyncShardFetch.FetchResult<NodeCacheFilesMetadata> fetchData(ShardRouting shard, RoutingAllocation allocation) {
final ShardId shardId = shard.shardId();
final Settings indexSettings = allocation.metadata().index(shard.index()).getSettings();

if (SNAPSHOT_PARTIAL_SETTING.get(indexSettings)) {
// cached data for partial indices is not persistent, no need to fetch it
return new AsyncShardFetch.FetchResult<>(shardId, Collections.emptyMap(), Collections.emptySet());
}

final SnapshotId snapshotId = new SnapshotId(
SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings),
SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings)
Expand Down
Expand Up @@ -29,6 +29,9 @@

import java.io.IOException;
import java.util.List;
import java.util.Optional;

import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_PARTIAL_SETTING;

public class TransportSearchableSnapshotCacheStoresAction extends TransportNodesAction<
TransportSearchableSnapshotCacheStoresAction.Request,
Expand Down Expand Up @@ -87,6 +90,9 @@ protected NodeCacheFilesMetadata newNodeResponse(StreamInput in) throws IOExcept
@Override
protected NodeCacheFilesMetadata nodeOperation(NodeRequest request) {
assert cacheService != null;
assert Optional.ofNullable(clusterService.state().metadata().index(request.shardId.getIndex()))
.map(indexMetadata -> SNAPSHOT_PARTIAL_SETTING.get(indexMetadata.getSettings()))
.orElse(false) == false : request.shardId + " is partial, should not be fetching its cached size";
return new NodeCacheFilesMetadata(clusterService.localNode(), cacheService.getCachedSize(request.shardId, request.snapshotId));
}

Expand Down
Expand Up @@ -46,9 +46,11 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_PARTIAL_SETTING;
import static org.hamcrest.Matchers.empty;

public class SearchableSnapshotAllocatorTests extends ESAllocationTestCase {
Expand Down Expand Up @@ -153,7 +155,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void doE
Request request,
ActionListener<Response> listener
) {
throw new AssertionError("Expecting no requests but received [" + action + "]");
throw new AssertionError("Expecting no requests but received [" + action.name() + "]");
}
};

Expand All @@ -167,15 +169,66 @@ public <Request extends ActionRequest, Response extends ActionResponse> void doE
assertTrue(allocation.routingTable().index(shardId.getIndex()).allPrimaryShardsUnassigned());
}

public void testNoFetchesForPartialIndex() {
final ShardId shardId = new ShardId("test", "_na_", 0);
final List<DiscoveryNode> nodes = randomList(1, 10, () -> newNode("node-" + UUIDs.randomBase64UUID(random())));
final DiscoveryNode localNode = randomFrom(nodes);
final Settings localNodeSettings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getName()).build();

final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(localNodeSettings, random());

final Metadata metadata = buildSingleShardIndexMetadata(shardId, builder -> builder.put(SNAPSHOT_PARTIAL_SETTING.getKey(), true));
final RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
routingTableBuilder.addAsRestore(metadata.index(shardId.getIndex()), randomSnapshotSource(shardId));

final ClusterState state = buildClusterState(nodes, metadata, routingTableBuilder);
final RoutingAllocation allocation = buildAllocation(
deterministicTaskQueue,
state,
randomNonNegativeLong(),
yesAllocationDeciders()
);

final Client client = new NoOpNodeClient(deterministicTaskQueue.getThreadPool()) {
@Override
public <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
ActionType<Response> action,
Request request,
ActionListener<Response> listener
) {
throw new AssertionError("Expecting no requests but received [" + action.name() + "]");
}
};

final SearchableSnapshotAllocator allocator = new SearchableSnapshotAllocator(
client,
(reason, priority, listener) -> { throw new AssertionError("Expecting no reroutes"); }
);
allocateAllUnassigned(allocation, allocator);
assertFalse(allocation.routingNodesChanged());
assertThat(allocation.routingNodes().assignedShards(shardId), empty());
assertTrue(allocation.routingTable().index(shardId.getIndex()).allPrimaryShardsUnassigned());
}

private static Metadata buildSingleShardIndexMetadata(ShardId shardId) {
return buildSingleShardIndexMetadata(shardId, UnaryOperator.identity());
}

private static Metadata buildSingleShardIndexMetadata(ShardId shardId, UnaryOperator<Settings.Builder> extraSettings) {
return Metadata.builder()
.put(
IndexMetadata.builder(shardId.getIndexName())
.settings(
settings(Version.CURRENT).put(
ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING.getKey(),
SearchableSnapshotAllocator.ALLOCATOR_NAME
).put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), SearchableSnapshotsConstants.SNAPSHOT_DIRECTORY_FACTORY_KEY)
extraSettings.apply(
settings(Version.CURRENT).put(
ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING.getKey(),
SearchableSnapshotAllocator.ALLOCATOR_NAME
)
.put(
IndexModule.INDEX_STORE_TYPE_SETTING.getKey(),
SearchableSnapshotsConstants.SNAPSHOT_DIRECTORY_FACTORY_KEY
)
)
)
.numberOfShards(1)
.numberOfReplicas(0)
Expand Down

0 comments on commit 138cf89

Please sign in to comment.