diff --git a/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequest.java index 3824431b53bd6..5d5872f12cdfd 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequest.java @@ -19,27 +19,35 @@ package org.elasticsearch.action.admin.indices.warmer.put; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.action.support.master.MasterNodeOperationRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; import java.io.IOException; import static org.elasticsearch.action.ValidateActions.addValidationError; +import static org.elasticsearch.common.unit.TimeValue.readTimeValue; +import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; /** * A request to put a search warmer. */ -public class PutWarmerRequest extends MasterNodeOperationRequest { +public class PutWarmerRequest extends MasterNodeOperationRequest + implements AcknowledgedRequest { private String name; private SearchRequest searchRequest; + private TimeValue timeout = timeValueSeconds(10); + PutWarmerRequest() { } @@ -86,6 +94,22 @@ SearchRequest searchRequest() { return this.searchRequest; } + @Override + public PutWarmerRequest timeout(String timeout) { + return this; + } + + @Override + public PutWarmerRequest timeout(TimeValue timeout) { + this.timeout = timeout; + return this; + } + + @Override + public TimeValue timeout() { + return timeout; + } + @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = searchRequest.validate(); @@ -103,6 +127,9 @@ public void readFrom(StreamInput in) throws IOException { searchRequest = new SearchRequest(); searchRequest.readFrom(in); } + if (in.getVersion().onOrAfter(Version.V_0_90_6)) { + timeout = readTimeValue(in); + } } @Override @@ -115,5 +142,8 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(true); searchRequest.writeTo(out); } + if (out.getVersion().onOrAfter(Version.V_0_90_6)) { + timeout.writeTo(out); + } } } diff --git a/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequestBuilder.java b/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequestBuilder.java index 13669b324aaf5..77b2143dc7e56 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequestBuilder.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequestBuilder.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; import org.elasticsearch.client.IndicesAdminClient; import org.elasticsearch.client.internal.InternalIndicesAdminClient; +import org.elasticsearch.common.unit.TimeValue; /** * @@ -63,6 +64,23 @@ public PutWarmerRequestBuilder setSearchRequest(SearchRequestBuilder searchReque return this; } + /** + * Sets the maximum wait for acknowledgement from other nodes + */ + public PutWarmerRequestBuilder setTimeout(TimeValue timeout) { + request.timeout(timeout); + return this; + } + + /** + * Timeout to wait for the operation to be acknowledged by current cluster nodes. Defaults + * to 10s. + */ + public PutWarmerRequestBuilder setTimeout(String timeout) { + request.timeout(timeout); + return this; + } + @Override protected void doExecute(ActionListener listener) { ((IndicesAdminClient) client).putWarmer(request, listener); diff --git a/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerResponse.java b/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerResponse.java index f0f58f4347dd3..c7daac75a9567 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerResponse.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerResponse.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.admin.indices.warmer.put; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -28,7 +29,7 @@ /** * The response of put warmer operation. */ -public class PutWarmerResponse extends ActionResponse { +public class PutWarmerResponse extends ActionResponse implements AcknowledgedResponse { private boolean acknowledged; diff --git a/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/TransportPutWarmerAction.java b/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/TransportPutWarmerAction.java index f151a40d1b793..ca42f75a0f735 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/TransportPutWarmerAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/TransportPutWarmerAction.java @@ -24,13 +24,15 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -96,7 +98,27 @@ public void onResponse(SearchResponse searchResponse) { return; } - clusterService.submitStateUpdateTask("put_warmer [" + request.name() + "]", new TimeoutClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("put_warmer [" + request.name() + "]", new AckedClusterStateUpdateTask() { + + @Override + public boolean mustAck(DiscoveryNode discoveryNode) { + return true; + } + + @Override + public void onAllNodesAcked(@Nullable Throwable t) { + listener.onResponse(new PutWarmerResponse(true)); + } + + @Override + public void onAckTimeout() { + listener.onResponse(new PutWarmerResponse(false)); + } + + @Override + public TimeValue ackTimeout() { + return request.timeout(); + } @Override public TimeValue timeout() { @@ -161,7 +183,7 @@ public ClusterState execute(ClusterState currentState) { @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - listener.onResponse(new PutWarmerResponse(true)); + } }); } diff --git a/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequest.java b/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequest.java new file mode 100644 index 0000000000000..3e25ee9aa6ffb --- /dev/null +++ b/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequest.java @@ -0,0 +1,49 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.elasticsearch.action.support.master; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.common.unit.TimeValue; + +/** + * Interface that allows to mark action requests that support acknowledgements. + * Facilitates consistency across different api. + */ +public interface AcknowledgedRequest> { + + /** + * Allows to set the timeout + * @param timeout timeout as a string (e.g. 1s) + * @return the request itself + */ + T timeout(String timeout); + + /** + * Allows to set the timeout + * @param timeout timeout as a {@link TimeValue} + * @return the request itself + */ + T timeout(TimeValue timeout); + + /** + * Returns the current timeout + * @return the current timeout as a {@link TimeValue} + */ + TimeValue timeout(); +} diff --git a/src/main/java/org/elasticsearch/action/support/master/AcknowledgedResponse.java b/src/main/java/org/elasticsearch/action/support/master/AcknowledgedResponse.java new file mode 100644 index 0000000000000..90bf831a892cd --- /dev/null +++ b/src/main/java/org/elasticsearch/action/support/master/AcknowledgedResponse.java @@ -0,0 +1,32 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.elasticsearch.action.support.master; + +/** + * Interface that allows to mark action responses that support acknowledgements. + * Facilitates consistency across different api. + */ +public interface AcknowledgedResponse { + + /** + * Returns whether the response is acknowledged or not + * @return true if the response is acknowledged, false otherwise + */ + boolean isAcknowledged(); +} diff --git a/src/main/java/org/elasticsearch/rest/action/admin/indices/warmer/put/RestPutWarmerAction.java b/src/main/java/org/elasticsearch/rest/action/admin/indices/warmer/put/RestPutWarmerAction.java index 1c283581281a6..47ee0f08c8429 100644 --- a/src/main/java/org/elasticsearch/rest/action/admin/indices/warmer/put/RestPutWarmerAction.java +++ b/src/main/java/org/elasticsearch/rest/action/admin/indices/warmer/put/RestPutWarmerAction.java @@ -55,6 +55,7 @@ public void handleRequest(final RestRequest request, final RestChannel channel) .types(Strings.splitStringByCommaToArray(request.param("type"))) .source(request.content(), request.contentUnsafe()); putWarmerRequest.searchRequest(searchRequest); + putWarmerRequest.timeout(request.paramAsTime("timeout", putWarmerRequest.timeout())); putWarmerRequest.masterNodeTimeout(request.paramAsTime("master_timeout", putWarmerRequest.masterNodeTimeout())); client.admin().indices().putWarmer(putWarmerRequest, new ActionListener() { @Override diff --git a/src/test/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequestTests.java b/src/test/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequestTests.java new file mode 100644 index 0000000000000..d29dfaa8e15ab --- /dev/null +++ b/src/test/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequestTests.java @@ -0,0 +1,76 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.elasticsearch.action.admin.indices.warmer.put; + +import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.InputStreamStreamInput; +import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; + +import static org.hamcrest.CoreMatchers.equalTo; + +public class PutWarmerRequestTests extends ElasticsearchTestCase { + + @Test + public void testPutWarmerTimeoutBwComp_Pre0906Format() throws Exception { + PutWarmerRequest outRequest = new PutWarmerRequest("warmer1"); + outRequest.timeout(TimeValue.timeValueMillis(1000)); + + ByteArrayOutputStream outBuffer = new ByteArrayOutputStream(); + OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer); + out.setVersion(Version.V_0_90_0); + outRequest.writeTo(out); + + ByteArrayInputStream esInBuffer = new ByteArrayInputStream(outBuffer.toByteArray()); + InputStreamStreamInput esBuffer = new InputStreamStreamInput(esInBuffer); + esBuffer.setVersion(Version.V_0_90_0); + PutWarmerRequest inRequest = new PutWarmerRequest(); + inRequest.readFrom(esBuffer); + + assertThat(inRequest.name(), equalTo("warmer1")); + //timeout is default as we don't read it from the received buffer + assertThat(inRequest.timeout().millis(), equalTo(new PutWarmerRequest().timeout().millis())); + } + + @Test + public void testPutWarmerTimeoutBwComp_Post0906Format() throws Exception { + PutWarmerRequest outRequest = new PutWarmerRequest("warmer1"); + outRequest.timeout(TimeValue.timeValueMillis(1000)); + + ByteArrayOutputStream outBuffer = new ByteArrayOutputStream(); + OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer); + out.setVersion(Version.V_0_90_6); + outRequest.writeTo(out); + + ByteArrayInputStream esInBuffer = new ByteArrayInputStream(outBuffer.toByteArray()); + InputStreamStreamInput esBuffer = new InputStreamStreamInput(esInBuffer); + esBuffer.setVersion(Version.V_0_90_6); + PutWarmerRequest inRequest = new PutWarmerRequest(); + inRequest.readFrom(esBuffer); + + assertThat(inRequest.name(), equalTo("warmer1")); + //timeout is default as we don't read it from the received buffer + assertThat(inRequest.timeout().millis(), equalTo(outRequest.timeout().millis())); + } +} diff --git a/src/test/java/org/elasticsearch/indices/warmer/LocalGatewayIndicesWarmerTests.java b/src/test/java/org/elasticsearch/indices/warmer/LocalGatewayIndicesWarmerTests.java index e7940c1bb5fc3..9039e07cce88c 100644 --- a/src/test/java/org/elasticsearch/indices/warmer/LocalGatewayIndicesWarmerTests.java +++ b/src/test/java/org/elasticsearch/indices/warmer/LocalGatewayIndicesWarmerTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.indices.warmer; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.indices.warmer.put.PutWarmerResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.Priority; import org.elasticsearch.common.logging.ESLogger; @@ -58,12 +59,14 @@ public void testStatePersistence() throws Exception { client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().execute().actionGet(); - client().admin().indices().preparePutWarmer("warmer_1") + PutWarmerResponse putWarmerResponse = client().admin().indices().preparePutWarmer("warmer_1") .setSearchRequest(client().prepareSearch("test").setQuery(QueryBuilders.termQuery("field", "value1"))) .execute().actionGet(); - client().admin().indices().preparePutWarmer("warmer_2") + assertThat(putWarmerResponse.isAcknowledged(), equalTo(true)); + putWarmerResponse = client().admin().indices().preparePutWarmer("warmer_2") .setSearchRequest(client().prepareSearch("test").setQuery(QueryBuilders.termQuery("field", "value2"))) .execute().actionGet(); + assertThat(putWarmerResponse.isAcknowledged(), equalTo(true)); logger.info("--> put template with warmer"); client().admin().indices().preparePutTemplate("template_1") diff --git a/src/test/java/org/elasticsearch/indices/warmer/SimpleIndicesWarmerTests.java b/src/test/java/org/elasticsearch/indices/warmer/SimpleIndicesWarmerTests.java index d22e7d8674e14..8fc72af3f223d 100644 --- a/src/test/java/org/elasticsearch/indices/warmer/SimpleIndicesWarmerTests.java +++ b/src/test/java/org/elasticsearch/indices/warmer/SimpleIndicesWarmerTests.java @@ -19,8 +19,11 @@ package org.elasticsearch.indices.warmer; +import com.google.common.collect.ImmutableList; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.admin.indices.warmer.get.GetWarmersResponse; +import org.elasticsearch.action.admin.indices.warmer.put.PutWarmerResponse; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.index.query.QueryBuilders; @@ -30,6 +33,8 @@ import org.hamcrest.Matchers; import org.junit.Test; +import java.util.Map; + import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -45,12 +50,14 @@ public void simpleWarmerTests() { .execute().actionGet(); ensureGreen(); - client().admin().indices().preparePutWarmer("warmer_1") + PutWarmerResponse putWarmerResponse = client().admin().indices().preparePutWarmer("warmer_1") .setSearchRequest(client().prepareSearch("test").setTypes("a1").setQuery(QueryBuilders.termQuery("field", "value1"))) .execute().actionGet(); - client().admin().indices().preparePutWarmer("warmer_2") + assertThat(putWarmerResponse.isAcknowledged(), equalTo(true)); + putWarmerResponse = client().admin().indices().preparePutWarmer("warmer_2") .setSearchRequest(client().prepareSearch("test").setTypes("a2").setQuery(QueryBuilders.termQuery("field", "value2"))) .execute().actionGet(); + assertThat(putWarmerResponse.isAcknowledged(), equalTo(true)); client().prepareIndex("test", "type1", "1").setSource("field", "value1").setRefresh(true).execute().actionGet(); client().prepareIndex("test", "type1", "2").setSource("field", "value2").setRefresh(true).execute().actionGet(); @@ -172,9 +179,10 @@ public void ensureThatIndexWarmersCanBeChangedOnRuntime() throws Exception { .execute().actionGet(); ensureGreen(); - client().admin().indices().preparePutWarmer("custom_warmer") + PutWarmerResponse putWarmerResponse = client().admin().indices().preparePutWarmer("custom_warmer") .setSearchRequest(client().prepareSearch("test").setTypes("test").setQuery(QueryBuilders.matchAllQuery())) .execute().actionGet(); + assertThat(putWarmerResponse.isAcknowledged(), equalTo(true)); client().prepareIndex("test", "test", "1").setSource("foo", "bar").setRefresh(true).execute().actionGet(); @@ -193,4 +201,24 @@ private long getWarmerRuns() { IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats("test").clear().setWarmer(true).execute().actionGet(); return indicesStatsResponse.getIndex("test").getPrimaries().warmer.total(); } + + @Test + public void testPutWarmerAcknowledgement() { + createIndex("test"); + ensureGreen(); + + PutWarmerResponse putWarmerResponse = client().admin().indices().preparePutWarmer("custom_warmer") + .setSearchRequest(client().prepareSearch("test").setTypes("test").setQuery(QueryBuilders.matchAllQuery())) + .get(); + assertThat(putWarmerResponse.isAcknowledged(), equalTo(true)); + + for (Client client : clients()) { + GetWarmersResponse getWarmersResponse = client.admin().indices().prepareGetWarmers().setLocal(true).get(); + assertThat(getWarmersResponse.warmers().size(), equalTo(1)); + Map.Entry> entry = getWarmersResponse.warmers().entrySet().iterator().next(); + assertThat(entry.getKey(), equalTo("test")); + assertThat(entry.getValue().size(), equalTo(1)); + assertThat(entry.getValue().get(0).name(), equalTo("custom_warmer")); + } + } }