diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/mget/40_routing.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/mget/40_routing.yml index 7169c0ec25001..d08c9b6795e95 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/mget/40_routing.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/mget/40_routing.yml @@ -8,7 +8,7 @@ routing: index: number_of_shards: 5 number_of_routing_shards: 5 - number_of_replicas: 0 + auto_expand_replicas: 0-1 - do: cluster.health: @@ -51,11 +51,15 @@ requires routing: settings: index: number_of_shards: 5 - number_of_replicas: 0 + auto_expand_replicas: 0-1 mappings: _routing: required: true + - do: + cluster.health: + wait_for_status: green + - do: index: index: test_1 diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/mget/60_realtime_refresh.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/mget/60_realtime_refresh.yml index 2f2036217d8dc..a5baf94d2123b 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/mget/60_realtime_refresh.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/mget/60_realtime_refresh.yml @@ -9,7 +9,7 @@ settings: index: refresh_interval: -1 - number_of_replicas: 0 + auto_expand_replicas: 0-1 - do: cluster.health: diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java index 5e8a2280960f9..b5f540919ecfa 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java @@ -180,7 +180,7 @@ private void asyncGet(GetRequest request, ShardId shardId, ActionListener listener) throws IOException { ShardId shardId = indexShard.shardId(); - DiscoveryNode node = getCurrentNodeOfPrimary(shardId); + var node = getCurrentNodeOfPrimary(clusterService.state(), shardId); if (request.refresh()) { logger.trace("send refresh action for shard {} to node {}", shardId, node.getId()); var refreshRequest = new BasicReplicationRequest(shardId); @@ -226,8 +226,7 @@ private void handleGetOnUnpromotableShard(GetRequest request, IndexShard indexSh } } - private DiscoveryNode getCurrentNodeOfPrimary(ShardId shardId) { - var clusterState = clusterService.state(); + static DiscoveryNode getCurrentNodeOfPrimary(ClusterState clusterState, ShardId shardId) { var shardRoutingTable = clusterState.routingTable().shardRoutingTable(shardId); if (shardRoutingTable.primaryShard() == null || shardRoutingTable.primaryShard().active() == false) { throw new NoShardAvailableActionException(shardId, "primary shard is not active"); diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java index d9104545cc73f..a121fd7706148 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java @@ -9,14 +9,21 @@ package org.elasticsearch.action.get; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.admin.indices.refresh.TransportShardRefreshAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.TransportActions; +import org.elasticsearch.action.support.replication.BasicReplicationRequest; import org.elasticsearch.action.support.single.shard.TransportSingleShardAction; +import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.PlainShardIterator; import org.elasticsearch.cluster.routing.ShardIterator; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.Writeable; @@ -26,20 +33,25 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.ExecutorSelector; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import static org.elasticsearch.action.get.TransportGetAction.getCurrentNodeOfPrimary; import static org.elasticsearch.core.Strings.format; public class TransportShardMultiGetAction extends TransportSingleShardAction { private static final String ACTION_NAME = MultiGetAction.NAME + "[shard]"; public static final ActionType TYPE = new ActionType<>(ACTION_NAME, MultiGetShardResponse::new); + private static final Logger logger = LogManager.getLogger(TransportShardMultiGetAction.class); private final IndicesService indicesService; private final ExecutorSelector executorSelector; + private final NodeClient client; @Inject public TransportShardMultiGetAction( @@ -49,7 +61,8 @@ public TransportShardMultiGetAction( ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - ExecutorSelector executorSelector + ExecutorSelector executorSelector, + NodeClient client ) { super( ACTION_NAME, @@ -63,6 +76,7 @@ public TransportShardMultiGetAction( ); this.indicesService = indicesService; this.executorSelector = executorSelector; + this.client = client; } @Override @@ -84,7 +98,10 @@ protected boolean resolveIndex(MultiGetShardRequest request) { protected ShardIterator shards(ClusterState state, InternalRequest request) { ShardIterator iterator = clusterService.operationRouting() .getShards(state, request.request().index(), request.request().shardId(), request.request().preference()); - return clusterService.operationRouting().useOnlyPromotableShardsForStateless(iterator); + if (iterator == null) { + return null; + } + return new PlainShardIterator(iterator.shardId(), iterator.getShardRoutings().stream().filter(ShardRouting::isSearchable).toList()); } @Override @@ -92,6 +109,12 @@ protected void asyncShardOperation(MultiGetShardRequest request, ShardId shardId throws IOException { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); IndexShard indexShard = indexService.getShard(shardId.id()); + if (indexShard.routingEntry().isPromotableToPrimary() == false) { + handleMultiGetOnUnpromotableShard(request, indexShard, listener); + return; + } + assert DiscoveryNode.isStateless(clusterService.getSettings()) == false + : "A TransportShardMultiGetAction should always be handled by a search shard in Stateless"; if (request.realtime()) { // we are not tied to a refresh cycle here anyway asyncShardMultiGet(request, shardId, listener); } else { @@ -107,35 +130,10 @@ protected void asyncShardOperation(MultiGetShardRequest request, ShardId shardId @Override protected MultiGetShardResponse shardOperation(MultiGetShardRequest request, ShardId shardId) { - var indexShard = getIndexShard(shardId); MultiGetShardResponse response = new MultiGetShardResponse(); for (int i = 0; i < request.locations.size(); i++) { - MultiGetRequest.Item item = request.items.get(i); - try { - GetResult getResult = indexShard.getService() - .get( - item.id(), - item.storedFields(), - request.realtime(), - item.version(), - item.versionType(), - item.fetchSourceContext(), - request.isForceSyntheticSource() - ); - response.add(request.locations.get(i), new GetResponse(getResult)); - } catch (RuntimeException e) { - if (TransportActions.isShardNotAvailableException(e)) { - throw e; - } else { - logger.debug(() -> format("%s failed to execute multi_get for [%s]", shardId, item.id()), e); - response.add(request.locations.get(i), new MultiGetResponse.Failure(request.index(), item.id(), e)); - } - } catch (IOException e) { - logger.debug(() -> format("%s failed to execute multi_get for [%s]", shardId, item.id()), e); - response.add(request.locations.get(i), new MultiGetResponse.Failure(request.index(), item.id(), e)); - } + getAndAddToResponse(shardId, i, request, response); } - return response; } @@ -151,6 +149,110 @@ protected String getExecutor(MultiGetShardRequest request, ShardId shardId) { } } + private void handleMultiGetOnUnpromotableShard( + MultiGetShardRequest request, + IndexShard indexShard, + ActionListener listener + ) throws IOException { + ShardId shardId = indexShard.shardId(); + var node = getCurrentNodeOfPrimary(clusterService.state(), shardId); + if (request.refresh()) { + logger.trace("send refresh action for shard {} to node {}", shardId, node.getId()); + var refreshRequest = new BasicReplicationRequest(shardId); + refreshRequest.setParentTask(request.getParentTask()); + client.executeLocally( + TransportShardRefreshAction.TYPE, + refreshRequest, + listener.delegateFailureAndWrap((l, replicationResponse) -> super.asyncShardOperation(request, shardId, l)) + ); + } else if (request.realtime()) { + TransportShardMultiGetFomTranslogAction.Request mgetFromTranslogRequest = new TransportShardMultiGetFomTranslogAction.Request( + request, + shardId + ); + mgetFromTranslogRequest.setParentTask(request.getParentTask()); + transportService.sendRequest( + node, + TransportShardMultiGetFomTranslogAction.NAME, + mgetFromTranslogRequest, + new ActionListenerResponseHandler<>(listener.delegateFailure((l, r) -> { + var responseHasMissingLocations = false; + for (int i = 0; i < r.multiGetShardResponse().locations.size(); i++) { + if (r.multiGetShardResponse().responses.get(i) == null && r.multiGetShardResponse().failures.get(i) == null) { + responseHasMissingLocations = true; + break; + } + } + if (responseHasMissingLocations == false) { + logger.debug("received result of all ids in real-time mget[shard] from the promotable shard."); + l.onResponse(r.multiGetShardResponse()); + } else { + logger.debug( + "no result for some ids from the promotable shard (segment generation to wait for: {})", + r.segmentGeneration() + ); + if (r.segmentGeneration() == -1) { + // Nothing to wait for (no previous unsafe generation), just handle the rest locally. + ActionRunnable.supply(l, () -> handleLocalGets(request, r.multiGetShardResponse(), shardId)).run(); + } else { + assert r.segmentGeneration() > -1L; + indexShard.waitForSegmentGeneration( + r.segmentGeneration(), + listener.delegateFailureAndWrap( + (ll, aLong) -> threadPool.executor(getExecutor(request, shardId)) + .execute( + ActionRunnable.supply(ll, () -> handleLocalGets(request, r.multiGetShardResponse(), shardId)) + ) + ) + ); + } + } + }), TransportShardMultiGetFomTranslogAction.Response::new, getExecutor(request, shardId)) + ); + } else { + // A non-real-time mget with no explicit refresh requested. + super.asyncShardOperation(request, shardId, listener); + } + } + + private MultiGetShardResponse handleLocalGets(MultiGetShardRequest request, MultiGetShardResponse response, ShardId shardId) { + logger.trace("handling local gets for missing locations"); + for (int i = 0; i < response.locations.size(); i++) { + if (response.responses.get(i) == null && response.failures.get(i) == null) { + getAndAddToResponse(shardId, i, request, response); + } + } + return response; + } + + private void getAndAddToResponse(ShardId shardId, int location, MultiGetShardRequest request, MultiGetShardResponse response) { + var indexShard = getIndexShard(shardId); + MultiGetRequest.Item item = request.items.get(location); + try { + GetResult getResult = indexShard.getService() + .get( + item.id(), + item.storedFields(), + request.realtime(), + item.version(), + item.versionType(), + item.fetchSourceContext(), + request.isForceSyntheticSource() + ); + response.add(request.locations.get(location), new GetResponse(getResult)); + } catch (RuntimeException e) { + if (TransportActions.isShardNotAvailableException(e)) { + throw e; + } else { + logger.debug(() -> format("%s failed to execute multi_get for [%s]", shardId, item.id()), e); + response.add(request.locations.get(location), new MultiGetResponse.Failure(request.index(), item.id(), e)); + } + } catch (IOException e) { + logger.debug(() -> format("%s failed to execute multi_get for [%s]", shardId, item.id()), e); + response.add(request.locations.get(location), new MultiGetResponse.Failure(request.index(), item.id(), e)); + } + } + private void asyncShardMultiGet(MultiGetShardRequest request, ShardId shardId, ActionListener listener) throws IOException { if (request.refresh() && request.realtime() == false) { diff --git a/server/src/test/java/org/elasticsearch/action/get/TransportMultiGetActionTests.java b/server/src/test/java/org/elasticsearch/action/get/TransportMultiGetActionTests.java index dfd69aac4abda..de089c25f7201 100644 --- a/server/src/test/java/org/elasticsearch/action/get/TransportMultiGetActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/get/TransportMultiGetActionTests.java @@ -165,6 +165,7 @@ public TaskManager getTaskManager() { when(clusterService.localNode()).thenReturn(transportService.getLocalNode()); when(clusterService.state()).thenReturn(clusterState); when(clusterService.operationRouting()).thenReturn(operationRouting); + final NodeClient client = new NodeClient(Settings.EMPTY, threadPool); shardAction = new TransportShardMultiGetAction( clusterService, @@ -173,7 +174,8 @@ public TaskManager getTaskManager() { threadPool, new ActionFilters(emptySet()), new Resolver(), - EmptySystemIndices.INSTANCE.getExecutorSelector() + EmptySystemIndices.INSTANCE.getExecutorSelector(), + client ) { @Override protected void doExecute(Task task, MultiGetShardRequest request, ActionListener listener) {}