From bebdb28ff59363402b05e258f8694690a2974a9f Mon Sep 17 00:00:00 2001
From: Mike Thomsen
Date: Sun, 8 Jul 2018 14:51:12 -0400
Subject: [PATCH 1/9] NIFI-5248 Added new Elasticsearch processors. NIFI-5248
Fixed a few stray 1.7.0 references. NIFI-5248 Removed build helper plugin.
NIFI-5248 Made changes requested in a review. NIFI-5248 Updated dummy
service. NIFI-5248 Made a few changes from a code review. NIFI-5248 Added
logic for removing nulls so record paths can be removed when no longer
needed. NIFI-5248 Switched from variable registry to flowfile level EL.
NIFI-5248 Added JsonPath code to remove index, id and type path statements.
NIFI-5248 Updated validation. NIFI-5248 Set the field to null instead of
empty string when nulling records. NIFI-5248 Fixed
TestElasticSearchClientService. NIFI-5248 Removed high level client and
switched over to low level client for everything. NIFI-5248 Added profiles
for ES 6 and ES 7 integration testing. NIFI-5248 Updated integration tests to
support 5 and 6. NIFI-5248 Fixed some style check breaks. NIFI-5248 Added
create operation type. NIFI-5248 Updated documentation. NIFI-5248 Added error
handling to PutElasticsearchRecord. NIFI-5248 Added error logging to
PutElasticsearchJson. NIFI-5248 Added split failed records option to
PutElasticsearchJson. NIFI-5248 Added documentation for
PutElasticsearchRecord. NIFI-5248 Updated import to not use * import.
NIFI-5248 Removed processor that is no longer relevant due to schema
inference. NIFI-5248 Renamed ElasticSearch instances to Elasticsearch where
we can within API guidelines.
---
.../pom.xml | 6 +
.../ElasticSearchClientService.java | 38 +-
.../elasticsearch/ElasticsearchError.java | 45 ++
.../elasticsearch/IndexOperationRequest.java | 31 +-
.../elasticsearch/IndexOperationResponse.java | 34 +-
.../nifi-elasticsearch-client-service/pom.xml | 43 +-
.../ElasticSearchClientServiceImpl.java | 214 +++++++---
.../elasticsearch/SearchResponseTest.groovy | 42 ++
.../ElasticSearch5ClientService_IT.groovy | 104 ++++-
.../ElasticSearchLookupService_IT.groovy | 10 +-
.../TestElasticSearchClientService.groovy | 19 +-
.../src/test/resources/setup-5.script | 45 ++
.../src/test/resources/setup-6.script | 46 ++
.../src/test/resources/setup-7.script | 45 ++
.../src/test/resources/setup.script | 41 --
.../pom.xml | 39 ++
.../DeleteByQueryElasticsearch.java | 6 +-
...r.java => ElasticsearchRestProcessor.java} | 37 +-
.../elasticsearch/JsonQueryElasticsearch.java | 9 +-
.../elasticsearch/PutElasticsearchRecord.java | 404 ++++++++++++++++++
.../put/FlowFileJsonDescription.java | 69 +++
.../put/JsonProcessingError.java | 24 ++
.../additionalDetails.html | 48 +++
.../additionalDetails.html | 64 +++
.../org.apache.nifi.processor.Processor | 1 +
.../additionalDetails.html | 2 +-
.../additionalDetails.html | 4 +-
.../additionalDetails.html | 41 ++
.../DeleteByQueryElasticsearchTest.groovy | 143 +++++++
.../JsonQueryElasticsearchTest.groovy} | 266 ++++++------
.../PutElasticsearchRecordTest.groovy | 272 ++++++++++++
.../TestElasticsearchClientService.groovy} | 103 +++--
.../AbstractMockElasticsearchClient.groovy | 71 +++
.../mock/MockBulkLoadClientService.groovy | 118 +++++
.../mock/MockElasticsearchError.groovy | 31 ++
.../src/test/java/.gitignore | 1 +
.../DeleteByQueryElasticsearchTest.java | 134 ------
37 files changed, 2191 insertions(+), 459 deletions(-)
create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticsearchError.java
create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/SearchResponseTest.groovy
create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-5.script
create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-6.script
create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-7.script
delete mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup.script
rename nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/{ElasticSearchRestProcessor.java => ElasticsearchRestProcessor.java} (67%)
create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/put/FlowFileJsonDescription.java
create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/put/JsonProcessingError.java
create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchJson/additionalDetails.html
create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord/additionalDetails.html
create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord/additionalDetails.html
create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearchTest.groovy
rename nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/{java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearchTest.java => groovy/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearchTest.groovy} (50%)
create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
rename nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/{java/org/apache/nifi/processors/elasticsearch/TestElasticSearchClientService.java => groovy/org/apache/nifi/processors/elasticsearch/TestElasticsearchClientService.groovy} (70%)
create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/AbstractMockElasticsearchClient.groovy
create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.groovy
create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockElasticsearchError.groovy
create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/.gitignore
delete mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearchTest.java
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/pom.xml
index a39c6cd502db..29b1590cd4da 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/pom.xml
@@ -43,5 +43,11 @@
1.11.0-SNAPSHOTprovided
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ 2.9.8
+ compile
+
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
index 57f359d08b84..d35c7846b4bd 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
@@ -30,12 +30,12 @@
import java.util.Map;
@Tags({"elasticsearch", "client"})
-@CapabilityDescription("A controller service for accessing an ElasticSearch client.")
+@CapabilityDescription("A controller service for accessing an Elasticsearch client.")
public interface ElasticSearchClientService extends ControllerService {
PropertyDescriptor HTTP_HOSTS = new PropertyDescriptor.Builder()
.name("el-cs-http-hosts")
.displayName("HTTP Hosts")
- .description("A comma-separated list of HTTP hosts that host ElasticSearch query nodes.")
+ .description("A comma-separated list of HTTP hosts that host Elasticsearch query nodes.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@@ -93,7 +93,7 @@ public interface ElasticSearchClientService extends ControllerService {
PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
.name("el-cs-charset")
.displayName("Charset")
- .description("The charset to use for interpreting the response from ElasticSearch.")
+ .description("The charset to use for interpreting the response from Elasticsearch.")
.required(true)
.defaultValue("UTF-8")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
@@ -106,16 +106,26 @@ public interface ElasticSearchClientService extends ControllerService {
* @return IndexOperationResponse if successful
* @throws IOException thrown when there is an error.
*/
- IndexOperationResponse add(IndexOperationRequest operation) throws IOException;
+ IndexOperationResponse add(IndexOperationRequest operation);
/**
- * Index multiple documents.
+ * Bulk process multiple documents.
*
- * @param operations A list of documents to index.
+ * @param operations A list of index operations.
* @return IndexOperationResponse if successful.
* @throws IOException thrown when there is an error.
*/
- IndexOperationResponse add(List operations) throws IOException;
+ IndexOperationResponse bulk(List operations);
+
+ /**
+ * Count the documents that match the criteria.
+ *
+ * @param query A query in the JSON DSL syntax
+ * @param index The index to target.
+ * @param type The type to target.
+ * @return
+ */
+ Long count(String query, String index, String type);
/**
* Delete a document by its ID from an index.
@@ -125,7 +135,7 @@ public interface ElasticSearchClientService extends ControllerService {
* @param id The document ID to remove from the selected index.
* @return A DeleteOperationResponse object if successful.
*/
- DeleteOperationResponse deleteById(String index, String type, String id) throws IOException;
+ DeleteOperationResponse deleteById(String index, String type, String id);
/**
@@ -136,7 +146,7 @@ public interface ElasticSearchClientService extends ControllerService {
* @return A DeleteOperationResponse object if successful.
* @throws IOException thrown when there is an error.
*/
- DeleteOperationResponse deleteById(String index, String type, List ids) throws IOException;
+ DeleteOperationResponse deleteById(String index, String type, List ids);
/**
* Delete documents by query.
@@ -146,7 +156,7 @@ public interface ElasticSearchClientService extends ControllerService {
* @param type The type to target within the index. Optional.
* @return A DeleteOperationResponse object if successful.
*/
- DeleteOperationResponse deleteByQuery(String query, String index, String type) throws IOException;
+ DeleteOperationResponse deleteByQuery(String query, String index, String type);
/**
* Get a document by ID.
@@ -157,23 +167,23 @@ public interface ElasticSearchClientService extends ControllerService {
* @return Map if successful, null if not found.
* @throws IOException thrown when there is an error.
*/
- Map get(String index, String type, String id) throws IOException;
+ Map get(String index, String type, String id);
/**
* Perform a search using the JSON DSL.
*
* @param query A JSON string reprensenting the query.
* @param index The index to target. Optional.
- * @param type The type to target. Optional. Will not be used in future versions of ElasticSearch.
+ * @param type The type to target. Optional. Will not be used in future versions of Elasticsearch.
* @return A SearchResponse object if successful.
*/
- SearchResponse search(String query, String index, String type) throws IOException;
+ SearchResponse search(String query, String index, String type);
/**
* Build a transit URL to use with the provenance reporter.
* @param index Index targeted. Optional.
* @param type Type targeted. Optional
- * @return a URL describing the ElasticSearch cluster.
+ * @return a URL describing the Elasticsearch cluster.
*/
String getTransitUrl(String index, String type);
}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticsearchError.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticsearchError.java
new file mode 100644
index 000000000000..c383c8d41d0d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticsearchError.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.elasticsearch;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class ElasticsearchError extends RuntimeException {
+ /**
+ * These are names of common Elasticsearch exceptions where it is safe to assume
+ * that it's OK to retry the operation instead of just sending it to an error relationship.
+ */
+ public static final Set ELASTIC_ERROR_NAMES = new HashSet(){{
+ add("NoNodeAvailableException");
+ add("ElasticsearchTimeoutException");
+ add("ReceiveTimeoutTransportException");
+ add("NodeClosedException");
+ }};
+
+ protected boolean isElastic;
+
+ public ElasticsearchError(Exception ex) {
+ super(ex);
+ isElastic = ELASTIC_ERROR_NAMES.contains(ex.getClass().getSimpleName());
+ }
+
+ public boolean isElastic() {
+ return isElastic;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
index 9281adba299a..23aff57ae726 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
@@ -19,17 +19,23 @@
import java.util.Map;
+/**
+ * A POJO that represents an "operation on an index." It should be confused with just indexing documents, as it
+ * covers all CRUD-related operations that can be executed against an Elasticsearch index with documents.
+ */
public class IndexOperationRequest {
private String index;
private String type;
private String id;
private Map fields;
+ private Operation operation;
- public IndexOperationRequest(String index, String type, String id, Map fields) {
+ public IndexOperationRequest(String index, String type, String id, Map fields, Operation operation) {
this.index = index;
this.type = type;
this.id = id;
this.fields = fields;
+ this.operation = operation;
}
public String getIndex() {
@@ -47,4 +53,25 @@ public String getId() {
public Map getFields() {
return fields;
}
-}
+
+ public Operation getOperation() {
+ return operation;
+ }
+
+ public enum Operation {
+ Create("create"),
+ Delete("delete"),
+ Index("index"),
+ Update("update"),
+ Upsert("upsert");
+ String value;
+
+ Operation(String value) {
+ this.value = value;
+ }
+
+ public String getValue() {
+ return value;
+ }
+ }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java
index a22b7aad0590..c001f69068f9 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java
@@ -17,20 +17,44 @@
package org.apache.nifi.elasticsearch;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
public class IndexOperationResponse {
private long took;
- private long ingestTook;
+ private boolean hasErrors;
+ private List
This processor is intended for use with the ElasticSearch JSON DSL and ElasticSearch 5.X and newer. It is designed
- to be able to take a query from Kibana and execute it as-is against an ElasticSearch cluster. Like all processors in the
+
This processor is intended for use with the Elasticsearch JSON DSL and Elasticsearch 5.X and newer. It is designed
+ to be able to take a query from Kibana and execute it as-is against an Elasticsearch cluster. Like all processors in the
"restapi" bundle, it uses the official Elastic client APIs, so it supports leader detection.
The query to execute can be provided either in the Query configuration property or in an attribute on a flowfile. In
the latter case, the name of the attribute (Expression Language is supported here) must be provided in the Query Attribute
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord/additionalDetails.html b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord/additionalDetails.html
new file mode 100644
index 000000000000..0ac19b2aab11
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord/additionalDetails.html
@@ -0,0 +1,41 @@
+
+
+
+
+
+ PutElasticsearchRecord
+
+
+
+
+ This processor is for accessing the Elasticsearch Bulk API. It provides the ability to configure bulk operations on
+ a per-record basis which is what separates it from PutElasticsearchHttpRecord. For example, it is possible to define
+ multiple commands to index documents, followed by deletes, creates and update operations against the same index or
+ other indices as desired.
+
+
+ As part of the Elasticsearch REST API bundle, it uses a controller service to manage connection information and
+ that controller service is built on top of the official Elasticsearch client APIs. That provides features such as
+ automatic master detection against the cluster which is missing in the other bundles.
+
+
+ This processor builds one Elasticsearch Bulk API body per record set. Care should be taken to split up record sets
+ into appropriately-sized chunks so that NiFi does not run out of memory and the requests sent to Elasticsearch are
+ not too large for it to handle. When failures do occur, this processor is capable of attempting to write the records
+ that failed to an output record writer so that only failed records can be processed downstream or replayed.
+
+
+
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearchTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearchTest.groovy
new file mode 100644
index 000000000000..8e913ff70364
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearchTest.groovy
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License") you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch
+
+import org.apache.nifi.util.MockFlowFile
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.junit.Assert
+import org.junit.Test
+
+class DeleteByQueryElasticsearchTest {
+ private static final String INDEX = "test_idx"
+ private static final String TYPE = "test_type"
+ private static final String QUERY_ATTR = "es.delete.query"
+ private static final String CLIENT_NAME = "clientService"
+
+ private TestElasticsearchClientService client
+
+ private void initClient(TestRunner runner) throws Exception {
+ client = new TestElasticsearchClientService(true)
+ runner.addControllerService(CLIENT_NAME, client)
+ runner.enableControllerService(client)
+ runner.setProperty(DeleteByQueryElasticsearch.CLIENT_SERVICE, CLIENT_NAME)
+ }
+
+ private void postTest(TestRunner runner, String queryParam) {
+ runner.assertTransferCount(DeleteByQueryElasticsearch.REL_FAILURE, 0)
+ runner.assertTransferCount(DeleteByQueryElasticsearch.REL_SUCCESS, 1)
+
+ List flowFiles = runner.getFlowFilesForRelationship(DeleteByQueryElasticsearch.REL_SUCCESS)
+ String attr = flowFiles.get(0).getAttribute(DeleteByQueryElasticsearch.TOOK_ATTRIBUTE)
+ String query = flowFiles.get(0).getAttribute(QUERY_ATTR)
+ Assert.assertNotNull(attr)
+ Assert.assertEquals(attr, "100")
+ Assert.assertNotNull(query)
+ Assert.assertEquals(queryParam, query)
+ }
+
+ @Test
+ void testWithFlowfileInput() throws Exception {
+ String query = "{ \"query\": { \"match_all\": {} }}"
+ TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class)
+ runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX)
+ runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE)
+ runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR)
+ initClient(runner)
+ runner.assertValid()
+ runner.enqueue(query)
+ runner.run()
+
+ postTest(runner, query)
+ }
+
+ @Test
+ void testWithQuery() throws Exception {
+ String query = "{\n" +
+ "\t\"query\": {\n" +
+ "\t\t\"match\": {\n" +
+ "\t\t\t\"\${field.name}.keyword\": \"test\"\n" +
+ "\t\t}\n" +
+ "\t}\n" +
+ "}"
+ Map attrs = new HashMap(){{
+ put("field.name", "test_field")
+ }}
+ TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class)
+ runner.setProperty(DeleteByQueryElasticsearch.QUERY, query)
+ runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX)
+ runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE)
+ runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR)
+ initClient(runner)
+ runner.assertValid()
+ runner.enqueue("", attrs)
+ runner.run()
+
+ postTest(runner, query.replace('${field.name}', "test_field"))
+
+ runner.clearTransferState()
+
+ query = "{\n" +
+ "\t\"query\": {\n" +
+ "\t\t\"match\": {\n" +
+ "\t\t\t\"test_field.keyword\": \"test\"\n" +
+ "\t\t}\n" +
+ "\t}\n" +
+ "}"
+ runner.setProperty(DeleteByQueryElasticsearch.QUERY, query)
+ runner.setIncomingConnection(false)
+ runner.assertValid()
+ runner.run()
+ postTest(runner, query)
+ }
+
+ @Test
+ void testErrorAttribute() throws Exception {
+ String query = "{ \"query\": { \"match_all\": {} }}"
+ TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class)
+ runner.setProperty(DeleteByQueryElasticsearch.QUERY, query)
+ runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX)
+ runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE)
+ runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR)
+ initClient(runner)
+ client.setThrowErrorInDelete(true)
+ runner.assertValid()
+ runner.enqueue("")
+ runner.run()
+
+ runner.assertTransferCount(DeleteByQueryElasticsearch.REL_SUCCESS, 0)
+ runner.assertTransferCount(DeleteByQueryElasticsearch.REL_FAILURE, 1)
+
+ MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(DeleteByQueryElasticsearch.REL_FAILURE).get(0)
+ String attr = mockFlowFile.getAttribute(DeleteByQueryElasticsearch.ERROR_ATTRIBUTE)
+ Assert.assertNotNull(attr)
+ }
+
+ @Test
+ void testInputHandling() {
+ String query = "{ \"query\": { \"match_all\": {} }}"
+ TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class)
+ runner.setProperty(DeleteByQueryElasticsearch.QUERY, query)
+ runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX)
+ runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE)
+ runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR)
+ initClient(runner)
+ runner.assertValid()
+ runner.run()
+ }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearchTest.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearchTest.groovy
similarity index 50%
rename from nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearchTest.java
rename to nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearchTest.groovy
index cd7ac196e1d0..33a26de59a5f 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearchTest.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearchTest.groovy
@@ -3,7 +3,7 @@
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
+ * (the "License") you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
@@ -15,54 +15,54 @@
* limitations under the License.
*/
-package org.apache.nifi.processors.elasticsearch;
+package org.apache.nifi.processors.elasticsearch
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Assert;
-import org.junit.Test;
+import org.apache.nifi.util.MockFlowFile
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.junit.Assert
+import org.junit.Test
-import java.util.List;
+import java.util.List
-public class JsonQueryElasticsearchTest {
- private static final String INDEX_NAME = "messages";
+class JsonQueryElasticsearchTest {
+ private static final String INDEX_NAME = "messages"
- public void testCounts(TestRunner runner, int success, int hits, int failure, int aggregations) {
- runner.assertTransferCount(JsonQueryElasticsearch.REL_ORIGINAL, success);
- runner.assertTransferCount(JsonQueryElasticsearch.REL_HITS, hits);
- runner.assertTransferCount(JsonQueryElasticsearch.REL_FAILURE, failure);
- runner.assertTransferCount(JsonQueryElasticsearch.REL_AGGREGATIONS, aggregations);
+ void testCounts(TestRunner runner, int success, int hits, int failure, int aggregations) {
+ runner.assertTransferCount(JsonQueryElasticsearch.REL_ORIGINAL, success)
+ runner.assertTransferCount(JsonQueryElasticsearch.REL_HITS, hits)
+ runner.assertTransferCount(JsonQueryElasticsearch.REL_FAILURE, failure)
+ runner.assertTransferCount(JsonQueryElasticsearch.REL_AGGREGATIONS, aggregations)
}
@Test
- public void testBasicQuery() throws Exception {
-
- JsonQueryElasticsearch processor = new JsonQueryElasticsearch();
- TestRunner runner = TestRunners.newTestRunner(processor);
- TestElasticSearchClientService service = new TestElasticSearchClientService(false);
- runner.addControllerService("esService", service);
- runner.enableControllerService(service);
- runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService");
- runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME);
- runner.setProperty(JsonQueryElasticsearch.TYPE, "message");
- runner.setValidateExpressionUsage(true);
- runner.setProperty(JsonQueryElasticsearch.QUERY, "{ \"query\": { \"match_all\": {} }}");
-
- runner.enqueue("test");
- runner.run(1, true, true);
- testCounts(runner, 1, 1, 0, 0);
-
- runner.setProperty(JsonQueryElasticsearch.SPLIT_UP_HITS, JsonQueryElasticsearch.SPLIT_UP_YES);
- runner.clearProvenanceEvents();
- runner.clearTransferState();
- runner.enqueue("test");
- runner.run(1, true, true);
- testCounts(runner, 1, 10, 0, 0);
+ void testBasicQuery() throws Exception {
+
+ JsonQueryElasticsearch processor = new JsonQueryElasticsearch()
+ TestRunner runner = TestRunners.newTestRunner(processor)
+ TestElasticsearchClientService service = new TestElasticsearchClientService(false)
+ runner.addControllerService("esService", service)
+ runner.enableControllerService(service)
+ runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService")
+ runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME)
+ runner.setProperty(JsonQueryElasticsearch.TYPE, "message")
+ runner.setValidateExpressionUsage(true)
+ runner.setProperty(JsonQueryElasticsearch.QUERY, "{ \"query\": { \"match_all\": {} }}")
+
+ runner.enqueue("test")
+ runner.run(1, true, true)
+ testCounts(runner, 1, 1, 0, 0)
+
+ runner.setProperty(JsonQueryElasticsearch.SPLIT_UP_HITS, JsonQueryElasticsearch.SPLIT_UP_YES)
+ runner.clearProvenanceEvents()
+ runner.clearTransferState()
+ runner.enqueue("test")
+ runner.run(1, true, true)
+ testCounts(runner, 1, 10, 0, 0)
}
@Test
- public void testAggregations() throws Exception {
+ void testAggregations() throws Exception {
String query = "{\n" +
"\t\"query\": {\n" +
"\t\t\"match_all\": {}\n" +
@@ -79,42 +79,42 @@ public void testAggregations() throws Exception {
"\t\t\t}\n" +
"\t\t}\n" +
"\t}\n" +
- "}";
+ "}"
- JsonQueryElasticsearch processor = new JsonQueryElasticsearch();
- TestRunner runner = TestRunners.newTestRunner(processor);
- TestElasticSearchClientService service = new TestElasticSearchClientService(true);
- runner.addControllerService("esService", service);
- runner.enableControllerService(service);
- runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService");
- runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME);
- runner.setProperty(JsonQueryElasticsearch.TYPE, "message");
- runner.setValidateExpressionUsage(true);
- runner.setProperty(JsonQueryElasticsearch.QUERY, query);
+ JsonQueryElasticsearch processor = new JsonQueryElasticsearch()
+ TestRunner runner = TestRunners.newTestRunner(processor)
+ TestElasticsearchClientService service = new TestElasticsearchClientService(true)
+ runner.addControllerService("esService", service)
+ runner.enableControllerService(service)
+ runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService")
+ runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME)
+ runner.setProperty(JsonQueryElasticsearch.TYPE, "message")
+ runner.setValidateExpressionUsage(true)
+ runner.setProperty(JsonQueryElasticsearch.QUERY, query)
- runner.enqueue("test");
- runner.run(1, true, true);
- testCounts(runner, 1, 1, 0, 1);
+ runner.enqueue("test")
+ runner.run(1, true, true)
+ testCounts(runner, 1, 1, 0, 1)
- runner.clearTransferState();
+ runner.clearTransferState()
//Test with the query parameter and no incoming connection
- runner.setIncomingConnection(false);
- runner.run(1, true, true);
- testCounts(runner, 0, 1, 0, 1);
- runner.setIncomingConnection(true);
+ runner.setIncomingConnection(false)
+ runner.run(1, true, true)
+ testCounts(runner, 0, 1, 0, 1)
+ runner.setIncomingConnection(true)
- runner.clearTransferState();
- runner.clearProvenanceEvents();
- runner.setProperty(JsonQueryElasticsearch.SPLIT_UP_AGGREGATIONS, JsonQueryElasticsearch.SPLIT_UP_YES);
- runner.enqueue("test");
- runner.run(1, true, true);
- testCounts(runner, 1, 1, 0, 2);
+ runner.clearTransferState()
+ runner.clearProvenanceEvents()
+ runner.setProperty(JsonQueryElasticsearch.SPLIT_UP_AGGREGATIONS, JsonQueryElasticsearch.SPLIT_UP_YES)
+ runner.enqueue("test")
+ runner.run(1, true, true)
+ testCounts(runner, 1, 1, 0, 2)
- runner.clearProvenanceEvents();
- runner.clearTransferState();
+ runner.clearProvenanceEvents()
+ runner.clearTransferState()
query = "{\n" +
"\t\"query\": {\n" +
@@ -123,30 +123,30 @@ public void testAggregations() throws Exception {
"\t\"aggs\": {\n" +
"\t\t\"test_agg\": {\n" +
"\t\t\t\"terms\": {\n" +
- "\t\t\t\t\"field\": \"${fieldValue}\"\n" +
+ "\t\t\t\t\"field\": \"\${fieldValue}\"\n" +
"\t\t\t}\n" +
"\t\t},\n" +
"\t\t\"test_agg2\": {\n" +
"\t\t\t\"terms\": {\n" +
- "\t\t\t\t\"field\": \"${fieldValue}\"\n" +
+ "\t\t\t\t\"field\": \"\${fieldValue}\"\n" +
"\t\t\t}\n" +
"\t\t}\n" +
"\t}\n" +
- "}";
- runner.setVariable("fieldValue", "msg");
- runner.setVariable("es.index", INDEX_NAME);
- runner.setVariable("es.type", "msg");
- runner.setProperty(JsonQueryElasticsearch.QUERY, query);
- runner.setProperty(JsonQueryElasticsearch.INDEX, "${es.index}");
- runner.setProperty(JsonQueryElasticsearch.TYPE, "${es.type}");
- runner.setValidateExpressionUsage(true);
- runner.enqueue("test");
- runner.run(1, true, true);
- testCounts(runner, 1, 1, 0, 2);
+ "}"
+ runner.setVariable("fieldValue", "msg")
+ runner.setVariable("es.index", INDEX_NAME)
+ runner.setVariable("es.type", "msg")
+ runner.setProperty(JsonQueryElasticsearch.QUERY, query)
+ runner.setProperty(JsonQueryElasticsearch.INDEX, "\${es.index}")
+ runner.setProperty(JsonQueryElasticsearch.TYPE, "\${es.type}")
+ runner.setValidateExpressionUsage(true)
+ runner.enqueue("test")
+ runner.run(1, true, true)
+ testCounts(runner, 1, 1, 0, 2)
}
@Test
- public void testErrorDuringSearch() throws Exception {
+ void testErrorDuringSearch() throws Exception {
String query = "{\n" +
"\t\"query\": {\n" +
"\t\t\"match_all\": {}\n" +
@@ -163,28 +163,28 @@ public void testErrorDuringSearch() throws Exception {
"\t\t\t}\n" +
"\t\t}\n" +
"\t}\n" +
- "}";
-
-
- JsonQueryElasticsearch processor = new JsonQueryElasticsearch();
- TestRunner runner = TestRunners.newTestRunner(processor);
- TestElasticSearchClientService service = new TestElasticSearchClientService(true);
- service.setThrowErrorInSearch(true);
- runner.addControllerService("esService", service);
- runner.enableControllerService(service);
- runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService");
- runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME);
- runner.setProperty(JsonQueryElasticsearch.TYPE, "message");
- runner.setValidateExpressionUsage(true);
- runner.setProperty(JsonQueryElasticsearch.QUERY, query);
-
- runner.enqueue("test");
- runner.run(1, true, true);
- testCounts(runner, 0, 0, 1, 0);
+ "}"
+
+
+ JsonQueryElasticsearch processor = new JsonQueryElasticsearch()
+ TestRunner runner = TestRunners.newTestRunner(processor)
+ TestElasticsearchClientService service = new TestElasticsearchClientService(true)
+ service.setThrowErrorInSearch(true)
+ runner.addControllerService("esService", service)
+ runner.enableControllerService(service)
+ runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService")
+ runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME)
+ runner.setProperty(JsonQueryElasticsearch.TYPE, "message")
+ runner.setValidateExpressionUsage(true)
+ runner.setProperty(JsonQueryElasticsearch.QUERY, query)
+
+ runner.enqueue("test")
+ runner.run(1, true, true)
+ testCounts(runner, 0, 0, 1, 0)
}
@Test
- public void testQueryAttribute() throws Exception {
+ void testQueryAttribute() throws Exception {
final String query = "{\n" +
"\t\"query\": {\n" +
"\t\t\"match_all\": {}\n" +
@@ -201,32 +201,52 @@ public void testQueryAttribute() throws Exception {
"\t\t\t}\n" +
"\t\t}\n" +
"\t}\n" +
- "}";
- final String queryAttr = "es.query";
-
-
- JsonQueryElasticsearch processor = new JsonQueryElasticsearch();
- TestRunner runner = TestRunners.newTestRunner(processor);
- TestElasticSearchClientService service = new TestElasticSearchClientService(true);
- runner.addControllerService("esService", service);
- runner.enableControllerService(service);
- runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService");
- runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME);
- runner.setProperty(JsonQueryElasticsearch.TYPE, "message");
- runner.setValidateExpressionUsage(true);
- runner.setProperty(JsonQueryElasticsearch.QUERY, query);
- runner.setProperty(JsonQueryElasticsearch.QUERY_ATTRIBUTE, queryAttr);
-
- runner.enqueue("test");
- runner.run(1, true, true);
- testCounts(runner, 1, 1, 0, 1);
- List flowFiles = runner.getFlowFilesForRelationship(JsonQueryElasticsearch.REL_AGGREGATIONS);
- flowFiles.addAll(runner.getFlowFilesForRelationship(JsonQueryElasticsearch.REL_HITS));
+ "}"
+ final String queryAttr = "es.query"
+
+
+ JsonQueryElasticsearch processor = new JsonQueryElasticsearch()
+ TestRunner runner = TestRunners.newTestRunner(processor)
+ TestElasticsearchClientService service = new TestElasticsearchClientService(true)
+ runner.addControllerService("esService", service)
+ runner.enableControllerService(service)
+ runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService")
+ runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME)
+ runner.setProperty(JsonQueryElasticsearch.TYPE, "message")
+ runner.setValidateExpressionUsage(true)
+ runner.setProperty(JsonQueryElasticsearch.QUERY, query)
+ runner.setProperty(JsonQueryElasticsearch.QUERY_ATTRIBUTE, queryAttr)
+
+ runner.enqueue("test")
+ runner.run(1, true, true)
+ testCounts(runner, 1, 1, 0, 1)
+ List flowFiles = runner.getFlowFilesForRelationship(JsonQueryElasticsearch.REL_AGGREGATIONS)
+ flowFiles.addAll(runner.getFlowFilesForRelationship(JsonQueryElasticsearch.REL_HITS))
for (MockFlowFile mockFlowFile : flowFiles) {
- String attr = mockFlowFile.getAttribute(queryAttr);
- Assert.assertNotNull("Missing query attribute", attr);
- Assert.assertEquals("Query had wrong value.", query, attr);
+ String attr = mockFlowFile.getAttribute(queryAttr)
+ Assert.assertNotNull("Missing query attribute", attr)
+ Assert.assertEquals("Query had wrong value.", query, attr)
}
}
+
+ @Test
+ void testInputHandling() {
+ JsonQueryElasticsearch processor = new JsonQueryElasticsearch()
+ TestRunner runner = TestRunners.newTestRunner(processor)
+ TestElasticsearchClientService service = new TestElasticsearchClientService(false)
+ runner.addControllerService("esService", service)
+ runner.enableControllerService(service)
+ runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService")
+ runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME)
+ runner.setProperty(JsonQueryElasticsearch.TYPE, "message")
+ runner.setValidateExpressionUsage(true)
+ runner.setProperty(JsonQueryElasticsearch.QUERY, "{ \"query\": { \"match_all\": {} }}")
+
+ runner.run()
+
+ runner.assertTransferCount(JsonQueryElasticsearch.REL_SUCCESS, 0)
+ runner.assertTransferCount(JsonQueryElasticsearch.REL_FAILURE, 0)
+ runner.assertTransferCount(JsonQueryElasticsearch.REL_RETRY, 0)
+ }
}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
new file mode 100644
index 000000000000..5f48af0a58f3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch
+
+import org.apache.avro.Schema
+import org.apache.nifi.avro.AvroTypeUtil
+import org.apache.nifi.elasticsearch.IndexOperationRequest
+import org.apache.nifi.elasticsearch.IndexOperationResponse
+import org.apache.nifi.json.JsonRecordSetWriter
+import org.apache.nifi.json.JsonTreeReader
+import org.apache.nifi.processors.elasticsearch.mock.AbstractMockElasticsearchClient
+import org.apache.nifi.processors.elasticsearch.mock.MockBulkLoadClientService
+import org.apache.nifi.schema.access.SchemaAccessStrategy
+import org.apache.nifi.schema.access.SchemaAccessUtils
+import org.apache.nifi.serialization.record.MockSchemaRegistry
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.junit.Assert
+import org.junit.Before
+import org.junit.Test
+
+import static groovy.json.JsonOutput.*
+
+class PutElasticsearchRecordTest {
+ MockBulkLoadClientService clientService
+ MockSchemaRegistry registry
+ JsonTreeReader reader
+ TestRunner runner
+
+ static final String SCHEMA = prettyPrint(toJson([
+ name: "TestSchema",
+ type: "record",
+ fields: [
+ [ name: "msg", type: "string" ],
+ [ name: "from", type: "string" ]
+ ]
+ ]))
+
+ static final String flowFileContents = prettyPrint(toJson([
+ [ msg: "Hello, world", from: "john.smith" ],
+ [ msg: "Hi, back at ya!", from: "jane.doe" ]
+ ]))
+
+ @Before
+ void setup() {
+ clientService = new MockBulkLoadClientService()
+ registry = new MockSchemaRegistry()
+ reader = new JsonTreeReader()
+ runner = TestRunners.newTestRunner(PutElasticsearchRecord.class)
+
+ registry.addSchema("simple", AvroTypeUtil.createSchema(new Schema.Parser().parse(SCHEMA)))
+
+ clientService.response = new IndexOperationResponse(1500)
+
+ runner.addControllerService("registry", registry)
+ runner.addControllerService("reader", reader)
+ runner.addControllerService("clientService", clientService)
+ runner.setProperty(reader, SchemaAccessUtils.SCHEMA_REGISTRY, "registry")
+ runner.setProperty(PutElasticsearchRecord.RECORD_READER, "reader")
+ runner.setProperty(PutElasticsearchRecord.INDEX, "test_index")
+ runner.setProperty(PutElasticsearchRecord.TYPE, "test_type")
+ runner.setProperty(PutElasticsearchRecord.CLIENT_SERVICE, "clientService")
+ runner.enableControllerService(registry)
+ runner.enableControllerService(reader)
+ runner.enableControllerService(clientService)
+
+ runner.assertValid()
+ }
+
+ void basicTest(int failure, int retry, int success) {
+ runner.enqueue(flowFileContents, [ "schema.name": "simple" ])
+ runner.run()
+
+ runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, failure)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, retry)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, success)
+ }
+
+ @Test
+ void simpleTest() {
+ basicTest(0, 0, 1)
+ }
+
+ @Test
+ void testFatalError() {
+ clientService.throwFatalError = true
+ basicTest(1, 0, 0)
+ }
+
+ @Test
+ void testRetriable() {
+ clientService.throwRetriableError = true
+ basicTest(0, 1, 0)
+ }
+
+ @Test
+ void testRecordPathFeatures() {
+ def newSchema = prettyPrint(toJson([
+ type: "record",
+ name: "RecordPathTestType",
+ fields: [
+ [ name: "id", type: "string" ],
+ [ name: "index", type: "string" ],
+ [ name: "type", type: "string" ],
+ [ name: "operation", type: ["null", "string"]],
+ [ name: "msg", type: "string"]
+ ]
+ ]))
+
+ def flowFileContents = prettyPrint(toJson([
+ [ id: "rec-1", index: "bulk_a", type: "message", msg: "Hello" ],
+ [ id: "rec-2", index: "bulk_b", type: "message", msg: "Hello" ],
+ [ id: "rec-3", index: "bulk_a", type: "message", msg: "Hello" ],
+ [ id: "rec-4", index: "bulk_b", type: "message", msg: "Hello" ],
+ [ id: "rec-5", index: "bulk_a", type: "message", msg: "Hello" ],
+ [ id: "rec-6", index: "bulk_b", type: "message", msg: "Hello" ]
+ ]))
+
+ def evalClosure = { List items ->
+ def a = items.findAll { it.index == "bulk_a" }.size()
+ def b = items.findAll { it.index == "bulk_b" }.size()
+ items.each {
+ Assert.assertNotNull(it.id)
+ Assert.assertTrue(it.id.startsWith("rec-"))
+ Assert.assertEquals("message", it.type)
+ }
+ Assert.assertEquals(3, a)
+ Assert.assertEquals(3, b)
+ }
+
+ clientService.evalClosure = evalClosure
+
+ registry.addSchema("recordPathTest", AvroTypeUtil.createSchema(new Schema.Parser().parse(newSchema)))
+
+ runner.setProperty(PutElasticsearchRecord.ID_RECORD_PATH, "/id")
+ runner.setProperty(PutElasticsearchRecord.INDEX_RECORD_PATH, "/index")
+ runner.setProperty(PutElasticsearchRecord.TYPE_RECORD_PATH, "/type")
+ runner.enqueue(flowFileContents, [
+ "schema.name": "recordPathTest"
+ ])
+
+ runner.run()
+ runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+
+ runner.clearTransferState()
+
+ flowFileContents = prettyPrint(toJson([
+ [ id: "rec-1", index: null, type: null, msg: "Hello" ],
+ [ id: "rec-2", index: null, type: null, msg: "Hello" ],
+ [ id: "rec-3", index: null, type: null, msg: "Hello" ],
+ [ id: "rec-4", index: null, type: null, msg: "Hello" ],
+ [ id: "rec-5", index: null, type: null, msg: "Hello" ],
+ [ id: "rec-6", index: "bulk_b", type: "message", msg: "Hello" ]
+ ]))
+
+ evalClosure = { List items ->
+ def testTypeCount = items.findAll { it.type == "test_type" }.size()
+ def messageTypeCount = items.findAll { it.type == "message" }.size()
+ def testIndexCount = items.findAll { it.index == "test_index" }.size()
+ def bulkIndexCount = items.findAll { it.index.startsWith("bulk_") }.size()
+ def indexOperationCount = items.findAll { it.operation == IndexOperationRequest.Operation.Index }.size()
+ Assert.assertEquals(5, testTypeCount)
+ Assert.assertEquals(1, messageTypeCount)
+ Assert.assertEquals(5, testIndexCount)
+ Assert.assertEquals(1, bulkIndexCount)
+ Assert.assertEquals(6, indexOperationCount)
+ }
+
+ clientService.evalClosure = evalClosure
+ runner.enqueue(flowFileContents, [
+ "schema.name": "recordPathTest"
+ ])
+ runner.run()
+
+ runner.clearTransferState()
+
+ runner.setProperty(PutElasticsearchRecord.OPERATION_RECORD_PATH, "/operation")
+
+ flowFileContents = prettyPrint(toJson([
+ [ id: "rec-1", index: "bulk_a", type: "message", msg: "Hello", operation: "index" ],
+ [ id: "rec-2", index: "bulk_b", type: "message", msg: "Hello", operation: "Index" ],
+ [ id: "rec-3", index: "bulk_a", type: "message", msg: "Hello", operation: "DeLEte" ],
+ [ id: "rec-4", index: "bulk_b", type: "message", msg: "Hello", operation: "updATE" ],
+ [ id: "rec-5", index: "bulk_a", type: "message", msg: "Hello", operation: "upsert" ],
+ [ id: "rec-6", index: "bulk_b", type: "message", msg: "Hello", operation: "uPSERT" ]
+ ]))
+
+ clientService.evalClosure = { List items ->
+ int index = items.findAll { it.operation == IndexOperationRequest.Operation.Index }.size()
+ int delete = items.findAll { it.operation == IndexOperationRequest.Operation.Delete }.size()
+ int update = items.findAll { it.operation == IndexOperationRequest.Operation.Update }.size()
+ int upsert = items.findAll { it.operation == IndexOperationRequest.Operation.Upsert }.size()
+ Assert.assertEquals(2, index)
+ Assert.assertEquals(1, delete)
+ Assert.assertEquals(1, update)
+ Assert.assertEquals(2, upsert)
+ }
+
+ runner.enqueue(flowFileContents, [
+ "schema.name": "recordPathTest"
+ ])
+ runner.run()
+ runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+ }
+
+ @Test
+ void testInputRequired() {
+ runner.run()
+ runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 0)
+ }
+
+ @Test
+ void testErrorRelationship() {
+ def writer = new JsonRecordSetWriter()
+ runner.addControllerService("writer", writer)
+ runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_NAME_PROPERTY)
+ runner.setProperty(writer, SchemaAccessUtils.SCHEMA_REGISTRY, "registry")
+ runner.enableControllerService(writer)
+ runner.setProperty(PutElasticsearchRecord.ERROR_RECORD_WRITER, "writer")
+
+ def newSchema = prettyPrint(toJson([
+ type: "record",
+ name: "RecordPathTestType",
+ fields: [
+ [ name: "id", type: "string" ],
+ [ name: "operation", type: "string" ],
+ [ name: "field1", type: ["null", "string"]],
+ [ name: "field2", type: "string"]
+ ]
+ ]))
+
+ def values = [
+ [ id: "1", operation: 'index', field1: 'value1', field2: '20' ],
+ [ id: "2", operation: 'create', field1: 'value1', field2: '20' ],
+ [ id: "2", operation: 'create', field1: 'value1', field2: '20' ],
+ [ id: "3", operation: 'index', field1: 'value1', field2: '20abcd' ]
+ ]
+
+ clientService.response = IndexOperationResponse.fromJsonResponse(MockBulkLoadClientService.SAMPLE_ERROR_RESPONSE)
+
+ registry.addSchema("errorTest", AvroTypeUtil.createSchema(new Schema.Parser().parse(newSchema)))
+ runner.enqueue(prettyPrint(toJson(values)), [ 'schema.name': 'errorTest' ])
+ runner.setProperty(PutElasticsearchRecord.LOG_ERROR_RESPONSES, "true")
+ runner.assertValid()
+ runner.run()
+
+ runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 1)
+
+ def errorFF = runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[0]
+ assert errorFF.getAttribute(PutElasticsearchRecord.ATTR_RECORD_COUNT) == "1"
+ }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticSearchClientService.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/TestElasticsearchClientService.groovy
similarity index 70%
rename from nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticSearchClientService.java
rename to nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/TestElasticsearchClientService.groovy
index b1d42209ffcc..b1d559b50cef 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticSearchClientService.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/TestElasticsearchClientService.groovy
@@ -15,82 +15,79 @@
* limitations under the License.
*/
-package org.apache.nifi.processors.elasticsearch;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.elasticsearch.DeleteOperationResponse;
-import org.apache.nifi.elasticsearch.ElasticSearchClientService;
-import org.apache.nifi.elasticsearch.IndexOperationRequest;
-import org.apache.nifi.elasticsearch.IndexOperationResponse;
-import org.apache.nifi.elasticsearch.SearchResponse;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class TestElasticSearchClientService extends AbstractControllerService implements ElasticSearchClientService {
- private boolean returnAggs;
- private boolean throwErrorInSearch;
- private boolean throwErrorInDelete;
-
- public TestElasticSearchClientService(boolean returnAggs) {
- this.returnAggs = returnAggs;
+package org.apache.nifi.processors.elasticsearch
+
+import groovy.json.JsonSlurper
+import org.apache.nifi.controller.AbstractControllerService
+import org.apache.nifi.elasticsearch.DeleteOperationResponse
+import org.apache.nifi.elasticsearch.ElasticSearchClientService
+import org.apache.nifi.elasticsearch.IndexOperationRequest
+import org.apache.nifi.elasticsearch.IndexOperationResponse
+import org.apache.nifi.elasticsearch.SearchResponse
+
+class TestElasticsearchClientService extends AbstractControllerService implements ElasticSearchClientService {
+ private boolean returnAggs
+ private boolean throwErrorInSearch
+ private boolean throwErrorInDelete
+
+ TestElasticsearchClientService(boolean returnAggs) {
+ this.returnAggs = returnAggs
}
@Override
- public IndexOperationResponse add(IndexOperationRequest operation) throws IOException {
- return add(Arrays.asList(operation));
+ IndexOperationResponse add(IndexOperationRequest operation) {
+ return add(Arrays.asList(operation))
}
@Override
- public IndexOperationResponse add(List operations) throws IOException {
- return new IndexOperationResponse(100L, 100L);
+ IndexOperationResponse bulk(List operations) {
+ return new IndexOperationResponse(100L, 100L)
}
@Override
- public DeleteOperationResponse deleteById(String index, String type, String id) throws IOException {
- return deleteById(index, type, Arrays.asList(id));
+ Long count(String query, String index, String type) {
+ return null
}
@Override
- public DeleteOperationResponse deleteById(String index, String type, List ids) throws IOException {
+ DeleteOperationResponse deleteById(String index, String type, String id) {
+ return deleteById(index, type, Arrays.asList(id))
+ }
+
+ @Override
+ DeleteOperationResponse deleteById(String index, String type, List ids) {
if (throwErrorInDelete) {
- throw new IOException("Simulated IOException");
+ throw new IOException("Simulated IOException")
}
- return new DeleteOperationResponse(100L);
+ return new DeleteOperationResponse(100L)
}
@Override
- public DeleteOperationResponse deleteByQuery(String query, String index, String type) throws IOException {
- return deleteById(index, type, Arrays.asList("1"));
+ DeleteOperationResponse deleteByQuery(String query, String index, String type) {
+ return deleteById(index, type, Arrays.asList("1"))
}
@Override
- public Map get(String index, String type, String id) throws IOException {
- return new HashMap(){{
- put("msg", "one");
- }};
+ Map get(String index, String type, String id) {
+ return [ "msg": "one" ]
}
@Override
- public SearchResponse search(String query, String index, String type) throws IOException {
+ SearchResponse search(String query, String index, String type) {
if (throwErrorInSearch) {
- throw new IOException("Simulated IOException");
+ throw new IOException("Simulated IOException")
}
- ObjectMapper mapper = new ObjectMapper();
- List> hits = (List>)mapper.readValue(HITS_RESULT, List.class);
- Map aggs = returnAggs ? (Map)mapper.readValue(AGGS_RESULT, Map.class) : null;
- SearchResponse response = new SearchResponse(hits, aggs, 15, 5, false);
- return response;
+ def mapper = new JsonSlurper()
+ def hits = mapper.parseText(HITS_RESULT)
+ def aggs = returnAggs ? mapper.parseText(AGGS_RESULT) : null
+ SearchResponse response = new SearchResponse(hits, aggs, 15, 5, false)
+ return response
}
@Override
- public String getTransitUrl(String index, String type) {
- return String.format("http://localhost:9400/%s/%s", index, type);
+ String getTransitUrl(String index, String type) {
+ "http://localhost:9400/${index}/${type}"
}
private static final String AGGS_RESULT = "{\n" +
@@ -146,7 +143,7 @@ public String getTransitUrl(String index, String type) {
" }\n" +
" ]\n" +
" }\n" +
- " }";
+ " }"
private static final String HITS_RESULT = "[\n" +
" {\n" +
@@ -239,13 +236,13 @@ public String getTransitUrl(String index, String type) {
" \"msg\": \"five\"\n" +
" }\n" +
" }\n" +
- " ]";
+ " ]"
- public void setThrowErrorInSearch(boolean throwErrorInSearch) {
- this.throwErrorInSearch = throwErrorInSearch;
+ void setThrowErrorInSearch(boolean throwErrorInSearch) {
+ this.throwErrorInSearch = throwErrorInSearch
}
- public void setThrowErrorInDelete(boolean throwErrorInDelete) {
- this.throwErrorInDelete = throwErrorInDelete;
+ void setThrowErrorInDelete(boolean throwErrorInDelete) {
+ this.throwErrorInDelete = throwErrorInDelete
}
}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/AbstractMockElasticsearchClient.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/AbstractMockElasticsearchClient.groovy
new file mode 100644
index 000000000000..fdbdac1bedf9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/AbstractMockElasticsearchClient.groovy
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License") you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch.mock
+
+import org.apache.nifi.controller.AbstractControllerService
+import org.apache.nifi.elasticsearch.*
+
+class AbstractMockElasticsearchClient extends AbstractControllerService implements ElasticSearchClientService {
+ boolean throwRetriableError
+ boolean throwFatalError
+
+ @Override
+ IndexOperationResponse add(IndexOperationRequest operation) {
+ return null
+ }
+
+ @Override
+ IndexOperationResponse bulk(List operations) {
+ return null
+ }
+
+ @Override
+ Long count(String query, String index, String type) {
+ return null
+ }
+
+ @Override
+ DeleteOperationResponse deleteById(String index, String type, String id) {
+ return null
+ }
+
+ @Override
+ DeleteOperationResponse deleteById(String index, String type, List ids) {
+ return null
+ }
+
+ @Override
+ DeleteOperationResponse deleteByQuery(String query, String index, String type) {
+ return null
+ }
+
+ @Override
+ Map get(String index, String type, String id) {
+ return null
+ }
+
+ @Override
+ SearchResponse search(String query, String index, String type) {
+ return null
+ }
+
+ @Override
+ String getTransitUrl(String index, String type) {
+ return null
+ }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.groovy
new file mode 100644
index 000000000000..3c4d9022e002
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.groovy
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License") you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch.mock
+
+
+import org.apache.nifi.elasticsearch.IndexOperationRequest
+import org.apache.nifi.elasticsearch.IndexOperationResponse
+
+class MockBulkLoadClientService extends AbstractMockElasticsearchClient {
+ IndexOperationResponse response
+ Closure evalClosure
+
+ @Override
+ IndexOperationResponse bulk(List items) {
+ if (throwRetriableError) {
+ throw new MockElasticsearchError(true)
+ } else if (throwFatalError) {
+ throw new MockElasticsearchError(false)
+ }
+
+ if (evalClosure) {
+ evalClosure.call(items)
+ }
+
+ response
+ }
+
+ static final SAMPLE_ERROR_RESPONSE = """
+{
+ "took" : 18,
+ "errors" : true,
+ "items" : [
+ {
+ "index" : {
+ "_index" : "test",
+ "_type" : "_doc",
+ "_id" : "1",
+ "_version" : 4,
+ "result" : "updated",
+ "_shards" : {
+ "total" : 2,
+ "successful" : 1,
+ "failed" : 0
+ },
+ "_seq_no" : 4,
+ "_primary_term" : 1,
+ "status" : 200
+ }
+ },
+ {
+ "create" : {
+ "_index" : "test",
+ "_type" : "_doc",
+ "_id" : "2",
+ "_version" : 1,
+ "result" : "created",
+ "_shards" : {
+ "total" : 2,
+ "successful" : 1,
+ "failed" : 0
+ },
+ "_seq_no" : 1,
+ "_primary_term" : 1,
+ "status" : 201
+ }
+ },
+ {
+ "create" : {
+ "_index" : "test",
+ "_type" : "_doc",
+ "_id" : "3",
+ "_version" : 1,
+ "result" : "created",
+ "_shards" : {
+ "total" : 2,
+ "successful" : 1,
+ "failed" : 0
+ },
+ "_seq_no" : 3,
+ "_primary_term" : 1,
+ "status" : 201
+ }
+ },
+ {
+ "index" : {
+ "_index" : "test",
+ "_type" : "_doc",
+ "_id" : "4",
+ "status" : 400,
+ "error" : {
+ "type" : "mapper_parsing_exception",
+ "reason" : "failed to parse field [field2] of type [integer] in document with id '4'",
+ "caused_by" : {
+ "type" : "number_format_exception",
+ "reason" : "For input string: 20abc"
+ }
+ }
+ }
+ }
+ ]
+}
+ """
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockElasticsearchError.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockElasticsearchError.groovy
new file mode 100644
index 000000000000..7b1073be5b02
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockElasticsearchError.groovy
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License") you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch.mock
+
+import org.apache.nifi.elasticsearch.ElasticsearchError
+
+class MockElasticsearchError extends ElasticsearchError {
+ MockElasticsearchError(boolean isElastic) {
+ this(new Exception())
+ this.isElastic = isElastic
+ }
+
+ MockElasticsearchError(Exception ex) {
+ super(ex)
+ }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/.gitignore b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/.gitignore
new file mode 100644
index 000000000000..74f8eb786399
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/.gitignore
@@ -0,0 +1 @@
+# Placeholder to force compilation
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearchTest.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearchTest.java
deleted file mode 100644
index ade1102deffc..000000000000
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearchTest.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.elasticsearch;
-
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class DeleteByQueryElasticsearchTest {
- private static final String INDEX = "test_idx";
- private static final String TYPE = "test_type";
- private static final String QUERY_ATTR = "es.delete.query";
- private static final String CLIENT_NAME = "clientService";
-
- private TestElasticSearchClientService client;
-
- private void initClient(TestRunner runner) throws Exception {
- client = new TestElasticSearchClientService(true);
- runner.addControllerService(CLIENT_NAME, client);
- runner.enableControllerService(client);
- runner.setProperty(DeleteByQueryElasticsearch.CLIENT_SERVICE, CLIENT_NAME);
- }
-
- private void postTest(TestRunner runner, String queryParam) {
- runner.assertTransferCount(DeleteByQueryElasticsearch.REL_FAILURE, 0);
- runner.assertTransferCount(DeleteByQueryElasticsearch.REL_SUCCESS, 1);
-
- List flowFiles = runner.getFlowFilesForRelationship(DeleteByQueryElasticsearch.REL_SUCCESS);
- String attr = flowFiles.get(0).getAttribute(DeleteByQueryElasticsearch.TOOK_ATTRIBUTE);
- String query = flowFiles.get(0).getAttribute(QUERY_ATTR);
- Assert.assertNotNull(attr);
- Assert.assertEquals(attr, "100");
- Assert.assertNotNull(query);
- Assert.assertEquals(queryParam, query);
- }
-
- @Test
- public void testWithFlowfileInput() throws Exception {
- String query = "{ \"query\": { \"match_all\": {} }}";
- TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class);
- runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX);
- runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE);
- runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR);
- initClient(runner);
- runner.assertValid();
- runner.enqueue(query);
- runner.run();
-
- postTest(runner, query);
- }
-
- @Test
- public void testWithQuery() throws Exception {
- String query = "{\n" +
- "\t\"query\": {\n" +
- "\t\t\"match\": {\n" +
- "\t\t\t\"${field.name}.keyword\": \"test\"\n" +
- "\t\t}\n" +
- "\t}\n" +
- "}";
- Map attrs = new HashMap(){{
- put("field.name", "test_field");
- }};
- TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class);
- runner.setProperty(DeleteByQueryElasticsearch.QUERY, query);
- runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX);
- runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE);
- runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR);
- initClient(runner);
- runner.assertValid();
- runner.enqueue("", attrs);
- runner.run();
-
- postTest(runner, query.replace("${field.name}", "test_field"));
-
- runner.clearTransferState();
-
- query = "{\n" +
- "\t\"query\": {\n" +
- "\t\t\"match\": {\n" +
- "\t\t\t\"test_field.keyword\": \"test\"\n" +
- "\t\t}\n" +
- "\t}\n" +
- "}";
- runner.setProperty(DeleteByQueryElasticsearch.QUERY, query);
- runner.setIncomingConnection(false);
- runner.assertValid();
- runner.run();
- postTest(runner, query);
- }
-
- @Test
- public void testErrorAttribute() throws Exception {
- String query = "{ \"query\": { \"match_all\": {} }}";
- TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class);
- runner.setProperty(DeleteByQueryElasticsearch.QUERY, query);
- runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX);
- runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE);
- runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR);
- initClient(runner);
- client.setThrowErrorInDelete(true);
- runner.assertValid();
- runner.enqueue("");
- runner.run();
-
- runner.assertTransferCount(DeleteByQueryElasticsearch.REL_SUCCESS, 0);
- runner.assertTransferCount(DeleteByQueryElasticsearch.REL_FAILURE, 1);
-
- MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(DeleteByQueryElasticsearch.REL_FAILURE).get(0);
- String attr = mockFlowFile.getAttribute(DeleteByQueryElasticsearch.ERROR_ATTRIBUTE);
- Assert.assertNotNull(attr);
- }
-}
From b6ec6906f1872bdac681db8dedddb2daa14ddcd6 Mon Sep 17 00:00:00 2001
From: Mike Thomsen
Date: Wed, 25 Sep 2019 07:00:07 -0400
Subject: [PATCH 2/9] NIFI-5248 Added groovy-json test dependency.
---
.../nifi-elasticsearch-restapi-processors/pom.xml | 6 ++++++
1 file changed, 6 insertions(+)
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml
index bce18df35f13..2f7e74aa3bb2 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml
@@ -133,6 +133,12 @@ language governing permissions and limitations under the License. -->
1.10.0-SNAPSHOTtest
+
+ org.codehaus.groovy
+ groovy-json
+ ${nifi.groovy.version}
+ test
+
From 2dab72dc6fbbd63b0e7b504dd3c73bb8740368af Mon Sep 17 00:00:00 2001
From: Mike Thomsen
Date: Wed, 25 Sep 2019 07:14:50 -0400
Subject: [PATCH 3/9] NIFI-5248 Updated PutElasticsearchRecord to only do index
operations.
---
.../elasticsearch/PutElasticsearchRecord.java | 81 +------------------
.../elasticsearch/api/BulkOperation.java | 48 +++++++++++
.../PutElasticsearchRecordTest.groovy | 32 +++-----
3 files changed, 62 insertions(+), 99 deletions(-)
create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/BulkOperation.java
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
index a235fbae0b80..7c6a3ce61153 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
@@ -36,6 +36,7 @@
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.elasticsearch.api.BulkOperation;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.RecordPathResult;
@@ -47,7 +48,6 @@
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.util.StringUtils;
@@ -74,16 +74,6 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
.required(true)
.build();
- static final PropertyDescriptor OPERATION_RECORD_PATH = new PropertyDescriptor.Builder()
- .name("put-es-record-operation-path")
- .displayName("Operation Record Path")
- .description("A record path expression to retrieve index operation setting from each record. If left blank, " +
- "all operations will be assumed to be index operations.")
- .addValidator(new RecordPathValidator())
- .required(false)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .build();
-
static final PropertyDescriptor ID_RECORD_PATH = new PropertyDescriptor.Builder()
.name("put-es-record-id-path")
.displayName("ID Record Path")
@@ -141,7 +131,6 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
_temp.add(TYPE);
_temp.add(CLIENT_SERVICE);
_temp.add(RECORD_READER);
- _temp.add(OPERATION_RECORD_PATH);
_temp.add(ID_RECORD_PATH);
_temp.add(INDEX_RECORD_PATH);
_temp.add(TYPE_RECORD_PATH);
@@ -262,19 +251,16 @@ private BulkOperation buildOperations(FlowFile flowFile, ProcessContext context,
final String typePath = context.getProperty(TYPE_RECORD_PATH).isSet()
? context.getProperty(TYPE_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue()
: null;
- final String operationPath = context.getProperty(OPERATION_RECORD_PATH).isSet()
- ? context.getProperty(OPERATION_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue()
- : null;
RecordPath path = idPath != null ? recordPathCache.getCompiled(idPath) : null;
RecordPath iPath = indexPath != null ? recordPathCache.getCompiled(indexPath) : null;
RecordPath tPath = typePath != null ? recordPathCache.getCompiled(typePath) : null;
- RecordPath oPath = operationPath != null ? recordPathCache.getCompiled(operationPath) : null;
+
Record record;
while ((record = reader.nextRecord()) != null) {
final String idx = getFromRecordPath(record, iPath, index);
final String t = getFromRecordPath(record, tPath, type);
- final IndexOperationRequest.Operation o = getOperation(record, oPath);
+ final IndexOperationRequest.Operation o = IndexOperationRequest.Operation.Index;
final String id = path != null ? getFromRecordPath(record, path, null) : null;
Map contentMap = (Map) DataTypeUtils.convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
@@ -316,43 +302,6 @@ private void removeEmpty(Map input) {
input.putAll(copy);
}
- private String getId(Record record, RecordPath path) {
- RecordPathResult result = path.evaluate(record);
- Optional value = result.getSelectedFields().findFirst();
- if (value.isPresent() && value.get().getValue() != null) {
- return value.get().getValue().toString();
- } else {
- throw new RuntimeException("Missing ID field in one of the records.");
- }
- }
-
- private IndexOperationRequest.Operation getOperation(Record record, RecordPath path) {
- String val = getFromRecordPath(record, path, IndexOperationRequest.Operation.Index.name()).toLowerCase();
- IndexOperationRequest.Operation retVal;
-
- switch (val) {
- case "create":
- retVal = IndexOperationRequest.Operation.Create;
- break;
- case "delete":
- retVal = IndexOperationRequest.Operation.Delete;
- break;
- case "index":
- retVal = IndexOperationRequest.Operation.Index;
- break;
- case "update":
- retVal = IndexOperationRequest.Operation.Update;
- break;
- case "upsert":
- retVal = IndexOperationRequest.Operation.Upsert;
- break;
- default:
- throw new ProcessException(String.format("Invalid Elasticsearch operation %s", val));
- }
-
- return retVal;
- }
-
private String getFromRecordPath(Record record, RecordPath path, final String fallback) {
if (path == null) {
return fallback;
@@ -377,28 +326,4 @@ private String getFromRecordPath(Record record, RecordPath path, final String fa
return fallback;
}
}
-
- class BulkOperation {
- private List operationList;
- private List originalRecords;
- private RecordSchema schema;
-
- BulkOperation(List operationList, List originalRecords, RecordSchema schema) {
- this.operationList = operationList;
- this.originalRecords = originalRecords;
- this.schema = schema;
- }
-
- public List getOperationList() {
- return operationList;
- }
-
- public List getOriginalRecords() {
- return originalRecords;
- }
-
- public RecordSchema getSchema() {
- return schema;
- }
- }
}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/BulkOperation.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/BulkOperation.java
new file mode 100644
index 000000000000..f54aa600e1f6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/BulkOperation.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch.api;
+
+import org.apache.nifi.elasticsearch.IndexOperationRequest;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.util.List;
+
+public class BulkOperation {
+ private List operationList;
+ private List originalRecords;
+ private RecordSchema schema;
+
+ public BulkOperation(List operationList, List originalRecords, RecordSchema schema) {
+ this.operationList = operationList;
+ this.originalRecords = originalRecords;
+ this.schema = schema;
+ }
+
+ public List getOperationList() {
+ return operationList;
+ }
+
+ public List getOriginalRecords() {
+ return originalRecords;
+ }
+
+ public RecordSchema getSchema() {
+ return schema;
+ }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
index 5f48af0a58f3..704b28859f88 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
@@ -117,7 +117,6 @@ class PutElasticsearchRecordTest {
[ name: "id", type: "string" ],
[ name: "index", type: "string" ],
[ name: "type", type: "string" ],
- [ name: "operation", type: ["null", "string"]],
[ name: "msg", type: "string"]
]
]))
@@ -191,26 +190,18 @@ class PutElasticsearchRecordTest {
runner.clearTransferState()
- runner.setProperty(PutElasticsearchRecord.OPERATION_RECORD_PATH, "/operation")
-
flowFileContents = prettyPrint(toJson([
- [ id: "rec-1", index: "bulk_a", type: "message", msg: "Hello", operation: "index" ],
- [ id: "rec-2", index: "bulk_b", type: "message", msg: "Hello", operation: "Index" ],
- [ id: "rec-3", index: "bulk_a", type: "message", msg: "Hello", operation: "DeLEte" ],
- [ id: "rec-4", index: "bulk_b", type: "message", msg: "Hello", operation: "updATE" ],
- [ id: "rec-5", index: "bulk_a", type: "message", msg: "Hello", operation: "upsert" ],
- [ id: "rec-6", index: "bulk_b", type: "message", msg: "Hello", operation: "uPSERT" ]
+ [ id: "rec-1", index: "bulk_a", type: "message", msg: "Hello" ],
+ [ id: "rec-2", index: "bulk_b", type: "message", msg: "Hello" ],
+ [ id: "rec-3", index: "bulk_a", type: "message", msg: "Hello" ],
+ [ id: "rec-4", index: "bulk_b", type: "message", msg: "Hello" ],
+ [ id: "rec-5", index: "bulk_a", type: "message", msg: "Hello" ],
+ [ id: "rec-6", index: "bulk_b", type: "message", msg: "Hello" ]
]))
clientService.evalClosure = { List items ->
int index = items.findAll { it.operation == IndexOperationRequest.Operation.Index }.size()
- int delete = items.findAll { it.operation == IndexOperationRequest.Operation.Delete }.size()
- int update = items.findAll { it.operation == IndexOperationRequest.Operation.Update }.size()
- int upsert = items.findAll { it.operation == IndexOperationRequest.Operation.Upsert }.size()
- Assert.assertEquals(2, index)
- Assert.assertEquals(1, delete)
- Assert.assertEquals(1, update)
- Assert.assertEquals(2, upsert)
+ Assert.assertEquals(6, index)
}
runner.enqueue(flowFileContents, [
@@ -242,17 +233,16 @@ class PutElasticsearchRecordTest {
name: "RecordPathTestType",
fields: [
[ name: "id", type: "string" ],
- [ name: "operation", type: "string" ],
[ name: "field1", type: ["null", "string"]],
[ name: "field2", type: "string"]
]
]))
def values = [
- [ id: "1", operation: 'index', field1: 'value1', field2: '20' ],
- [ id: "2", operation: 'create', field1: 'value1', field2: '20' ],
- [ id: "2", operation: 'create', field1: 'value1', field2: '20' ],
- [ id: "3", operation: 'index', field1: 'value1', field2: '20abcd' ]
+ [ id: "1", field1: 'value1', field2: '20' ],
+ [ id: "2", field1: 'value1', field2: '20' ],
+ [ id: "2", field1: 'value1', field2: '20' ],
+ [ id: "3", field1: 'value1', field2: '20abcd' ]
]
clientService.response = IndexOperationResponse.fromJsonResponse(MockBulkLoadClientService.SAMPLE_ERROR_RESPONSE)
From 7ffceb16d827d96e1374357dfe8696d4e58cd108 Mon Sep 17 00:00:00 2001
From: Mike Thomsen
Date: Wed, 25 Sep 2019 07:22:38 -0400
Subject: [PATCH 4/9] NIFI-5248 Added batch size property and refactored the
way relationships and properties are added.
---
.../elasticsearch/PutElasticsearchRecord.java | 43 ++++++++-----------
1 file changed, 18 insertions(+), 25 deletions(-)
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
index 7c6a3ce61153..79d23593c3f6 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
@@ -36,6 +36,7 @@
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.elasticsearch.api.BulkOperation;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
@@ -54,6 +55,7 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -74,6 +76,15 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
.required(true)
.build();
+ static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+ .name("put-es-record-batch-size")
+ .displayName("Batch Size")
+ .description("The number of records to send over in a single batch.")
+ .defaultValue("100")
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .required(true)
+ .build();
+
static final PropertyDescriptor ID_RECORD_PATH = new PropertyDescriptor.Builder()
.name("put-es-record-id-path")
.displayName("ID Record Path")
@@ -114,31 +125,13 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
.required(false)
.build();
- static final List DESCRIPTORS;
- static final Set RELATIONSHIPS;
-
- static {
- Set _rels = new HashSet<>();
- _rels.add(REL_SUCCESS);
- _rels.add(REL_FAILURE);
- _rels.add(REL_RETRY);
- _rels.add(REL_FAILED_RECORDS);
-
- RELATIONSHIPS = Collections.unmodifiableSet(_rels);
-
- List _temp = new ArrayList<>();
- _temp.add(INDEX);
- _temp.add(TYPE);
- _temp.add(CLIENT_SERVICE);
- _temp.add(RECORD_READER);
- _temp.add(ID_RECORD_PATH);
- _temp.add(INDEX_RECORD_PATH);
- _temp.add(TYPE_RECORD_PATH);
- _temp.add(LOG_ERROR_RESPONSES);
- _temp.add(ERROR_RECORD_WRITER);
-
- DESCRIPTORS = Collections.unmodifiableList(_temp);
- }
+ static final List DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
+ INDEX, TYPE, CLIENT_SERVICE, RECORD_READER, BATCH_SIZE, ID_RECORD_PATH, INDEX_RECORD_PATH, TYPE_RECORD_PATH,
+ LOG_ERROR_RESPONSES, ERROR_RECORD_WRITER
+ ));
+ static final Set RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+ REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_RECORDS
+ )));
@Override
public Set getRelationships() {
From 739e3ffaa03b5c2f9b118aa428051cac575baecc Mon Sep 17 00:00:00 2001
From: Mike Thomsen
Date: Wed, 25 Sep 2019 07:33:21 -0400
Subject: [PATCH 5/9] NIFI-5248 Added batch processing support.
---
.../elasticsearch/PutElasticsearchRecord.java | 167 ++++++++++--------
1 file changed, 90 insertions(+), 77 deletions(-)
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
index 79d23593c3f6..d166c5d4a1c5 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
@@ -82,6 +82,7 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
.description("The number of records to send over in a single batch.")
.defaultValue("100")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.build();
@@ -166,54 +167,57 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
return;
}
- try {
- BulkOperation bundle = buildOperations(input, context, session);
- IndexOperationResponse response = clientService.bulk(bundle.getOperationList());
- if (response.hasErrors()) {
- if(logErrors || getLogger().isDebugEnabled()) {
- List> errors = response.getItems();
- ObjectMapper mapper = new ObjectMapper();
- mapper.enable(SerializationFeature.INDENT_OUTPUT);
- String output = String.format("An error was encountered while processing bulk operations. Server response below:\n\n%s", mapper.writeValueAsString(errors));
-
- if (logErrors) {
- getLogger().error(output);
- } else {
- getLogger().debug(output);
- }
- }
+ final String index = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
+ final String type = context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
+ final String idPath = context.getProperty(ID_RECORD_PATH).isSet()
+ ? context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(input).getValue()
+ : null;
+ final String indexPath = context.getProperty(INDEX_RECORD_PATH).isSet()
+ ? context.getProperty(INDEX_RECORD_PATH).evaluateAttributeExpressions(input).getValue()
+ : null;
+ final String typePath = context.getProperty(TYPE_RECORD_PATH).isSet()
+ ? context.getProperty(TYPE_RECORD_PATH).evaluateAttributeExpressions(input).getValue()
+ : null;
- if (writerFactory != null) {
- FlowFile errorFF = session.create(input);
- try (OutputStream os = session.write(errorFF);
- RecordSetWriter writer = writerFactory.createWriter(getLogger(), bundle.getSchema(), os )) {
-
- int added = 0;
- writer.beginRecordSet();
- for (int index = 0; index < response.getItems().size(); index++) {
- Map current = response.getItems().get(index);
- String key = current.keySet().stream().findFirst().get();
- Map inner = (Map) current.get(key);
- if (inner.containsKey("error")) {
- writer.write(bundle.getOriginalRecords().get(index));
- added++;
- }
- }
- writer.finishRecordSet();
- writer.close();
- os.close();
+ RecordPath path = idPath != null ? recordPathCache.getCompiled(idPath) : null;
+ RecordPath iPath = indexPath != null ? recordPathCache.getCompiled(indexPath) : null;
+ RecordPath tPath = typePath != null ? recordPathCache.getCompiled(typePath) : null;
- errorFF = session.putAttribute(errorFF, ATTR_RECORD_COUNT, String.valueOf(added));
+ int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger();
- session.transfer(errorFF, REL_FAILED_RECORDS);
- } catch (Exception ex) {
- getLogger().error("", ex);
- session.remove(errorFF);
- throw ex;
- }
+ try (final InputStream inStream = session.read(input);
+ final RecordReader reader = readerFactory.createRecordReader(input, inStream, getLogger())) {
+ Record record;
+ List operationList = new ArrayList<>();
+ List originals = new ArrayList<>();
+
+ while ((record = reader.nextRecord()) != null) {
+ final String idx = getFromRecordPath(record, iPath, index);
+ final String t = getFromRecordPath(record, tPath, type);
+ final IndexOperationRequest.Operation o = IndexOperationRequest.Operation.Index;
+ final String id = path != null ? getFromRecordPath(record, path, null) : null;
+
+ Map contentMap = (Map) DataTypeUtils.convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+
+ removeEmpty(contentMap);
+
+ operationList.add(new IndexOperationRequest(idx, t, id, contentMap, o));
+ originals.add(record);
+
+ if (operationList.size() == batchSize) {
+ BulkOperation bundle = new BulkOperation(operationList, originals, reader.getSchema());
+ indexDocuments(bundle, session, input);
+
+ operationList.clear();
+ originals.clear();
}
}
+ if (operationList.size() > 0) {
+ BulkOperation bundle = new BulkOperation(operationList, originals, reader.getSchema());
+ indexDocuments(bundle, session, input);
+ }
+
session.transfer(input, REL_SUCCESS);
} catch (ElasticsearchError ese) {
String msg = String.format("Encountered a server-side problem with Elasticsearch. %s",
@@ -227,46 +231,55 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
}
}
- private BulkOperation buildOperations(FlowFile flowFile, ProcessContext context, ProcessSession session) {
- try (final InputStream inStream = session.read(flowFile);
- final RecordReader reader = readerFactory.createRecordReader(flowFile, inStream, getLogger())) {
- List operationList = new ArrayList<>();
- List originals = new ArrayList<>();
-
- final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
- final String type = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
- final String idPath = context.getProperty(ID_RECORD_PATH).isSet()
- ? context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue()
- : null;
- final String indexPath = context.getProperty(INDEX_RECORD_PATH).isSet()
- ? context.getProperty(INDEX_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue()
- : null;
- final String typePath = context.getProperty(TYPE_RECORD_PATH).isSet()
- ? context.getProperty(TYPE_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue()
- : null;
-
- RecordPath path = idPath != null ? recordPathCache.getCompiled(idPath) : null;
- RecordPath iPath = indexPath != null ? recordPathCache.getCompiled(indexPath) : null;
- RecordPath tPath = typePath != null ? recordPathCache.getCompiled(typePath) : null;
-
- Record record;
- while ((record = reader.nextRecord()) != null) {
- final String idx = getFromRecordPath(record, iPath, index);
- final String t = getFromRecordPath(record, tPath, type);
- final IndexOperationRequest.Operation o = IndexOperationRequest.Operation.Index;
- final String id = path != null ? getFromRecordPath(record, path, null) : null;
+ private boolean indexDocuments(BulkOperation bundle, ProcessSession session, FlowFile input) throws Exception {
+ IndexOperationResponse response = clientService.bulk(bundle.getOperationList());
+ if (response.hasErrors()) {
+ if(logErrors || getLogger().isDebugEnabled()) {
+ List> errors = response.getItems();
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.enable(SerializationFeature.INDENT_OUTPUT);
+ String output = String.format("An error was encountered while processing bulk operations. Server response below:\n\n%s", mapper.writeValueAsString(errors));
+
+ if (logErrors) {
+ getLogger().error(output);
+ } else {
+ getLogger().debug(output);
+ }
+ }
- Map contentMap = (Map) DataTypeUtils.convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+ if (writerFactory != null) {
+ FlowFile errorFF = session.create(input);
+ try (OutputStream os = session.write(errorFF);
+ RecordSetWriter writer = writerFactory.createWriter(getLogger(), bundle.getSchema(), os )) {
+
+ int added = 0;
+ writer.beginRecordSet();
+ for (int index = 0; index < response.getItems().size(); index++) {
+ Map current = response.getItems().get(index);
+ String key = current.keySet().stream().findFirst().get();
+ Map inner = (Map) current.get(key);
+ if (inner.containsKey("error")) {
+ writer.write(bundle.getOriginalRecords().get(index));
+ added++;
+ }
+ }
+ writer.finishRecordSet();
+ writer.close();
+ os.close();
- removeEmpty(contentMap);
+ errorFF = session.putAttribute(errorFF, ATTR_RECORD_COUNT, String.valueOf(added));
- operationList.add(new IndexOperationRequest(idx, t, id, contentMap, o));
- originals.add(record);
+ session.transfer(errorFF, REL_FAILED_RECORDS);
+ } catch (Exception ex) {
+ getLogger().error("", ex);
+ session.remove(errorFF);
+ throw ex;
+ }
}
- return new BulkOperation(operationList, originals, reader.getSchema());
- } catch (Exception e) {
- throw new RuntimeException(e);
+ return false;
+ } else {
+ return true;
}
}
From 0a294cddfc24cfa17aacd929d9aa25eae4309288 Mon Sep 17 00:00:00 2001
From: Mike Thomsen
Date: Thu, 24 Oct 2019 08:20:01 -0400
Subject: [PATCH 6/9] NIFI-5248 Updated error handling.
---
.../elasticsearch/PutElasticsearchRecord.java | 29 +++++++++++++++----
1 file changed, 24 insertions(+), 5 deletions(-)
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
index d166c5d4a1c5..fc811adb639a 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
@@ -184,6 +184,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
RecordPath tPath = typePath != null ? recordPathCache.getCompiled(typePath) : null;
int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger();
+ List badRecords = new ArrayList<>();
try (final InputStream inStream = session.read(input);
final RecordReader reader = readerFactory.createRecordReader(input, inStream, getLogger())) {
@@ -206,7 +207,10 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
if (operationList.size() == batchSize) {
BulkOperation bundle = new BulkOperation(operationList, originals, reader.getSchema());
- indexDocuments(bundle, session, input);
+ FlowFile bad = indexDocuments(bundle, session, input);
+ if (bad != null) {
+ badRecords.add(bad);
+ }
operationList.clear();
originals.clear();
@@ -215,7 +219,10 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
if (operationList.size() > 0) {
BulkOperation bundle = new BulkOperation(operationList, originals, reader.getSchema());
- indexDocuments(bundle, session, input);
+ FlowFile bad = indexDocuments(bundle, session, input);
+ if (bad != null) {
+ badRecords.add(bad);
+ }
}
session.transfer(input, REL_SUCCESS);
@@ -225,13 +232,23 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
getLogger().error(msg, ese);
Relationship rel = ese.isElastic() ? REL_RETRY : REL_FAILURE;
session.transfer(input, rel);
+ removeBadRecordFlowFiles(badRecords, session);
} catch (Exception ex) {
getLogger().error("Could not index documents.", ex);
session.transfer(input, REL_FAILURE);
+ removeBadRecordFlowFiles(badRecords, session);
}
}
- private boolean indexDocuments(BulkOperation bundle, ProcessSession session, FlowFile input) throws Exception {
+ private void removeBadRecordFlowFiles(List bad, ProcessSession session) {
+ for (FlowFile badFlowFile : bad) {
+ session.remove(badFlowFile);
+ }
+
+ bad.clear();
+ }
+
+ private FlowFile indexDocuments(BulkOperation bundle, ProcessSession session, FlowFile input) throws Exception {
IndexOperationResponse response = clientService.bulk(bundle.getOperationList());
if (response.hasErrors()) {
if(logErrors || getLogger().isDebugEnabled()) {
@@ -270,6 +287,8 @@ private boolean indexDocuments(BulkOperation bundle, ProcessSession session, Flo
errorFF = session.putAttribute(errorFF, ATTR_RECORD_COUNT, String.valueOf(added));
session.transfer(errorFF, REL_FAILED_RECORDS);
+
+ return errorFF;
} catch (Exception ex) {
getLogger().error("", ex);
session.remove(errorFF);
@@ -277,9 +296,9 @@ private boolean indexDocuments(BulkOperation bundle, ProcessSession session, Flo
}
}
- return false;
+ return null;
} else {
- return true;
+ return null;
}
}
From e3a6c402e5439383ad8432b0414a02ea4e473ccd Mon Sep 17 00:00:00 2001
From: Mike Thomsen
Date: Mon, 11 Nov 2019 18:24:29 -0500
Subject: [PATCH 7/9] NIFI-5248 Updated to 1.11.0-SNAPSHOT.
---
.../nifi-elasticsearch-restapi-processors/pom.xml | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml
index 2f7e74aa3bb2..5d7ef896a3a1 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml
@@ -59,7 +59,7 @@ language governing permissions and limitations under the License. -->
org.apache.nifinifi-schema-registry-service-api
- 1.10.0-SNAPSHOT
+ 1.11.0-SNAPSHOTcommons-io
@@ -118,19 +118,19 @@ language governing permissions and limitations under the License. -->
org.apache.nifinifi-record-path
- 1.10.0-SNAPSHOT
+ 1.11.0-SNAPSHOTcompileorg.apache.nifinifi-mock-record-utils
- 1.10.0-SNAPSHOT
+ 1.11.0-SNAPSHOTtestorg.apache.nifinifi-record-serialization-services
- 1.10.0-SNAPSHOT
+ 1.11.0-SNAPSHOTtest
From 1a4541f590df72d586fa8709836f9ab56b31aa78 Mon Sep 17 00:00:00 2001
From: Mike Thomsen
Date: Tue, 12 Nov 2019 07:56:29 -0500
Subject: [PATCH 8/9] NIFI-5248 Made changes requested in a code review.
---
.../org/apache/nifi/elasticsearch/IndexOperationRequest.java | 2 +-
.../processors/elasticsearch/PutElasticsearchRecord.java | 1 +
.../elasticsearch/PutElasticsearchRecordTest.groovy | 5 ++---
3 files changed, 4 insertions(+), 4 deletions(-)
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
index 23aff57ae726..df0679a2f009 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
@@ -20,7 +20,7 @@
import java.util.Map;
/**
- * A POJO that represents an "operation on an index." It should be confused with just indexing documents, as it
+ * A POJO that represents an "operation on an index." It should not be confused with just indexing documents, as it
* covers all CRUD-related operations that can be executed against an Elasticsearch index with documents.
*/
public class IndexOperationRequest {
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
index fc811adb639a..3788b8a03b37 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
@@ -231,6 +231,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
ese.isElastic() ? "Moving to retry." : "Moving to failure");
getLogger().error(msg, ese);
Relationship rel = ese.isElastic() ? REL_RETRY : REL_FAILURE;
+ session.penalize(input);
session.transfer(input, rel);
removeBadRecordFlowFiles(badRecords, session);
} catch (Exception ex) {
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
index 704b28859f88..62f7c1b08f14 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
@@ -23,9 +23,7 @@ import org.apache.nifi.elasticsearch.IndexOperationRequest
import org.apache.nifi.elasticsearch.IndexOperationResponse
import org.apache.nifi.json.JsonRecordSetWriter
import org.apache.nifi.json.JsonTreeReader
-import org.apache.nifi.processors.elasticsearch.mock.AbstractMockElasticsearchClient
import org.apache.nifi.processors.elasticsearch.mock.MockBulkLoadClientService
-import org.apache.nifi.schema.access.SchemaAccessStrategy
import org.apache.nifi.schema.access.SchemaAccessUtils
import org.apache.nifi.serialization.record.MockSchemaRegistry
import org.apache.nifi.util.TestRunner
@@ -34,7 +32,8 @@ import org.junit.Assert
import org.junit.Before
import org.junit.Test
-import static groovy.json.JsonOutput.*
+import static groovy.json.JsonOutput.prettyPrint
+import static groovy.json.JsonOutput.toJson
class PutElasticsearchRecordTest {
MockBulkLoadClientService clientService
From 999159c1f97fdb89ee7e683667c249973f21308c Mon Sep 17 00:00:00 2001
From: Mike Thomsen
Date: Tue, 12 Nov 2019 17:56:21 -0500
Subject: [PATCH 9/9] NIFI-5248 Made a few more changes from a code review.
---
.../nifi/elasticsearch/ElasticSearchClientServiceImpl.java | 6 ++++--
.../processors/elasticsearch/PutElasticsearchRecord.java | 2 +-
2 files changed, 5 insertions(+), 3 deletions(-)
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
index 763a32d05701..f856c76bd758 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
@@ -231,7 +231,7 @@ protected void buildRequest(IndexOperationRequest request, StringBuilder builder
builder.append(indexDocument).append("\n");
} else if (request.getOperation().equals(IndexOperationRequest.Operation.Update)
|| request.getOperation().equals(IndexOperationRequest.Operation.Upsert)) {
- Map doc = new HashMap(){{
+ Map doc = new HashMap() {{
put("doc", request.getFields());
if (request.getOperation().equals(IndexOperationRequest.Operation.Upsert)) {
put("doc_as_upsert", true);
@@ -251,7 +251,9 @@ public IndexOperationResponse bulk(List operations) {
buildRequest(or, payload);
}
- getLogger().info(payload.toString());
+ if (getLogger().isDebugEnabled()) {
+ getLogger().debug(payload.toString());
+ }
HttpEntity entity = new NStringEntity(payload.toString(), ContentType.APPLICATION_JSON);
StopWatch watch = new StopWatch();
watch.start();
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
index 3788b8a03b37..bbc86e88105d 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
@@ -118,7 +118,7 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
static final PropertyDescriptor ERROR_RECORD_WRITER = new PropertyDescriptor.Builder()
.name("put-es-record-error-writer")
.displayName("Error Record Writer")
- .displayName("If this configuration property is set, the response from Elasticsearch will be examined for failed records " +
+ .description("If this configuration property is set, the response from Elasticsearch will be examined for failed records " +
"and the failed records will be written to a record set with this record writer service and sent to the \"errors\" " +
"relationship.")
.identifiesControllerService(RecordSetWriterFactory.class)