From 76707086dfef73e2ee8e2ea9dd9f6d9c725b96be Mon Sep 17 00:00:00 2001 From: Corey Fritz Date: Mon, 16 Jul 2018 21:06:01 -0400 Subject: [PATCH] NIFI-5388: enabled EL support for dynamic properties of Kafka 1.0 processors --- .../processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java | 8 ++++++-- .../nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java | 8 ++++++-- .../processors/kafka/pubsub/PublishKafkaRecord_1_0.java | 4 +++- .../nifi/processors/kafka/pubsub/PublishKafka_1_0.java | 4 +++- 4 files changed, 18 insertions(+), 6 deletions(-) diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java index efcc24205197..6b0d8d196ca8 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java @@ -74,7 +74,8 @@ @DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.", description = "These properties will be added on the Kafka configuration after loading any provided configuration properties." + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged." - + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ") + + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ", + expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY) @SeeAlso({ConsumeKafka_1_0.class, PublishKafka_1_0.class, PublishKafkaRecord_1_0.class}) public class ConsumeKafkaRecord_1_0 extends AbstractProcessor { @@ -264,7 +265,10 @@ public void close() { protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { return new PropertyDescriptor.Builder() .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.") - .name(propertyDescriptorName).addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ConsumerConfig.class)).dynamic(true) + .name(propertyDescriptorName) + .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ConsumerConfig.class)) + .dynamic(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java index ff6a250c30aa..ccee699f5935 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java @@ -70,7 +70,8 @@ @DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.", description = "These properties will be added on the Kafka configuration after loading any provided configuration properties." + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged." - + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ") + + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ", + expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY) public class ConsumeKafka_1_0 extends AbstractProcessor { static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset"); @@ -253,7 +254,10 @@ public void close() { protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { return new PropertyDescriptor.Builder() .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.") - .name(propertyDescriptorName).addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ConsumerConfig.class)).dynamic(true) + .name(propertyDescriptorName) + .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ConsumerConfig.class)) + .dynamic(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java index d0f368ddaf6d..82a391895dde 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java @@ -73,7 +73,8 @@ @DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.", description = "These properties will be added on the Kafka configuration after loading any provided configuration properties." + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged." - + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ") + + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ", + expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY) @WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to " + "FlowFiles that are routed to success.") @SeeAlso({PublishKafka_1_0.class, ConsumeKafka_1_0.class, ConsumeKafkaRecord_1_0.class}) @@ -293,6 +294,7 @@ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String .name(propertyDescriptorName) .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class)) .dynamic(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java index d7e910db7ef5..477aefd826fc 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java @@ -68,7 +68,8 @@ @DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.", description = "These properties will be added on the Kafka configuration after loading any provided configuration properties." + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged." - + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ") + + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ", + expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY) @WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to " + "FlowFiles that are routed to success. If the Property is not set, this will always be 1, but if the Property is set, it may " + "be greater than 1.") @@ -289,6 +290,7 @@ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String .name(propertyDescriptorName) .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class)) .dynamic(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); }