Skip to content

Commit

Permalink
Bulk: Failed preparsing does not fail whole bulk request
Browse files Browse the repository at this point in the history
If a preparsing of the source is needed (due to mapping configuration,
which extracts the routing/id value from the source) and the source is not
valid JSON, then the whole bulk request is failed instead of a single
BulkRequest.

This commit ensures, that a broken JSON request is not forwarded to the
destination shard and creates an appropriate BulkItemResponse, which
includes a failure.

This also implied changing the BulkItemResponse serialization, because one
cannot be sure anymore, if a response includes an ID, in case it was not
specified and could not be extracted from the JSON.

Closes #4745
  • Loading branch information
spinscale committed Jan 27, 2014
1 parent 02d5e1c commit ae0aaa2
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 11 deletions.
Expand Up @@ -264,7 +264,7 @@ public void readFrom(StreamInput in) throws IOException {
if (in.readBoolean()) {
String fIndex = in.readSharedString();
String fType = in.readSharedString();
String fId = in.readString();
String fId = in.readOptionalString();
String fMessage = in.readString();
RestStatus status = RestStatus.readFrom(in);
failure = new Failure(fIndex, fType, fId, fMessage, status);
Expand Down Expand Up @@ -294,7 +294,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(true);
out.writeSharedString(failure.getIndex());
out.writeSharedString(failure.getType());
out.writeString(failure.getId());
out.writeOptionalString(failure.getId());
out.writeString(failure.getMessage());
RestStatus.writeTo(out, failure.getStatus());
}
Expand Down
Expand Up @@ -22,6 +22,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
Expand Down Expand Up @@ -51,10 +52,7 @@
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;

import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -153,7 +151,10 @@ private void executeBulk(final BulkRequest bulkRequest, final long startTime, fi
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.WRITE);

MetaData metaData = clusterState.metaData();
for (ActionRequest request : bulkRequest.requests) {
final AtomicArray<BulkItemResponse> responses = new AtomicArray<BulkItemResponse>(bulkRequest.requests.size());

for (int i = 0; i < bulkRequest.requests.size(); i++) {
ActionRequest request = bulkRequest.requests.get(i);
if (request instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) request;
String aliasOrIndex = indexRequest.index();
Expand All @@ -163,7 +164,15 @@ private void executeBulk(final BulkRequest bulkRequest, final long startTime, fi
if (metaData.hasIndex(indexRequest.index())) {
mappingMd = metaData.index(indexRequest.index()).mappingOrDefault(indexRequest.type());
}
indexRequest.process(metaData, aliasOrIndex, mappingMd, allowIdGeneration);
try {
indexRequest.process(metaData, aliasOrIndex, mappingMd, allowIdGeneration);
} catch (ElasticsearchParseException e) {
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), e);
BulkItemResponse bulkItemResponse = new BulkItemResponse(i, "index", failure);
responses.set(i, bulkItemResponse);
// make sure the request gets never processed again
bulkRequest.requests.set(i, null);
}
} else if (request instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) request;
deleteRequest.routing(clusterState.metaData().resolveIndexRouting(deleteRequest.routing(), deleteRequest.index()));
Expand All @@ -174,8 +183,6 @@ private void executeBulk(final BulkRequest bulkRequest, final long startTime, fi
updateRequest.index(clusterState.metaData().concreteIndex(updateRequest.index()));
}
}
final AtomicArray<BulkItemResponse> responses = new AtomicArray<BulkItemResponse>(bulkRequest.requests.size());


// first, go over all the requests and create a ShardId -> Operations mapping
Map<ShardId, List<BulkItemRequest>> requestsByShard = Maps.newHashMap();
Expand Down
Expand Up @@ -566,7 +566,7 @@ public void process(MetaData metaData, String aliasOrIndex, @Nullable MappingMet
timestamp = MappingMetaData.Timestamp.parseStringTimestamp(timestamp, mappingMd.timestamp().dateTimeFormatter());
}
} catch (Exception e) {
throw new ElasticsearchParseException("failed to parse doc to extract routing/timestamp", e);
throw new ElasticsearchParseException("failed to parse doc to extract routing/timestamp/id", e);
} finally {
if (parser != null) {
parser.close();
Expand Down
83 changes: 83 additions & 0 deletions src/test/java/org/elasticsearch/document/BulkTests.java
Expand Up @@ -19,6 +19,8 @@

package org.elasticsearch.document;

import com.google.common.base.Charsets;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.count.CountResponse;
Expand All @@ -30,6 +32,7 @@
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
Expand Down Expand Up @@ -507,4 +510,84 @@ public void run() {
assertThat(successes, equalTo(1));
}

@Test // issue 4745
public void preParsingSourceDueToMappingShouldNotBreakCompleteBulkRequest() throws Exception {
XContentBuilder builder = jsonBuilder().startObject()
.startObject("type")
.startObject("_timestamp")
.field("enabled", true)
.field("path", "last_modified")
.endObject()
.endObject()
.endObject();
CreateIndexResponse createIndexResponse = prepareCreate("test").addMapping("type", builder).get();
assertAcked(createIndexResponse);

String brokenBuildRequestData = "{\"index\": {\"_id\": \"1\"}}\n" +
"{\"name\": \"Malformed}\n" +
"{\"index\": {\"_id\": \"2\"}}\n" +
"{\"name\": \"Good\", \"last_modified\" : \"2013-04-05\"}\n";

BulkResponse bulkResponse = client().prepareBulk().add(brokenBuildRequestData.getBytes(Charsets.UTF_8), 0, brokenBuildRequestData.length(), false, "test", "type").setRefresh(true).get();
assertThat(bulkResponse.getItems().length, is(2));
assertThat(bulkResponse.getItems()[0].isFailed(), is(true));
assertThat(bulkResponse.getItems()[1].isFailed(), is(false));

assertExists(get("test", "type", "2"));
}

@Test // issue 4745
public void preParsingSourceDueToRoutingShouldNotBreakCompleteBulkRequest() throws Exception {
XContentBuilder builder = jsonBuilder().startObject()
.startObject("type")
.startObject("_routing")
.field("required", true)
.field("path", "my_routing")
.endObject()
.endObject()
.endObject();
CreateIndexResponse createIndexResponse = prepareCreate("test").addMapping("type", builder).get();
assertAcked(createIndexResponse);
ensureYellow("test");

String brokenBuildRequestData = "{\"index\": {} }\n" +
"{\"name\": \"Malformed}\n" +
"{\"index\": { \"_id\" : \"24000\" } }\n" +
"{\"name\": \"Good\", \"my_routing\" : \"48000\"}\n";

BulkResponse bulkResponse = client().prepareBulk().add(brokenBuildRequestData.getBytes(Charsets.UTF_8), 0, brokenBuildRequestData.length(), false, "test", "type").setRefresh(true).get();
assertThat(bulkResponse.getItems().length, is(2));
assertThat(bulkResponse.getItems()[0].isFailed(), is(true));
assertThat(bulkResponse.getItems()[1].isFailed(), is(false));

assertExists(client().prepareGet("test", "type", "24000").setRouting("48000").get());
}


@Test // issue 4745
public void preParsingSourceDueToIdShouldNotBreakCompleteBulkRequest() throws Exception {
XContentBuilder builder = jsonBuilder().startObject()
.startObject("type")
.startObject("_id")
.field("path", "my_id")
.endObject()
.endObject()
.endObject();
CreateIndexResponse createIndexResponse = prepareCreate("test").addMapping("type", builder).get();
assertAcked(createIndexResponse);
ensureYellow("test");

String brokenBuildRequestData = "{\"index\": {} }\n" +
"{\"name\": \"Malformed}\n" +
"{\"index\": {} }\n" +
"{\"name\": \"Good\", \"my_id\" : \"48\"}\n";

BulkResponse bulkResponse = client().prepareBulk().add(brokenBuildRequestData.getBytes(Charsets.UTF_8), 0, brokenBuildRequestData.length(), false, "test", "type").setRefresh(true).get();
assertThat(bulkResponse.getItems().length, is(2));
assertThat(bulkResponse.getItems()[0].isFailed(), is(true));
assertThat(bulkResponse.getItems()[1].isFailed(), is(false));

assertExists(get("test", "type", "48"));
}

}
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.percolate.PercolateResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
Expand All @@ -52,6 +53,7 @@
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -149,6 +151,11 @@ public static void assertMatchCount(PercolateResponse percolateResponse, long ex
assertVersionSerializable(percolateResponse);
}

public static void assertExists(GetResponse response) {
String message = String.format(Locale.ROOT, "Expected %s/%s/%s to exist, but does not", response.getIndex(), response.getType(), response.getId());
assertThat(message, response.isExists(), is(true));
}

public static void assertFirstHit(SearchResponse searchResponse, Matcher<SearchHit> matcher) {
assertSearchHit(searchResponse, 1, matcher);
}
Expand Down

0 comments on commit ae0aaa2

Please sign in to comment.