From 90be3b6162afeaf50a24424f3deb34e48a81a3dd Mon Sep 17 00:00:00 2001 From: Sick Yoon Date: Sun, 6 Oct 2019 17:06:54 -0400 Subject: [PATCH 1/2] support custom redis connection pool --- .gitignore | 1 + Gopkg.lock | 16 +++++++-- Gopkg.toml | 32 +++++++++++++++-- README.md | 78 +++++++++++++++++++++++++--------------- example/goclient/main.go | 24 +++++++++++-- example/goworker/main.go | 23 ++++++++++-- redis_backend.go | 11 ++++++ redis_broker.go | 46 +++++++++++++++--------- 8 files changed, 176 insertions(+), 55 deletions(-) diff --git a/.gitignore b/.gitignore index 9b00b60..cdc6d4f 100644 --- a/.gitignore +++ b/.gitignore @@ -27,3 +27,4 @@ _testmain.go coverage.out vendor +.vscode/ \ No newline at end of file diff --git a/Gopkg.lock b/Gopkg.lock index c16e262..d4d075c 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1,6 +1,17 @@ # This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'. +[[projects]] + digest = "1:0594af97b2f4cec6554086eeace6597e20a4b69466eb4ada25adf9f4300dddd2" + name = "github.com/garyburd/redigo" + packages = [ + "internal", + "redis", + ] + pruneopts = "UT" + revision = "a69d19351219b6dd56f274f96d85a7014a2ec34e" + version = "v1.6.0" + [[projects]] digest = "1:38ec74012390146c45af1f92d46e5382b50531247929ff3a685d2b2be65155ac" name = "github.com/gomodule/redigo" @@ -22,16 +33,17 @@ [[projects]] branch = "master" - digest = "1:baa0e36ba787113501e02a40cd45e3d17b216285bbaa3dabbc1b26e626ab4a39" + digest = "1:3df46e572883257c46c470dc1796f9bc609d0d0d7339dd10358030649b9beb93" name = "github.com/streadway/amqp" packages = ["."] pruneopts = "UT" - revision = "30f8ed68076eedf14990dafc696f469f58fc1342" + revision = "edfb9018d2714e4ec54dbaba37dbfef2bdadf0e4" [solve-meta] analyzer-name = "dep" analyzer-version = 1 input-imports = [ + "github.com/garyburd/redigo/redis", "github.com/gomodule/redigo/redis", "github.com/satori/go.uuid", "github.com/streadway/amqp", diff --git a/Gopkg.toml b/Gopkg.toml index 265a781..20b7071 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -1,12 +1,38 @@ +# Gopkg.toml example +# +# Refer to https://golang.github.io/dep/docs/Gopkg.toml.html +# for detailed Gopkg.toml documentation. +# +# required = ["github.com/user/thing/cmd/thing"] +# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"] +# +# [[constraint]] +# name = "github.com/user/project" +# version = "1.0.0" +# +# [[constraint]] +# name = "github.com/user/project2" +# branch = "dev" +# source = "github.com/myfork/project2" +# +# [[override]] +# name = "github.com/x/y" +# version = "2.4.0" +# +# [prune] +# non-go = false +# go-tests = true +# unused-packages = true -[[constraint]] - branch = "master" - name = "github.com/satori/go.uuid" [[constraint]] name = "github.com/gomodule/redigo" version = "2.0.0" +[[constraint]] + name = "github.com/satori/go.uuid" + branch = "master" + [[constraint]] branch = "master" name = "github.com/streadway/amqp" diff --git a/README.md b/README.md index 99a1c17..4a68315 100644 --- a/README.md +++ b/README.md @@ -56,10 +56,21 @@ Also take a look at `example` directory for sample python code. Run Celery Worker implemented in Go ```go +// create redis connection pool +redisPool := &redis.Pool{ + Dial: func() (redis.Conn, error) { + c, err := redis.DialURL("redis://") + if err != nil { + return nil, err + } + return c, err + }, +} + // initialize celery client -cli, _ := NewCeleryClient( - NewRedisCeleryBroker("redis://"), - NewRedisCeleryBackend("redis://"), +cli, _ := gocelery.NewCeleryClient( + gocelery.NewRedisBroker(redisPool), + &gocelery.RedisCeleryBackend{Pool: redisPool}, 5, // number of workers ) @@ -128,33 +139,42 @@ celery -A worker worker --loglevel=debug --without-heartbeat --without-mingle Submit Task from Go Client ```go -func main() { - // initialize celery client - cli, _ := NewCeleryClient( - NewRedisCeleryBroker("redis://"), - NewRedisCeleryBackend("redis://"), - 1, - ) - - // prepare arguments - taskName := "worker.add" - argA := rand.Intn(10) - argB := rand.Intn(10) - - // run task - asyncResult, err := cli.Delay(taskName, argA, argB) - if err != nil { - panic(err) - } - - // get results from backend with timeout - res, err := asyncResult.Get(10 * time.Second) - if err != nil { - panic(err) - } - - log.Printf("result: %+v of type %+v", res, reflect.TypeOf(res)) +// create redis connection pool +redisPool := &redis.Pool{ + Dial: func() (redis.Conn, error) { + c, err := redis.DialURL("redis://") + if err != nil { + return nil, err + } + return c, err + }, } + +// initialize celery client +cli, _ := gocelery.NewCeleryClient( + gocelery.NewRedisBroker(redisPool), + &gocelery.RedisCeleryBackend{Pool: redisPool}, + 1, +) + +// prepare arguments +taskName := "worker.add" +argA := rand.Intn(10) +argB := rand.Intn(10) + +// run task +asyncResult, err := cli.Delay(taskName, argA, argB) +if err != nil { + panic(err) +} + +// get results from backend with timeout +res, err := asyncResult.Get(10 * time.Second) +if err != nil { + panic(err) +} + +log.Printf("result: %+v of type %+v", res, reflect.TypeOf(res)) ``` ## Sample Celery Task Message diff --git a/example/goclient/main.go b/example/goclient/main.go index 5ff990a..7f07a64 100644 --- a/example/goclient/main.go +++ b/example/goclient/main.go @@ -11,17 +11,35 @@ import ( "time" "github.com/gocelery/gocelery" + "github.com/gomodule/redigo/redis" ) // Run Celery Worker First! // celery -A worker worker --loglevel=debug --without-heartbeat --without-mingle - func main() { + // create redis connection pool + redisPool := &redis.Pool{ + MaxIdle: 3, // maximum number of idle connections in the pool + MaxActive: 0, // maximum number of connections allocated by the pool at a given time + IdleTimeout: 240 * time.Second, // close connections after remaining idle for this duration + Dial: func() (redis.Conn, error) { + c, err := redis.DialURL("redis://") + if err != nil { + return nil, err + } + return c, err + }, + TestOnBorrow: func(c redis.Conn, t time.Time) error { + _, err := c.Do("PING") + return err + }, + } + // initialize celery client cli, _ := gocelery.NewCeleryClient( - gocelery.NewRedisCeleryBroker("redis://"), - gocelery.NewRedisCeleryBackend("redis://"), + gocelery.NewRedisBroker(redisPool), + &gocelery.RedisCeleryBackend{Pool: redisPool}, 1, ) diff --git a/example/goworker/main.go b/example/goworker/main.go index c948ed9..3c20f85 100644 --- a/example/goworker/main.go +++ b/example/goworker/main.go @@ -9,6 +9,7 @@ import ( "time" "github.com/gocelery/gocelery" + "github.com/gomodule/redigo/redis" ) // exampleAddTask is integer addition task @@ -47,10 +48,28 @@ func (a *exampleAddTask) RunTask() (interface{}, error) { func main() { + // create redis connection pool + redisPool := &redis.Pool{ + MaxIdle: 3, // maximum number of idle connections in the pool + MaxActive: 0, // maximum number of connections allocated by the pool at a given time + IdleTimeout: 240 * time.Second, // close connections after remaining idle for this duration + Dial: func() (redis.Conn, error) { + c, err := redis.DialURL("redis://") + if err != nil { + return nil, err + } + return c, err + }, + TestOnBorrow: func(c redis.Conn, t time.Time) error { + _, err := c.Do("PING") + return err + }, + } + // initialize celery client cli, _ := gocelery.NewCeleryClient( - gocelery.NewRedisCeleryBroker("redis://"), - gocelery.NewRedisCeleryBackend("redis://"), + gocelery.NewRedisBroker(redisPool), + &gocelery.RedisCeleryBackend{Pool: redisPool}, 5, // number of workers ) diff --git a/redis_backend.go b/redis_backend.go index f4b7d6b..d0f5a62 100644 --- a/redis_backend.go +++ b/redis_backend.go @@ -16,7 +16,18 @@ type RedisCeleryBackend struct { *redis.Pool } +// NewRedisBackend creates new RedisCeleryBackend with given redis pool. +// RedisCeleryBackend can be initialized manually as well. +func NewRedisBackend(conn *redis.Pool) *RedisCeleryBackend { + return &RedisCeleryBackend{ + Pool: conn, + } +} + // NewRedisCeleryBackend creates new RedisCeleryBackend +// +// Deprecated: NewRedisCeleryBackend exists for historical compatibility +// and should not be used. Pool should be initialized outside of gocelery package. func NewRedisCeleryBackend(uri string) *RedisCeleryBackend { return &RedisCeleryBackend{ Pool: NewRedisPool(uri), diff --git a/redis_broker.go b/redis_broker.go index d94afd6..05060b4 100644 --- a/redis_broker.go +++ b/redis_broker.go @@ -18,26 +18,18 @@ type RedisCeleryBroker struct { queueName string } -// NewRedisPool creates pool of redis connections from given connection string -func NewRedisPool(uri string) *redis.Pool { - return &redis.Pool{ - MaxIdle: 3, - IdleTimeout: 240 * time.Second, - Dial: func() (redis.Conn, error) { - c, err := redis.DialURL(uri) - if err != nil { - return nil, err - } - return c, err - }, - TestOnBorrow: func(c redis.Conn, t time.Time) error { - _, err := c.Do("PING") - return err - }, +// NewRedisBroker creates new RedisCeleryBroker with given redis connection pool +func NewRedisBroker(conn *redis.Pool) *RedisCeleryBroker { + return &RedisCeleryBroker{ + Pool: conn, + queueName: "celery", } } // NewRedisCeleryBroker creates new RedisCeleryBroker based on given uri +// +// Deprecated: NewRedisCeleryBroker exists for historical compatibility +// and should not be used. Use NewRedisBroker instead to create new RedisCeleryBroker. func NewRedisCeleryBroker(uri string) *RedisCeleryBroker { return &RedisCeleryBroker{ Pool: NewRedisPool(uri), @@ -90,3 +82,25 @@ func (cb *RedisCeleryBroker) GetTaskMessage() (*TaskMessage, error) { } return celeryMessage.GetTaskMessage(), nil } + +// NewRedisPool creates pool of redis connections from given connection string +// +// Deprecated: newRedisPool exists for historical compatibility +// and should not be used. Pool should be initialized outside of gocelery package. +func NewRedisPool(uri string) *redis.Pool { + return &redis.Pool{ + MaxIdle: 3, + IdleTimeout: 240 * time.Second, + Dial: func() (redis.Conn, error) { + c, err := redis.DialURL(uri) + if err != nil { + return nil, err + } + return c, err + }, + TestOnBorrow: func(c redis.Conn, t time.Time) error { + _, err := c.Do("PING") + return err + }, + } +} From c19053a0f0c95ae1dad15197bf10b8c257ac7d48 Mon Sep 17 00:00:00 2001 From: Sick Yoon Date: Sun, 6 Oct 2019 21:55:09 -0400 Subject: [PATCH 2/2] additional tests to cover broker/backend creation method that takes redis connection --- backend_test.go | 12 ++++ broker_test.go | 12 ++++ gocelery_test.go | 170 +++++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 190 insertions(+), 4 deletions(-) diff --git a/backend_test.go b/backend_test.go index 5a477f0..90b5a61 100644 --- a/backend_test.go +++ b/backend_test.go @@ -24,6 +24,10 @@ func TestBackendRedisGetResult(t *testing.T) { name: "get result from redis backend", backend: redisBackend, }, + { + name: "get result from redis backend with connection", + backend: redisBackendWithConn, + }, } for _, tc := range testCases { taskID := uuid.Must(uuid.NewV4()).String() @@ -67,6 +71,10 @@ func TestBackendRedisSetResult(t *testing.T) { name: "set result to redis backend", backend: redisBackend, }, + { + name: "set result to redis backend with connection", + backend: redisBackendWithConn, + }, } for _, tc := range testCases { taskID := uuid.Must(uuid.NewV4()).String() @@ -115,6 +123,10 @@ func TestBackendSetGetResult(t *testing.T) { name: "set/get result to redis backend", backend: redisBackend, }, + { + name: "set/get result to redis backend with connection", + backend: redisBackendWithConn, + }, { name: "set/get result to amqp backend", backend: amqpBackend, diff --git a/broker_test.go b/broker_test.go index 49133ed..3790a97 100644 --- a/broker_test.go +++ b/broker_test.go @@ -33,6 +33,10 @@ func TestBrokerRedisSend(t *testing.T) { name: "send task to redis broker", broker: redisBroker, }, + { + name: "send task to redis broker with connection", + broker: redisBrokerWithConn, + }, } for _, tc := range testCases { celeryMessage, err := makeCeleryMessage() @@ -83,6 +87,10 @@ func TestBrokerRedisGet(t *testing.T) { name: "get task from redis broker", broker: redisBroker, }, + { + name: "get task from redis broker with connection", + broker: redisBrokerWithConn, + }, } for _, tc := range testCases { celeryMessage, err := makeCeleryMessage() @@ -127,6 +135,10 @@ func TestBrokerSendGet(t *testing.T) { name: "send/get task for redis broker", broker: redisBroker, }, + { + name: "send/get task for redis broker with connection", + broker: redisBrokerWithConn, + }, { name: "send/get task for amqp broker", broker: amqpBroker, diff --git a/gocelery_test.go b/gocelery_test.go index de2d4e9..be1ca8c 100644 --- a/gocelery_test.go +++ b/gocelery_test.go @@ -11,16 +11,28 @@ import ( "testing" "time" + "github.com/gomodule/redigo/redis" uuid "github.com/satori/go.uuid" ) const TIMEOUT = 2 * time.Second var ( - redisBroker = NewRedisCeleryBroker("redis://") - redisBackend = NewRedisCeleryBackend("redis://") - amqpBroker = NewAMQPCeleryBroker("amqp://") - amqpBackend = NewAMQPCeleryBackend("amqp://") + redisPool = &redis.Pool{ + Dial: func() (redis.Conn, error) { + c, err := redis.DialURL("redis://") + if err != nil { + return nil, err + } + return c, err + }, + } + redisBroker = NewRedisCeleryBroker("redis://") + redisBrokerWithConn = NewRedisBroker(redisPool) + redisBackend = NewRedisCeleryBackend("redis://") + redisBackendWithConn = NewRedisBackend(redisPool) + amqpBroker = NewAMQPCeleryBroker("amqp://") + amqpBackend = NewAMQPCeleryBackend("amqp://") ) // TestInteger tests successful function execution @@ -46,6 +58,16 @@ func TestInteger(t *testing.T) { inB: 6468, expected: 8953, }, + { + name: "integer addition with redis broker/backend with connection", + broker: redisBrokerWithConn, + backend: redisBackendWithConn, + taskName: uuid.Must(uuid.NewV4()).String(), + taskFunc: addInt, + inA: 2485, + inB: 6468, + expected: 8953, + }, { name: "integer addition with amqp broker/backend", broker: amqpBroker, @@ -104,6 +126,16 @@ func TestIntegerNamedArguments(t *testing.T) { inB: 6468, expected: 8953, }, + { + name: "integer addition (named arguments) with redis broker/backend with connection", + broker: redisBrokerWithConn, + backend: redisBackendWithConn, + taskName: uuid.Must(uuid.NewV4()).String(), + taskFunc: &addIntTask{}, + inA: 2485, + inB: 6468, + expected: 8953, + }, { name: "integer addition (named arguments) with amqp broker/backend", broker: amqpBroker, @@ -168,6 +200,16 @@ func TestString(t *testing.T) { inB: "world", expected: "helloworld", }, + { + name: "string addition with redis broker/backend with connection", + broker: redisBrokerWithConn, + backend: redisBackendWithConn, + taskName: uuid.Must(uuid.NewV4()).String(), + taskFunc: addStr, + inA: "hello", + inB: "world", + expected: "helloworld", + }, { name: "string addition with amqp broker/backend", broker: amqpBroker, @@ -225,6 +267,16 @@ func TestStringNamedArguments(t *testing.T) { inB: "world", expected: "helloworld", }, + { + name: "string addition (named arguments) with redis broker/backend with connection", + broker: redisBrokerWithConn, + backend: redisBackendWithConn, + taskName: uuid.Must(uuid.NewV4()).String(), + taskFunc: &addStrTask{}, + inA: "hello", + inB: "world", + expected: "helloworld", + }, { name: "string addition (named arguments) with amqp broker/backend", broker: amqpBroker, @@ -288,6 +340,16 @@ func TestStringInteger(t *testing.T) { inB: 5, expected: "hello5", }, + { + name: "integer and string concatenation with redis broker/backend with connection", + broker: redisBrokerWithConn, + backend: redisBackendWithConn, + taskName: uuid.Must(uuid.NewV4()).String(), + taskFunc: addStrInt, + inA: "hello", + inB: 5, + expected: "hello5", + }, { name: "integer and string concatenation with amqp broker/backend", broker: amqpBroker, @@ -345,6 +407,16 @@ func TestStringIntegerNamedArguments(t *testing.T) { inB: 5, expected: "hello5", }, + { + name: "integer and string concatenation (named arguments) with redis broker/backend with connection", + broker: redisBrokerWithConn, + backend: redisBackendWithConn, + taskName: uuid.Must(uuid.NewV4()).String(), + taskFunc: &addStrIntTask{}, + inA: "hello", + inB: 5, + expected: "hello5", + }, { name: "integer and string concatenation (named arguments) with amqp broker/backend", broker: amqpBroker, @@ -408,6 +480,16 @@ func TestFloat(t *testing.T) { inB: 5.3688, expected: 8.8268, }, + { + name: "float addition with redis broker/backend with connection", + broker: redisBrokerWithConn, + backend: redisBackendWithConn, + taskName: uuid.Must(uuid.NewV4()).String(), + taskFunc: addFloat, + inA: 3.4580, + inB: 5.3688, + expected: 8.8268, + }, { name: "float addition with amqp broker/backend", broker: amqpBroker, @@ -465,6 +547,16 @@ func TestFloatNamedArguments(t *testing.T) { inB: 5.3688, expected: 8.8268, }, + { + name: "float addition with redis broker/backend with connection", + broker: redisBrokerWithConn, + backend: redisBackendWithConn, + taskName: uuid.Must(uuid.NewV4()).String(), + taskFunc: &addFloatTask{}, + inA: 3.4580, + inB: 5.3688, + expected: 8.8268, + }, { name: "float addition with amqp broker/backend", broker: amqpBroker, @@ -531,6 +623,16 @@ func TestFloat32(t *testing.T) { inB: 5.3688, expected: float32(8.8268), }, + { + name: "float32 addition with redis broker/backend with connection", + broker: redisBrokerWithConn, + backend: redisBackendWithConn, + taskName: uuid.Must(uuid.NewV4()).String(), + taskFunc: addFloat32, + inA: 3.4580, + inB: 5.3688, + expected: float32(8.8268), + }, { name: "float32 addition with amqp broker/backend", broker: amqpBroker, @@ -591,6 +693,16 @@ func TestFloat32NamedArguments(t *testing.T) { inB: 5.3688, expected: float32(8.8268), }, + { + name: "float32 addition with redis broker/backend with connection", + broker: redisBrokerWithConn, + backend: redisBackendWithConn, + taskName: uuid.Must(uuid.NewV4()).String(), + taskFunc: &addFloat32Task{}, + inA: 3.4580, + inB: 5.3688, + expected: float32(8.8268), + }, { name: "float32 addition with amqp broker/backend", broker: amqpBroker, @@ -654,6 +766,16 @@ func TestBool(t *testing.T) { inB: false, expected: false, }, + { + name: "boolean and operation with redis broker/backend with connection", + broker: redisBrokerWithConn, + backend: redisBackendWithConn, + taskName: uuid.Must(uuid.NewV4()).String(), + taskFunc: andBool, + inA: true, + inB: false, + expected: false, + }, { name: "boolean and operation with amqp broker/backend", broker: amqpBroker, @@ -711,6 +833,16 @@ func TestBoolNamedArguments(t *testing.T) { inB: false, expected: false, }, + { + name: "boolean and operation (named arguments) with redis broker/backend with connection", + broker: redisBrokerWithConn, + backend: redisBackendWithConn, + taskName: uuid.Must(uuid.NewV4()).String(), + taskFunc: &andBoolTask{}, + inA: true, + inB: false, + expected: false, + }, { name: "boolean and operation (named arguments) with amqp broker/backend", broker: amqpBroker, @@ -775,6 +907,16 @@ func TestArrayIntNamedArguments(t *testing.T) { inB: []string{"e", "f", "g", "h"}, expected: 4, }, + { + name: "maximum array length (named arguments) with redis broker/backend with connection", + broker: redisBrokerWithConn, + backend: redisBackendWithConn, + taskName: uuid.Must(uuid.NewV4()).String(), + taskFunc: &maxArrLenTask{}, + inA: []string{"a", "b", "c", "d"}, + inB: []string{"e", "f", "g", "h"}, + expected: 4, + }, { name: "maximum array length (named arguments) with amqp broker/backend", broker: amqpBroker, @@ -839,6 +981,16 @@ func TestArray(t *testing.T) { inB: []string{"e", "f", "g", "h"}, expected: []string{"a", "b", "c", "d", "e", "f", "g", "h"}, }, + { + name: "array addition with redis broker/backend with connection", + broker: redisBrokerWithConn, + backend: redisBackendWithConn, + taskName: uuid.Must(uuid.NewV4()).String(), + taskFunc: addArr, + inA: []string{"a", "b", "c", "d"}, + inB: []string{"e", "f", "g", "h"}, + expected: []string{"a", "b", "c", "d", "e", "f", "g", "h"}, + }, { name: "array addition with amqp broker/backend", broker: amqpBroker, @@ -897,6 +1049,16 @@ func TestMap(t *testing.T) { inB: map[string]string{"b": "b"}, expected: map[string]string{"a": "a", "b": "b"}, }, + { + name: "integer addition with redis broker/backend with connection", + broker: redisBrokerWithConn, + backend: redisBackendWithConn, + taskName: uuid.Must(uuid.NewV4()).String(), + taskFunc: addMap, + inA: map[string]string{"a": "a"}, + inB: map[string]string{"b": "b"}, + expected: map[string]string{"a": "a", "b": "b"}, + }, { name: "integer addition with amqp broker/backend", broker: amqpBroker,