diff --git a/src/main/java/org/elasticsearch/action/admin/indices/delete/TransportDeleteIndexAction.java b/src/main/java/org/elasticsearch/action/admin/indices/delete/TransportDeleteIndexAction.java index 4031dcfa8a76c..0b935d691fe2a 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/delete/TransportDeleteIndexAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/delete/TransportDeleteIndexAction.java @@ -31,11 +31,10 @@ import org.elasticsearch.cluster.metadata.MetaDataDeleteIndexService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.util.concurrent.atomic.AtomicInteger; - /** * Delete index action. */ @@ -102,9 +101,10 @@ protected ClusterBlockException checkBlock(DeleteIndexRequest request, ClusterSt protected void masterOperation(final DeleteIndexRequest request, final ClusterState state, final ActionListener listener) throws ElasticSearchException { if (request.indices().length == 0) { listener.onResponse(new DeleteIndexResponse(true)); + return; } // TODO: this API should be improved, currently, if one delete index failed, we send a failure, we should send a response array that includes all the indices that were deleted - final AtomicInteger count = new AtomicInteger(request.indices().length); + final CountDown count = new CountDown(request.indices().length); for (final String index : request.indices()) { deleteIndexService.deleteIndex(new MetaDataDeleteIndexService.Request(index).timeout(request.timeout()).masterTimeout(request.masterNodeTimeout()), new MetaDataDeleteIndexService.Listener() { @@ -116,7 +116,7 @@ public void onResponse(MetaDataDeleteIndexService.Response response) { if (!response.acknowledged()) { ack = false; } - if (count.decrementAndGet() == 0) { + if (count.countDown()) { if (lastFailure != null) { listener.onFailure(lastFailure); } else { @@ -129,7 +129,7 @@ public void onResponse(MetaDataDeleteIndexService.Response response) { public void onFailure(Throwable t) { logger.debug("[{}] failed to delete index", t, index); lastFailure = t; - if (count.decrementAndGet() == 0) { + if (count.countDown()) { listener.onFailure(t); } } diff --git a/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerRequest.java index be04adb88c46d..83be17e040410 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerRequest.java @@ -19,24 +19,33 @@ package org.elasticsearch.action.admin.indices.warmer.delete; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.action.support.master.MasterNodeOperationRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; import java.io.IOException; +import static org.elasticsearch.common.unit.TimeValue.readTimeValue; +import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; + /** * A request to delete an index warmer. */ -public class DeleteWarmerRequest extends MasterNodeOperationRequest { +public class DeleteWarmerRequest extends MasterNodeOperationRequest + implements AcknowledgedRequest { private String name; private String[] indices = Strings.EMPTY_ARRAY; + private TimeValue timeout = timeValueSeconds(10); + DeleteWarmerRequest() { } @@ -87,11 +96,31 @@ public String[] indices() { return indices; } + @Override + public DeleteWarmerRequest timeout(String timeout) { + this.timeout = TimeValue.parseTimeValue(timeout, this.timeout); + return this; + } + + @Override + public DeleteWarmerRequest timeout(TimeValue timeout) { + this.timeout = timeout; + return this; + } + + @Override + public TimeValue timeout() { + return timeout; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); name = in.readOptionalString(); indices = in.readStringArray(); + if (in.getVersion().onOrAfter(Version.V_0_90_6)) { + timeout = readTimeValue(in); + } } @Override @@ -99,5 +128,8 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeOptionalString(name); out.writeStringArrayNullable(indices); + if (out.getVersion().onOrAfter(Version.V_0_90_6)) { + timeout.writeTo(out); + } } } diff --git a/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerRequestBuilder.java b/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerRequestBuilder.java index 26b3a0ddab6e1..b15e5fc6ab3c9 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerRequestBuilder.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerRequestBuilder.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; import org.elasticsearch.client.IndicesAdminClient; import org.elasticsearch.client.internal.InternalIndicesAdminClient; +import org.elasticsearch.common.unit.TimeValue; /** * @@ -47,6 +48,23 @@ public DeleteWarmerRequestBuilder setName(String name) { return this; } + /** + * Sets the maximum wait for acknowledgement from other nodes + */ + public DeleteWarmerRequestBuilder setTimeout(TimeValue timeout) { + request.timeout(timeout); + return this; + } + + /** + * Timeout to wait for the operation to be acknowledged by current cluster nodes. Defaults + * to 10s. + */ + public DeleteWarmerRequestBuilder setTimeout(String timeout) { + request.timeout(timeout); + return this; + } + @Override protected void doExecute(ActionListener listener) { ((IndicesAdminClient) client).deleteWarmer(request, listener); diff --git a/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerResponse.java b/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerResponse.java index e5d834ae06abc..0d8b70efb974d 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerResponse.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerResponse.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.admin.indices.warmer.delete; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -28,7 +29,7 @@ /** * A response for a delete warmer. */ -public class DeleteWarmerResponse extends ActionResponse { +public class DeleteWarmerResponse extends ActionResponse implements AcknowledgedResponse { private boolean acknowledged; diff --git a/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/TransportDeleteWarmerAction.java b/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/TransportDeleteWarmerAction.java index d28f98926b6d0..7964814530439 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/TransportDeleteWarmerAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/TransportDeleteWarmerAction.java @@ -23,13 +23,15 @@ import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; @@ -88,7 +90,27 @@ protected ClusterBlockException checkBlock(DeleteWarmerRequest request, ClusterS @Override protected void masterOperation(final DeleteWarmerRequest request, final ClusterState state, final ActionListener listener) throws ElasticSearchException { - clusterService.submitStateUpdateTask("delete_warmer [" + request.name() + "]", new TimeoutClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("delete_warmer [" + request.name() + "]", new AckedClusterStateUpdateTask() { + + @Override + public boolean mustAck(DiscoveryNode discoveryNode) { + return true; + } + + @Override + public void onAllNodesAcked(@Nullable Throwable t) { + listener.onResponse(new DeleteWarmerResponse(true)); + } + + @Override + public void onAckTimeout() { + listener.onResponse(new DeleteWarmerResponse(false)); + } + + @Override + public TimeValue ackTimeout() { + return request.timeout(); + } @Override public TimeValue timeout() { @@ -161,7 +183,7 @@ public ClusterState execute(ClusterState currentState) { @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - listener.onResponse(new DeleteWarmerResponse(true)); + } }); } diff --git a/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequest.java index 3824431b53bd6..5d5872f12cdfd 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequest.java @@ -19,27 +19,35 @@ package org.elasticsearch.action.admin.indices.warmer.put; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.action.support.master.MasterNodeOperationRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; import java.io.IOException; import static org.elasticsearch.action.ValidateActions.addValidationError; +import static org.elasticsearch.common.unit.TimeValue.readTimeValue; +import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; /** * A request to put a search warmer. */ -public class PutWarmerRequest extends MasterNodeOperationRequest { +public class PutWarmerRequest extends MasterNodeOperationRequest + implements AcknowledgedRequest { private String name; private SearchRequest searchRequest; + private TimeValue timeout = timeValueSeconds(10); + PutWarmerRequest() { } @@ -86,6 +94,22 @@ SearchRequest searchRequest() { return this.searchRequest; } + @Override + public PutWarmerRequest timeout(String timeout) { + return this; + } + + @Override + public PutWarmerRequest timeout(TimeValue timeout) { + this.timeout = timeout; + return this; + } + + @Override + public TimeValue timeout() { + return timeout; + } + @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = searchRequest.validate(); @@ -103,6 +127,9 @@ public void readFrom(StreamInput in) throws IOException { searchRequest = new SearchRequest(); searchRequest.readFrom(in); } + if (in.getVersion().onOrAfter(Version.V_0_90_6)) { + timeout = readTimeValue(in); + } } @Override @@ -115,5 +142,8 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(true); searchRequest.writeTo(out); } + if (out.getVersion().onOrAfter(Version.V_0_90_6)) { + timeout.writeTo(out); + } } } diff --git a/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequestBuilder.java b/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequestBuilder.java index 13669b324aaf5..77b2143dc7e56 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequestBuilder.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequestBuilder.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; import org.elasticsearch.client.IndicesAdminClient; import org.elasticsearch.client.internal.InternalIndicesAdminClient; +import org.elasticsearch.common.unit.TimeValue; /** * @@ -63,6 +64,23 @@ public PutWarmerRequestBuilder setSearchRequest(SearchRequestBuilder searchReque return this; } + /** + * Sets the maximum wait for acknowledgement from other nodes + */ + public PutWarmerRequestBuilder setTimeout(TimeValue timeout) { + request.timeout(timeout); + return this; + } + + /** + * Timeout to wait for the operation to be acknowledged by current cluster nodes. Defaults + * to 10s. + */ + public PutWarmerRequestBuilder setTimeout(String timeout) { + request.timeout(timeout); + return this; + } + @Override protected void doExecute(ActionListener listener) { ((IndicesAdminClient) client).putWarmer(request, listener); diff --git a/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerResponse.java b/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerResponse.java index f0f58f4347dd3..c7daac75a9567 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerResponse.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerResponse.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.admin.indices.warmer.put; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -28,7 +29,7 @@ /** * The response of put warmer operation. */ -public class PutWarmerResponse extends ActionResponse { +public class PutWarmerResponse extends ActionResponse implements AcknowledgedResponse { private boolean acknowledged; diff --git a/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/TransportPutWarmerAction.java b/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/TransportPutWarmerAction.java index f151a40d1b793..ca42f75a0f735 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/TransportPutWarmerAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/TransportPutWarmerAction.java @@ -24,13 +24,15 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -96,7 +98,27 @@ public void onResponse(SearchResponse searchResponse) { return; } - clusterService.submitStateUpdateTask("put_warmer [" + request.name() + "]", new TimeoutClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("put_warmer [" + request.name() + "]", new AckedClusterStateUpdateTask() { + + @Override + public boolean mustAck(DiscoveryNode discoveryNode) { + return true; + } + + @Override + public void onAllNodesAcked(@Nullable Throwable t) { + listener.onResponse(new PutWarmerResponse(true)); + } + + @Override + public void onAckTimeout() { + listener.onResponse(new PutWarmerResponse(false)); + } + + @Override + public TimeValue ackTimeout() { + return request.timeout(); + } @Override public TimeValue timeout() { @@ -161,7 +183,7 @@ public ClusterState execute(ClusterState currentState) { @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - listener.onResponse(new PutWarmerResponse(true)); + } }); } diff --git a/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java b/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java index 364927a04e520..bef76264b6c8c 100644 --- a/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java +++ b/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.BaseTransportRequestHandler; @@ -36,7 +37,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.action.search.type.TransportSearchHelper.parseScrollId; @@ -64,7 +64,7 @@ protected void doExecute(ClearScrollRequest request, final ActionListener[]> contexts = new ArrayList[]>(); final AtomicReference expHolder; @@ -86,11 +86,11 @@ private Async(ClearScrollRequest request, ActionListener li this.request = request; this.listener = listener; this.expHolder = new AtomicReference(); - this.expectedOps = new AtomicInteger(expectedOps); + this.expectedOps = new CountDown(expectedOps); } public void run() { - if (expectedOps.get() == 0) { + if (expectedOps.isCountedDown()) { listener.onResponse(new ClearScrollResponse(true)); return; } @@ -135,8 +135,7 @@ public void onFailure(Throwable e) { } void onFreedContext() { - assert expectedOps.get() > 0; - if (expectedOps.decrementAndGet() == 0) { + if (expectedOps.countDown()) { boolean succeeded = expHolder.get() == null; listener.onResponse(new ClearScrollResponse(succeeded)); } @@ -144,8 +143,7 @@ void onFreedContext() { void onFailedFreedContext(Throwable e, DiscoveryNode node) { logger.warn("Clear SC failed on node[{}]", e, node); - assert expectedOps.get() > 0; - if (expectedOps.decrementAndGet() == 0) { + if (expectedOps.countDown()) { listener.onResponse(new ClearScrollResponse(false)); } else { expHolder.set(e); diff --git a/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequest.java b/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequest.java new file mode 100644 index 0000000000000..3e25ee9aa6ffb --- /dev/null +++ b/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequest.java @@ -0,0 +1,49 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.elasticsearch.action.support.master; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.common.unit.TimeValue; + +/** + * Interface that allows to mark action requests that support acknowledgements. + * Facilitates consistency across different api. + */ +public interface AcknowledgedRequest> { + + /** + * Allows to set the timeout + * @param timeout timeout as a string (e.g. 1s) + * @return the request itself + */ + T timeout(String timeout); + + /** + * Allows to set the timeout + * @param timeout timeout as a {@link TimeValue} + * @return the request itself + */ + T timeout(TimeValue timeout); + + /** + * Returns the current timeout + * @return the current timeout as a {@link TimeValue} + */ + TimeValue timeout(); +} diff --git a/src/main/java/org/elasticsearch/action/support/master/AcknowledgedResponse.java b/src/main/java/org/elasticsearch/action/support/master/AcknowledgedResponse.java new file mode 100644 index 0000000000000..90bf831a892cd --- /dev/null +++ b/src/main/java/org/elasticsearch/action/support/master/AcknowledgedResponse.java @@ -0,0 +1,32 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.elasticsearch.action.support.master; + +/** + * Interface that allows to mark action responses that support acknowledgements. + * Facilitates consistency across different api. + */ +public interface AcknowledgedResponse { + + /** + * Returns whether the response is acknowledged or not + * @return true if the response is acknowledged, false otherwise + */ + boolean isAcknowledged(); +} diff --git a/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java b/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java new file mode 100644 index 0000000000000..ca4f697df74b3 --- /dev/null +++ b/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java @@ -0,0 +1,55 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.elasticsearch.cluster; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.unit.TimeValue; + +/** + * An extension interface to {@link ClusterStateUpdateTask} that allows to be notified when + * all the nodes have acknowledged a cluster state update request + */ +public interface AckedClusterStateUpdateTask extends TimeoutClusterStateUpdateTask { + + /** + * Called to determine which nodes the acknowledgement is expected from + * @param discoveryNode a node + * @return true if the node is expected to send ack back, false otherwise + */ + boolean mustAck(DiscoveryNode discoveryNode); + + /** + * Called once all the nodes have acknowledged the cluster state update request. Must be + * very lightweight execution, since it gets executed on the cluster service thread. + * @param t optional error that might have been thrown + */ + void onAllNodesAcked(@Nullable Throwable t); + + /** + * Called once the acknowledgement timeout defined by + * {@link AckedClusterStateUpdateTask#ackTimeout()} has expired + */ + void onAckTimeout(); + + /** + * Acknowledgement timeout, maximum time interval to wait for acknowledgements + */ + TimeValue ackTimeout(); +} diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesService.java index 619f598e3bde0..c8b16bf247c4e 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesService.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.Index; @@ -44,8 +45,6 @@ import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder; import static org.elasticsearch.cluster.metadata.IndexMetaData.newIndexMetaDataBuilder; @@ -243,13 +242,12 @@ public boolean acknowledged() { private class CountDownListener implements NodeAliasesUpdatedAction.Listener { - private final AtomicBoolean notified = new AtomicBoolean(); - private final AtomicInteger countDown; + private final CountDown countDown; private final Listener listener; private final long version; public CountDownListener(int countDown, Listener listener, long version) { - this.countDown = new AtomicInteger(countDown); + this.countDown = new CountDown(countDown); this.listener = listener; this.version = version; } @@ -258,20 +256,18 @@ public CountDownListener(int countDown, Listener listener, long version) { public void onAliasesUpdated(NodeAliasesUpdatedAction.NodeAliasesUpdatedResponse response) { if (version <= response.version()) { logger.trace("Received NodeAliasesUpdatedResponse with version [{}] from [{}]", response.version(), response.nodeId()); - if (countDown.decrementAndGet() == 0) { + if (countDown.countDown()) { aliasOperationPerformedAction.remove(this); - if (notified.compareAndSet(false, true)) { - logger.trace("NodeAliasUpdated was acknowledged by all expected nodes, returning"); - listener.onResponse(new Response(true)); - } + logger.trace("NodeAliasUpdated was acknowledged by all expected nodes, returning"); + listener.onResponse(new Response(true)); } } } @Override public void onTimeout() { - aliasOperationPerformedAction.remove(this); - if (notified.compareAndSet(false, true)) { + if (countDown.fastForward()) { + aliasOperationPerformedAction.remove(this); listener.onResponse(new Response(false)); } } diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java index 646863232a50c..34a90a0a2faec 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java @@ -38,6 +38,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.index.Index; import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.indices.IndexPrimaryShardNotAllocatedException; @@ -46,8 +47,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; /** * @@ -273,14 +272,12 @@ public boolean acknowledged() { } private class CountDownListener implements NodeIndicesStateUpdatedAction.Listener { - - private final AtomicBoolean notified = new AtomicBoolean(); - private final AtomicInteger countDown; + private final CountDown countDown; private final Listener listener; private final long version; - public CountDownListener(int countDown, Listener listener, long version) { - this.countDown = new AtomicInteger(countDown); + public CountDownListener(int count, Listener listener, long version) { + this.countDown = new CountDown(count); this.listener = listener; this.version = version; } @@ -289,20 +286,18 @@ public CountDownListener(int countDown, Listener listener, long version) { public void onIndexStateUpdated(NodeIndicesStateUpdatedAction.NodeIndexStateUpdatedResponse response) { if (version <= response.version()) { logger.trace("Received NodeIndexStateUpdatedResponse with version [{}] from [{}]", response.version(), response.nodeId()); - if (countDown.decrementAndGet() == 0) { + if (countDown.countDown()) { indicesStateUpdatedAction.remove(this); - if (notified.compareAndSet(false, true)) { - logger.trace("NodeIndexStateUpdated was acknowledged by all expected nodes, returning"); - listener.onResponse(new Response(true)); - } + logger.trace("NodeIndexStateUpdated was acknowledged by all expected nodes, returning"); + listener.onResponse(new Response(true)); } } } @Override public void onTimeout() { - indicesStateUpdatedAction.remove(this); - if (notified.compareAndSet(false, true)) { + if (countDown.fastForward()) { + indicesStateUpdatedAction.remove(this); listener.onResponse(new Response(false)); } } diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java index 1ed50e09df48e..9fd5312c8f2dc 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java @@ -36,6 +36,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.index.Index; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MapperService; @@ -49,8 +50,6 @@ import java.util.*; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import static com.google.common.collect.Maps.newHashMap; import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder; @@ -593,8 +592,7 @@ public boolean acknowledged() { private class CountDownListener implements NodeMappingCreatedAction.Listener { - private final AtomicBoolean notified = new AtomicBoolean(); - private final AtomicInteger countDown; + private final CountDown countDown; private final Listener listener; private final long minClusterStateVersion; @@ -604,7 +602,7 @@ private class CountDownListener implements NodeMappingCreatedAction.Listener { * @param listener listener to call when counter reaches 0. */ public CountDownListener(int countDown, long minClusterStateVersion, Listener listener) { - this.countDown = new AtomicInteger(countDown); + this.countDown = new CountDown(countDown); this.listener = listener; this.minClusterStateVersion = minClusterStateVersion; } @@ -619,18 +617,16 @@ public void onNodeMappingCreated(NodeMappingCreatedAction.NodeMappingCreatedResp } public void decrementCounter() { - if (countDown.decrementAndGet() == 0) { + if (countDown.countDown()) { mappingCreatedAction.remove(this); - if (notified.compareAndSet(false, true)) { - listener.onResponse(new Response(true)); - } + listener.onResponse(new Response(true)); } } @Override public void onTimeout() { - mappingCreatedAction.remove(this); - if (notified.compareAndSet(false, true)) { + if (countDown.fastForward()) { + mappingCreatedAction.remove(this); listener.onResponse(new Response(false)); } } diff --git a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index 3e9f769a0deea..ffa71de669a51 100644 --- a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -31,6 +31,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.operation.OperationRouting; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; @@ -49,6 +50,7 @@ import java.util.List; import java.util.Queue; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder; import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; @@ -317,6 +319,7 @@ public void run() { } try { + Discovery.AckListener ackListener = new NoOpAckListener(); if (newClusterState.nodes().localNodeMaster()) { // only the master controls the version numbers Builder builder = ClusterState.builder().state(newClusterState).version(newClusterState.version() + 1); @@ -327,6 +330,19 @@ public void run() { builder.metaData(MetaData.builder().metaData(newClusterState.metaData()).version(newClusterState.metaData().version() + 1)); } newClusterState = builder.build(); + + if (updateTask instanceof AckedClusterStateUpdateTask) { + final AckedClusterStateUpdateTask ackedUpdateTask = (AckedClusterStateUpdateTask) updateTask; + try { + ackListener = new AckCountDownListener(ackedUpdateTask, newClusterState.version(), newClusterState.nodes(), threadPool); + } catch(EsRejectedExecutionException ex) { + if (logger.isDebugEnabled()) { + logger.debug("Couldn't schedule timeout thread - node might be shutting down", ex); + } + //timeout straightaway, otherwise we could wait forever as the timeout thread has not started + ackedUpdateTask.onAckTimeout(); + } + } } else { if (previousClusterState.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK) && !newClusterState.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK)) { // force an update, its a fresh update from the master as we transition from a start of not having a master to having one @@ -381,7 +397,7 @@ public void run() { // we don't want to notify if (newClusterState.nodes().localNodeMaster()) { logger.debug("publishing cluster state version {}", newClusterState.version()); - discoveryService.publish(newClusterState); + discoveryService.publish(newClusterState, ackListener); } // update the current cluster state @@ -409,18 +425,26 @@ public void run() { }); } + //manual ack only from the master at the end of the publish + if (newClusterState.nodes().localNodeMaster()) { + try { + ackListener.onNodeAck(localNode(), null); + } catch(Throwable t) { + logger.debug("error while processing ack for master node [{}]", t, newClusterState.nodes().localNode()); + } + } if (updateTask instanceof ProcessedClusterStateUpdateTask) { ((ProcessedClusterStateUpdateTask) updateTask).clusterStateProcessed(source, previousClusterState, newClusterState); } logger.debug("processing [{}]: done applying updated cluster_state (version: {})", source, newClusterState.version()); - } catch (Exception e) { + } catch (Throwable t) { StringBuilder sb = new StringBuilder("failed to apply updated cluster state:\nversion [").append(newClusterState.version()).append("], source [").append(source).append("]\n"); sb.append(newClusterState.nodes().prettyPrint()); sb.append(newClusterState.routingTable().prettyPrint()); sb.append(newClusterState.readOnlyRoutingNodes().prettyPrint()); - logger.warn(sb.toString(), e); + logger.warn(sb.toString(), t); // TODO: do we want to call updateTask.onFailure here? } } @@ -584,4 +608,69 @@ public void run() { listener.offMaster(); } } + + private static class NoOpAckListener implements Discovery.AckListener { + @Override + public void onNodeAck(DiscoveryNode node, @Nullable Throwable t) { + } + + @Override + public void onTimeout() { + } + } + + private class AckCountDownListener implements Discovery.AckListener { + private final AckedClusterStateUpdateTask ackedUpdateTask; + private final long version; + private final CountDown countDown; + private final Future ackTimeoutCallback; + private Throwable lastFailure; + + AckCountDownListener(AckedClusterStateUpdateTask ackedUpdateTask, long clusterStateVersion, DiscoveryNodes nodes, ThreadPool threadPool) { + this.ackedUpdateTask = ackedUpdateTask; + this.version = clusterStateVersion; + int countDown = 0; + for (DiscoveryNode node : nodes) { + if (ackedUpdateTask.mustAck(node)) { + countDown++; + } + } + logger.trace("expecting {} acknowledgements for cluster_state update (version: {})", countDown, version); + this.countDown = new CountDown(countDown); + this.ackTimeoutCallback = threadPool.schedule(ackedUpdateTask.ackTimeout(), ThreadPool.Names.GENERIC, new Runnable() { + @Override + public void run() { + onTimeout(); + } + }); + } + + @Override + public void onNodeAck(DiscoveryNode node, @Nullable Throwable t) { + if (!ackedUpdateTask.mustAck(node)) { + return; + } + if (t == null) { + logger.trace("ack received from node [{}], cluster_state update (version: {})", node, version); + } else { + this.lastFailure = t; + logger.debug("ack received from node [{}], cluster_state update (version: {})", t, node, version); + } + + if (countDown.countDown()) { + logger.trace("all expected nodes acknowledged cluster_state update (version: {})", version); + ackTimeoutCallback.cancel(true); + ackedUpdateTask.onAllNodesAcked(lastFailure); + } + } + + @Override + public void onTimeout() { + if (countDown.fastForward()) { + logger.trace("timeout waiting for acknowledgement for cluster_state update (version: {})", version); + ackedUpdateTask.onAckTimeout(); + } + } + } + } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/common/util/concurrent/CountDown.java b/src/main/java/org/elasticsearch/common/util/concurrent/CountDown.java new file mode 100644 index 0000000000000..d9e2b961a971e --- /dev/null +++ b/src/main/java/org/elasticsearch/common/util/concurrent/CountDown.java @@ -0,0 +1,81 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.util.concurrent; + +import org.elasticsearch.ElasticSearchIllegalArgumentException; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A simple thread safe count-down class that in contrast to a {@link CountDownLatch} + * never blocks. This class is useful if a certain action has to wait for N concurrent + * tasks to return or a timeout to occur in order to proceed. + */ +public final class CountDown { + + private final AtomicInteger countDown; + private final int originalCount; + + public CountDown(int count) { + if (count < 0) { + throw new ElasticSearchIllegalArgumentException("count must be greater or equal to 0 but was: " + count); + } + this.originalCount = count; + this.countDown = new AtomicInteger(count); + } + + /** + * Decrements the count-down and returns true iff this call + * reached zero otherwise false + */ + public boolean countDown() { + assert originalCount > 0; + for (;;) { + final int current = countDown.get(); + assert current >= 0; + if (current == 0) { + return false; + } + if (countDown.compareAndSet(current, current - 1)) { + return current == 1; + } + } + } + + /** + * Fast forwards the count-down to zero and returns true iff + * the count down reached zero with this fast forward call otherwise + * false + */ + public boolean fastForward() { + assert originalCount > 0; + assert countDown.get() >= 0; + return countDown.getAndSet(0) > 0; + } + + /** + * Returns true iff the count-down has reached zero. Otherwise false + */ + public boolean isCountedDown() { + assert countDown.get() >= 0; + return countDown.get() == 0; + } +} diff --git a/src/main/java/org/elasticsearch/discovery/AckClusterStatePublishResponseHandler.java b/src/main/java/org/elasticsearch/discovery/AckClusterStatePublishResponseHandler.java new file mode 100644 index 0000000000000..ee6f5c4473c51 --- /dev/null +++ b/src/main/java/org/elasticsearch/discovery/AckClusterStatePublishResponseHandler.java @@ -0,0 +1,71 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.elasticsearch.discovery; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.ESLoggerFactory; + +/** + * Allows to wait for all nodes to reply to the publish of a new cluster state + * and notifies the {@link org.elasticsearch.discovery.Discovery.AckListener} + * so that the cluster state update can be acknowledged + */ +public class AckClusterStatePublishResponseHandler extends BlockingClusterStatePublishResponseHandler { + + private static final ESLogger logger = ESLoggerFactory.getLogger(AckClusterStatePublishResponseHandler.class.getName()); + + private final Discovery.AckListener ackListener; + + /** + * Creates a new AckClusterStatePublishResponseHandler + * @param nonMasterNodes number of nodes that are supposed to reply to a cluster state publish from master + * @param ackListener the {@link org.elasticsearch.discovery.Discovery.AckListener} to notify for each response + * gotten from non master nodes + */ + public AckClusterStatePublishResponseHandler(int nonMasterNodes, Discovery.AckListener ackListener) { + //Don't count the master as acknowledged, because it's not done yet + //otherwise we might end up with all the nodes but the master holding the latest cluster state + super(nonMasterNodes); + this.ackListener = ackListener; + } + + @Override + public void onResponse(DiscoveryNode node) { + super.onResponse(node); + onNodeAck(ackListener, node, null); + } + + @Override + public void onFailure(DiscoveryNode node, Throwable t) { + try { + super.onFailure(node, t); + } finally { + onNodeAck(ackListener, node, t); + } + } + + private void onNodeAck(final Discovery.AckListener ackListener, DiscoveryNode node, Throwable t) { + try { + ackListener.onNodeAck(node, t); + } catch (Throwable t1) { + logger.debug("error while processing ack for node [{}]", t1, node); + } + } +} diff --git a/src/main/java/org/elasticsearch/discovery/BlockingClusterStatePublishResponseHandler.java b/src/main/java/org/elasticsearch/discovery/BlockingClusterStatePublishResponseHandler.java new file mode 100644 index 0000000000000..8568e2b823e26 --- /dev/null +++ b/src/main/java/org/elasticsearch/discovery/BlockingClusterStatePublishResponseHandler.java @@ -0,0 +1,60 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.elasticsearch.discovery; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.unit.TimeValue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + + +/** + * Default implementation of {@link ClusterStatePublishResponseHandler}, allows to await a reply + * to a cluster state publish from all non master nodes, up to a timeout + */ +public class BlockingClusterStatePublishResponseHandler implements ClusterStatePublishResponseHandler { + + private final CountDownLatch latch; + + /** + * Creates a new BlockingClusterStatePublishResponseHandler + * @param nonMasterNodes number of nodes that are supposed to reply to a cluster state publish from master + */ + public BlockingClusterStatePublishResponseHandler(int nonMasterNodes) { + //Don't count the master, as it's the one that does the publish + //the master won't call onResponse either + this.latch = new CountDownLatch(nonMasterNodes); + } + + @Override + public void onResponse(DiscoveryNode node) { + latch.countDown(); + } + + @Override + public void onFailure(DiscoveryNode node, Throwable t) { + latch.countDown(); + } + + @Override + public boolean awaitAllNodes(TimeValue timeout) throws InterruptedException { + return latch.await(timeout.millis(), TimeUnit.MILLISECONDS); + } +} diff --git a/src/main/java/org/elasticsearch/discovery/ClusterStatePublishResponseHandler.java b/src/main/java/org/elasticsearch/discovery/ClusterStatePublishResponseHandler.java new file mode 100644 index 0000000000000..9a0344f5b0cb2 --- /dev/null +++ b/src/main/java/org/elasticsearch/discovery/ClusterStatePublishResponseHandler.java @@ -0,0 +1,49 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.elasticsearch.discovery; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.unit.TimeValue; + +/** + * Handles responses obtained when publishing a new cluster state from master to all non master nodes. + * Allows to await a reply from all non master nodes, up to a timeout + */ +public interface ClusterStatePublishResponseHandler { + + /** + * Called for each response obtained from non master nodes + * @param node the node that replied to the publish event + */ + void onResponse(DiscoveryNode node); + + /** + * Called for each failure obtained from non master nodes + * @param node the node that replied to the publish event + */ + void onFailure(DiscoveryNode node, Throwable t); + + /** + * Allows to wait for all non master nodes to reply to the publish event up to a timeout + * @param timeout the timeout + * @return true if the timeout expired or not, false otherwise + * @throws InterruptedException + */ + boolean awaitAllNodes(TimeValue timeout) throws InterruptedException; +} diff --git a/src/main/java/org/elasticsearch/discovery/Discovery.java b/src/main/java/org/elasticsearch/discovery/Discovery.java index c8597349f879e..5dae73de02eab 100644 --- a/src/main/java/org/elasticsearch/discovery/Discovery.java +++ b/src/main/java/org/elasticsearch/discovery/Discovery.java @@ -24,8 +24,8 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.LifecycleComponent; -import org.elasticsearch.common.inject.internal.Nullable; import org.elasticsearch.node.service.NodeService; import org.elasticsearch.rest.RestStatus; @@ -60,6 +60,14 @@ public interface Discovery extends LifecycleComponent { /** * Publish all the changes to the cluster from the master (can be called just by the master). The publish * process should not publish this state to the master as well! (the master is sending it...). + * + * The {@link AckListener} allows to keep track of the ack received from nodes, and verify whether + * they updated their own cluster state or not. */ - void publish(ClusterState clusterState); + void publish(ClusterState clusterState, AckListener ackListener); + + public static interface AckListener { + void onNodeAck(DiscoveryNode node, @Nullable Throwable t); + void onTimeout(); + } } diff --git a/src/main/java/org/elasticsearch/discovery/DiscoveryService.java b/src/main/java/org/elasticsearch/discovery/DiscoveryService.java index 206120c1b3be3..69e890435b91a 100644 --- a/src/main/java/org/elasticsearch/discovery/DiscoveryService.java +++ b/src/main/java/org/elasticsearch/discovery/DiscoveryService.java @@ -107,11 +107,13 @@ public String nodeDescription() { /** * Publish all the changes to the cluster from the master (can be called just by the master). The publish * process should not publish this state to the master as well! (the master is sending it...). + * + * The {@link org.elasticsearch.discovery.Discovery.AckListener} allows to acknowledge the publish + * event based on the response gotten from all nodes */ - public void publish(ClusterState clusterState) { - if (!lifecycle.started()) { - return; + public void publish(ClusterState clusterState, Discovery.AckListener ackListener) { + if (lifecycle.started()) { + discovery.publish(clusterState, ackListener); } - discovery.publish(clusterState); } } diff --git a/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java b/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java index f3eedc533fbec..049437bdf7862 100644 --- a/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java @@ -35,8 +35,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.discovery.InitialStateDiscoveryListener; +import org.elasticsearch.discovery.*; import org.elasticsearch.node.service.NodeService; import org.elasticsearch.transport.TransportService; @@ -44,8 +43,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -58,6 +55,8 @@ */ public class LocalDiscovery extends AbstractLifecycleComponent implements Discovery { + private static final LocalDiscovery[] NO_MEMBERS = new LocalDiscovery[0]; + private final TransportService transportService; private final ClusterService clusterService; private final DiscoveryNodeService discoveryNodeService; @@ -277,24 +276,33 @@ public String nodeDescription() { return clusterName.value() + "/" + localNode.id(); } - @Override - public void publish(ClusterState clusterState) { + public void publish(ClusterState clusterState, final Discovery.AckListener ackListener) { if (!master) { throw new ElasticSearchIllegalStateException("Shouldn't publish state when not master"); } + LocalDiscovery[] members = members(); + if (members.length > 0) { + publish(members, clusterState, new AckClusterStatePublishResponseHandler(members.length - 1, ackListener)); + } + } + + private LocalDiscovery[] members() { ClusterGroup clusterGroup = clusterGroups.get(clusterName); if (clusterGroup == null) { - // nothing to publish to - return; + return NO_MEMBERS; } + Queue members = clusterGroup.members(); + return members.toArray(new LocalDiscovery[members.size()]); + } + + private void publish(LocalDiscovery[] members, ClusterState clusterState, final ClusterStatePublishResponseHandler publishResponseHandler) { + try { // we do the marshaling intentionally, to check it works well... final byte[] clusterStateBytes = Builder.toBytes(clusterState); - LocalDiscovery[] members = clusterGroup.members().toArray(new LocalDiscovery[0]); - final CountDownLatch latch = new CountDownLatch(members.length); - for (LocalDiscovery discovery : members) { + + for (final LocalDiscovery discovery : members) { if (discovery.master) { - latch.countDown(); continue; } final ClusterState nodeSpecificClusterState = ClusterState.Builder.fromBytes(clusterStateBytes, discovery.localNode); @@ -318,23 +326,23 @@ public ClusterState execute(ClusterState currentState) { @Override public void onFailure(String source, Throwable t) { logger.error("unexpected failure during [{}]", t, source); - latch.countDown(); + publishResponseHandler.onFailure(discovery.localNode, t); } @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { sendInitialStateEventIfNeeded(); - latch.countDown(); + publishResponseHandler.onResponse(discovery.localNode); } }); } else { - latch.countDown(); + publishResponseHandler.onResponse(discovery.localNode); } } if (publishTimeout.millis() > 0) { try { - boolean awaited = latch.await(publishTimeout.millis(), TimeUnit.MILLISECONDS); + boolean awaited = publishResponseHandler.awaitAllNodes(publishTimeout); if (!awaited) { logger.debug("awaiting all nodes to process published state {} timed out, timeout {}", clusterState.version(), publishTimeout); } diff --git a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 170bc841d3510..678b4c673a0f6 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -262,13 +262,13 @@ public NodeService nodeService() { } @Override - public void publish(ClusterState clusterState) { + public void publish(ClusterState clusterState, AckListener ackListener) { if (!master) { throw new ElasticSearchIllegalStateException("Shouldn't publish state when not master"); } latestDiscoNodes = clusterState.nodes(); nodesFD.updateNodes(clusterState.nodes()); - publishClusterState.publish(clusterState); + publishClusterState.publish(clusterState, ackListener); } private void asyncJoinCluster() { diff --git a/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index f8618ac22e3b9..13536a1aa3f9d 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -30,14 +30,15 @@ import org.elasticsearch.common.io.stream.*; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.discovery.AckClusterStatePublishResponseHandler; +import org.elasticsearch.discovery.ClusterStatePublishResponseHandler; +import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; import java.io.IOException; import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; /** * @@ -78,16 +79,18 @@ public void close() { transportService.removeHandler(PublishClusterStateRequestHandler.ACTION); } - public void publish(ClusterState clusterState) { + public void publish(ClusterState clusterState, final Discovery.AckListener ackListener) { + publish(clusterState, new AckClusterStatePublishResponseHandler(clusterState.nodes().size()-1, ackListener)); + } + + private void publish(ClusterState clusterState, final ClusterStatePublishResponseHandler publishResponseHandler) { + DiscoveryNode localNode = nodesProvider.nodes().localNode(); Map serializedStates = Maps.newHashMap(); - final CountDownLatch latch = new CountDownLatch(clusterState.nodes().size()); for (final DiscoveryNode node : clusterState.nodes()) { if (node.equals(localNode)) { - // no need to send to our self - latch.countDown(); continue; } // try and serialize the cluster state once (or per version), so we don't serialize it @@ -104,7 +107,7 @@ public void publish(ClusterState clusterState) { serializedStates.put(node.version(), bytes); } catch (Throwable e) { logger.warn("failed to serialize cluster_state before publishing it to node {}", e, node); - latch.countDown(); + publishResponseHandler.onFailure(node, e); continue; } } @@ -120,24 +123,25 @@ public void publish(ClusterState clusterState) { @Override public void handleResponse(TransportResponse.Empty response) { - latch.countDown(); + publishResponseHandler.onResponse(node); } @Override public void handleException(TransportException exp) { logger.debug("failed to send cluster state to [{}]", exp, node); - latch.countDown(); + publishResponseHandler.onFailure(node, exp); } }); } catch (Throwable t) { - latch.countDown(); + logger.debug("error sending cluster state to [{}]", t, node); + publishResponseHandler.onFailure(node, t); } } if (publishTimeout.millis() > 0) { // only wait if the publish timeout is configured... try { - boolean awaited = latch.await(publishTimeout.millis(), TimeUnit.MILLISECONDS); + boolean awaited = publishResponseHandler.awaitAllNodes(publishTimeout); if (!awaited) { logger.debug("awaiting all nodes to process published state {} timed out, timeout {}", clusterState.version(), publishTimeout); } diff --git a/src/main/java/org/elasticsearch/rest/action/admin/indices/warmer/delete/RestDeleteWarmerAction.java b/src/main/java/org/elasticsearch/rest/action/admin/indices/warmer/delete/RestDeleteWarmerAction.java index 1388f5dc35e07..8682d950cee92 100644 --- a/src/main/java/org/elasticsearch/rest/action/admin/indices/warmer/delete/RestDeleteWarmerAction.java +++ b/src/main/java/org/elasticsearch/rest/action/admin/indices/warmer/delete/RestDeleteWarmerAction.java @@ -52,6 +52,7 @@ public void handleRequest(final RestRequest request, final RestChannel channel) DeleteWarmerRequest deleteWarmerRequest = new DeleteWarmerRequest(request.param("name")) .indices(Strings.splitStringByCommaToArray(request.param("index"))); deleteWarmerRequest.listenerThreaded(false); + deleteWarmerRequest.timeout(request.paramAsTime("timeout", deleteWarmerRequest.timeout())); deleteWarmerRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteWarmerRequest.masterNodeTimeout())); client.admin().indices().deleteWarmer(deleteWarmerRequest, new ActionListener() { @Override diff --git a/src/main/java/org/elasticsearch/rest/action/admin/indices/warmer/put/RestPutWarmerAction.java b/src/main/java/org/elasticsearch/rest/action/admin/indices/warmer/put/RestPutWarmerAction.java index 1c283581281a6..47ee0f08c8429 100644 --- a/src/main/java/org/elasticsearch/rest/action/admin/indices/warmer/put/RestPutWarmerAction.java +++ b/src/main/java/org/elasticsearch/rest/action/admin/indices/warmer/put/RestPutWarmerAction.java @@ -55,6 +55,7 @@ public void handleRequest(final RestRequest request, final RestChannel channel) .types(Strings.splitStringByCommaToArray(request.param("type"))) .source(request.content(), request.contentUnsafe()); putWarmerRequest.searchRequest(searchRequest); + putWarmerRequest.timeout(request.paramAsTime("timeout", putWarmerRequest.timeout())); putWarmerRequest.masterNodeTimeout(request.paramAsTime("master_timeout", putWarmerRequest.masterNodeTimeout())); client.admin().indices().putWarmer(putWarmerRequest, new ActionListener() { @Override diff --git a/src/test/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerRequestTests.java b/src/test/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerRequestTests.java new file mode 100644 index 0000000000000..12a3be11cff76 --- /dev/null +++ b/src/test/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerRequestTests.java @@ -0,0 +1,78 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.elasticsearch.action.admin.indices.warmer.delete; + +import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.InputStreamStreamInput; +import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ElasticSearchTestCase; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; + +import static org.hamcrest.CoreMatchers.equalTo; + +public class DeleteWarmerRequestTests extends ElasticSearchTestCase { + + @Test + public void testDeleteWarmerTimeoutBwComp_Pre0906Format() throws Exception { + DeleteWarmerRequest outRequest = new DeleteWarmerRequest("warmer1"); + outRequest.timeout(TimeValue.timeValueMillis(1000)); + + ByteArrayOutputStream outBuffer = new ByteArrayOutputStream(); + OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer); + out.setVersion(Version.V_0_90_0); + outRequest.writeTo(out); + + ByteArrayInputStream esInBuffer = new ByteArrayInputStream(outBuffer.toByteArray()); + InputStreamStreamInput esBuffer = new InputStreamStreamInput(esInBuffer); + esBuffer.setVersion(Version.V_0_90_0); + DeleteWarmerRequest inRequest = new DeleteWarmerRequest(); + inRequest.readFrom(esBuffer); + + assertThat(inRequest.name(), equalTo("warmer1")); + //timeout is default as we don't read it from the received buffer + assertThat(inRequest.timeout().millis(), equalTo(new DeleteWarmerRequest().timeout().millis())); + + } + + @Test + public void testDeleteWarmerTimeoutBwComp_Post0906Format() throws Exception { + DeleteWarmerRequest outRequest = new DeleteWarmerRequest("warmer1"); + outRequest.timeout(TimeValue.timeValueMillis(1000)); + + ByteArrayOutputStream outBuffer = new ByteArrayOutputStream(); + OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer); + out.setVersion(Version.V_0_90_6); + outRequest.writeTo(out); + + ByteArrayInputStream esInBuffer = new ByteArrayInputStream(outBuffer.toByteArray()); + InputStreamStreamInput esBuffer = new InputStreamStreamInput(esInBuffer); + esBuffer.setVersion(Version.V_0_90_6); + DeleteWarmerRequest inRequest = new DeleteWarmerRequest(); + inRequest.readFrom(esBuffer); + + assertThat(inRequest.name(), equalTo("warmer1")); + //timeout is default as we don't read it from the received buffer + assertThat(inRequest.timeout().millis(), equalTo(outRequest.timeout().millis())); + + } +} diff --git a/src/test/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequestTests.java b/src/test/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequestTests.java new file mode 100644 index 0000000000000..4889b10785687 --- /dev/null +++ b/src/test/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequestTests.java @@ -0,0 +1,76 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.elasticsearch.action.admin.indices.warmer.put; + +import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.InputStreamStreamInput; +import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ElasticSearchTestCase; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; + +import static org.hamcrest.CoreMatchers.equalTo; + +public class PutWarmerRequestTests extends ElasticSearchTestCase { + + @Test + public void testPutWarmerTimeoutBwComp_Pre0906Format() throws Exception { + PutWarmerRequest outRequest = new PutWarmerRequest("warmer1"); + outRequest.timeout(TimeValue.timeValueMillis(1000)); + + ByteArrayOutputStream outBuffer = new ByteArrayOutputStream(); + OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer); + out.setVersion(Version.V_0_90_0); + outRequest.writeTo(out); + + ByteArrayInputStream esInBuffer = new ByteArrayInputStream(outBuffer.toByteArray()); + InputStreamStreamInput esBuffer = new InputStreamStreamInput(esInBuffer); + esBuffer.setVersion(Version.V_0_90_0); + PutWarmerRequest inRequest = new PutWarmerRequest(); + inRequest.readFrom(esBuffer); + + assertThat(inRequest.name(), equalTo("warmer1")); + //timeout is default as we don't read it from the received buffer + assertThat(inRequest.timeout().millis(), equalTo(new PutWarmerRequest().timeout().millis())); + } + + @Test + public void testPutWarmerTimeoutBwComp_Post0906Format() throws Exception { + PutWarmerRequest outRequest = new PutWarmerRequest("warmer1"); + outRequest.timeout(TimeValue.timeValueMillis(1000)); + + ByteArrayOutputStream outBuffer = new ByteArrayOutputStream(); + OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer); + out.setVersion(Version.V_0_90_6); + outRequest.writeTo(out); + + ByteArrayInputStream esInBuffer = new ByteArrayInputStream(outBuffer.toByteArray()); + InputStreamStreamInput esBuffer = new InputStreamStreamInput(esInBuffer); + esBuffer.setVersion(Version.V_0_90_6); + PutWarmerRequest inRequest = new PutWarmerRequest(); + inRequest.readFrom(esBuffer); + + assertThat(inRequest.name(), equalTo("warmer1")); + //timeout is default as we don't read it from the received buffer + assertThat(inRequest.timeout().millis(), equalTo(outRequest.timeout().millis())); + } +} diff --git a/src/test/java/org/elasticsearch/common/util/concurrent/CountDownTest.java b/src/test/java/org/elasticsearch/common/util/concurrent/CountDownTest.java new file mode 100644 index 0000000000000..83b2eae53c3fd --- /dev/null +++ b/src/test/java/org/elasticsearch/common/util/concurrent/CountDownTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.util.concurrent; + +import com.carrotsearch.randomizedtesting.annotations.Repeat; +import org.elasticsearch.test.ElasticSearchTestCase; +import org.hamcrest.Matchers; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; + + +public class CountDownTest extends ElasticSearchTestCase { + + @Test @Repeat(iterations = 1000) + public void testConcurrent() throws InterruptedException { + final AtomicInteger count = new AtomicInteger(0); + final CountDown countDown = new CountDown(atLeast(10)); + Thread[] threads = new Thread[atLeast(3)]; + final CountDownLatch latch = new CountDownLatch(1); + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread() { + + public void run() { + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(); + } + while (true) { + if(frequently()) { + if (countDown.isCountedDown()) { + break; + } + } + if (countDown.countDown()) { + count.incrementAndGet(); + break; + } + } + } + }; + threads[i].start(); + } + latch.countDown(); + Thread.yield(); + if (rarely()) { + if (countDown.fastForward()) { + count.incrementAndGet(); + } + assertThat(countDown.isCountedDown(), equalTo(true)); + assertThat(countDown.fastForward(), equalTo(false)); + + } + + for (Thread thread : threads) { + thread.join(); + } + assertThat(countDown.isCountedDown(), equalTo(true)); + assertThat(count.get(), Matchers.equalTo(1)); + } + + @Test + public void testSingleThreaded() { + int atLeast = atLeast(10); + final CountDown countDown = new CountDown(atLeast); + while(!countDown.isCountedDown()) { + atLeast--; + if (countDown.countDown()) { + assertThat(atLeast, equalTo(0)); + assertThat(countDown.isCountedDown(), equalTo(true)); + assertThat(countDown.fastForward(), equalTo(false)); + break; + } + if (rarely()) { + assertThat(countDown.fastForward(), equalTo(true)); + assertThat(countDown.isCountedDown(), equalTo(true)); + assertThat(countDown.fastForward(), equalTo(false)); + } + assertThat(atLeast, greaterThan(0)); + } + + } +} diff --git a/src/test/java/org/elasticsearch/indices/warmer/LocalGatewayIndicesWarmerTests.java b/src/test/java/org/elasticsearch/indices/warmer/LocalGatewayIndicesWarmerTests.java index e7940c1bb5fc3..70d4918e1fe08 100644 --- a/src/test/java/org/elasticsearch/indices/warmer/LocalGatewayIndicesWarmerTests.java +++ b/src/test/java/org/elasticsearch/indices/warmer/LocalGatewayIndicesWarmerTests.java @@ -20,6 +20,8 @@ package org.elasticsearch.indices.warmer; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.indices.warmer.delete.DeleteWarmerResponse; +import org.elasticsearch.action.admin.indices.warmer.put.PutWarmerResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.Priority; import org.elasticsearch.common.logging.ESLogger; @@ -58,12 +60,14 @@ public void testStatePersistence() throws Exception { client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().execute().actionGet(); - client().admin().indices().preparePutWarmer("warmer_1") + PutWarmerResponse putWarmerResponse = client().admin().indices().preparePutWarmer("warmer_1") .setSearchRequest(client().prepareSearch("test").setQuery(QueryBuilders.termQuery("field", "value1"))) .execute().actionGet(); - client().admin().indices().preparePutWarmer("warmer_2") + assertThat(putWarmerResponse.isAcknowledged(), equalTo(true)); + putWarmerResponse = client().admin().indices().preparePutWarmer("warmer_2") .setSearchRequest(client().prepareSearch("test").setQuery(QueryBuilders.termQuery("field", "value2"))) .execute().actionGet(); + assertThat(putWarmerResponse.isAcknowledged(), equalTo(true)); logger.info("--> put template with warmer"); client().admin().indices().preparePutTemplate("template_1") @@ -123,7 +127,8 @@ public Settings onNodeStopped(String nodeName) throws Exception { logger.info("--> delete warmer warmer_1"); - client().admin().indices().prepareDeleteWarmer().setIndices("test").setName("warmer_1").execute().actionGet(); + DeleteWarmerResponse deleteWarmerResponse = client().admin().indices().prepareDeleteWarmer().setIndices("test").setName("warmer_1").execute().actionGet(); + assertThat(deleteWarmerResponse.isAcknowledged(), equalTo(true)); logger.info("--> verify warmers (delete) are registered in cluster state"); clusterState = client().admin().cluster().prepareState().execute().actionGet().getState(); diff --git a/src/test/java/org/elasticsearch/indices/warmer/SimpleIndicesWarmerTests.java b/src/test/java/org/elasticsearch/indices/warmer/SimpleIndicesWarmerTests.java index d22e7d8674e14..af60d76290f66 100644 --- a/src/test/java/org/elasticsearch/indices/warmer/SimpleIndicesWarmerTests.java +++ b/src/test/java/org/elasticsearch/indices/warmer/SimpleIndicesWarmerTests.java @@ -19,8 +19,12 @@ package org.elasticsearch.indices.warmer; +import com.google.common.collect.ImmutableList; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.admin.indices.warmer.delete.DeleteWarmerResponse; import org.elasticsearch.action.admin.indices.warmer.get.GetWarmersResponse; +import org.elasticsearch.action.admin.indices.warmer.put.PutWarmerResponse; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.index.query.QueryBuilders; @@ -30,6 +34,8 @@ import org.hamcrest.Matchers; import org.junit.Test; +import java.util.Map; + import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -45,12 +51,14 @@ public void simpleWarmerTests() { .execute().actionGet(); ensureGreen(); - client().admin().indices().preparePutWarmer("warmer_1") + PutWarmerResponse putWarmerResponse = client().admin().indices().preparePutWarmer("warmer_1") .setSearchRequest(client().prepareSearch("test").setTypes("a1").setQuery(QueryBuilders.termQuery("field", "value1"))) .execute().actionGet(); - client().admin().indices().preparePutWarmer("warmer_2") + assertThat(putWarmerResponse.isAcknowledged(), equalTo(true)); + putWarmerResponse = client().admin().indices().preparePutWarmer("warmer_2") .setSearchRequest(client().prepareSearch("test").setTypes("a2").setQuery(QueryBuilders.termQuery("field", "value2"))) .execute().actionGet(); + assertThat(putWarmerResponse.isAcknowledged(), equalTo(true)); client().prepareIndex("test", "type1", "1").setSource("field", "value1").setRefresh(true).execute().actionGet(); client().prepareIndex("test", "type1", "2").setSource("field", "value2").setRefresh(true).execute().actionGet(); @@ -165,6 +173,30 @@ public void deleteNonExistentIndexWarmerTest() { } } + @Test + public void deleteIndexWarmerTest() { + createIndex("test"); + ensureGreen(); + + PutWarmerResponse putWarmerResponse = client().admin().indices().preparePutWarmer("custom_warmer") + .setSearchRequest(client().prepareSearch("test").setTypes("test").setQuery(QueryBuilders.matchAllQuery())) + .get(); + assertThat(putWarmerResponse.isAcknowledged(), equalTo(true)); + + GetWarmersResponse getWarmersResponse = client().admin().indices().prepareGetWarmers("test").get(); + assertThat(getWarmersResponse.warmers().size(), equalTo(1)); + Map.Entry> entry = getWarmersResponse.warmers().entrySet().iterator().next(); + assertThat(entry.getKey(), equalTo("test")); + assertThat(entry.getValue().size(), equalTo(1)); + assertThat(entry.getValue().iterator().next().name(), equalTo("custom_warmer")); + + DeleteWarmerResponse deleteWarmerResponse = client().admin().indices().prepareDeleteWarmer().setIndices("test").setName("custom_warmer").get(); + assertThat(deleteWarmerResponse.isAcknowledged(), equalTo(true)); + + getWarmersResponse = client().admin().indices().prepareGetWarmers("test").get(); + assertThat(getWarmersResponse.warmers().size(), equalTo(0)); + } + @Test // issue 3246 public void ensureThatIndexWarmersCanBeChangedOnRuntime() throws Exception { client().admin().indices().prepareCreate("test") @@ -172,9 +204,10 @@ public void ensureThatIndexWarmersCanBeChangedOnRuntime() throws Exception { .execute().actionGet(); ensureGreen(); - client().admin().indices().preparePutWarmer("custom_warmer") + PutWarmerResponse putWarmerResponse = client().admin().indices().preparePutWarmer("custom_warmer") .setSearchRequest(client().prepareSearch("test").setTypes("test").setQuery(QueryBuilders.matchAllQuery())) .execute().actionGet(); + assertThat(putWarmerResponse.isAcknowledged(), equalTo(true)); client().prepareIndex("test", "test", "1").setSource("foo", "bar").setRefresh(true).execute().actionGet(); @@ -193,4 +226,43 @@ private long getWarmerRuns() { IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats("test").clear().setWarmer(true).execute().actionGet(); return indicesStatsResponse.getIndex("test").getPrimaries().warmer.total(); } + + @Test + public void testPutWarmerAcknowledgement() { + createIndex("test"); + ensureGreen(); + + PutWarmerResponse putWarmerResponse = client().admin().indices().preparePutWarmer("custom_warmer") + .setSearchRequest(client().prepareSearch("test").setTypes("test").setQuery(QueryBuilders.matchAllQuery())) + .get(); + assertThat(putWarmerResponse.isAcknowledged(), equalTo(true)); + + for (Client client : clients()) { + GetWarmersResponse getWarmersResponse = client.admin().indices().prepareGetWarmers().setLocal(true).get(); + assertThat(getWarmersResponse.warmers().size(), equalTo(1)); + Map.Entry> entry = getWarmersResponse.warmers().entrySet().iterator().next(); + assertThat(entry.getKey(), equalTo("test")); + assertThat(entry.getValue().size(), equalTo(1)); + assertThat(entry.getValue().get(0).name(), equalTo("custom_warmer")); + } + } + + @Test + public void testDeleteWarmerAcknowledgement() { + createIndex("test"); + ensureGreen(); + + PutWarmerResponse putWarmerResponse = client().admin().indices().preparePutWarmer("custom_warmer") + .setSearchRequest(client().prepareSearch("test").setTypes("test").setQuery(QueryBuilders.matchAllQuery())) + .get(); + assertThat(putWarmerResponse.isAcknowledged(), equalTo(true)); + + DeleteWarmerResponse deleteWarmerResponse = client().admin().indices().prepareDeleteWarmer().setIndices("test").setName("custom_warmer").get(); + assertThat(deleteWarmerResponse.isAcknowledged(), equalTo(true)); + + for (Client client : clients()) { + GetWarmersResponse getWarmersResponse = client.admin().indices().prepareGetWarmers().setLocal(true).get(); + assertThat(getWarmersResponse.warmers().size(), equalTo(0)); + } + } } diff --git a/src/test/java/org/elasticsearch/search/scan/SearchScanScrollingTests.java b/src/test/java/org/elasticsearch/search/scan/SearchScanScrollingTests.java index 212968a964b92..dbad84cc36b95 100644 --- a/src/test/java/org/elasticsearch/search/scan/SearchScanScrollingTests.java +++ b/src/test/java/org/elasticsearch/search/scan/SearchScanScrollingTests.java @@ -22,7 +22,6 @@ import com.google.common.collect.Sets; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; -import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.search.SearchHit; @@ -31,6 +30,7 @@ import java.util.Set; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.elasticsearch.test.hamcrest.ElasticSearchAssertions.assertHitCount; import static org.hamcrest.Matchers.equalTo; public class SearchScanScrollingTests extends AbstractIntegrationTest { @@ -40,9 +40,8 @@ public void testRandomized() throws Exception { } private void testScroll(int numberOfShards, long numberOfDocs, int size, boolean unbalanced) throws Exception { - wipeIndex("test"); - client().admin().indices().prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_shards", numberOfShards)).execute().actionGet(); - client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet(); + client().admin().indices().prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_shards", numberOfShards)).get(); + ensureGreen(); Set ids = Sets.newHashSet(); Set expectedIds = Sets.newHashSet(); @@ -66,7 +65,7 @@ private void testScroll(int numberOfShards, long numberOfDocs, int size, boolean } } - client().admin().indices().prepareRefresh().execute().actionGet(); + refresh(); SearchResponse searchResponse = client().prepareSearch() .setSearchType(SearchType.SCAN) @@ -75,15 +74,15 @@ private void testScroll(int numberOfShards, long numberOfDocs, int size, boolean .setScroll(TimeValue.timeValueMinutes(2)) .execute().actionGet(); try { - assertThat(searchResponse.getHits().totalHits(), equalTo(numberOfDocs)); + assertHitCount(searchResponse, numberOfDocs); // start scrolling, until we get not results while (true) { searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)).execute().actionGet(); - assertThat(searchResponse.getHits().totalHits(), equalTo(numberOfDocs)); - assertThat(searchResponse.getFailedShards(), equalTo(0)); + assertHitCount(searchResponse, numberOfDocs); + for (SearchHit hit : searchResponse.getHits()) { - assertThat(hit.id() + "should not exists in the result set", ids.contains(hit.id()), equalTo(false)); + assertThat(hit.id() + "should not exist in the result set", ids.contains(hit.id()), equalTo(false)); ids.add(hit.id()); } if (searchResponse.getHits().hits().length == 0) {