Skip to content

Commit

Permalink
NIFI-12411: Update PublishAMQP to read AMQP headers value from FlowFi…
Browse files Browse the repository at this point in the history
…le attributes and `amq$headers` string

Processor can now also create message headers using attributes matching the given Regex property. The name of attribute is used as header key.
In addition to that it can read values from two sources i.e. attributes matching Regex and the headers present in `amq$headers` string. In case
of key duplication in both sources the precedence property decide which value will be used.

Signed-off-by: Umar Hussain <umarhussain.work@gmail.com>
  • Loading branch information
umarhussain15 committed May 17, 2024
1 parent 60112f2 commit b8e6ab6
Show file tree
Hide file tree
Showing 6 changed files with 497 additions and 169 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

/**
* Generic publisher of messages to AMQP-based messaging system. It is based on
* RabbitMQ client API (https://www.rabbitmq.com/api-guide.html)
* RabbitMQ client API (<a href="https://www.rabbitmq.com/api-guide.html">Java Client API Guide</a>)
*/
final class AMQPPublisher extends AMQPWorker {

Expand Down Expand Up @@ -63,7 +63,7 @@ void publish(byte[] bytes, BasicProperties properties, String routingKey, String
exchange = exchange == null ? "" : exchange.trim();

if (processorLog.isDebugEnabled()) {
if (exchange.length() == 0) {
if (exchange.isEmpty()) {
processorLog.debug("The 'exchangeName' is not specified. Messages will be sent to default exchange");
}
processorLog.debug("Successfully connected AMQPPublisher to " + this.connectionString + " and '" + exchange
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
Expand All @@ -49,14 +48,28 @@

/**
* Base processor that uses RabbitMQ client API
* (https://www.rabbitmq.com/api-guide.html) to rendezvous with AMQP-based
* (<a href="https://www.rabbitmq.com/api-guide.html">Java Client API Guide</a>) to rendezvous with AMQP-based
* messaging systems version 0.9.1
*
* @param <T> the type of {@link AMQPWorker}. Please see {@link AMQPPublisher}
* and {@link AMQPConsumer}
*/
abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProcessor {

public static final String AMQP_APPID_ATTRIBUTE = "amqp$appId";
public static final String AMQP_CONTENT_ENCODING_ATTRIBUTE = "amqp$contentEncoding";
public static final String AMQP_CONTENT_TYPE_ATTRIBUTE = "amqp$contentType";
public static final String AMQP_HEADERS_ATTRIBUTE = "amqp$headers";
public static final String AMQP_DELIVERY_MODE_ATTRIBUTE = "amqp$deliveryMode";
public static final String AMQP_PRIORITY_ATTRIBUTE = "amqp$priority";
public static final String AMQP_CORRELATION_ID_ATTRIBUTE = "amqp$correlationId";
public static final String AMQP_REPLY_TO_ATTRIBUTE = "amqp$replyTo";
public static final String AMQP_EXPIRATION_ATTRIBUTE = "amqp$expiration";
public static final String AMQP_MESSAGE_ID_ATTRIBUTE = "amqp$messageId";
public static final String AMQP_TIMESTAMP_ATTRIBUTE = "amqp$timestamp";
public static final String AMQP_TYPE_ATTRIBUTE = "amqp$type";
public static final String AMQP_USER_ID_ATTRIBUTE = "amqp$userId";
public static final String AMQP_CLUSTER_ID_ATTRIBUTE = "amqp$clusterId";
public static final PropertyDescriptor BROKERS = new PropertyDescriptor.Builder()
.name("Brokers")
.description("A comma-separated list of known AMQP Brokers in the format <host>:<port> (e.g., localhost:5672). If this is " +
Expand Down Expand Up @@ -129,17 +142,15 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
private static final List<PropertyDescriptor> propertyDescriptors;

static {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(BROKERS);
properties.add(HOST);
properties.add(PORT);
properties.add(V_HOST);
properties.add(USER);
properties.add(PASSWORD);
properties.add(AMQP_VERSION);
properties.add(SSL_CONTEXT_SERVICE);
properties.add(USE_CERT_AUTHENTICATION);
propertyDescriptors = Collections.unmodifiableList(properties);
propertyDescriptors = List.of(
BROKERS,
HOST, PORT,
V_HOST,
USER,
PASSWORD,
AMQP_VERSION,
SSL_CONTEXT_SERVICE,
USE_CERT_AUTHENTICATION);
}

protected static List<PropertyDescriptor> getCommonPropertyDescriptors() {
Expand Down
Loading

0 comments on commit b8e6ab6

Please sign in to comment.