diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java index 39f483b9eba3f..4f6a07fe78999 100644 --- a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java +++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java @@ -76,7 +76,7 @@ public class RMQSource extends MultipleIdsMessageAcknowledgingSourceBase schema; @@ -177,6 +177,15 @@ protected ConnectionFactory setupConnectionFactory() { return new ConnectionFactory(); } + /** + * Sets up the queue. The default implementation just declares the queue. The user may override + * this method to have a custom setup for the queue (i.e. binding the queue to an exchange or + * defining custom queue parameters) + */ + protected void setupQueue() throws IOException { + channel.queueDeclare(queueName, true, false, false, null); + } + /** * Initializes the connection to RMQ. */ @@ -195,7 +204,7 @@ private void initializeConnection() { try { connection = factory.newConnection(); channel = connection.createChannel(); - channel.queueDeclare(queueName, true, false, false, null); + setupQueue(); consumer = new QueueingConsumer(channel); RuntimeContext runtimeContext = getRuntimeContext();