From cce93a36107a8129ded675d1775ff9c57b265ec1 Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Thu, 19 Jan 2023 15:15:12 +0200 Subject: [PATCH 01/21] Unpromotables skip replication and peer recovery For skipping replication: * ReplicationTracker and Group filter promotable * Remove from in sync allocations in metadata Fixes ES-4861 For skipping peer recovery: * Shards pass directly to STARTED skipping intermediate peer recovery stages and messages Fixes ES-5257 --- .../cluster/routing/ShardRoutingRoleIT.java | 48 +++++++++++++++---- .../replication/ReplicationOperation.java | 1 + .../cluster/routing/IndexRoutingTable.java | 4 +- .../routing/IndexShardRoutingTable.java | 14 +++--- .../allocation/IndexMetadataUpdater.java | 14 +++--- .../index/seqno/ReplicationTracker.java | 23 +++++++-- .../elasticsearch/index/shard/IndexShard.java | 24 ++++++++-- .../index/shard/ReplicationGroup.java | 5 +- .../recovery/PeerRecoveryTargetService.java | 31 ++++++++---- .../recovery/RecoveriesCollection.java | 1 + ...portVerifyShardBeforeCloseActionTests.java | 2 +- ...TransportResyncReplicationActionTests.java | 2 +- .../ReplicationOperationTests.java | 2 +- .../TransportReplicationActionTests.java | 7 ++- .../cluster/ClusterStateTests.java | 6 +-- .../metadata/AutoExpandReplicasTests.java | 8 +++- .../index/engine/EngineTestCase.java | 2 +- 17 files changed, 142 insertions(+), 52 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java index 38f66233c9b5a..2f60dc82c1484 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java @@ -28,6 +28,7 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.engine.NoOpEngine; import org.elasticsearch.index.shard.IndexShard; @@ -40,6 +41,8 @@ import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.XContentTestUtils; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.Collection; @@ -97,11 +100,19 @@ public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode n @Override public Optional getEngineFactory(IndexSettings indexSettings) { - return Optional.of( - config -> config.isPromotableToPrimary() - ? new InternalEngine(config) - : new NoOpEngine(config, new TranslogStats(0, 0, 0, 0, 0)) - ); + return Optional.of(config -> { + if (config.isPromotableToPrimary()) { + return new InternalEngine(config); + } else { + try { + config.getStore().createEmpty(); + } catch (IOException e) { + throw new RuntimeException(e); + } + + return new NoOpEngine(EngineTestCase.copy(config, () -> -1L), new TranslogStats(0, 0, 0, 0, 0)); + } + }); } } @@ -112,7 +123,7 @@ protected boolean addMockInternalEngine() { @Override protected Collection> nodePlugins() { - return CollectionUtils.appendToCopy(super.nodePlugins(), TestPlugin.class); + return CollectionUtils.concatLists(List.of(MockTransportService.TestPlugin.class, TestPlugin.class), super.nodePlugins()); } @Override @@ -196,11 +207,25 @@ private static void assertRolesInRoutingTableXContent(ClusterState state) { } } - public void testShardCreation() { + private static void installMockTransportVerifications(RoutingTableWatcher routingTableWatcher) { + for (var transportService : internalCluster().getInstances(TransportService.class)) { + MockTransportService mockTransportService = (MockTransportService) transportService; + mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> { + if (routingTableWatcher.numIndexingCopies == 1) { + assertThat("no recovery action should be exchanged", action, not(containsString("recovery"))); + assertThat("no replicated action should be exchanged", action, not(containsString("[r]"))); + } + connection.sendRequest(requestId, action, request, options); + }); + } + } + + public void testShardCreation() throws Exception { var routingTableWatcher = new RoutingTableWatcher(); var numDataNodes = routingTableWatcher.numReplicas + 2; internalCluster().ensureAtLeastNumDataNodes(numDataNodes); + installMockTransportVerifications(routingTableWatcher); getMasterNodePlugin().numIndexingCopies = routingTableWatcher.numIndexingCopies; final var masterClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); @@ -237,6 +262,8 @@ public void testShardCreation() { ensureGreen(INDEX_NAME); assertEngineTypes(); + indexRandom(randomBoolean(), INDEX_NAME, randomIntBetween(50, 100)); + ensureGreen(INDEX_NAME); // removing replicas drops SEARCH_ONLY copies first while (routingTableWatcher.numReplicas > 0) { @@ -343,6 +370,7 @@ public void testPromotion() { var numDataNodes = routingTableWatcher.numReplicas + 2; internalCluster().ensureAtLeastNumDataNodes(numDataNodes); + installMockTransportVerifications(routingTableWatcher); getMasterNodePlugin().numIndexingCopies = routingTableWatcher.numIndexingCopies; final var masterClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); @@ -401,14 +429,14 @@ public AllocationCommand getCancelPrimaryCommand() { return null; } - public void testSearchRouting() { - + public void testSearchRouting() throws Exception { var routingTableWatcher = new RoutingTableWatcher(); routingTableWatcher.numReplicas = Math.max(1, routingTableWatcher.numReplicas); routingTableWatcher.numIndexingCopies = Math.min(routingTableWatcher.numIndexingCopies, routingTableWatcher.numReplicas); getMasterNodePlugin().numIndexingCopies = routingTableWatcher.numIndexingCopies; internalCluster().ensureAtLeastNumDataNodes(routingTableWatcher.numReplicas + 1); + installMockTransportVerifications(routingTableWatcher); final var masterClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); try { @@ -416,7 +444,7 @@ public void testSearchRouting() { masterClusterService.addListener(routingTableWatcher); createIndex(INDEX_NAME, routingTableWatcher.getIndexSettings()); - // TODO index some documents here once recovery/replication ignore unpromotable shards + indexRandom(randomBoolean(), INDEX_NAME, randomIntBetween(50, 100)); ensureGreen(INDEX_NAME); assertEngineTypes(); diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index 8ec274bc410f6..6b1916b4ec843 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -213,6 +213,7 @@ private void performOnReplica( final long maxSeqNoOfUpdatesOrDeletes, final PendingReplicationActions pendingReplicationActions ) { + assert shard.isPromotableToPrimary() : "only promotable shards should receive replication requests"; if (logger.isTraceEnabled()) { logger.trace("[{}] sending op [{}] to replica {} for request [{}]", shard.shardId(), opType, shard, replicaRequest); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java b/server/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java index a26e36aa39f9b..0c62dce1b2209 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java @@ -136,7 +136,9 @@ boolean validate(Metadata metadata) { ); } final Set inSyncAllocationIds = indexMetadata.inSyncAllocationIds(shardRouting.id()); - if (shardRouting.active() && inSyncAllocationIds.contains(shardRouting.allocationId().getId()) == false) { + if (shardRouting.active() + && shardRouting.isPromotableToPrimary() + && inSyncAllocationIds.contains(shardRouting.allocationId().getId()) == false) { throw new IllegalStateException( "active shard routing " + shardRouting diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java b/server/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java index 0dd85d873463d..3a5a369caa3f2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java @@ -499,15 +499,17 @@ public ShardRouting getByAllocationId(String allocationId) { return null; } - public Set getAllAllocationIds() { + public Set getPromotableAllocationIds() { assert MasterService.assertNotMasterUpdateThread("not using this on the master thread so we don't have to pre-compute this"); Set allAllocationIds = new HashSet<>(); for (ShardRouting shard : shards) { - if (shard.relocating()) { - allAllocationIds.add(shard.getTargetRelocatingShard().allocationId().getId()); - } - if (shard.assignedToNode()) { - allAllocationIds.add(shard.allocationId().getId()); + if (shard.isPromotableToPrimary()) { + if (shard.relocating()) { + allAllocationIds.add(shard.getTargetRelocatingShard().allocationId().getId()); + } + if (shard.assignedToNode()) { + allAllocationIds.add(shard.allocationId().getId()); + } } } return allAllocationIds; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetadataUpdater.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetadataUpdater.java index 469e7f7efe36c..e0b53e312e400 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetadataUpdater.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetadataUpdater.java @@ -69,12 +69,14 @@ public void shardStarted(ShardRouting initializingShard, ShardRouting startedSha + "] and startedShard.allocationId [" + startedShard.allocationId().getId() + "] have to have the same"; - Updates updates = changes(startedShard.shardId()); - updates.addedAllocationIds.add(startedShard.allocationId().getId()); - if (startedShard.primary() - // started shard has to have null recoverySource; have to pick up recoverySource from its initializing state - && (initializingShard.recoverySource() == RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE)) { - updates.removedAllocationIds.add(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID); + if (startedShard.isPromotableToPrimary()) { + Updates updates = changes(startedShard.shardId()); + updates.addedAllocationIds.add(startedShard.allocationId().getId()); + if (startedShard.primary() + // started shard has to have null recoverySource; have to pick up recoverySource from its initializing state + && (initializingShard.recoverySource() == RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE)) { + updates.removedAllocationIds.add(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID); + } } } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 86290ca79a65a..12ae735d16b55 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -263,6 +263,7 @@ public synchronized RetentionLeases getRetentionLeases(final boolean expireLease final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis(); final Set leaseIdsForCurrentPeers = routingTable.assignedShards() .stream() + .filter(ShardRouting::isPromotableToPrimary) .map(ReplicationTracker::getPeerRecoveryRetentionLeaseId) .collect(Collectors.toSet()); final boolean allShardsStarted = routingTable.allShardsStarted(); @@ -607,7 +608,7 @@ public synchronized void renewPeerRecoveryRetentionLeases() { boolean renewalNeeded = false; for (int copy = 0; copy < routingTable.size(); copy++) { final ShardRouting shardRouting = routingTable.shard(copy); - if (shardRouting.assignedToNode() == false) { + if (shardRouting.assignedToNode() == false || shardRouting.isPromotableToPrimary() == false) { continue; } final RetentionLease retentionLease = retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting)); @@ -628,7 +629,7 @@ public synchronized void renewPeerRecoveryRetentionLeases() { if (renewalNeeded) { for (int copy = 0; copy < routingTable.size(); copy++) { final ShardRouting shardRouting = routingTable.shard(copy); - if (shardRouting.assignedToNode()) { + if (shardRouting.assignedToNode() && shardRouting.isPromotableToPrimary()) { final RetentionLease retentionLease = retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting)); if (retentionLease != null) { final CheckpointState checkpointState = checkpoints.get(shardRouting.allocationId().getId()); @@ -874,8 +875,15 @@ private boolean invariant() { assert replicationGroup == null || replicationGroup.equals(calculateReplicationGroup()) : "cached replication group out of sync: expected: " + calculateReplicationGroup() + " but was: " + replicationGroup; + if (replicationGroup != null) { + assert replicationGroup.getReplicationTargets().stream().allMatch(ShardRouting::isPromotableToPrimary) + : "expected all replication target shards of the replication group to be promotable to primary"; + assert replicationGroup.getSkippedShards().stream().allMatch(ShardRouting::isPromotableToPrimary) + : "expected all skipped shards of the replication group to be promotable to primary"; + } + // all assigned shards from the routing table are tracked - assert routingTable == null || checkpoints.keySet().containsAll(routingTable.getAllAllocationIds()) + assert routingTable == null || checkpoints.keySet().containsAll(routingTable.getPromotableAllocationIds()) : "local checkpoints " + checkpoints + " not in-sync with routing table " + routingTable; for (Map.Entry entry : checkpoints.entrySet()) { @@ -895,7 +903,7 @@ private boolean invariant() { if (primaryMode && indexSettings.isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases) { // all tracked shard copies have a corresponding peer-recovery retention lease for (final ShardRouting shardRouting : routingTable.assignedShards()) { - if (checkpoints.get(shardRouting.allocationId().getId()).tracked) { + if (shardRouting.isPromotableToPrimary() && checkpoints.get(shardRouting.allocationId().getId()).tracked) { assert retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting)) : "no retention lease for tracked shard [" + shardRouting + "] in " + retentionLeases; assert PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals( @@ -1151,6 +1159,7 @@ private void addPeerRecoveryRetentionLeaseForSolePrimary() { } else if (hasAllPeerRecoveryRetentionLeases == false && routingTable.assignedShards() .stream() + .filter(ShardRouting::isPromotableToPrimary) .allMatch( shardRouting -> retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting)) || checkpoints.get(shardRouting.allocationId().getId()).tracked == false @@ -1185,6 +1194,7 @@ public synchronized void updateFromMaster( // remove entries which don't exist on master Set initializingAllocationIds = routingTable.getAllInitializingShards() .stream() + .filter(ShardRouting::isPromotableToPrimary) .map(ShardRouting::allocationId) .map(AllocationId::getId) .collect(Collectors.toSet()); @@ -1495,7 +1505,10 @@ public synchronized boolean hasAllPeerRecoveryRetentionLeases() { */ public synchronized void createMissingPeerRecoveryRetentionLeases(ActionListener listener) { if (hasAllPeerRecoveryRetentionLeases == false) { - final List shardRoutings = routingTable.assignedShards(); + final List shardRoutings = routingTable.assignedShards() + .stream() + .filter(ShardRouting::isPromotableToPrimary) + .toList(); final GroupedActionListener groupedActionListener = new GroupedActionListener<>( shardRoutings.size(), ActionListener.wrap(vs -> { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 280934c73364f..dc72da1ef86e5 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -709,11 +709,16 @@ public void onFailure(Exception e) { if (indexSettings.isSoftDeleteEnabled() && useRetentionLeasesInPeerRecovery == false) { final RetentionLeases retentionLeases = replicationTracker.getRetentionLeases(); + final Set shardRoutings = Sets.newHashSetWithExpectedSize(routingTable.size()); for (int copy = 0; copy < routingTable.size(); copy++) { - shardRoutings.add(routingTable.shard(copy)); + ShardRouting shardRouting = routingTable.shard(copy); + if (shardRouting.isPromotableToPrimary()) shardRoutings.add(shardRouting); } - shardRoutings.addAll(routingTable.assignedShards()); // include relocation targets + + // include relocation targets + shardRoutings.addAll(routingTable.assignedShards().stream().filter(ShardRouting::isPromotableToPrimary).toList()); + if (shardRoutings.stream() .allMatch( shr -> shr.assignedToNode() && retentionLeases.contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(shr)) @@ -1668,6 +1673,13 @@ public void postRecovery(String reason) throws IndexShardStartedException, Index if (state == IndexShardState.STARTED) { throw new IndexShardStartedException(shardId); } + if (routingEntry().isPromotableToPrimary() == false) { + // Swiftly skip intermediate stages + recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX); + recoveryState.setStage(RecoveryState.Stage.TRANSLOG); + recoveryState.getIndex().setFileDetailsComplete(); + recoveryState.setStage(RecoveryState.Stage.FINALIZE); + } recoveryState.setStage(RecoveryState.Stage.DONE); changeState(IndexShardState.POST_RECOVERY, reason); } @@ -1935,7 +1947,9 @@ public void openEngineAndRecoverFromTranslog() throws IOException { */ public void openEngineAndSkipTranslogRecovery() throws IOException { assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]"; - recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG); + recoveryState.validateCurrentStage( + routingEntry().isPromotableToPrimary() ? RecoveryState.Stage.TRANSLOG : RecoveryState.Stage.INDEX + ); loadGlobalCheckpointToReplicationTracker(); innerOpenEngineAndTranslog(replicationTracker); getEngine().skipTranslogRecovery(); @@ -1972,7 +1986,9 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine. onSettingsChanged(); assert assertSequenceNumbersInCommit(); - recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG); + recoveryState.validateCurrentStage( + routingEntry().isPromotableToPrimary() ? RecoveryState.Stage.TRANSLOG : RecoveryState.Stage.INDEX + ); } private boolean assertSequenceNumbersInCommit() throws IOException { diff --git a/server/src/main/java/org/elasticsearch/index/shard/ReplicationGroup.java b/server/src/main/java/org/elasticsearch/index/shard/ReplicationGroup.java index cf3b8fc0fbaf3..53f932faf4512 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/ReplicationGroup.java +++ b/server/src/main/java/org/elasticsearch/index/shard/ReplicationGroup.java @@ -40,11 +40,14 @@ public ReplicationGroup( this.trackedAllocationIds = trackedAllocationIds; this.version = version; - this.unavailableInSyncShards = Sets.difference(inSyncAllocationIds, routingTable.getAllAllocationIds()); + this.unavailableInSyncShards = Sets.difference(inSyncAllocationIds, routingTable.getPromotableAllocationIds()); this.replicationTargets = new ArrayList<>(); this.skippedShards = new ArrayList<>(); for (int copy = 0; copy < routingTable.size(); copy++) { ShardRouting shard = routingTable.shard(copy); + if (shard.isPromotableToPrimary() == false) { + continue; + } if (shard.unassigned()) { assert shard.primary() == false : "primary shard should not be unassigned in a replication group: " + shard; skippedShards.add(shard); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 1d3120310f0c9..fdbc278124af8 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -217,6 +217,8 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi final RecoveryTarget recoveryTarget = recoveryRef.target(); assert recoveryTarget.sourceNode() != null : "cannot do a recovery without a source node"; final RecoveryState.Timer timer = recoveryTarget.state().getTimer(); + final IndexShard indexShard = recoveryTarget.indexShard(); + final boolean promotableToPrimary = indexShard.routingEntry().isPromotableToPrimary(); record StartRecoveryRequestToSend(StartRecoveryRequest startRecoveryRequest, String actionName, TransportRequest requestToSend) {} final ActionListener toSendListener = ActionListener.notifyOnce( @@ -224,17 +226,23 @@ record StartRecoveryRequestToSend(StartRecoveryRequest startRecoveryRequest, Str @Override public void onResponse(StartRecoveryRequestToSend r) { logger.trace( - "{} [{}]: recovery from {}", + "{}{} [{}]: recovery from {}", r.startRecoveryRequest().shardId(), + promotableToPrimary ? " (promotable)" : "", r.actionName(), r.startRecoveryRequest().sourceNode() ); - transportService.sendRequest( - r.startRecoveryRequest().sourceNode(), - r.actionName(), - r.requestToSend(), - new RecoveryResponseHandler(r.startRecoveryRequest(), timer) - ); + RecoveryResponseHandler recoveryResponseHandler = new RecoveryResponseHandler(r.startRecoveryRequest(), timer); + if (promotableToPrimary) { + transportService.sendRequest( + r.startRecoveryRequest().sourceNode(), + r.actionName(), + r.requestToSend(), + recoveryResponseHandler + ); + } else { + onGoingRecoveries.markRecoveryAsDone(recoveryId); + } } @Override @@ -252,7 +260,6 @@ public void onFailure(Exception e) { if (preExistingRequest == null) { try { - final IndexShard indexShard = recoveryTarget.indexShard(); indexShard.preRecovery(toSendListener.delegateFailure((l, v) -> ActionListener.completeWith(l, () -> { logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId()); indexShard.prepareForIndexRecovery(); @@ -267,7 +274,12 @@ public void onFailure(Exception e) { store.decRef(); } } - final long startingSeqNo = indexShard.recoverLocallyUpToGlobalCheckpoint(); + long startingSeqNo = UNASSIGNED_SEQ_NO; + if (promotableToPrimary) { + startingSeqNo = indexShard.recoverLocallyUpToGlobalCheckpoint(); + } else { + indexShard.openEngineAndSkipTranslogRecovery(); + } assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG : "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]"; final var startRequest = getStartRecoveryRequest(logger, clusterService.localNode(), recoveryTarget, startingSeqNo); @@ -277,6 +289,7 @@ public void onFailure(Exception e) { toSendListener.onFailure(e); } } else { + assert indexShard.routingEntry().isPromotableToPrimary(); toSendListener.onResponse( new StartRecoveryRequestToSend( preExistingRequest, diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java index d896425eef2cf..59ed1ba2b871f 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java @@ -164,6 +164,7 @@ public RecoveryRef getRecoverySafe(long id, ShardId shardId) { throw new IndexShardClosedException(shardId); } assert recoveryRef.target().shardId().equals(shardId); + assert recoveryRef.target().indexShard().routingEntry().isPromotableToPrimary(); return recoveryRef; } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java index 8dd0e89e1cbbe..cf096e35bdbc0 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java @@ -234,7 +234,7 @@ public void testUnavailableShardsMarkedAsStale() throws Exception { final long primaryTerm = indexMetadata.primaryTerm(0); final Set inSyncAllocationIds = indexMetadata.inSyncAllocationIds(0); - final Set trackedShards = shardRoutingTable.getAllAllocationIds(); + final Set trackedShards = shardRoutingTable.getPromotableAllocationIds(); List unavailableShards = randomSubsetOf(randomIntBetween(1, nbReplicas), shardRoutingTable.replicaShards()); IndexShardRoutingTable.Builder shardRoutingTableBuilder = new IndexShardRoutingTable.Builder(shardRoutingTable); diff --git a/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java index 4a3498ea6baae..919737caf2c7a 100644 --- a/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java @@ -152,7 +152,7 @@ public void testResyncDoesNotBlockOnPrimaryAction() throws Exception { new ReplicationGroup( shardRoutingTable, clusterService.state().metadata().index(index).inSyncAllocationIds(shardId.id()), - shardRoutingTable.getAllAllocationIds(), + shardRoutingTable.getPromotableAllocationIds(), 0 ) ); diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java index 8dab09fb6015f..543b673635ee0 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java @@ -507,7 +507,7 @@ public void testPrimaryFailureHandlingReplicaResponse() throws Exception { final Set inSyncAllocationIds = indexMetadata.inSyncAllocationIds(0); final IndexShardRoutingTable shardRoutingTable = state.routingTable().index(index).shard(shardId.id()); - final Set trackedShards = shardRoutingTable.getAllAllocationIds(); + final Set trackedShards = shardRoutingTable.getPromotableAllocationIds(); final ReplicationGroup initialReplicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards, 0); final Thread testThread = Thread.currentThread(); diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 97605ec71928f..e64dddff3cdd3 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -928,7 +928,12 @@ public void testSeqNoIsSetOnPrimary() { Set inSyncIds = randomBoolean() ? singleton(routingEntry.allocationId().getId()) : clusterService.state().metadata().index(index).inSyncAllocationIds(0); - ReplicationGroup replicationGroup = new ReplicationGroup(shardRoutingTable, inSyncIds, shardRoutingTable.getAllAllocationIds(), 0); + ReplicationGroup replicationGroup = new ReplicationGroup( + shardRoutingTable, + inSyncIds, + shardRoutingTable.getPromotableAllocationIds(), + 0 + ); when(shard.getReplicationGroup()).thenReturn(replicationGroup); PendingReplicationActions replicationActions = new PendingReplicationActions(shardId, threadPool); replicationActions.accept(replicationGroup); diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java index 5a8d864061d05..2584583b4f0a3 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java @@ -134,7 +134,7 @@ public void testToXContent() throws IOException { IndexRoutingTable index = clusterState.getRoutingTable().getIndicesRouting().get("index"); String ephemeralId = clusterState.getNodes().get("nodeId1").getEphemeralId(); - String allocationId = index.shard(0).getAllAllocationIds().iterator().next(); + String allocationId = index.shard(0).getPromotableAllocationIds().iterator().next(); XContentBuilder builder = JsonXContent.contentBuilder(); builder.startObject(); @@ -370,7 +370,7 @@ public void testToXContent_FlatSettingTrue_ReduceMappingFalse() throws IOExcepti IndexRoutingTable index = clusterState.getRoutingTable().getIndicesRouting().get("index"); String ephemeralId = clusterState.getNodes().get("nodeId1").getEphemeralId(); - String allocationId = index.shard(0).getAllAllocationIds().iterator().next(); + String allocationId = index.shard(0).getPromotableAllocationIds().iterator().next(); XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint(); builder.startObject(); @@ -598,7 +598,7 @@ public void testToXContent_FlatSettingFalse_ReduceMappingTrue() throws IOExcepti IndexRoutingTable index = clusterState.getRoutingTable().getIndicesRouting().get("index"); String ephemeralId = clusterState.getNodes().get("nodeId1").getEphemeralId(); - String allocationId = index.shard(0).getAllAllocationIds().iterator().next(); + String allocationId = index.shard(0).getPromotableAllocationIds().iterator().next(); XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint(); builder.startObject(); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/AutoExpandReplicasTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/AutoExpandReplicasTests.java index fedbc31fcdeb7..c82b13918835e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/AutoExpandReplicasTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/AutoExpandReplicasTests.java @@ -162,7 +162,11 @@ public void testAutoExpandWhenNodeLeavesAndPossiblyRejoins() throws InterruptedE postTable = state.routingTable().index("index").shard(0); assertTrue("not all shards started in " + state.toString(), postTable.allShardsStarted()); - assertThat(postTable.toString(), postTable.getAllAllocationIds(), everyItem(is(in(preTable.getAllAllocationIds())))); + assertThat( + postTable.toString(), + postTable.getPromotableAllocationIds(), + everyItem(is(in(preTable.getPromotableAllocationIds()))) + ); } else { // fake an election where conflicting nodes are removed and readded state = ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).masterNodeId(null).build()).build(); @@ -199,7 +203,7 @@ public void testAutoExpandWhenNodeLeavesAndPossiblyRejoins() throws InterruptedE .map(shr -> shr.allocationId().getId()) .collect(Collectors.toSet()); - assertThat(postTable.toString(), unchangedAllocationIds, everyItem(is(in(postTable.getAllAllocationIds())))); + assertThat(postTable.toString(), unchangedAllocationIds, everyItem(is(in(postTable.getPromotableAllocationIds())))); RoutingNodesHelper.asStream(postTable).forEach(shardRouting -> { if (shardRouting.assignedToNode() && unchangedAllocationIds.contains(shardRouting.allocationId().getId())) { diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 277dfeb913525..68b4f18fbcfd2 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -249,7 +249,7 @@ public static FieldType randomIdFieldType() { return randomBoolean() ? ProvidedIdFieldMapper.Defaults.FIELD_TYPE : TsidExtractingIdFieldMapper.FIELD_TYPE; } - public EngineConfig copy(EngineConfig config, LongSupplier globalCheckpointSupplier) { + public static EngineConfig copy(EngineConfig config, LongSupplier globalCheckpointSupplier) { return new EngineConfig( config.getShardId(), config.getThreadPool(), From 2188af00e8f1086bf34bea0b8a306a139a06fc1d Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Tue, 24 Jan 2023 18:54:07 +0200 Subject: [PATCH 02/21] Update docs/changelog/93210.yaml --- docs/changelog/93210.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/93210.yaml diff --git a/docs/changelog/93210.yaml b/docs/changelog/93210.yaml new file mode 100644 index 0000000000000..179f4ab9dec8d --- /dev/null +++ b/docs/changelog/93210.yaml @@ -0,0 +1,5 @@ +pr: 93210 +summary: Unpromotables skip replication and peer recovery +area: Allocation +type: enhancement +issues: [] From 4a2f8b3fbf00dd68a41204e78d0617cf242475df Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Wed, 25 Jan 2023 14:41:01 +0200 Subject: [PATCH 03/21] New Refresh action for unpromotable replicas --- .../cluster/routing/ShardRoutingRoleIT.java | 72 ++++++++++++++++++ .../elasticsearch/action/ActionModule.java | 2 + .../refresh/ReplicaShardRefreshRequest.java | 58 -------------- .../refresh/TransportShardRefreshAction.java | 76 ++++++++++++------- ...sportUnpromotableReplicaRefreshAction.java | 62 +++++++++++++++ .../UnpromotableReplicaRefreshRequest.java | 59 ++++++++++++++ 6 files changed, 245 insertions(+), 84 deletions(-) delete mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/refresh/ReplicaShardRefreshRequest.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableReplicaRefreshAction.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/refresh/UnpromotableReplicaRefreshRequest.java diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java index 2f60dc82c1484..279d77718c647 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java @@ -10,6 +10,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.admin.indices.refresh.TransportShardRefreshAction; +import org.elasticsearch.action.admin.indices.refresh.TransportUnpromotableReplicaRefreshAction; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; @@ -50,13 +53,17 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.hamcrest.Matchers.anEmptyMap; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; @SuppressWarnings("resource") @@ -481,4 +488,69 @@ public void testSearchRouting() throws Exception { } } + public void testSearchRefresh() throws Exception { + var routingTableWatcher = new RoutingTableWatcher(); + + var numDataNodes = routingTableWatcher.numReplicas + 2; + internalCluster().ensureAtLeastNumDataNodes(numDataNodes); + getMasterNodePlugin().numIndexingCopies = routingTableWatcher.numIndexingCopies; + + final AtomicInteger refreshReplicaActions = new AtomicInteger(0); + final AtomicInteger refreshUnpromotableActions = new AtomicInteger(0); + + for (var transportService : internalCluster().getInstances(TransportService.class)) { + MockTransportService mockTransportService = (MockTransportService) transportService; + mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> { + if (action.startsWith(TransportShardRefreshAction.NAME + "[r]")) { + refreshReplicaActions.incrementAndGet(); + } + if (action.startsWith(TransportUnpromotableReplicaRefreshAction.NAME)) { + refreshUnpromotableActions.incrementAndGet(); + } + connection.sendRequest(requestId, action, request, options); + }); + mockTransportService.addRequestHandlingBehavior( + TransportUnpromotableReplicaRefreshAction.NAME, + (handler, request, channel, task) -> { + // Skip handling the request and send an immediate empty response + channel.sendResponse(ActionResponse.Empty.INSTANCE); + } + ); + } + + final var masterClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); + try { + // verify the correct number of shard copies of each role as the routing table evolves + masterClusterService.addListener(routingTableWatcher); + + createIndex( + INDEX_NAME, + Settings.builder() + .put(routingTableWatcher.getIndexSettings()) + .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), false) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1) + .build() + ); + ensureGreen(INDEX_NAME); + assertEngineTypes(); + + indexRandom(false, INDEX_NAME, randomIntBetween(100, 200)); + assertNoFailures(client().admin().indices().prepareRefresh(INDEX_NAME).execute().get()); + + // Each primary will send a TransportShardRefreshAction to each of the promotable replica shards + assertThat( + refreshReplicaActions.get(), + is(equalTo((routingTableWatcher.numIndexingCopies - 1) * routingTableWatcher.numShards)) + ); + + // Each primary will send a TransportUnpromotableReplicaRefreshAction to each of the unpromotable replica / search shards + assertThat( + refreshUnpromotableActions.get(), + is(equalTo((routingTableWatcher.numReplicas - (routingTableWatcher.numIndexingCopies - 1)) * routingTableWatcher.numShards)) + ); + } finally { + masterClusterService.removeListener(routingTableWatcher); + } + } + } diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 59d055e27415c..7447477199821 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -161,6 +161,7 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshAction; import org.elasticsearch.action.admin.indices.refresh.TransportRefreshAction; import org.elasticsearch.action.admin.indices.refresh.TransportShardRefreshAction; +import org.elasticsearch.action.admin.indices.refresh.TransportUnpromotableReplicaRefreshAction; import org.elasticsearch.action.admin.indices.resolve.ResolveIndexAction; import org.elasticsearch.action.admin.indices.rollover.RolloverAction; import org.elasticsearch.action.admin.indices.rollover.TransportRolloverAction; @@ -708,6 +709,7 @@ public void reg actions.register(TransportNodesListShardStoreMetadata.TYPE, TransportNodesListShardStoreMetadata.class); actions.register(TransportShardFlushAction.TYPE, TransportShardFlushAction.class); actions.register(TransportShardRefreshAction.TYPE, TransportShardRefreshAction.class); + actions.register(TransportUnpromotableReplicaRefreshAction.TYPE, TransportUnpromotableReplicaRefreshAction.class); actions.register(TransportPrevalidateShardPathAction.TYPE, TransportPrevalidateShardPathAction.class); // desired nodes diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/ReplicaShardRefreshRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/ReplicaShardRefreshRequest.java deleted file mode 100644 index 86038db90f404..0000000000000 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/ReplicaShardRefreshRequest.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0 and the Server Side Public License, v 1; you may not use this file except - * in compliance with, at your election, the Elastic License 2.0 or the Server - * Side Public License, v 1. - */ - -package org.elasticsearch.action.admin.indices.refresh; - -import org.elasticsearch.Version; -import org.elasticsearch.action.support.replication.ReplicationRequest; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.core.Nullable; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.tasks.TaskId; - -import java.io.IOException; - -public class ReplicaShardRefreshRequest extends ReplicationRequest { - - @Nullable - private final Long segmentGeneration; - - public ReplicaShardRefreshRequest(ShardId shardId, TaskId parentTaskId, @Nullable Long segmentGeneration) { - super(shardId); - setParentTask(parentTaskId); - this.segmentGeneration = segmentGeneration; - } - - public ReplicaShardRefreshRequest(StreamInput in) throws IOException { - super(in); - if (in.getVersion().onOrAfter(Version.V_8_7_0)) { - this.segmentGeneration = in.readOptionalVLong(); - } else { - this.segmentGeneration = null; - } - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - if (out.getVersion().onOrAfter(Version.V_8_7_0)) { - out.writeOptionalVLong(segmentGeneration); - } - } - - @Nullable - public Long getSegmentGeneration() { - return segmentGeneration; - } - - @Override - public String toString() { - return "ReplicaShardRefreshRequest{" + shardId + '}'; - } -} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java index 53c83a99183d8..8d1825e7aba59 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java @@ -9,30 +9,37 @@ package org.elasticsearch.action.admin.indices.refresh; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.RefCountingListener; import org.elasticsearch.action.support.replication.BasicReplicationRequest; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.support.replication.TransportReplicationAction; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.function.Predicate; +import java.util.stream.Collectors; public class TransportShardRefreshAction extends TransportReplicationAction< BasicReplicationRequest, - ReplicaShardRefreshRequest, + BasicReplicationRequest, ReplicationResponse> { private static final Logger logger = LogManager.getLogger(TransportShardRefreshAction.class); @@ -41,8 +48,6 @@ public class TransportShardRefreshAction extends TransportReplicationAction< public static final ActionType TYPE = new ActionType<>(NAME, ReplicationResponse::new); public static final String SOURCE_API = "api"; - private final Settings settings; - @Inject public TransportShardRefreshAction( Settings settings, @@ -63,10 +68,9 @@ public TransportShardRefreshAction( shardStateAction, actionFilters, BasicReplicationRequest::new, - ReplicaShardRefreshRequest::new, + BasicReplicationRequest::new, ThreadPool.Names.REFRESH ); - this.settings = settings; } @Override @@ -78,31 +82,51 @@ protected ReplicationResponse newResponseInstance(StreamInput in) throws IOExcep protected void shardOperationOnPrimary( BasicReplicationRequest shardRequest, IndexShard primary, - ActionListener> listener + ActionListener> listener ) { - ActionListener.completeWith(listener, () -> { + try ( + var listeners = new RefCountingListener( + listener.map(ignored -> { return new PrimaryResult<>(shardRequest, new ReplicationResponse()); }) + ) + ) { var refreshResult = primary.refresh(SOURCE_API); logger.trace("{} refresh request executed on primary", primary.shardId()); - var shardRefreshRequest = new ReplicaShardRefreshRequest( - primary.shardId(), - shardRequest.getParentTask(), - refreshResult.generation() - ); - return new PrimaryResult<>(shardRefreshRequest, new ReplicationResponse()); - }); + + // Forward the request to all nodes that hold unpromotable replica shards + final ClusterState clusterState = clusterService.state(); + clusterService.state() + .routingTable() + .shardRoutingTable(shardRequest.shardId()) + .replicaShards() + .stream() + .filter(Predicate.not(ShardRouting::isPromotableToPrimary)) + .map(ShardRouting::currentNodeId) + .collect(Collectors.toUnmodifiableSet()) + .forEach(nodeId -> { + final DiscoveryNode node = clusterState.nodes().get(nodeId); + UnpromotableReplicaRefreshRequest request = new UnpromotableReplicaRefreshRequest( + primary.shardId(), + refreshResult.generation() + ); + logger.trace("forwarding refresh request [{}] to node [{}]", request, node); + transportService.sendChildRequest( + node, + TransportUnpromotableReplicaRefreshAction.NAME, + request, + taskManager.getTask(shardRequest.getParentTask().getId()), + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>(listeners.acquire(ignored -> {}), (in) -> TransportResponse.Empty.INSTANCE) + ); + }); + } } @Override - protected void shardOperationOnReplica(ReplicaShardRefreshRequest request, IndexShard replica, ActionListener listener) { - if (DiscoveryNode.isStateless(settings) && replica.routingEntry().isPromotableToPrimary() == false) { - assert request.getSegmentGeneration() != Engine.RefreshResult.UNKNOWN_GENERATION; - replica.waitForSegmentGeneration(request.getSegmentGeneration(), listener.map(l -> new ReplicaResult())); - } else { - ActionListener.completeWith(listener, () -> { - replica.refresh(SOURCE_API); - logger.trace("{} refresh request executed on replica", replica.shardId()); - return new ReplicaResult(); - }); - } + protected void shardOperationOnReplica(BasicReplicationRequest request, IndexShard replica, ActionListener listener) { + ActionListener.completeWith(listener, () -> { + replica.refresh(SOURCE_API); + logger.trace("{} refresh request executed on replica", replica.shardId()); + return new ReplicaResult(); + }); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableReplicaRefreshAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableReplicaRefreshAction.java new file mode 100644 index 0000000000000..f38c1407b4839 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableReplicaRefreshAction.java @@ -0,0 +1,62 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.admin.indices.refresh; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.TransportService; + +public class TransportUnpromotableReplicaRefreshAction extends HandledTransportAction< + UnpromotableReplicaRefreshRequest, + ActionResponse.Empty> { + public static final String NAME = RefreshAction.NAME + "[u]"; + public static final ActionType TYPE = new ActionType<>(NAME, in -> ActionResponse.Empty.INSTANCE); + private static final Logger logger = LogManager.getLogger(TransportUnpromotableReplicaRefreshAction.class); + private final ClusterService clusterService; + private final TransportService transportService; + private final IndicesService indicesService; + private final Client client; + + @Inject + public TransportUnpromotableReplicaRefreshAction( + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters, + IndicesService indicesService, + Client client + ) { + super(NAME, transportService, actionFilters, UnpromotableReplicaRefreshRequest::new); + this.clusterService = clusterService; + this.transportService = transportService; + this.indicesService = indicesService; + this.client = client; + } + + @Override + protected void doExecute(Task task, UnpromotableReplicaRefreshRequest request, ActionListener listener) { + ActionListener.completeWith(listener, () -> { + assert request.getSegmentGeneration() != Engine.RefreshResult.UNKNOWN_GENERATION; + IndexShard shard = indicesService.indexServiceSafe(request.getShardId().getIndex()).getShard(request.getShardId().id()); + shard.waitForSegmentGeneration(request.getSegmentGeneration(), listener.map(l -> ActionResponse.Empty.INSTANCE)); + return ActionResponse.Empty.INSTANCE; + }); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/UnpromotableReplicaRefreshRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/UnpromotableReplicaRefreshRequest.java new file mode 100644 index 0000000000000..bb80734fe48bb --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/UnpromotableReplicaRefreshRequest.java @@ -0,0 +1,59 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.admin.indices.refresh; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.shard.ShardId; + +import java.io.IOException; + +public class UnpromotableReplicaRefreshRequest extends ActionRequest { + + private final ShardId shardId; + private final long segmentGeneration; + + public UnpromotableReplicaRefreshRequest(final ShardId shardId, long segmentGeneration) { + this.shardId = shardId; + this.segmentGeneration = segmentGeneration; + } + + public UnpromotableReplicaRefreshRequest(StreamInput in) throws IOException { + super(in); + shardId = new ShardId(in); + segmentGeneration = in.readVLong(); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + shardId.writeTo(out); + out.writeVLong(segmentGeneration); + } + + public ShardId getShardId() { + return shardId; + } + + public long getSegmentGeneration() { + return segmentGeneration; + } + + @Override + public String toString() { + return "UnpromotableReplicaRefreshRequest{" + "shardId=" + shardId + ", segmentGeneration=" + segmentGeneration + '}'; + } +} From 06fb59b83c89ebc6394e5a920305ae85866e46bd Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Fri, 27 Jan 2023 13:05:18 +0200 Subject: [PATCH 04/21] Reviewer comments on refresh --- .../cluster/routing/ShardRoutingRoleIT.java | 10 ++++---- .../elasticsearch/action/ActionModule.java | 4 +-- .../refresh/TransportShardRefreshAction.java | 25 ++++++++++--------- ...nsportUnpromotableShardRefreshAction.java} | 19 +++++++------- ...a => UnpromotableShardRefreshRequest.java} | 8 +++--- 5 files changed, 33 insertions(+), 33 deletions(-) rename server/src/main/java/org/elasticsearch/action/admin/indices/refresh/{TransportUnpromotableReplicaRefreshAction.java => TransportUnpromotableShardRefreshAction.java} (81%) rename server/src/main/java/org/elasticsearch/action/admin/indices/refresh/{UnpromotableReplicaRefreshRequest.java => UnpromotableShardRefreshRequest.java} (79%) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java index 279d77718c647..673d692d8b9b3 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java @@ -12,7 +12,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.admin.indices.refresh.TransportShardRefreshAction; -import org.elasticsearch.action.admin.indices.refresh.TransportUnpromotableReplicaRefreshAction; +import org.elasticsearch.action.admin.indices.refresh.TransportUnpromotableShardRefreshAction; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; @@ -488,7 +488,7 @@ public void testSearchRouting() throws Exception { } } - public void testSearchRefresh() throws Exception { + public void testRefreshOfSearchShards() throws Exception { var routingTableWatcher = new RoutingTableWatcher(); var numDataNodes = routingTableWatcher.numReplicas + 2; @@ -504,13 +504,13 @@ public void testSearchRefresh() throws Exception { if (action.startsWith(TransportShardRefreshAction.NAME + "[r]")) { refreshReplicaActions.incrementAndGet(); } - if (action.startsWith(TransportUnpromotableReplicaRefreshAction.NAME)) { + if (action.startsWith(TransportUnpromotableShardRefreshAction.NAME)) { refreshUnpromotableActions.incrementAndGet(); } connection.sendRequest(requestId, action, request, options); }); mockTransportService.addRequestHandlingBehavior( - TransportUnpromotableReplicaRefreshAction.NAME, + TransportUnpromotableShardRefreshAction.NAME, (handler, request, channel, task) -> { // Skip handling the request and send an immediate empty response channel.sendResponse(ActionResponse.Empty.INSTANCE); @@ -543,7 +543,7 @@ public void testSearchRefresh() throws Exception { is(equalTo((routingTableWatcher.numIndexingCopies - 1) * routingTableWatcher.numShards)) ); - // Each primary will send a TransportUnpromotableReplicaRefreshAction to each of the unpromotable replica / search shards + // Each primary will send a TransportUnpromotableShardRefreshAction to each of the unpromotable replica / search shards assertThat( refreshUnpromotableActions.get(), is(equalTo((routingTableWatcher.numReplicas - (routingTableWatcher.numIndexingCopies - 1)) * routingTableWatcher.numShards)) diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 7447477199821..ccbbac8f43ce7 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -161,7 +161,7 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshAction; import org.elasticsearch.action.admin.indices.refresh.TransportRefreshAction; import org.elasticsearch.action.admin.indices.refresh.TransportShardRefreshAction; -import org.elasticsearch.action.admin.indices.refresh.TransportUnpromotableReplicaRefreshAction; +import org.elasticsearch.action.admin.indices.refresh.TransportUnpromotableShardRefreshAction; import org.elasticsearch.action.admin.indices.resolve.ResolveIndexAction; import org.elasticsearch.action.admin.indices.rollover.RolloverAction; import org.elasticsearch.action.admin.indices.rollover.TransportRolloverAction; @@ -709,7 +709,7 @@ public void reg actions.register(TransportNodesListShardStoreMetadata.TYPE, TransportNodesListShardStoreMetadata.class); actions.register(TransportShardFlushAction.TYPE, TransportShardFlushAction.class); actions.register(TransportShardRefreshAction.TYPE, TransportShardRefreshAction.class); - actions.register(TransportUnpromotableReplicaRefreshAction.TYPE, TransportUnpromotableReplicaRefreshAction.class); + actions.register(TransportUnpromotableShardRefreshAction.TYPE, TransportUnpromotableShardRefreshAction.class); actions.register(TransportPrevalidateShardPathAction.TYPE, TransportPrevalidateShardPathAction.class); // desired nodes diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java index 8d1825e7aba59..3b1600ce94a39 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java @@ -28,6 +28,7 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; @@ -84,38 +85,38 @@ protected void shardOperationOnPrimary( IndexShard primary, ActionListener> listener ) { - try ( - var listeners = new RefCountingListener( - listener.map(ignored -> { return new PrimaryResult<>(shardRequest, new ReplicationResponse()); }) - ) - ) { + try (var listeners = new RefCountingListener(listener.map(v -> new PrimaryResult<>(shardRequest, new ReplicationResponse())))) { var refreshResult = primary.refresh(SOURCE_API); logger.trace("{} refresh request executed on primary", primary.shardId()); // Forward the request to all nodes that hold unpromotable replica shards final ClusterState clusterState = clusterService.state(); - clusterService.state() - .routingTable() + final Task parentTaskId = taskManager.getTask(shardRequest.getParentTask().getId()); + clusterState.routingTable() .shardRoutingTable(shardRequest.shardId()) - .replicaShards() + .assignedShards() .stream() .filter(Predicate.not(ShardRouting::isPromotableToPrimary)) .map(ShardRouting::currentNodeId) .collect(Collectors.toUnmodifiableSet()) .forEach(nodeId -> { final DiscoveryNode node = clusterState.nodes().get(nodeId); - UnpromotableReplicaRefreshRequest request = new UnpromotableReplicaRefreshRequest( + UnpromotableShardRefreshRequest request = new UnpromotableShardRefreshRequest( primary.shardId(), refreshResult.generation() ); logger.trace("forwarding refresh request [{}] to node [{}]", request, node); transportService.sendChildRequest( node, - TransportUnpromotableReplicaRefreshAction.NAME, + TransportUnpromotableShardRefreshAction.NAME, request, - taskManager.getTask(shardRequest.getParentTask().getId()), + parentTaskId, TransportRequestOptions.EMPTY, - new ActionListenerResponseHandler<>(listeners.acquire(ignored -> {}), (in) -> TransportResponse.Empty.INSTANCE) + new ActionListenerResponseHandler<>( + listeners.acquire(ignored -> {}), + (in) -> TransportResponse.Empty.INSTANCE, + ThreadPool.Names.REFRESH + ) ); }); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableReplicaRefreshAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java similarity index 81% rename from server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableReplicaRefreshAction.java rename to server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java index f38c1407b4839..7450070fd3841 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableReplicaRefreshAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java @@ -24,26 +24,24 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.transport.TransportService; -public class TransportUnpromotableReplicaRefreshAction extends HandledTransportAction< - UnpromotableReplicaRefreshRequest, - ActionResponse.Empty> { +public class TransportUnpromotableShardRefreshAction extends HandledTransportAction { public static final String NAME = RefreshAction.NAME + "[u]"; public static final ActionType TYPE = new ActionType<>(NAME, in -> ActionResponse.Empty.INSTANCE); - private static final Logger logger = LogManager.getLogger(TransportUnpromotableReplicaRefreshAction.class); + private static final Logger logger = LogManager.getLogger(TransportUnpromotableShardRefreshAction.class); private final ClusterService clusterService; private final TransportService transportService; private final IndicesService indicesService; private final Client client; @Inject - public TransportUnpromotableReplicaRefreshAction( + public TransportUnpromotableShardRefreshAction( ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, IndicesService indicesService, Client client ) { - super(NAME, transportService, actionFilters, UnpromotableReplicaRefreshRequest::new); + super(NAME, transportService, actionFilters, UnpromotableShardRefreshRequest::new); this.clusterService = clusterService; this.transportService = transportService; this.indicesService = indicesService; @@ -51,12 +49,13 @@ public TransportUnpromotableReplicaRefreshAction( } @Override - protected void doExecute(Task task, UnpromotableReplicaRefreshRequest request, ActionListener listener) { - ActionListener.completeWith(listener, () -> { + protected void doExecute(Task task, UnpromotableShardRefreshRequest request, ActionListener listener) { + try { assert request.getSegmentGeneration() != Engine.RefreshResult.UNKNOWN_GENERATION; IndexShard shard = indicesService.indexServiceSafe(request.getShardId().getIndex()).getShard(request.getShardId().id()); shard.waitForSegmentGeneration(request.getSegmentGeneration(), listener.map(l -> ActionResponse.Empty.INSTANCE)); - return ActionResponse.Empty.INSTANCE; - }); + } catch (Exception e) { + listener.onFailure(e); + } } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/UnpromotableReplicaRefreshRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/UnpromotableShardRefreshRequest.java similarity index 79% rename from server/src/main/java/org/elasticsearch/action/admin/indices/refresh/UnpromotableReplicaRefreshRequest.java rename to server/src/main/java/org/elasticsearch/action/admin/indices/refresh/UnpromotableShardRefreshRequest.java index bb80734fe48bb..52ef3917ce722 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/UnpromotableReplicaRefreshRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/UnpromotableShardRefreshRequest.java @@ -16,17 +16,17 @@ import java.io.IOException; -public class UnpromotableReplicaRefreshRequest extends ActionRequest { +public class UnpromotableShardRefreshRequest extends ActionRequest { private final ShardId shardId; private final long segmentGeneration; - public UnpromotableReplicaRefreshRequest(final ShardId shardId, long segmentGeneration) { + public UnpromotableShardRefreshRequest(final ShardId shardId, long segmentGeneration) { this.shardId = shardId; this.segmentGeneration = segmentGeneration; } - public UnpromotableReplicaRefreshRequest(StreamInput in) throws IOException { + public UnpromotableShardRefreshRequest(StreamInput in) throws IOException { super(in); shardId = new ShardId(in); segmentGeneration = in.readVLong(); @@ -54,6 +54,6 @@ public long getSegmentGeneration() { @Override public String toString() { - return "UnpromotableReplicaRefreshRequest{" + "shardId=" + shardId + ", segmentGeneration=" + segmentGeneration + '}'; + return "UnpromotableShardRefreshRequest{" + "shardId=" + shardId + ", segmentGeneration=" + segmentGeneration + '}'; } } From 25d50cb9cbe72f2e0bd96e3f9e782a84159651ba Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Fri, 27 Jan 2023 13:38:27 +0200 Subject: [PATCH 05/21] Fixes --- .../cluster/routing/ShardRoutingRoleIT.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java index 79d813e3cd836..c32a0d43abce3 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java @@ -225,6 +225,13 @@ private static void installMockTransportVerifications(RoutingTableWatcher routin } connection.sendRequest(requestId, action, request, options); }); + mockTransportService.addRequestHandlingBehavior( + TransportUnpromotableShardRefreshAction.NAME, + (handler, request, channel, task) -> { + // Skip handling the request and send an immediate empty response + channel.sendResponse(ActionResponse.Empty.INSTANCE); + } + ); } } @@ -439,7 +446,7 @@ public AllocationCommand getCancelPrimaryCommand() { } @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/93292") - public void testSearchRouting() { + public void testSearchRouting() throws Exception { var routingTableWatcher = new RoutingTableWatcher(); routingTableWatcher.numReplicas = Math.max(1, routingTableWatcher.numReplicas); @@ -508,6 +515,7 @@ public void testRefreshOfSearchShards() throws Exception { var numDataNodes = routingTableWatcher.numReplicas + 2; internalCluster().ensureAtLeastNumDataNodes(numDataNodes); + installMockTransportVerifications(routingTableWatcher); getMasterNodePlugin().numIndexingCopies = routingTableWatcher.numIndexingCopies; final AtomicInteger refreshReplicaActions = new AtomicInteger(0); @@ -524,13 +532,6 @@ public void testRefreshOfSearchShards() throws Exception { } connection.sendRequest(requestId, action, request, options); }); - mockTransportService.addRequestHandlingBehavior( - TransportUnpromotableShardRefreshAction.NAME, - (handler, request, channel, task) -> { - // Skip handling the request and send an immediate empty response - channel.sendResponse(ActionResponse.Empty.INSTANCE); - } - ); } final var masterClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); @@ -589,6 +590,7 @@ public void testClosedIndex() { var numDataNodes = routingTableWatcher.numReplicas + 2; internalCluster().ensureAtLeastNumDataNodes(numDataNodes); + installMockTransportVerifications(routingTableWatcher); getMasterNodePlugin().numIndexingCopies = routingTableWatcher.numIndexingCopies; final var masterClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); From b817173b3d5333b7792c1231bdcd53767cdde9d4 Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Fri, 27 Jan 2023 15:29:45 +0200 Subject: [PATCH 06/21] Add unpromotable refresh to NON_OPERATOR_ACTIONS --- .../org/elasticsearch/xpack/security/operator/Constants.java | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index edb6d07c6326e..a207a6b82573f 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -403,6 +403,7 @@ public class Constants { "indices:admin/open", "indices:admin/refresh", "indices:admin/refresh[s]", + "indices:admin/refresh[u]", "indices:admin/reload_analyzers", "indices:admin/resize", "indices:admin/resolve/index", From 33d9218657e15c3f8d84dcb79c0154a7e0d05ad2 Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Fri, 27 Jan 2023 15:48:26 +0200 Subject: [PATCH 07/21] PR comments --- .../elasticsearch/cluster/routing/ShardRoutingRoleIT.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java index c32a0d43abce3..b17f882812fa9 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java @@ -66,6 +66,7 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.startsWith; @SuppressWarnings("resource") public class ShardRoutingRoleIT extends ESIntegTestCase { @@ -115,6 +116,7 @@ public Optional getEngineFactory(IndexSettings indexSettings) { try { config.getStore().createEmpty(); } catch (IOException e) { + logger.error("Error creating empty store", e); throw new RuntimeException(e); } @@ -220,7 +222,7 @@ private static void installMockTransportVerifications(RoutingTableWatcher routin MockTransportService mockTransportService = (MockTransportService) transportService; mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> { if (routingTableWatcher.numIndexingCopies == 1) { - assertThat("no recovery action should be exchanged", action, not(containsString("recovery"))); + assertThat("no recovery action should be exchanged", action, not(startsWith("internal:index/shard/recovery/"))); assertThat("no replicated action should be exchanged", action, not(containsString("[r]"))); } connection.sendRequest(requestId, action, request, options); @@ -278,7 +280,6 @@ public void testShardCreation() throws Exception { ensureGreen(INDEX_NAME); assertEngineTypes(); indexRandom(randomBoolean(), INDEX_NAME, randomIntBetween(50, 100)); - ensureGreen(INDEX_NAME); // removing replicas drops SEARCH_ONLY copies first while (routingTableWatcher.numReplicas > 0) { From 540b071fc08e357e9ea7e3a7dba01ed69ca7525e Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Fri, 27 Jan 2023 16:14:30 +0200 Subject: [PATCH 08/21] Hide unpromotable refresh action from clients --- .../org/elasticsearch/action/ActionModule.java | 2 -- .../refresh/TransportShardRefreshAction.java | 1 + ...ransportUnpromotableShardRefreshAction.java | 18 ++---------------- .../xpack/security/operator/Constants.java | 1 - 4 files changed, 3 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index ccbbac8f43ce7..59d055e27415c 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -161,7 +161,6 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshAction; import org.elasticsearch.action.admin.indices.refresh.TransportRefreshAction; import org.elasticsearch.action.admin.indices.refresh.TransportShardRefreshAction; -import org.elasticsearch.action.admin.indices.refresh.TransportUnpromotableShardRefreshAction; import org.elasticsearch.action.admin.indices.resolve.ResolveIndexAction; import org.elasticsearch.action.admin.indices.rollover.RolloverAction; import org.elasticsearch.action.admin.indices.rollover.TransportRolloverAction; @@ -709,7 +708,6 @@ public void reg actions.register(TransportNodesListShardStoreMetadata.TYPE, TransportNodesListShardStoreMetadata.class); actions.register(TransportShardFlushAction.TYPE, TransportShardFlushAction.class); actions.register(TransportShardRefreshAction.TYPE, TransportShardRefreshAction.class); - actions.register(TransportUnpromotableShardRefreshAction.TYPE, TransportUnpromotableShardRefreshAction.class); actions.register(TransportPrevalidateShardPathAction.TYPE, TransportPrevalidateShardPathAction.class); // desired nodes diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java index 3b1600ce94a39..95487f981e2c7 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java @@ -72,6 +72,7 @@ public TransportShardRefreshAction( BasicReplicationRequest::new, ThreadPool.Names.REFRESH ); + new TransportUnpromotableShardRefreshAction(transportService, actionFilters, indicesService); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java index 7450070fd3841..bebe17fbce274 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java @@ -10,42 +10,28 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; -import org.elasticsearch.client.internal.Client; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.logging.LogManager; -import org.elasticsearch.logging.Logger; import org.elasticsearch.tasks.Task; import org.elasticsearch.transport.TransportService; public class TransportUnpromotableShardRefreshAction extends HandledTransportAction { public static final String NAME = RefreshAction.NAME + "[u]"; - public static final ActionType TYPE = new ActionType<>(NAME, in -> ActionResponse.Empty.INSTANCE); - private static final Logger logger = LogManager.getLogger(TransportUnpromotableShardRefreshAction.class); - private final ClusterService clusterService; - private final TransportService transportService; + private final IndicesService indicesService; - private final Client client; @Inject public TransportUnpromotableShardRefreshAction( - ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, - IndicesService indicesService, - Client client + IndicesService indicesService ) { super(NAME, transportService, actionFilters, UnpromotableShardRefreshRequest::new); - this.clusterService = clusterService; - this.transportService = transportService; this.indicesService = indicesService; - this.client = client; } @Override diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index a207a6b82573f..edb6d07c6326e 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -403,7 +403,6 @@ public class Constants { "indices:admin/open", "indices:admin/refresh", "indices:admin/refresh[s]", - "indices:admin/refresh[u]", "indices:admin/reload_analyzers", "indices:admin/resize", "indices:admin/resolve/index", From 992dbcfa94270de2591a447af5e4ce31ca97ffc6 Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Fri, 27 Jan 2023 17:20:05 +0200 Subject: [PATCH 09/21] Test no replication actions in search node --- .../cluster/routing/ShardRoutingRoleIT.java | 109 +++++++++++++----- ...ansportUnpromotableShardRefreshAction.java | 3 +- 2 files changed, 83 insertions(+), 29 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java index b17f882812fa9..631338fbe8de2 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java @@ -42,6 +42,7 @@ import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.XContentTestUtils; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.TransportService; @@ -76,6 +77,7 @@ public class ShardRoutingRoleIT extends ESIntegTestCase { public static class TestPlugin extends Plugin implements ClusterPlugin, EnginePlugin { volatile int numIndexingCopies = 1; + volatile String nodeToExclusivelyHoldUnpromotableShards = null; @Override public ShardRoutingRoleStrategy getShardRoutingRoleStrategy() { @@ -104,6 +106,25 @@ public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode n } return super.canForceAllocatePrimary(shardRouting, node, allocation); } + + @Override + public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + if (nodeToExclusivelyHoldUnpromotableShards != null) { + if (node.node().getName().equals(nodeToExclusivelyHoldUnpromotableShards)) { + if (shardRouting.isPromotableToPrimary()) { + logger.warn("Returning1 NO for node [{}] and shard [{}]", node, shardRouting); + return Decision.NO; + } + } else { + if (shardRouting.isPromotableToPrimary() == false) { + logger.warn("Returning2 NO for node [{}] and shard [{}]", node, shardRouting); + return Decision.NO; + } + } + } + logger.warn("Returning ALWAYS for node [{}] and shard [{}]", node, shardRouting); + return Decision.ALWAYS; + } }); } @@ -511,7 +532,48 @@ public void testSearchRouting() throws Exception { } } - public void testRefreshOfSearchShards() throws Exception { + private String randomSearchPreference(int numShards, String... nodeIds) { + final var preference = randomFrom(Preference.SHARDS, Preference.PREFER_NODES, Preference.LOCAL); + // ONLY_LOCAL and ONLY_NODES omitted here because they may yield no shard copies which causes the search to fail + // TODO add support for ONLY_LOCAL and ONLY_NODES too + return switch (preference) { + case LOCAL, ONLY_LOCAL -> preference.type(); + case PREFER_NODES, ONLY_NODES -> preference.type() + ":" + String.join(",", randomNonEmptySubsetOf(Arrays.asList(nodeIds))); + case SHARDS -> preference.type() + + ":" + + String.join( + ",", + randomSubsetOf(between(1, numShards), IntStream.range(0, numShards).mapToObj(Integer::toString).toList()) + ); + }; + } + + public void testClosedIndex() { + var routingTableWatcher = new RoutingTableWatcher(); + + var numDataNodes = routingTableWatcher.numReplicas + 2; + internalCluster().ensureAtLeastNumDataNodes(numDataNodes); + installMockTransportVerifications(routingTableWatcher); + getMasterNodePlugin().numIndexingCopies = routingTableWatcher.numIndexingCopies; + + final var masterClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); + try { + // verify the correct number of shard copies of each role as the routing table evolves + masterClusterService.addListener(routingTableWatcher); + + createIndex(INDEX_NAME, routingTableWatcher.getIndexSettings()); + ensureGreen(INDEX_NAME); + assertEngineTypes(); + + assertAcked(client().admin().indices().prepareClose(INDEX_NAME)); + ensureGreen(INDEX_NAME); + assertEngineTypes(); + } finally { + masterClusterService.removeListener(routingTableWatcher); + } + } + + public void testRefreshOfUnpromotableShards() throws Exception { var routingTableWatcher = new RoutingTableWatcher(); var numDataNodes = routingTableWatcher.numReplicas + 2; @@ -560,7 +622,7 @@ public void testRefreshOfSearchShards() throws Exception { is(equalTo((routingTableWatcher.numIndexingCopies - 1) * routingTableWatcher.numShards)) ); - // Each primary will send a TransportUnpromotableShardRefreshAction to each of the unpromotable replica / search shards + // Each primary will send a TransportUnpromotableShardRefreshAction to each of the unpromotable replica shards assertThat( refreshUnpromotableActions.get(), is(equalTo((routingTableWatcher.numReplicas - (routingTableWatcher.numIndexingCopies - 1)) * routingTableWatcher.numShards)) @@ -570,44 +632,35 @@ public void testRefreshOfSearchShards() throws Exception { } } - private String randomSearchPreference(int numShards, String... nodeIds) { - final var preference = randomFrom(Preference.SHARDS, Preference.PREFER_NODES, Preference.LOCAL); - // ONLY_LOCAL and ONLY_NODES omitted here because they may yield no shard copies which causes the search to fail - // TODO add support for ONLY_LOCAL and ONLY_NODES too - return switch (preference) { - case LOCAL, ONLY_LOCAL -> preference.type(); - case PREFER_NODES, ONLY_NODES -> preference.type() + ":" + String.join(",", randomNonEmptySubsetOf(Arrays.asList(nodeIds))); - case SHARDS -> preference.type() - + ":" - + String.join( - ",", - randomSubsetOf(between(1, numShards), IntStream.range(0, numShards).mapToObj(Integer::toString).toList()) - ); - }; - } - - public void testClosedIndex() { + public void testNodeWithUnpromotableShardsNeverGetsReplicationActions() throws Exception { var routingTableWatcher = new RoutingTableWatcher(); - - var numDataNodes = routingTableWatcher.numReplicas + 2; - internalCluster().ensureAtLeastNumDataNodes(numDataNodes); + routingTableWatcher.numReplicas = routingTableWatcher.numIndexingCopies; // meaning: only 1 unpromotable replica per shard + internalCluster().ensureAtLeastNumDataNodes(routingTableWatcher.numReplicas + 1); + final var nodeWithUnpromotables = internalCluster().startDataOnlyNode(); installMockTransportVerifications(routingTableWatcher); getMasterNodePlugin().numIndexingCopies = routingTableWatcher.numIndexingCopies; + getMasterNodePlugin().nodeToExclusivelyHoldUnpromotableShards = nodeWithUnpromotables; + + for (var transportService : internalCluster().getInstances(TransportService.class)) { + MockTransportService mockTransportService = (MockTransportService) transportService; + mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> { + if (connection.getNode().getName().equals(nodeWithUnpromotables)) { + assertThat(action, not(containsString("[r]"))); + } + connection.sendRequest(requestId, action, request, options); + }); + } final var masterClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); try { // verify the correct number of shard copies of each role as the routing table evolves masterClusterService.addListener(routingTableWatcher); - createIndex(INDEX_NAME, routingTableWatcher.getIndexSettings()); ensureGreen(INDEX_NAME); - assertEngineTypes(); - - assertAcked(client().admin().indices().prepareClose(INDEX_NAME)); - ensureGreen(INDEX_NAME); - assertEngineTypes(); + indexRandom(randomBoolean(), INDEX_NAME, randomIntBetween(50, 100)); } finally { masterClusterService.removeListener(routingTableWatcher); } } + } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java index bebe17fbce274..d6afeb7380ac4 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java @@ -17,6 +17,7 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; public class TransportUnpromotableShardRefreshAction extends HandledTransportAction { @@ -30,7 +31,7 @@ public TransportUnpromotableShardRefreshAction( ActionFilters actionFilters, IndicesService indicesService ) { - super(NAME, transportService, actionFilters, UnpromotableShardRefreshRequest::new); + super(NAME, transportService, actionFilters, UnpromotableShardRefreshRequest::new, ThreadPool.Names.REFRESH); this.indicesService = indicesService; } From 995b6ad15a967f2df2d791634fd2cab3ba8da0ac Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Fri, 27 Jan 2023 17:38:11 +0200 Subject: [PATCH 10/21] Catch exceptions of primary refresh --- .../org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java | 1 - .../admin/indices/refresh/TransportShardRefreshAction.java | 2 ++ 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java index 631338fbe8de2..b188ee16a5b54 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java @@ -42,7 +42,6 @@ import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.XContentTestUtils; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.TransportService; diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java index 95487f981e2c7..c7e7ab9733827 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java @@ -120,6 +120,8 @@ protected void shardOperationOnPrimary( ) ); }); + } catch (Exception e) { + listener.onFailure(e); } } From 9561ed81372b16648a4686f92e2ba9de97faa48d Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Fri, 27 Jan 2023 17:48:20 +0200 Subject: [PATCH 11/21] Remove unnecessary line --- .../java/org/elasticsearch/index/shard/IndexShard.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index dc72da1ef86e5..fc432e2dde01b 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -713,12 +713,11 @@ public void onFailure(Exception e) { final Set shardRoutings = Sets.newHashSetWithExpectedSize(routingTable.size()); for (int copy = 0; copy < routingTable.size(); copy++) { ShardRouting shardRouting = routingTable.shard(copy); - if (shardRouting.isPromotableToPrimary()) shardRoutings.add(shardRouting); + if (shardRouting.isPromotableToPrimary()) { + shardRoutings.add(shardRouting); + } } - // include relocation targets - shardRoutings.addAll(routingTable.assignedShards().stream().filter(ShardRouting::isPromotableToPrimary).toList()); - if (shardRoutings.stream() .allMatch( shr -> shr.assignedToNode() && retentionLeases.contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(shr)) From 369f20e9a7f690806b199aa1cd89222fb7b4fe0f Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Fri, 27 Jan 2023 17:56:14 +0200 Subject: [PATCH 12/21] Change stages for TRANSLOG stage --- .../cluster/routing/ShardRoutingRoleIT.java | 3 --- .../org/elasticsearch/index/shard/IndexShard.java | 15 ++------------- .../recovery/PeerRecoveryTargetService.java | 9 +++++++-- 3 files changed, 9 insertions(+), 18 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java index b188ee16a5b54..8e2caf19375aa 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java @@ -111,17 +111,14 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing if (nodeToExclusivelyHoldUnpromotableShards != null) { if (node.node().getName().equals(nodeToExclusivelyHoldUnpromotableShards)) { if (shardRouting.isPromotableToPrimary()) { - logger.warn("Returning1 NO for node [{}] and shard [{}]", node, shardRouting); return Decision.NO; } } else { if (shardRouting.isPromotableToPrimary() == false) { - logger.warn("Returning2 NO for node [{}] and shard [{}]", node, shardRouting); return Decision.NO; } } } - logger.warn("Returning ALWAYS for node [{}] and shard [{}]", node, shardRouting); return Decision.ALWAYS; } }); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index fc432e2dde01b..574c2396ac2f8 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1672,13 +1672,6 @@ public void postRecovery(String reason) throws IndexShardStartedException, Index if (state == IndexShardState.STARTED) { throw new IndexShardStartedException(shardId); } - if (routingEntry().isPromotableToPrimary() == false) { - // Swiftly skip intermediate stages - recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX); - recoveryState.setStage(RecoveryState.Stage.TRANSLOG); - recoveryState.getIndex().setFileDetailsComplete(); - recoveryState.setStage(RecoveryState.Stage.FINALIZE); - } recoveryState.setStage(RecoveryState.Stage.DONE); changeState(IndexShardState.POST_RECOVERY, reason); } @@ -1946,9 +1939,7 @@ public void openEngineAndRecoverFromTranslog() throws IOException { */ public void openEngineAndSkipTranslogRecovery() throws IOException { assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]"; - recoveryState.validateCurrentStage( - routingEntry().isPromotableToPrimary() ? RecoveryState.Stage.TRANSLOG : RecoveryState.Stage.INDEX - ); + recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG); loadGlobalCheckpointToReplicationTracker(); innerOpenEngineAndTranslog(replicationTracker); getEngine().skipTranslogRecovery(); @@ -1985,9 +1976,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine. onSettingsChanged(); assert assertSequenceNumbersInCommit(); - recoveryState.validateCurrentStage( - routingEntry().isPromotableToPrimary() ? RecoveryState.Stage.TRANSLOG : RecoveryState.Stage.INDEX - ); + recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG); } private boolean assertSequenceNumbersInCommit() throws IOException { diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index fdbc278124af8..00a9bf063739e 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -232,13 +232,12 @@ public void onResponse(StartRecoveryRequestToSend r) { r.actionName(), r.startRecoveryRequest().sourceNode() ); - RecoveryResponseHandler recoveryResponseHandler = new RecoveryResponseHandler(r.startRecoveryRequest(), timer); if (promotableToPrimary) { transportService.sendRequest( r.startRecoveryRequest().sourceNode(), r.actionName(), r.requestToSend(), - recoveryResponseHandler + new RecoveryResponseHandler(r.startRecoveryRequest(), timer) ); } else { onGoingRecoveries.markRecoveryAsDone(recoveryId); @@ -278,7 +277,13 @@ public void onFailure(Exception e) { if (promotableToPrimary) { startingSeqNo = indexShard.recoverLocallyUpToGlobalCheckpoint(); } else { + // Skip intermediate stages until the translog stage + final RecoveryState recoveryState = recoveryTarget.state(); + recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX); + recoveryState.setStage(RecoveryState.Stage.TRANSLOG); indexShard.openEngineAndSkipTranslogRecovery(); + recoveryState.getIndex().setFileDetailsComplete(); + recoveryState.setStage(RecoveryState.Stage.FINALIZE); } assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG : "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]"; From 9da083bbcd160c4146932b52e501195c6a5137f3 Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Fri, 27 Jan 2023 18:15:07 +0200 Subject: [PATCH 13/21] Small cleanup --- .../indices/recovery/PeerRecoveryTargetService.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 00a9bf063739e..05235f43e51c1 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -216,7 +216,8 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi } final RecoveryTarget recoveryTarget = recoveryRef.target(); assert recoveryTarget.sourceNode() != null : "cannot do a recovery without a source node"; - final RecoveryState.Timer timer = recoveryTarget.state().getTimer(); + final RecoveryState recoveryState = recoveryTarget.state(); + final RecoveryState.Timer timer = recoveryState.getTimer(); final IndexShard indexShard = recoveryTarget.indexShard(); final boolean promotableToPrimary = indexShard.routingEntry().isPromotableToPrimary(); @@ -226,9 +227,8 @@ record StartRecoveryRequestToSend(StartRecoveryRequest startRecoveryRequest, Str @Override public void onResponse(StartRecoveryRequestToSend r) { logger.trace( - "{}{} [{}]: recovery from {}", + "{} [{}]: recovery from {}", r.startRecoveryRequest().shardId(), - promotableToPrimary ? " (promotable)" : "", r.actionName(), r.startRecoveryRequest().sourceNode() ); @@ -277,8 +277,7 @@ public void onFailure(Exception e) { if (promotableToPrimary) { startingSeqNo = indexShard.recoverLocallyUpToGlobalCheckpoint(); } else { - // Skip intermediate stages until the translog stage - final RecoveryState recoveryState = recoveryTarget.state(); + // Skip unnecessary intermediate stages recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX); recoveryState.setStage(RecoveryState.Stage.TRANSLOG); indexShard.openEngineAndSkipTranslogRecovery(); @@ -294,7 +293,6 @@ public void onFailure(Exception e) { toSendListener.onFailure(e); } } else { - assert indexShard.routingEntry().isPromotableToPrimary(); toSendListener.onResponse( new StartRecoveryRequestToSend( preExistingRequest, From f02d52767378337c2dc3ebf2fe394fe181741e60 Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Fri, 27 Jan 2023 18:27:52 +0200 Subject: [PATCH 14/21] Small cleanup --- .../cluster/routing/ShardRoutingRoleIT.java | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java index 8e2caf19375aa..15b85ce8827fb 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java @@ -576,16 +576,11 @@ public void testRefreshOfUnpromotableShards() throws Exception { internalCluster().ensureAtLeastNumDataNodes(numDataNodes); installMockTransportVerifications(routingTableWatcher); getMasterNodePlugin().numIndexingCopies = routingTableWatcher.numIndexingCopies; - - final AtomicInteger refreshReplicaActions = new AtomicInteger(0); final AtomicInteger refreshUnpromotableActions = new AtomicInteger(0); for (var transportService : internalCluster().getInstances(TransportService.class)) { MockTransportService mockTransportService = (MockTransportService) transportService; mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> { - if (action.startsWith(TransportShardRefreshAction.NAME + "[r]")) { - refreshReplicaActions.incrementAndGet(); - } if (action.startsWith(TransportUnpromotableShardRefreshAction.NAME)) { refreshUnpromotableActions.incrementAndGet(); } @@ -612,12 +607,6 @@ public void testRefreshOfUnpromotableShards() throws Exception { indexRandom(false, INDEX_NAME, randomIntBetween(100, 200)); assertNoFailures(client().admin().indices().prepareRefresh(INDEX_NAME).execute().get()); - // Each primary will send a TransportShardRefreshAction to each of the promotable replica shards - assertThat( - refreshReplicaActions.get(), - is(equalTo((routingTableWatcher.numIndexingCopies - 1) * routingTableWatcher.numShards)) - ); - // Each primary will send a TransportUnpromotableShardRefreshAction to each of the unpromotable replica shards assertThat( refreshUnpromotableActions.get(), From 5b1a0d12db49bd0359190132912ab3c93865d317 Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Fri, 27 Jan 2023 19:46:39 +0200 Subject: [PATCH 15/21] Only one small refresh --- .../org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java index 15b85ce8827fb..da4420668f3a8 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java @@ -604,8 +604,7 @@ public void testRefreshOfUnpromotableShards() throws Exception { ensureGreen(INDEX_NAME); assertEngineTypes(); - indexRandom(false, INDEX_NAME, randomIntBetween(100, 200)); - assertNoFailures(client().admin().indices().prepareRefresh(INDEX_NAME).execute().get()); + indexRandom(true, INDEX_NAME, randomIntBetween(1, 10)); // Each primary will send a TransportUnpromotableShardRefreshAction to each of the unpromotable replica shards assertThat( From cefcd2371f12e6d046751fd4b7f1487b1af18589 Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Fri, 27 Jan 2023 19:52:30 +0200 Subject: [PATCH 16/21] Fix checkstyle --- .../org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java index da4420668f3a8..23a1ae05b1bdc 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java @@ -11,7 +11,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.action.admin.indices.refresh.TransportShardRefreshAction; import org.elasticsearch.action.admin.indices.refresh.TransportUnpromotableShardRefreshAction; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; @@ -57,7 +56,6 @@ import java.util.stream.IntStream; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.hamcrest.Matchers.anEmptyMap; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; From df74f9e57cd8b933276da4350a27eadff61a4fce Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Sat, 28 Jan 2023 01:02:31 +0200 Subject: [PATCH 17/21] Fix test with promotable shards only --- .../cluster/routing/ShardRoutingRoleIT.java | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java index 23a1ae05b1bdc..fe734f814ee51 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java @@ -51,6 +51,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; @@ -74,7 +75,7 @@ public class ShardRoutingRoleIT extends ESIntegTestCase { public static class TestPlugin extends Plugin implements ClusterPlugin, EnginePlugin { volatile int numIndexingCopies = 1; - volatile String nodeToExclusivelyHoldUnpromotableShards = null; + static final String NODE_ATTR_UNPROMOTABLE_ONLY = "unpromotableonly"; @Override public ShardRoutingRoleStrategy getShardRoutingRoleStrategy() { @@ -106,8 +107,13 @@ public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode n @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { - if (nodeToExclusivelyHoldUnpromotableShards != null) { - if (node.node().getName().equals(nodeToExclusivelyHoldUnpromotableShards)) { + var nodeWithUnpromotableOnly = allocation.getClusterState() + .nodes() + .stream() + .filter(n -> Objects.equals("true", n.getAttributes().get(NODE_ATTR_UNPROMOTABLE_ONLY))) + .findAny(); + if (nodeWithUnpromotableOnly.isPresent()) { + if (node.node().getName().equals(nodeWithUnpromotableOnly.get().getName())) { if (shardRouting.isPromotableToPrimary()) { return Decision.NO; } @@ -618,15 +624,16 @@ public void testNodeWithUnpromotableShardsNeverGetsReplicationActions() throws E var routingTableWatcher = new RoutingTableWatcher(); routingTableWatcher.numReplicas = routingTableWatcher.numIndexingCopies; // meaning: only 1 unpromotable replica per shard internalCluster().ensureAtLeastNumDataNodes(routingTableWatcher.numReplicas + 1); - final var nodeWithUnpromotables = internalCluster().startDataOnlyNode(); + final String nodeWithUnpromotableOnly = internalCluster().startDataOnlyNode( + Settings.builder().put("node.attr." + TestPlugin.NODE_ATTR_UNPROMOTABLE_ONLY, "true").build() + ); installMockTransportVerifications(routingTableWatcher); getMasterNodePlugin().numIndexingCopies = routingTableWatcher.numIndexingCopies; - getMasterNodePlugin().nodeToExclusivelyHoldUnpromotableShards = nodeWithUnpromotables; for (var transportService : internalCluster().getInstances(TransportService.class)) { MockTransportService mockTransportService = (MockTransportService) transportService; mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> { - if (connection.getNode().getName().equals(nodeWithUnpromotables)) { + if (connection.getNode().getName().equals(nodeWithUnpromotableOnly)) { assertThat(action, not(containsString("[r]"))); } connection.sendRequest(requestId, action, request, options); From 7fae710ed0fa8b05d3b899a43936796e2cfb4bd3 Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Sat, 28 Jan 2023 01:16:12 +0200 Subject: [PATCH 18/21] Better split in PeerRecoveryTargetService --- .../recovery/PeerRecoveryTargetService.java | 95 ++++++++++--------- 1 file changed, 49 insertions(+), 46 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 05235f43e51c1..abd1ef4aaf958 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -219,43 +219,56 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi final RecoveryState recoveryState = recoveryTarget.state(); final RecoveryState.Timer timer = recoveryState.getTimer(); final IndexShard indexShard = recoveryTarget.indexShard(); - final boolean promotableToPrimary = indexShard.routingEntry().isPromotableToPrimary(); - record StartRecoveryRequestToSend(StartRecoveryRequest startRecoveryRequest, String actionName, TransportRequest requestToSend) {} - final ActionListener toSendListener = ActionListener.notifyOnce( - ActionListener.runBefore(new ActionListener<>() { - @Override - public void onResponse(StartRecoveryRequestToSend r) { - logger.trace( - "{} [{}]: recovery from {}", - r.startRecoveryRequest().shardId(), - r.actionName(), - r.startRecoveryRequest().sourceNode() - ); - if (promotableToPrimary) { - transportService.sendRequest( - r.startRecoveryRequest().sourceNode(), - r.actionName(), - r.requestToSend(), - new RecoveryResponseHandler(r.startRecoveryRequest(), timer) - ); - } else { - onGoingRecoveries.markRecoveryAsDone(recoveryId); - } - } + final var failureHandler = ActionListener.notifyOnce(ActionListener.runBefore(ActionListener.noop().delegateResponse((l, e) -> { + // this will be logged as warning later on... + logger.trace("unexpected error while preparing shard for peer recovery, failing recovery", e); + onGoingRecoveries.failRecovery( + recoveryId, + new RecoveryFailedException(recoveryTarget.state(), "failed to prepare shard for recovery", e), + true + ); + }), recoveryRef::close)); - @Override - public void onFailure(Exception e) { - // this will be logged as warning later on... - logger.trace("unexpected error while preparing shard for peer recovery, failing recovery", e); - onGoingRecoveries.failRecovery( - recoveryId, - new RecoveryFailedException(recoveryTarget.state(), "failed to prepare shard for recovery", e), - true - ); - } - }, recoveryRef::close) - ); + if (indexShard.routingEntry().isPromotableToPrimary() == false) { + assert preExistingRequest == null; + assert indexShard.indexSettings().getIndexMetadata().isSearchableSnapshot() == false; + try { + indexShard.preRecovery(failureHandler.map(v -> { + logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId()); + indexShard.prepareForIndexRecovery(); + // Skip unnecessary intermediate stages + recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX); + recoveryState.setStage(RecoveryState.Stage.TRANSLOG); + indexShard.openEngineAndSkipTranslogRecovery(); + recoveryState.getIndex().setFileDetailsComplete(); + recoveryState.setStage(RecoveryState.Stage.FINALIZE); + onGoingRecoveries.markRecoveryAsDone(recoveryId); + return null; + })); + } catch (Exception e) { + failureHandler.onFailure(e); + } + + return; + } + + record StartRecoveryRequestToSend(StartRecoveryRequest startRecoveryRequest, String actionName, TransportRequest requestToSend) {} + final ActionListener toSendListener = failureHandler.map(r -> { + logger.trace( + "{} [{}]: recovery from {}", + r.startRecoveryRequest().shardId(), + r.actionName(), + r.startRecoveryRequest().sourceNode() + ); + transportService.sendRequest( + r.startRecoveryRequest().sourceNode(), + r.actionName(), + r.requestToSend(), + new RecoveryResponseHandler(r.startRecoveryRequest(), timer) + ); + return null; + }); if (preExistingRequest == null) { try { @@ -273,17 +286,7 @@ public void onFailure(Exception e) { store.decRef(); } } - long startingSeqNo = UNASSIGNED_SEQ_NO; - if (promotableToPrimary) { - startingSeqNo = indexShard.recoverLocallyUpToGlobalCheckpoint(); - } else { - // Skip unnecessary intermediate stages - recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX); - recoveryState.setStage(RecoveryState.Stage.TRANSLOG); - indexShard.openEngineAndSkipTranslogRecovery(); - recoveryState.getIndex().setFileDetailsComplete(); - recoveryState.setStage(RecoveryState.Stage.FINALIZE); - } + final long startingSeqNo = indexShard.recoverLocallyUpToGlobalCheckpoint(); assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG : "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]"; final var startRequest = getStartRecoveryRequest(logger, clusterService.localNode(), recoveryTarget, startingSeqNo); From 515f7e302b611871b5f65d7e7dabdf7b1cd87956 Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Tue, 31 Jan 2023 08:53:55 +0200 Subject: [PATCH 19/21] Test with more than 1 unpromotable nodes --- .../cluster/routing/ShardRoutingRoleIT.java | 37 +++++++++++++------ 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java index fe734f814ee51..ed4c1b87df554 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand; import org.elasticsearch.cluster.routing.allocation.command.CancelAllocationCommand; @@ -54,6 +55,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -107,23 +109,32 @@ public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode n @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { - var nodeWithUnpromotableOnly = allocation.getClusterState() + var nodesWithUnpromotableOnly = allocation.getClusterState() .nodes() .stream() .filter(n -> Objects.equals("true", n.getAttributes().get(NODE_ATTR_UNPROMOTABLE_ONLY))) - .findAny(); - if (nodeWithUnpromotableOnly.isPresent()) { - if (node.node().getName().equals(nodeWithUnpromotableOnly.get().getName())) { + .map(DiscoveryNode::getName) + .collect(Collectors.toUnmodifiableSet()); + if (nodesWithUnpromotableOnly.isEmpty() == false) { + if (nodesWithUnpromotableOnly.contains(node.node().getName())) { if (shardRouting.isPromotableToPrimary()) { - return Decision.NO; + return allocation.decision( + Decision.NO, + "test", + "shard is promotable to primary so may not be assigned to [" + node.node().getName() + "]" + ); } } else { if (shardRouting.isPromotableToPrimary() == false) { - return Decision.NO; + return allocation.decision( + Decision.NO, + "test", + "shard is not promotable to primary so may not be assigned to [" + node.node().getName() + "]" + ); } } } - return Decision.ALWAYS; + return Decision.YES; } }); } @@ -620,11 +631,13 @@ public void testRefreshOfUnpromotableShards() throws Exception { } } - public void testNodeWithUnpromotableShardsNeverGetsReplicationActions() throws Exception { + public void testNodesWithUnpromotableShardsNeverGetReplicationActions() throws Exception { var routingTableWatcher = new RoutingTableWatcher(); - routingTableWatcher.numReplicas = routingTableWatcher.numIndexingCopies; // meaning: only 1 unpromotable replica per shard - internalCluster().ensureAtLeastNumDataNodes(routingTableWatcher.numReplicas + 1); - final String nodeWithUnpromotableOnly = internalCluster().startDataOnlyNode( + var additionalNumberOfNodesWithUnpromotableShards = randomIntBetween(1, 3); + routingTableWatcher.numReplicas = routingTableWatcher.numIndexingCopies + additionalNumberOfNodesWithUnpromotableShards - 1; + internalCluster().ensureAtLeastNumDataNodes(routingTableWatcher.numIndexingCopies + 1); + final List nodesWithUnpromotableOnly = internalCluster().startDataOnlyNodes( + additionalNumberOfNodesWithUnpromotableShards, Settings.builder().put("node.attr." + TestPlugin.NODE_ATTR_UNPROMOTABLE_ONLY, "true").build() ); installMockTransportVerifications(routingTableWatcher); @@ -633,7 +646,7 @@ public void testNodeWithUnpromotableShardsNeverGetsReplicationActions() throws E for (var transportService : internalCluster().getInstances(TransportService.class)) { MockTransportService mockTransportService = (MockTransportService) transportService; mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> { - if (connection.getNode().getName().equals(nodeWithUnpromotableOnly)) { + if (nodesWithUnpromotableOnly.contains(connection.getNode().getName())) { assertThat(action, not(containsString("[r]"))); } connection.sendRequest(requestId, action, request, options); From b8bf26c0791fb4ccd363f8a569a4b1c614092cff Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Tue, 31 Jan 2023 09:12:26 +0200 Subject: [PATCH 20/21] Simplify useRetentionLeasesInPeerRecovery --- .../elasticsearch/index/shard/IndexShard.java | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 574c2396ac2f8..871928a96e4b6 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -55,7 +55,6 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AsyncIOProcessor; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.Booleans; import org.elasticsearch.core.CheckedConsumer; @@ -709,21 +708,26 @@ public void onFailure(Exception e) { if (indexSettings.isSoftDeleteEnabled() && useRetentionLeasesInPeerRecovery == false) { final RetentionLeases retentionLeases = replicationTracker.getRetentionLeases(); - - final Set shardRoutings = Sets.newHashSetWithExpectedSize(routingTable.size()); + boolean allShardsUseRetentionLeases = true; for (int copy = 0; copy < routingTable.size(); copy++) { ShardRouting shardRouting = routingTable.shard(copy); if (shardRouting.isPromotableToPrimary()) { - shardRoutings.add(shardRouting); + if (shardRouting.assignedToNode() == false + || retentionLeases.contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(shardRouting)) == false) { + allShardsUseRetentionLeases = false; + break; + } + if (this.shardRouting.relocating()) { + ShardRouting shardRoutingReloc = this.shardRouting.getTargetRelocatingShard(); + if (shardRoutingReloc.assignedToNode() == false + || retentionLeases.contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(shardRoutingReloc)) == false) { + allShardsUseRetentionLeases = false; + break; + } + } } } - - if (shardRoutings.stream() - .allMatch( - shr -> shr.assignedToNode() && retentionLeases.contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(shr)) - )) { - useRetentionLeasesInPeerRecovery = true; - } + useRetentionLeasesInPeerRecovery = allShardsUseRetentionLeases; } } From fb6a63c25004d082bde67735df9e7d2bee9375fa Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Tue, 31 Jan 2023 09:41:48 +0200 Subject: [PATCH 21/21] Adopt new ActionListener.run --- .../cluster/routing/ShardRoutingRoleIT.java | 2 +- .../TransportUnpromotableShardRefreshAction.java | 11 +++++------ 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java index 1e50451da173d..2f186a41139b7 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java @@ -478,7 +478,7 @@ public AllocationCommand getCancelPrimaryCommand() { return null; } - public void testSearchRouting() { + public void testSearchRouting() throws Exception { var routingTableWatcher = new RoutingTableWatcher(); routingTableWatcher.numReplicas = Math.max(1, routingTableWatcher.numReplicas); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java index d6afeb7380ac4..500a53513a60b 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java @@ -36,13 +36,12 @@ public TransportUnpromotableShardRefreshAction( } @Override - protected void doExecute(Task task, UnpromotableShardRefreshRequest request, ActionListener listener) { - try { - assert request.getSegmentGeneration() != Engine.RefreshResult.UNKNOWN_GENERATION; + protected void doExecute(Task task, UnpromotableShardRefreshRequest request, ActionListener responseListener) { + ActionListener.run(responseListener, listener -> { + assert request.getSegmentGeneration() != Engine.RefreshResult.UNKNOWN_GENERATION + : "The request segment is " + request.getSegmentGeneration(); IndexShard shard = indicesService.indexServiceSafe(request.getShardId().getIndex()).getShard(request.getShardId().id()); shard.waitForSegmentGeneration(request.getSegmentGeneration(), listener.map(l -> ActionResponse.Empty.INSTANCE)); - } catch (Exception e) { - listener.onFailure(e); - } + }); } }