Skip to content

Commit

Permalink
NIFI-12411: Update PublishAMQP to read AMQP headers value from FlowFi…
Browse files Browse the repository at this point in the history
…le attributes and `amq$headers` string

Processor can now also create message headers using attributes matching the given Regex property. The name of attribute is used as header key.
In addition to that it can read values from two sources i.e. attributes matching Regex and the headers present in `amq$headers` string. In case
of key duplication in both sources the precedence property decide which value will be used.

AMQP attributes keys are now set as constants strings and reused where needed.

Added `onScheduled` to PublishAMQP and ConsumeAMQP to read all properties which don't change during running state.

Signed-off-by: Umar Hussain <umarhussain.work@gmail.com>
  • Loading branch information
umarhussain15 committed Dec 2, 2023
1 parent 3b0d810 commit a84db6e
Show file tree
Hide file tree
Showing 6 changed files with 452 additions and 165 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

/**
* Generic publisher of messages to AMQP-based messaging system. It is based on
* RabbitMQ client API (https://www.rabbitmq.com/api-guide.html)
* RabbitMQ client API (<a href="https://www.rabbitmq.com/api-guide.html">Java Client API Guide</a>)
*/
final class AMQPPublisher extends AMQPWorker {

Expand Down Expand Up @@ -63,7 +63,7 @@ void publish(byte[] bytes, BasicProperties properties, String routingKey, String
exchange = exchange == null ? "" : exchange.trim();

if (processorLog.isDebugEnabled()) {
if (exchange.length() == 0) {
if (exchange.isEmpty()) {
processorLog.debug("The 'exchangeName' is not specified. Messages will be sent to default exchange");
}
processorLog.debug("Successfully connected AMQPPublisher to " + this.connectionString + " and '" + exchange
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
Expand All @@ -49,14 +48,28 @@

/**
* Base processor that uses RabbitMQ client API
* (https://www.rabbitmq.com/api-guide.html) to rendezvous with AMQP-based
* (<a href="https://www.rabbitmq.com/api-guide.html">Java Client API Guide</a>) to rendezvous with AMQP-based
* messaging systems version 0.9.1
*
* @param <T> the type of {@link AMQPWorker}. Please see {@link AMQPPublisher}
* and {@link AMQPConsumer}
*/
abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProcessor {

public static final String AMQP_APPID_ATTRIBUTE = "amqp$appId";
public static final String AMQP_CONTENT_ENCODING_ATTRIBUTE = "amqp$contentEncoding";
public static final String AMQP_CONTENT_TYPE_ATTRIBUTE = "amqp$contentType";
public static final String AMQP_HEADERS_ATTRIBUTE = "amqp$headers";
public static final String AMQP_DELIVERY_MODE_ATTRIBUTE = "amqp$deliveryMode";
public static final String AMQP_PRIORITY_ATTRIBUTE = "amqp$priority";
public static final String AMQP_CORRELATION_ID_ATTRIBUTE = "amqp$correlationId";
public static final String AMQP_REPLY_TO_ATTRIBUTE = "amqp$replyTo";
public static final String AMQP_EXPIRATION_ATTRIBUTE = "amqp$expiration";
public static final String AMQP_MESSAGE_ID_ATTRIBUTE = "amqp$messageId";
public static final String AMQP_TIMESTAMP_ATTRIBUTE = "amqp$timestamp";
public static final String AMQP_TYPE_ATTRIBUTE = "amqp$type";
public static final String AMQP_USER_ID_ATTRIBUTE = "amqp$userId";
public static final String AMQP_CLUSTER_ID_ATTRIBUTE = "amqp$clusterId";
public static final PropertyDescriptor BROKERS = new PropertyDescriptor.Builder()
.name("Brokers")
.description("A comma-separated list of known AMQP Brokers in the format <host>:<port> (e.g., localhost:5672). If this is " +
Expand Down Expand Up @@ -137,18 +150,16 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
private static final List<PropertyDescriptor> propertyDescriptors;

static {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(BROKERS);
properties.add(HOST);
properties.add(PORT);
properties.add(V_HOST);
properties.add(USER);
properties.add(PASSWORD);
properties.add(AMQP_VERSION);
properties.add(SSL_CONTEXT_SERVICE);
properties.add(USE_CERT_AUTHENTICATION);
properties.add(CLIENT_AUTH);
propertyDescriptors = Collections.unmodifiableList(properties);
propertyDescriptors = List.of(
BROKERS,
HOST, PORT,
V_HOST,
USER,
PASSWORD,
AMQP_VERSION,
SSL_CONTEXT_SERVICE,
USE_CERT_AUTHENTICATION,
CLIENT_AUTH);
}

protected static List<PropertyDescriptor> getCommonPropertyDescriptors() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
Expand All @@ -52,29 +53,31 @@
@CapabilityDescription("Consumes AMQP Messages from an AMQP Broker using the AMQP 0.9.1 protocol. Each message that is received from the AMQP Broker will be "
+ "emitted as its own FlowFile to the 'success' relationship.")
@WritesAttributes({
@WritesAttribute(attribute = "amqp$appId", description = "The App ID field from the AMQP Message"),
@WritesAttribute(attribute = "amqp$contentEncoding", description = "The Content Encoding reported by the AMQP Message"),
@WritesAttribute(attribute = "amqp$contentType", description = "The Content Type reported by the AMQP Message"),
@WritesAttribute(attribute = "amqp$headers", description = "The headers present on the AMQP Message. Added only if processor is configured to output this attribute."),
@WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_APPID_ATTRIBUTE, description = "The App ID field from the AMQP Message"),
@WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_CONTENT_ENCODING_ATTRIBUTE, description = "The Content Encoding reported by the AMQP Message"),
@WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_CONTENT_TYPE_ATTRIBUTE, description = "The Content Type reported by the AMQP Message"),
@WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE,
description = "The headers present on the AMQP Message. Added only if processor is configured to output this attribute."),
@WritesAttribute(attribute = "<Header Key Prefix>.<attribute>",
description = "Each message header will be inserted with this attribute name, if processor is configured to output headers as attribute"),
@WritesAttribute(attribute = "amqp$deliveryMode", description = "The numeric indicator for the Message's Delivery Mode"),
@WritesAttribute(attribute = "amqp$priority", description = "The Message priority"),
@WritesAttribute(attribute = "amqp$correlationId", description = "The Message's Correlation ID"),
@WritesAttribute(attribute = "amqp$replyTo", description = "The value of the Message's Reply-To field"),
@WritesAttribute(attribute = "amqp$expiration", description = "The Message Expiration"),
@WritesAttribute(attribute = "amqp$messageId", description = "The unique ID of the Message"),
@WritesAttribute(attribute = "amqp$timestamp", description = "The timestamp of the Message, as the number of milliseconds since epoch"),
@WritesAttribute(attribute = "amqp$type", description = "The type of message"),
@WritesAttribute(attribute = "amqp$userId", description = "The ID of the user"),
@WritesAttribute(attribute = "amqp$clusterId", description = "The ID of the AMQP Cluster"),
@WritesAttribute(attribute = "amqp$routingKey", description = "The routingKey of the AMQP Message"),
@WritesAttribute(attribute = "amqp$exchange", description = "The exchange from which AMQP Message was received")
@WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_DELIVERY_MODE_ATTRIBUTE, description = "The numeric indicator for the Message's Delivery Mode"),
@WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_PRIORITY_ATTRIBUTE, description = "The Message priority"),
@WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_CORRELATION_ID_ATTRIBUTE, description = "The Message's Correlation ID"),
@WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_REPLY_TO_ATTRIBUTE, description = "The value of the Message's Reply-To field"),
@WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_EXPIRATION_ATTRIBUTE, description = "The Message Expiration"),
@WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_MESSAGE_ID_ATTRIBUTE, description = "The unique ID of the Message"),
@WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_TIMESTAMP_ATTRIBUTE, description = "The timestamp of the Message, as the number of milliseconds since epoch"),
@WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_TYPE_ATTRIBUTE, description = "The type of message"),
@WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_USER_ID_ATTRIBUTE, description = "The ID of the user"),
@WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_CLUSTER_ID_ATTRIBUTE, description = "The ID of the AMQP Cluster"),
@WritesAttribute(attribute = ConsumeAMQP.AMQP_ROUTING_KEY_ATTRIBUTE, description = "The routingKey of the AMQP Message"),
@WritesAttribute(attribute = ConsumeAMQP.AMQP_EXCHANGE_ATTRIBUTE, description = "The exchange from which AMQP Message was received")
})
public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {

private static final String ATTRIBUTES_PREFIX = "amqp$";
public static final String DEFAULT_HEADERS_KEY_PREFIX = "consume.amqp";
public static final String AMQP_ROUTING_KEY_ATTRIBUTE = "amqp$routingKey";
public static final String AMQP_EXCHANGE_ATTRIBUTE = "amqp$exchange";

public static final AllowableValue HEADERS_FORMAT_COMMA_SEPARATED_STRING = new AllowableValue("Comma-Separated String", "Comma-Separated String",
"Put all headers as a string with the specified separator in the attribute 'amqp$headers'.");
Expand Down Expand Up @@ -162,6 +165,16 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {

private static final ObjectMapper objectMapper;

private Integer batchSize;
private String queueName;
private String headerFormat;
private String headerAttributePrefix;
private boolean removeCurlyBraces;
private String valueSeparatorForHeaders;

private boolean autoAcknowledge;


static {
List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(QUEUE);
Expand All @@ -179,6 +192,18 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
objectMapper = new ObjectMapper();
}

@OnScheduled
public void onScheduled(ProcessContext context) {
super.onScheduled(context);
batchSize = context.getProperty(BATCH_SIZE).asInteger();
queueName = context.getProperty(QUEUE).getValue();
headerFormat = context.getProperty(HEADER_FORMAT).getValue();
headerAttributePrefix = context.getProperty(HEADER_KEY_PREFIX).getValue();
removeCurlyBraces=context.getProperty(REMOVE_CURLY_BRACES).asBoolean();
valueSeparatorForHeaders = context.getProperty(HEADER_SEPARATOR).getValue();
autoAcknowledge = context.getProperty(AUTO_ACKNOWLEDGE).asBoolean();
}

/**
* Will construct a {@link FlowFile} containing the body of the consumed AMQP message (if {@link GetResponse} returned by {@link AMQPConsumer} is
* not null) and AMQP properties that came with message which are added to a {@link FlowFile} as attributes, transferring {@link FlowFile} to
Expand All @@ -192,7 +217,7 @@ protected void processResource(final Connection connection, final AMQPConsumer c
throw new AMQPException("AMQP client has lost connection.");
}

for (int i = 0; i < context.getProperty(BATCH_SIZE).asInteger(); i++) {
for (int i = 0; i < batchSize; i++) {
final GetResponse response = consumer.consume();
if (response == null) {
if (lastReceived == null) {
Expand All @@ -208,13 +233,10 @@ protected void processResource(final Connection connection, final AMQPConsumer c

final BasicProperties amqpProperties = response.getProps();
final Envelope envelope = response.getEnvelope();
final String headerFormat = context.getProperty(HEADER_FORMAT).getValue();
final String headerKeyPrefix = context.getProperty(HEADER_KEY_PREFIX).getValue();
final Map<String, String> attributes = buildAttributes(amqpProperties, envelope, headerFormat, headerKeyPrefix,
context.getProperty(REMOVE_CURLY_BRACES).asBoolean(), context.getProperty(HEADER_SEPARATOR).toString());
final Map<String, String> attributes = buildAttributes(amqpProperties, envelope);
flowFile = session.putAllAttributes(flowFile, attributes);

session.getProvenanceReporter().receive(flowFile, connection.toString() + "/" + context.getProperty(QUEUE).getValue());
session.getProvenanceReporter().receive(flowFile, connection + "/" + queueName);
session.transfer(flowFile, REL_SUCCESS);
lastReceived = response;
}
Expand All @@ -225,59 +247,60 @@ protected void processResource(final Connection connection, final AMQPConsumer c
}
}

private Map<String, String> buildAttributes(final BasicProperties properties, final Envelope envelope, String headersStringFormat, String headerAttributePrefix, boolean removeCurlyBraces,
String valueSeparatorForHeaders) {
AllowableValue headerFormat = new AllowableValue(headersStringFormat);
private Map<String, String> buildAttributes(final BasicProperties properties, final Envelope envelope) {
final Map<String, String> attributes = new HashMap<>();
addAttribute(attributes, ATTRIBUTES_PREFIX + "appId", properties.getAppId());
addAttribute(attributes, ATTRIBUTES_PREFIX + "contentEncoding", properties.getContentEncoding());
addAttribute(attributes, ATTRIBUTES_PREFIX + "contentType", properties.getContentType());
addAttribute(attributes, ATTRIBUTES_PREFIX + "deliveryMode", properties.getDeliveryMode());
addAttribute(attributes, ATTRIBUTES_PREFIX + "priority", properties.getPriority());
addAttribute(attributes, ATTRIBUTES_PREFIX + "correlationId", properties.getCorrelationId());
addAttribute(attributes, ATTRIBUTES_PREFIX + "replyTo", properties.getReplyTo());
addAttribute(attributes, ATTRIBUTES_PREFIX + "expiration", properties.getExpiration());
addAttribute(attributes, ATTRIBUTES_PREFIX + "messageId", properties.getMessageId());
addAttribute(attributes, ATTRIBUTES_PREFIX + "timestamp", properties.getTimestamp() == null ? null : properties.getTimestamp().getTime());
addAttribute(attributes, ATTRIBUTES_PREFIX + "type", properties.getType());
addAttribute(attributes, ATTRIBUTES_PREFIX + "userId", properties.getUserId());
addAttribute(attributes, ATTRIBUTES_PREFIX + "clusterId", properties.getClusterId());
addAttribute(attributes, ATTRIBUTES_PREFIX + "routingKey", envelope.getRoutingKey());
addAttribute(attributes, ATTRIBUTES_PREFIX + "exchange", envelope.getExchange());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_APPID_ATTRIBUTE, properties.getAppId());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_CONTENT_ENCODING_ATTRIBUTE, properties.getContentEncoding());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_CONTENT_TYPE_ATTRIBUTE, properties.getContentType());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_DELIVERY_MODE_ATTRIBUTE, properties.getDeliveryMode());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_PRIORITY_ATTRIBUTE, properties.getPriority());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_CORRELATION_ID_ATTRIBUTE, properties.getCorrelationId());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_REPLY_TO_ATTRIBUTE, properties.getReplyTo());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_EXPIRATION_ATTRIBUTE, properties.getExpiration());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_MESSAGE_ID_ATTRIBUTE, properties.getMessageId());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_TIMESTAMP_ATTRIBUTE, properties.getTimestamp() == null ? null : properties.getTimestamp().getTime());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_CONTENT_TYPE_ATTRIBUTE, properties.getType());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_USER_ID_ATTRIBUTE, properties.getUserId());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_CLUSTER_ID_ATTRIBUTE, properties.getClusterId());
addAttribute(attributes, AMQP_ROUTING_KEY_ATTRIBUTE, envelope.getRoutingKey());
addAttribute(attributes, AMQP_EXCHANGE_ATTRIBUTE, envelope.getExchange());
Map<String, Object> headers = properties.getHeaders();
if (headers != null) {
if (HEADERS_FORMAT_ATTRIBUTES.equals(headerFormat)) {
headers.forEach((key, value) -> addAttribute(attributes,
String.format("%s.%s", headerAttributePrefix, key), value));
if (HEADERS_FORMAT_ATTRIBUTES.getValue().equals(headerFormat)) {
headers.forEach((key, value) -> addAttribute(attributes, String.format("%s.%s", headerAttributePrefix, key), value));
} else {
addAttribute(attributes, ATTRIBUTES_PREFIX + "headers",
buildHeaders(properties.getHeaders(), headerFormat, removeCurlyBraces,
valueSeparatorForHeaders));
addAttribute(attributes, AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE,
buildHeaders(properties.getHeaders()));
}
}
return attributes;
}

/**
* Adds the given attribute name and value in to the map of attributes
* @param attributes List of attributes to update
* @param attributeName Name of the attribute
* @param value Value of the attribute
*/
private void addAttribute(final Map<String, String> attributes, final String attributeName, final Object value) {
if (value == null) {
return;
}

attributes.put(attributeName, value.toString());
}

private String buildHeaders(Map<String, Object> headers, AllowableValue headerFormat, boolean removeCurlyBraces, String valueSeparatorForHeaders) {
private String buildHeaders(Map<String, Object> headers) {
if (headers == null) {
return null;
}
String headerString = null;
if (headerFormat.equals(HEADERS_FORMAT_COMMA_SEPARATED_STRING)) {
if ( HEADERS_FORMAT_COMMA_SEPARATED_STRING.getValue().equals(headerFormat)) {
headerString = convertMapToString(headers, valueSeparatorForHeaders);

if (!removeCurlyBraces) {
headerString = "{" + headerString + "}";
}
} else if (headerFormat.equals(HEADERS_FORMAT_JSON_STRING)) {
} else if (HEADERS_FORMAT_JSON_STRING.getValue().equals(headerFormat)) {
try {
headerString = convertMapToJSONString(headers);
} catch (JsonProcessingException e) {
Expand All @@ -288,7 +311,7 @@ private String buildHeaders(Map<String, Object> headers, AllowableValue headerFo
}

private static String convertMapToString(Map<String, Object> headers, String valueSeparatorForHeaders) {
return headers.entrySet().stream().map(e -> (e.getValue()!= null) ? e.getKey() + "=" + e.getValue(): e.getKey())
return headers.entrySet().stream().map(e -> (e.getValue() != null) ? e.getKey() + "=" + e.getValue() : e.getKey())
.collect(Collectors.joining(valueSeparatorForHeaders));
}

Expand All @@ -299,8 +322,6 @@ private static String convertMapToJSONString(Map<String, Object> headers) throws
@Override
protected synchronized AMQPConsumer createAMQPWorker(final ProcessContext context, final Connection connection) {
try {
final String queueName = context.getProperty(QUEUE).getValue();
final boolean autoAcknowledge = context.getProperty(AUTO_ACKNOWLEDGE).asBoolean();
final AMQPConsumer amqpConsumer = new AMQPConsumer(connection, queueName, autoAcknowledge, getLogger());

return amqpConsumer;
Expand Down
Loading

0 comments on commit a84db6e

Please sign in to comment.