Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,25 @@
*/
package org.apache.nifi.processors.elasticsearch;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import okhttp3.Authenticator;
import okhttp3.Credentials;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.Route;
import java.io.IOException;
import java.io.InputStream;
import java.net.Proxy;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import javax.net.ssl.SSLContext;

import org.apache.commons.text.StringEscapeUtils;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
Expand All @@ -39,35 +45,41 @@
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.StringUtils;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import okhttp3.Authenticator;
import okhttp3.Credentials;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.Route;

import org.apache.nifi.util.Tuple;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.io.IOException;
import java.io.InputStream;
import java.net.Proxy;
import java.net.URL;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

/**
* A base class for Elasticsearch processors that use the HTTP API
*/
public abstract class AbstractElasticsearchHttpProcessor extends AbstractElasticsearchProcessor {

/**
 * Major Elasticsearch version families distinguished by the HTTP processors.
 * The constant names are used verbatim as the allowable values of the
 * Elasticsearch version property, so they must not be renamed.
 */
enum ElasticsearchVersion {
    /** Elasticsearch version 7.x. */
    ES_7,
    /** Any version of Elasticsearch less than 7.0. */
    ES_LESS_THAN_7
}

static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
static final String FIELD_INCLUDE_QUERY_PARAM_ES7 = "_source_includes";
static final String QUERY_QUERY_PARAM = "q";
static final String SORT_QUERY_PARAM = "sort";
static final String SIZE_QUERY_PARAM = "size";
Expand Down Expand Up @@ -138,6 +150,18 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();

/**
 * Required property selecting the major Elasticsearch version being targeted.
 * The allowable values are the names of the {@link ElasticsearchVersion}
 * constants, and the default is {@code ES_LESS_THAN_7}. Per its description,
 * the choice affects some HTTP query parameters and how responses are parsed.
 */
public static final PropertyDescriptor ES_VERSION = new PropertyDescriptor.Builder()
.name("elasticsearch-http-version")
.displayName("Elasticsearch Version")
.description("The major version of elasticsearch (this affects some HTTP query parameters and the way responses are parsed).")
.required(true)
.allowableValues(
new AllowableValue(ElasticsearchVersion.ES_LESS_THAN_7.name(), "< 7.0", "Any version of Elasticsearch less than 7.0"),
new AllowableValue(ElasticsearchVersion.ES_7.name(), "7.x", "Elasticsearch version 7.x"))
.defaultValue(ElasticsearchVersion.ES_LESS_THAN_7.name())
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand that allowable values and Expression Language support don't go well together, but users would expect this property to support Expression Language so that they can use different ES versions in different environments, especially when they want to upgrade ES while a NiFi workflow has lots of processors.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd probably suggest not having this property at all.

Instead, change the processors so that they can support users calling different versions of ES and let ES deal with its own specific logic.

For example, call ES 7 with a type (other than _doc) - it's actually still allowed but is deprecated and is warned about on the ES side (but would recommend not trying to relocate that logic in nifi as it'll be different again in later versions). This also means the change becomes a lot simpler - make _type optional and omit it from relevant ES calls. Maybe add/update documentation where appropriate (e.g. GET needs _doc or _source)


private final AtomicReference<OkHttpClient> okHttpClientAtomicReference = new AtomicReference<>();

@Override
Expand All @@ -159,6 +183,7 @@ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String proper
static {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(ES_URL);
properties.add(ES_VERSION);
properties.add(PROP_SSL_CONTEXT_SERVICE);
properties.add(CHARSET);
properties.add(USERNAME);
Expand Down Expand Up @@ -316,9 +341,12 @@ protected void buildBulkCommand(StringBuilder sb, String index, String docType,
if (indexOp.equalsIgnoreCase("index")) {
sb.append("{\"index\": { \"_index\": \"");
sb.append(StringEscapeUtils.escapeJson(index));
sb.append("\", \"_type\": \"");
sb.append(StringEscapeUtils.escapeJson(docType));
sb.append("\"");
if (!(StringUtils.isEmpty(docType) | docType == null)){
Copy link
Contributor

@ChrisSamo632 ChrisSamo632 Nov 15, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should probably use isBlank rather than isEmpty here, and there is no need for the additional null check (also use ||, not |)

Same comment for similar changes before

sb.append(", \"_type\": \"");
sb.append(StringEscapeUtils.escapeJson(docType));
sb.append("\"");
}
if (!StringUtils.isEmpty(id)) {
sb.append(", \"_id\": \"");
sb.append(StringEscapeUtils.escapeJson(id));
Expand All @@ -330,11 +358,15 @@ protected void buildBulkCommand(StringBuilder sb, String index, String docType,
} else if (indexOp.equalsIgnoreCase("upsert") || indexOp.equalsIgnoreCase("update")) {
sb.append("{\"update\": { \"_index\": \"");
sb.append(StringEscapeUtils.escapeJson(index));
sb.append("\", \"_type\": \"");
sb.append(StringEscapeUtils.escapeJson(docType));
sb.append("\", \"_id\": \"");
sb.append("\"");
if (!(StringUtils.isEmpty(docType) | docType == null)){
sb.append(", \"_type\": \"");
sb.append(StringEscapeUtils.escapeJson(docType));
sb.append("\"");
}
sb.append(", \"_id\": \"");
sb.append(StringEscapeUtils.escapeJson(id));
sb.append("\" }\n");
sb.append("\" } }\n");
sb.append("{\"doc\": ");
sb.append(jsonString);
sb.append(", \"doc_as_upsert\": ");
Expand All @@ -343,11 +375,48 @@ protected void buildBulkCommand(StringBuilder sb, String index, String docType,
} else if (indexOp.equalsIgnoreCase("delete")) {
sb.append("{\"delete\": { \"_index\": \"");
sb.append(StringEscapeUtils.escapeJson(index));
sb.append("\", \"_type\": \"");
sb.append(StringEscapeUtils.escapeJson(docType));
sb.append("\", \"_id\": \"");
sb.append("\"");
if (!(StringUtils.isEmpty(docType) | docType == null)){
sb.append(", \"_type\": \"");
sb.append(StringEscapeUtils.escapeJson(docType));
sb.append("\"");
}
sb.append(", \"_id\": \"");
sb.append(StringEscapeUtils.escapeJson(id));
sb.append("\" }\n");
sb.append("\" }}\n");
}
}

protected String getFieldIncludeParameter(ElasticsearchVersion esVersion) {
return esVersion.equals(ElasticsearchVersion.ES_LESS_THAN_7)
? FIELD_INCLUDE_QUERY_PARAM : FIELD_INCLUDE_QUERY_PARAM_ES7;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change was actually added in ES 6, not 7

Suggest simply using _source parameter instead as that's the same for all versions. If users want to use the _source_include(s) (or excludes) parameters instead, they can use dynamic properties on the processors

}

static class ElasticsearchTypeValidator implements Validator {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not needed if the ES version parameter goes away

/** Whether a non-blank type is mandatory when the configured version is older than 7.0. */
private final boolean pre7TypeRequired;

/**
 * Creates a validator for an ES type.
 *
 * @param pre7TypeRequired If true, 'type' will be required for ES
 *        before version 7.0; if false, a blank type is accepted there as well.
 */
public ElasticsearchTypeValidator(boolean pre7TypeRequired) {
    this.pre7TypeRequired = pre7TypeRequired;
}

@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
ElasticsearchVersion esVersion = ElasticsearchVersion.valueOf(context
.getProperty(ES_VERSION).getValue());
if (esVersion == ElasticsearchVersion.ES_7) {
return new ValidationResult.Builder().valid(org.apache.commons.lang3.StringUtils.isBlank(input) || "_doc".equals(input))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_source is also valid here for ES 7 in a GET document request (i.e. the Fetch processor)

.explanation("Elasticsearch no longer supports 'type' as of version 7.0. Please use '_doc' or leave blank.")
.build();
} else {
return new ValidationResult.Builder().valid(!pre7TypeRequired || org.apache.commons.lang3.StringUtils.isNotBlank(input))
.explanation("Elasticsearch prior to version 7.0 requires a 'type' to be set.")
.build();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;

import java.io.ByteArrayInputStream;
import java.io.IOException;
Expand Down Expand Up @@ -120,11 +121,11 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
.name("fetch-es-type")
.displayName("Type")
.description("The (optional) type of this document, used by Elasticsearch for indexing and searching. If the property is empty, "
+ "the first document matching the identifier across all types will be retrieved.")
.required(false)
.description("The type of this document (if empty, the first document matching the identifier across all types will be retrieved). "
+ "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
.required(true)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to make it mandatory? If so, would you elaborate on why? I guess ElasticsearchTypeValidator should be enough. Probably you want it to align with the other processors. If so, I'd suggest making it optional across this whole processor family, because ElasticsearchTypeValidator can handle it properly and users will not have to set an empty string.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree this should be optional everywhere, with a sensible (cross-ES version) operation/value provided where this is unset, e.g. omit _type from bulk operations, no effect for queries, use _all for GET (Fetch) for backward compatibility (users using ES 7+ will need to set this to an appropriate value, e.g. _doc)

.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.addValidator(new ElasticsearchTypeValidator(false))
.build();

public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
Expand All @@ -149,6 +150,7 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
relationships = Collections.unmodifiableSet(_rels);

final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
descriptors.add(ES_VERSION);
descriptors.add(DOC_ID);
descriptors.add(INDEX);
descriptors.add(TYPE);
Expand Down Expand Up @@ -199,6 +201,8 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
final String fields = context.getProperty(FIELDS).isSet()
? context.getProperty(FIELDS).evaluateAttributeExpressions(flowFile).getValue()
: null;
final ElasticsearchVersion esVersion = ElasticsearchVersion.valueOf(context.getProperty(ES_VERSION)
.getValue());

// Authentication
final String username = context.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue();
Expand All @@ -214,7 +218,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session

// read the url property from the context
final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue());
final URL url = buildRequestURL(urlstr, docId, index, docType, fields, context);
final URL url = buildRequestURL(urlstr, docId, index, docType, fields, context, esVersion);
final long startNanos = System.nanoTime();

getResponse = sendRequestToElasticsearch(okHttpClient, url, username, password, "GET", null);
Expand Down Expand Up @@ -306,7 +310,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
}
}

private URL buildRequestURL(String baseUrl, String docId, String index, String type, String fields, ProcessContext context) throws MalformedURLException {
private URL buildRequestURL(String baseUrl, String docId, String index, String type, String fields, ProcessContext context, ElasticsearchVersion esVersion) throws MalformedURLException {
if (StringUtils.isEmpty(baseUrl)) {
throw new MalformedURLException("Base URL cannot be null");
}
Expand All @@ -316,7 +320,8 @@ private URL buildRequestURL(String baseUrl, String docId, String index, String t
builder.addPathSegment(docId);
if (!StringUtils.isEmpty(fields)) {
String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(","));
builder.addQueryParameter(FIELD_INCLUDE_QUERY_PARAM, trimmedFields);
final String fieldIncludeParameter = getFieldIncludeParameter(esVersion);
builder.addQueryParameter(fieldIncludeParameter, trimmedFields);
}

// Find the user-added properties and set them as query parameters on the URL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,11 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
.name("put-es-type")
.displayName("Type")
.description("The type of this document (used by Elasticsearch for indexing and searching)")
.description("The type of this document (required by Elasticsearch versions < 7.0 for indexing and searching). "
+ "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.addValidator(new ElasticsearchTypeValidator(true))
.build();

public static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
Expand Down Expand Up @@ -152,6 +153,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
relationships = Collections.unmodifiableSet(_rels);

final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
descriptors.add(ES_VERSION);
descriptors.add(ID_ATTRIBUTE);
descriptors.add(INDEX);
descriptors.add(TYPE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,11 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
.name("put-es-record-type")
.displayName("Type")
.description("The type of this document (used by Elasticsearch for indexing and searching)")
.description("The type of this document (required by Elasticsearch versions < 7.0 for indexing and searching). "
+ "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.addValidator(new ElasticsearchTypeValidator(true))
.build();

static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
Expand Down Expand Up @@ -260,6 +261,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
relationships = Collections.unmodifiableSet(_rels);

final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
descriptors.add(ES_VERSION);
descriptors.add(RECORD_READER);
descriptors.add(RECORD_WRITER);
descriptors.add(LOG_ALL_ERRORS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,11 @@ public enum QueryInfoRouteStrategy {
public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
.name("query-es-type")
.displayName("Type")
.description(
"The (optional) type of this query, used by Elasticsearch for indexing and searching. If the property is empty, "
+ "the the query will match across all types.")
.required(false)
.description("The type of this document (if empty, searches across all types). "
+ "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.addValidator(new ElasticsearchTypeValidator(false))
.build();

public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
Expand Down Expand Up @@ -236,6 +235,7 @@ public enum QueryInfoRouteStrategy {
static {
final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
descriptors.add(QUERY);
descriptors.add(ES_VERSION);
descriptors.add(PAGE_SIZE);
descriptors.add(INDEX);
descriptors.add(TYPE);
Expand Down Expand Up @@ -316,7 +316,8 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
.evaluateAttributeExpressions(flowFile).getValue() : null;
final boolean targetIsContent = context.getProperty(TARGET).getValue()
.equals(TARGET_FLOW_FILE_CONTENT);

final ElasticsearchVersion esVersion = ElasticsearchVersion.valueOf(context.getProperty(ES_VERSION)
.getValue());
// Authentication
final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
Expand Down Expand Up @@ -344,7 +345,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
}

final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort,
mPageSize, fromIndex, context);
mPageSize, fromIndex, context, esVersion);

final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl,
username, password, "GET", null);
Expand Down Expand Up @@ -505,7 +506,7 @@ private int getPage(final Response getResponse, final URL url, final ProcessCont
}

private URL buildRequestURL(String baseUrl, String query, String index, String type, String fields,
String sort, int pageSize, int fromIndex, ProcessContext context) throws MalformedURLException {
String sort, int pageSize, int fromIndex, ProcessContext context, ElasticsearchVersion esVersion) throws MalformedURLException {
if (StringUtils.isEmpty(baseUrl)) {
throw new MalformedURLException("Base URL cannot be null");
}
Expand All @@ -520,7 +521,8 @@ private URL buildRequestURL(String baseUrl, String query, String index, String t
builder.addQueryParameter(FROM_QUERY_PARAM, String.valueOf(fromIndex));
if (!StringUtils.isEmpty(fields)) {
String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(","));
builder.addQueryParameter(FIELD_INCLUDE_QUERY_PARAM, trimmedFields);
final String fieldIncludeParameter = getFieldIncludeParameter(esVersion);
builder.addQueryParameter(fieldIncludeParameter, trimmedFields);
}
if (!StringUtils.isEmpty(sort)) {
String trimmedFields = Stream.of(sort.split(",")).map(String::trim).collect(Collectors.joining(","));
Expand Down
Loading