From 811e88f17f8469f7d01a69158ab7c062325eeb34 Mon Sep 17 00:00:00 2001 From: Dominik Bruhn Date: Sun, 5 Jun 2016 23:18:56 +0200 Subject: [PATCH] RMQ Streaming: Possibility to customize queue This patch adds the possibilty for th user of the RabbitMQ Streaming Connector to customize the queue which is used. There are use-cases in which you want to set custom parameters for the queue (i.e. TTL of the messages if Flink reboots) or the possibility to bind the queue to an exchange afterwards. The commit doesn't change the actual behaviour but makes it possible for users to override the newly create `setupQueue` method and cutomize their implementation. This was not possible before. --- .../streaming/connectors/rabbitmq/RMQSource.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java index 39f483b9eba3f..4f6a07fe78999 100644 --- a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java +++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java @@ -76,7 +76,7 @@ public class RMQSource extends MultipleIdsMessageAcknowledgingSourceBase schema; @@ -177,6 +177,15 @@ protected ConnectionFactory setupConnectionFactory() { return new ConnectionFactory(); } + /** + * Sets up the queue. The default implementation just declares the queue. The user may override + * this method to have a custom setup for the queue (i.e. binding the queue to an exchange or + * defining custom queue parameters) + */ + protected void setupQueue() throws IOException { + channel.queueDeclare(queueName, true, false, false, null); + } + /** * Initializes the connection to RMQ. */ @@ -195,7 +204,7 @@ private void initializeConnection() { try { connection = factory.newConnection(); channel = connection.createChannel(); - channel.queueDeclare(queueName, true, false, false, null); + setupQueue(); consumer = new QueueingConsumer(channel); RuntimeContext runtimeContext = getRuntimeContext();