New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Do not fail whole request on closed index #6790
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.elasticsearch.action; | ||
|
||
/** | ||
* Generic interface to group ActionRequest, which work on single document level | ||
* | ||
* Forces this class return index/type/id getters | ||
*/ | ||
public interface SingleDocumentWriteRequest { | ||
|
||
String index(); | ||
String type(); | ||
String id(); | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,7 @@ | |
import org.elasticsearch.ExceptionsHelper; | ||
import org.elasticsearch.action.ActionListener; | ||
import org.elasticsearch.action.ActionRequest; | ||
import org.elasticsearch.action.SingleDocumentWriteRequest; | ||
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; | ||
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; | ||
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction; | ||
|
@@ -38,15 +39,18 @@ | |
import org.elasticsearch.cluster.ClusterService; | ||
import org.elasticsearch.cluster.ClusterState; | ||
import org.elasticsearch.cluster.block.ClusterBlockLevel; | ||
import org.elasticsearch.cluster.metadata.IndexMetaData; | ||
import org.elasticsearch.cluster.metadata.MappingMetaData; | ||
import org.elasticsearch.cluster.metadata.MetaData; | ||
import org.elasticsearch.cluster.routing.GroupShardsIterator; | ||
import org.elasticsearch.cluster.routing.ShardIterator; | ||
import org.elasticsearch.common.inject.Inject; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.common.util.concurrent.AtomicArray; | ||
import org.elasticsearch.index.Index; | ||
import org.elasticsearch.index.shard.ShardId; | ||
import org.elasticsearch.indices.IndexAlreadyExistsException; | ||
import org.elasticsearch.indices.IndexClosedException; | ||
import org.elasticsearch.rest.RestStatus; | ||
import org.elasticsearch.threadpool.ThreadPool; | ||
import org.elasticsearch.transport.BaseTransportRequestHandler; | ||
|
@@ -96,26 +100,15 @@ protected void doExecute(final BulkRequest bulkRequest, final ActionListener<Bul | |
if (autoCreateIndex.needToCheck()) { | ||
final Set<String> indices = Sets.newHashSet(); | ||
for (ActionRequest request : bulkRequest.requests) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. honestly, BulkRequest should then only allow you to add |
||
if (request instanceof IndexRequest) { | ||
IndexRequest indexRequest = (IndexRequest) request; | ||
if (!indices.contains(indexRequest.index())) { | ||
indices.add(indexRequest.index()); | ||
} | ||
} else if (request instanceof DeleteRequest) { | ||
DeleteRequest deleteRequest = (DeleteRequest) request; | ||
if (!indices.contains(deleteRequest.index())) { | ||
indices.add(deleteRequest.index()); | ||
} | ||
} else if (request instanceof UpdateRequest) { | ||
UpdateRequest updateRequest = (UpdateRequest) request; | ||
if (!indices.contains(updateRequest.index())) { | ||
indices.add(updateRequest.index()); | ||
if (request instanceof SingleDocumentWriteRequest) { | ||
SingleDocumentWriteRequest req = (SingleDocumentWriteRequest) request; | ||
if (!indices.contains(req.index())) { | ||
indices.add(req.index()); | ||
} | ||
} else { | ||
throw new ElasticsearchException("Parsed unknown request in bulk actions: " + request.getClass().getSimpleName()); | ||
} | ||
} | ||
|
||
final AtomicInteger counter = new AtomicInteger(indices.size()); | ||
ClusterState state = clusterService.state(); | ||
for (final String index : indices) { | ||
|
@@ -199,32 +192,39 @@ private void executeBulk(final BulkRequest bulkRequest, final long startTime, fi | |
MetaData metaData = clusterState.metaData(); | ||
for (int i = 0; i < bulkRequest.requests.size(); i++) { | ||
ActionRequest request = bulkRequest.requests.get(i); | ||
if (request instanceof IndexRequest) { | ||
IndexRequest indexRequest = (IndexRequest) request; | ||
String aliasOrIndex = indexRequest.index(); | ||
indexRequest.index(clusterState.metaData().concreteSingleIndex(indexRequest.index())); | ||
|
||
MappingMetaData mappingMd = null; | ||
if (metaData.hasIndex(indexRequest.index())) { | ||
mappingMd = metaData.index(indexRequest.index()).mappingOrDefault(indexRequest.type()); | ||
if (request instanceof SingleDocumentWriteRequest) { | ||
SingleDocumentWriteRequest req = (SingleDocumentWriteRequest) request; | ||
if (addFailureIfIndexIsClosed(req, bulkRequest, responses, i)) { | ||
continue; | ||
} | ||
try { | ||
indexRequest.process(metaData, aliasOrIndex, mappingMd, allowIdGeneration); | ||
} catch (ElasticsearchParseException e) { | ||
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), e); | ||
BulkItemResponse bulkItemResponse = new BulkItemResponse(i, "index", failure); | ||
responses.set(i, bulkItemResponse); | ||
// make sure the request gets never processed again | ||
bulkRequest.requests.set(i, null); | ||
|
||
if (request instanceof IndexRequest) { | ||
IndexRequest indexRequest = (IndexRequest) request; | ||
String aliasOrIndex = indexRequest.index(); | ||
indexRequest.index(clusterState.metaData().concreteSingleIndex(indexRequest.index())); | ||
|
||
MappingMetaData mappingMd = null; | ||
if (metaData.hasIndex(indexRequest.index())) { | ||
mappingMd = metaData.index(indexRequest.index()).mappingOrDefault(indexRequest.type()); | ||
} | ||
try { | ||
indexRequest.process(metaData, aliasOrIndex, mappingMd, allowIdGeneration); | ||
} catch (ElasticsearchParseException e) { | ||
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), e); | ||
BulkItemResponse bulkItemResponse = new BulkItemResponse(i, "index", failure); | ||
responses.set(i, bulkItemResponse); | ||
// make sure the request gets never processed again | ||
bulkRequest.requests.set(i, null); | ||
} | ||
} else if (request instanceof DeleteRequest) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if we add |
||
DeleteRequest deleteRequest = (DeleteRequest) request; | ||
deleteRequest.routing(clusterState.metaData().resolveIndexRouting(deleteRequest.routing(), deleteRequest.index())); | ||
deleteRequest.index(clusterState.metaData().concreteSingleIndex(deleteRequest.index())); | ||
} else if (request instanceof UpdateRequest) { | ||
UpdateRequest updateRequest = (UpdateRequest) request; | ||
updateRequest.routing(clusterState.metaData().resolveIndexRouting(updateRequest.routing(), updateRequest.index())); | ||
updateRequest.index(clusterState.metaData().concreteSingleIndex(updateRequest.index())); | ||
} | ||
} else if (request instanceof DeleteRequest) { | ||
DeleteRequest deleteRequest = (DeleteRequest) request; | ||
deleteRequest.routing(clusterState.metaData().resolveIndexRouting(deleteRequest.routing(), deleteRequest.index())); | ||
deleteRequest.index(clusterState.metaData().concreteSingleIndex(deleteRequest.index())); | ||
} else if (request instanceof UpdateRequest) { | ||
UpdateRequest updateRequest = (UpdateRequest) request; | ||
updateRequest.routing(clusterState.metaData().resolveIndexRouting(updateRequest.routing(), updateRequest.index())); | ||
updateRequest.index(clusterState.metaData().concreteSingleIndex(updateRequest.index())); | ||
} | ||
} | ||
|
||
|
@@ -337,6 +337,23 @@ private void finishHim() { | |
} | ||
} | ||
|
||
private boolean addFailureIfIndexIsClosed(SingleDocumentWriteRequest request, BulkRequest bulkRequest, AtomicArray<BulkItemResponse> responses, int idx) { | ||
MetaData metaData = this.clusterService.state().metaData(); | ||
String concreteIndex = this.clusterService.state().metaData().concreteSingleIndex(request.index()); | ||
boolean isClosed = metaData.index(concreteIndex).getState() == IndexMetaData.State.CLOSE; | ||
|
||
if (isClosed) { | ||
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(request.index(), request.type(), request.id(), | ||
new IndexClosedException(new Index(metaData.index(request.index()).getIndex()))); | ||
BulkItemResponse bulkItemResponse = new BulkItemResponse(idx, "index", failure); | ||
responses.set(idx, bulkItemResponse); | ||
// make sure the request gets never processed again | ||
bulkRequest.requests.set(idx, null); | ||
} | ||
|
||
return isClosed; | ||
} | ||
|
||
class TransportHandler extends BaseTransportRequestHandler<BulkRequest> { | ||
|
||
@Override | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.elasticsearch.document; | ||
|
||
import org.elasticsearch.action.bulk.BulkRequest; | ||
import org.elasticsearch.action.bulk.BulkResponse; | ||
import org.elasticsearch.action.delete.DeleteRequest; | ||
import org.elasticsearch.action.index.IndexRequest; | ||
import org.elasticsearch.action.search.SearchResponse; | ||
import org.elasticsearch.action.update.UpdateRequest; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.test.ElasticsearchIntegrationTest; | ||
import org.junit.Test; | ||
|
||
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; | ||
import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; | ||
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope; | ||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; | ||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; | ||
import static org.hamcrest.Matchers.is; | ||
|
||
/** | ||
* | ||
*/ | ||
@ClusterScope(numDataNodes = 1, scope = Scope.SUITE) | ||
public class BulkNoAutoCreateIndexTests extends ElasticsearchIntegrationTest { | ||
|
||
@Override | ||
protected Settings nodeSettings(int nodeOrdinal) { | ||
return settingsBuilder().put(super.nodeSettings(nodeOrdinal)).put("action.auto_create_index", false).build(); | ||
} | ||
|
||
@Test // issue 6410 | ||
public void testThatMissingIndexDoesNotAbortFullBulkRequest() throws Exception { | ||
createIndex("bulkindex1", "bulkindex2"); | ||
BulkRequest bulkRequest = new BulkRequest(); | ||
bulkRequest.add(new IndexRequest("bulkindex1", "index1_type", "1").source("text", "hallo1")) | ||
.add(new IndexRequest("bulkindex2", "index2_type", "1").source("text", "hallo2")) | ||
.add(new IndexRequest("bulkindex2", "index2_type").source("text", "hallo2")) | ||
.add(new UpdateRequest("bulkindex2", "index2_type", "2").doc("foo", "bar")) | ||
.add(new DeleteRequest("bulkindex2", "index2_type", "3")) | ||
.refresh(true); | ||
|
||
client().bulk(bulkRequest).get(); | ||
SearchResponse searchResponse = client().prepareSearch("bulkindex*").get(); | ||
assertHitCount(searchResponse, 3); | ||
|
||
assertAcked(client().admin().indices().prepareClose("bulkindex2")); | ||
|
||
BulkResponse bulkResponse = client().bulk(bulkRequest).get(); | ||
assertThat(bulkResponse.hasFailures(), is(true)); | ||
assertThat(bulkResponse.getItems().length, is(5)); | ||
|
||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe we should call it
Indexable
orDocumentRequest
something like this and it should also have a routing or even better the common denominator of update / delete / index ie all the methods they share?