forked from marselester/gopher-celery
/
main.go
91 lines (83 loc) · 2.44 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
// Program redis shows how to pass a Redis connection pool to the broker.
package main
import (
"context"
"fmt"
"os"
"os/signal"
"time"
celery "github.com/dryarullin/gopher-celery"
celeryredis "github.com/dryarullin/gopher-celery/redis"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gomodule/redigo/redis"
)
// main builds a redigo connection pool, verifies Redis is reachable with a
// PING, hands the pool to the Celery Redis broker, registers one example task
// and runs the app until the process receives an interrupt signal.
func main() {
	logger := log.NewJSONLogger(log.NewSyncWriter(os.Stderr))

	pool := redis.Pool{
		Dial: func() (redis.Conn, error) {
			c, err := redis.DialURL(
				"redis://localhost",
				redis.DialConnectTimeout(5*time.Second),
				// The Conn.Do method sets a write deadline,
				// writes the command arguments to the network connection,
				// sets the read deadline and reads the response from the network connection
				// https://github.com/gomodule/redigo/issues/320.
				//
				// Note, the read timeout should be big enough for BRPOP to finish
				// or else the broker returns i/o timeout error.
				redis.DialWriteTimeout(5*time.Second),
				redis.DialReadTimeout(10*time.Second),
			)
			if err != nil {
				level.Error(logger).Log("msg", "Redis dial failed", "err", err)
			}
			return c, err
		},
		// Check the health of an idle connection before using it.
		// It PINGs connections that have been idle more than a minute.
		TestOnBorrow: func(c redis.Conn, t time.Time) error {
			if time.Since(t) < time.Minute {
				return nil
			}
			_, err := c.Do("PING")
			return err
		},
		// Maximum number of idle connections in the pool.
		MaxIdle: 3,
		// Close connections after remaining idle for given duration.
		IdleTimeout: 5 * time.Minute,
	}
	defer func() {
		if err := pool.Close(); err != nil {
			level.Error(logger).Log("msg", "failed to close Redis connection pool", "err", err)
		}
	}()

	// Fail fast if Redis is unreachable. The borrowed connection is closed
	// on every path (including a failed PING) so that it is always handed
	// back to the pool before main decides whether to continue.
	c := pool.Get()
	_, pingErr := c.Do("PING")
	if closeErr := c.Close(); closeErr != nil {
		level.Error(logger).Log("msg", "failed to close Redis connection", "err", closeErr)
	}
	if pingErr != nil {
		level.Error(logger).Log("msg", "Redis connection failed", "err", pingErr)
		return
	}

	broker := celeryredis.NewBroker(
		celeryredis.WithPool(&pool),
	)
	app := celery.NewApp(
		celery.WithBroker(broker),
		celery.WithLogger(logger),
	)
	// NOTE(review): the second argument ("important") looks like the queue
	// name the task is consumed from; confirm against the celery.App.Register docs.
	app.Register(
		"myproject.mytask",
		"important",
		func(ctx context.Context, p *celery.TaskParam) error {
			// Name the task arguments so they can be read by name below.
			p.NameArgs("a", "b")
			fmt.Printf("received a=%s b=%s\n", p.MustString("a"), p.MustString("b"))
			return nil
		},
	)

	// ctx is cancelled when the process receives os.Interrupt.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	logger.Log("msg", "waiting for tasks...")
	err := app.Run(ctx)
	logger.Log("msg", "program stopped", "err", err)
}