From 62161272243485b315630f250863713321945e73 Mon Sep 17 00:00:00 2001 From: yaoyangyu Date: Sun, 18 Aug 2019 19:05:18 +0800 Subject: [PATCH 1/2] Modify API NewRedisCeleryBroker, add parameters queueName. Because i want to set queuename, call multiple tasks but they have same task-name --- redis_broker.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/redis_broker.go b/redis_broker.go index 4e4b983..9d5d0df 100644 --- a/redis_broker.go +++ b/redis_broker.go @@ -38,10 +38,14 @@ func NewRedisPool(uri string) *redis.Pool { } // NewRedisCeleryBroker creates new RedisCeleryBroker based on given uri -func NewRedisCeleryBroker(uri string) *RedisCeleryBroker { +func NewRedisCeleryBroker(uri string, queuename string) *RedisCeleryBroker { + qName := "celery" + if queuename != "" { + qName = queuename + } return &RedisCeleryBroker{ Pool: NewRedisPool(uri), - queueName: "celery", + queueName: qName, } } @@ -72,7 +76,7 @@ func (cb *RedisCeleryBroker) GetCeleryMessage() (*CeleryMessage, error) { return nil, fmt.Errorf("null message received from redis") } messageList := messageJSON.([]interface{}) - if string(messageList[0].([]byte)) != "celery" { + if string(messageList[0].([]byte)) != cd.queueName { return nil, fmt.Errorf("not a celery message: %v", messageList[0]) } var message CeleryMessage From 3eb7ab0a33a990a189dbfc5a2a1074af4e45ae06 Mon Sep 17 00:00:00 2001 From: yaoyangyu Date: Sun, 18 Aug 2019 20:30:27 +0800 Subject: [PATCH 2/2] Modify API NewRedisCeleryBroker, fix parameters error calling NewRedisCeleryBroker. --- README.md | 4 ++-- example/goclient/main.go | 2 +- example/goworker/main.go | 2 +- example_client_named_arg_test.go | 2 +- example_client_test.go | 2 +- example_worker_named_arg_test.go | 2 +- example_worker_test.go | 2 +- example_worker_with_context_test.go | 2 +- gocelery_test.go | 2 +- redis_broker.go | 2 +- 10 files changed, 11 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 99a1c17..4ac3528 100644 --- a/README.md +++ b/README.md @@ -58,7 +58,7 @@ Run Celery Worker implemented in Go ```go // initialize celery client cli, _ := NewCeleryClient( - NewRedisCeleryBroker("redis://"), + NewRedisCeleryBroker("redis://", ""), NewRedisCeleryBackend("redis://"), 5, // number of workers ) @@ -131,7 +131,7 @@ Submit Task from Go Client func main() { // initialize celery client cli, _ := NewCeleryClient( - NewRedisCeleryBroker("redis://"), + NewRedisCeleryBroker("redis://", ""), NewRedisCeleryBackend("redis://"), 1, ) diff --git a/example/goclient/main.go b/example/goclient/main.go index 5ff990a..a711135 100644 --- a/example/goclient/main.go +++ b/example/goclient/main.go @@ -20,7 +20,7 @@ func main() { // initialize celery client cli, _ := gocelery.NewCeleryClient( - gocelery.NewRedisCeleryBroker("redis://"), + gocelery.NewRedisCeleryBroker("redis://", ""), gocelery.NewRedisCeleryBackend("redis://"), 1, ) diff --git a/example/goworker/main.go b/example/goworker/main.go index c948ed9..cdf4c97 100644 --- a/example/goworker/main.go +++ b/example/goworker/main.go @@ -49,7 +49,7 @@ func main() { // initialize celery client cli, _ := gocelery.NewCeleryClient( - gocelery.NewRedisCeleryBroker("redis://"), + gocelery.NewRedisCeleryBroker("redis://", ""), gocelery.NewRedisCeleryBackend("redis://"), 5, // number of workers ) diff --git a/example_client_named_arg_test.go b/example_client_named_arg_test.go index 1b345d6..b1c07b9 100644 --- a/example_client_named_arg_test.go +++ b/example_client_named_arg_test.go @@ -15,7 +15,7 @@ func Example_clientWithNamedArguments() { // initialize celery client cli, _ := NewCeleryClient( - NewRedisCeleryBroker("redis://"), + NewRedisCeleryBroker("redis://", ""), NewRedisCeleryBackend("redis://"), 1, ) diff --git a/example_client_test.go b/example_client_test.go index 993dcd5..906afc5 100644 --- a/example_client_test.go +++ b/example_client_test.go @@ -15,7 +15,7 @@ func Example_client() { // initialize celery client cli, _ := NewCeleryClient( - NewRedisCeleryBroker("redis://"), + NewRedisCeleryBroker("redis://", ""), NewRedisCeleryBackend("redis://"), 1, ) diff --git a/example_worker_named_arg_test.go b/example_worker_named_arg_test.go index 00d0129..2a4bfbd 100644 --- a/example_worker_named_arg_test.go +++ b/example_worker_named_arg_test.go @@ -47,7 +47,7 @@ func Example_workerWithNamedArguments() { // initialize celery client cli, _ := NewCeleryClient( - NewRedisCeleryBroker("redis://"), + NewRedisCeleryBroker("redis://", ""), NewRedisCeleryBackend("redis://"), 5, // number of workers ) diff --git a/example_worker_test.go b/example_worker_test.go index 1389686..d31e49a 100644 --- a/example_worker_test.go +++ b/example_worker_test.go @@ -10,7 +10,7 @@ func Example_worker() { // initialize celery client cli, _ := NewCeleryClient( - NewRedisCeleryBroker("redis://"), + NewRedisCeleryBroker("redis://", ""), NewRedisCeleryBackend("redis://"), 5, // number of workers ) diff --git a/example_worker_with_context_test.go b/example_worker_with_context_test.go index 909b6ee..e991d0b 100644 --- a/example_worker_with_context_test.go +++ b/example_worker_with_context_test.go @@ -13,7 +13,7 @@ func Example_workerWithContext() { // initialize celery client cli, _ := NewCeleryClient( - NewRedisCeleryBroker("redis://"), + NewRedisCeleryBroker("redis://", ""), NewRedisCeleryBackend("redis://"), 1, ) diff --git a/gocelery_test.go b/gocelery_test.go index de2d4e9..98a7af0 100644 --- a/gocelery_test.go +++ b/gocelery_test.go @@ -17,7 +17,7 @@ import ( const TIMEOUT = 2 * time.Second var ( - redisBroker = NewRedisCeleryBroker("redis://") + redisBroker = NewRedisCeleryBroker("redis://", "") redisBackend = NewRedisCeleryBackend("redis://") amqpBroker = NewAMQPCeleryBroker("amqp://") amqpBackend = NewAMQPCeleryBackend("amqp://") diff --git a/redis_broker.go b/redis_broker.go index 9d5d0df..f83acd8 100644 --- a/redis_broker.go +++ b/redis_broker.go @@ -76,7 +76,7 @@ func (cb *RedisCeleryBroker) GetCeleryMessage() (*CeleryMessage, error) { return nil, fmt.Errorf("null message received from redis") } messageList := messageJSON.([]interface{}) - if string(messageList[0].([]byte)) != cd.queueName { + if string(messageList[0].([]byte)) != cb.queueName { return nil, fmt.Errorf("not a celery message: %v", messageList[0]) } var message CeleryMessage