From d87219e73e61bb6b6f06c2273e58f5976672aa43 Mon Sep 17 00:00:00 2001 From: Matt Burgess Date: Tue, 22 Nov 2016 20:07:31 -0500 Subject: [PATCH 1/2] NIFI-3087: Added unit tests to PutElasticsearch(Http) to illustrate issue --- .../elasticsearch/TestPutElasticsearch.java | 22 ++++++++++++++----- .../TestPutElasticsearchHttp.java | 16 ++++++++------ 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java index d7fb43934dc0..4e6a820d6222 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java @@ -112,15 +112,19 @@ public void testPutElasticSearchOnTriggerWithFailures() throws IOException { runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s"); runner.setProperty(PutElasticsearch.INDEX, "doc"); runner.setProperty(PutElasticsearch.TYPE, "status"); - runner.setProperty(PutElasticsearch.BATCH_SIZE, "1"); + runner.setProperty(PutElasticsearch.BATCH_SIZE, "2"); runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id"); runner.enqueue(docExample, new HashMap() {{ put("doc_id", "28039652140"); }}); + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652141"); + }}); runner.run(1, true, true); - runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_FAILURE, 1); + runner.assertTransferCount(PutElasticsearch.REL_FAILURE, 1); + runner.assertTransferCount(PutElasticsearch.REL_SUCCESS, 1); final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_FAILURE).get(0); assertNotNull(out); out.assertAttributeEquals("doc_id", "28039652140"); @@ -349,10 +353,16 @@ public void addListener(ActionListener actionListener) { public BulkResponse get() throws InterruptedException, ExecutionException { BulkResponse response = mock(BulkResponse.class); when(response.hasFailures()).thenReturn(responseHasFailures); - BulkItemResponse item = mock(BulkItemResponse.class); - when(item.getItemId()).thenReturn(1); - when(item.isFailed()).thenReturn(true); - when(response.getItems()).thenReturn(new BulkItemResponse[]{item}); + BulkItemResponse item1 = mock(BulkItemResponse.class); + BulkItemResponse item2 = mock(BulkItemResponse.class); + when(item1.getItemId()).thenReturn(1); + when(item1.isFailed()).thenReturn(true); + BulkItemResponse.Failure failure = mock(BulkItemResponse.Failure.class); + when(failure.getMessage()).thenReturn("Bad message"); + when(item1.getFailure()).thenReturn(failure); + when(item2.getItemId()).thenReturn(2); + when(item2.isFailed()).thenReturn(false); + when(response.getItems()).thenReturn(new BulkItemResponse[]{item1, item2}); return response; } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java index 1172004bb7ff..9ce578f61538 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java @@ -201,13 +201,15 @@ public void testPutElasticsearchOnTriggerWithNoIdAttribute() throws IOException runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); runner.setProperty(PutElasticsearchHttp.INDEX, "doc"); runner.setProperty(PutElasticsearchHttp.TYPE, "status"); - runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1"); + runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "2"); runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id"); + runner.enqueue(docExample); runner.enqueue(docExample); runner.run(1, true, true); - runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_FAILURE, 1); + runner.assertTransferCount(PutElasticsearchHttp.REL_FAILURE, 1); + runner.assertTransferCount(PutElasticsearchHttp.REL_SUCCESS, 1); final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_FAILURE).get(0); assertNotNull(out); } @@ -308,12 +310,12 @@ public Call answer(InvocationOnMock invocationOnMock) throws Throwable { sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":\"400\","); sb.append("\"error\":{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed to parse [gender]\","); sb.append("\"caused_by\":{\"type\":\"json_parse_exception\",\"reason\":\"Unexpected end-of-input in VALUE_STRING\\n at "); - sb.append("[Source: org.elasticsearch.common.io.stream.InputStreamStreamInput@1a2e3ac4; line: 1, column: 39]\"}}}}"); - } else { - sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":"); - sb.append(statusCode); - sb.append(",\"_source\":{\"text\": \"This is a test document\"}}}"); + sb.append("[Source: org.elasticsearch.common.io.stream.InputStreamStreamInput@1a2e3ac4; line: 1, column: 39]\"}}}},"); } + sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":"); + sb.append(statusCode); + sb.append(",\"_source\":{\"text\": \"This is a test document\"}}}"); + sb.append("]}"); Response mockResponse = new Response.Builder() .request(realRequest) From 3f4e389cb82d69d39bb6bfd2d4c9ebb4584aacd1 Mon Sep 17 00:00:00 2001 From: Matt Burgess Date: Tue, 22 Nov 2016 20:08:17 -0500 Subject: [PATCH 2/2] NIFI-3087: Fixed issue with partial failure responses in PutElasticsearch(Http) --- .../elasticsearch/PutElasticsearch.java | 24 +++++++++++-------- .../elasticsearch/PutElasticsearchHttp.java | 2 +- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java index f64180b20cac..216efd4a771f 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java @@ -211,17 +211,21 @@ public void process(final InputStream in) throws IOException { final BulkResponse response = bulk.execute().actionGet(); if (response.hasFailures()) { - for (final BulkItemResponse item : response.getItems()) { - final FlowFile flowFile = flowFilesToTransfer.get(item.getItemId()); - if (item.isFailed()) { - logger.error("Failed to insert {} into Elasticsearch due to {}, transferring to failure", - new Object[]{flowFile, item.getFailure().getMessage()}); - session.transfer(flowFile, REL_FAILURE); - - } else { - session.transfer(flowFile, REL_SUCCESS); + // Responses are guaranteed to be in order, remove them in reverse order + BulkItemResponse[] responses = response.getItems(); + if (responses != null && responses.length > 0) { + for (int i = responses.length - 1; i >= 0; i--) { + final FlowFile flowFile = flowFilesToTransfer.get(i); + if (responses[i].isFailed()) { + logger.error("Failed to insert {} into Elasticsearch due to {}, transferring to failure", + new Object[]{flowFile, responses[i].getFailure().getMessage()}); + session.transfer(flowFile, REL_FAILURE); + + } else { + session.transfer(flowFile, REL_SUCCESS); + } + flowFilesToTransfer.remove(flowFile); } - flowFilesToTransfer.remove(flowFile); } } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java index 71171006c0b6..3ba46bb362b5 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java @@ -328,7 +328,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session if (itemNodeArray.size() > 0) { // All items are returned whether they succeeded or failed, so iterate through the item array // at the same time as the flow file list, moving each to success or failure accordingly - for (int i = 0; i < itemNodeArray.size(); i++) { + for (int i = itemNodeArray.size() - 1; i >= 0; i--) { JsonNode itemNode = itemNodeArray.get(i); FlowFile flowFile = flowFilesToTransfer.remove(i); int status = itemNode.findPath("status").asInt();