From 8788899ab14daa1d562dd0e050ff9c59e1f5f56d Mon Sep 17 00:00:00 2001 From: Pooya Salehi Date: Mon, 12 Jun 2023 13:22:24 +0200 Subject: [PATCH 1/6] Stateless real-time mget --- .../get/TransportShardMultiGetAction.java | 156 +++++++++++++++++- .../get/TransportMultiGetActionTests.java | 4 +- 2 files changed, 157 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java index 886da11ddedf6..d868f8e2eb849 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java @@ -9,14 +9,22 @@ package org.elasticsearch.action.get; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.NoShardAvailableActionException; +import org.elasticsearch.action.admin.indices.refresh.TransportShardRefreshAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.TransportActions; +import org.elasticsearch.action.support.replication.BasicReplicationRequest; import org.elasticsearch.action.support.single.shard.TransportSingleShardAction; +import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.PlainShardIterator; import org.elasticsearch.cluster.routing.ShardIterator; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.Writeable; @@ -26,10 +34,14 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.ExecutorSelector; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import static org.elasticsearch.core.Strings.format; @@ -37,9 +49,11 @@ public class TransportShardMultiGetAction extends TransportSingleShardAction TYPE = new ActionType<>(ACTION_NAME, MultiGetShardResponse::new); + private static final Logger logger = LogManager.getLogger(TransportShardMultiGetAction.class); private final IndicesService indicesService; private final ExecutorSelector executorSelector; + private final NodeClient client; @Inject public TransportShardMultiGetAction( @@ -49,7 +63,8 @@ public TransportShardMultiGetAction( ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - ExecutorSelector executorSelector + ExecutorSelector executorSelector, + NodeClient client ) { super( ACTION_NAME, @@ -63,6 +78,7 @@ public TransportShardMultiGetAction( ); this.indicesService = indicesService; this.executorSelector = executorSelector; + this.client = client; } @Override @@ -84,7 +100,10 @@ protected boolean resolveIndex(MultiGetShardRequest request) { protected ShardIterator shards(ClusterState state, InternalRequest request) { ShardIterator iterator = clusterService.operationRouting() .getShards(state, request.request().index(), request.request().shardId(), request.request().preference()); - return clusterService.operationRouting().useOnlyPromotableShardsForStateless(iterator); + if (iterator == null) { + return null; + } + return new PlainShardIterator(iterator.shardId(), iterator.getShardRoutings().stream().filter(ShardRouting::isSearchable).toList()); } @Override @@ -92,6 +111,12 @@ protected void asyncShardOperation(MultiGetShardRequest request, ShardId shardId throws IOException { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); IndexShard indexShard = indexService.getShard(shardId.id()); + if (indexShard.routingEntry().isPromotableToPrimary() == false) { + handleMultiGetOnUnpromotableShard(request, indexShard, listener); + return; + } + assert DiscoveryNode.isStateless(clusterService.getSettings()) == false + : "A TransportShardMultiGetAction should always be handled by a search shard in Stateless"; if (request.realtime()) { // we are not tied to a refresh cycle here anyway asyncShardMultiGet(request, shardId, listener); } else { @@ -151,6 +176,122 @@ protected String getExecutor(MultiGetShardRequest request, ShardId shardId) { } } + private void handleMultiGetOnUnpromotableShard( + MultiGetShardRequest request, + IndexShard indexShard, + ActionListener listener + ) throws IOException { + ShardId shardId = indexShard.shardId(); + var node = getCurrentNodeOfPrimary(shardId); + if (request.refresh()) { + logger.trace("send refresh action for shard {} to node {}", shardId, node.getId()); + var refreshRequest = new BasicReplicationRequest(shardId); + refreshRequest.setParentTask(request.getParentTask()); + client.executeLocally( + TransportShardRefreshAction.TYPE, + refreshRequest, + listener.delegateFailureAndWrap((l, replicationResponse) -> super.asyncShardOperation(request, shardId, l)) + ); + } else if (request.realtime()) { + TransportShardMultiGetFomTranslogAction.Request getFromTranslogRequest = new TransportShardMultiGetFomTranslogAction.Request( + request, + shardId + ); + getFromTranslogRequest.setParentTask(request.getParentTask()); + transportService.sendRequest( + node, + TransportShardMultiGetFomTranslogAction.NAME, + getFromTranslogRequest, + new ActionListenerResponseHandler<>(listener.delegateFailure((l, r) -> { + var missingLocations = locationsWithMissingResults(r); + if (missingLocations.isEmpty()) { + logger.info("--> received result of all ids in real-time mget[shard] from the promotable shard."); + l.onResponse(r.multiGetShardResponse()); + } else { + logger.info( + () -> format( + "--> no result for ids '%s' from the promotable shard (segment generation to wait for: %s)", + missingLocations.stream().map(i -> request.items.get(i).id()).toList(), + r.segmentGeneration() + ) + ); + if (r.segmentGeneration() == -1) { + // Nothing to wait for (no previous unsafe generation), just handle the rest locally. + ActionRunnable.supply(l, () -> handleLocalGets(missingLocations, request, r.multiGetShardResponse(), shardId)) + .run(); + } else { + assert r.segmentGeneration() > -1L; + indexShard.waitForSegmentGeneration( + r.segmentGeneration(), + listener.delegateFailureAndWrap( + (ll, aLong) -> threadPool.executor(getExecutor(request, shardId)) + .execute( + ActionRunnable.supply( + ll, + () -> handleLocalGets(missingLocations, request, r.multiGetShardResponse(), shardId) + ) + ) + ) + ); + } + } + }), TransportShardMultiGetFomTranslogAction.Response::new, getExecutor(request, shardId)) + ); + } else { + // A non-real-time mget with no explicit refresh requested. + super.asyncShardOperation(request, shardId, listener); + } + } + + // Returns the index of entries in response.locations that have a missing result with no failure on the promotable shard. + private static List locationsWithMissingResults(TransportShardMultiGetFomTranslogAction.Response response) { + List locations = new ArrayList<>(); + for (int i = 0; i < response.multiGetShardResponse().locations.size(); i++) { + if (response.multiGetShardResponse().responses.get(i) == null && response.multiGetShardResponse().failures.get(i) == null) { + locations.add(i); + } + } + return locations; + } + + private MultiGetShardResponse handleLocalGets( + List missingLocations, + MultiGetShardRequest request, + MultiGetShardResponse response, + ShardId shardId + ) { + logger.info("--> handling local gets for ids: {}", missingLocations); + var indexShard = getIndexShard(shardId); + for (var l : missingLocations) { + MultiGetRequest.Item item = request.items.get(l); + try { + GetResult getResult = indexShard.getService() + .get( + item.id(), + item.storedFields(), + request.realtime(), + item.version(), + item.versionType(), + item.fetchSourceContext(), + request.isForceSyntheticSource() + ); + logger.info("--> local get for id {} returned {}", item.id(), getResult); + response.add(request.locations.get(l), new GetResponse(getResult)); + } catch (RuntimeException e) { + if (TransportActions.isShardNotAvailableException(e)) { + throw e; + } else { + logger.debug(() -> format("%s failed to execute multi_get for [%s]", shardId, item.id()), e); + response.add(request.locations.get(l), new MultiGetResponse.Failure(request.index(), item.id(), e)); + } + } catch (IOException e) { + logger.debug(() -> format("%s failed to execute multi_get for [%s]", shardId, item.id()), e); + response.add(request.locations.get(l), new MultiGetResponse.Failure(request.index(), item.id(), e)); + } + } + return response; + } + private void asyncShardMultiGet(MultiGetShardRequest request, ShardId shardId, ActionListener listener) throws IOException { if (request.refresh() && request.realtime() == false) { @@ -163,6 +304,17 @@ private void asyncShardMultiGet(MultiGetShardRequest request, ShardId shardId, A } } + private DiscoveryNode getCurrentNodeOfPrimary(ShardId shardId) { + var clusterState = clusterService.state(); + var shardRoutingTable = clusterState.routingTable().shardRoutingTable(shardId); + if (shardRoutingTable.primaryShard() == null || shardRoutingTable.primaryShard().active() == false) { + throw new NoShardAvailableActionException(shardId, "primary shard is not active"); + } + DiscoveryNode node = clusterState.nodes().get(shardRoutingTable.primaryShard().currentNodeId()); + assert node != null; + return node; + } + private IndexShard getIndexShard(ShardId shardId) { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); return indexService.getShard(shardId.id()); diff --git a/server/src/test/java/org/elasticsearch/action/get/TransportMultiGetActionTests.java b/server/src/test/java/org/elasticsearch/action/get/TransportMultiGetActionTests.java index dfd69aac4abda..de089c25f7201 100644 --- a/server/src/test/java/org/elasticsearch/action/get/TransportMultiGetActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/get/TransportMultiGetActionTests.java @@ -165,6 +165,7 @@ public TaskManager getTaskManager() { when(clusterService.localNode()).thenReturn(transportService.getLocalNode()); when(clusterService.state()).thenReturn(clusterState); when(clusterService.operationRouting()).thenReturn(operationRouting); + final NodeClient client = new NodeClient(Settings.EMPTY, threadPool); shardAction = new TransportShardMultiGetAction( clusterService, @@ -173,7 +174,8 @@ public TaskManager getTaskManager() { threadPool, new ActionFilters(emptySet()), new Resolver(), - EmptySystemIndices.INSTANCE.getExecutorSelector() + EmptySystemIndices.INSTANCE.getExecutorSelector(), + client ) { @Override protected void doExecute(Task task, MultiGetShardRequest request, ActionListener listener) {} From fd644d6a867ec409f3f0c3c7706fb1061aaca092 Mon Sep 17 00:00:00 2001 From: Pooya Salehi Date: Tue, 13 Jun 2023 11:53:01 +0200 Subject: [PATCH 2/6] Adapt yaml tests --- .../resources/rest-api-spec/test/mget/40_routing.yml | 6 ------ .../rest-api-spec/test/mget/60_realtime_refresh.yml | 5 ----- .../action/get/TransportShardMultiGetAction.java | 9 ++++----- 3 files changed, 4 insertions(+), 16 deletions(-) diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/mget/40_routing.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/mget/40_routing.yml index 7169c0ec25001..7a2706ffe3863 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/mget/40_routing.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/mget/40_routing.yml @@ -8,11 +8,6 @@ routing: index: number_of_shards: 5 number_of_routing_shards: 5 - number_of_replicas: 0 - - - do: - cluster.health: - wait_for_status: green - do: index: @@ -51,7 +46,6 @@ requires routing: settings: index: number_of_shards: 5 - number_of_replicas: 0 mappings: _routing: required: true diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/mget/60_realtime_refresh.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/mget/60_realtime_refresh.yml index 2f2036217d8dc..dc6e591c27986 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/mget/60_realtime_refresh.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/mget/60_realtime_refresh.yml @@ -9,11 +9,6 @@ settings: index: refresh_interval: -1 - number_of_replicas: 0 - - - do: - cluster.health: - wait_for_status: green - do: index: diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java index d868f8e2eb849..9beff114ef169 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java @@ -205,12 +205,12 @@ private void handleMultiGetOnUnpromotableShard( new ActionListenerResponseHandler<>(listener.delegateFailure((l, r) -> { var missingLocations = locationsWithMissingResults(r); if (missingLocations.isEmpty()) { - logger.info("--> received result of all ids in real-time mget[shard] from the promotable shard."); + logger.debug("received result of all ids in real-time mget[shard] from the promotable shard."); l.onResponse(r.multiGetShardResponse()); } else { - logger.info( + logger.debug( () -> format( - "--> no result for ids '%s' from the promotable shard (segment generation to wait for: %s)", + "no result for ids '%s' from the promotable shard (segment generation to wait for: %s)", missingLocations.stream().map(i -> request.items.get(i).id()).toList(), r.segmentGeneration() ) @@ -260,7 +260,7 @@ private MultiGetShardResponse handleLocalGets( MultiGetShardResponse response, ShardId shardId ) { - logger.info("--> handling local gets for ids: {}", missingLocations); + logger.trace("handling local gets for locations: {}", missingLocations); var indexShard = getIndexShard(shardId); for (var l : missingLocations) { MultiGetRequest.Item item = request.items.get(l); @@ -275,7 +275,6 @@ private MultiGetShardResponse handleLocalGets( item.fetchSourceContext(), request.isForceSyntheticSource() ); - logger.info("--> local get for id {} returned {}", item.id(), getResult); response.add(request.locations.get(l), new GetResponse(getResult)); } catch (RuntimeException e) { if (TransportActions.isShardNotAvailableException(e)) { From 36d8892c1a56e75394491b19cea54bef634fa5a6 Mon Sep 17 00:00:00 2001 From: Pooya Salehi Date: Wed, 14 Jun 2023 11:24:06 +0200 Subject: [PATCH 3/6] Address review comments --- .../action/get/TransportGetAction.java | 11 +++++---- .../get/TransportShardMultiGetAction.java | 24 ++++++------------- 2 files changed, 13 insertions(+), 22 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java index 56904f51fd8ea..e1ff917d08185 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java @@ -20,8 +20,10 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.OperationRouting; import org.elasticsearch.cluster.routing.PlainShardIterator; +import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; @@ -180,7 +182,7 @@ private void asyncGet(GetRequest request, ShardId shardId, ActionListener listener) throws IOException { ShardId shardId = indexShard.shardId(); - DiscoveryNode node = getCurrentNodeOfPrimary(shardId); + var node = getCurrentNodeOfPrimary(clusterService.state().routingTable(), clusterService.state().nodes(), shardId); if (request.refresh()) { logger.trace("send refresh action for shard {} to node {}", shardId, node.getId()); var refreshRequest = new BasicReplicationRequest(shardId); @@ -226,13 +228,12 @@ private void handleGetOnUnpromotableShard(GetRequest request, IndexShard indexSh } } - private DiscoveryNode getCurrentNodeOfPrimary(ShardId shardId) { - var clusterState = clusterService.state(); - var shardRoutingTable = clusterState.routingTable().shardRoutingTable(shardId); + static DiscoveryNode getCurrentNodeOfPrimary(RoutingTable routingTable, DiscoveryNodes nodes, ShardId shardId) { + var shardRoutingTable = routingTable.shardRoutingTable(shardId); if (shardRoutingTable.primaryShard() == null || shardRoutingTable.primaryShard().active() == false) { throw new NoShardAvailableActionException(shardId, "primary shard is not active"); } - DiscoveryNode node = clusterState.nodes().get(shardRoutingTable.primaryShard().currentNodeId()); + DiscoveryNode node = nodes.get(shardRoutingTable.primaryShard().currentNodeId()); assert node != null; return node; } diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java index 9beff114ef169..1f9ab2b431490 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java @@ -12,7 +12,6 @@ import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.ActionType; -import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.admin.indices.refresh.TransportShardRefreshAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.TransportActions; @@ -43,6 +42,7 @@ import java.util.ArrayList; import java.util.List; +import static org.elasticsearch.action.get.TransportGetAction.getCurrentNodeOfPrimary; import static org.elasticsearch.core.Strings.format; public class TransportShardMultiGetAction extends TransportSingleShardAction { @@ -182,7 +182,8 @@ private void handleMultiGetOnUnpromotableShard( ActionListener listener ) throws IOException { ShardId shardId = indexShard.shardId(); - var node = getCurrentNodeOfPrimary(shardId); + var node = getCurrentNodeOfPrimary(clusterService.state().routingTable(), clusterService.state().nodes(), shardId); + ; if (request.refresh()) { logger.trace("send refresh action for shard {} to node {}", shardId, node.getId()); var refreshRequest = new BasicReplicationRequest(shardId); @@ -193,15 +194,15 @@ private void handleMultiGetOnUnpromotableShard( listener.delegateFailureAndWrap((l, replicationResponse) -> super.asyncShardOperation(request, shardId, l)) ); } else if (request.realtime()) { - TransportShardMultiGetFomTranslogAction.Request getFromTranslogRequest = new TransportShardMultiGetFomTranslogAction.Request( + TransportShardMultiGetFomTranslogAction.Request mgetFromTranslogRequest = new TransportShardMultiGetFomTranslogAction.Request( request, shardId ); - getFromTranslogRequest.setParentTask(request.getParentTask()); + mgetFromTranslogRequest.setParentTask(request.getParentTask()); transportService.sendRequest( node, TransportShardMultiGetFomTranslogAction.NAME, - getFromTranslogRequest, + mgetFromTranslogRequest, new ActionListenerResponseHandler<>(listener.delegateFailure((l, r) -> { var missingLocations = locationsWithMissingResults(r); if (missingLocations.isEmpty()) { @@ -243,7 +244,7 @@ private void handleMultiGetOnUnpromotableShard( } } - // Returns the index of entries in response.locations that have a missing result with no failure on the promotable shard. + // Returns the indices of entries in response.locations that have a missing result with no failure on the promotable shard. private static List locationsWithMissingResults(TransportShardMultiGetFomTranslogAction.Response response) { List locations = new ArrayList<>(); for (int i = 0; i < response.multiGetShardResponse().locations.size(); i++) { @@ -303,17 +304,6 @@ private void asyncShardMultiGet(MultiGetShardRequest request, ShardId shardId, A } } - private DiscoveryNode getCurrentNodeOfPrimary(ShardId shardId) { - var clusterState = clusterService.state(); - var shardRoutingTable = clusterState.routingTable().shardRoutingTable(shardId); - if (shardRoutingTable.primaryShard() == null || shardRoutingTable.primaryShard().active() == false) { - throw new NoShardAvailableActionException(shardId, "primary shard is not active"); - } - DiscoveryNode node = clusterState.nodes().get(shardRoutingTable.primaryShard().currentNodeId()); - assert node != null; - return node; - } - private IndexShard getIndexShard(ShardId shardId) { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); return indexService.getShard(shardId.id()); From 644debf4f3ae765b22e261b9be53b1846e0d1b06 Mon Sep 17 00:00:00 2001 From: Pooya Salehi Date: Wed, 14 Jun 2023 11:33:49 +0200 Subject: [PATCH 4/6] Cleanup --- .../elasticsearch/action/get/TransportShardMultiGetAction.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java index 1f9ab2b431490..642075313785e 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java @@ -183,7 +183,6 @@ private void handleMultiGetOnUnpromotableShard( ) throws IOException { ShardId shardId = indexShard.shardId(); var node = getCurrentNodeOfPrimary(clusterService.state().routingTable(), clusterService.state().nodes(), shardId); - ; if (request.refresh()) { logger.trace("send refresh action for shard {} to node {}", shardId, node.getId()); var refreshRequest = new BasicReplicationRequest(shardId); From 2f4cb8b1248f3948190ffc9874913b2e8ceab90f Mon Sep 17 00:00:00 2001 From: Pooya Salehi Date: Wed, 14 Jun 2023 15:51:06 +0200 Subject: [PATCH 5/6] Address second review comments --- .../action/get/TransportGetAction.java | 10 +- .../get/TransportShardMultiGetAction.java | 122 ++++++------------ 2 files changed, 46 insertions(+), 86 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java index e1ff917d08185..53761f8451ddd 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java @@ -20,10 +20,8 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.OperationRouting; import org.elasticsearch.cluster.routing.PlainShardIterator; -import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; @@ -182,7 +180,7 @@ private void asyncGet(GetRequest request, ShardId shardId, ActionListener listener) throws IOException { ShardId shardId = indexShard.shardId(); - var node = getCurrentNodeOfPrimary(clusterService.state().routingTable(), clusterService.state().nodes(), shardId); + var node = getCurrentNodeOfPrimary(clusterService.state(), shardId); if (request.refresh()) { logger.trace("send refresh action for shard {} to node {}", shardId, node.getId()); var refreshRequest = new BasicReplicationRequest(shardId); @@ -228,12 +226,12 @@ private void handleGetOnUnpromotableShard(GetRequest request, IndexShard indexSh } } - static DiscoveryNode getCurrentNodeOfPrimary(RoutingTable routingTable, DiscoveryNodes nodes, ShardId shardId) { - var shardRoutingTable = routingTable.shardRoutingTable(shardId); + static DiscoveryNode getCurrentNodeOfPrimary(ClusterState clusterState, ShardId shardId) { + var shardRoutingTable = clusterState.routingTable().shardRoutingTable(shardId); if (shardRoutingTable.primaryShard() == null || shardRoutingTable.primaryShard().active() == false) { throw new NoShardAvailableActionException(shardId, "primary shard is not active"); } - DiscoveryNode node = nodes.get(shardRoutingTable.primaryShard().currentNodeId()); + DiscoveryNode node = clusterState.nodes().get(shardRoutingTable.primaryShard().currentNodeId()); assert node != null; return node; } diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java index 642075313785e..f195ca758ffce 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java @@ -39,8 +39,6 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; import static org.elasticsearch.action.get.TransportGetAction.getCurrentNodeOfPrimary; import static org.elasticsearch.core.Strings.format; @@ -132,35 +130,10 @@ protected void asyncShardOperation(MultiGetShardRequest request, ShardId shardId @Override protected MultiGetShardResponse shardOperation(MultiGetShardRequest request, ShardId shardId) { - var indexShard = getIndexShard(shardId); MultiGetShardResponse response = new MultiGetShardResponse(); for (int i = 0; i < request.locations.size(); i++) { - MultiGetRequest.Item item = request.items.get(i); - try { - GetResult getResult = indexShard.getService() - .get( - item.id(), - item.storedFields(), - request.realtime(), - item.version(), - item.versionType(), - item.fetchSourceContext(), - request.isForceSyntheticSource() - ); - response.add(request.locations.get(i), new GetResponse(getResult)); - } catch (RuntimeException e) { - if (TransportActions.isShardNotAvailableException(e)) { - throw e; - } else { - logger.debug(() -> format("%s failed to execute multi_get for [%s]", shardId, item.id()), e); - response.add(request.locations.get(i), new MultiGetResponse.Failure(request.index(), item.id(), e)); - } - } catch (IOException e) { - logger.debug(() -> format("%s failed to execute multi_get for [%s]", shardId, item.id()), e); - response.add(request.locations.get(i), new MultiGetResponse.Failure(request.index(), item.id(), e)); - } + getAndAddToResponse(shardId, i, request, response); } - return response; } @@ -182,7 +155,7 @@ private void handleMultiGetOnUnpromotableShard( ActionListener listener ) throws IOException { ShardId shardId = indexShard.shardId(); - var node = getCurrentNodeOfPrimary(clusterService.state().routingTable(), clusterService.state().nodes(), shardId); + var node = getCurrentNodeOfPrimary(clusterService.state(), shardId); if (request.refresh()) { logger.trace("send refresh action for shard {} to node {}", shardId, node.getId()); var refreshRequest = new BasicReplicationRequest(shardId); @@ -203,22 +176,24 @@ private void handleMultiGetOnUnpromotableShard( TransportShardMultiGetFomTranslogAction.NAME, mgetFromTranslogRequest, new ActionListenerResponseHandler<>(listener.delegateFailure((l, r) -> { - var missingLocations = locationsWithMissingResults(r); - if (missingLocations.isEmpty()) { + var responseHasMissingLocations = false; + for (int i = 0; i < r.multiGetShardResponse().locations.size(); i++) { + if (r.multiGetShardResponse().responses.get(i) == null && r.multiGetShardResponse().failures.get(i) == null) { + responseHasMissingLocations = true; + break; + } + } + if (responseHasMissingLocations == false) { logger.debug("received result of all ids in real-time mget[shard] from the promotable shard."); l.onResponse(r.multiGetShardResponse()); } else { logger.debug( - () -> format( - "no result for ids '%s' from the promotable shard (segment generation to wait for: %s)", - missingLocations.stream().map(i -> request.items.get(i).id()).toList(), - r.segmentGeneration() - ) + "no result for some ids from the promotable shard (segment generation to wait for: {})", + r.segmentGeneration() ); if (r.segmentGeneration() == -1) { // Nothing to wait for (no previous unsafe generation), just handle the rest locally. - ActionRunnable.supply(l, () -> handleLocalGets(missingLocations, request, r.multiGetShardResponse(), shardId)) - .run(); + ActionRunnable.supply(l, () -> handleLocalGets(request, r.multiGetShardResponse(), shardId)).run(); } else { assert r.segmentGeneration() > -1L; indexShard.waitForSegmentGeneration( @@ -226,10 +201,7 @@ private void handleMultiGetOnUnpromotableShard( listener.delegateFailureAndWrap( (ll, aLong) -> threadPool.executor(getExecutor(request, shardId)) .execute( - ActionRunnable.supply( - ll, - () -> handleLocalGets(missingLocations, request, r.multiGetShardResponse(), shardId) - ) + ActionRunnable.supply(ll, () -> handleLocalGets(request, r.multiGetShardResponse(), shardId)) ) ) ); @@ -243,52 +215,42 @@ private void handleMultiGetOnUnpromotableShard( } } - // Returns the indices of entries in response.locations that have a missing result with no failure on the promotable shard. - private static List locationsWithMissingResults(TransportShardMultiGetFomTranslogAction.Response response) { - List locations = new ArrayList<>(); - for (int i = 0; i < response.multiGetShardResponse().locations.size(); i++) { - if (response.multiGetShardResponse().responses.get(i) == null && response.multiGetShardResponse().failures.get(i) == null) { - locations.add(i); + private MultiGetShardResponse handleLocalGets(MultiGetShardRequest request, MultiGetShardResponse response, ShardId shardId) { + logger.trace("handling local gets for missing locations"); + for (int i = 0; i < response.locations.size(); i++) { + if (response.responses.get(i) == null && response.failures.get(i) == null) { + getAndAddToResponse(shardId, i, request, response); } } - return locations; + return response; } - private MultiGetShardResponse handleLocalGets( - List missingLocations, - MultiGetShardRequest request, - MultiGetShardResponse response, - ShardId shardId - ) { - logger.trace("handling local gets for locations: {}", missingLocations); + private void getAndAddToResponse(ShardId shardId, int location, MultiGetShardRequest request, MultiGetShardResponse response) { var indexShard = getIndexShard(shardId); - for (var l : missingLocations) { - MultiGetRequest.Item item = request.items.get(l); - try { - GetResult getResult = indexShard.getService() - .get( - item.id(), - item.storedFields(), - request.realtime(), - item.version(), - item.versionType(), - item.fetchSourceContext(), - request.isForceSyntheticSource() - ); - response.add(request.locations.get(l), new GetResponse(getResult)); - } catch (RuntimeException e) { - if (TransportActions.isShardNotAvailableException(e)) { - throw e; - } else { - logger.debug(() -> format("%s failed to execute multi_get for [%s]", shardId, item.id()), e); - response.add(request.locations.get(l), new MultiGetResponse.Failure(request.index(), item.id(), e)); - } - } catch (IOException e) { + MultiGetRequest.Item item = request.items.get(location); + try { + GetResult getResult = indexShard.getService() + .get( + item.id(), + item.storedFields(), + request.realtime(), + item.version(), + item.versionType(), + item.fetchSourceContext(), + request.isForceSyntheticSource() + ); + response.add(request.locations.get(location), new GetResponse(getResult)); + } catch (RuntimeException e) { + if (TransportActions.isShardNotAvailableException(e)) { + throw e; + } else { logger.debug(() -> format("%s failed to execute multi_get for [%s]", shardId, item.id()), e); - response.add(request.locations.get(l), new MultiGetResponse.Failure(request.index(), item.id(), e)); + response.add(request.locations.get(location), new MultiGetResponse.Failure(request.index(), item.id(), e)); } + } catch (IOException e) { + logger.debug(() -> format("%s failed to execute multi_get for [%s]", shardId, item.id()), e); + response.add(request.locations.get(location), new MultiGetResponse.Failure(request.index(), item.id(), e)); } - return response; } private void asyncShardMultiGet(MultiGetShardRequest request, ShardId shardId, ActionListener listener) From 9acefaa2cb4c7c7caf784d090019731576dd0789 Mon Sep 17 00:00:00 2001 From: Pooya Salehi Date: Thu, 15 Jun 2023 10:33:28 +0200 Subject: [PATCH 6/6] Use auto_expand_replicas in yaml tests --- .../resources/rest-api-spec/test/mget/40_routing.yml | 10 ++++++++++ .../rest-api-spec/test/mget/60_realtime_refresh.yml | 5 +++++ 2 files changed, 15 insertions(+) diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/mget/40_routing.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/mget/40_routing.yml index 7a2706ffe3863..d08c9b6795e95 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/mget/40_routing.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/mget/40_routing.yml @@ -8,6 +8,11 @@ routing: index: number_of_shards: 5 number_of_routing_shards: 5 + auto_expand_replicas: 0-1 + + - do: + cluster.health: + wait_for_status: green - do: index: @@ -46,10 +51,15 @@ requires routing: settings: index: number_of_shards: 5 + auto_expand_replicas: 0-1 mappings: _routing: required: true + - do: + cluster.health: + wait_for_status: green + - do: index: index: test_1 diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/mget/60_realtime_refresh.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/mget/60_realtime_refresh.yml index dc6e591c27986..a5baf94d2123b 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/mget/60_realtime_refresh.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/mget/60_realtime_refresh.yml @@ -9,6 +9,11 @@ settings: index: refresh_interval: -1 + auto_expand_replicas: 0-1 + + - do: + cluster.health: + wait_for_status: green - do: index: