diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 3bf9abecf435c..2a4e06a871007 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -28,7 +28,6 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; @@ -73,14 +72,6 @@ protected ReplicationOperation.Replicas newReplicasProxy(long primaryTerm) { return new ResyncActionReplicasProxy(primaryTerm); } - @Override - protected void sendReplicaRequest( - final ConcreteReplicaRequest replicaRequest, - final DiscoveryNode node, - final ActionListener listener) { - super.sendReplicaRequest(replicaRequest, node, listener); - } - @Override protected ClusterBlockLevel globalBlockLevel() { // resync should never be blocked because it's an internal action diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 42745aa0e15dc..0c651606c5baa 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -618,7 +618,7 @@ public void onFailure(Exception e) { } } - protected IndexShard getIndexShard(final ShardId shardId) { + private IndexShard getIndexShard(final ShardId shardId) { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); return indexService.getShard(shardId.id()); } @@ -1044,7 +1044,12 @@ public void performOn( } final ConcreteReplicaRequest replicaRequest = new ConcreteReplicaRequest<>( request, replica.allocationId().getId(), primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes); - sendReplicaRequest(replicaRequest, node, listener); + final ActionListenerResponseHandler handler = new ActionListenerResponseHandler<>(listener, in -> { + ReplicaResponse replicaResponse = new ReplicaResponse(); + replicaResponse.readFrom(in); + return replicaResponse; + }); + transportService.sendRequest(node, transportReplicaAction, replicaRequest, transportOptions, handler); } @Override @@ -1066,25 +1071,6 @@ public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, A } } - /** - * Sends the specified replica request to the specified node. - * - * @param replicaRequest the replica request - * @param node the node to send the request to - * @param listener callback for handling the response or failure - */ - protected void sendReplicaRequest( - final ConcreteReplicaRequest replicaRequest, - final DiscoveryNode node, - final ActionListener listener) { - final ActionListenerResponseHandler handler = new ActionListenerResponseHandler<>(listener, in -> { - ReplicaResponse replicaResponse = new ReplicaResponse(); - replicaResponse.readFrom(in); - return replicaResponse; - }); - transportService.sendRequest(node, transportReplicaAction, replicaRequest, transportOptions, handler); - } - /** a wrapper class to encapsulate a request when being sent to a specific allocation id **/ public static class ConcreteShardRequest extends TransportRequest { diff --git a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java index 9b55cff8cff9a..4d3d0123fe6c9 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java @@ -22,16 +22,13 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -103,19 +100,6 @@ protected ReplicationResponse newResponseInstance() { return new ReplicationResponse(); } - @Override - protected void sendReplicaRequest( - final ConcreteReplicaRequest replicaRequest, - final DiscoveryNode node, - final ActionListener listener) { - if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) { - super.sendReplicaRequest(replicaRequest, node, listener); - } else { - final long pre60NodeCheckpoint = SequenceNumbers.PRE_60_NODE_CHECKPOINT; - listener.onResponse(new ReplicaResponse(pre60NodeCheckpoint, pre60NodeCheckpoint)); - } - } - @Override protected PrimaryResult shardOperationOnPrimary( final Request request, final IndexShard indexShard) throws Exception { diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 9164d9e4184eb..02e9ff3146cf6 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -757,7 +757,7 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException { assertEquals(0, shardFailedRequests.length); } - public void testSeqNoIsSetOnPrimary() throws Exception { + public void testSeqNoIsSetOnPrimary() { final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); // we use one replica to check the primary term was set on the operation and sent to the replica @@ -788,14 +788,14 @@ public void testSeqNoIsSetOnPrimary() throws Exception { return null; }).when(shard).acquirePrimaryOperationPermit(any(), anyString(), anyObject()); - TestAction action = - new TestAction(Settings.EMPTY, "internal:testSeqNoIsSetOnPrimary", transportService, clusterService, shardStateAction, - threadPool) { - @Override - protected IndexShard getIndexShard(ShardId shardId) { - return shard; - } - }; + final IndexService indexService = mock(IndexService.class); + when(indexService.getShard(shard.shardId().id())).thenReturn(shard); + + final IndicesService indicesService = mock(IndicesService.class); + when(indicesService.indexServiceSafe(shard.shardId().getIndex())).thenReturn(indexService); + + TestAction action = new TestAction(Settings.EMPTY, "internal:testSeqNoIsSetOnPrimary", transportService, clusterService, + shardStateAction, threadPool, indicesService); action.handlePrimaryRequest(concreteShardRequest, createTransportChannel(listener), null); CapturingTransport.CapturedRequest[] requestsToReplicas = transport.capturedRequests(); @@ -1207,11 +1207,16 @@ static class TestResponse extends ReplicationResponse { private class TestAction extends TransportReplicationAction { - TestAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ShardStateAction shardStateAction, ThreadPool threadPool) { - super(settings, actionName, transportService, clusterService, mockIndicesService(clusterService), threadPool, + this(settings, actionName, transportService, clusterService, shardStateAction, threadPool, mockIndicesService(clusterService)); + } + + TestAction(Settings settings, String actionName, TransportService transportService, + ClusterService clusterService, ShardStateAction shardStateAction, + ThreadPool threadPool, IndicesService indicesService) { + super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(), Request::new, Request::new, ThreadPool.Names.SAME); @@ -1241,7 +1246,7 @@ protected boolean resolveIndex() { } } - final IndicesService mockIndicesService(ClusterService clusterService) { + private IndicesService mockIndicesService(ClusterService clusterService) { final IndicesService indicesService = mock(IndicesService.class); when(indicesService.indexServiceSafe(any(Index.class))).then(invocation -> { Index index = (Index)invocation.getArguments()[0]; @@ -1261,7 +1266,7 @@ final IndicesService mockIndicesService(ClusterService clusterService) { return indicesService; } - final IndexService mockIndexService(final IndexMetaData indexMetaData, ClusterService clusterService) { + private IndexService mockIndexService(final IndexMetaData indexMetaData, ClusterService clusterService) { final IndexService indexService = mock(IndexService.class); when(indexService.getShard(anyInt())).then(invocation -> { int shard = (Integer) invocation.getArguments()[0]; diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java index 8fe204cee2c34..b8c87acb56dd6 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java @@ -41,18 +41,23 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.test.transport.MockTransport; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportChannel; -import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportMessageListener; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import org.junit.After; import org.junit.Before; @@ -70,6 +75,7 @@ import java.util.concurrent.ExecutionException; import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_INDEX_UUID; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; @@ -78,6 +84,8 @@ import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting; import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; import static org.elasticsearch.test.ClusterServiceUtils.setState; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasItem; @@ -85,6 +93,9 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.startsWith; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** @@ -163,7 +174,49 @@ public void setUp() throws Exception { setState(clusterService, state.build()); final Settings transportSettings = Settings.builder().put("node.name", node1.getId()).build(); - transportService = MockTransportService.createNewService(transportSettings, Version.CURRENT, threadPool, null); + + MockTransport transport = new MockTransport() { + @Override + protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) { + assertThat(action, allOf(startsWith("cluster:admin/test/"), endsWith("[r]"))); + assertThat(node, equalTo(node2)); + // node2 doesn't really exist, but we are performing some trickery in mockIndicesService() to pretend that node1 holds both + // the primary and the replica, so redirect the request back to node1. + transportService.sendRequest(transportService.getLocalNode(), action, request, + new TransportResponseHandler() { + @Override + public TransportReplicationAction.ReplicaResponse read(StreamInput in) throws IOException { + final TransportReplicationAction.ReplicaResponse replicaResponse + = new TransportReplicationAction.ReplicaResponse(); + replicaResponse.readFrom(in); + return replicaResponse; + } + + @SuppressWarnings("unchecked") + private TransportResponseHandler getResponseHandler() { + return (TransportResponseHandler) + getResponseHandlers().onResponseReceived(requestId, TransportMessageListener.NOOP_LISTENER); + } + + @Override + public void handleResponse(TransportReplicationAction.ReplicaResponse response) { + getResponseHandler().handleResponse(response); + } + + @Override + public void handleException(TransportException exp) { + getResponseHandler().handleException(exp); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }); + } + }; + transportService = transport.createTransportService(transportSettings, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, + bta -> node1, null, emptySet()); transportService.start(); transportService.acceptIncomingRequests(); shardStateAction = new ShardStateAction(clusterService, transportService, null, null, threadPool); @@ -198,7 +251,8 @@ public void testTransportReplicationActionWithAllPermits() throws Exception { final PlainActionFuture listener = new PlainActionFuture<>(); futures[threadId] = listener; - final TestAction singlePermitAction = new SinglePermitWithBlocksAction(Settings.EMPTY, "internalSinglePermit[" + threadId + "]", + final TestAction singlePermitAction = new SinglePermitWithBlocksAction(Settings.EMPTY, + "cluster:admin/test/single_permit[" + threadId + "]", transportService, clusterService, shardStateAction, threadPool, shardId, primary, replica, globalBlock); actions[threadId] = singlePermitAction; @@ -251,8 +305,8 @@ private void assertBlockIsPresentForDelayedOp() { logger.trace("now starting the operation that acquires all permits and sets the block in the cluster state"); // An action which acquires all operation permits during execution and set a block - final TestAction allPermitsAction = new AllPermitsThenBlockAction(Settings.EMPTY, "internalAllPermits", transportService, - clusterService, shardStateAction, threadPool, shardId, primary, replica); + final TestAction allPermitsAction = new AllPermitsThenBlockAction(Settings.EMPTY, "cluster:admin/test/all_permits", + transportService, clusterService, shardStateAction, threadPool, shardId, primary, replica); final PlainActionFuture allPermitFuture = new PlainActionFuture<>(); Thread thread = new Thread(() -> { @@ -299,6 +353,7 @@ void runWithPrimaryShardReference(final TransportReplicationAction.PrimaryShardR } final Response allPermitsResponse = allPermitFuture.get(); + assertSuccessfulOperation(allPermitsAction, allPermitsResponse); for (int i = 0; i < numOperations; i++) { @@ -357,18 +412,21 @@ private abstract class TestAction extends TransportReplicationAction executedOnPrimary = new SetOnce<>(); - protected final SetOnce executedOnReplica = new SetOnce<>(); + final SetOnce executedOnPrimary; + final SetOnce executedOnReplica = new SetOnce<>(); TestAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, - ShardStateAction shardStateAction, ThreadPool threadPool, ShardId shardId, IndexShard primary, IndexShard replica) { - super(settings, actionName, transportService, clusterService, null, threadPool, shardStateAction, + ShardStateAction shardStateAction, ThreadPool threadPool, ShardId shardId, IndexShard primary, IndexShard replica, + SetOnce executedOnPrimary) { + super(settings, actionName, transportService, clusterService, mockIndicesService(shardId, executedOnPrimary, primary, replica), + threadPool, shardStateAction, new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(), Request::new, Request::new, ThreadPool.Names.SAME); this.shardId = Objects.requireNonNull(shardId); this.primary = Objects.requireNonNull(primary); assertEquals(shardId, primary.shardId()); this.replica = Objects.requireNonNull(replica); assertEquals(shardId, replica.shardId()); + this.executedOnPrimary = executedOnPrimary; } @Override @@ -391,52 +449,25 @@ protected PrimaryResult shardOperationOnPrimary(Request shard @Override protected ReplicaResult shardOperationOnReplica(Request shardRequest, IndexShard shard) throws Exception { + assertEquals("Replica is always assigned to node 2 in this test", clusterService.state().nodes().get("_node2").getId(), + shard.routingEntry().currentNodeId()); executedOnReplica.set(true); // The TransportReplicationAction.getIndexShard() method is overridden for testing purpose but we double check here // that the permit has been acquired on the replica shard assertSame(replica, shard); return new ReplicaResult(); } + } - @Override - protected IndexShard getIndexShard(final ShardId shardId) { - if (this.shardId.equals(shardId) == false) { - throw new AssertionError("shard id differs from " + shardId); - } - return (executedOnPrimary.get() == null) ? primary : replica; - } - - @Override - protected void sendReplicaRequest(final ConcreteReplicaRequest replicaRequest, - final DiscoveryNode node, - final ActionListener listener) { - assertEquals("Replica is always assigned to node 2 in this test", clusterService.state().nodes().get("_node2"), node); - try { - handleReplicaRequest(replicaRequest, new TransportChannel() { - @Override - public String getProfileName() { - return null; - } - - @Override - public String getChannelType() { - return null; - } + private static IndicesService mockIndicesService(ShardId shardId, SetOnce executedOnPrimary, IndexShard primary, + IndexShard replica) { + final IndexService indexService = mock(IndexService.class); + when(indexService.getShard(shardId.id())).then(invocation -> (executedOnPrimary.get() == null) ? primary : replica); - @Override - public void sendResponse(TransportResponse response) throws IOException { - listener.onResponse((ReplicationOperation.ReplicaResponse) response); - } + final IndicesService indicesService = mock(IndicesService.class); + when(indicesService.indexServiceSafe(shardId.getIndex())).then(invocation -> indexService); - @Override - public void sendResponse(Exception exception) throws IOException { - listener.onFailure(exception); - } - }, null); - } catch (Exception e) { - listener.onFailure(e); - } - } + return indicesService; } /** @@ -452,7 +483,8 @@ private class SinglePermitWithBlocksAction extends TestAction { SinglePermitWithBlocksAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ShardStateAction shardStateAction, ThreadPool threadPool, ShardId shardId, IndexShard primary, IndexShard replica, boolean globalBlock) { - super(settings, actionName, transportService, clusterService, shardStateAction, threadPool, shardId, primary, replica); + super(settings, actionName, transportService, clusterService, shardStateAction, threadPool, shardId, primary, replica, + new SetOnce<>()); this.globalBlock = globalBlock; } @@ -497,7 +529,8 @@ private class AllPermitsThenBlockAction extends TestAction { AllPermitsThenBlockAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ShardStateAction shardStateAction, ThreadPool threadPool, ShardId shardId, IndexShard primary, IndexShard replica) { - super(settings, actionName, transportService, clusterService, shardStateAction, threadPool, shardId, primary, replica); + super(settings, actionName, transportService, clusterService, shardStateAction, threadPool, shardId, primary, replica, + new SetOnce<>()); } @Override