diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java index 094769d48cdc5..677063d36dd83 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java @@ -22,8 +22,8 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; -import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.DocWriteRequest.OpType; +import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.update.UpdateResponse; @@ -92,10 +92,10 @@ public static class Failure implements Writeable, ToXContent { private final String index; private final String type; private final String id; - private final Throwable cause; + private final Exception cause; private final RestStatus status; - public Failure(String index, String type, String id, Throwable cause) { + public Failure(String index, String type, String id, Exception cause) { this.index = index; this.type = type; this.id = id; @@ -161,7 +161,7 @@ public RestStatus getStatus() { /** * The actual cause of the failure. */ - public Throwable getCause() { + public Exception getCause() { return cause; } diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 0fa20bd5f1216..3c44c88ca1b51 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -33,8 +33,6 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction; -import org.elasticsearch.action.delete.DeleteRequest; -import org.elasticsearch.action.delete.TransportDeleteAction; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.ingest.IngestActionForwarder; import org.elasticsearch.action.support.ActionFilters; @@ -43,6 +41,8 @@ import org.elasticsearch.action.update.TransportUpdateAction; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateObserver; +import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -51,11 +51,15 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndexClosedException; +import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -157,11 +161,7 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener { inner.addSuppressed(e); listener.onFailure(inner); - } + }), responses); } } }); @@ -228,119 +226,199 @@ private long buildTookInMillis(long startTimeNanos) { return TimeUnit.NANOSECONDS.toMillis(relativeTime() - startTimeNanos); } - void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos, final ActionListener listener, final AtomicArray responses ) { - final ClusterState clusterState = clusterService.state(); - // TODO use timeout to wait here if its blocked... - clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.WRITE); - - final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver); - MetaData metaData = clusterState.metaData(); - for (int i = 0; i < bulkRequest.requests.size(); i++) { - DocWriteRequest docWriteRequest = bulkRequest.requests.get(i); - //the request can only be null because we set it to null in the previous step, so it gets ignored - if (docWriteRequest == null) { - continue; + /** + * retries on retryable cluster blocks, resolves item requests, + * constructs shard bulk requests and delegates execution to shard bulk action + * */ + private final class BulkOperation extends AbstractRunnable { + private final Task task; + private final BulkRequest bulkRequest; + private final ActionListener listener; + private final AtomicArray responses; + private final long startTimeNanos; + private final ClusterStateObserver observer; + + BulkOperation(Task task, BulkRequest bulkRequest, ActionListener listener, + AtomicArray responses, long startTimeNanos) { + this.task = task; + this.bulkRequest = bulkRequest; + this.listener = listener; + this.responses = responses; + this.startTimeNanos = startTimeNanos; + this.observer = new ClusterStateObserver(clusterService, bulkRequest.timeout(), logger, threadPool.getThreadContext()); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + + @Override + protected void doRun() throws Exception { + final ClusterState clusterState = observer.setAndGetObservedState(); + if (handleBlockExceptions(clusterState)) { + return; } - if (addFailureIfIndexIsUnavailable(docWriteRequest, bulkRequest, responses, i, concreteIndices, metaData)) { - continue; + final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver); + MetaData metaData = clusterState.metaData(); + for (int i = 0; i < bulkRequest.requests.size(); i++) { + DocWriteRequest docWriteRequest = bulkRequest.requests.get(i); + //the request can only be null because we set it to null in the previous step, so it gets ignored + if (docWriteRequest == null) { + continue; + } + if (addFailureIfIndexIsUnavailable(docWriteRequest, bulkRequest, responses, i, concreteIndices, metaData)) { + continue; + } + Index concreteIndex = concreteIndices.resolveIfAbsent(docWriteRequest); + try { + switch (docWriteRequest.opType()) { + case CREATE: + case INDEX: + IndexRequest indexRequest = (IndexRequest) docWriteRequest; + MappingMetaData mappingMd = null; + final IndexMetaData indexMetaData = metaData.index(concreteIndex); + if (indexMetaData != null) { + mappingMd = indexMetaData.mappingOrDefault(indexRequest.type()); + } + indexRequest.resolveRouting(metaData); + indexRequest.process(mappingMd, allowIdGeneration, concreteIndex.getName()); + break; + case UPDATE: + TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (UpdateRequest) docWriteRequest); + break; + case DELETE: + docWriteRequest.routing(metaData.resolveIndexRouting(docWriteRequest.parent(), docWriteRequest.routing(), docWriteRequest.index())); + // check if routing is required, if so, throw error if routing wasn't specified + if (docWriteRequest.routing() == null && metaData.routingRequired(concreteIndex.getName(), docWriteRequest.type())) { + throw new RoutingMissingException(concreteIndex.getName(), docWriteRequest.type(), docWriteRequest.id()); + } + break; + default: throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]"); + } + } catch (ElasticsearchParseException | RoutingMissingException e) { + BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), docWriteRequest.type(), docWriteRequest.id(), e); + BulkItemResponse bulkItemResponse = new BulkItemResponse(i, docWriteRequest.opType(), failure); + responses.set(i, bulkItemResponse); + // make sure the request gets never processed again + bulkRequest.requests.set(i, null); + } } - Index concreteIndex = concreteIndices.resolveIfAbsent(docWriteRequest); - try { - switch (docWriteRequest.opType()) { - case CREATE: - case INDEX: - IndexRequest indexRequest = (IndexRequest) docWriteRequest; - MappingMetaData mappingMd = null; - final IndexMetaData indexMetaData = metaData.index(concreteIndex); - if (indexMetaData != null) { - mappingMd = indexMetaData.mappingOrDefault(indexRequest.type()); - } - indexRequest.resolveRouting(metaData); - indexRequest.process(mappingMd, allowIdGeneration, concreteIndex.getName()); - break; - case UPDATE: - TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (UpdateRequest) docWriteRequest); - break; - case DELETE: - TransportDeleteAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (DeleteRequest) docWriteRequest); - break; - default: throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]"); + + // first, go over all the requests and create a ShardId -> Operations mapping + Map> requestsByShard = new HashMap<>(); + for (int i = 0; i < bulkRequest.requests.size(); i++) { + DocWriteRequest request = bulkRequest.requests.get(i); + if (request == null) { + continue; } - } catch (ElasticsearchParseException | RoutingMissingException e) { - BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), docWriteRequest.type(), docWriteRequest.id(), e); - BulkItemResponse bulkItemResponse = new BulkItemResponse(i, docWriteRequest.opType(), failure); - responses.set(i, bulkItemResponse); - // make sure the request gets never processed again - bulkRequest.requests.set(i, null); + String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName(); + ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId(); + List shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>()); + shardRequests.add(new BulkItemRequest(i, request)); } - } - // first, go over all the requests and create a ShardId -> Operations mapping - Map> requestsByShard = new HashMap<>(); - for (int i = 0; i < bulkRequest.requests.size(); i++) { - DocWriteRequest request = bulkRequest.requests.get(i); - if (request == null) { - continue; + if (requestsByShard.isEmpty()) { + listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos))); + return; + } + + final AtomicInteger counter = new AtomicInteger(requestsByShard.size()); + String nodeId = clusterService.localNode().getId(); + for (Map.Entry> entry : requestsByShard.entrySet()) { + final ShardId shardId = entry.getKey(); + final List requests = entry.getValue(); + BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, bulkRequest.getRefreshPolicy(), + requests.toArray(new BulkItemRequest[requests.size()])); + bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards()); + bulkShardRequest.timeout(bulkRequest.timeout()); + if (task != null) { + bulkShardRequest.setParentTask(nodeId, task.getId()); + } + shardBulkAction.execute(bulkShardRequest, new ActionListener() { + @Override + public void onResponse(BulkShardResponse bulkShardResponse) { + for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) { + // we may have no response if item failed + if (bulkItemResponse.getResponse() != null) { + bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo()); + } + responses.set(bulkItemResponse.getItemId(), bulkItemResponse); + } + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } + + @Override + public void onFailure(Exception e) { + // create failures for all relevant requests + for (BulkItemRequest request : requests) { + final String indexName = concreteIndices.getConcreteIndex(request.index()).getName(); + DocWriteRequest docWriteRequest = request.request(); + responses.set(request.id(), new BulkItemResponse(request.id(), docWriteRequest.opType(), + new BulkItemResponse.Failure(indexName, docWriteRequest.type(), docWriteRequest.id(), e))); + } + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } + + private void finishHim() { + listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos))); + } + }); } - String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName(); - ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId(); - List shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>()); - shardRequests.add(new BulkItemRequest(i, request)); } - if (requestsByShard.isEmpty()) { - listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos))); - return; + private boolean handleBlockExceptions(ClusterState state) { + ClusterBlockException blockException = state.blocks().globalBlockedException(ClusterBlockLevel.WRITE); + if (blockException != null) { + if (blockException.retryable()) { + logger.trace("cluster is blocked, scheduling a retry", blockException); + retry(blockException); + } else { + onFailure(blockException); + } + return true; + } + return false; } - final AtomicInteger counter = new AtomicInteger(requestsByShard.size()); - String nodeId = clusterService.localNode().getId(); - for (Map.Entry> entry : requestsByShard.entrySet()) { - final ShardId shardId = entry.getKey(); - final List requests = entry.getValue(); - BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, bulkRequest.getRefreshPolicy(), - requests.toArray(new BulkItemRequest[requests.size()])); - bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards()); - bulkShardRequest.timeout(bulkRequest.timeout()); - if (task != null) { - bulkShardRequest.setParentTask(nodeId, task.getId()); + void retry(Exception failure) { + assert failure != null; + if (observer.isTimedOut()) { + // we running as a last attempt after a timeout has happened. don't retry + onFailure(failure); + return; } - shardBulkAction.execute(bulkShardRequest, new ActionListener() { + final ThreadContext.StoredContext context = threadPool.getThreadContext().newStoredContext(true); + observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override - public void onResponse(BulkShardResponse bulkShardResponse) { - for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) { - // we may have no response if item failed - if (bulkItemResponse.getResponse() != null) { - bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo()); - } - responses.set(bulkItemResponse.getItemId(), bulkItemResponse); - } - if (counter.decrementAndGet() == 0) { - finishHim(); - } + public void onNewClusterState(ClusterState state) { + context.close(); + run(); } @Override - public void onFailure(Exception e) { - // create failures for all relevant requests - for (BulkItemRequest request : requests) { - final String indexName = concreteIndices.getConcreteIndex(request.index()).getName(); - DocWriteRequest docWriteRequest = request.request(); - responses.set(request.id(), new BulkItemResponse(request.id(), docWriteRequest.opType(), - new BulkItemResponse.Failure(indexName, docWriteRequest.type(), docWriteRequest.id(), e))); - } - if (counter.decrementAndGet() == 0) { - finishHim(); - } + public void onClusterServiceClose() { + onFailure(new NodeClosedException(clusterService.localNode())); } - private void finishHim() { - listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos))); + @Override + public void onTimeout(TimeValue timeout) { + context.close(); + // Try one more time... + run(); } }); } } + void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos, final ActionListener listener, final AtomicArray responses ) { + new BulkOperation(task, bulkRequest, listener, responses, startTimeNanos).run(); + } + private boolean addFailureIfIndexIsUnavailable(DocWriteRequest request, BulkRequest bulkRequest, AtomicArray responses, int idx, final ConcreteIndices concreteIndices, final MetaData metaData) { diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 2accce0b3ef09..de3de0f6d58e4 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -28,6 +28,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo; import org.elasticsearch.action.support.replication.TransportWriteAction; @@ -50,7 +51,10 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.mapper.MapperParsingException; +import org.elasticsearch.index.mapper.Mapping; +import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; @@ -59,10 +63,6 @@ import java.util.Map; -import static org.elasticsearch.action.delete.TransportDeleteAction.executeDeleteRequestOnPrimary; -import static org.elasticsearch.action.delete.TransportDeleteAction.executeDeleteRequestOnReplica; -import static org.elasticsearch.action.index.TransportIndexAction.executeIndexRequestOnPrimary; -import static org.elasticsearch.action.index.TransportIndexAction.executeIndexRequestOnReplica; import static org.elasticsearch.action.support.replication.ReplicationOperation.ignoreReplicaException; import static org.elasticsearch.action.support.replication.ReplicationOperation.isConflictException; @@ -103,7 +103,8 @@ protected boolean resolveIndex() { } @Override - protected WritePrimaryResult shardOperationOnPrimary(BulkShardRequest request, IndexShard primary) throws Exception { + public WritePrimaryResult shardOperationOnPrimary( + BulkShardRequest request, IndexShard primary) throws Exception { final IndexMetaData metaData = primary.indexSettings().getIndexMetaData(); long[] preVersions = new long[request.items().length]; @@ -119,7 +120,7 @@ protected WritePrimaryResult shardOperationOnPrimary(BulkShardRequest request, I responses[i] = items[i].getPrimaryResponse(); } BulkShardResponse response = new BulkShardResponse(request.shardId(), responses); - return new WritePrimaryResult(request, response, location, null, primary); + return new WritePrimaryResult<>(request, response, location, null, primary, logger); } /** Executes bulk item requests and handles request execution exceptions */ @@ -326,7 +327,7 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind } @Override - protected WriteReplicaResult shardOperationOnReplica(BulkShardRequest request, IndexShard replica) throws Exception { + public WriteReplicaResult shardOperationOnReplica(BulkShardRequest request, IndexShard replica) throws Exception { Translog.Location location = null; for (int i = 0; i < request.items().length; i++) { BulkItemRequest item = request.items()[i]; @@ -367,7 +368,89 @@ protected WriteReplicaResult shardOperationOnReplica(BulkShardRequest request, I } } } - return new WriteReplicaResult(request, location, null, replica); + return new WriteReplicaResult<>(request, location, null, replica, logger); + } + + /** + * Execute the given {@link IndexRequest} on a replica shard, throwing a + * {@link RetryOnReplicaException} if the operation needs to be re-tried. + */ + public static Engine.IndexResult executeIndexRequestOnReplica(IndexRequest request, IndexShard replica) { + final ShardId shardId = replica.shardId(); + SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, shardId.getIndexName(), request.type(), request.id(), request.source()) + .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl()); + + final Engine.Index operation; + try { + operation = replica.prepareIndexOnReplica(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry()); + } catch (MapperParsingException e) { + return new Engine.IndexResult(e, request.version()); + } + Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); + if (update != null) { + throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update); + } + return replica.index(operation); + } + + /** Utility method to prepare an index operation on primary shards */ + static Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, IndexShard primary) { + SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.index(), request.type(), request.id(), request.source()) + .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl()); + return primary.prepareIndexOnPrimary(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry()); + } + + /** Executes index operation on primary shard after updates mapping if dynamic mappings are found */ + public static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, IndexShard primary, + MappingUpdatedAction mappingUpdatedAction) throws Exception { + Engine.Index operation; + try { + operation = prepareIndexOperationOnPrimary(request, primary); + } catch (MapperParsingException | IllegalArgumentException e) { + return new Engine.IndexResult(e, request.version()); + } + Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); + final ShardId shardId = primary.shardId(); + if (update != null) { + // can throw timeout exception when updating mappings or ISE for attempting to update default mappings + // which are bubbled up + try { + mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), request.type(), update); + } catch (IllegalArgumentException e) { + // throws IAE on conflicts merging dynamic mappings + return new Engine.IndexResult(e, request.version()); + } + try { + operation = prepareIndexOperationOnPrimary(request, primary); + } catch (MapperParsingException | IllegalArgumentException e) { + return new Engine.IndexResult(e, request.version()); + } + update = operation.parsedDoc().dynamicMappingsUpdate(); + if (update != null) { + throw new ReplicationOperation.RetryOnPrimaryException(shardId, + "Dynamic mappings are not available on the node that holds the primary yet"); + } + } + Engine.IndexResult result = primary.index(operation); + if (result.hasFailure() == false) { + // update the version on request so it will happen on the replicas + final long version = result.getVersion(); + request.version(version); + request.versionType(request.versionType().versionTypeForReplicationAndRecovery()); + assert request.versionType().validateVersionForWrites(request.version()); + } + return result; + } + + public static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary) { + final Engine.Delete delete = primary.prepareDeleteOnPrimary(request.type(), request.id(), request.version(), request.versionType()); + return primary.delete(delete); + } + + public static Engine.DeleteResult executeDeleteRequestOnReplica(DeleteRequest request, IndexShard replica) { + final Engine.Delete delete = replica.prepareDeleteOnReplica(request.type(), request.id(), + request.version(), request.versionType()); + return replica.delete(delete); } private Translog.Location locationToSync(Translog.Location current, Translog.Location next) { diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportSingleItemBulkWriteAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportSingleItemBulkWriteAction.java new file mode 100644 index 0000000000000..fd71f504ea9b1 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportSingleItemBulkWriteAction.java @@ -0,0 +1,132 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.bulk; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.action.support.WriteResponse; +import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; +import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.action.support.replication.TransportWriteAction; +import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.function.Supplier; + +/** use transport bulk action directly */ +@Deprecated +public abstract class TransportSingleItemBulkWriteAction< + Request extends ReplicatedWriteRequest, + Response extends ReplicationResponse & WriteResponse + > extends TransportWriteAction { + + private final TransportBulkAction bulkAction; + private final TransportShardBulkAction shardBulkAction; + + + protected TransportSingleItemBulkWriteAction(Settings settings, String actionName, TransportService transportService, + ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, + ShardStateAction shardStateAction, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, Supplier request, + Supplier replicaRequest, String executor, + TransportBulkAction bulkAction, TransportShardBulkAction shardBulkAction) { + super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, + indexNameExpressionResolver, request, replicaRequest, executor); + this.bulkAction = bulkAction; + this.shardBulkAction = shardBulkAction; + } + + + @Override + protected void doExecute(Task task, final Request request, final ActionListener listener) { + bulkAction.execute(task, toSingleItemBulkRequest(request), wrapBulkResponse(listener)); + } + + @Override + protected WritePrimaryResult shardOperationOnPrimary( + Request request, final IndexShard primary) throws Exception { + BulkItemRequest[] itemRequests = new BulkItemRequest[1]; + WriteRequest.RefreshPolicy refreshPolicy = request.getRefreshPolicy(); + request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE); + itemRequests[0] = new BulkItemRequest(0, ((DocWriteRequest) request)); + BulkShardRequest bulkShardRequest = new BulkShardRequest(request.shardId(), refreshPolicy, itemRequests); + WritePrimaryResult bulkResult = + shardBulkAction.shardOperationOnPrimary(bulkShardRequest, primary); + assert bulkResult.finalResponseIfSuccessful.getResponses().length == 1 : "expected only one bulk shard response"; + BulkItemResponse itemResponse = bulkResult.finalResponseIfSuccessful.getResponses()[0]; + final Response response; + final Exception failure; + if (itemResponse.isFailed()) { + failure = itemResponse.getFailure().getCause(); + response = null; + } else { + response = (Response) itemResponse.getResponse(); + failure = null; + } + return new WritePrimaryResult<>(request, response, bulkResult.location, failure, primary, logger); + } + + @Override + protected WriteReplicaResult shardOperationOnReplica( + Request replicaRequest, IndexShard replica) throws Exception { + BulkItemRequest[] itemRequests = new BulkItemRequest[1]; + WriteRequest.RefreshPolicy refreshPolicy = replicaRequest.getRefreshPolicy(); + itemRequests[0] = new BulkItemRequest(0, ((DocWriteRequest) replicaRequest)); + BulkShardRequest bulkShardRequest = new BulkShardRequest(replicaRequest.shardId(), refreshPolicy, itemRequests); + WriteReplicaResult result = shardBulkAction.shardOperationOnReplica(bulkShardRequest, replica); + // a replica operation can never throw a document-level failure, + // as the same document has been already indexed successfully in the primary + return new WriteReplicaResult<>(replicaRequest, result.location, null, replica, logger); + } + + + private ActionListener wrapBulkResponse(ActionListener listener) { + return ActionListener.wrap(bulkItemResponses -> { + assert bulkItemResponses.getItems().length == 1 : "expected only one item in bulk request"; + BulkItemResponse bulkItemResponse = bulkItemResponses.getItems()[0]; + if (bulkItemResponse.isFailed() == false) { + final DocWriteResponse response = bulkItemResponse.getResponse(); + listener.onResponse((Response) response); + } else { + listener.onFailure(bulkItemResponse.getFailure().getCause()); + } + }, listener::onFailure); + } + + public static BulkRequest toSingleItemBulkRequest(ReplicatedWriteRequest request) { + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(((DocWriteRequest) request)); + bulkRequest.setRefreshPolicy(request.getRefreshPolicy()); + bulkRequest.timeout(request.timeout()); + bulkRequest.waitForActiveShards(request.waitForActiveShards()); + request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE); + return bulkRequest; + } +} diff --git a/core/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java b/core/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java index 3e72cdaf43393..5d44fc87e43ec 100644 --- a/core/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java +++ b/core/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.delete; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.CompositeIndicesRequest; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; import org.elasticsearch.common.Nullable; @@ -45,7 +46,7 @@ * @see org.elasticsearch.client.Client#delete(DeleteRequest) * @see org.elasticsearch.client.Requests#deleteRequest(String) */ -public class DeleteRequest extends ReplicatedWriteRequest implements DocWriteRequest { +public class DeleteRequest extends ReplicatedWriteRequest implements DocWriteRequest, CompositeIndicesRequest { private String type; private String id; diff --git a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java index 2dd1c8f1a4649..3aaf4a472facf 100644 --- a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java +++ b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java @@ -19,136 +19,39 @@ package org.elasticsearch.action.delete; -import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.ResourceAlreadyExistsException; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.RoutingMissingException; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; -import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; -import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction; +import org.elasticsearch.action.bulk.TransportBulkAction; +import org.elasticsearch.action.bulk.TransportShardBulkAction; +import org.elasticsearch.action.bulk.TransportSingleItemBulkWriteAction; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.AutoCreateIndex; -import org.elasticsearch.action.support.replication.TransportWriteAction; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.action.shard.ShardStateAction; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; /** * Performs the delete operation. + * + * Deprecated use TransportBulkAction with a single item instead */ -public class TransportDeleteAction extends TransportWriteAction { - - private final AutoCreateIndex autoCreateIndex; - private final TransportCreateIndexAction createIndexAction; +@Deprecated +public class TransportDeleteAction extends TransportSingleItemBulkWriteAction { @Inject public TransportDeleteAction(Settings settings, TransportService transportService, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, - TransportCreateIndexAction createIndexAction, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver, - AutoCreateIndex autoCreateIndex) { - super(settings, DeleteAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, - indexNameExpressionResolver, DeleteRequest::new, DeleteRequest::new, ThreadPool.Names.INDEX); - this.createIndexAction = createIndexAction; - this.autoCreateIndex = autoCreateIndex; - } - - @Override - protected void doExecute(Task task, final DeleteRequest request, final ActionListener listener) { - ClusterState state = clusterService.state(); - if (autoCreateIndex.shouldAutoCreate(request.index(), state)) { - CreateIndexRequest createIndexRequest = new CreateIndexRequest() - .index(request.index()) - .cause("auto(delete api)") - .masterNodeTimeout(request.timeout()); - createIndexAction.execute(task, createIndexRequest, new ActionListener() { - @Override - public void onResponse(CreateIndexResponse result) { - innerExecute(task, request, listener); - } - - @Override - public void onFailure(Exception e) { - if (ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException) { - // we have the index, do it - innerExecute(task, request, listener); - } else { - listener.onFailure(e); - } - } - }); - } else { - innerExecute(task, request, listener); - } - } - - @Override - protected void resolveRequest(final MetaData metaData, IndexMetaData indexMetaData, DeleteRequest request) { - super.resolveRequest(metaData, indexMetaData, request); - resolveAndValidateRouting(metaData, indexMetaData.getIndex().getName(), request); - ShardId shardId = clusterService.operationRouting().shardId(clusterService.state(), - indexMetaData.getIndex().getName(), request.id(), request.routing()); - request.setShardId(shardId); - } - - public static void resolveAndValidateRouting(final MetaData metaData, final String concreteIndex, - DeleteRequest request) { - request.routing(metaData.resolveIndexRouting(request.parent(), request.routing(), request.index())); - // check if routing is required, if so, throw error if routing wasn't specified - if (request.routing() == null && metaData.routingRequired(concreteIndex, request.type())) { - throw new RoutingMissingException(concreteIndex, request.type(), request.id()); - } - } - - private void innerExecute(Task task, final DeleteRequest request, final ActionListener listener) { - super.doExecute(task, request, listener); + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + TransportBulkAction bulkAction, TransportShardBulkAction shardBulkAction) { + super(settings, DeleteAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, + actionFilters, indexNameExpressionResolver, DeleteRequest::new, DeleteRequest::new, ThreadPool.Names.INDEX, + bulkAction, shardBulkAction); } @Override protected DeleteResponse newResponseInstance() { return new DeleteResponse(); } - - @Override - protected WritePrimaryResult shardOperationOnPrimary(DeleteRequest request, IndexShard primary) throws Exception { - final Engine.DeleteResult result = executeDeleteRequestOnPrimary(request, primary); - final DeleteResponse response = result.hasFailure() ? null : - new DeleteResponse(primary.shardId(), request.type(), request.id(), result.getVersion(), result.isFound()); - return new WritePrimaryResult(request, response, result.getTranslogLocation(), result.getFailure(), primary); - } - - @Override - protected WriteReplicaResult shardOperationOnReplica(DeleteRequest request, IndexShard replica) throws Exception { - final Engine.DeleteResult result = executeDeleteRequestOnReplica(request, replica); - return new WriteReplicaResult(request, result.getTranslogLocation(), result.getFailure(), replica); - } - - public static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary) { - Engine.Delete delete = primary.prepareDeleteOnPrimary(request.type(), request.id(), request.version(), request.versionType()); - Engine.DeleteResult result = primary.delete(delete); - if (result.hasFailure() == false) { - // update the request with the version so it will go to the replicas - request.versionType(delete.versionType().versionTypeForReplicationAndRecovery()); - request.version(result.getVersion()); - assert request.versionType().validateVersionForWrites(request.version()); - } - return result; - } - - public static Engine.DeleteResult executeDeleteRequestOnReplica(DeleteRequest request, IndexShard replica) { - Engine.Delete delete = replica.prepareDeleteOnReplica(request.type(), request.id(), request.version(), request.versionType()); - return replica.delete(delete); - } } diff --git a/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 71df06e35f3dc..6e0e095eae20d 100644 --- a/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -22,6 +22,7 @@ import org.elasticsearch.ElasticsearchGenerationException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.CompositeIndicesRequest; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.TimestampParsingException; @@ -70,7 +71,7 @@ * @see org.elasticsearch.client.Requests#indexRequest(String) * @see org.elasticsearch.client.Client#index(IndexRequest) */ -public class IndexRequest extends ReplicatedWriteRequest implements DocWriteRequest { +public class IndexRequest extends ReplicatedWriteRequest implements DocWriteRequest, CompositeIndicesRequest { private String type; private String id; diff --git a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index 14308c954d7fd..c925f86df973d 100644 --- a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -19,39 +19,16 @@ package org.elasticsearch.action.index; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.logging.log4j.util.Supplier; -import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.ResourceAlreadyExistsException; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; -import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; -import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction; -import org.elasticsearch.action.ingest.IngestActionForwarder; +import org.elasticsearch.action.bulk.TransportBulkAction; +import org.elasticsearch.action.bulk.TransportShardBulkAction; +import org.elasticsearch.action.bulk.TransportSingleItemBulkWriteAction; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.AutoCreateIndex; -import org.elasticsearch.action.support.replication.ReplicationOperation; -import org.elasticsearch.action.support.replication.TransportWriteAction; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.MappingMetaData; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.mapper.MapperParsingException; -import org.elasticsearch.index.mapper.Mapping; -import org.elasticsearch.index.mapper.SourceToParse; -import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.ingest.IngestService; -import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -64,199 +41,26 @@ * Defaults to true. *
  • allowIdGeneration: If the id is set not, should it be generated. Defaults to true. * + * + * Deprecated use TransportBulkAction with a single item instead */ -public class TransportIndexAction extends TransportWriteAction { - - private final AutoCreateIndex autoCreateIndex; - private final boolean allowIdGeneration; - private final TransportCreateIndexAction createIndexAction; - - private final ClusterService clusterService; - private final IngestService ingestService; - private final MappingUpdatedAction mappingUpdatedAction; - private final IngestActionForwarder ingestForwarder; +@Deprecated +public class TransportIndexAction extends TransportSingleItemBulkWriteAction { @Inject public TransportIndexAction(Settings settings, TransportService transportService, ClusterService clusterService, - IndicesService indicesService, IngestService ingestService, ThreadPool threadPool, - ShardStateAction shardStateAction, TransportCreateIndexAction createIndexAction, - MappingUpdatedAction mappingUpdatedAction, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver, AutoCreateIndex autoCreateIndex) { + IndicesService indicesService, + ThreadPool threadPool, ShardStateAction shardStateAction, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + TransportBulkAction bulkAction, TransportShardBulkAction shardBulkAction) { super(settings, IndexAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, - actionFilters, indexNameExpressionResolver, IndexRequest::new, IndexRequest::new, ThreadPool.Names.INDEX); - this.mappingUpdatedAction = mappingUpdatedAction; - this.createIndexAction = createIndexAction; - this.autoCreateIndex = autoCreateIndex; - this.allowIdGeneration = settings.getAsBoolean("action.allow_id_generation", true); - this.clusterService = clusterService; - this.ingestService = ingestService; - this.ingestForwarder = new IngestActionForwarder(transportService); - clusterService.addStateApplier(this.ingestForwarder); - } - - @Override - protected void doExecute(Task task, final IndexRequest request, final ActionListener listener) { - if (Strings.hasText(request.getPipeline())) { - if (clusterService.localNode().isIngestNode()) { - processIngestIndexRequest(task, request, listener); - } else { - ingestForwarder.forwardIngestRequest(IndexAction.INSTANCE, request, listener); - } - return; - } - // if we don't have a master, we don't have metadata, that's fine, let it find a master using create index API - ClusterState state = clusterService.state(); - if (shouldAutoCreate(request, state)) { - CreateIndexRequest createIndexRequest = new CreateIndexRequest(); - createIndexRequest.index(request.index()); - createIndexRequest.cause("auto(index api)"); - createIndexRequest.masterNodeTimeout(request.timeout()); - createIndexAction.execute(task, createIndexRequest, new ActionListener() { - @Override - public void onResponse(CreateIndexResponse result) { - innerExecute(task, request, listener); - } - - @Override - public void onFailure(Exception e) { - if (ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException) { - // we have the index, do it - try { - innerExecute(task, request, listener); - } catch (Exception inner) { - inner.addSuppressed(e); - listener.onFailure(inner); - } - } else { - listener.onFailure(e); - } - } - }); - } else { - innerExecute(task, request, listener); - } - } - - protected boolean shouldAutoCreate(IndexRequest request, ClusterState state) { - return autoCreateIndex.shouldAutoCreate(request.index(), state); - } - - @Override - protected void resolveRequest(MetaData metaData, IndexMetaData indexMetaData, IndexRequest request) { - super.resolveRequest(metaData, indexMetaData, request); - MappingMetaData mappingMd =indexMetaData.mappingOrDefault(request.type()); - request.resolveRouting(metaData); - request.process(mappingMd, allowIdGeneration, indexMetaData.getIndex().getName()); - ShardId shardId = clusterService.operationRouting().shardId(clusterService.state(), - indexMetaData.getIndex().getName(), request.id(), request.routing()); - request.setShardId(shardId); - } - - protected void innerExecute(Task task, final IndexRequest request, final ActionListener listener) { - super.doExecute(task, request, listener); + actionFilters, indexNameExpressionResolver, IndexRequest::new, IndexRequest::new, ThreadPool.Names.INDEX, + bulkAction, shardBulkAction); } @Override protected IndexResponse newResponseInstance() { return new IndexResponse(); } - - @Override - protected WritePrimaryResult shardOperationOnPrimary(IndexRequest request, IndexShard primary) throws Exception { - final Engine.IndexResult indexResult = executeIndexRequestOnPrimary(request, primary, mappingUpdatedAction); - final IndexResponse response = indexResult.hasFailure() ? null : - new IndexResponse(primary.shardId(), request.type(), request.id(), indexResult.getVersion(), - indexResult.isCreated()); - return new WritePrimaryResult(request, response, indexResult.getTranslogLocation(), indexResult.getFailure(), primary); - } - - @Override - protected WriteReplicaResult shardOperationOnReplica(IndexRequest request, IndexShard replica) throws Exception { - final Engine.IndexResult indexResult = executeIndexRequestOnReplica(request, replica); - return new WriteReplicaResult(request, indexResult.getTranslogLocation(), indexResult.getFailure(), replica); - } - - /** - * Execute the given {@link IndexRequest} on a replica shard, throwing a - * {@link RetryOnReplicaException} if the operation needs to be re-tried. - */ - public static Engine.IndexResult executeIndexRequestOnReplica(IndexRequest request, IndexShard replica) { - final ShardId shardId = replica.shardId(); - SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, shardId.getIndexName(), request.type(), request.id(), request.source()) - .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl()); - - final Engine.Index operation; - try { - operation = replica.prepareIndexOnReplica(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry()); - } catch (MapperParsingException e) { - return new Engine.IndexResult(e, request.version()); - } - Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); - if (update != null) { - throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update); - } - return replica.index(operation); - } - - /** Utility method to prepare an index operation on primary shards */ - static Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, IndexShard primary) { - SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.index(), request.type(), request.id(), request.source()) - .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl()); - return primary.prepareIndexOnPrimary(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry()); - } - - public static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, IndexShard primary, - MappingUpdatedAction mappingUpdatedAction) throws Exception { - Engine.Index operation; - try { - operation = prepareIndexOperationOnPrimary(request, primary); - } catch (MapperParsingException | IllegalArgumentException e) { - return new Engine.IndexResult(e, request.version()); - } - Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); - final ShardId shardId = primary.shardId(); - if (update != null) { - // can throw timeout exception when updating mappings or ISE for attempting to update default mappings - // which are bubbled up - try { - mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), request.type(), update); - } catch (IllegalArgumentException e) { - // throws IAE on conflicts merging dynamic mappings - return new Engine.IndexResult(e, request.version()); - } - try { - operation = prepareIndexOperationOnPrimary(request, primary); - } catch (MapperParsingException | IllegalArgumentException e) { - return new Engine.IndexResult(e, request.version()); - } - update = operation.parsedDoc().dynamicMappingsUpdate(); - if (update != null) { - throw new ReplicationOperation.RetryOnPrimaryException(shardId, - "Dynamic mappings are not available on the node that holds the primary yet"); - } - } - Engine.IndexResult result = primary.index(operation); - if (result.hasFailure() == false) { - // update the version on request so it will happen on the replicas - final long version = result.getVersion(); - request.version(version); - request.versionType(request.versionType().versionTypeForReplicationAndRecovery()); - assert request.versionType().validateVersionForWrites(request.version()); - } - return result; - } - - private void processIngestIndexRequest(Task task, IndexRequest indexRequest, ActionListener listener) { - ingestService.getPipelineExecutionService().executeIndexRequest(indexRequest, t -> { - logger.error((Supplier) () -> new ParameterizedMessage("failed to execute pipeline [{}]", indexRequest.getPipeline()), t); - listener.onFailure(t); - }, success -> { - // TransportIndexAction uses IndexRequest and same action name on the node that receives the request and the node that - // processes the primary action. This could lead to a pipeline being executed twice for the same - // index request, hence we set the pipeline to null once its execution completed. - indexRequest.setPipeline(null); - doExecute(task, indexRequest, listener); - }); - } } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index f4fc53cf1b8eb..a7d96ddbc7f1c 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -172,7 +172,8 @@ protected void resolveRequest(MetaData metaData, IndexMetaData indexMetaData, Re * @param shardRequest the request to the primary shard * @param primary the primary shard to perform the operation on */ - protected abstract PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard primary) throws Exception; + protected abstract PrimaryResult shardOperationOnPrimary( + Request shardRequest, IndexShard primary) throws Exception; /** * Synchronous replica operation on nodes with replica copies. This is done under the lock form @@ -363,8 +364,8 @@ public void onFailure(Exception e) { }; } - protected ReplicationOperation createReplicatedOperation( - Request request, ActionListener listener, + protected ReplicationOperation> createReplicatedOperation( + Request request, ActionListener> listener, PrimaryShardReference primaryShardReference, boolean executeOnReplicas) { return new ReplicationOperation<>(request, primaryShardReference, listener, executeOnReplicas, replicasProxy, clusterService::state, logger, actionName @@ -372,10 +373,12 @@ protected ReplicationOperation createRep } } - protected class PrimaryResult implements ReplicationOperation.PrimaryResult { + protected static class PrimaryResult, + Response extends ReplicationResponse> + implements ReplicationOperation.PrimaryResult { final ReplicaRequest replicaRequest; - final Response finalResponseIfSuccessful; - final Exception finalFailure; + public final Response finalResponseIfSuccessful; + public final Exception finalFailure; /** * Result of executing a primary operation @@ -415,7 +418,7 @@ public void respond(ActionListener listener) { } } - protected class ReplicaResult { + protected static class ReplicaResult { final Exception finalFailure; public ReplicaResult(Exception finalFailure) { @@ -902,7 +905,8 @@ protected boolean shouldExecuteReplication(IndexMetaData indexMetaData) { return IndexMetaData.isIndexUsingShadowReplicas(indexMetaData.getSettings()) == false; } - class PrimaryShardReference implements ReplicationOperation.Primary, Releasable { + class PrimaryShardReference implements + ReplicationOperation.Primary>, Releasable { private final IndexShard indexShard; private final Releasable operationLock; diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 0243b4aca9676..0322b2d2d12f3 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -21,6 +21,8 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkShardRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.WriteResponse; @@ -68,7 +70,8 @@ protected TransportWriteAction(Settings settings, String actionName, TransportSe * async refresh is performed on the primary shard according to the Request refresh policy */ @Override - protected abstract WritePrimaryResult shardOperationOnPrimary(Request request, IndexShard primary) throws Exception; + protected abstract WritePrimaryResult shardOperationOnPrimary( + Request request, IndexShard primary) throws Exception; /** * Called once per replica with a reference to the replica {@linkplain IndexShard} to modify. @@ -77,19 +80,24 @@ protected TransportWriteAction(Settings settings, String actionName, TransportSe * async refresh is performed on the replica shard according to the ReplicaRequest refresh policy */ @Override - protected abstract WriteReplicaResult shardOperationOnReplica(ReplicaRequest request, IndexShard replica) throws Exception; + protected abstract WriteReplicaResult shardOperationOnReplica( + ReplicaRequest request, IndexShard replica) throws Exception; /** * Result of taking the action on the primary. */ - protected class WritePrimaryResult extends PrimaryResult implements RespondingWriteResult { + protected static class WritePrimaryResult, + Response extends ReplicationResponse & WriteResponse> extends PrimaryResult + implements RespondingWriteResult { boolean finishedAsyncActions; + public final Location location; ActionListener listener = null; public WritePrimaryResult(ReplicaRequest request, @Nullable Response finalResponse, @Nullable Location location, @Nullable Exception operationFailure, - IndexShard primary) { + IndexShard primary, Logger logger) { super(request, finalResponse, operationFailure); + this.location = location; assert location == null || operationFailure == null : "expected either failure to be null or translog location to be null, " + "but found: [" + location + "] translog location and [" + operationFailure + "] failure"; @@ -139,13 +147,16 @@ public synchronized void onSuccess(boolean forcedRefresh) { /** * Result of taking the action on the replica. */ - protected class WriteReplicaResult extends ReplicaResult implements RespondingWriteResult { + protected static class WriteReplicaResult> + extends ReplicaResult implements RespondingWriteResult { + public final Location location; boolean finishedAsyncActions; private ActionListener listener; public WriteReplicaResult(ReplicaRequest request, @Nullable Location location, - @Nullable Exception operationFailure, IndexShard replica) { + @Nullable Exception operationFailure, IndexShard replica, Logger logger) { super(operationFailure); + this.location = location; if (operationFailure != null) { this.finishedAsyncActions = true; } else { diff --git a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index bae47ef470887..4380e7f29aa9c 100644 --- a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -169,7 +169,7 @@ public ThreadPool(final Settings settings, final ExecutorBuilder... customBui final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512); builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30))); builders.put(Names.INDEX, new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200)); - builders.put(Names.BULK, new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 50)); + builders.put(Names.BULK, new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 200)); // now that we reuse bulk for index/delete ops builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000)); builders.put(Names.SEARCH, new FixedExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000)); builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5))); diff --git a/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java b/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java index 283b000c5081d..2859ae73b7352 100644 --- a/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java +++ b/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java @@ -201,7 +201,7 @@ public void testAnalyze() { } public void testIndex() { - String[] indexShardActions = new String[]{IndexAction.NAME, IndexAction.NAME + "[p]", IndexAction.NAME + "[r]"}; + String[] indexShardActions = new String[]{BulkAction.NAME + "[s][p]", BulkAction.NAME + "[s][r]"}; interceptTransportActions(indexShardActions); IndexRequest indexRequest = new IndexRequest(randomIndexOrAlias(), "type", "id").source("field", "value"); @@ -212,7 +212,7 @@ public void testIndex() { } public void testDelete() { - String[] deleteShardActions = new String[]{DeleteAction.NAME, DeleteAction.NAME + "[p]", DeleteAction.NAME + "[r]"}; + String[] deleteShardActions = new String[]{BulkAction.NAME + "[s][p]", BulkAction.NAME + "[s][r]"}; interceptTransportActions(deleteShardActions); DeleteRequest deleteRequest = new DeleteRequest(randomIndexOrAlias(), "type", "id"); @@ -224,7 +224,7 @@ public void testDelete() { public void testUpdate() { //update action goes to the primary, index op gets executed locally, then replicated - String[] updateShardActions = new String[]{UpdateAction.NAME + "[s]", IndexAction.NAME + "[r]"}; + String[] updateShardActions = new String[]{UpdateAction.NAME + "[s]", BulkAction.NAME + "[s][p]", BulkAction.NAME + "[s][r]"}; interceptTransportActions(updateShardActions); String indexOrAlias = randomIndexOrAlias(); @@ -239,7 +239,7 @@ public void testUpdate() { public void testUpdateUpsert() { //update action goes to the primary, index op gets executed locally, then replicated - String[] updateShardActions = new String[]{UpdateAction.NAME + "[s]", IndexAction.NAME + "[r]"}; + String[] updateShardActions = new String[]{UpdateAction.NAME + "[s]", BulkAction.NAME + "[s][p]", BulkAction.NAME + "[s][r]"}; interceptTransportActions(updateShardActions); String indexOrAlias = randomIndexOrAlias(); @@ -253,7 +253,7 @@ public void testUpdateUpsert() { public void testUpdateDelete() { //update action goes to the primary, delete op gets executed locally, then replicated - String[] updateShardActions = new String[]{UpdateAction.NAME + "[s]", DeleteAction.NAME + "[r]"}; + String[] updateShardActions = new String[]{UpdateAction.NAME + "[s]", BulkAction.NAME + "[s][p]", BulkAction.NAME + "[s][r]"}; interceptTransportActions(updateShardActions); String indexOrAlias = randomIndexOrAlias(); diff --git a/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index f18b9354a3971..653908c75b1d1 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -22,21 +22,27 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; +import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.PipelineExecutionService; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import org.junit.Before; @@ -49,6 +55,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.Supplier; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.sameInstance; @@ -85,6 +92,9 @@ public class TransportBulkActionIngestTests extends ESTestCase { /** The actual action we want to test, with real indexing mocked */ TestTransportBulkAction action; + /** Single item bulk write action that wraps index requests */ + TestSingleItemBulkWriteAction singleItemBulkWriteAction; + /** True if the next call to the index action should act as an ingest node */ boolean localIngest; @@ -111,6 +121,20 @@ void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeN } } + class TestSingleItemBulkWriteAction extends TransportSingleItemBulkWriteAction { + + TestSingleItemBulkWriteAction(TestTransportBulkAction bulkAction) { + super(Settings.EMPTY, IndexAction.NAME, transportService, TransportBulkActionIngestTests.this.clusterService, + null, null, null, new ActionFilters(Collections.emptySet()), null, + IndexRequest::new, IndexRequest::new, ThreadPool.Names.INDEX, bulkAction, null); + } + + @Override + protected IndexResponse newResponseInstance() { + return new IndexResponse(); + } + } + @Before public void setupAction() { // initialize captors, which must be members to use @Capture because of generics @@ -143,6 +167,7 @@ public void setupAction() { executionService = mock(PipelineExecutionService.class); when(ingestService.getPipelineExecutionService()).thenReturn(executionService); action = new TestTransportBulkAction(); + singleItemBulkWriteAction = new TestSingleItemBulkWriteAction(action); reset(transportService); // call on construction of action } @@ -158,6 +183,16 @@ public void testIngestSkipped() throws Exception { verifyZeroInteractions(ingestService); } + public void testSingleItemBulkActionIngestSkipped() throws Exception { + IndexRequest indexRequest = new IndexRequest("index", "type", "id"); + indexRequest.source(Collections.emptyMap()); + singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap(response -> {}, exception -> { + throw new AssertionError(exception); + })); + assertTrue(action.isExecuted); + verifyZeroInteractions(ingestService); + } + public void testIngestLocal() throws Exception { Exception exception = new Exception("fake exception"); BulkRequest bulkRequest = new BulkRequest(); @@ -201,6 +236,38 @@ public void testIngestLocal() throws Exception { verifyZeroInteractions(transportService); } + public void testSingleItemBulkActionIngestLocal() throws Exception { + Exception exception = new Exception("fake exception"); + IndexRequest indexRequest = new IndexRequest("index", "type", "id"); + indexRequest.source(Collections.emptyMap()); + indexRequest.setPipeline("testpipeline"); + AtomicBoolean responseCalled = new AtomicBoolean(false); + AtomicBoolean failureCalled = new AtomicBoolean(false); + singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap( + response -> { + responseCalled.set(true); + }, + e -> { + assertThat(e, sameInstance(exception)); + failureCalled.set(true); + })); + + // check failure works, and passes through to the listener + assertFalse(action.isExecuted); // haven't executed yet + assertFalse(responseCalled.get()); + assertFalse(failureCalled.get()); + verify(executionService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture()); + completionHandler.getValue().accept(exception); + assertTrue(failureCalled.get()); + + // now check success + indexRequest.setPipeline(null); // this is done by the real pipeline execution service when processing + completionHandler.getValue().accept(null); + assertTrue(action.isExecuted); + assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one + verifyZeroInteractions(transportService); + } + public void testIngestForward() throws Exception { localIngest = false; BulkRequest bulkRequest = new BulkRequest(); @@ -247,5 +314,51 @@ public void testIngestForward() throws Exception { } } + public void testSingleItemBulkActionIngestForward() throws Exception { + localIngest = false; + IndexRequest indexRequest = new IndexRequest("index", "type", "id"); + indexRequest.source(Collections.emptyMap()); + indexRequest.setPipeline("testpipeline"); + IndexResponse indexResponse = mock(IndexResponse.class); + AtomicBoolean responseCalled = new AtomicBoolean(false); + ActionListener listener = ActionListener.wrap( + response -> { + responseCalled.set(true); + assertSame(indexResponse, response); + }, + e -> { + throw new AssertionError(e); + }); + singleItemBulkWriteAction.execute(null, indexRequest, listener); + + // should not have executed ingest locally + verify(executionService, never()).executeBulkRequest(any(), any(), any()); + // but instead should have sent to a remote node with the transport service + ArgumentCaptor node = ArgumentCaptor.forClass(DiscoveryNode.class); + verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture()); + boolean usedNode1 = node.getValue() == remoteNode1; // make sure we used one of the nodes + if (usedNode1 == false) { + assertSame(remoteNode2, node.getValue()); + } + assertFalse(action.isExecuted); // no local index execution + assertFalse(responseCalled.get()); // listener not called yet + + BulkItemResponse itemResponse = new BulkItemResponse(0, DocWriteRequest.OpType.CREATE, indexResponse); + BulkItemResponse[] bulkItemResponses = new BulkItemResponse[1]; + bulkItemResponses[0] = itemResponse; + remoteResponseHandler.getValue().handleResponse(new BulkResponse(bulkItemResponses, 0)); // call the listener for the remote node + assertTrue(responseCalled.get()); // now the listener we passed should have been delegated to by the remote listener + assertFalse(action.isExecuted); // still no local index execution + + // now make sure ingest nodes are rotated through with a subsequent request + reset(transportService); + singleItemBulkWriteAction.execute(null, indexRequest, listener); + verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture()); + if (usedNode1) { + assertSame(remoteNode2, node.getValue()); + } else { + assertSame(remoteNode1, node.getValue()); + } + } } diff --git a/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java b/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java index 048c274aa8543..29c55c426d313 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java @@ -117,11 +117,6 @@ private TransportBulkAction createAction(boolean controlled, AtomicLong expected resolver, null, expected::get) { - @Override - public void executeBulk(BulkRequest bulkRequest, ActionListener listener) { - expected.set(1000000); - super.executeBulk(bulkRequest, listener); - } @Override void executeBulk( @@ -146,12 +141,6 @@ void executeBulk( resolver, null, System::nanoTime) { - @Override - public void executeBulk(BulkRequest bulkRequest, ActionListener listener) { - long elapsed = spinForAtLeastOneMillisecond(); - expected.set(elapsed); - super.executeBulk(bulkRequest, listener); - } @Override void executeBulk( diff --git a/core/src/test/java/org/elasticsearch/action/index/TransportIndexActionIngestTests.java b/core/src/test/java/org/elasticsearch/action/index/TransportIndexActionIngestTests.java deleted file mode 100644 index ebc765243c427..0000000000000 --- a/core/src/test/java/org/elasticsearch/action/index/TransportIndexActionIngestTests.java +++ /dev/null @@ -1,224 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.index; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateApplier; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.collect.ImmutableOpenMap; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.ingest.IngestService; -import org.elasticsearch.ingest.PipelineExecutionService; -import org.elasticsearch.tasks.Task; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.transport.TransportResponseHandler; -import org.elasticsearch.transport.TransportService; -import org.junit.Before; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; -import org.mockito.MockitoAnnotations; - -import java.util.Collections; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; - -import static org.hamcrest.Matchers.sameInstance; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Matchers.same; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.reset; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyZeroInteractions; -import static org.mockito.Mockito.when; - -public class TransportIndexActionIngestTests extends ESTestCase { - - /** Services needed by index action */ - TransportService transportService; - ClusterService clusterService; - IngestService ingestService; - - /** The ingest execution service we can capture calls to */ - PipelineExecutionService executionService; - - /** Arguments to callbacks we want to capture, but which require generics, so we must use @Captor */ - @Captor - ArgumentCaptor> exceptionHandler; - @Captor - ArgumentCaptor> successHandler; - @Captor - ArgumentCaptor> remoteResponseHandler; - - /** The actual action we want to test, with real indexing mocked */ - TestTransportIndexAction action; - - /** True if the next call to the index action should act as an ingest node */ - boolean localIngest; - - /** The nodes that forwarded index requests should be cycled through. */ - DiscoveryNodes nodes; - DiscoveryNode remoteNode1; - DiscoveryNode remoteNode2; - - /** A subclass of the real index action to allow skipping real indexing, and marking when it would have happened. */ - class TestTransportIndexAction extends TransportIndexAction { - boolean isExecuted = false; // set when the "real" index execution happens - TestTransportIndexAction() { - super(Settings.EMPTY, transportService, clusterService, null, ingestService, null, null, null, null, - new ActionFilters(Collections.emptySet()), null, null); - } - @Override - protected boolean shouldAutoCreate(IndexRequest reqest, ClusterState state) { - return false; - } - @Override - protected void innerExecute(Task task, final IndexRequest request, final ActionListener listener) { - isExecuted = true; - } - } - - @Before - public void setupAction() { - // initialize captors, which must be members to use @Capture because of generics - MockitoAnnotations.initMocks(this); - // setup services that will be called by action - transportService = mock(TransportService.class); - clusterService = mock(ClusterService.class); - localIngest = true; - // setup nodes for local and remote - DiscoveryNode localNode = mock(DiscoveryNode.class); - when(localNode.isIngestNode()).thenAnswer(stub -> localIngest); - when(clusterService.localNode()).thenReturn(localNode); - remoteNode1 = mock(DiscoveryNode.class); - remoteNode2 = mock(DiscoveryNode.class); - nodes = mock(DiscoveryNodes.class); - ImmutableOpenMap ingestNodes = ImmutableOpenMap.builder(2) - .fPut("node1", remoteNode1).fPut("node2", remoteNode2).build(); - when(nodes.getIngestNodes()).thenReturn(ingestNodes); - ClusterState state = mock(ClusterState.class); - when(state.getNodes()).thenReturn(nodes); - when(clusterService.state()).thenReturn(state); - doAnswer(invocation -> { - ClusterChangedEvent event = mock(ClusterChangedEvent.class); - when(event.state()).thenReturn(state); - ((ClusterStateApplier)invocation.getArguments()[0]).applyClusterState(event); - return null; - }).when(clusterService).addStateApplier(any(ClusterStateApplier.class)); - // setup the mocked ingest service for capturing calls - ingestService = mock(IngestService.class); - executionService = mock(PipelineExecutionService.class); - when(ingestService.getPipelineExecutionService()).thenReturn(executionService); - action = new TestTransportIndexAction(); - reset(transportService); // call on construction of action - } - - public void testIngestSkipped() throws Exception { - IndexRequest indexRequest = new IndexRequest("index", "type", "id"); - indexRequest.source(Collections.emptyMap()); - action.execute(null, indexRequest, ActionListener.wrap(response -> {}, exception -> { - throw new AssertionError(exception); - })); - assertTrue(action.isExecuted); - verifyZeroInteractions(ingestService); - } - - public void testIngestLocal() throws Exception { - Exception exception = new Exception("fake exception"); - IndexRequest indexRequest = new IndexRequest("index", "type", "id"); - indexRequest.source(Collections.emptyMap()); - indexRequest.setPipeline("testpipeline"); - AtomicBoolean responseCalled = new AtomicBoolean(false); - AtomicBoolean failureCalled = new AtomicBoolean(false); - action.execute(null, indexRequest, ActionListener.wrap( - response -> { - responseCalled.set(true); - }, - e -> { - assertThat(e, sameInstance(exception)); - failureCalled.set(true); - })); - - // check failure works, and passes through to the listener - assertFalse(action.isExecuted); // haven't executed yet - assertFalse(responseCalled.get()); - assertFalse(failureCalled.get()); - verify(executionService).executeIndexRequest(same(indexRequest), exceptionHandler.capture(), successHandler.capture()); - exceptionHandler.getValue().accept(exception); - assertTrue(failureCalled.get()); - - // now check success - successHandler.getValue().accept(true); - assertTrue(action.isExecuted); - assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one - verifyZeroInteractions(transportService); - } - - public void testIngestForward() throws Exception { - localIngest = false; - IndexRequest indexRequest = new IndexRequest("index", "type", "id"); - indexRequest.source(Collections.emptyMap()); - indexRequest.setPipeline("testpipeline"); - IndexResponse indexResponse = mock(IndexResponse.class); - AtomicBoolean responseCalled = new AtomicBoolean(false); - ActionListener listener = ActionListener.wrap( - response -> { - responseCalled.set(true); - assertSame(indexResponse, response); - }, - e -> { - throw new AssertionError(e); - }); - action.execute(null, indexRequest, listener); - - // should not have executed ingest locally - verify(executionService, never()).executeIndexRequest(any(), any(), any()); - // but instead should have sent to a remote node with the transport service - ArgumentCaptor node = ArgumentCaptor.forClass(DiscoveryNode.class); - verify(transportService).sendRequest(node.capture(), eq(IndexAction.NAME), any(), remoteResponseHandler.capture()); - boolean usedNode1 = node.getValue() == remoteNode1; // make sure we used one of the nodes - if (usedNode1 == false) { - assertSame(remoteNode2, node.getValue()); - } - assertFalse(action.isExecuted); // no local index execution - assertFalse(responseCalled.get()); // listener not called yet - - remoteResponseHandler.getValue().handleResponse(indexResponse); // call the listener for the remote node - assertTrue(responseCalled.get()); // now the listener we passed should have been delegated to by the remote listener - assertFalse(action.isExecuted); // still no local index execution - - // now make sure ingest nodes are rotated through with a subsequent request - reset(transportService); - action.execute(null, indexRequest, listener); - verify(transportService).sendRequest(node.capture(), eq(IndexAction.NAME), any(), remoteResponseHandler.capture()); - if (usedNode1) { - assertSame(remoteNode2, node.getValue()); - } else { - assertSame(remoteNode1, node.getValue()); - } - } -} diff --git a/core/src/test/java/org/elasticsearch/action/support/WaitActiveShardCountIT.java b/core/src/test/java/org/elasticsearch/action/support/WaitActiveShardCountIT.java index fbeac88dbe104..2e1a00afc2048 100644 --- a/core/src/test/java/org/elasticsearch/action/support/WaitActiveShardCountIT.java +++ b/core/src/test/java/org/elasticsearch/action/support/WaitActiveShardCountIT.java @@ -52,7 +52,7 @@ public void testReplicationWaitsForActiveShardCount() throws Exception { fail("can't index, does not enough active shard copies"); } catch (UnavailableShardsException e) { assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE)); - assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet shard count of [2] (have 1, needed 2). Timeout: [100ms], request: [index {[test][type1][1], source[{ \"type1\" : { \"id\" : \"1\", \"name\" : \"test\" } }]}]")); + assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet shard count of [2] (have 1, needed 2). Timeout: [100ms], request: [BulkShardRequest to [test] containing [1] requests]")); // but really, all is well } @@ -81,7 +81,7 @@ public void testReplicationWaitsForActiveShardCount() throws Exception { fail("can't index, not enough active shard copies"); } catch (UnavailableShardsException e) { assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE)); - assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet shard count of [" + ActiveShardCount.ALL + "] (have 2, needed 3). Timeout: [100ms], request: [index {[test][type1][1], source[{ \"type1\" : { \"id\" : \"1\", \"name\" : \"test\" } }]}]")); + assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet shard count of [" + ActiveShardCount.ALL + "] (have 2, needed 3). Timeout: [100ms], request: [BulkShardRequest to [test] containing [1] requests]")); // but really, all is well } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 420a96f968eb9..b898db91ba4e9 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -489,9 +489,11 @@ public void testPrimaryPhaseExecutesOrDelegatesRequestToRelocationTarget() throw } action.new AsyncPrimaryAction(request, primaryShard.allocationId().getId(), createTransportChannel(listener), task) { @Override - protected ReplicationOperation createReplicatedOperation(Request request, - ActionListener actionListener, Action.PrimaryShardReference primaryShardReference, - boolean executeOnReplicas) { + protected ReplicationOperation> + createReplicatedOperation(Request request, + ActionListener> actionListener, + TransportReplicationAction.PrimaryShardReference primaryShardReference, + boolean executeOnReplicas) { return new NoopReplicationOperation(request, actionListener) { public void execute() throws Exception { assertPhase(task, "primary"); @@ -539,9 +541,11 @@ public void testPrimaryPhaseExecutesDelegatedRequestOnRelocationTarget() throws AtomicBoolean executed = new AtomicBoolean(); action.new AsyncPrimaryAction(request, primaryShard.allocationId().getRelocationId(), createTransportChannel(listener), task) { @Override - protected ReplicationOperation createReplicatedOperation(Request request, - ActionListener actionListener, Action.PrimaryShardReference primaryShardReference, - boolean executeOnReplicas) { + protected ReplicationOperation> + createReplicatedOperation(Request request, + ActionListener> actionListener, + TransportReplicationAction.PrimaryShardReference primaryShardReference, + boolean executeOnReplicas) { return new NoopReplicationOperation(request, actionListener) { public void execute() throws Exception { assertPhase(task, "primary"); @@ -574,7 +578,7 @@ public void testPrimaryReference() throws Exception { }; Action.PrimaryShardReference primary = action.new PrimaryShardReference(shard, releasable); final Request request = new Request(); - Request replicaRequest = primary.perform(request).replicaRequest; + Request replicaRequest = (Request) primary.perform(request).replicaRequest; assertThat(replicaRequest.primaryTerm(), equalTo(primaryTerm)); @@ -682,13 +686,15 @@ public void testShadowIndexDisablesReplication() throws Exception { action.new AsyncPrimaryAction(new Request(shardId), primaryShard.allocationId().getId(), createTransportChannel(new PlainActionFuture<>()), null) { @Override - protected ReplicationOperation createReplicatedOperation(Request request, - ActionListener actionListener, Action.PrimaryShardReference primaryShardReference, + protected ReplicationOperation> createReplicatedOperation( + Request request, ActionListener> actionListener, + TransportReplicationAction.PrimaryShardReference primaryShardReference, boolean executeOnReplicas) { assertFalse(executeOnReplicas); assertFalse(executed.getAndSet(true)); return new NoopReplicationOperation(request, actionListener); } + }.run(); assertThat(executed.get(), equalTo(true)); } @@ -710,9 +716,11 @@ public void testCounterOnPrimary() throws Exception { final boolean respondWithError = i == 3; action.new AsyncPrimaryAction(request, primaryShard.allocationId().getId(), createTransportChannel(listener), task) { @Override - protected ReplicationOperation createReplicatedOperation(Request request, - ActionListener actionListener, Action.PrimaryShardReference primaryShardReference, - boolean executeOnReplicas) { + protected ReplicationOperation> + createReplicatedOperation(Request request, + ActionListener> actionListener, + TransportReplicationAction.PrimaryShardReference primaryShardReference, + boolean executeOnReplicas) { assertIndexShardCounter(1); if (throwExceptionOnCreation) { throw new ElasticsearchException("simulated exception, during createReplicatedOperation"); @@ -1097,14 +1105,14 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService return indexShard; } - class NoopReplicationOperation extends ReplicationOperation { - public NoopReplicationOperation(Request request, ActionListener listener) { + class NoopReplicationOperation extends ReplicationOperation> { + public NoopReplicationOperation(Request request, ActionListener> listener) { super(request, null, listener, true, null, null, TransportReplicationActionTests.this.logger, "noop"); } @Override public void execute() throws Exception { - this.resultListener.onResponse(action.new PrimaryResult(null, new Response())); + this.resultListener.onResponse(new TransportReplicationAction.PrimaryResult<>(null, new Response())); } } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index 92820a1777999..f6f01d30853a1 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -36,7 +36,6 @@ import org.mockito.ArgumentCaptor; import java.util.HashSet; -import java.util.function.BiConsumer; import java.util.function.Consumer; import static org.mockito.Matchers.any; @@ -55,21 +54,27 @@ public void initCommonMocks() { } public void testPrimaryNoRefreshCall() throws Exception { - noRefreshCall(TestAction::shardOperationOnPrimary, TestAction.WritePrimaryResult::respond); + TestRequest request = new TestRequest(); + request.setRefreshPolicy(RefreshPolicy.NONE); // The default, but we'll set it anyway just to be explicit + TestAction testAction = new TestAction(); + TransportWriteAction.WritePrimaryResult result = + testAction.shardOperationOnPrimary(request, indexShard); + CapturingActionListener listener = new CapturingActionListener<>(); + result.respond(listener); + assertNotNull(listener.response); + assertNull(listener.failure); + verify(indexShard, never()).refresh(any()); + verify(indexShard, never()).addRefreshListener(any(), any()); } public void testReplicaNoRefreshCall() throws Exception { - noRefreshCall(TestAction::shardOperationOnReplica, TestAction.WriteReplicaResult::respond); - } - - private void noRefreshCall(ThrowingTriFunction action, - BiConsumer> responder) - throws Exception { TestRequest request = new TestRequest(); request.setRefreshPolicy(RefreshPolicy.NONE); // The default, but we'll set it anyway just to be explicit - Result result = action.apply(new TestAction(), request, indexShard); - CapturingActionListener listener = new CapturingActionListener<>(); - responder.accept(result, listener); + TestAction testAction = new TestAction(); + TransportWriteAction.WriteReplicaResult result = + testAction.shardOperationOnReplica(request, indexShard); + CapturingActionListener listener = new CapturingActionListener<>(); + result.respond(listener); assertNotNull(listener.response); assertNull(listener.failure); verify(indexShard, never()).refresh(any()); @@ -77,47 +82,66 @@ private void noRefreshCall(ThrowingTriFunction assertTrue(r.forcedRefresh)); + TestRequest request = new TestRequest(); + request.setRefreshPolicy(RefreshPolicy.IMMEDIATE); + TestAction testAction = new TestAction(); + TransportWriteAction.WritePrimaryResult result = + testAction.shardOperationOnPrimary(request, indexShard); + CapturingActionListener listener = new CapturingActionListener<>(); + result.respond(listener); + assertNotNull(listener.response); + assertNull(listener.failure); + assertTrue(listener.response.forcedRefresh); + verify(indexShard).refresh("refresh_flag_index"); + verify(indexShard, never()).addRefreshListener(any(), any()); } public void testReplicaImmediateRefresh() throws Exception { - immediateRefresh(TestAction::shardOperationOnReplica, TestAction.WriteReplicaResult::respond, r -> {}); - } - - private void immediateRefresh(ThrowingTriFunction action, - BiConsumer> responder, - Consumer responseChecker) throws Exception { TestRequest request = new TestRequest(); request.setRefreshPolicy(RefreshPolicy.IMMEDIATE); - Result result = action.apply(new TestAction(), request, indexShard); - CapturingActionListener listener = new CapturingActionListener<>(); - responder.accept(result, listener); + TestAction testAction = new TestAction(); + TransportWriteAction.WriteReplicaResult result = + testAction.shardOperationOnReplica(request, indexShard); + CapturingActionListener listener = new CapturingActionListener<>(); + result.respond(listener); assertNotNull(listener.response); assertNull(listener.failure); - responseChecker.accept(listener.response); verify(indexShard).refresh("refresh_flag_index"); verify(indexShard, never()).addRefreshListener(any(), any()); } public void testPrimaryWaitForRefresh() throws Exception { - waitForRefresh(TestAction::shardOperationOnPrimary, TestAction.WritePrimaryResult::respond, - (r, forcedRefresh) -> assertEquals(forcedRefresh, r.forcedRefresh)); - } + TestRequest request = new TestRequest(); + request.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL); - public void testReplicaWaitForRefresh() throws Exception { - waitForRefresh(TestAction::shardOperationOnReplica, TestAction.WriteReplicaResult::respond, (r, forcedRefresh) -> {}); + TestAction testAction = new TestAction(); + TransportWriteAction.WritePrimaryResult result = + testAction.shardOperationOnPrimary(request, indexShard); + CapturingActionListener listener = new CapturingActionListener<>(); + result.respond(listener); + assertNull(listener.response); // Haven't reallresponded yet + + @SuppressWarnings({ "unchecked", "rawtypes" }) + ArgumentCaptor> refreshListener = ArgumentCaptor.forClass((Class) Consumer.class); + verify(indexShard, never()).refresh(any()); + verify(indexShard).addRefreshListener(any(), refreshListener.capture()); + + // Now we can fire the listener manually and we'll get a response + boolean forcedRefresh = randomBoolean(); + refreshListener.getValue().accept(forcedRefresh); + assertNotNull(listener.response); + assertNull(listener.failure); + assertEquals(forcedRefresh, listener.response.forcedRefresh); } - private void waitForRefresh(ThrowingTriFunction action, - BiConsumer> responder, - BiConsumer resultChecker) throws Exception { + public void testReplicaWaitForRefresh() throws Exception { TestRequest request = new TestRequest(); request.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL); - Result result = action.apply(new TestAction(), request, indexShard); - CapturingActionListener listener = new CapturingActionListener<>(); - responder.accept(result, listener); + TestAction testAction = new TestAction(); + TransportWriteAction.WriteReplicaResult result = testAction.shardOperationOnReplica(request, indexShard); + CapturingActionListener listener = new CapturingActionListener<>(); + result.respond(listener); assertNull(listener.response); // Haven't responded yet - @SuppressWarnings({ "unchecked", "rawtypes" }) ArgumentCaptor> refreshListener = ArgumentCaptor.forClass((Class) Consumer.class); verify(indexShard, never()).refresh(any()); @@ -128,13 +152,12 @@ private void waitForRefresh(ThrowingTriFunction.WritePrimaryResult writePrimaryResult = + TransportWriteAction.WritePrimaryResult writePrimaryResult = testAction.shardOperationOnPrimary(request, indexShard); CapturingActionListener listener = new CapturingActionListener<>(); writePrimaryResult.respond(listener); @@ -145,7 +168,7 @@ public void testDocumentFailureInShardOperationOnPrimary() throws Exception { public void testDocumentFailureInShardOperationOnReplica() throws Exception { TestRequest request = new TestRequest(); TestAction testAction = new TestAction(randomBoolean(), true); - TransportWriteAction.WriteReplicaResult writeReplicaResult = + TransportWriteAction.WriteReplicaResult writeReplicaResult = testAction.shardOperationOnReplica(request, indexShard); CapturingActionListener listener = new CapturingActionListener<>(); writeReplicaResult.respond(listener); @@ -176,23 +199,24 @@ protected TestResponse newResponseInstance() { } @Override - protected WritePrimaryResult shardOperationOnPrimary(TestRequest request, IndexShard primary) throws Exception { - final WritePrimaryResult primaryResult; + protected WritePrimaryResult shardOperationOnPrimary( + TestRequest request, IndexShard primary) throws Exception { + final WritePrimaryResult primaryResult; if (withDocumentFailureOnPrimary) { - primaryResult = new WritePrimaryResult(request, null, null, new RuntimeException("simulated"), primary); + primaryResult = new WritePrimaryResult<>(request, null, null, new RuntimeException("simulated"), primary, logger); } else { - primaryResult = new WritePrimaryResult(request, new TestResponse(), location, null, primary); + primaryResult = new WritePrimaryResult<>(request, new TestResponse(), location, null, primary, logger); } return primaryResult; } @Override - protected WriteReplicaResult shardOperationOnReplica(TestRequest request, IndexShard replica) throws Exception { - final WriteReplicaResult replicaResult; + protected WriteReplicaResult shardOperationOnReplica(TestRequest request, IndexShard replica) throws Exception { + final WriteReplicaResult replicaResult; if (withDocumentFailureOnReplica) { - replicaResult = new WriteReplicaResult(request, null, new RuntimeException("simulated"), replica); + replicaResult = new WriteReplicaResult<>(request, null, new RuntimeException("simulated"), replica, logger); } else { - replicaResult = new WriteReplicaResult(request, location, null, replica); + replicaResult = new WriteReplicaResult<>(request, location, null, replica, logger); } return replicaResult; } @@ -227,8 +251,4 @@ public void onFailure(Exception failure) { this.failure = failure; } } - - private interface ThrowingTriFunction { - R apply(A a, B b, C c) throws Exception; - } } diff --git a/core/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java b/core/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java index 9d4c5480e2f0b..b67b9473444b3 100644 --- a/core/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java @@ -111,13 +111,12 @@ public void run() { ClusterBlockException.class, RestStatus.SERVICE_UNAVAILABLE ); - checkWriteAction( - false, timeout, + checkUpdateAction(false, timeout, client().prepareUpdate("test", "type1", "1") .setScript(new Script( ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, "test script", Collections.emptyMap())).setTimeout(timeout)); - checkWriteAction( + checkUpdateAction( true, timeout, client().prepareUpdate("no_index", "type1", "1") .setScript(new Script( @@ -133,18 +132,23 @@ public void run() { BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); bulkRequestBuilder.add(client().prepareIndex("test", "type1", "1").setSource(XContentFactory.jsonBuilder().startObject().endObject())); bulkRequestBuilder.add(client().prepareIndex("test", "type1", "2").setSource(XContentFactory.jsonBuilder().startObject().endObject())); - checkBulkAction(false, bulkRequestBuilder); + // the request should fail very quickly - use a large timeout and make sure it didn't pass... + timeout = new TimeValue(5000); + bulkRequestBuilder.setTimeout(timeout); + checkWriteAction(false, timeout, bulkRequestBuilder); bulkRequestBuilder = client().prepareBulk(); bulkRequestBuilder.add(client().prepareIndex("no_index", "type1", "1").setSource(XContentFactory.jsonBuilder().startObject().endObject())); bulkRequestBuilder.add(client().prepareIndex("no_index", "type1", "2").setSource(XContentFactory.jsonBuilder().startObject().endObject())); - checkBulkAction(true, bulkRequestBuilder); + timeout = new TimeValue(200); + bulkRequestBuilder.setTimeout(timeout); + checkWriteAction(true, timeout, bulkRequestBuilder); internalCluster().startNode(settings); client().admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); } - void checkWriteAction(boolean autoCreateIndex, TimeValue timeout, ActionRequestBuilder builder) { + void checkUpdateAction(boolean autoCreateIndex, TimeValue timeout, ActionRequestBuilder builder) { // we clean the metadata when loosing a master, therefore all operations on indices will auto create it, if allowed long now = System.currentTimeMillis(); try { @@ -162,18 +166,7 @@ void checkWriteAction(boolean autoCreateIndex, TimeValue timeout, ActionRequestB } } - void checkBulkAction(boolean indexShouldBeAutoCreated, BulkRequestBuilder builder) { - // bulk operation do not throw MasterNotDiscoveredException exceptions. The only test that auto create kicked in and failed is - // via the timeout, as bulk operation do not wait on blocks. - TimeValue timeout; - if (indexShouldBeAutoCreated) { - // we expect the bulk to fail because it will try to go to the master. Use small timeout and detect it has passed - timeout = new TimeValue(200); - } else { - // the request should fail very quickly - use a large timeout and make sure it didn't pass... - timeout = new TimeValue(5000); - } - builder.setTimeout(timeout); + void checkWriteAction(boolean indexShouldBeAutoCreated, TimeValue timeout, ActionRequestBuilder builder) { long now = System.currentTimeMillis(); try { builder.get(); @@ -185,7 +178,7 @@ void checkBulkAction(boolean indexShouldBeAutoCreated, BulkRequestBuilder builde assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE)); } else { // timeout is 5000 - assertThat(System.currentTimeMillis() - now, lessThan(timeout.millis() - 50)); + assertThat(System.currentTimeMillis() - now, lessThan(timeout.millis() + 50)); } } } diff --git a/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java b/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java index a6172e1a0f623..73d1d41449043 100644 --- a/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java +++ b/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java @@ -19,6 +19,7 @@ package org.elasticsearch.index; +import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.DocWriteResponse; diff --git a/core/src/test/java/org/elasticsearch/index/mapper/DynamicMappingDisabledTests.java b/core/src/test/java/org/elasticsearch/index/mapper/DynamicMappingDisabledTests.java index bf714714a7e90..aad4e4a495064 100644 --- a/core/src/test/java/org/elasticsearch/index/mapper/DynamicMappingDisabledTests.java +++ b/core/src/test/java/org/elasticsearch/index/mapper/DynamicMappingDisabledTests.java @@ -20,23 +20,32 @@ package org.elasticsearch.index.mapper; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.bulk.TransportBulkAction; +import org.elasticsearch.action.bulk.TransportShardBulkAction; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.TransportIndexAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.AutoCreateIndex; +import org.elasticsearch.action.update.UpdateHelper; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.MockTcpTransport; +import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.local.LocalTransport; import org.junit.After; @@ -56,12 +65,7 @@ public class DynamicMappingDisabledTests extends ESSingleNodeTestCase { private ClusterService clusterService; private LocalTransport transport; private TransportService transportService; - private IndicesService indicesService; - private ShardStateAction shardStateAction; - private ActionFilters actionFilters; - private IndexNameExpressionResolver indexNameExpressionResolver; - private AutoCreateIndex autoCreateIndex; - private Settings settings; + private TransportBulkAction transportBulkAction; @BeforeClass public static void createThreadPool() { @@ -71,7 +75,7 @@ public static void createThreadPool() { @Override public void setUp() throws Exception { super.setUp(); - settings = Settings.builder() + Settings settings = Settings.builder() .put(MapperService.INDEX_MAPPER_DYNAMIC_SETTING.getKey(), false) .build(); clusterService = createClusterService(threadPool); @@ -79,12 +83,17 @@ public void setUp() throws Exception { new NoneCircuitBreakerService()); transportService = new TransportService(clusterService.getSettings(), transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null); - indicesService = getInstanceFromNode(IndicesService.class); - shardStateAction = new ShardStateAction(settings, clusterService, transportService, null, null, threadPool); - actionFilters = new ActionFilters(Collections.emptySet()); - indexNameExpressionResolver = new IndexNameExpressionResolver(settings); - autoCreateIndex = new AutoCreateIndex(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - indexNameExpressionResolver); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + ShardStateAction shardStateAction = new ShardStateAction(settings, clusterService, transportService, null, null, threadPool); + ActionFilters actionFilters = new ActionFilters(Collections.emptySet()); + IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver(settings); + AutoCreateIndex autoCreateIndex = new AutoCreateIndex(settings, new ClusterSettings(settings, + ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), indexNameExpressionResolver); + UpdateHelper updateHelper = new UpdateHelper(settings, null); + TransportShardBulkAction shardBulkAction = new TransportShardBulkAction(settings, transportService, clusterService, + indicesService, threadPool, shardStateAction, null, updateHelper, actionFilters, indexNameExpressionResolver); + transportBulkAction = new TransportBulkAction(settings, threadPool, transportService, clusterService, + null, shardBulkAction, null, actionFilters, indexNameExpressionResolver, autoCreateIndex, System::currentTimeMillis); } @After @@ -103,17 +112,20 @@ public static void destroyThreadPool() { } public void testDynamicDisabled() { - TransportIndexAction action = new TransportIndexAction(settings, transportService, clusterService, - indicesService, null, threadPool, shardStateAction, null, null, actionFilters, indexNameExpressionResolver, - autoCreateIndex); - IndexRequest request = new IndexRequest("index", "type", "1"); request.source("foo", 3); + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(request); final AtomicBoolean onFailureCalled = new AtomicBoolean(); - action.execute(request, new ActionListener() { + transportBulkAction.execute(bulkRequest, new ActionListener() { @Override - public void onResponse(IndexResponse indexResponse) { + public void onResponse(BulkResponse bulkResponse) { + BulkItemResponse itemResponse = bulkResponse.getItems()[0]; + assertTrue(itemResponse.isFailed()); + assertThat(itemResponse.getFailure().getCause(), instanceOf(IndexNotFoundException.class)); + assertEquals(itemResponse.getFailure().getCause().getMessage(), "no such index"); + onFailureCalled.set(true); fail("onResponse shouldn't be called"); } diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index d4ada78ab0032..ef3a16755bcab 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.bulk.TransportShardBulkAction; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.TransportIndexAction; @@ -64,8 +65,8 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; -import static org.elasticsearch.action.index.TransportIndexAction.executeIndexRequestOnPrimary; -import static org.elasticsearch.action.index.TransportIndexAction.executeIndexRequestOnReplica; +import static org.elasticsearch.action.bulk.TransportShardBulkAction.executeIndexRequestOnPrimary; +import static org.elasticsearch.action.bulk.TransportShardBulkAction.executeIndexRequestOnReplica; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; @@ -280,7 +281,7 @@ public void failShard(String message, Exception exception) { @Override public IndexingResult perform(IndexRequest request) throws Exception { - Engine.IndexResult indexResult = TransportIndexAction.executeIndexRequestOnPrimary(request, primary, + Engine.IndexResult indexResult = TransportShardBulkAction.executeIndexRequestOnPrimary(request, primary, null); if (indexResult.hasFailure() == false) { // update the version on request so it will happen on the replicas @@ -314,7 +315,7 @@ public void performOn(ShardRouting replicaRouting, IndexRequest request, ActionL try { IndexShard replica = replicationGroup.replicas.stream() .filter(s -> replicaRouting.isSameAllocation(s.routingEntry())).findFirst().get(); - TransportIndexAction.executeIndexRequestOnReplica(request, replica); + TransportShardBulkAction.executeIndexRequestOnReplica(request, replica); listener.onResponse(TransportResponse.Empty.INSTANCE); } catch (Exception t) { listener.onFailure(t);