diff --git a/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java b/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java index 7d09e44d37903..a1eb616b1eec3 100644 --- a/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java +++ b/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java @@ -19,9 +19,7 @@ package org.elasticsearch.action.bulk; -import org.elasticsearch.Version; -import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest; -import org.elasticsearch.action.support.single.instance.InstanceShardOperationRequest; +import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -32,7 +30,7 @@ /** * */ -public class BulkShardRequest extends ShardReplicationOperationRequest { +public class BulkShardRequest extends ReplicationRequest { private int shardId; diff --git a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index cb2fd6fcfebb3..09157f7ba4c56 100644 --- a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -28,7 +28,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction; +import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.action.update.UpdateHelper; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; @@ -65,7 +65,7 @@ /** * Performs the index operation. */ -public class TransportShardBulkAction extends TransportShardReplicationOperationAction { +public class TransportShardBulkAction extends TransportReplicationAction { private final static String OP_TYPE_UPDATE = "update"; private final static String OP_TYPE_DELETE = "delete"; diff --git a/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java b/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java index 65295de900cc7..c5d5e78141738 100644 --- a/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java +++ b/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java @@ -22,7 +22,7 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.DocumentRequest; -import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest; +import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -44,7 +44,7 @@ * @see org.elasticsearch.client.Client#delete(DeleteRequest) * @see org.elasticsearch.client.Requests#deleteRequest(String) */ -public class DeleteRequest extends ShardReplicationOperationRequest implements DocumentRequest { +public class DeleteRequest extends ReplicationRequest implements DocumentRequest { private String type; private String id; diff --git a/src/main/java/org/elasticsearch/action/delete/DeleteRequestBuilder.java b/src/main/java/org/elasticsearch/action/delete/DeleteRequestBuilder.java index b15799824e166..0ce907bac1d64 100644 --- a/src/main/java/org/elasticsearch/action/delete/DeleteRequestBuilder.java +++ b/src/main/java/org/elasticsearch/action/delete/DeleteRequestBuilder.java @@ -19,7 +19,7 @@ package org.elasticsearch.action.delete; -import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder; +import org.elasticsearch.action.support.replication.ReplicationRequestBuilder; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.Nullable; import org.elasticsearch.index.VersionType; @@ -27,7 +27,7 @@ /** * A delete document action request builder. */ -public class DeleteRequestBuilder extends ShardReplicationOperationRequestBuilder { +public class DeleteRequestBuilder extends ReplicationRequestBuilder { public DeleteRequestBuilder(ElasticsearchClient client, DeleteAction action) { super(client, action, new DeleteRequest()); diff --git a/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java b/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java index 97f4ac8d02055..0a0719419c71e 100644 --- a/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java +++ b/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java @@ -27,7 +27,7 @@ import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.AutoCreateIndex; -import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction; +import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; @@ -50,7 +50,7 @@ /** * Performs the delete operation. */ -public class TransportDeleteAction extends TransportShardReplicationOperationAction { +public class TransportDeleteAction extends TransportReplicationAction { private final AutoCreateIndex autoCreateIndex; private final TransportCreateIndexAction createIndexAction; diff --git a/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 1f766b7a174c8..57b244ecefc1e 100644 --- a/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -20,11 +20,10 @@ package org.elasticsearch.action.index; import com.google.common.base.Charsets; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchGenerationException; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.action.*; -import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest; +import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; @@ -63,7 +62,7 @@ * @see org.elasticsearch.client.Requests#indexRequest(String) * @see org.elasticsearch.client.Client#index(IndexRequest) */ -public class IndexRequest extends ShardReplicationOperationRequest implements DocumentRequest { +public class IndexRequest extends ReplicationRequest implements DocumentRequest { /** * Operation type controls if the type of the index operation. diff --git a/src/main/java/org/elasticsearch/action/index/IndexRequestBuilder.java b/src/main/java/org/elasticsearch/action/index/IndexRequestBuilder.java index 7eb109c3ae22b..cf49435862823 100644 --- a/src/main/java/org/elasticsearch/action/index/IndexRequestBuilder.java +++ b/src/main/java/org/elasticsearch/action/index/IndexRequestBuilder.java @@ -19,7 +19,7 @@ package org.elasticsearch.action.index; -import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder; +import org.elasticsearch.action.support.replication.ReplicationRequestBuilder; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; @@ -32,7 +32,7 @@ /** * An index document action request builder. */ -public class IndexRequestBuilder extends ShardReplicationOperationRequestBuilder { +public class IndexRequestBuilder extends ReplicationRequestBuilder { public IndexRequestBuilder(ElasticsearchClient client, IndexAction action) { super(client, action, new IndexRequest()); diff --git a/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index 36b870f724fed..57ab297ebf70d 100644 --- a/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -27,7 +27,7 @@ import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.AutoCreateIndex; -import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction; +import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; @@ -61,7 +61,7 @@ *
  • allowIdGeneration: If the id is set not, should it be generated. Defaults to true. * */ -public class TransportIndexAction extends TransportShardReplicationOperationAction { +public class TransportIndexAction extends TransportReplicationAction { private final AutoCreateIndex autoCreateIndex; private final boolean allowIdGeneration; diff --git a/src/main/java/org/elasticsearch/action/support/replication/ShardReplicationOperationRequest.java b/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java similarity index 93% rename from src/main/java/org/elasticsearch/action/support/replication/ShardReplicationOperationRequest.java rename to src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java index 0d9730c246acf..a6864f45441c8 100644 --- a/src/main/java/org/elasticsearch/action/support/replication/ShardReplicationOperationRequest.java +++ b/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java @@ -37,7 +37,7 @@ /** * */ -public abstract class ShardReplicationOperationRequest extends ActionRequest implements IndicesRequest { +public abstract class ReplicationRequest extends ActionRequest implements IndicesRequest { public static final TimeValue DEFAULT_TIMEOUT = new TimeValue(1, TimeUnit.MINUTES); @@ -50,21 +50,21 @@ public abstract class ShardReplicationOperationRequest, Response extends ActionResponse, RequestBuilder extends ShardReplicationOperationRequestBuilder> +public abstract class ReplicationRequestBuilder, Response extends ActionResponse, RequestBuilder extends ReplicationRequestBuilder> extends ActionRequestBuilder { - protected ShardReplicationOperationRequestBuilder(ElasticsearchClient client, Action action, Request request) { + protected ReplicationRequestBuilder(ElasticsearchClient client, Action action, Request request) { super(client, action, request); } diff --git a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java b/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java similarity index 98% rename from src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java rename to src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 3d05c4960a3f0..2e2a9e7abf3e2 100644 --- a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -26,8 +26,8 @@ import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.bulk.BulkShardRequest; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.IndexRequest.OpType; +import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportActions; @@ -76,7 +76,7 @@ /** */ -public abstract class TransportShardReplicationOperationAction extends TransportAction { +public abstract class TransportReplicationAction extends TransportAction { protected final TransportService transportService; protected final ClusterService clusterService; @@ -90,11 +90,11 @@ public abstract class TransportShardReplicationOperationAction request, Class replicaRequest, String executor) { + protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService, + ClusterService clusterService, IndicesService indicesService, + ThreadPool threadPool, ShardStateAction shardStateAction, + MappingUpdatedAction mappingUpdatedAction, ActionFilters actionFilters, + Class request, Class replicaRequest, String executor) { super(settings, actionName, threadPool, actionFilters); this.transportService = transportService; this.clusterService = clusterService; diff --git a/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationOperationTests.java b/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationTests.java similarity index 93% rename from src/test/java/org/elasticsearch/action/support/replication/ShardReplicationOperationTests.java rename to src/test/java/org/elasticsearch/action/support/replication/ShardReplicationTests.java index fd95cffa2ad4f..432e21248fa21 100644 --- a/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationOperationTests.java +++ b/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.action.support.replication; import com.google.common.base.Predicate; - import org.apache.lucene.index.CorruptIndexException; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; @@ -42,13 +41,7 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.ImmutableShardRouting; -import org.elasticsearch.cluster.routing.IndexRoutingTable; -import org.elasticsearch.cluster.routing.IndexShardRoutingTable; -import org.elasticsearch.cluster.routing.RoutingTable; -import org.elasticsearch.cluster.routing.ShardIterator; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.*; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.StreamInput; @@ -83,17 +76,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE; -import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; -import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; -import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED; -import static org.hamcrest.Matchers.arrayWithSize; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; +import static org.elasticsearch.cluster.metadata.IndexMetaData.*; +import static org.hamcrest.Matchers.*; -public class ShardReplicationOperationTests extends ElasticsearchTestCase { +public class ShardReplicationTests extends ElasticsearchTestCase { private static ThreadPool threadPool; @@ -102,13 +88,13 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase { private CapturingTransport transport; private Action action; /* * - * TransportShardReplicationOperationAction needs an instance of IndexShard to count operations. + * TransportReplicationAction needs an instance of IndexShard to count operations. * indexShards is reset to null before each test and will be initialized upon request in the tests. */ @BeforeClass public static void beforeClass() { - threadPool = new ThreadPool("ShardReplicationOperationTests"); + threadPool = new ThreadPool("ShardReplicationTests"); } @Before @@ -145,7 +131,7 @@ public void testBlocks() throws ExecutionException, InterruptedException { ClusterBlocks.Builder block = ClusterBlocks.builder() .addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL)); clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block)); - TransportShardReplicationOperationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener); + TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener); assertFalse("primary phase should stop execution", primaryPhase.checkBlocks()); assertListenerThrows("primary phase should fail operation", listener, ClusterBlockException.class); @@ -277,7 +263,7 @@ public void testNotStartedPrimary() throws InterruptedException, ExecutionExcept Request request = new Request(shardId).timeout("1ms"); PlainActionFuture listener = new PlainActionFuture<>(); - TransportShardReplicationOperationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener); + TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener); primaryPhase.run(); assertListenerThrows("unassigned primary didn't cause a timeout", listener, UnavailableShardsException.class); @@ -309,7 +295,7 @@ public void testRoutingToPrimary() { Request request = new Request(shardId); PlainActionFuture listener = new PlainActionFuture<>(); - TransportShardReplicationOperationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener); + TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener); assertTrue(primaryPhase.checkBlocks()); primaryPhase.routeRequestOrPerformLocally(shardRoutingTable.primaryShard(), shardRoutingTable.shardsIt()); if (primaryNodeId.equals(clusterService.localNode().id())) { @@ -374,7 +360,7 @@ public void testWriteConsistency() throws ExecutionException, InterruptedExcepti final IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id()); PlainActionFuture listener = new PlainActionFuture<>(); - TransportShardReplicationOperationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener); + TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener); if (passesWriteConsistency) { assertThat(primaryPhase.checkWriteConsistency(shardRoutingTable.primaryShard()), nullValue()); primaryPhase.run(); @@ -457,11 +443,11 @@ protected void runReplicateTest(IndexShardRoutingTable shardRoutingTable, int as logger.debug("expecting [{}] assigned replicas, [{}] total shards. using state: \n{}", assignedReplicas, totalShards, clusterService.state().prettyPrint()); - final TransportShardReplicationOperationAction.InternalRequest internalRequest = action.new InternalRequest(request); + final TransportReplicationAction.InternalRequest internalRequest = action.new InternalRequest(request); internalRequest.concreteIndex(shardId.index().name()); Releasable reference = getOrCreateIndexShardOperationsCounter(); assertIndexShardCounter(2); - TransportShardReplicationOperationAction.ReplicationPhase replicationPhase = + TransportReplicationAction.ReplicationPhase replicationPhase = action.new ReplicationPhase(shardIt, request, new Response(), new ClusterStateObserver(clusterService, logger), primaryShard, internalRequest, listener, reference); @@ -532,7 +518,7 @@ public void testCounterOnPrimary() throws InterruptedException, ExecutionExcepti * However, this failure would only become apparent once listener.get is called. Seems a little implicit. * */ action = new ActionWithDelay(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool); - final TransportShardReplicationOperationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener); + final TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener); Thread t = new Thread() { public void run() { primaryPhase.run(); @@ -568,7 +554,7 @@ public void testCounterIncrementedWhileReplicationOngoing() throws InterruptedEx logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint()); Request request = new Request(shardId).timeout("100ms"); PlainActionFuture listener = new PlainActionFuture<>(); - TransportShardReplicationOperationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener); + TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener); primaryPhase.run(); assertIndexShardCounter(2); assertThat(transport.capturedRequests().length, equalTo(1)); @@ -635,7 +621,7 @@ public void testCounterDecrementedIfShardOperationThrowsException() throws Inter logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint()); Request request = new Request(shardId).timeout("100ms"); PlainActionFuture listener = new PlainActionFuture<>(); - TransportShardReplicationOperationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener); + TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener); primaryPhase.run(); // no replica request should have been sent yet assertThat(transport.capturedRequests().length, equalTo(0)); @@ -662,7 +648,7 @@ public void close() { }; } - static class Request extends ShardReplicationOperationRequest { + static class Request extends ReplicationRequest { int shardId; public AtomicBoolean processedOnPrimary = new AtomicBoolean(); public AtomicInteger processedOnReplicas = new AtomicInteger(); @@ -694,7 +680,7 @@ public void readFrom(StreamInput in) throws IOException { static class Response extends ActionWriteResponse { } - class Action extends TransportShardReplicationOperationAction { + class Action extends TransportReplicationAction { Action(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, diff --git a/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 2d15f481b96ba..c8234e8f4b420 100644 --- a/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -1512,7 +1512,7 @@ public void testRetryWithAutogeneratedIdWorksAndNoDuplicateDocs() throws IOExcep replicaEngine.create(index); fail(); } catch (VersionConflictEngineException e) { - // we ignore version conflicts on replicas, see TransportShardReplicationOperationAction.ignoreReplicaException + // we ignore version conflicts on replicas, see TransportReplicationAction.ignoreReplicaException } replicaEngine.refresh("test"); Engine.Searcher replicaSearcher = replicaEngine.acquireSearcher("test"); @@ -1556,7 +1556,7 @@ public void testRetryWithAutogeneratedIdsAndWrongOrderWorksAndNoDuplicateDocs() replicaEngine.create(secondIndexRequestReplica); fail(); } catch (VersionConflictEngineException e) { - // we ignore version conflicts on replicas, see TransportShardReplicationOperationAction.ignoreReplicaException. + // we ignore version conflicts on replicas, see TransportReplicationAction.ignoreReplicaException. } replicaEngine.refresh("test"); Engine.Searcher replicaSearcher = replicaEngine.acquireSearcher("test");