From 2fac219f0098290246abf9b3db3d081ef536fe5d Mon Sep 17 00:00:00 2001 From: Matt Burgess Date: Tue, 29 Nov 2016 11:29:47 -0500 Subject: [PATCH 1/2] NIFI-3095: Add EL support to Elasticsearch 2.x (and HTTP) processors --- .../AbstractElasticsearchHttpProcessor.java | 7 ++- .../AbstractElasticsearchProcessor.java | 16 ++++-- ...ElasticsearchTransportClientProcessor.java | 50 +++++++++++-------- .../elasticsearch/FetchElasticsearch.java | 34 ++++++++----- .../elasticsearch/FetchElasticsearchHttp.java | 35 +++++++------ .../elasticsearch/PutElasticsearch.java | 44 +++++++++------- .../elasticsearch/PutElasticsearchHttp.java | 47 ++++++++++------- .../elasticsearch/QueryElasticsearchHttp.java | 35 ++++++++----- .../ScrollElasticsearchHttp.java | 39 +++++++++------ .../elasticsearch/TestFetchElasticsearch.java | 31 ++++++++++++ .../TestFetchElasticsearchHttp.java | 29 +++++++++++ .../elasticsearch/TestPutElasticsearch.java | 33 ++++++++++++ .../TestPutElasticsearchHttp.java | 27 ++++++++++ .../TestQueryElasticsearchHttp.java | 23 +++++++++ .../TestScrollElasticsearchHttp.java | 24 +++++++++ 15 files changed, 354 insertions(+), 120 deletions(-) diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java index c4121b56119f..d67ce6c540a8 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java @@ -56,6 +56,7 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic .description("Elasticsearch URL which will be connected to, including scheme (http, e.g.), host, and port. The default port for the REST API is 9200.") .required(true) .addValidator(StandardValidators.URL_VALIDATOR) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder() @@ -81,6 +82,7 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic .required(true) .defaultValue("5 secs") .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor RESPONSE_TIMEOUT = new PropertyDescriptor.Builder() @@ -90,6 +92,7 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic .required(true) .defaultValue("15 secs") .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(true) .build(); private final AtomicReference okHttpClientAtomicReference = new AtomicReference<>(); @@ -109,8 +112,8 @@ protected void createElasticsearchClient(ProcessContext context) throws ProcessE } // Set timeouts - okHttpClient.connectTimeout((context.getProperty(CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()), TimeUnit.MILLISECONDS); - okHttpClient.readTimeout(context.getProperty(RESPONSE_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(), TimeUnit.MILLISECONDS); + okHttpClient.connectTimeout((context.getProperty(CONNECT_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue()), TimeUnit.MILLISECONDS); + okHttpClient.readTimeout(context.getProperty(RESPONSE_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue(), TimeUnit.MILLISECONDS); final SSLContextService sslService = context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); final SSLContext sslContext = sslService == null ? null : sslService.createSSLContext(SSLContextService.ClientAuth.NONE); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java index 76c7224412fb..a88914e4cd64 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java @@ -19,6 +19,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.exception.ProcessException; @@ -28,7 +29,6 @@ import java.util.Collection; import java.util.HashSet; -import java.util.Map; import java.util.Set; /** @@ -36,6 +36,13 @@ */ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor { + protected static final Validator NON_EMPTY_EL_VALIDATOR = (subject, value, context) -> { + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) { + return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build(); + } + return new ValidationResult.Builder().subject(subject).input(value).valid(value != null && !value.isEmpty()).explanation(subject + " cannot be empty").build(); + }; + public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() .name("SSL Context Service") .description("The SSL Context Service used to provide client certificate information for TLS/SSL " @@ -50,6 +57,7 @@ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor { .required(true) .defaultValue("UTF-8") .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() @@ -57,6 +65,7 @@ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor { .description("Username to access the Elasticsearch cluster") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() @@ -74,8 +83,9 @@ protected Collection customValidate(ValidationContext validati Set results = new HashSet<>(); // Ensure that if username or password is set, then the other is too - Map propertyMap = validationContext.getProperties(); - if (StringUtils.isEmpty(propertyMap.get(USERNAME)) != StringUtils.isEmpty(propertyMap.get(PASSWORD))) { + String userName = validationContext.getProperty(USERNAME).evaluateAttributeExpressions().getValue(); + String password = validationContext.getProperty(PASSWORD).evaluateAttributeExpressions().getValue(); + if (StringUtils.isEmpty(userName) != StringUtils.isEmpty(password)) { results.add(new ValidationResult.Builder().valid(false).explanation( "If username or password is specified, then the other must be specified as well").build()); } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchTransportClientProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchTransportClientProcessor.java index e1c9d4d7eaaf..cc0f497d3f34 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchTransportClientProcessor.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchTransportClientProcessor.java @@ -17,7 +17,6 @@ package org.apache.nifi.processors.elasticsearch; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; import org.apache.nifi.logging.ComponentLog; @@ -50,21 +49,20 @@ public abstract class AbstractElasticsearchTransportClientProcessor extends Abst /** * This validator ensures the Elasticsearch hosts property is a valid list of hostname:port entries */ - private static final Validator HOSTNAME_PORT_VALIDATOR = new Validator() { - @Override - public ValidationResult validate(final String subject, final String input, final ValidationContext context) { - final List esList = Arrays.asList(input.split(",")); - for (String hostnamePort : esList) { - String[] addresses = hostnamePort.split(":"); - // Protect against invalid input like http://127.0.0.1:9300 (URL scheme should not be there) - if (addresses.length != 2) { - return new ValidationResult.Builder().subject(subject).input(input).explanation( - "Must be in hostname:port form (no scheme such as http://").valid(false).build(); - } + private static final Validator HOSTNAME_PORT_VALIDATOR = (subject, input, context) -> { + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) { + return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build(); + } + final List esList = Arrays.asList(input.split(",")); + for (String hostnamePort : esList) { + String[] addresses = hostnamePort.split(":"); + // Protect against invalid input like http://127.0.0.1:9300 (URL scheme should not be there) + if (addresses.length != 2) { + return new ValidationResult.Builder().subject(subject).input(input).explanation( + "Must be in hostname:port form (no scheme such as http://").valid(false).build(); } - return new ValidationResult.Builder().subject(subject).input(input).explanation( - "Valid cluster definition").valid(true).build(); } + return new ValidationResult.Builder().subject(subject).input(input).explanation("Valid cluster definition").valid(true).build(); }; protected static final PropertyDescriptor CLUSTER_NAME = new PropertyDescriptor.Builder() @@ -73,6 +71,7 @@ public ValidationResult validate(final String subject, final String input, final .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .defaultValue("elasticsearch") + .expressionLanguageSupported(true) .build(); protected static final PropertyDescriptor HOSTS = new PropertyDescriptor.Builder() @@ -83,6 +82,7 @@ public ValidationResult validate(final String subject, final String input, final .required(true) .expressionLanguageSupported(false) .addValidator(HOSTNAME_PORT_VALIDATOR) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor PROP_SHIELD_LOCATION = new PropertyDescriptor.Builder() @@ -93,6 +93,7 @@ public ValidationResult validate(final String subject, final String input, final + "lib/ directory, doing so will prevent the Shield plugin from being loaded.") .required(false) .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) + .expressionLanguageSupported(true) .build(); protected static final PropertyDescriptor PING_TIMEOUT = new PropertyDescriptor.Builder() @@ -101,7 +102,8 @@ public ValidationResult validate(final String subject, final String input, final "For example, 5s (5 seconds). If non-local recommended is 30s") .required(true) .defaultValue("5s") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(true) .build(); protected static final PropertyDescriptor SAMPLER_INTERVAL = new PropertyDescriptor.Builder() @@ -110,7 +112,8 @@ public ValidationResult validate(final String subject, final String input, final + "If non-local recommended is 30s.") .required(true) .defaultValue("5s") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(true) .build(); protected AtomicReference esClient = new AtomicReference<>(); @@ -135,10 +138,10 @@ protected void createElasticsearchClient(ProcessContext context) throws ProcessE log.debug("Creating ElasticSearch Client"); try { - final String clusterName = context.getProperty(CLUSTER_NAME).getValue(); - final String pingTimeout = context.getProperty(PING_TIMEOUT).getValue(); - final String samplerInterval = context.getProperty(SAMPLER_INTERVAL).getValue(); - final String username = context.getProperty(USERNAME).getValue(); + final String clusterName = context.getProperty(CLUSTER_NAME).evaluateAttributeExpressions().getValue(); + final String pingTimeout = context.getProperty(PING_TIMEOUT).evaluateAttributeExpressions().getValue(); + final String samplerInterval = context.getProperty(SAMPLER_INTERVAL).evaluateAttributeExpressions().getValue(); + final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue(); final String password = context.getProperty(PASSWORD).getValue(); final SSLContextService sslService = @@ -149,7 +152,7 @@ protected void createElasticsearchClient(ProcessContext context) throws ProcessE .put("client.transport.ping_timeout", pingTimeout) .put("client.transport.nodes_sampler_interval", samplerInterval); - String shieldUrl = context.getProperty(PROP_SHIELD_LOCATION).getValue(); + String shieldUrl = context.getProperty(PROP_SHIELD_LOCATION).evaluateAttributeExpressions().getValue(); if (sslService != null) { settingsBuilder.put("shield.transport.ssl", "true") .put("shield.ssl.keystore.path", sslService.getKeyStoreFile()) @@ -171,7 +174,7 @@ protected void createElasticsearchClient(ProcessContext context) throws ProcessE TransportClient transportClient = getTransportClient(settingsBuilder, shieldUrl, username, password); - final String hosts = context.getProperty(HOSTS).getValue(); + final String hosts = context.getProperty(HOSTS).evaluateAttributeExpressions().getValue(); esHosts = getEsHosts(hosts); if (esHosts != null) { @@ -268,6 +271,9 @@ private List getEsHosts(String hosts) { for (String item : esList) { String[] addresses = item.split(":"); + if (addresses.length != 2) { + throw new ArrayIndexOutOfBoundsException("Not in host:port format"); + } final String hostName = addresses[0].trim(); final int port = Integer.parseInt(addresses[1].trim()); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch.java index 67aaae787789..643edbb6ef07 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch.java @@ -105,18 +105,17 @@ public class FetchElasticsearch extends AbstractElasticsearchTransportClientProc .build(); - @Override - public Set getRelationships() { - final Set relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - relationships.add(REL_FAILURE); - relationships.add(REL_RETRY); - relationships.add(REL_NOT_FOUND); - return Collections.unmodifiableSet(relationships); - } + private static final Set relationships; + private static final List propertyDescriptors; + + static { + final Set _rels = new HashSet<>(); + _rels.add(REL_SUCCESS); + _rels.add(REL_FAILURE); + _rels.add(REL_RETRY); + _rels.add(REL_NOT_FOUND); + relationships = Collections.unmodifiableSet(_rels); - @Override - public final List getSupportedPropertyDescriptors() { final List descriptors = new ArrayList<>(); descriptors.add(CLUSTER_NAME); descriptors.add(HOSTS); @@ -131,9 +130,18 @@ public final List getSupportedPropertyDescriptors() { descriptors.add(TYPE); descriptors.add(CHARSET); - return Collections.unmodifiableList(descriptors); + propertyDescriptors = Collections.unmodifiableList(descriptors); } + @Override + public Set getRelationships() { + return relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return propertyDescriptors; + } @OnScheduled public void setup(ProcessContext context) { @@ -151,7 +159,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue(); final String docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue(); final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue(); - final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue()); + final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue()); final ComponentLog logger = getLogger(); try { diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java index ac598bfcfd89..b3cd6009acdd 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java @@ -131,19 +131,17 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + private static final Set relationships; + private static final List propertyDescriptors; - @Override - public Set getRelationships() { - final Set relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - relationships.add(REL_FAILURE); - relationships.add(REL_RETRY); - relationships.add(REL_NOT_FOUND); - return Collections.unmodifiableSet(relationships); - } + static { + final Set _rels = new HashSet<>(); + _rels.add(REL_SUCCESS); + _rels.add(REL_FAILURE); + _rels.add(REL_RETRY); + _rels.add(REL_NOT_FOUND); + relationships = Collections.unmodifiableSet(_rels); - @Override - public final List getSupportedPropertyDescriptors() { final List descriptors = new ArrayList<>(); descriptors.add(ES_URL); descriptors.add(PROP_SSL_CONTEXT_SERVICE); @@ -156,9 +154,18 @@ public final List getSupportedPropertyDescriptors() { descriptors.add(TYPE); descriptors.add(FIELDS); - return Collections.unmodifiableList(descriptors); + propertyDescriptors = Collections.unmodifiableList(descriptors); + } + + @Override + public Set getRelationships() { + return relationships; } + @Override + public final List getSupportedPropertyDescriptors() { + return propertyDescriptors; + } @OnScheduled public void setup(ProcessContext context) { @@ -194,7 +201,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session : null; // Authentication - final String username = context.getProperty(USERNAME).getValue(); + final String username = context.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue(); final String password = context.getProperty(PASSWORD).getValue(); final ComponentLog logger = getLogger(); @@ -204,7 +211,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session logger.debug("Fetching {}/{}/{} from Elasticsearch", new Object[]{index, docType, docId}); // read the url property from the context - final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).getValue()); + final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue()); final URL url = buildRequestURL(urlstr, docId, index, docType, fields); final long startNanos = System.nanoTime(); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java index 216efd4a771f..d208d40d3428 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java @@ -96,8 +96,7 @@ public class PutElasticsearch extends AbstractElasticsearchTransportClientProces .description("The type of this document (used by Elasticsearch for indexing and searching)") .required(true) .expressionLanguageSupported(true) - .addValidator(StandardValidators.createAttributeExpressionLanguageValidator( - AttributeExpression.ResultType.STRING, true)) + .addValidator(NON_EMPTY_EL_VALIDATOR) .build(); public static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder() @@ -105,8 +104,7 @@ public class PutElasticsearch extends AbstractElasticsearchTransportClientProces .description("The type of the operation used to index (index, update, upsert)") .required(true) .expressionLanguageSupported(true) - .addValidator(StandardValidators.createAttributeExpressionLanguageValidator( - AttributeExpression.ResultType.STRING, true)) + .addValidator(NON_EMPTY_EL_VALIDATOR) .defaultValue("index") .build(); @@ -116,20 +114,19 @@ public class PutElasticsearch extends AbstractElasticsearchTransportClientProces .required(true) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .defaultValue("100") + .expressionLanguageSupported(true) .build(); + private static final Set relationships; + private static final List propertyDescriptors; - @Override - public Set getRelationships() { - final Set relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - relationships.add(REL_FAILURE); - relationships.add(REL_RETRY); - return Collections.unmodifiableSet(relationships); - } + static { + final Set _rels = new HashSet<>(); + _rels.add(REL_SUCCESS); + _rels.add(REL_FAILURE); + _rels.add(REL_RETRY); + relationships = Collections.unmodifiableSet(_rels); - @Override - public final List getSupportedPropertyDescriptors() { final List descriptors = new ArrayList<>(); descriptors.add(CLUSTER_NAME); descriptors.add(HOSTS); @@ -146,7 +143,17 @@ public final List getSupportedPropertyDescriptors() { descriptors.add(BATCH_SIZE); descriptors.add(INDEX_OP); - return Collections.unmodifiableList(descriptors); + propertyDescriptors = Collections.unmodifiableList(descriptors); + } + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return propertyDescriptors; } @OnScheduled @@ -156,16 +163,16 @@ public void setup(ProcessContext context) { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); + + final ComponentLog logger = getLogger(); final String id_attribute = context.getProperty(ID_ATTRIBUTE).getValue(); - final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue()); + final int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger(); final List flowFiles = session.get(batchSize); if (flowFiles.isEmpty()) { return; } - final ComponentLog logger = getLogger(); // Keep track of the list of flow files that need to be transferred. As they are transferred, remove them from the list. List flowFilesToTransfer = new LinkedList<>(flowFiles); try { @@ -178,6 +185,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session final String index = context.getProperty(INDEX).evaluateAttributeExpressions(file).getValue(); final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(file).getValue(); final String indexOp = context.getProperty(INDEX_OP).evaluateAttributeExpressions(file).getValue(); + final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(file).getValue()); final String id = file.getAttribute(id_attribute); if (id == null) { diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java index 3ba46bb362b5..9c4a71e795b8 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java @@ -104,8 +104,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { .description("The type of this document (used by Elasticsearch for indexing and searching)") .required(true) .expressionLanguageSupported(true) - .addValidator(StandardValidators.createAttributeExpressionLanguageValidator( - AttributeExpression.ResultType.STRING, true)) + .addValidator(NON_EMPTY_EL_VALIDATOR) .build(); public static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder() @@ -114,8 +113,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { .description("The type of the operation used to index (index, update, upsert, delete)") .required(true) .expressionLanguageSupported(true) - .addValidator(StandardValidators.createAttributeExpressionLanguageValidator( - AttributeExpression.ResultType.STRING, true)) + .addValidator(NON_EMPTY_EL_VALIDATOR) .defaultValue("index") .build(); @@ -128,19 +126,19 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { .required(true) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .defaultValue("100") + .expressionLanguageSupported(true) .build(); - @Override - public Set getRelationships() { - final Set relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - relationships.add(REL_FAILURE); - relationships.add(REL_RETRY); - return Collections.unmodifiableSet(relationships); - } + private static final Set relationships; + private static final List propertyDescriptors; + + static { + final Set _rels = new HashSet<>(); + _rels.add(REL_SUCCESS); + _rels.add(REL_FAILURE); + _rels.add(REL_RETRY); + relationships = Collections.unmodifiableSet(_rels); - @Override - public final List getSupportedPropertyDescriptors() { final List descriptors = new ArrayList<>(); descriptors.add(ES_URL); descriptors.add(PROP_SSL_CONTEXT_SERVICE); @@ -154,7 +152,18 @@ public final List getSupportedPropertyDescriptors() { descriptors.add(CHARSET); descriptors.add(BATCH_SIZE); descriptors.add(INDEX_OP); - return Collections.unmodifiableList(descriptors); + + propertyDescriptors = Collections.unmodifiableList(descriptors); + } + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return propertyDescriptors; } @Override @@ -192,7 +201,7 @@ public void setup(ProcessContext context) { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); + final int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger(); final List flowFiles = session.get(batchSize); if (flowFiles.isEmpty()) { @@ -200,9 +209,9 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } final String id_attribute = context.getProperty(ID_ATTRIBUTE).getValue(); - final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue()); + final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions().getValue()); // Authentication - final String username = context.getProperty(USERNAME).getValue(); + final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue(); final String password = context.getProperty(PASSWORD).getValue(); @@ -213,7 +222,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session List flowFilesToTransfer = new LinkedList<>(flowFiles); final StringBuilder sb = new StringBuilder(); - final String baseUrl = trimToEmpty(context.getProperty(ES_URL).getValue()); + final String baseUrl = trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue()); final URL url; try { url = new URL((baseUrl.endsWith("/") ? baseUrl : baseUrl + "/") + "_bulk"); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java index f921323143ce..74ea6ffba445 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java @@ -174,17 +174,16 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { .allowableValues(TARGET_FLOW_FILE_CONTENT, TARGET_FLOW_FILE_ATTRIBUTES) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); - @Override - public Set getRelationships() { - final Set relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - relationships.add(REL_FAILURE); - relationships.add(REL_RETRY); - return Collections.unmodifiableSet(relationships); - } + private static final Set relationships; + private static final List propertyDescriptors; + + static { + final Set _rels = new HashSet<>(); + _rels.add(REL_SUCCESS); + _rels.add(REL_FAILURE); + _rels.add(REL_RETRY); + relationships = Collections.unmodifiableSet(_rels); - @Override - public final List getSupportedPropertyDescriptors() { final List descriptors = new ArrayList<>(); descriptors.add(ES_URL); descriptors.add(PROP_SSL_CONTEXT_SERVICE); @@ -201,7 +200,17 @@ public final List getSupportedPropertyDescriptors() { descriptors.add(LIMIT); descriptors.add(TARGET); - return Collections.unmodifiableList(descriptors); + propertyDescriptors = Collections.unmodifiableList(descriptors); + } + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return propertyDescriptors; } @OnScheduled @@ -247,7 +256,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session .equals(TARGET_FLOW_FILE_CONTENT); // Authentication - final String username = context.getProperty(USERNAME).getValue(); + final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue(); final String password = context.getProperty(PASSWORD).getValue(); final ComponentLog logger = getLogger(); @@ -261,7 +270,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session final long startNanos = System.nanoTime(); // read the url property from the context - final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).getValue()); + final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue()); boolean hitLimit = false; do { diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java index 3d897cf9752d..10237e3fff62 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java @@ -159,16 +159,15 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor .required(true).expressionLanguageSupported(true) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build(); - @Override - public Set getRelationships() { - final Set relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - relationships.add(REL_FAILURE); - return Collections.unmodifiableSet(relationships); - } + private static final Set relationships; + private static final List propertyDescriptors; + + static { + final Set _rels = new HashSet<>(); + _rels.add(REL_SUCCESS); + _rels.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(_rels); - @Override - public final List getSupportedPropertyDescriptors() { final List descriptors = new ArrayList<>(); descriptors.add(ES_URL); descriptors.add(PROP_SSL_CONTEXT_SERVICE); @@ -184,7 +183,17 @@ public final List getSupportedPropertyDescriptors() { descriptors.add(FIELDS); descriptors.add(SORT); - return Collections.unmodifiableList(descriptors); + propertyDescriptors = Collections.unmodifiableList(descriptors); + } + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return propertyDescriptors; } @OnScheduled @@ -227,7 +236,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session .getProperty(SCROLL_DURATION).evaluateAttributeExpressions(flowFile).getValue() : null; // Authentication - final String username = context.getProperty(USERNAME).getValue(); + final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue(); final String password = context.getProperty(PASSWORD).getValue(); final ComponentLog logger = getLogger(); @@ -235,10 +244,10 @@ public void onTrigger(final ProcessContext context, final ProcessSession session try { String scrollId = loadScrollId(context.getStateManager()); + // read the url property from the context + final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions() + .getValue()); if (scrollId != null) { - // read the url property from the context - final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL) - .getValue()); final URL scrollurl = buildRequestURL(urlstr, query, index, docType, fields, sort, scrollId, pageSize, scroll); final long startNanos = System.nanoTime(); @@ -251,8 +260,6 @@ public void onTrigger(final ProcessContext context, final ProcessSession session docType, query }); // read the url property from the context - final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL) - .getValue()); final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort, scrollId, pageSize, scroll); final long startNanos = System.nanoTime(); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java index 9b68f2eeb1a2..ba22b65c9271 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java @@ -102,6 +102,37 @@ public void testFetchElasticsearchOnTrigger() throws IOException { out.assertAttributeEquals("doc_id", "28039652140"); } + @Test + public void testFetchElasticsearchOnTriggerEL() throws IOException { + runner = TestRunners.newTestRunner(new FetchElasticsearchTestProcessor(true)); // all docs are found + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "${cluster.name}"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "${hosts}"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "${ping.timeout}"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "${sampler.interval}"); + + runner.setProperty(FetchElasticsearch.INDEX, "doc"); + runner.assertNotValid(); + runner.setProperty(FetchElasticsearch.TYPE, "status"); + runner.assertNotValid(); + runner.setProperty(FetchElasticsearch.DOC_ID, "${doc_id}"); + runner.assertValid(); + runner.setVariable("cluster.name", "elasticsearch"); + runner.setVariable("hosts", "127.0.0.1:9300"); + runner.setVariable("ping.timeout", "5s"); + runner.setVariable("sampler.interval", "5s"); + + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearch.REL_SUCCESS).get(0); + assertNotNull(out); + out.assertAttributeEquals("doc_id", "28039652140"); + } + @Test public void testFetchElasticsearchOnTriggerWithFailures() throws IOException { runner = TestRunners.newTestRunner(new FetchElasticsearchTestProcessor(false)); // simulate doc not found diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java index 82fa3dae2af3..f7f9907fb2d5 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java @@ -63,6 +63,35 @@ public void teardown() { @Test public void testFetchElasticsearchOnTrigger() throws IOException { + runner = TestRunners.newTestRunner(new FetchElasticsearchHttpTestProcessor(true)); // all docs are found + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "${es.url}"); + + runner.setProperty(FetchElasticsearchHttp.INDEX, "doc"); + runner.assertNotValid(); + runner.setProperty(FetchElasticsearchHttp.TYPE, "status"); + runner.assertNotValid(); + runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}"); + runner.assertValid(); + runner.setProperty(AbstractElasticsearchHttpProcessor.CONNECT_TIMEOUT, "${connect.timeout}"); + runner.assertValid(); + + runner.setVariable("es.url", "http://127.0.0.1:9200"); + runner.setVariable("connect.timeout", "5s"); + + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearchHttp.REL_SUCCESS).get(0); + assertNotNull(out); + out.assertAttributeEquals("doc_id", "28039652140"); + } + + @Test + public void testFetchElasticsearchOnTriggerEL() throws IOException { runner = TestRunners.newTestRunner(new FetchElasticsearchHttpTestProcessor(true)); // all docs are found runner.setValidateExpressionUsage(true); runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java index 4e6a820d6222..6d6da5a01f26 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java @@ -102,6 +102,39 @@ public void testPutElasticSearchOnTrigger() throws IOException { out.assertAttributeEquals("doc_id", "28039652140"); } + @Test + public void testPutElasticSearchOnTriggerEL() throws IOException { + runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "${cluster.name}"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "${hosts}"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "${ping.timeout}"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "${sampler.interval}"); + + runner.setProperty(PutElasticsearch.INDEX, "doc"); + runner.assertNotValid(); + runner.setProperty(PutElasticsearch.TYPE, "status"); + runner.setProperty(PutElasticsearch.BATCH_SIZE, "1"); + runner.assertNotValid(); + runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id"); + runner.assertValid(); + runner.setVariable("cluster.name", "elasticsearch"); + runner.setVariable("hosts", "127.0.0.1:9300"); + runner.setVariable("ping.timeout", "5s"); + runner.setVariable("sampler.interval", "5s"); + + + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0); + assertNotNull(out); + out.assertAttributeEquals("doc_id", "28039652140"); + } + @Test public void testPutElasticSearchOnTriggerWithFailures() throws IOException { runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(true)); // simulate failures diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java index 9ce578f61538..fae63eeb62a8 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java @@ -126,6 +126,33 @@ public void testPutElasticSearchOnTriggerDelete() throws IOException { out.assertAttributeEquals("doc_id", "28039652140"); } + @Test + public void testPutElasticSearchOnTriggerEL() throws IOException { + runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "${es.url}"); + + runner.setProperty(PutElasticsearchHttp.INDEX, "doc"); + runner.setProperty(PutElasticsearchHttp.TYPE, "status"); + runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1"); + runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id"); + runner.setProperty(AbstractElasticsearchHttpProcessor.CONNECT_TIMEOUT, "${connect.timeout}"); + runner.assertValid(); + + runner.setVariable("es.url", "http://127.0.0.1:9200"); + runner.setVariable("connect.timeout", "5s"); + + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_SUCCESS).get(0); + assertNotNull(out); + out.assertAttributeEquals("doc_id", "28039652140"); + } + @Test public void testPutElasticSearchOnTriggerBadIndexOp() throws IOException { runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java index b9ec1f98af05..ccd74faed352 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java @@ -76,6 +76,29 @@ public void testQueryElasticsearchOnTrigger_withInput() throws IOException { runAndVerifySuccess(true); } + @Test + public void testQueryElasticsearchOnTrigger_withInput_EL() throws IOException { + runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor()); + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "${es.url}"); + + runner.setProperty(QueryElasticsearchHttp.INDEX, "doc"); + runner.assertNotValid(); + runner.setProperty(QueryElasticsearchHttp.TYPE, "status"); + runner.assertNotValid(); + runner.setProperty(QueryElasticsearchHttp.QUERY, + "source:Twitter AND identifier:\"${identifier}\""); + runner.assertValid(); + runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2"); + runner.assertValid(); + runner.setProperty(AbstractElasticsearchHttpProcessor.CONNECT_TIMEOUT, "${connect.timeout}"); + runner.assertValid(); + + runner.setVariable("es.url", "http://127.0.0.1:9200"); + + runAndVerifySuccess(true); + } + @Test public void testQueryElasticsearchOnTrigger_withInput_attributeTarget() throws IOException { runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor()); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java index 261626925703..a1a4e8df2e24 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java @@ -78,6 +78,30 @@ public void testScrollElasticsearchOnTrigger_withNoInput() throws IOException { runAndVerifySuccess(); } + @Test + public void testScrollElasticsearchOnTrigger_withNoInput_EL() throws IOException { + runner = TestRunners.newTestRunner(new ScrollElasticsearchHttpTestProcessor()); + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "${es.url}"); + + runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc"); + runner.assertNotValid(); + runner.setProperty(ScrollElasticsearchHttp.TYPE, "status"); + runner.assertNotValid(); + runner.setProperty(ScrollElasticsearchHttp.QUERY, + "source:WZ AND identifier:\"${identifier}\""); + runner.assertValid(); + runner.setProperty(ScrollElasticsearchHttp.PAGE_SIZE, "2"); + runner.assertValid(); + runner.setProperty(AbstractElasticsearchHttpProcessor.CONNECT_TIMEOUT, "${connect.timeout}"); + runner.assertValid(); + + runner.setVariable("es.url", "http://127.0.0.1:9200"); + + runner.setIncomingConnection(false); + runAndVerifySuccess(); + } + private void runAndVerifySuccess() { runner.enqueue("".getBytes(), new HashMap() { { From 19c01866164c2cf69685c7a2d630c11106a13953 Mon Sep 17 00:00:00 2001 From: Matt Burgess Date: Thu, 1 Dec 2016 15:55:03 -0500 Subject: [PATCH 2/2] NIFI-3095: Incorporated review comments --- .../elasticsearch/AbstractElasticsearchProcessor.java | 5 +++-- .../AbstractElasticsearchTransportClientProcessor.java | 2 +- .../processors/elasticsearch/FetchElasticsearchHttp.java | 9 +++++++-- .../processors/elasticsearch/PutElasticsearchHttp.java | 6 ++++-- .../processors/elasticsearch/QueryElasticsearchHttp.java | 3 ++- .../elasticsearch/ScrollElasticsearchHttp.java | 4 +++- .../elasticsearch/TestFetchElasticsearchHttp.java | 4 ++-- 7 files changed, 22 insertions(+), 11 deletions(-) diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java index a88914e4cd64..c5e4cc3d792c 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java @@ -36,11 +36,11 @@ */ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor { - protected static final Validator NON_EMPTY_EL_VALIDATOR = (subject, value, context) -> { + static final Validator NON_EMPTY_EL_VALIDATOR = (subject, value, context) -> { if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) { return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build(); } - return new ValidationResult.Builder().subject(subject).input(value).valid(value != null && !value.isEmpty()).explanation(subject + " cannot be empty").build(); + return StandardValidators.NON_EMPTY_VALIDATOR.validate(subject, value, context); }; public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() @@ -74,6 +74,7 @@ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor { .required(false) .sensitive(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) .build(); protected abstract void createElasticsearchClient(ProcessContext context) throws ProcessException; diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchTransportClientProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchTransportClientProcessor.java index cc0f497d3f34..a16a0dd6a5f2 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchTransportClientProcessor.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchTransportClientProcessor.java @@ -142,7 +142,7 @@ protected void createElasticsearchClient(ProcessContext context) throws ProcessE final String pingTimeout = context.getProperty(PING_TIMEOUT).evaluateAttributeExpressions().getValue(); final String samplerInterval = context.getProperty(SAMPLER_INTERVAL).evaluateAttributeExpressions().getValue(); final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue(); - final String password = context.getProperty(PASSWORD).getValue(); + final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue(); final SSLContextService sslService = context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java index b3cd6009acdd..8fd30dcc9eb6 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java @@ -202,10 +202,11 @@ public void onTrigger(final ProcessContext context, final ProcessSession session // Authentication final String username = context.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue(); - final String password = context.getProperty(PASSWORD).getValue(); + final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue(); final ComponentLog logger = getLogger(); + Response getResponse = null; try { logger.debug("Fetching {}/{}/{} from Elasticsearch", new Object[]{index, docType, docId}); @@ -215,7 +216,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session final URL url = buildRequestURL(urlstr, docId, index, docType, fields); final long startNanos = System.nanoTime(); - final Response getResponse = sendRequestToElasticsearch(okHttpClient, url, username, password, "GET", null); + getResponse = sendRequestToElasticsearch(okHttpClient, url, username, password, "GET", null); final int statusCode = getResponse.code(); if (isSuccess(statusCode)) { @@ -297,6 +298,10 @@ public void onTrigger(final ProcessContext context, final ProcessSession session session.remove(flowFile); } context.yield(); + } finally { + if (getResponse != null) { + getResponse.close(); + } } } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java index 9c4a71e795b8..2b39a86b9594 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java @@ -209,10 +209,10 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } final String id_attribute = context.getProperty(ID_ATTRIBUTE).getValue(); - final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions().getValue()); + // Authentication final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue(); - final String password = context.getProperty(PASSWORD).getValue(); + final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue(); OkHttpClient okHttpClient = getClient(); @@ -234,6 +234,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session for (FlowFile file : flowFiles) { final String index = context.getProperty(INDEX).evaluateAttributeExpressions(file).getValue(); + final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(file).getValue()); if (StringUtils.isEmpty(index)) { logger.error("No value for index in for {}, transferring to failure", new Object[]{id_attribute, file}); flowFilesToTransfer.remove(file); @@ -377,6 +378,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to failure", new Object[]{statusCode, getResponse.message()}); session.transfer(flowFilesToTransfer, REL_FAILURE); } + getResponse.close(); } } } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java index 74ea6ffba445..f65816e12514 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java @@ -257,7 +257,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session // Authentication final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue(); - final String password = context.getProperty(PASSWORD).getValue(); + final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue(); final ComponentLog logger = getLogger(); @@ -288,6 +288,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session numResults = this.getPage(getResponse, queryUrl, context, session, flowFile, logger, startNanos, targetIsContent); fromIndex += pageSize; + getResponse.close(); } while (numResults > 0 && !hitLimit); if (flowFile != null) { diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java index 10237e3fff62..0442bf7bf13e 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java @@ -237,7 +237,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session // Authentication final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue(); - final String password = context.getProperty(PASSWORD).getValue(); + final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue(); final ComponentLog logger = getLogger(); @@ -255,6 +255,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session final Response getResponse = sendRequestToElasticsearch(okHttpClient, scrollurl, username, password, "GET", null); this.getPage(getResponse, scrollurl, context, session, flowFile, logger, startNanos); + getResponse.close(); } else { logger.debug("Querying {}/{} from Elasticsearch: {}", new Object[] { index, docType, query }); @@ -267,6 +268,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl, username, password, "GET", null); this.getPage(getResponse, queryUrl, context, session, flowFile, logger, startNanos); + getResponse.close(); } } catch (IOException ioe) { diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java index f7f9907fb2d5..28bc06091d49 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java @@ -62,7 +62,7 @@ public void teardown() { } @Test - public void testFetchElasticsearchOnTrigger() throws IOException { + public void testFetchElasticsearchOnTriggerEL() throws IOException { runner = TestRunners.newTestRunner(new FetchElasticsearchHttpTestProcessor(true)); // all docs are found runner.setValidateExpressionUsage(true); runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "${es.url}"); @@ -91,7 +91,7 @@ public void testFetchElasticsearchOnTrigger() throws IOException { } @Test - public void testFetchElasticsearchOnTriggerEL() throws IOException { + public void testFetchElasticsearchOnTrigger() throws IOException { runner = TestRunners.newTestRunner(new FetchElasticsearchHttpTestProcessor(true)); // all docs are found runner.setValidateExpressionUsage(true); runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");