Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-3139 Added host:port list and non empty EL validators in Standar… #1367

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -22,9 +22,11 @@
import java.net.URL;
import java.nio.charset.Charset;
import java.nio.charset.UnsupportedCharsetException;
import java.time.Instant;
import java.text.NumberFormat;
import java.text.ParseException;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -158,6 +160,55 @@ public ValidationResult validate(final String subject, final String value, final
}
};

/**
* {@link Validator} that ensures that value's length > 0 and that expression language is present
*/
public static final Validator NON_EMPTY_EL_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
}
return StandardValidators.NON_EMPTY_VALIDATOR.validate(subject, input, context);
}
};

/**
* {@link Validator} that ensures that value is a non-empty comma separated list of hostname:port
*/
public static final Validator HOSTNAME_PORT_LIST_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
// expression language
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
}
// not empty
ValidationResult nonEmptyValidatorResult = StandardValidators.NON_EMPTY_VALIDATOR.validate(subject, input, context);
if (!nonEmptyValidatorResult.isValid()) {
return nonEmptyValidatorResult;
}
// check format
final List<String> hostnamePortList = Arrays.asList(input.split(","));
for (String hostnamePort : hostnamePortList) {
String[] addresses = hostnamePort.split(":");
// Protect against invalid input like http://127.0.0.1:9300 (URL scheme should not be there)
if (addresses.length != 2) {
return new ValidationResult.Builder().subject(subject).input(input).explanation(
"Must be in hostname:port form (no scheme such as http://").valid(false).build();
}

// Validate the port
String port = addresses[1].trim();
ValidationResult portValidatorResult = StandardValidators.PORT_VALIDATOR.validate(subject, port, context);
if (!portValidatorResult.isValid()) {
return portValidatorResult;
}
}
return new ValidationResult.Builder().subject(subject).input(input).explanation("Valid cluster definition").valid(true).build();
}
};

/**
* {@link Validator} that ensures that value has 1+ non-whitespace
* characters
Expand Down
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.nifi.util.validator;

import org.apache.nifi.processor.util.StandardValidators;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
Expand All @@ -26,6 +25,7 @@
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.processor.util.StandardValidators;
import org.junit.Test;
import org.mockito.Mockito;

Expand All @@ -45,6 +45,61 @@ public void testNonBlankValidator() {
assertTrue(vr.isValid());
}

@Test
public void testNonEmptyELValidator() {
Validator val = StandardValidators.NON_EMPTY_EL_VALIDATOR;
ValidationContext vc = mock(ValidationContext.class);
Mockito.when(vc.isExpressionLanguageSupported("foo")).thenReturn(true);

ValidationResult vr = val.validate("foo", "", vc);
assertFalse(vr.isValid());

vr = val.validate("foo", " h", vc);
assertTrue(vr.isValid());

Mockito.when(vc.isExpressionLanguagePresent("${test}")).thenReturn(true);
vr = val.validate("foo", "${test}", vc);
assertTrue(vr.isValid());

vr = val.validate("foo", "${test", vc);
assertTrue(vr.isValid());
}

@Test
public void testHostnamePortListValidator() {
Validator val = StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR;
ValidationContext vc = mock(ValidationContext.class);
Mockito.when(vc.isExpressionLanguageSupported("foo")).thenReturn(true);

ValidationResult vr = val.validate("foo", "", vc);
assertFalse(vr.isValid());

vr = val.validate("foo", "localhost", vc);
assertFalse(vr.isValid());

vr = val.validate("foo", "test:0", vc);
assertFalse(vr.isValid());

vr = val.validate("foo", "test:65536", vc);
assertFalse(vr.isValid());

vr = val.validate("foo", "test:6666,localhost", vc);
assertFalse(vr.isValid());

vr = val.validate("foo", "test:65535", vc);
assertTrue(vr.isValid());

vr = val.validate("foo", "test:65535,localhost:666,127.0.0.1:8989", vc);
assertTrue(vr.isValid());

Mockito.when(vc.isExpressionLanguagePresent("${test}")).thenReturn(true);
vr = val.validate("foo", "${test}", vc);
assertTrue(vr.isValid());

vr = val.validate("foo", "${test", vc);
assertFalse(vr.isValid());
}

@Test
public void testTimePeriodValidator() {
Validator val = StandardValidators.createTimePeriodValidator(1L, TimeUnit.SECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
Expand Down
Expand Up @@ -33,7 +33,6 @@
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
Expand All @@ -60,31 +59,6 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {

public static final int DEFAULT_CASSANDRA_PORT = 9042;

private static final Validator HOSTNAME_PORT_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
final List<String> esList = Arrays.asList(input.split(","));
for (String hostnamePort : esList) {
String[] addresses = hostnamePort.split(":");
// Protect against invalid input like http://127.0.0.1:9042 (URL scheme should not be there)
if (addresses.length != 2) {
return new ValidationResult.Builder().subject(subject).input(input).explanation(
"Each entry must be in hostname:port form (no scheme such as http://, and port must be specified)")
.valid(false).build();
}
// Validate the port
String port = addresses[1].trim();
ValidationResult portValidatorResult = StandardValidators.PORT_VALIDATOR.validate(subject, port, context);
if (!portValidatorResult.isValid()) {
return portValidatorResult;
}

}
return new ValidationResult.Builder().subject(subject).input(input).explanation(
"Valid cluster definition").valid(true).build();
}
};

// Common descriptors
public static final PropertyDescriptor CONTACT_POINTS = new PropertyDescriptor.Builder()
.name("Cassandra Contact Points")
Expand All @@ -93,7 +67,7 @@ public ValidationResult validate(final String subject, final String input, final
+ " The default client port for Cassandra is 9042, but the port(s) must be explicitly specified.")
.required(true)
.expressionLanguageSupported(false)
.addValidator(HOSTNAME_PORT_VALIDATOR)
.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
.build();

public static final PropertyDescriptor KEYSPACE = new PropertyDescriptor.Builder()
Expand Down
Expand Up @@ -17,8 +17,6 @@
package org.apache.nifi.processors.elasticsearch;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
Expand Down Expand Up @@ -46,26 +44,6 @@

abstract class AbstractElasticsearch5TransportClientProcessor extends AbstractElasticsearch5Processor {

/**
* This validator ensures the Elasticsearch hosts property is a valid list of hostname:port entries
*/
private static final Validator HOSTNAME_PORT_VALIDATOR = (subject, input, context) -> {
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
}
final List<String> esList = Arrays.asList(input.split(","));
for (String hostnamePort : esList) {
String[] addresses = hostnamePort.split(":");
// Protect against invalid input like http://127.0.0.1:9300 (URL scheme should not be there)
if (addresses.length != 2) {
return new ValidationResult.Builder().subject(subject).input(input).explanation(
"Must be in hostname:port form (no scheme such as http://").valid(false).build();
}
}
return new ValidationResult.Builder().subject(subject).input(input).explanation(
"Valid cluster definition").valid(true).build();
};

protected static final PropertyDescriptor CLUSTER_NAME = new PropertyDescriptor.Builder()
.name("el5-cluster-name")
.displayName("Cluster Name")
Expand All @@ -84,7 +62,7 @@ abstract class AbstractElasticsearch5TransportClientProcessor extends AbstractEl
+ "connect to hosts. The default transport client port is 9300.")
.required(true)
.expressionLanguageSupported(true)
.addValidator(HOSTNAME_PORT_VALIDATOR)
.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
.build();

public static final PropertyDescriptor PROP_XPACK_LOCATION = new PropertyDescriptor.Builder()
Expand Down
Expand Up @@ -19,7 +19,6 @@
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
Expand All @@ -36,13 +35,6 @@
*/
public abstract class AbstractElasticsearchProcessor extends AbstractProcessor {

static final Validator NON_EMPTY_EL_VALIDATOR = (subject, value, context) -> {
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
}
return StandardValidators.NON_EMPTY_VALIDATOR.validate(subject, value, context);
};

public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("The SSL Context Service used to provide client certificate information for TLS/SSL "
Expand Down
Expand Up @@ -17,8 +17,6 @@
package org.apache.nifi.processors.elasticsearch;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
Expand All @@ -43,28 +41,8 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;


public abstract class AbstractElasticsearchTransportClientProcessor extends AbstractElasticsearchProcessor {

/**
* This validator ensures the Elasticsearch hosts property is a valid list of hostname:port entries
*/
private static final Validator HOSTNAME_PORT_VALIDATOR = (subject, input, context) -> {
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
}
final List<String> esList = Arrays.asList(input.split(","));
for (String hostnamePort : esList) {
String[] addresses = hostnamePort.split(":");
// Protect against invalid input like http://127.0.0.1:9300 (URL scheme should not be there)
if (addresses.length != 2) {
return new ValidationResult.Builder().subject(subject).input(input).explanation(
"Must be in hostname:port form (no scheme such as http://").valid(false).build();
}
}
return new ValidationResult.Builder().subject(subject).input(input).explanation("Valid cluster definition").valid(true).build();
};

protected static final PropertyDescriptor CLUSTER_NAME = new PropertyDescriptor.Builder()
.name("Cluster Name")
.description("Name of the ES cluster (for example, elasticsearch_brew). Defaults to 'elasticsearch'")
Expand All @@ -81,7 +59,7 @@ public abstract class AbstractElasticsearchTransportClientProcessor extends Abst
+ "connect to hosts. The default transport client port is 9300.")
.required(true)
.expressionLanguageSupported(false)
.addValidator(HOSTNAME_PORT_VALIDATOR)
.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
.expressionLanguageSupported(true)
.build();

Expand Down
Expand Up @@ -96,15 +96,15 @@ public class PutElasticsearch extends AbstractElasticsearchTransportClientProces
.description("The type of this document (used by Elasticsearch for indexing and searching)")
.required(true)
.expressionLanguageSupported(true)
.addValidator(NON_EMPTY_EL_VALIDATOR)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build();

public static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
.name("Index Operation")
.description("The type of the operation used to index (index, update, upsert)")
.required(true)
.expressionLanguageSupported(true)
.addValidator(NON_EMPTY_EL_VALIDATOR)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.defaultValue("index")
.build();

Expand Down
Expand Up @@ -104,7 +104,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
.description("The type of this document (used by Elasticsearch for indexing and searching)")
.required(true)
.expressionLanguageSupported(true)
.addValidator(NON_EMPTY_EL_VALIDATOR)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build();

public static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
Expand All @@ -113,7 +113,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
.description("The type of the operation used to index (index, update, upsert, delete)")
.required(true)
.expressionLanguageSupported(true)
.addValidator(NON_EMPTY_EL_VALIDATOR)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.defaultValue("index")
.build();

Expand Down
Expand Up @@ -52,10 +52,6 @@ final class KafkaProcessorUtils {

final Logger logger = LoggerFactory.getLogger(this.getClass());

private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}";

private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";

static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
"The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters");
Expand All @@ -77,8 +73,7 @@ final class KafkaProcessorUtils {
.displayName("Kafka Brokers")
.description("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX)))
.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
.expressionLanguageSupported(true)
.defaultValue("localhost:9092")
.build();
Expand Down
Expand Up @@ -31,7 +31,6 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;

import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
Expand Down Expand Up @@ -63,10 +62,6 @@
+ " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.")
public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {

private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}";

private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";

public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery",
"FlowFile will be routed to"
+ " failure unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration");
Expand Down Expand Up @@ -110,7 +105,7 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
.name("Known Brokers")
.description("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
.required(true)
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX)))
.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
.expressionLanguageSupported(false)
.build();
public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
Expand Down