Skip to content

Commit

Permalink
Merge pull request #128 from gocelery/custom-redis-conn
Browse files Browse the repository at this point in the history
support custom redis connection pool
  • Loading branch information
yoonsio committed Oct 7, 2019
2 parents db4fe93 + c19053a commit 454bc28
Show file tree
Hide file tree
Showing 11 changed files with 366 additions and 59 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -27,3 +27,4 @@ _testmain.go

coverage.out
vendor
.vscode/
16 changes: 14 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 29 additions & 3 deletions Gopkg.toml
@@ -1,12 +1,38 @@
# Gopkg.toml example
#
# Refer to https://golang.github.io/dep/docs/Gopkg.toml.html
# for detailed Gopkg.toml documentation.
#
# required = ["github.com/user/thing/cmd/thing"]
# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"]
#
# [[constraint]]
# name = "github.com/user/project"
# version = "1.0.0"
#
# [[constraint]]
# name = "github.com/user/project2"
# branch = "dev"
# source = "github.com/myfork/project2"
#
# [[override]]
# name = "github.com/x/y"
# version = "2.4.0"
#
# [prune]
# non-go = false
# go-tests = true
# unused-packages = true

[[constraint]]
branch = "master"
name = "github.com/satori/go.uuid"

[[constraint]]
name = "github.com/gomodule/redigo"
version = "2.0.0"

[[constraint]]
name = "github.com/satori/go.uuid"
branch = "master"

[[constraint]]
branch = "master"
name = "github.com/streadway/amqp"
Expand Down
78 changes: 49 additions & 29 deletions README.md
Expand Up @@ -56,10 +56,21 @@ Also take a look at `example` directory for sample python code.
Run Celery Worker implemented in Go

```go
// create redis connection pool
redisPool := &redis.Pool{
Dial: func() (redis.Conn, error) {
c, err := redis.DialURL("redis://")
if err != nil {
return nil, err
}
return c, err
},
}

// initialize celery client
cli, _ := NewCeleryClient(
NewRedisCeleryBroker("redis://"),
NewRedisCeleryBackend("redis://"),
cli, _ := gocelery.NewCeleryClient(
gocelery.NewRedisBroker(redisPool),
&gocelery.RedisCeleryBackend{Pool: redisPool},
5, // number of workers
)

Expand Down Expand Up @@ -128,33 +139,42 @@ celery -A worker worker --loglevel=debug --without-heartbeat --without-mingle
Submit Task from Go Client

```go
func main() {
// initialize celery client
cli, _ := NewCeleryClient(
NewRedisCeleryBroker("redis://"),
NewRedisCeleryBackend("redis://"),
1,
)

// prepare arguments
taskName := "worker.add"
argA := rand.Intn(10)
argB := rand.Intn(10)

// run task
asyncResult, err := cli.Delay(taskName, argA, argB)
if err != nil {
panic(err)
}

// get results from backend with timeout
res, err := asyncResult.Get(10 * time.Second)
if err != nil {
panic(err)
}

log.Printf("result: %+v of type %+v", res, reflect.TypeOf(res))
// create redis connection pool
redisPool := &redis.Pool{
Dial: func() (redis.Conn, error) {
c, err := redis.DialURL("redis://")
if err != nil {
return nil, err
}
return c, err
},
}

// initialize celery client
cli, _ := gocelery.NewCeleryClient(
gocelery.NewRedisBroker(redisPool),
&gocelery.RedisCeleryBackend{Pool: redisPool},
1,
)

// prepare arguments
taskName := "worker.add"
argA := rand.Intn(10)
argB := rand.Intn(10)

// run task
asyncResult, err := cli.Delay(taskName, argA, argB)
if err != nil {
panic(err)
}

// get results from backend with timeout
res, err := asyncResult.Get(10 * time.Second)
if err != nil {
panic(err)
}

log.Printf("result: %+v of type %+v", res, reflect.TypeOf(res))
```

## Sample Celery Task Message
Expand Down
12 changes: 12 additions & 0 deletions backend_test.go
Expand Up @@ -24,6 +24,10 @@ func TestBackendRedisGetResult(t *testing.T) {
name: "get result from redis backend",
backend: redisBackend,
},
{
name: "get result from redis backend with connection",
backend: redisBackendWithConn,
},
}
for _, tc := range testCases {
taskID := uuid.Must(uuid.NewV4()).String()
Expand Down Expand Up @@ -67,6 +71,10 @@ func TestBackendRedisSetResult(t *testing.T) {
name: "set result to redis backend",
backend: redisBackend,
},
{
name: "set result to redis backend with connection",
backend: redisBackendWithConn,
},
}
for _, tc := range testCases {
taskID := uuid.Must(uuid.NewV4()).String()
Expand Down Expand Up @@ -115,6 +123,10 @@ func TestBackendSetGetResult(t *testing.T) {
name: "set/get result to redis backend",
backend: redisBackend,
},
{
name: "set/get result to redis backend with connection",
backend: redisBackendWithConn,
},
{
name: "set/get result to amqp backend",
backend: amqpBackend,
Expand Down
12 changes: 12 additions & 0 deletions broker_test.go
Expand Up @@ -33,6 +33,10 @@ func TestBrokerRedisSend(t *testing.T) {
name: "send task to redis broker",
broker: redisBroker,
},
{
name: "send task to redis broker with connection",
broker: redisBrokerWithConn,
},
}
for _, tc := range testCases {
celeryMessage, err := makeCeleryMessage()
Expand Down Expand Up @@ -83,6 +87,10 @@ func TestBrokerRedisGet(t *testing.T) {
name: "get task from redis broker",
broker: redisBroker,
},
{
name: "get task from redis broker with connection",
broker: redisBrokerWithConn,
},
}
for _, tc := range testCases {
celeryMessage, err := makeCeleryMessage()
Expand Down Expand Up @@ -127,6 +135,10 @@ func TestBrokerSendGet(t *testing.T) {
name: "send/get task for redis broker",
broker: redisBroker,
},
{
name: "send/get task for redis broker with connection",
broker: redisBrokerWithConn,
},
{
name: "send/get task for amqp broker",
broker: amqpBroker,
Expand Down
24 changes: 21 additions & 3 deletions example/goclient/main.go
Expand Up @@ -11,17 +11,35 @@ import (
"time"

"github.com/gocelery/gocelery"
"github.com/gomodule/redigo/redis"
)

// Run Celery Worker First!
// celery -A worker worker --loglevel=debug --without-heartbeat --without-mingle

func main() {

// create redis connection pool
redisPool := &redis.Pool{
MaxIdle: 3, // maximum number of idle connections in the pool
MaxActive: 0, // maximum number of connections allocated by the pool at a given time
IdleTimeout: 240 * time.Second, // close connections after remaining idle for this duration
Dial: func() (redis.Conn, error) {
c, err := redis.DialURL("redis://")
if err != nil {
return nil, err
}
return c, err
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
_, err := c.Do("PING")
return err
},
}

// initialize celery client
cli, _ := gocelery.NewCeleryClient(
gocelery.NewRedisCeleryBroker("redis://"),
gocelery.NewRedisCeleryBackend("redis://"),
gocelery.NewRedisBroker(redisPool),
&gocelery.RedisCeleryBackend{Pool: redisPool},
1,
)

Expand Down
23 changes: 21 additions & 2 deletions example/goworker/main.go
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/gocelery/gocelery"
"github.com/gomodule/redigo/redis"
)

// exampleAddTask is integer addition task
Expand Down Expand Up @@ -47,10 +48,28 @@ func (a *exampleAddTask) RunTask() (interface{}, error) {

func main() {

// create redis connection pool
redisPool := &redis.Pool{
MaxIdle: 3, // maximum number of idle connections in the pool
MaxActive: 0, // maximum number of connections allocated by the pool at a given time
IdleTimeout: 240 * time.Second, // close connections after remaining idle for this duration
Dial: func() (redis.Conn, error) {
c, err := redis.DialURL("redis://")
if err != nil {
return nil, err
}
return c, err
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
_, err := c.Do("PING")
return err
},
}

// initialize celery client
cli, _ := gocelery.NewCeleryClient(
gocelery.NewRedisCeleryBroker("redis://"),
gocelery.NewRedisCeleryBackend("redis://"),
gocelery.NewRedisBroker(redisPool),
&gocelery.RedisCeleryBackend{Pool: redisPool},
5, // number of workers
)

Expand Down

0 comments on commit 454bc28

Please sign in to comment.