From 39f362326e46b0e0df075288e1b5521f825d2b7b Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 1 Mar 2013 14:59:51 +0100 Subject: [PATCH] Short Curcuit response if no indices exits and make sure listener is notified. Closes #2692 --- ...portIndicesReplicationOperationAction.java | 57 ++++++------ .../deleteByQuery/DeleteByQueryTests.java | 88 +++++++++++++++++++ 2 files changed, 118 insertions(+), 27 deletions(-) create mode 100644 src/test/java/org/elasticsearch/test/integration/deleteByQuery/DeleteByQueryTests.java diff --git a/src/main/java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java b/src/main/java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java index f1b7cef074072..56371a1dc4d7f 100644 --- a/src/main/java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java @@ -86,36 +86,39 @@ protected void doExecute(final Request request, final ActionListener l final AtomicReferenceArray indexResponses = new AtomicReferenceArray(concreteIndices.length); Map> routingMap = resolveRouting(clusterState, request); - - for (final String index : concreteIndices) { - Set routing = null; - if (routingMap != null) { - routing = routingMap.get(index); - } - IndexRequest indexRequest = newIndexRequestInstance(request, index, routing); - // no threading needed, all is done on the index replication one - indexRequest.listenerThreaded(false); - indexAction.execute(indexRequest, new ActionListener() { - @Override - public void onResponse(IndexResponse result) { - indexResponses.set(indexCounter.getAndIncrement(), result); - if (completionCounter.decrementAndGet() == 0) { - listener.onResponse(newResponseInstance(request, indexResponses)); - } + if (concreteIndices == null || concreteIndices.length == 0) { + listener.onResponse(newResponseInstance(request, indexResponses)); + } else { + for (final String index : concreteIndices) { + Set routing = null; + if (routingMap != null) { + routing = routingMap.get(index); } - - @Override - public void onFailure(Throwable e) { - e.printStackTrace(); - int index = indexCounter.getAndIncrement(); - if (accumulateExceptions()) { - indexResponses.set(index, e); + IndexRequest indexRequest = newIndexRequestInstance(request, index, routing); + // no threading needed, all is done on the index replication one + indexRequest.listenerThreaded(false); + indexAction.execute(indexRequest, new ActionListener() { + @Override + public void onResponse(IndexResponse result) { + indexResponses.set(indexCounter.getAndIncrement(), result); + if (completionCounter.decrementAndGet() == 0) { + listener.onResponse(newResponseInstance(request, indexResponses)); + } } - if (completionCounter.decrementAndGet() == 0) { - listener.onResponse(newResponseInstance(request, indexResponses)); + + @Override + public void onFailure(Throwable e) { + e.printStackTrace(); + int index = indexCounter.getAndIncrement(); + if (accumulateExceptions()) { + indexResponses.set(index, e); + } + if (completionCounter.decrementAndGet() == 0) { + listener.onResponse(newResponseInstance(request, indexResponses)); + } } - } - }); + }); + } } } diff --git a/src/test/java/org/elasticsearch/test/integration/deleteByQuery/DeleteByQueryTests.java b/src/test/java/org/elasticsearch/test/integration/deleteByQuery/DeleteByQueryTests.java new file mode 100644 index 0000000000000..90ff32164f5db --- /dev/null +++ b/src/test/java/org/elasticsearch/test/integration/deleteByQuery/DeleteByQueryTests.java @@ -0,0 +1,88 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.test.integration.deleteByQuery; + +import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder; +import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.test.integration.AbstractNodesTests; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; + +public class DeleteByQueryTests extends AbstractNodesTests { + + private Client client; + + @BeforeClass + public void createNodes() throws Exception { + startNode("server1"); + startNode("server2"); + client = getClient(); + } + + @AfterClass + public void closeNodes() { + client.close(); + closeAllNodes(); + } + + protected Client getClient() { + return client("server1"); + } + + @Test + public void testDeleteAllNoIndices() { + client.admin().indices().prepareDelete().execute().actionGet(); + client.admin().indices().prepareRefresh().execute().actionGet(); + DeleteByQueryRequestBuilder deleteByQueryRequestBuilder = new DeleteByQueryRequestBuilder(client); + deleteByQueryRequestBuilder.setQuery(QueryBuilders.matchAllQuery()); + DeleteByQueryResponse actionGet = deleteByQueryRequestBuilder.execute().actionGet(); + assertThat(actionGet.getIndices().size(), equalTo(0)); + } + + @Test + public void testDeleteAllOneIndex() { + client.admin().indices().prepareDelete().execute().actionGet(); + + String json = "{" + "\"user\":\"kimchy\"," + "\"postDate\":\"2013-01-30\"," + "\"message\":\"trying out Elastic Search\"" + "}"; + + client.prepareIndex("twitter", "tweet").setSource(json).setRefresh(true).execute().actionGet(); + + SearchResponse search = client.prepareSearch().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(); + assertThat(search.getHits().totalHits(), equalTo(1l)); + DeleteByQueryRequestBuilder deleteByQueryRequestBuilder = new DeleteByQueryRequestBuilder(client); + deleteByQueryRequestBuilder.setQuery(QueryBuilders.matchAllQuery()); + + DeleteByQueryResponse actionGet = deleteByQueryRequestBuilder.execute().actionGet(); + assertThat(actionGet.getIndex("twitter"), notNullValue()); + assertThat(actionGet.getIndex("twitter").getFailedShards(), equalTo(0)); + + client.admin().indices().prepareRefresh().execute().actionGet(); + search = client.prepareSearch().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(); + assertThat(search.getHits().totalHits(), equalTo(0l)); + } +}