-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
queue.go
94 lines (79 loc) · 2.21 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package worker
import (
"runtime"
"time"
"github.com/hibiken/asynq"
)
type (
// QueueServer is a wrapper for asynq.Server.
QueueServer struct {
*asynq.Server
}
// QueueServerOption is a function that configures a QueueServer.
QueueServerOption func(*asynq.Config)
// taskHandler is an interface for task handlers.
taskHandler interface {
Register(*asynq.ServeMux)
}
)
// NewQueueServer creates a new queue client and returns the server.
func NewQueueServer(redisConnOpt asynq.RedisConnOpt, log asynq.Logger, opts ...QueueServerOption) *QueueServer {
// Get the number of available CPUs.
useProcs := runtime.GOMAXPROCS(0)
if useProcs == 0 {
useProcs = 1
} else if useProcs > 1 {
useProcs = useProcs / 2
}
// Default queue options
var (
workerConcurrency = useProcs // use half of the available CPUs
workerShutdownTimeout = time.Second * 10
workerLogLevel = "info"
queueName = "default"
)
cnf := asynq.Config{
Concurrency: workerConcurrency,
Logger: log,
LogLevel: getAsynqLogLevel(workerLogLevel),
ShutdownTimeout: workerShutdownTimeout,
Queues: map[string]int{
queueName: workerConcurrency,
},
}
// Apply options
for _, opt := range opts {
opt(&cnf)
}
return &QueueServer{Server: asynq.NewServer(redisConnOpt, cnf)}
}
// Run creates a new queue client, registers task handlers and runs the server.
// It returns a function that can be used to run server in a error group.
// E.g.:
//
// eg, ctx := errgroup.WithContext(context.Background())
// eg.Go(queueServer.Run(
// NewTaskHandler1(),
// NewTaskHandler2(),
// ))
func (srv *QueueServer) Run(handlers ...taskHandler) func() error {
return func() error {
// Run server
return srv.Server.Run(registerQueueHandlers(handlers...))
}
}
// registerQueueHandlers registers handlers for each task type.
func registerQueueHandlers(handlers ...taskHandler) *asynq.ServeMux {
mux := asynq.NewServeMux()
// Register handlers
for _, h := range handlers {
h.Register(mux)
}
return mux
}
// Shutdown gracefully shuts down the queue server by waiting for all
// in-flight tasks to finish processing before shutdown.
func (srv *QueueServer) Shutdown() {
srv.Server.Stop()
srv.Server.Shutdown()
}