Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove test-only customisation from TransReplAct #40863

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -73,14 +72,6 @@ protected ReplicationOperation.Replicas newReplicasProxy(long primaryTerm) {
return new ResyncActionReplicasProxy(primaryTerm);
}

@Override
protected void sendReplicaRequest(
final ConcreteReplicaRequest<ResyncReplicationRequest> replicaRequest,
final DiscoveryNode node,
final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
super.sendReplicaRequest(replicaRequest, node, listener);
}

@Override
protected ClusterBlockLevel globalBlockLevel() {
// resync should never be blocked because it's an internal action
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ public void onFailure(Exception e) {
}
}

protected IndexShard getIndexShard(final ShardId shardId) {
private IndexShard getIndexShard(final ShardId shardId) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
return indexService.getShard(shardId.id());
}
Expand Down Expand Up @@ -1044,7 +1044,12 @@ public void performOn(
}
final ConcreteReplicaRequest<ReplicaRequest> replicaRequest = new ConcreteReplicaRequest<>(
request, replica.allocationId().getId(), primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes);
sendReplicaRequest(replicaRequest, node, listener);
final ActionListenerResponseHandler<ReplicaResponse> handler = new ActionListenerResponseHandler<>(listener, in -> {
ReplicaResponse replicaResponse = new ReplicaResponse();
replicaResponse.readFrom(in);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps, while you're at it, implement the StreamInput constructor for ReplicaResponse and override readFrom by throwing exception, just like AddVotingConfigExclusionsResponse does. This can then just be new ActionListenerResponseHandler<>(listener, ReplicaResponse::new)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Writeable implementations are on their way as a followup.

return replicaResponse;
});
transportService.sendRequest(node, transportReplicaAction, replicaRequest, transportOptions, handler);
}

@Override
Expand All @@ -1066,25 +1071,6 @@ public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, A
}
}

/**
* Sends the specified replica request to the specified node.
*
* @param replicaRequest the replica request
* @param node the node to send the request to
* @param listener callback for handling the response or failure
*/
protected void sendReplicaRequest(
final ConcreteReplicaRequest<ReplicaRequest> replicaRequest,
final DiscoveryNode node,
final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
final ActionListenerResponseHandler<ReplicaResponse> handler = new ActionListenerResponseHandler<>(listener, in -> {
ReplicaResponse replicaResponse = new ReplicaResponse();
replicaResponse.readFrom(in);
return replicaResponse;
});
transportService.sendRequest(node, transportReplicaAction, replicaRequest, transportOptions, handler);
}

/** a wrapper class to encapsulate a request when being sent to a specific allocation id **/
public static class ConcreteShardRequest<R extends TransportRequest> extends TransportRequest {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,13 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -103,19 +100,6 @@ protected ReplicationResponse newResponseInstance() {
return new ReplicationResponse();
}

@Override
protected void sendReplicaRequest(
final ConcreteReplicaRequest<Request> replicaRequest,
final DiscoveryNode node,
final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
super.sendReplicaRequest(replicaRequest, node, listener);
} else {
final long pre60NodeCheckpoint = SequenceNumbers.PRE_60_NODE_CHECKPOINT;
listener.onResponse(new ReplicaResponse(pre60NodeCheckpoint, pre60NodeCheckpoint));
}
}

@Override
protected PrimaryResult<Request, ReplicationResponse> shardOperationOnPrimary(
final Request request, final IndexShard indexShard) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,7 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException {
assertEquals(0, shardFailedRequests.length);
}

public void testSeqNoIsSetOnPrimary() throws Exception {
public void testSeqNoIsSetOnPrimary() {
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);
// we use one replica to check the primary term was set on the operation and sent to the replica
Expand Down Expand Up @@ -788,14 +788,14 @@ public void testSeqNoIsSetOnPrimary() throws Exception {
return null;
}).when(shard).acquirePrimaryOperationPermit(any(), anyString(), anyObject());

TestAction action =
new TestAction(Settings.EMPTY, "internal:testSeqNoIsSetOnPrimary", transportService, clusterService, shardStateAction,
threadPool) {
@Override
protected IndexShard getIndexShard(ShardId shardId) {
return shard;
}
};
final IndexService indexService = mock(IndexService.class);
when(indexService.getShard(shard.shardId().id())).thenReturn(shard);

final IndicesService indicesService = mock(IndicesService.class);
when(indicesService.indexServiceSafe(shard.shardId().getIndex())).thenReturn(indexService);

TestAction action = new TestAction(Settings.EMPTY, "internal:testSeqNoIsSetOnPrimary", transportService, clusterService,
shardStateAction, threadPool, indicesService);

action.handlePrimaryRequest(concreteShardRequest, createTransportChannel(listener), null);
CapturingTransport.CapturedRequest[] requestsToReplicas = transport.capturedRequests();
Expand Down Expand Up @@ -1207,11 +1207,16 @@ static class TestResponse extends ReplicationResponse {

private class TestAction extends TransportReplicationAction<Request, Request, TestResponse> {


TestAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, ShardStateAction shardStateAction,
ThreadPool threadPool) {
super(settings, actionName, transportService, clusterService, mockIndicesService(clusterService), threadPool,
this(settings, actionName, transportService, clusterService, shardStateAction, threadPool, mockIndicesService(clusterService));
}

TestAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, ShardStateAction shardStateAction,
ThreadPool threadPool, IndicesService indicesService) {
super(settings, actionName, transportService, clusterService, indicesService, threadPool,
shardStateAction,
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(),
Request::new, Request::new, ThreadPool.Names.SAME);
Expand Down Expand Up @@ -1241,7 +1246,7 @@ protected boolean resolveIndex() {
}
}

final IndicesService mockIndicesService(ClusterService clusterService) {
private IndicesService mockIndicesService(ClusterService clusterService) {
final IndicesService indicesService = mock(IndicesService.class);
when(indicesService.indexServiceSafe(any(Index.class))).then(invocation -> {
Index index = (Index)invocation.getArguments()[0];
Expand All @@ -1261,7 +1266,7 @@ final IndicesService mockIndicesService(ClusterService clusterService) {
return indicesService;
}

final IndexService mockIndexService(final IndexMetaData indexMetaData, ClusterService clusterService) {
private IndexService mockIndexService(final IndexMetaData indexMetaData, ClusterService clusterService) {
final IndexService indexService = mock(IndexService.class);
when(indexService.getShard(anyInt())).then(invocation -> {
int shard = (Integer) invocation.getArguments()[0];
Expand Down
Loading