From 9c619f03f36b1e13d48dc767ab8288384e548d37 Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Mon, 27 Jan 2014 11:07:59 +0100 Subject: [PATCH] Bulk: Failed preparsing does not fail whole bulk request If a preparsing of the source is needed (due to mapping configuration, which extracts the routing/id value from the source) and the source is not valid JSON, then the whole bulk request is failed instead of a single BulkRequest. This commit ensures, that a broken JSON request is not forwarded to the destination shard and creates an appropriate BulkItemResponse, which includes a failure. This also implied changing the BulkItemResponse serialization, because one cannot be sure anymore, if a response includes an ID, in case it was not specified and could not be extracted from the JSON. Closes #4745 --- .../action/bulk/BulkItemResponse.java | 8 +- .../action/bulk/TransportBulkAction.java | 23 +++-- .../action/index/IndexRequest.java | 2 +- .../org/elasticsearch/document/BulkTests.java | 83 +++++++++++++++++++ .../hamcrest/ElasticsearchAssertions.java | 9 ++ 5 files changed, 114 insertions(+), 11 deletions(-) diff --git a/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java b/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java index 09c3c659987fd..6fc775c283ef4 100644 --- a/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java +++ b/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java @@ -245,7 +245,11 @@ public void readFrom(StreamInput in) throws IOException { } if (in.readBoolean()) { - failure = new Failure(in.readString(), in.readString(), in.readString(), in.readString()); + String fIndex = in.readString(); + String fType = in.readString(); + String fId = in.readOptionalString(); + String fMessage = in.readString(); + failure = new Failure(fIndex, fType, fId, fMessage); } } @@ -271,7 +275,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(true); out.writeString(failure.getIndex()); out.writeString(failure.getType()); - out.writeString(failure.getId()); + out.writeOptionalString(failure.getId()); out.writeString(failure.getMessage()); } } diff --git a/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index a0caee8e641f4..679e3dbfdff15 100644 --- a/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -22,6 +22,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.elasticsearch.ElasticSearchParseException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; @@ -50,10 +51,7 @@ import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportService; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -152,7 +150,10 @@ private void executeBulk(final BulkRequest bulkRequest, final long startTime, fi clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.WRITE); MetaData metaData = clusterState.metaData(); - for (ActionRequest request : bulkRequest.requests) { + final AtomicArray responses = new AtomicArray(bulkRequest.requests.size()); + + for (int i = 0; i < bulkRequest.requests.size(); i++) { + ActionRequest request = bulkRequest.requests.get(i); if (request instanceof IndexRequest) { IndexRequest indexRequest = (IndexRequest) request; String aliasOrIndex = indexRequest.index(); @@ -162,7 +163,15 @@ private void executeBulk(final BulkRequest bulkRequest, final long startTime, fi if (metaData.hasIndex(indexRequest.index())) { mappingMd = metaData.index(indexRequest.index()).mappingOrDefault(indexRequest.type()); } - indexRequest.process(metaData, aliasOrIndex, mappingMd, allowIdGeneration); + try { + indexRequest.process(metaData, aliasOrIndex, mappingMd, allowIdGeneration); + } catch (ElasticSearchParseException e) { + BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), e.getMessage()); + BulkItemResponse bulkItemResponse = new BulkItemResponse(i, "index", failure); + responses.set(i, bulkItemResponse); + // make sure the request gets never processed again + bulkRequest.requests.set(i, null); + } } else if (request instanceof DeleteRequest) { DeleteRequest deleteRequest = (DeleteRequest) request; deleteRequest.routing(clusterState.metaData().resolveIndexRouting(deleteRequest.routing(), deleteRequest.index())); @@ -173,8 +182,6 @@ private void executeBulk(final BulkRequest bulkRequest, final long startTime, fi updateRequest.index(clusterState.metaData().concreteIndex(updateRequest.index())); } } - final AtomicArray responses = new AtomicArray(bulkRequest.requests.size()); - // first, go over all the requests and create a ShardId -> Operations mapping Map> requestsByShard = Maps.newHashMap(); diff --git a/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/src/main/java/org/elasticsearch/action/index/IndexRequest.java index ad672a5a07fad..2e82b082c3a04 100644 --- a/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -580,7 +580,7 @@ public void process(MetaData metaData, String aliasOrIndex, @Nullable MappingMet timestamp = MappingMetaData.Timestamp.parseStringTimestamp(timestamp, mappingMd.timestamp().dateTimeFormatter()); } } catch (Exception e) { - throw new ElasticSearchParseException("failed to parse doc to extract routing/timestamp", e); + throw new ElasticSearchParseException("failed to parse doc to extract routing/timestamp/id", e); } finally { if (parser != null) { parser.close(); diff --git a/src/test/java/org/elasticsearch/document/BulkTests.java b/src/test/java/org/elasticsearch/document/BulkTests.java index b0f3408e18117..e506fd7e45de0 100644 --- a/src/test/java/org/elasticsearch/document/BulkTests.java +++ b/src/test/java/org/elasticsearch/document/BulkTests.java @@ -1,5 +1,7 @@ package org.elasticsearch.document; +import com.google.common.base.Charsets; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; @@ -11,6 +13,7 @@ import org.elasticsearch.common.Priority; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.junit.Test; @@ -452,4 +455,84 @@ public void run() { logger.info("failed to induce a version conflict."); } + @Test // issue 4745 + public void preParsingSourceDueToMappingShouldNotBreakCompleteBulkRequest() throws Exception { + XContentBuilder builder = jsonBuilder().startObject() + .startObject("type") + .startObject("_timestamp") + .field("enabled", true) + .field("path", "last_modified") + .endObject() + .endObject() + .endObject(); + CreateIndexResponse createIndexResponse = prepareCreate("test").addMapping("type", builder).get(); + assertAcked(createIndexResponse); + + String brokenBuildRequestData = "{\"index\": {\"_id\": \"1\"}}\n" + + "{\"name\": \"Malformed}\n" + + "{\"index\": {\"_id\": \"2\"}}\n" + + "{\"name\": \"Good\", \"last_modified\" : \"2013-04-05\"}\n"; + + BulkResponse bulkResponse = client().prepareBulk().add(brokenBuildRequestData.getBytes(Charsets.UTF_8), 0, brokenBuildRequestData.length(), false, "test", "type").setRefresh(true).get(); + assertThat(bulkResponse.getItems().length, is(2)); + assertThat(bulkResponse.getItems()[0].isFailed(), is(true)); + assertThat(bulkResponse.getItems()[1].isFailed(), is(false)); + + assertExists(get("test", "type", "2")); + } + + @Test // issue 4745 + public void preParsingSourceDueToRoutingShouldNotBreakCompleteBulkRequest() throws Exception { + XContentBuilder builder = jsonBuilder().startObject() + .startObject("type") + .startObject("_routing") + .field("required", true) + .field("path", "my_routing") + .endObject() + .endObject() + .endObject(); + CreateIndexResponse createIndexResponse = prepareCreate("test").addMapping("type", builder).get(); + assertAcked(createIndexResponse); + ensureYellow(); + + String brokenBuildRequestData = "{\"index\": {} }\n" + + "{\"name\": \"Malformed}\n" + + "{\"index\": { \"_id\" : \"24000\" } }\n" + + "{\"name\": \"Good\", \"my_routing\" : \"48000\"}\n"; + + BulkResponse bulkResponse = client().prepareBulk().add(brokenBuildRequestData.getBytes(Charsets.UTF_8), 0, brokenBuildRequestData.length(), false, "test", "type").setRefresh(true).get(); + assertThat(bulkResponse.getItems().length, is(2)); + assertThat(bulkResponse.getItems()[0].isFailed(), is(true)); + assertThat(bulkResponse.getItems()[1].isFailed(), is(false)); + + assertExists(client().prepareGet("test", "type", "24000").setRouting("48000").get()); + } + + + @Test // issue 4745 + public void preParsingSourceDueToIdShouldNotBreakCompleteBulkRequest() throws Exception { + XContentBuilder builder = jsonBuilder().startObject() + .startObject("type") + .startObject("_id") + .field("path", "my_id") + .endObject() + .endObject() + .endObject(); + CreateIndexResponse createIndexResponse = prepareCreate("test").addMapping("type", builder).get(); + assertAcked(createIndexResponse); + ensureYellow(); + + String brokenBuildRequestData = "{\"index\": {} }\n" + + "{\"name\": \"Malformed}\n" + + "{\"index\": {} }\n" + + "{\"name\": \"Good\", \"my_id\" : \"48\"}\n"; + + BulkResponse bulkResponse = client().prepareBulk().add(brokenBuildRequestData.getBytes(Charsets.UTF_8), 0, brokenBuildRequestData.length(), false, "test", "type").setRefresh(true).get(); + assertThat(bulkResponse.getItems().length, is(2)); + assertThat(bulkResponse.getItems()[0].isFailed(), is(true)); + assertThat(bulkResponse.getItems()[1].isFailed(), is(false)); + + assertExists(get("test", "type", "48")); + } + } diff --git a/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java b/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java index 0074bc23bb379..b8aee16e7f2fc 100644 --- a/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java +++ b/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java @@ -29,6 +29,9 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.count.CountResponse; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.percolate.PercolateResponse; +import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse; @@ -50,6 +53,7 @@ import java.lang.reflect.InvocationTargetException; import java.util.Arrays; import java.util.HashSet; +import java.util.Locale; import java.util.Set; import static org.hamcrest.MatcherAssert.assertThat; @@ -139,6 +143,11 @@ public static void assertHitCount(CountResponse countResponse, long expectedHitC assertVersionSerializable(countResponse); } + public static void assertExists(GetResponse response) { + String message = String.format(Locale.ROOT, "Expected %s/%s/%s to exist, but does not", response.getIndex(), response.getType(), response.getId()); + assertThat(message, response.isExists(), is(true)); + } + public static void assertFirstHit(SearchResponse searchResponse, Matcher matcher) { assertSearchHit(searchResponse, 1, matcher); }