From 233ea996e4be9eae90ca927fc6f0e7d0a53e4305 Mon Sep 17 00:00:00 2001 From: Surendra Tiwari Date: Wed, 10 Jan 2024 20:43:17 +0530 Subject: [PATCH] AMQP: added custom delayed queue config (#796) * AMQP: added custom delayed queue config * added documentation for delayed queue --- README.md | 1 + v1/brokers/amqp/amqp.go | 54 ++++++++++++++++++++++++++-------------- v1/config/config.go | 1 + v2/brokers/amqp/amqp.go | 55 +++++++++++++++++++++++++++-------------- v2/config/config.go | 1 + 5 files changed, 76 insertions(+), 36 deletions(-) diff --git a/README.md b/README.md index e430ce025..ddb6fdf7d 100644 --- a/README.md +++ b/README.md @@ -343,6 +343,7 @@ RabbitMQ related configuration. Not necessary if you are using other broker/back * `QueueBindingArguments`: an optional map of additional arguments used when binding to an AMQP queue * `BindingKey`: The queue is bind to the exchange with this key, e.g. `machinery_task` * `PrefetchCount`: How many tasks to prefetch (set to `1` if you have long running tasks) +* `DelayedQueue`: delayed queue name to be used for task retry or delayed task (if empty it will follow auto create and delate delayed queues) #### DynamoDB diff --git a/v1/brokers/amqp/amqp.go b/v1/brokers/amqp/amqp.go index 9f435aff8..953fd84e2 100644 --- a/v1/brokers/amqp/amqp.go +++ b/v1/brokers/amqp/amqp.go @@ -354,24 +354,47 @@ func (b *Broker) delay(signature *tasks.Signature, delayMs int64) error { return fmt.Errorf("JSON marshal error: %s", err) } - // It's necessary to redeclare the queue each time (to zero its TTL timer). - queueName := fmt.Sprintf( - "delay.%d.%s.%s", - delayMs, // delay duration in mileseconds - b.GetConfig().AMQP.Exchange, - signature.RoutingKey, // routing key - ) + queueName := b.GetConfig().AMQP.DelayedQueue declareQueueArgs := amqp.Table{ // Exchange where to send messages after TTL expiration. "x-dead-letter-exchange": b.GetConfig().AMQP.Exchange, // Routing key which use when resending expired messages. "x-dead-letter-routing-key": signature.RoutingKey, - // Time in milliseconds - // after that message will expire and be sent to destination. - "x-message-ttl": delayMs, - // Time after that the queue will be deleted. - "x-expires": delayMs * 2, } + messageProperties := amqp.Publishing{ + Headers: amqp.Table(signature.Headers), + ContentType: "application/json", + Body: message, + DeliveryMode: amqp.Persistent, + Expiration: fmt.Sprint(delayMs), + } + if queueName == "" { + // It's necessary to redeclare the queue each time (to zero its TTL timer). + queueName = fmt.Sprintf( + "delay.%d.%s.%s", + delayMs, // delay duration in mileseconds + b.GetConfig().AMQP.Exchange, + signature.RoutingKey, // routing key + ) + declareQueueArgs = amqp.Table{ + // Exchange where to send messages after TTL expiration. + "x-dead-letter-exchange": b.GetConfig().AMQP.Exchange, + // Routing key which use when resending expired messages. + "x-dead-letter-routing-key": signature.RoutingKey, + // Time in milliseconds + // after that message will expire and be sent to destination. + "x-message-ttl": delayMs, + // Time after that the queue will be deleted. + "x-expires": delayMs * 2, + } + messageProperties = amqp.Publishing{ + Headers: amqp.Table(signature.Headers), + ContentType: "application/json", + Body: message, + DeliveryMode: amqp.Persistent, + } + } + conn, channel, _, _, _, err := b.Connect( b.GetConfig().Broker, b.GetConfig().MultipleBrokerSeparator, @@ -397,12 +420,7 @@ func (b *Broker) delay(signature *tasks.Signature, delayMs int64) error { queueName, // routing key false, // mandatory false, // immediate - amqp.Publishing{ - Headers: amqp.Table(signature.Headers), - ContentType: "application/json", - Body: message, - DeliveryMode: amqp.Persistent, - }, + messageProperties, ); err != nil { return err } diff --git a/v1/config/config.go b/v1/config/config.go index 40d11fc06..1cce0b623 100644 --- a/v1/config/config.go +++ b/v1/config/config.go @@ -85,6 +85,7 @@ type AMQPConfig struct { BindingKey string `yaml:"binding_key" envconfig:"AMQP_BINDING_KEY"` PrefetchCount int `yaml:"prefetch_count" envconfig:"AMQP_PREFETCH_COUNT"` AutoDelete bool `yaml:"auto_delete" envconfig:"AMQP_AUTO_DELETE"` + DelayedQueue string `yaml:"delayed_queue" envconfig:"AMQP_DELAYED_QUEUE"` } // DynamoDBConfig wraps DynamoDB related configuration diff --git a/v2/brokers/amqp/amqp.go b/v2/brokers/amqp/amqp.go index 428915894..c253da13e 100644 --- a/v2/brokers/amqp/amqp.go +++ b/v2/brokers/amqp/amqp.go @@ -354,24 +354,48 @@ func (b *Broker) delay(signature *tasks.Signature, delayMs int64) error { return fmt.Errorf("JSON marshal error: %s", err) } - // It's necessary to redeclare the queue each time (to zero its TTL timer). - queueName := fmt.Sprintf( - "delay.%d.%s.%s", - delayMs, // delay duration in mileseconds - b.GetConfig().AMQP.Exchange, - signature.RoutingKey, // routing key - ) + queueName := b.GetConfig().AMQP.DelayedQueue declareQueueArgs := amqp.Table{ // Exchange where to send messages after TTL expiration. "x-dead-letter-exchange": b.GetConfig().AMQP.Exchange, // Routing key which use when resending expired messages. "x-dead-letter-routing-key": signature.RoutingKey, - // Time in milliseconds - // after that message will expire and be sent to destination. - "x-message-ttl": delayMs, - // Time after that the queue will be deleted. - "x-expires": delayMs * 2, } + messageProperties := amqp.Publishing{ + Headers: amqp.Table(signature.Headers), + ContentType: "application/json", + Body: message, + DeliveryMode: amqp.Persistent, + Expiration: fmt.Sprint(delayMs), + } + + if queueName == "" { + // It's necessary to redeclare the queue each time (to zero its TTL timer). + queueName = fmt.Sprintf( + "delay.%d.%s.%s", + delayMs, // delay duration in mileseconds + b.GetConfig().AMQP.Exchange, + signature.RoutingKey, // routing key + ) + declareQueueArgs = amqp.Table{ + // Exchange where to send messages after TTL expiration. + "x-dead-letter-exchange": b.GetConfig().AMQP.Exchange, + // Routing key which use when resending expired messages. + "x-dead-letter-routing-key": signature.RoutingKey, + // Time in milliseconds + // after that message will expire and be sent to destination. + "x-message-ttl": delayMs, + // Time after that the queue will be deleted. + "x-expires": delayMs * 2, + } + messageProperties = amqp.Publishing{ + Headers: amqp.Table(signature.Headers), + ContentType: "application/json", + Body: message, + DeliveryMode: amqp.Persistent, + } + } + conn, channel, _, _, _, err := b.Connect( b.GetConfig().Broker, b.GetConfig().MultipleBrokerSeparator, @@ -397,12 +421,7 @@ func (b *Broker) delay(signature *tasks.Signature, delayMs int64) error { queueName, // routing key false, // mandatory false, // immediate - amqp.Publishing{ - Headers: amqp.Table(signature.Headers), - ContentType: "application/json", - Body: message, - DeliveryMode: amqp.Persistent, - }, + messageProperties, ); err != nil { return err } diff --git a/v2/config/config.go b/v2/config/config.go index f2a326cb1..0ae628322 100644 --- a/v2/config/config.go +++ b/v2/config/config.go @@ -85,6 +85,7 @@ type AMQPConfig struct { BindingKey string `yaml:"binding_key" envconfig:"AMQP_BINDING_KEY"` PrefetchCount int `yaml:"prefetch_count" envconfig:"AMQP_PREFETCH_COUNT"` AutoDelete bool `yaml:"auto_delete" envconfig:"AMQP_AUTO_DELETE"` + DelayedQueue string `yaml:"delayed_queue" envconfig:"AMQP_DELAYED_QUEUE"` } // DynamoDBConfig wraps DynamoDB related configuration