6 changes: 6 additions & 0 deletions docs/changelog/138624.yaml
@@ -0,0 +1,6 @@
pr: 138624
summary: Handle individual doc parsing failure in bulk request with pipeline
area: Ingest Node
type: bug
issues:
- 138445
@@ -426,6 +426,92 @@ public void testPipelineProcessorOnFailure() throws Exception {
assertThat(inserted.get("readme"), equalTo("pipeline with id [3] is a bad pipeline"));
}

public void testBulkRequestWithInvalidJsonAndPipeline() throws Exception {
// Test that when a document with invalid JSON is in a bulk request with a pipeline,
// the invalid document fails gracefully without causing the entire bulk request to fail.
// This tests the fix for https://github.com/elastic/elasticsearch/issues/138445

createIndex("test_index");

putJsonPipeline(
"test-pipeline",
(builder, params) -> builder.field("description", "test pipeline")
.startArray("processors")
.startObject()
.startObject("test")
.endObject()
.endObject()
.endArray()
);

// Create a bulk request with valid and invalid documents
BulkRequest bulkRequest = new BulkRequest();

// Valid document
IndexRequest validRequest = new IndexRequest("test_index").id("valid_doc");
validRequest.source("{\"valid\":\"test\"}", XContentType.JSON);
validRequest.setPipeline("test-pipeline");
bulkRequest.add(validRequest);

// Invalid document with missing closing brace
IndexRequest invalidRequest = new IndexRequest("test_index").id("invalid_doc");
invalidRequest.source("{\"invalid\":\"json\"", XContentType.JSON);
invalidRequest.setPipeline("test-pipeline");
bulkRequest.add(invalidRequest);

// Invalid document with duplicate fields
IndexRequest invalidRequest2 = new IndexRequest("test_index").id("invalid_doc2");
invalidRequest2.source("{\"invalid\":\"json\", \"invalid\":\"json\"}", XContentType.JSON);
invalidRequest2.setPipeline("test-pipeline");
bulkRequest.add(invalidRequest2);

// Another valid document
IndexRequest validRequest2 = new IndexRequest("test_index").id("valid_doc2");
validRequest2.source("{\"valid\":\"test2\"}", XContentType.JSON);
validRequest2.setPipeline("test-pipeline");
bulkRequest.add(validRequest2);

BulkResponse response = client().bulk(bulkRequest).actionGet();

// The bulk request itself should succeed, with item-level failures for the invalid documents
assertThat(response.hasFailures(), is(true));
assertThat(response.getItems().length, equalTo(4));

// First document should succeed
BulkItemResponse item0 = response.getItems()[0];
assertThat(item0.isFailed(), is(false));
assertThat(item0.getResponse().getId(), equalTo("valid_doc"));
assertThat(item0.getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));

// Second document should fail
BulkItemResponse item1 = response.getItems()[1];
assertThat(item1.isFailed(), is(true));
assertThat(item1.getFailure().getStatus(), equalTo(org.elasticsearch.rest.RestStatus.BAD_REQUEST));
assertThat(item1.getFailure().getCause(), instanceOf(IllegalArgumentException.class));

// Third document should fail
BulkItemResponse item2 = response.getItems()[2];
assertThat(item2.isFailed(), is(true));
assertThat(item2.getFailure().getStatus(), equalTo(org.elasticsearch.rest.RestStatus.BAD_REQUEST));
assertThat(item2.getFailure().getCause(), instanceOf(IllegalArgumentException.class));

// Fourth document should succeed
BulkItemResponse item3 = response.getItems()[3];
assertThat(item3.isFailed(), is(false));
assertThat(item3.getResponse().getId(), equalTo("valid_doc2"));
assertThat(item3.getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));

// Verify that the valid documents were indexed
assertThat(client().prepareGet("test_index", "valid_doc").get().isExists(), is(true));
assertThat(client().prepareGet("test_index", "valid_doc2").get().isExists(), is(true));
// Verify that the invalid documents were not indexed
assertThat(client().prepareGet("test_index", "invalid_doc").get().isExists(), is(false));
assertThat(client().prepareGet("test_index", "invalid_doc2").get().isExists(), is(false));

// cleanup
deletePipeline("test-pipeline");
}

public static class ExtendedIngestTestPlugin extends IngestTestPlugin {

@Override
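A note on the assertions above: the bulk request as a whole succeeds while failures are reported per item. As a minimal sketch (not part of this PR; the helper name is an assumption), item-level results can be inspected with the same client API the test uses:

    // Hypothetical helper: report item-level outcomes without treating the
    // whole bulk request as failed.
    static void reportBulkResults(BulkResponse response) {
        for (BulkItemResponse item : response.getItems()) {
            if (item.isFailed()) {
                // e.g. the BAD_REQUEST produced by the parse failures above
                System.err.println("doc [" + item.getId() + "] failed: " + item.getFailureMessage());
            } else {
                System.out.println("doc [" + item.getId() + "] -> " + item.getResponse().getResult());
            }
        }
    }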
15 changes: 14 additions & 1 deletion server/src/main/java/org/elasticsearch/ingest/IngestService.java
@@ -799,7 +799,20 @@ protected void doRun() {
final XContentMeteringParserDecorator meteringParserDecorator = documentParsingProvider.newMeteringParserDecorator(
indexRequest
);
- final IngestDocument ingestDocument = newIngestDocument(indexRequest, meteringParserDecorator);
+ final IngestDocument ingestDocument;
+ try {
+     ingestDocument = newIngestDocument(indexRequest, meteringParserDecorator);
+ } catch (Exception e) {
+     // Document parsing failed (e.g. invalid JSON). Handle this gracefully
+     // by marking this document as failed and continuing with other documents.
+     final long ingestTimeInNanos = System.nanoTime() - startTimeInNanos;
+     totalMetrics.postIngest(ingestTimeInNanos);
+     totalMetrics.ingestFailed();
+     ref.close();
+     i++;
+     onFailure.accept(slot, e);
+     continue;
+ }
final org.elasticsearch.script.Metadata originalDocumentMetadata = ingestDocument.getMetadata().clone();
// the document listener gives us three-way logic: a document can fail processing (1), or it can
// be successfully processed. a successfully processed document can be kept (2) or dropped (3).
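For context on what the new catch block absorbs: newIngestDocument builds the IngestDocument from the request's source map, and that parsing throws on malformed input. A minimal illustration of the failure mode, assuming the standard XContentHelper utility (this snippet is not part of the change):

    // Truncated JSON like the test documents above:
    BytesArray invalid = new BytesArray("{\"invalid\":\"json\"");
    // Converting the source to a map throws (e.g. an XContentParseException);
    // with this change the exception is routed to onFailure for that slot only,
    // instead of propagating out of the bulk processing loop.
    XContentHelper.convertToMap(invalid, false, XContentType.JSON);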
@@ -65,6 +65,7 @@
import org.elasticsearch.test.MockLog;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParseException;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xcontent.cbor.CborXContent;
import org.junit.Before;
@@ -1572,6 +1573,88 @@ public void testBulkRequestExecutionWithFailures() throws Exception {
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
}

public void testBulkRequestExecutionWithInvalidJsonDocument() {
// Test that when a document with invalid JSON (e.g., duplicate keys) is in a bulk request with a pipeline,
// the invalid document fails gracefully without causing the entire bulk request to fail.
BulkRequest bulkRequest = new BulkRequest();
String pipelineId = "_id";

// Valid document that should succeed
IndexRequest validRequest = new IndexRequest("_index").id("valid").setPipeline(pipelineId).setFinalPipeline("_none");
validRequest.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1");
validRequest.setListExecutedPipelines(true);
bulkRequest.add(validRequest);

// Invalid document with missing closing brace
String invalidJson = "{\"invalid\":\"json\"";
IndexRequest invalidRequest = new IndexRequest("_index").id("invalid").setPipeline(pipelineId).setFinalPipeline("_none");
invalidRequest.source(new BytesArray(invalidJson), XContentType.JSON);
bulkRequest.add(invalidRequest);

// Another valid document that should succeed
IndexRequest validRequest2 = new IndexRequest("_index").id("valid2").setPipeline(pipelineId).setFinalPipeline("_none");
validRequest2.source(Requests.INDEX_CONTENT_TYPE, "field2", "value2");
validRequest2.setListExecutedPipelines(true);
bulkRequest.add(validRequest2);

// Invalid document with duplicated keys
String invalidJson2 = "{\"@timestamp\":\"2024-06-01T00:00:00Z\",\"@timestamp\":\"2024-06-01T00:00:00Z\"}";
IndexRequest invalidRequest2 = new IndexRequest("_index").id("invalid2").setPipeline(pipelineId).setFinalPipeline("_none");
invalidRequest2.source(new BytesArray(invalidJson2), XContentType.JSON);
bulkRequest.add(invalidRequest2);

final Processor processor = mock(Processor.class);
when(processor.getType()).thenReturn("mock");
when(processor.getTag()).thenReturn("mockTag");
doAnswer(args -> {
BiConsumer<IngestDocument, Exception> handler = args.getArgument(1);
handler.accept(RandomDocumentPicks.randomIngestDocument(random()), null);
return null;
}).when(processor).execute(any(), any());

IngestService ingestService = createWithProcessors(Map.of("mock", (factories, tag, description, config) -> processor));
PutPipelineRequest putRequest = new PutPipelineRequest(
"_id1",
new BytesArray("{\"processors\": [{\"mock\" : {}}]}"),
XContentType.JSON
);
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).build();
ClusterState previousClusterState = clusterState;
clusterState = executePut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));

BiConsumer<Integer, Exception> requestItemErrorHandler = mock();
final BiConsumer<Thread, Exception> onCompletion = mock();

ingestService.executeBulkRequest(
4,
bulkRequest.requests(),
indexReq -> {},
(s) -> false,
(slot, targetIndex, e) -> fail("Should not redirect to failure store"),
requestItemErrorHandler,
onCompletion,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);

// The invalid documents should fail with a parsing error
verify(requestItemErrorHandler).accept(
eq(1), // slot 1 is the invalid document
argThat(e -> e instanceof XContentParseException)
);
verify(requestItemErrorHandler).accept(
eq(3), // slot 3 is the other invalid document
argThat(e -> e instanceof XContentParseException)
);

// The completion handler should still be called with a null top-level exception
verify(onCompletion).accept(any(), eq(null));
assertStats(ingestService.stats().totalStats(), 4, 2, 0);
// Verify that the valid documents were processed (they should have their pipelines executed)
assertThat(validRequest.getExecutedPipelines(), equalTo(List.of(pipelineId)));
assertThat(validRequest2.getExecutedPipelines(), equalTo(List.of(pipelineId)));
}

public void testExecuteFailureRedirection() throws Exception {
final CompoundProcessor processor = mockCompoundProcessor();
IngestService ingestService = createWithProcessors(
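A closing note on the duplicate-key case: Elasticsearch's JSON parsing runs with strict duplicate-field detection, so a source with a repeated key fails during map conversion, before any processor runs; that is why slot 3 surfaces a parse exception rather than a pipeline failure. A sketch under the same assumptions as the snippet above:

    String dup = "{\"@timestamp\":\"2024-06-01T00:00:00Z\",\"@timestamp\":\"2024-06-01T00:00:00Z\"}";
    // Throws a parse exception ("Duplicate field ...") during map conversion, which
    // the new catch block in IngestService routes to the per-item error handler.
    XContentHelper.convertToMap(new BytesArray(dup), false, XContentType.JSON);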