From d8956d260149708b81e81c2446f348090451d3ff Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 5 Apr 2019 08:50:44 +0100 Subject: [PATCH] Remove test-only customisation from TransReplAct (#40863) The `getIndexShard()` and `sendReplicaRequest()` methods in TransportReplicationAction are effectively only used to customise some behaviour in tests. However there are other ways to do this that do not cause such an obstacle to separating the TransportReplicationAction into its two halves (see #40706). This commit removes these customisation points and injects the test-only behaviour using other techniques. --- .../TransportResyncReplicationAction.java | 14 -- .../TransportReplicationAction.java | 28 +--- .../seqno/GlobalCheckpointSyncAction.java | 16 --- .../TransportReplicationActionTests.java | 31 ++-- ...ReplicationAllPermitsAcquisitionTests.java | 133 +++++++++++------- 5 files changed, 108 insertions(+), 114 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index eb2f18e2e40f1..de1bf0e517b50 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -29,7 +29,6 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; @@ -74,19 +73,6 @@ protected ReplicationOperation.Replicas newReplicasProxy(long primaryTerm) { return new ResyncActionReplicasProxy(primaryTerm); } - @Override - protected void sendReplicaRequest( - final ConcreteReplicaRequest replicaRequest, - final DiscoveryNode node, - final ActionListener listener) { - if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) { - super.sendReplicaRequest(replicaRequest, node, listener); - } else { - final long pre60NodeCheckpoint = SequenceNumbers.PRE_60_NODE_CHECKPOINT; - listener.onResponse(new ReplicaResponse(pre60NodeCheckpoint, pre60NodeCheckpoint)); - } - } - @Override protected ClusterBlockLevel globalBlockLevel() { // resync should never be blocked because it's an internal action diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index d27b6d4f31b0a..ac6298c2c8691 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -619,7 +619,7 @@ public void onFailure(Exception e) { } } - protected IndexShard getIndexShard(final ShardId shardId) { + private IndexShard getIndexShard(final ShardId shardId) { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); return indexService.getShard(shardId.id()); } @@ -1058,7 +1058,12 @@ public void performOn( } final ConcreteReplicaRequest replicaRequest = new ConcreteReplicaRequest<>( request, replica.allocationId().getId(), primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes); - sendReplicaRequest(replicaRequest, node, listener); + final ActionListenerResponseHandler handler = new ActionListenerResponseHandler<>(listener, in -> { + ReplicaResponse replicaResponse = new ReplicaResponse(); + replicaResponse.readFrom(in); + return replicaResponse; + }); + transportService.sendRequest(node, transportReplicaAction, replicaRequest, transportOptions, handler); } @Override @@ -1080,25 +1085,6 @@ public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, A } } - /** - * Sends the specified replica request to the specified node. - * - * @param replicaRequest the replica request - * @param node the node to send the request to - * @param listener callback for handling the response or failure - */ - protected void sendReplicaRequest( - final ConcreteReplicaRequest replicaRequest, - final DiscoveryNode node, - final ActionListener listener) { - final ActionListenerResponseHandler handler = new ActionListenerResponseHandler<>(listener, in -> { - ReplicaResponse replicaResponse = new ReplicaResponse(); - replicaResponse.readFrom(in); - return replicaResponse; - }); - transportService.sendRequest(node, transportReplicaAction, replicaRequest, transportOptions, handler); - } - /** a wrapper class to encapsulate a request when being sent to a specific allocation id **/ public static class ConcreteShardRequest extends TransportRequest { diff --git a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java index 9b55cff8cff9a..4d3d0123fe6c9 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java @@ -22,16 +22,13 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -103,19 +100,6 @@ protected ReplicationResponse newResponseInstance() { return new ReplicationResponse(); } - @Override - protected void sendReplicaRequest( - final ConcreteReplicaRequest replicaRequest, - final DiscoveryNode node, - final ActionListener listener) { - if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) { - super.sendReplicaRequest(replicaRequest, node, listener); - } else { - final long pre60NodeCheckpoint = SequenceNumbers.PRE_60_NODE_CHECKPOINT; - listener.onResponse(new ReplicaResponse(pre60NodeCheckpoint, pre60NodeCheckpoint)); - } - } - @Override protected PrimaryResult shardOperationOnPrimary( final Request request, final IndexShard indexShard) throws Exception { diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 9164d9e4184eb..02e9ff3146cf6 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -757,7 +757,7 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException { assertEquals(0, shardFailedRequests.length); } - public void testSeqNoIsSetOnPrimary() throws Exception { + public void testSeqNoIsSetOnPrimary() { final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); // we use one replica to check the primary term was set on the operation and sent to the replica @@ -788,14 +788,14 @@ public void testSeqNoIsSetOnPrimary() throws Exception { return null; }).when(shard).acquirePrimaryOperationPermit(any(), anyString(), anyObject()); - TestAction action = - new TestAction(Settings.EMPTY, "internal:testSeqNoIsSetOnPrimary", transportService, clusterService, shardStateAction, - threadPool) { - @Override - protected IndexShard getIndexShard(ShardId shardId) { - return shard; - } - }; + final IndexService indexService = mock(IndexService.class); + when(indexService.getShard(shard.shardId().id())).thenReturn(shard); + + final IndicesService indicesService = mock(IndicesService.class); + when(indicesService.indexServiceSafe(shard.shardId().getIndex())).thenReturn(indexService); + + TestAction action = new TestAction(Settings.EMPTY, "internal:testSeqNoIsSetOnPrimary", transportService, clusterService, + shardStateAction, threadPool, indicesService); action.handlePrimaryRequest(concreteShardRequest, createTransportChannel(listener), null); CapturingTransport.CapturedRequest[] requestsToReplicas = transport.capturedRequests(); @@ -1207,11 +1207,16 @@ static class TestResponse extends ReplicationResponse { private class TestAction extends TransportReplicationAction { - TestAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ShardStateAction shardStateAction, ThreadPool threadPool) { - super(settings, actionName, transportService, clusterService, mockIndicesService(clusterService), threadPool, + this(settings, actionName, transportService, clusterService, shardStateAction, threadPool, mockIndicesService(clusterService)); + } + + TestAction(Settings settings, String actionName, TransportService transportService, + ClusterService clusterService, ShardStateAction shardStateAction, + ThreadPool threadPool, IndicesService indicesService) { + super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(), Request::new, Request::new, ThreadPool.Names.SAME); @@ -1241,7 +1246,7 @@ protected boolean resolveIndex() { } } - final IndicesService mockIndicesService(ClusterService clusterService) { + private IndicesService mockIndicesService(ClusterService clusterService) { final IndicesService indicesService = mock(IndicesService.class); when(indicesService.indexServiceSafe(any(Index.class))).then(invocation -> { Index index = (Index)invocation.getArguments()[0]; @@ -1261,7 +1266,7 @@ final IndicesService mockIndicesService(ClusterService clusterService) { return indicesService; } - final IndexService mockIndexService(final IndexMetaData indexMetaData, ClusterService clusterService) { + private IndexService mockIndexService(final IndexMetaData indexMetaData, ClusterService clusterService) { final IndexService indexService = mock(IndexService.class); when(indexService.getShard(anyInt())).then(invocation -> { int shard = (Integer) invocation.getArguments()[0]; diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java index 8fe204cee2c34..b8c87acb56dd6 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java @@ -41,18 +41,23 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.test.transport.MockTransport; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportChannel; -import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportMessageListener; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import org.junit.After; import org.junit.Before; @@ -70,6 +75,7 @@ import java.util.concurrent.ExecutionException; import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_INDEX_UUID; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; @@ -78,6 +84,8 @@ import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting; import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; import static org.elasticsearch.test.ClusterServiceUtils.setState; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasItem; @@ -85,6 +93,9 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.startsWith; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** @@ -163,7 +174,49 @@ public void setUp() throws Exception { setState(clusterService, state.build()); final Settings transportSettings = Settings.builder().put("node.name", node1.getId()).build(); - transportService = MockTransportService.createNewService(transportSettings, Version.CURRENT, threadPool, null); + + MockTransport transport = new MockTransport() { + @Override + protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) { + assertThat(action, allOf(startsWith("cluster:admin/test/"), endsWith("[r]"))); + assertThat(node, equalTo(node2)); + // node2 doesn't really exist, but we are performing some trickery in mockIndicesService() to pretend that node1 holds both + // the primary and the replica, so redirect the request back to node1. + transportService.sendRequest(transportService.getLocalNode(), action, request, + new TransportResponseHandler() { + @Override + public TransportReplicationAction.ReplicaResponse read(StreamInput in) throws IOException { + final TransportReplicationAction.ReplicaResponse replicaResponse + = new TransportReplicationAction.ReplicaResponse(); + replicaResponse.readFrom(in); + return replicaResponse; + } + + @SuppressWarnings("unchecked") + private TransportResponseHandler getResponseHandler() { + return (TransportResponseHandler) + getResponseHandlers().onResponseReceived(requestId, TransportMessageListener.NOOP_LISTENER); + } + + @Override + public void handleResponse(TransportReplicationAction.ReplicaResponse response) { + getResponseHandler().handleResponse(response); + } + + @Override + public void handleException(TransportException exp) { + getResponseHandler().handleException(exp); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }); + } + }; + transportService = transport.createTransportService(transportSettings, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, + bta -> node1, null, emptySet()); transportService.start(); transportService.acceptIncomingRequests(); shardStateAction = new ShardStateAction(clusterService, transportService, null, null, threadPool); @@ -198,7 +251,8 @@ public void testTransportReplicationActionWithAllPermits() throws Exception { final PlainActionFuture listener = new PlainActionFuture<>(); futures[threadId] = listener; - final TestAction singlePermitAction = new SinglePermitWithBlocksAction(Settings.EMPTY, "internalSinglePermit[" + threadId + "]", + final TestAction singlePermitAction = new SinglePermitWithBlocksAction(Settings.EMPTY, + "cluster:admin/test/single_permit[" + threadId + "]", transportService, clusterService, shardStateAction, threadPool, shardId, primary, replica, globalBlock); actions[threadId] = singlePermitAction; @@ -251,8 +305,8 @@ private void assertBlockIsPresentForDelayedOp() { logger.trace("now starting the operation that acquires all permits and sets the block in the cluster state"); // An action which acquires all operation permits during execution and set a block - final TestAction allPermitsAction = new AllPermitsThenBlockAction(Settings.EMPTY, "internalAllPermits", transportService, - clusterService, shardStateAction, threadPool, shardId, primary, replica); + final TestAction allPermitsAction = new AllPermitsThenBlockAction(Settings.EMPTY, "cluster:admin/test/all_permits", + transportService, clusterService, shardStateAction, threadPool, shardId, primary, replica); final PlainActionFuture allPermitFuture = new PlainActionFuture<>(); Thread thread = new Thread(() -> { @@ -299,6 +353,7 @@ void runWithPrimaryShardReference(final TransportReplicationAction.PrimaryShardR } final Response allPermitsResponse = allPermitFuture.get(); + assertSuccessfulOperation(allPermitsAction, allPermitsResponse); for (int i = 0; i < numOperations; i++) { @@ -357,18 +412,21 @@ private abstract class TestAction extends TransportReplicationAction executedOnPrimary = new SetOnce<>(); - protected final SetOnce executedOnReplica = new SetOnce<>(); + final SetOnce executedOnPrimary; + final SetOnce executedOnReplica = new SetOnce<>(); TestAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, - ShardStateAction shardStateAction, ThreadPool threadPool, ShardId shardId, IndexShard primary, IndexShard replica) { - super(settings, actionName, transportService, clusterService, null, threadPool, shardStateAction, + ShardStateAction shardStateAction, ThreadPool threadPool, ShardId shardId, IndexShard primary, IndexShard replica, + SetOnce executedOnPrimary) { + super(settings, actionName, transportService, clusterService, mockIndicesService(shardId, executedOnPrimary, primary, replica), + threadPool, shardStateAction, new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(), Request::new, Request::new, ThreadPool.Names.SAME); this.shardId = Objects.requireNonNull(shardId); this.primary = Objects.requireNonNull(primary); assertEquals(shardId, primary.shardId()); this.replica = Objects.requireNonNull(replica); assertEquals(shardId, replica.shardId()); + this.executedOnPrimary = executedOnPrimary; } @Override @@ -391,52 +449,25 @@ protected PrimaryResult shardOperationOnPrimary(Request shard @Override protected ReplicaResult shardOperationOnReplica(Request shardRequest, IndexShard shard) throws Exception { + assertEquals("Replica is always assigned to node 2 in this test", clusterService.state().nodes().get("_node2").getId(), + shard.routingEntry().currentNodeId()); executedOnReplica.set(true); // The TransportReplicationAction.getIndexShard() method is overridden for testing purpose but we double check here // that the permit has been acquired on the replica shard assertSame(replica, shard); return new ReplicaResult(); } + } - @Override - protected IndexShard getIndexShard(final ShardId shardId) { - if (this.shardId.equals(shardId) == false) { - throw new AssertionError("shard id differs from " + shardId); - } - return (executedOnPrimary.get() == null) ? primary : replica; - } - - @Override - protected void sendReplicaRequest(final ConcreteReplicaRequest replicaRequest, - final DiscoveryNode node, - final ActionListener listener) { - assertEquals("Replica is always assigned to node 2 in this test", clusterService.state().nodes().get("_node2"), node); - try { - handleReplicaRequest(replicaRequest, new TransportChannel() { - @Override - public String getProfileName() { - return null; - } - - @Override - public String getChannelType() { - return null; - } + private static IndicesService mockIndicesService(ShardId shardId, SetOnce executedOnPrimary, IndexShard primary, + IndexShard replica) { + final IndexService indexService = mock(IndexService.class); + when(indexService.getShard(shardId.id())).then(invocation -> (executedOnPrimary.get() == null) ? primary : replica); - @Override - public void sendResponse(TransportResponse response) throws IOException { - listener.onResponse((ReplicationOperation.ReplicaResponse) response); - } + final IndicesService indicesService = mock(IndicesService.class); + when(indicesService.indexServiceSafe(shardId.getIndex())).then(invocation -> indexService); - @Override - public void sendResponse(Exception exception) throws IOException { - listener.onFailure(exception); - } - }, null); - } catch (Exception e) { - listener.onFailure(e); - } - } + return indicesService; } /** @@ -452,7 +483,8 @@ private class SinglePermitWithBlocksAction extends TestAction { SinglePermitWithBlocksAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ShardStateAction shardStateAction, ThreadPool threadPool, ShardId shardId, IndexShard primary, IndexShard replica, boolean globalBlock) { - super(settings, actionName, transportService, clusterService, shardStateAction, threadPool, shardId, primary, replica); + super(settings, actionName, transportService, clusterService, shardStateAction, threadPool, shardId, primary, replica, + new SetOnce<>()); this.globalBlock = globalBlock; } @@ -497,7 +529,8 @@ private class AllPermitsThenBlockAction extends TestAction { AllPermitsThenBlockAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ShardStateAction shardStateAction, ThreadPool threadPool, ShardId shardId, IndexShard primary, IndexShard replica) { - super(settings, actionName, transportService, clusterService, shardStateAction, threadPool, shardId, primary, replica); + super(settings, actionName, transportService, clusterService, shardStateAction, threadPool, shardId, primary, replica, + new SetOnce<>()); } @Override