From f80ea837ca96d01dbd9d33676ef6b6e7b635b06d Mon Sep 17 00:00:00 2001
From: Sam Xiao
Date: Wed, 26 Nov 2025 15:50:19 -0500
Subject: [PATCH 1/2] Handle individual doc parsing failure gracefully in bulk
 request with pipeline (#138624)

(cherry picked from commit 933354b9cf2e5391aebcf944c8eb3b38907b065d)
---
 docs/changelog/138624.yaml                    |  6 ++
 .../elasticsearch/ingest/IngestClientIT.java  | 86 +++++++++++++++++++
 .../elasticsearch/ingest/IngestService.java   | 15 +++-
 .../ingest/IngestServiceTests.java            | 84 ++++++++++++++++++
 4 files changed, 190 insertions(+), 1 deletion(-)
 create mode 100644 docs/changelog/138624.yaml

diff --git a/docs/changelog/138624.yaml b/docs/changelog/138624.yaml
new file mode 100644
index 0000000000000..679a4ca8a4525
--- /dev/null
+++ b/docs/changelog/138624.yaml
@@ -0,0 +1,6 @@
+pr: 138624
+summary: Handle individual doc parsing failure in bulk request with pipeline
+area: Ingest Node
+type: bug
+issues:
+ - 138445
diff --git a/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestClientIT.java b/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestClientIT.java
index 7cb610ba26e3e..0b0e9f2dc309b 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestClientIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestClientIT.java
@@ -426,6 +426,92 @@ public void testPipelineProcessorOnFailure() throws Exception {
         assertThat(inserted.get("readme"), equalTo("pipeline with id [3] is a bad pipeline"));
     }
 
+    public void testBulkRequestWithInvalidJsonAndPipeline() throws Exception {
+        // Test that when a document with invalid JSON is in a bulk request with a pipeline,
+        // the invalid document fails gracefully without causing the entire bulk request to fail.
+        // This tests the fix for https://github.com/elastic/elasticsearch/issues/138445
+
+        createIndex("test_index");
+
+        putJsonPipeline(
+            "test-pipeline",
+            (builder, params) -> builder.field("description", "test pipeline")
+                .startArray("processors")
+                .startObject()
+                .startObject("test")
+                .endObject()
+                .endObject()
+                .endArray()
+        );
+
+        // Create a bulk request with valid and invalid documents
+        BulkRequest bulkRequest = new BulkRequest();
+
+        // Valid document
+        IndexRequest validRequest = new IndexRequest("test_index").id("valid_doc");
+        validRequest.source("{\"valid\":\"test\"}", XContentType.JSON);
+        validRequest.setPipeline("test-pipeline");
+        bulkRequest.add(validRequest);
+
+        // Invalid document with missing closing brace
+        IndexRequest invalidRequest = new IndexRequest("test_index").id("invalid_doc");
+        invalidRequest.source("{\"invalid\":\"json\"", XContentType.JSON);
+        invalidRequest.setPipeline("test-pipeline");
+        bulkRequest.add(invalidRequest);
+
+        // Invalid document with duplicate fields
+        IndexRequest invalidRequest2 = new IndexRequest("test_index").id("invalid_doc2");
+        invalidRequest2.source("{\"invalid\":\"json\", \"invalid\":\"json\"}", XContentType.JSON);
+        invalidRequest2.setPipeline("test-pipeline");
+        bulkRequest.add(invalidRequest2);
+
+        // Another valid document
+        IndexRequest validRequest2 = new IndexRequest("test_index").id("valid_doc2");
+        validRequest2.source("{\"valid\":\"test2\"}", XContentType.JSON);
+        validRequest2.setPipeline("test-pipeline");
+        bulkRequest.add(validRequest2);
+
+        BulkResponse response = client().bulk(bulkRequest).actionGet();
+
+        // The bulk request itself should succeed; only the invalid items should report failures
+        assertThat(response.hasFailures(), is(true));
+        assertThat(response.getItems().length, equalTo(4));
+
+        // First document should succeed
+        BulkItemResponse item0 = response.getItems()[0];
+        assertThat(item0.isFailed(), is(false));
+        assertThat(item0.getResponse().getId(), equalTo("valid_doc"));
+        assertThat(item0.getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
+
+        // Second document should fail
+        BulkItemResponse item1 = response.getItems()[1];
+        assertThat(item1.isFailed(), is(true));
+        assertThat(item1.getFailure().getStatus(), equalTo(org.elasticsearch.rest.RestStatus.BAD_REQUEST));
+        assertThat(item1.getFailure().getCause(), instanceOf(IllegalArgumentException.class));
+
+        // Third document should fail
+        BulkItemResponse item2 = response.getItems()[2];
+        assertThat(item2.isFailed(), is(true));
+        assertThat(item2.getFailure().getStatus(), equalTo(org.elasticsearch.rest.RestStatus.BAD_REQUEST));
+        assertThat(item2.getFailure().getCause(), instanceOf(IllegalArgumentException.class));
+
+        // Fourth document should succeed
+        BulkItemResponse item3 = response.getItems()[3];
+        assertThat(item3.isFailed(), is(false));
+        assertThat(item3.getResponse().getId(), equalTo("valid_doc2"));
+        assertThat(item3.getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
+
+        // Verify that the valid documents were indexed
+        assertThat(client().prepareGet("test_index", "valid_doc").get().isExists(), is(true));
+        assertThat(client().prepareGet("test_index", "valid_doc2").get().isExists(), is(true));
+        // Verify that the invalid documents were not indexed
+        assertThat(client().prepareGet("test_index", "invalid_doc").get().isExists(), is(false));
+        assertThat(client().prepareGet("test_index", "invalid_doc2").get().isExists(), is(false));
+
+        // cleanup
+        deletePipeline("test-pipeline");
+    }
+
     public static class ExtendedIngestTestPlugin extends IngestTestPlugin {
 
         @Override
diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java
index 03eef5f713854..5fcac6492da04 100644
--- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java
+++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java
@@ -799,7 +799,20 @@ protected void doRun() {
                         final XContentMeteringParserDecorator meteringParserDecorator = documentParsingProvider.newMeteringParserDecorator(
                             indexRequest
                         );
-                        final IngestDocument ingestDocument = newIngestDocument(indexRequest, meteringParserDecorator);
+                        final IngestDocument ingestDocument;
+                        try {
+                            ingestDocument = newIngestDocument(indexRequest, meteringParserDecorator);
+                        } catch (Exception e) {
+                            // Document parsing failed (e.g. invalid JSON). Handle this gracefully
+                            // by marking this document as failed and continuing with other documents.
+                            final long ingestTimeInNanos = System.nanoTime() - startTimeInNanos;
+                            totalMetrics.postIngest(ingestTimeInNanos);
+                            totalMetrics.ingestFailed();
+                            ref.close();
+                            i++;
+                            onFailure.accept(slot, e);
+                            continue;
+                        }
                         final org.elasticsearch.script.Metadata originalDocumentMetadata = ingestDocument.getMetadata().clone();
                         // the document listener gives us three-way logic: a document can fail processing (1), or it can
                         // be successfully processed. a successfully processed document can be kept (2) or dropped (3).
diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java
index 37dee399efde8..de6a22f799699 100644
--- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java
@@ -65,6 +65,7 @@
 import org.elasticsearch.test.MockLog;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.XContentParseException;
 import org.elasticsearch.xcontent.XContentType;
 import org.elasticsearch.xcontent.cbor.CborXContent;
 import org.junit.Before;
@@ -1572,6 +1573,89 @@ public void testBulkRequestExecutionWithFailures() throws Exception {
         verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
     }
 
+    public void testBulkRequestExecutionWithInvalidJsonDocument() {
+        // Test that when a document with invalid JSON (e.g., duplicate keys) is in a bulk request with a pipeline,
+        // the invalid document fails gracefully without causing the entire bulk request to fail.
+        BulkRequest bulkRequest = new BulkRequest();
+        String pipelineId = "_id";
+
+        // Valid document that should succeed
+        IndexRequest validRequest = new IndexRequest("_index").id("valid").setPipeline(pipelineId).setFinalPipeline("_none");
+        validRequest.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1");
+        validRequest.setListExecutedPipelines(true);
+        bulkRequest.add(validRequest);
+
+        // Invalid document with missing closing brace
+        String invalidJson = "{\"invalid\":\"json\"";
+        IndexRequest invalidRequest = new IndexRequest("_index").id("invalid").setPipeline(pipelineId).setFinalPipeline("_none");
+        invalidRequest.source(new BytesArray(invalidJson), XContentType.JSON);
+        bulkRequest.add(invalidRequest);
+
+        // Another valid document that should succeed
+        IndexRequest validRequest2 = new IndexRequest("_index").id("valid2").setPipeline(pipelineId).setFinalPipeline("_none");
+        validRequest2.source(Requests.INDEX_CONTENT_TYPE, "field2", "value2");
+        validRequest2.setListExecutedPipelines(true);
+        bulkRequest.add(validRequest2);
+
+        // Invalid document with duplicated keys
+        String invalidJson2 = "{\"@timestamp\":\"2024-06-01T00:00:00Z\",\"@timestamp\":\"2024-06-01T00:00:00Z\"}";
+        IndexRequest invalidRequest2 = new IndexRequest("_index").id("invalid2").setPipeline(pipelineId).setFinalPipeline("_none");
+        invalidRequest2.source(new BytesArray(invalidJson2), XContentType.JSON);
+        bulkRequest.add(invalidRequest2);
+
+        final Processor processor = mock(Processor.class);
+        when(processor.getType()).thenReturn("mock");
+        when(processor.getTag()).thenReturn("mockTag");
+        doAnswer(args -> {
+            BiConsumer<IngestDocument, Exception> handler = args.getArgument(1);
+            handler.accept(RandomDocumentPicks.randomIngestDocument(random()), null);
+            return null;
+        }).when(processor).execute(any(), any());
+
+        IngestService ingestService = createWithProcessors(Map.of("mock", (factories, tag, description, config) -> processor));
+        PutPipelineRequest putRequest = new PutPipelineRequest(
+            "_id",
+            new BytesArray("{\"processors\": [{\"mock\" : {}}]}"),
+            XContentType.JSON
+        );
+        ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
+            .build();
+        ClusterState previousClusterState = clusterState;
+        clusterState = executePut(putRequest, clusterState);
+        ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
+
+        BiConsumer<Integer, Exception> requestItemErrorHandler = mock();
+        final BiConsumer<Thread, Exception> onCompletion = mock();
+
+        ingestService.executeBulkRequest(
+            4,
+            bulkRequest.requests(),
+            indexReq -> {},
+            (s) -> false,
+            (slot, targetIndex, e) -> fail("Should not redirect to failure store"),
+            requestItemErrorHandler,
+            onCompletion,
+            EsExecutors.DIRECT_EXECUTOR_SERVICE
+        );
+
+        // The invalid documents should fail with a parsing error
+        verify(requestItemErrorHandler).accept(
+            eq(1), // slot 1 is the invalid document
+            argThat(e -> e instanceof XContentParseException)
+        );
+        verify(requestItemErrorHandler).accept(
+            eq(3), // slot 3 is the other invalid document
+            argThat(e -> e instanceof XContentParseException)
+        );
+
+        // The bulk listener should still be called with success
+        verify(onCompletion).accept(any(), eq(null));
+        assertStats(ingestService.stats().totalStats(), 4, 2, 0);
+        // Verify that the valid documents were processed (they should have their pipelines executed)
+        assertThat(validRequest.getExecutedPipelines(), equalTo(List.of(pipelineId)));
+        assertThat(validRequest2.getExecutedPipelines(), equalTo(List.of(pipelineId)));
+    }
+
     public void testExecuteFailureRedirection() throws Exception {
         final CompoundProcessor processor = mockCompoundProcessor();
         IngestService ingestService = createWithProcessors(

From d1ec7872cb112d8abfaf8dedeee6fc0687202e7d Mon Sep 17 00:00:00 2001
From: elasticsearchmachine
Date: Thu, 27 Nov 2025 00:45:14 +0000
Subject: [PATCH 2/2] [CI] Auto commit changes from spotless

---
 .../test/java/org/elasticsearch/ingest/IngestServiceTests.java | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java
index de6a22f799699..33c05dbafa831 100644
--- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java
@@ -1618,8 +1618,7 @@ public void testBulkRequestExecutionWithInvalidJsonDocument() {
             new BytesArray("{\"processors\": [{\"mock\" : {}}]}"),
             XContentType.JSON
         );
-        ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
-            .build();
+        ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).build();
         ClusterState previousClusterState = clusterState;
         clusterState = executePut(putRequest, clusterState);
         ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));