From e8f26aa1d3ef2eac353a09bf5b0e49c3a698f711 Mon Sep 17 00:00:00 2001 From: Tim Reardon Date: Tue, 9 May 2017 12:59:35 -0400 Subject: [PATCH 1/2] NIFI-3854 Expand expression language support for Kafka processors --- .../kafka/pubsub/ConsumeKafkaRecord_0_10.java | 2 +- .../kafka/pubsub/ConsumeKafka_0_10.java | 2 +- .../nifi/processors/kafka/GetKafka.java | 22 +++++++++---------- .../nifi/processors/kafka/PutKafka.java | 6 ++--- .../nifi/processors/kafka/PutKafkaTest.java | 6 +++++ .../nifi/processors/kafka/TestGetKafka.java | 3 +++ .../processors/kafka/pubsub/ConsumeKafka.java | 2 +- 7 files changed, 26 insertions(+), 17 deletions(-) diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java index 08828700b986..c44e25e88342 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java @@ -253,7 +253,7 @@ protected ConsumerPool createConsumerPool(final ProcessContext context, final Co final String topicType = context.getProperty(ConsumeKafkaRecord_0_10.TOPIC_TYPE).evaluateAttributeExpressions().getValue(); final List topics = new ArrayList<>(); final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue(); - final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).getValue(); + final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue(); final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java index f678fa3c8913..84e9da689a0a 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java @@ -248,7 +248,7 @@ protected ConsumerPool createConsumerPool(final ProcessContext context, final Co final List topics = new ArrayList<>(); final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue(); final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue(); - final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).getValue(); + final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue(); if (topicType.equals(TOPIC_NAME.getValue())) { for (final String topic : topicListing.split(",", 100)) { final String trimmedName = topic.trim(); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java index 84d6b8981d28..b1eadcfa35de 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java @@ -93,14 +93,14 @@ public class GetKafka extends AbstractProcessor { + " combinations. For example, host1:2181,host2:2181,host3:2188") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() .name("Topic Name") .description("The Kafka Topic to pull messages from") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor ZOOKEEPER_COMMIT_DELAY = new PropertyDescriptor.Builder() .name("Zookeeper Commit Frequency") @@ -160,7 +160,7 @@ public class GetKafka extends AbstractProcessor { .description("A Group ID is used to identify consumers that are within the same consumer group") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder() @@ -218,11 +218,11 @@ public Set getRelationships() { } public void createConsumers(final ProcessContext context) { - final String topic = context.getProperty(TOPIC).getValue(); + final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions().getValue(); final Properties props = new Properties(); - props.setProperty("zookeeper.connect", context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue()); - props.setProperty("group.id", context.getProperty(GROUP_ID).getValue()); + props.setProperty("zookeeper.connect", context.getProperty(ZOOKEEPER_CONNECTION_STRING).evaluateAttributeExpressions().getValue()); + props.setProperty("group.id", context.getProperty(GROUP_ID).evaluateAttributeExpressions().getValue()); props.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue()); props.setProperty("auto.commit.interval.ms", String.valueOf(context.getProperty(ZOOKEEPER_COMMIT_DELAY).asTimePeriod(TimeUnit.MILLISECONDS))); props.setProperty("auto.offset.reset", context.getProperty(AUTO_OFFSET_RESET).getValue()); @@ -257,7 +257,7 @@ public void createConsumers(final ProcessContext context) { } int partitionCount = KafkaUtils.retrievePartitionCountForTopic( - context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue(), context.getProperty(TOPIC).getValue()); + context.getProperty(ZOOKEEPER_CONNECTION_STRING).evaluateAttributeExpressions().getValue(), context.getProperty(TOPIC).evaluateAttributeExpressions().getValue()); final ConsumerConfig consumerConfig = new ConsumerConfig(props); consumer = Consumer.createJavaConsumerConnector(consumerConfig); @@ -267,12 +267,12 @@ public void createConsumers(final ProcessContext context) { int concurrentTaskToUse = context.getMaxConcurrentTasks(); if (context.getMaxConcurrentTasks() < partitionCount){ this.getLogger().warn("The amount of concurrent tasks '" + context.getMaxConcurrentTasks() + "' configured for " - + "this processor is less than the amount of partitions '" + partitionCount + "' for topic '" + context.getProperty(TOPIC).getValue() + "'. " + + "this processor is less than the amount of partitions '" + partitionCount + "' for topic '" + context.getProperty(TOPIC).evaluateAttributeExpressions().getValue() + "'. " + "Consider making it equal to the amount of partition count for most efficient event consumption."); } else if (context.getMaxConcurrentTasks() > partitionCount){ concurrentTaskToUse = partitionCount; this.getLogger().warn("The amount of concurrent tasks '" + context.getMaxConcurrentTasks() + "' configured for " - + "this processor is greater than the amount of partitions '" + partitionCount + "' for topic '" + context.getProperty(TOPIC).getValue() + "'. " + + "this processor is greater than the amount of partitions '" + partitionCount + "' for topic '" + context.getProperty(TOPIC).evaluateAttributeExpressions().getValue() + "'. " + "Therefore those tasks would never see a message. To avoid that the '" + partitionCount + "'(partition count) will be used to consume events"); } @@ -400,7 +400,7 @@ private void consumeFromKafka(final ProcessContext context, final ProcessSession final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); final String demarcator = context.getProperty(MESSAGE_DEMARCATOR).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); final byte[] demarcatorBytes = demarcator.getBytes(StandardCharsets.UTF_8); - final String topic = context.getProperty(TOPIC).getValue(); + final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions().getValue(); FlowFile flowFile = session.create(); @@ -481,4 +481,4 @@ private void releaseFlowFile(FlowFile flowFile, ProcessSession session, Map { .description("A comma-separated list of known Kafka Brokers in the format :") .required(true) .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() .name("Topic Name") @@ -288,7 +288,7 @@ protected boolean rendezvousWithKafka(ProcessContext context, ProcessSession ses flowFile = this.doRendezvousWithKafka(flowFile, context, session); if (!this.isFailedFlowFile(flowFile)) { session.getProvenanceReporter().send(flowFile, - context.getProperty(SEED_BROKERS).getValue() + "/" + context.getProperty(SEED_BROKERS).evaluateAttributeExpressions(flowFile).getValue() + "/" + context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue()); session.transfer(flowFile, REL_SUCCESS); } else { @@ -474,7 +474,7 @@ private Map buildFailedFlowFileAttributes(int lastAckedMessageIn private Properties buildKafkaConfigProperties(final ProcessContext context) { Properties properties = new Properties(); String timeout = String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue()); - properties.setProperty("bootstrap.servers", context.getProperty(SEED_BROKERS).getValue()); + properties.setProperty("bootstrap.servers", context.getProperty(SEED_BROKERS).evaluateAttributeExpressions().getValue()); properties.setProperty("acks", context.getProperty(DELIVERY_GUARANTEE).getValue()); properties.setProperty("buffer.memory", String.valueOf(context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue())); properties.setProperty("compression.type", context.getProperty(COMPRESSION_CODEC).getValue()); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/test/java/org/apache/nifi/processors/kafka/PutKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/test/java/org/apache/nifi/processors/kafka/PutKafkaTest.java index ce6e6e7e751b..5d55c54a3cb6 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/test/java/org/apache/nifi/processors/kafka/PutKafkaTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/test/java/org/apache/nifi/processors/kafka/PutKafkaTest.java @@ -69,6 +69,7 @@ public void validateSingleCharacterDemarcatedMessages() { String topicName = "validateSingleCharacterDemarcatedMessages"; PutKafka putKafka = new PutKafka(); TestRunner runner = TestRunners.newTestRunner(putKafka); + runner.setValidateExpressionUsage(false); runner.setProperty(PutKafka.TOPIC, topicName); runner.setProperty(PutKafka.CLIENT_NAME, "foo"); runner.setProperty(PutKafka.KEY, "key1"); @@ -96,6 +97,7 @@ public void validateMultiCharacterDelimitedMessages() { String topicName = "validateMultiCharacterDemarcatedMessagesAndCustomPartitioner"; PutKafka putKafka = new PutKafka(); TestRunner runner = TestRunners.newTestRunner(putKafka); + runner.setValidateExpressionUsage(false); runner.setProperty(PutKafka.TOPIC, topicName); runner.setProperty(PutKafka.CLIENT_NAME, "foo"); runner.setProperty(PutKafka.KEY, "key1"); @@ -123,6 +125,7 @@ public void validateDemarcationIntoEmptyMessages() { String topicName = "validateDemarcationIntoEmptyMessages"; PutKafka putKafka = new PutKafka(); final TestRunner runner = TestRunners.newTestRunner(putKafka); + runner.setValidateExpressionUsage(false); runner.setProperty(PutKafka.TOPIC, topicName); runner.setProperty(PutKafka.KEY, "key1"); runner.setProperty(PutKafka.CLIENT_NAME, "foo"); @@ -154,6 +157,7 @@ public void validateComplexRightPartialDemarcatedMessages() { String topicName = "validateComplexRightPartialDemarcatedMessages"; PutKafka putKafka = new PutKafka(); TestRunner runner = TestRunners.newTestRunner(putKafka); + runner.setValidateExpressionUsage(false); runner.setProperty(PutKafka.TOPIC, topicName); runner.setProperty(PutKafka.CLIENT_NAME, "foo"); runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort()); @@ -175,6 +179,7 @@ public void validateComplexLeftPartialDemarcatedMessages() { String topicName = "validateComplexLeftPartialDemarcatedMessages"; PutKafka putKafka = new PutKafka(); TestRunner runner = TestRunners.newTestRunner(putKafka); + runner.setValidateExpressionUsage(false); runner.setProperty(PutKafka.TOPIC, topicName); runner.setProperty(PutKafka.CLIENT_NAME, "foo"); runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort()); @@ -198,6 +203,7 @@ public void validateComplexPartialMatchDemarcatedMessages() { String topicName = "validateComplexPartialMatchDemarcatedMessages"; PutKafka putKafka = new PutKafka(); TestRunner runner = TestRunners.newTestRunner(putKafka); + runner.setValidateExpressionUsage(false); runner.setProperty(PutKafka.TOPIC, topicName); runner.setProperty(PutKafka.CLIENT_NAME, "foo"); runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort()); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java index dfcf0d9623cf..7ac08fae7b50 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java @@ -50,6 +50,7 @@ public static void configureLogging() { @Ignore("Intended only for local tests to verify functionality.") public void testIntegrationLocally() { final TestRunner runner = TestRunners.newTestRunner(GetKafka.class); + runner.setValidateExpressionUsage(false); runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "192.168.0.101:2181"); runner.setProperty(GetKafka.TOPIC, "testX"); runner.setProperty(GetKafka.KAFKA_TIMEOUT, "3 secs"); @@ -73,6 +74,7 @@ public void testWithDelimiter() { final TestableProcessor proc = new TestableProcessor(null, messages); final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setValidateExpressionUsage(false); runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "localhost:2181"); runner.setProperty(GetKafka.TOPIC, "testX"); runner.setProperty(GetKafka.KAFKA_TIMEOUT, "3 secs"); @@ -95,6 +97,7 @@ public void testWithDelimiterAndNotEnoughMessages() { final TestableProcessor proc = new TestableProcessor(null, messages); final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setValidateExpressionUsage(false); runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "localhost:2181"); runner.setProperty(GetKafka.TOPIC, "testX"); runner.setProperty(GetKafka.KAFKA_TIMEOUT, "3 secs"); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java index ff017461ad76..d02d0118105b 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java @@ -247,7 +247,7 @@ protected ConsumerPool createConsumerPool(final ProcessContext context, final Co } final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue(); final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue(); - final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).getValue(); + final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue(); return new ConsumerPool(maxLeases, demarcator, props, topics, maxUncommittedTime, keyEncoding, securityProtocol, bootstrapServers, log); } From 258c775e56e5f006815ac6fca777fbfd9b9920b9 Mon Sep 17 00:00:00 2001 From: Tim Reardon Date: Fri, 12 May 2017 13:07:02 -0400 Subject: [PATCH 2/2] NIFI-3854 Remove unnecessary disabling of TestRunner EL validation --- .../nifi/processors/kafka/pubsub/ConsumeKafkaTest.java | 3 --- .../kafka/pubsub/TestConsumeKafkaRecord_0_10.java | 3 --- .../java/org/apache/nifi/processors/kafka/PutKafkaTest.java | 6 ------ .../java/org/apache/nifi/processors/kafka/TestGetKafka.java | 3 --- .../nifi/processors/kafka/pubsub/ConsumeKafkaTest.java | 2 -- 5 files changed, 17 deletions(-) diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java index cc524dd3d535..bd93e3bbb310 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java @@ -114,7 +114,6 @@ protected ConsumerPool createConsumerPool(final ProcessContext context, final Co } }; final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setValidateExpressionUsage(false); runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar"); runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName); @@ -145,7 +144,6 @@ protected ConsumerPool createConsumerPool(final ProcessContext context, final Co } }; final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setValidateExpressionUsage(false); runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); runner.setProperty(ConsumeKafka_0_10.TOPICS, "(fo.*)|(ba)"); runner.setProperty(ConsumeKafka_0_10.TOPIC_TYPE, "pattern"); @@ -177,7 +175,6 @@ protected ConsumerPool createConsumerPool(final ProcessContext context, final Co } }; final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setValidateExpressionUsage(false); runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar"); runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_10.java index da63877bafd1..3da74e46df7f 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_10.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_10.java @@ -132,7 +132,6 @@ public void validateGetAllMessages() throws Exception { when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); when(mockLease.commit()).thenReturn(Boolean.TRUE); - runner.setValidateExpressionUsage(false); runner.setProperty(ConsumeKafkaRecord_0_10.TOPICS, "foo,bar"); runner.setProperty(ConsumeKafkaRecord_0_10.GROUP_ID, groupName); runner.setProperty(ConsumeKafkaRecord_0_10.AUTO_OFFSET_RESET, ConsumeKafkaRecord_0_10.OFFSET_EARLIEST); @@ -155,7 +154,6 @@ public void validateGetAllMessagesPattern() throws Exception { when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); when(mockLease.commit()).thenReturn(Boolean.TRUE); - runner.setValidateExpressionUsage(false); runner.setProperty(ConsumeKafkaRecord_0_10.TOPICS, "(fo.*)|(ba)"); runner.setProperty(ConsumeKafkaRecord_0_10.TOPIC_TYPE, "pattern"); runner.setProperty(ConsumeKafkaRecord_0_10.GROUP_ID, groupName); @@ -179,7 +177,6 @@ public void validateGetErrorMessages() throws Exception { when(mockLease.continuePolling()).thenReturn(true, false); when(mockLease.commit()).thenReturn(Boolean.FALSE); - runner.setValidateExpressionUsage(false); runner.setProperty(ConsumeKafkaRecord_0_10.TOPICS, "foo,bar"); runner.setProperty(ConsumeKafkaRecord_0_10.GROUP_ID, groupName); runner.setProperty(ConsumeKafkaRecord_0_10.AUTO_OFFSET_RESET, ConsumeKafkaRecord_0_10.OFFSET_EARLIEST); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/test/java/org/apache/nifi/processors/kafka/PutKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/test/java/org/apache/nifi/processors/kafka/PutKafkaTest.java index 5d55c54a3cb6..ce6e6e7e751b 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/test/java/org/apache/nifi/processors/kafka/PutKafkaTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/test/java/org/apache/nifi/processors/kafka/PutKafkaTest.java @@ -69,7 +69,6 @@ public void validateSingleCharacterDemarcatedMessages() { String topicName = "validateSingleCharacterDemarcatedMessages"; PutKafka putKafka = new PutKafka(); TestRunner runner = TestRunners.newTestRunner(putKafka); - runner.setValidateExpressionUsage(false); runner.setProperty(PutKafka.TOPIC, topicName); runner.setProperty(PutKafka.CLIENT_NAME, "foo"); runner.setProperty(PutKafka.KEY, "key1"); @@ -97,7 +96,6 @@ public void validateMultiCharacterDelimitedMessages() { String topicName = "validateMultiCharacterDemarcatedMessagesAndCustomPartitioner"; PutKafka putKafka = new PutKafka(); TestRunner runner = TestRunners.newTestRunner(putKafka); - runner.setValidateExpressionUsage(false); runner.setProperty(PutKafka.TOPIC, topicName); runner.setProperty(PutKafka.CLIENT_NAME, "foo"); runner.setProperty(PutKafka.KEY, "key1"); @@ -125,7 +123,6 @@ public void validateDemarcationIntoEmptyMessages() { String topicName = "validateDemarcationIntoEmptyMessages"; PutKafka putKafka = new PutKafka(); final TestRunner runner = TestRunners.newTestRunner(putKafka); - runner.setValidateExpressionUsage(false); runner.setProperty(PutKafka.TOPIC, topicName); runner.setProperty(PutKafka.KEY, "key1"); runner.setProperty(PutKafka.CLIENT_NAME, "foo"); @@ -157,7 +154,6 @@ public void validateComplexRightPartialDemarcatedMessages() { String topicName = "validateComplexRightPartialDemarcatedMessages"; PutKafka putKafka = new PutKafka(); TestRunner runner = TestRunners.newTestRunner(putKafka); - runner.setValidateExpressionUsage(false); runner.setProperty(PutKafka.TOPIC, topicName); runner.setProperty(PutKafka.CLIENT_NAME, "foo"); runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort()); @@ -179,7 +175,6 @@ public void validateComplexLeftPartialDemarcatedMessages() { String topicName = "validateComplexLeftPartialDemarcatedMessages"; PutKafka putKafka = new PutKafka(); TestRunner runner = TestRunners.newTestRunner(putKafka); - runner.setValidateExpressionUsage(false); runner.setProperty(PutKafka.TOPIC, topicName); runner.setProperty(PutKafka.CLIENT_NAME, "foo"); runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort()); @@ -203,7 +198,6 @@ public void validateComplexPartialMatchDemarcatedMessages() { String topicName = "validateComplexPartialMatchDemarcatedMessages"; PutKafka putKafka = new PutKafka(); TestRunner runner = TestRunners.newTestRunner(putKafka); - runner.setValidateExpressionUsage(false); runner.setProperty(PutKafka.TOPIC, topicName); runner.setProperty(PutKafka.CLIENT_NAME, "foo"); runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort()); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java index 7ac08fae7b50..dfcf0d9623cf 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java @@ -50,7 +50,6 @@ public static void configureLogging() { @Ignore("Intended only for local tests to verify functionality.") public void testIntegrationLocally() { final TestRunner runner = TestRunners.newTestRunner(GetKafka.class); - runner.setValidateExpressionUsage(false); runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "192.168.0.101:2181"); runner.setProperty(GetKafka.TOPIC, "testX"); runner.setProperty(GetKafka.KAFKA_TIMEOUT, "3 secs"); @@ -74,7 +73,6 @@ public void testWithDelimiter() { final TestableProcessor proc = new TestableProcessor(null, messages); final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setValidateExpressionUsage(false); runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "localhost:2181"); runner.setProperty(GetKafka.TOPIC, "testX"); runner.setProperty(GetKafka.KAFKA_TIMEOUT, "3 secs"); @@ -97,7 +95,6 @@ public void testWithDelimiterAndNotEnoughMessages() { final TestableProcessor proc = new TestableProcessor(null, messages); final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setValidateExpressionUsage(false); runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "localhost:2181"); runner.setProperty(GetKafka.TOPIC, "testX"); runner.setProperty(GetKafka.KAFKA_TIMEOUT, "3 secs"); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java index 0c1c4a7996fd..c4b0140b7b83 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java @@ -121,7 +121,6 @@ protected ConsumerPool createConsumerPool(final ProcessContext context, final Co } }; final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setValidateExpressionUsage(false); runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); runner.setProperty(ConsumeKafka.TOPICS, "foo,bar"); runner.setProperty(ConsumeKafka.GROUP_ID, groupName); @@ -152,7 +151,6 @@ protected ConsumerPool createConsumerPool(final ProcessContext context, final Co } }; final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setValidateExpressionUsage(false); runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); runner.setProperty(ConsumeKafka.TOPICS, "foo,bar"); runner.setProperty(ConsumeKafka.GROUP_ID, groupName);