Skip to content

Commit

Permalink
NIFI-12411: add enums for allowable values and revert property name c…
Browse files Browse the repository at this point in the history
…hanges to avoid breaking changes

Signed-off-by: Umar Hussain <umarhussain.work@gmail.com>
  • Loading branch information
umarhussain15 committed Apr 7, 2024
1 parent 559ccce commit 0f18099
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.GetResponse;
import java.util.EnumSet;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
Expand Down Expand Up @@ -78,14 +79,6 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
public static final String DEFAULT_HEADERS_KEY_PREFIX = "consume.amqp";
public static final String AMQP_ROUTING_KEY_ATTRIBUTE = "amqp$routingKey";
public static final String AMQP_EXCHANGE_ATTRIBUTE = "amqp$exchange";

public static final AllowableValue HEADERS_FORMAT_COMMA_SEPARATED_STRING = new AllowableValue("Comma-Separated String", "Comma-Separated String",
"Put all headers as a string with the specified separator in the attribute 'amqp$headers'.");
public static final AllowableValue HEADERS_FORMAT_JSON_STRING = new AllowableValue("JSON String", "JSON String",
"Format all headers as JSON string and output in the attribute 'amqp$headers'. It will include keys with null value as well.");
public static final AllowableValue HEADERS_FORMAT_ATTRIBUTES = new AllowableValue("FlowFile Attributes", "FlowFile Attributes",
"Put each header as attribute of the flow file with a prefix specified in the properties");

public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
.name("Queue")
.description("The name of the existing AMQP Queue from which messages will be consumed. Usually pre-defined by AMQP administrator. ")
Expand Down Expand Up @@ -131,8 +124,8 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
.name("header.format")
.displayName("Header Output Format")
.description("Defines how to output headers from the received message")
.allowableValues(HEADERS_FORMAT_COMMA_SEPARATED_STRING, HEADERS_FORMAT_JSON_STRING, HEADERS_FORMAT_ATTRIBUTES)
.defaultValue(HEADERS_FORMAT_COMMA_SEPARATED_STRING.getValue())
.allowableValues(OutputHeaderFormat.getAllowedValues())
.defaultValue(OutputHeaderFormat.COMMA_SEPARATED_STRING)
.required(true)
.build();
public static final PropertyDescriptor HEADER_KEY_PREFIX = new PropertyDescriptor.Builder()
Expand All @@ -141,7 +134,7 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
.description("Text to be prefixed to header keys as the are added to the FlowFile attributes. Processor will append '.' to the value of this property")
.defaultValue(DEFAULT_HEADERS_KEY_PREFIX)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.dependsOn(HEADER_FORMAT, HEADERS_FORMAT_ATTRIBUTES)
.dependsOn(HEADER_FORMAT, OutputHeaderFormat.ATTRIBUTES)
.required(true)
.build();

Expand All @@ -152,7 +145,7 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
)
.addValidator(StandardValidators.SINGLE_CHAR_VALIDATOR)
.defaultValue(",")
.dependsOn(HEADER_FORMAT, HEADERS_FORMAT_COMMA_SEPARATED_STRING)
.dependsOn(HEADER_FORMAT, OutputHeaderFormat.COMMA_SEPARATED_STRING)
.required(false)
.build();
static final PropertyDescriptor REMOVE_CURLY_BRACES = new PropertyDescriptor.Builder()
Expand All @@ -162,7 +155,7 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.defaultValue("False")
.allowableValues("True", "False")
.dependsOn(HEADER_FORMAT, HEADERS_FORMAT_COMMA_SEPARATED_STRING)
.dependsOn(HEADER_FORMAT, OutputHeaderFormat.COMMA_SEPARATED_STRING)
.required(false)
.build();

Expand Down Expand Up @@ -279,7 +272,7 @@ private Map<String, String> buildAttributes(final BasicProperties properties, fi
addAttribute(attributes, AMQP_EXCHANGE_ATTRIBUTE, envelope.getExchange());
Map<String, Object> headers = properties.getHeaders();
if (headers != null) {
if (HEADERS_FORMAT_ATTRIBUTES.getValue().equals(headerFormat)) {
if (OutputHeaderFormat.ATTRIBUTES.getValue().equals(headerFormat)) {
headers.forEach((key, value) -> addAttribute(attributes, String.format("%s.%s", headerAttributePrefix, key), value));
} else {
addAttribute(attributes, AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE,
Expand Down Expand Up @@ -307,13 +300,13 @@ private String buildHeaders(Map<String, Object> headers) {
return null;
}
String headerString = null;
if ( HEADERS_FORMAT_COMMA_SEPARATED_STRING.getValue().equals(headerFormat)) {
if ( OutputHeaderFormat.COMMA_SEPARATED_STRING.getValue().equals(headerFormat)) {
headerString = convertMapToString(headers, valueSeparatorForHeaders);

if (!removeCurlyBraces) {
headerString = "{" + headerString + "}";
}
} else if (HEADERS_FORMAT_JSON_STRING.getValue().equals(headerFormat)) {
} else if (OutputHeaderFormat.JSON_STRING.getValue().equals(headerFormat)) {
try {
headerString = convertMapToJSONString(headers);
} catch (JsonProcessingException e) {
Expand Down Expand Up @@ -350,4 +343,42 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
public Set<Relationship> getRelationships() {
return relationships;
}

public enum OutputHeaderFormat implements DescribedValue{
COMMA_SEPARATED_STRING("Comma-Separated String", "Comma-Separated String",
"Put all headers as a string with the specified separator in the attribute 'amqp$headers'."),
JSON_STRING("JSON String", "JSON String",
"Format all headers as JSON string and output in the attribute 'amqp$headers'. It will include keys with null value as well."),
ATTRIBUTES("FlowFile Attributes", "FlowFile Attributes",
"Put each header as attribute of the flow file with a prefix specified in the properties");
private final String value;
private final String displayName;
private final String description;

OutputHeaderFormat(String value, String displayName, String description) {

this.value = value;
this.displayName = displayName;
this.description = description;
}

public static EnumSet<OutputHeaderFormat> getAllowedValues(){
return EnumSet.of(COMMA_SEPARATED_STRING, JSON_STRING, ATTRIBUTES);
}

@Override
public String getValue() {
return value;
}

@Override
public String getDisplayName() {
return displayName;
}

@Override
public String getDescription() {
return description;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Connection;
import java.util.EnumSet;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
Expand All @@ -28,7 +29,7 @@
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
Expand Down Expand Up @@ -76,15 +77,8 @@
})
public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {

public static final AllowableValue HEADERS_FROM_ATTRIBUTES = new AllowableValue("headersFromAttributes", "Attributes Matching Regex",
"Select attributes based on regex pattern to put in rabbitmq headers. Key of the attribute will be used as header key");
public static final AllowableValue HEADERS_FROM_STRING = new AllowableValue("headersFromString", "Attribute 'amp$headers' Value",
"Prepare headers from 'amp$headers' attribute string");
public static final AllowableValue HEADERS_FROM_BOTH = new AllowableValue("headersFromBoth", "Regex Match And 'amp$headers' Value",
"Take headers from both sources: 'amp$headers' attribute and attributes matching Regex. In case of key duplication precedence property will define which value to take.");
public static final PropertyDescriptor EXCHANGE = new PropertyDescriptor.Builder()
.name("exchange.name")
.displayName("Exchange Name")
.name("Exchange Name")
.description("The name of the AMQP Exchange the messages will be sent to. Usually provided by the AMQP administrator (e.g., 'amq.direct'). "
+ "It is an optional property. If kept empty the messages will be sent to a default AMQP exchange.")
.required(true)
Expand All @@ -93,7 +87,7 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
.addValidator(Validator.VALID)
.build();
public static final PropertyDescriptor ROUTING_KEY = new PropertyDescriptor.Builder()
.name("routing.key")
.name("Routing Key")
.displayName("Routing Key")
.description("The name of the Routing Key that will be used by AMQP to route messages from the exchange to a destination queue(s). "
+ "Usually provided by the administrator (e.g., 'myKey')In the event when messages are sent to a default exchange this property "
Expand All @@ -104,32 +98,32 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor HEADERS_SOURCE = new PropertyDescriptor.Builder()
.name("headers.source")
.name("Headers Source")
.displayName("Headers Source")
.description(
"The source of the headers which will be put in the published message. They can come either from the processor property as a string or they can be " +
"picked from flow file attributes based on Regex expression.")
.required(true)
.allowableValues(HEADERS_FROM_STRING, HEADERS_FROM_ATTRIBUTES, HEADERS_FROM_BOTH)
.defaultValue(HEADERS_FROM_STRING.getValue())
.allowableValues(InputHeaderSource.getAllowedValues())
.defaultValue(InputHeaderSource.STRING)
.build();
public static final PropertyDescriptor HEADERS_SOURCE_PRECEDENCE = new PropertyDescriptor.Builder()
.name("headers.source.precedence")
.name("Headers Source Precedence")
.displayName("Headers Source Precedence")
.description(
"In case of key duplication in header sources, this defines which value to take.")
.required(true)
.dependsOn(HEADERS_SOURCE,HEADERS_FROM_BOTH)
.allowableValues(HEADERS_FROM_STRING, HEADERS_FROM_ATTRIBUTES)
.defaultValue(HEADERS_FROM_STRING.getValue())
.dependsOn(HEADERS_SOURCE,InputHeaderSource.BOTH)
.allowableValues(InputHeaderSource.STRING, InputHeaderSource.ATTRIBUTES)
.defaultValue(InputHeaderSource.STRING)
.build();
public static final PropertyDescriptor HEADERS_ATTRIBUTES_REGEX = new PropertyDescriptor.Builder()
.name("attributes.to.headers.regex")
.name("Attributes To Headers Regular Expression")
.displayName("Attributes To Headers Regular Expression")
.description("Regular expression that will be evaluated against the flow file attributes to select "
+ "the matching attributes and put as AMQP headers.")
.required(true)
.dependsOn(HEADERS_SOURCE, HEADERS_FROM_ATTRIBUTES, HEADERS_FROM_BOTH)
.dependsOn(HEADERS_SOURCE, InputHeaderSource.ATTRIBUTES, InputHeaderSource.BOTH)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true))
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
Expand All @@ -140,7 +134,7 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
.description("The character that is used to split key-value for headers. The value must only one character. "
+ "Otherwise you will get an error message")
.defaultValue(",")
.dependsOn(HEADERS_SOURCE, HEADERS_FROM_STRING,HEADERS_FROM_BOTH)
.dependsOn(HEADERS_SOURCE, InputHeaderSource.STRING, InputHeaderSource.BOTH)
.addValidator(StandardValidators.SINGLE_CHAR_VALIDATOR)
.required(false)
.build();
Expand Down Expand Up @@ -199,7 +193,7 @@ public void onScheduled(ProcessContext context) {
* <p>
* NOTE: Attributes extracted from {@link FlowFile} are considered candidates for AMQP properties if their names are prefixed with
* "amq$" (e.g., amqp$contentType=text/xml). For "amqp$headers" it depends on the value of
* {@link PublishAMQP#HEADERS_SOURCE}, if the value is {@link PublishAMQP#HEADERS_FROM_STRING} then message headers are created from this attribute value,
* {@link PublishAMQP#HEADERS_SOURCE}, if the value is {@link InputHeaderSource#ATTRIBUTES} then message headers are created from this attribute value,
* otherwise this attribute will be ignored.
*/
@Override
Expand Down Expand Up @@ -316,13 +310,13 @@ private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile flowFile) {
*/
private Map<String, Object> prepareAMQPHeaders(FlowFile flowFile) {
final Map<String, Object> headers = new HashMap<>();
if (HEADERS_FROM_ATTRIBUTES.getValue().equals(selectedHeaderSource)) {
if (InputHeaderSource.ATTRIBUTES.getValue().equals(selectedHeaderSource)) {
headers.putAll(attributesToHeaders(flowFile.getAttributes()));
} else if (HEADERS_FROM_STRING.getValue().equals(selectedHeaderSource)) {
} else if (InputHeaderSource.STRING.getValue().equals(selectedHeaderSource)) {
readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE, value -> headers.putAll(validateAMQPHeaderProperty(value, headerSeparator)));
} else {
// When precedence matches, put values in the last so it can override keys from other source
if (HEADERS_FROM_ATTRIBUTES.getValue().equals(headerSourcePrecedence)) {
if (InputHeaderSource.ATTRIBUTES.getValue().equals(headerSourcePrecedence)) {
readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE, value -> headers.putAll(validateAMQPHeaderProperty(value, headerSeparator)));
headers.putAll(attributesToHeaders(flowFile.getAttributes()));
} else {
Expand Down Expand Up @@ -370,4 +364,43 @@ private Map<String, Object> validateAMQPHeaderProperty(String amqpPropValue, Cha
}
return headers;
}
public enum InputHeaderSource implements DescribedValue {

ATTRIBUTES("headersFromAttributes", "Attributes Matching Regex",
"Select attributes based on regex pattern to put in rabbitmq headers. Key of the attribute will be used as header key"),
STRING("headersFromString", "Attribute 'amp$headers' Value",
"Prepare headers from 'amp$headers' attribute string"),
BOTH("headersFromBoth", "Regex Match And 'amp$headers' Value",
"Take headers from both sources: 'amp$headers' attribute and attributes matching Regex. In case of key duplication precedence property will define which value to take.");

private final String value;
private final String displayName;
private final String description;

InputHeaderSource(String value, String displayName, String description) {

this.value = value;
this.displayName = displayName;
this.description = description;
}

public static EnumSet<InputHeaderSource> getAllowedValues(){
return EnumSet.of(STRING, ATTRIBUTES, BOTH);
}

@Override
public String getValue() {
return value;
}

@Override
public String getDisplayName() {
return displayName;
}

@Override
public String getDescription() {
return description;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import org.apache.nifi.amqp.processors.ConsumeAMQP.OutputHeaderFormat;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
Expand Down Expand Up @@ -187,7 +188,7 @@ public void validateHeaderWithJsonStringForHeaderFormatParameterConsumeAndTransf

ConsumeAMQP proc = new LocalConsumeAMQP(connection);
TestRunner runner = initTestRunner(proc);
runner.setProperty(ConsumeAMQP.HEADER_FORMAT, ConsumeAMQP.HEADERS_FORMAT_JSON_STRING);
runner.setProperty(ConsumeAMQP.HEADER_FORMAT, OutputHeaderFormat.JSON_STRING);
runner.run();
final MockFlowFile successFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
assertNotNull(successFF);
Expand Down Expand Up @@ -222,7 +223,7 @@ public void validateHeaderWithFlowFileAttributeForHeaderFormatParameterConsumeAn

ConsumeAMQP proc = new LocalConsumeAMQP(connection);
TestRunner runner = initTestRunner(proc);
runner.setProperty(ConsumeAMQP.HEADER_FORMAT, ConsumeAMQP.HEADERS_FORMAT_ATTRIBUTES);
runner.setProperty(ConsumeAMQP.HEADER_FORMAT, OutputHeaderFormat.ATTRIBUTES);
runner.setProperty(ConsumeAMQP.HEADER_KEY_PREFIX,headerPrefix);
runner.run();
final MockFlowFile successFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
Expand Down
Loading

0 comments on commit 0f18099

Please sign in to comment.