Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-12411: Update PublishAMQP to read AMQP headers value from FlowFile attributes and amq$headers string #8105

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

/**
* Generic publisher of messages to AMQP-based messaging system. It is based on
* RabbitMQ client API (https://www.rabbitmq.com/api-guide.html)
* RabbitMQ client API (<a href="https://www.rabbitmq.com/api-guide.html">Java Client API Guide</a>)
*/
final class AMQPPublisher extends AMQPWorker {

Expand Down Expand Up @@ -63,7 +63,7 @@ void publish(byte[] bytes, BasicProperties properties, String routingKey, String
exchange = exchange == null ? "" : exchange.trim();

if (processorLog.isDebugEnabled()) {
if (exchange.length() == 0) {
if (exchange.isEmpty()) {
processorLog.debug("The 'exchangeName' is not specified. Messages will be sent to default exchange");
}
processorLog.debug("Successfully connected AMQPPublisher to {} and '{}' exchange with '{}' as a routing key.", this.connectionString, exchange, routingKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
Expand All @@ -49,14 +48,28 @@

/**
* Base processor that uses RabbitMQ client API
* (https://www.rabbitmq.com/api-guide.html) to rendezvous with AMQP-based
* (<a href="https://www.rabbitmq.com/api-guide.html">Java Client API Guide</a>) to rendezvous with AMQP-based
* messaging systems version 0.9.1
*
* @param <T> the type of {@link AMQPWorker}. Please see {@link AMQPPublisher}
* and {@link AMQPConsumer}
*/
abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProcessor {

public static final String AMQP_APPID_ATTRIBUTE = "amqp$appId";
public static final String AMQP_CONTENT_ENCODING_ATTRIBUTE = "amqp$contentEncoding";
public static final String AMQP_CONTENT_TYPE_ATTRIBUTE = "amqp$contentType";
public static final String AMQP_HEADERS_ATTRIBUTE = "amqp$headers";
public static final String AMQP_DELIVERY_MODE_ATTRIBUTE = "amqp$deliveryMode";
public static final String AMQP_PRIORITY_ATTRIBUTE = "amqp$priority";
public static final String AMQP_CORRELATION_ID_ATTRIBUTE = "amqp$correlationId";
public static final String AMQP_REPLY_TO_ATTRIBUTE = "amqp$replyTo";
public static final String AMQP_EXPIRATION_ATTRIBUTE = "amqp$expiration";
public static final String AMQP_MESSAGE_ID_ATTRIBUTE = "amqp$messageId";
public static final String AMQP_TIMESTAMP_ATTRIBUTE = "amqp$timestamp";
public static final String AMQP_TYPE_ATTRIBUTE = "amqp$type";
public static final String AMQP_USER_ID_ATTRIBUTE = "amqp$userId";
public static final String AMQP_CLUSTER_ID_ATTRIBUTE = "amqp$clusterId";
public static final PropertyDescriptor BROKERS = new PropertyDescriptor.Builder()
.name("Brokers")
.description("A comma-separated list of known AMQP Brokers in the format <host>:<port> (e.g., localhost:5672). If this is " +
Expand Down Expand Up @@ -129,17 +142,15 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
private static final List<PropertyDescriptor> propertyDescriptors;

static {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(BROKERS);
properties.add(HOST);
properties.add(PORT);
properties.add(V_HOST);
properties.add(USER);
properties.add(PASSWORD);
properties.add(AMQP_VERSION);
properties.add(SSL_CONTEXT_SERVICE);
properties.add(USE_CERT_AUTHENTICATION);
propertyDescriptors = Collections.unmodifiableList(properties);
propertyDescriptors = List.of(
BROKERS,
HOST, PORT,
V_HOST,
USER,
PASSWORD,
AMQP_VERSION,
SSL_CONTEXT_SERVICE,
USE_CERT_AUTHENTICATION);
}

protected static List<PropertyDescriptor> getCommonPropertyDescriptors() {
Expand Down
Loading