Skip to content

Commit

Permalink
Remove test-only customisation from TransReplAct (#40863)
Browse files Browse the repository at this point in the history
The `getIndexShard()` and `sendReplicaRequest()` methods in
TransportReplicationAction are effectively only used to customise some
behaviour in tests. However there are other ways to do this that do not cause
such an obstacle to separating the TransportReplicationAction into its two
halves (see #40706).

This commit removes these customisation points and injects the test-only
behaviour using other techniques.
  • Loading branch information
DaveCTurner committed Apr 5, 2019
1 parent 669d72e commit d8956d2
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -74,19 +73,6 @@ protected ReplicationOperation.Replicas newReplicasProxy(long primaryTerm) {
return new ResyncActionReplicasProxy(primaryTerm);
}

@Override
protected void sendReplicaRequest(
final ConcreteReplicaRequest<ResyncReplicationRequest> replicaRequest,
final DiscoveryNode node,
final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
super.sendReplicaRequest(replicaRequest, node, listener);
} else {
final long pre60NodeCheckpoint = SequenceNumbers.PRE_60_NODE_CHECKPOINT;
listener.onResponse(new ReplicaResponse(pre60NodeCheckpoint, pre60NodeCheckpoint));
}
}

@Override
protected ClusterBlockLevel globalBlockLevel() {
// resync should never be blocked because it's an internal action
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ public void onFailure(Exception e) {
}
}

protected IndexShard getIndexShard(final ShardId shardId) {
private IndexShard getIndexShard(final ShardId shardId) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
return indexService.getShard(shardId.id());
}
Expand Down Expand Up @@ -1058,7 +1058,12 @@ public void performOn(
}
final ConcreteReplicaRequest<ReplicaRequest> replicaRequest = new ConcreteReplicaRequest<>(
request, replica.allocationId().getId(), primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes);
sendReplicaRequest(replicaRequest, node, listener);
final ActionListenerResponseHandler<ReplicaResponse> handler = new ActionListenerResponseHandler<>(listener, in -> {
ReplicaResponse replicaResponse = new ReplicaResponse();
replicaResponse.readFrom(in);
return replicaResponse;
});
transportService.sendRequest(node, transportReplicaAction, replicaRequest, transportOptions, handler);
}

@Override
Expand All @@ -1080,25 +1085,6 @@ public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, A
}
}

/**
* Sends the specified replica request to the specified node.
*
* @param replicaRequest the replica request
* @param node the node to send the request to
* @param listener callback for handling the response or failure
*/
protected void sendReplicaRequest(
final ConcreteReplicaRequest<ReplicaRequest> replicaRequest,
final DiscoveryNode node,
final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
final ActionListenerResponseHandler<ReplicaResponse> handler = new ActionListenerResponseHandler<>(listener, in -> {
ReplicaResponse replicaResponse = new ReplicaResponse();
replicaResponse.readFrom(in);
return replicaResponse;
});
transportService.sendRequest(node, transportReplicaAction, replicaRequest, transportOptions, handler);
}

/** a wrapper class to encapsulate a request when being sent to a specific allocation id **/
public static class ConcreteShardRequest<R extends TransportRequest> extends TransportRequest {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,13 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -103,19 +100,6 @@ protected ReplicationResponse newResponseInstance() {
return new ReplicationResponse();
}

@Override
protected void sendReplicaRequest(
final ConcreteReplicaRequest<Request> replicaRequest,
final DiscoveryNode node,
final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
super.sendReplicaRequest(replicaRequest, node, listener);
} else {
final long pre60NodeCheckpoint = SequenceNumbers.PRE_60_NODE_CHECKPOINT;
listener.onResponse(new ReplicaResponse(pre60NodeCheckpoint, pre60NodeCheckpoint));
}
}

@Override
protected PrimaryResult<Request, ReplicationResponse> shardOperationOnPrimary(
final Request request, final IndexShard indexShard) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,7 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException {
assertEquals(0, shardFailedRequests.length);
}

public void testSeqNoIsSetOnPrimary() throws Exception {
public void testSeqNoIsSetOnPrimary() {
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);
// we use one replica to check the primary term was set on the operation and sent to the replica
Expand Down Expand Up @@ -788,14 +788,14 @@ public void testSeqNoIsSetOnPrimary() throws Exception {
return null;
}).when(shard).acquirePrimaryOperationPermit(any(), anyString(), anyObject());

TestAction action =
new TestAction(Settings.EMPTY, "internal:testSeqNoIsSetOnPrimary", transportService, clusterService, shardStateAction,
threadPool) {
@Override
protected IndexShard getIndexShard(ShardId shardId) {
return shard;
}
};
final IndexService indexService = mock(IndexService.class);
when(indexService.getShard(shard.shardId().id())).thenReturn(shard);

final IndicesService indicesService = mock(IndicesService.class);
when(indicesService.indexServiceSafe(shard.shardId().getIndex())).thenReturn(indexService);

TestAction action = new TestAction(Settings.EMPTY, "internal:testSeqNoIsSetOnPrimary", transportService, clusterService,
shardStateAction, threadPool, indicesService);

action.handlePrimaryRequest(concreteShardRequest, createTransportChannel(listener), null);
CapturingTransport.CapturedRequest[] requestsToReplicas = transport.capturedRequests();
Expand Down Expand Up @@ -1207,11 +1207,16 @@ static class TestResponse extends ReplicationResponse {

private class TestAction extends TransportReplicationAction<Request, Request, TestResponse> {


TestAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, ShardStateAction shardStateAction,
ThreadPool threadPool) {
super(settings, actionName, transportService, clusterService, mockIndicesService(clusterService), threadPool,
this(settings, actionName, transportService, clusterService, shardStateAction, threadPool, mockIndicesService(clusterService));
}

TestAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, ShardStateAction shardStateAction,
ThreadPool threadPool, IndicesService indicesService) {
super(settings, actionName, transportService, clusterService, indicesService, threadPool,
shardStateAction,
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(),
Request::new, Request::new, ThreadPool.Names.SAME);
Expand Down Expand Up @@ -1241,7 +1246,7 @@ protected boolean resolveIndex() {
}
}

final IndicesService mockIndicesService(ClusterService clusterService) {
private IndicesService mockIndicesService(ClusterService clusterService) {
final IndicesService indicesService = mock(IndicesService.class);
when(indicesService.indexServiceSafe(any(Index.class))).then(invocation -> {
Index index = (Index)invocation.getArguments()[0];
Expand All @@ -1261,7 +1266,7 @@ final IndicesService mockIndicesService(ClusterService clusterService) {
return indicesService;
}

final IndexService mockIndexService(final IndexMetaData indexMetaData, ClusterService clusterService) {
private IndexService mockIndexService(final IndexMetaData indexMetaData, ClusterService clusterService) {
final IndexService indexService = mock(IndexService.class);
when(indexService.getShard(anyInt())).then(invocation -> {
int shard = (Integer) invocation.getArguments()[0];
Expand Down
Loading

0 comments on commit d8956d2

Please sign in to comment.