Skip to content
Permalink
Browse files

ENH: lib/pipeline: amqp: not declare queue if exchange set

Do not declare the queue if the exchange is set explicitly
  • Loading branch information...
wagner-certat committed Oct 8, 2019
1 parent 37ec9f7 commit 5aef10e9bd0badc779a39c40ef4bf92ffece7966
Showing with 9 additions and 3 deletions.
  1. +3 −1 CHANGELOG.md
  2. +2 −2 docs/User-Guide.md
  3. +4 −0 intelmq/lib/pipeline.py
@@ -20,7 +20,9 @@ CHANGELOG
- New internal variable `_has_message` to keep the state of the pipeline.
- Split receive and acknowledge into public-facing and private methods.
- Add `reject_message` method to the Pipeline class for explicit requeue of messages.
- AMQP: Make exchange configurable
- AMQP:
- Make exchange configurable.
- If exchange is set, the queues are not declared, the queue name is for routing used by exchanges.
- `intelmq.lib.bot`:
- Log message after successful bot initialization, no log message anymore for ready pipeline.
- Use existing current message if receive is called and the current message still exists.
@@ -268,14 +268,14 @@ You need to set the parameter `source_pipeline_broker`/`destination_pipeline_bro
* `destination_pipeline_username`
* `destination_pipeline_password`
* `destination_pipeline_socket_timeout` (default: no timeout)
* `destination_pipeline_amqp_exchange`: Only change/set this if you know what you do (default: `''`).
* `destination_pipeline_amqp_exchange`: Only change/set this if you know what you do. If set, the destination queues are not declared as queues, but used as routing key. (default: `''`).
* `destination_pipeline_amqp_virtual_host` (default: `'/'`)
* `source_pipeline_host` (default: `'127.0.0.1'`)
* `source_pipeline_port` (default: 5672)
* `source_pipeline_username`
* `source_pipeline_password`
* `source_pipeline_socket_timeout` (default: no timeout)
* `source_pipeline_amqp_exchange`: Only change/set this if you know what you do (default: `''`).
* `source_pipeline_amqp_exchange`: Only change/set this if you know what you do. If set, the destination queues are not declared as queues, but used as routing key. (default: `''`).
* `source_pipeline_amqp_virtual_host` (default: `'/'`)

For getting the queue sizes, `intelmqctl` needs to connect to the monitoring interface of RabbitMQ. If the monitoring interface is not available under "http://{host}:15671" you can manually set using the parameter `intelmqctl_rabbitmq_monitoring_url`.
@@ -456,6 +456,10 @@ def connect(self):
def setup_channel(self):
self.channel = self.connection.channel()
self.channel.confirm_delivery()

if self.exchange:
# Do not declare and use queues if an exchange is given
return
if self.source_queue:
self.channel.queue_declare(queue=self.source_queue, durable=True,
arguments=self.queue_args)

0 comments on commit 5aef10e

Please sign in to comment.
You can’t perform that action at this time.