forked from vmihailenco/taskq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
options.go
126 lines (102 loc) · 2.79 KB
/
options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package msgqueue
import (
"runtime"
"time"
"github.com/go-redis/rate"
"github.com/go-redis/redis"
timerate "golang.org/x/time/rate"
)
type Redis interface {
Del(keys ...string) *redis.IntCmd
SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd
SAdd(key string, members ...interface{}) *redis.IntCmd
SMembers(key string) *redis.StringSliceCmd
Pipelined(func(pipe *redis.Pipeline) error) ([]redis.Cmder, error)
Eval(script string, keys []string, args ...interface{}) *redis.Cmd
Publish(channel, message string) *redis.IntCmd
}
type Storage interface {
Exists(key string) bool
}
type redisStorage struct {
Redis
}
var _ Storage = (*redisStorage)(nil)
func (s redisStorage) Exists(key string) bool {
return !s.SetNX(key, "", 24*time.Hour).Val()
}
type RateLimiter interface {
AllowRate(name string, limit timerate.Limit) (delay time.Duration, allow bool)
}
type Options struct {
// Queue name.
Name string
// Queue group name.
GroupName string
// Function called to process a message.
Handler interface{}
// Function called to process failed message.
FallbackHandler interface{}
// Number of goroutines processing messages.
WorkerNumber int
// Global max number of workers which overrides WorkerNumber.
MaxWorkers int
// Size of the buffer where reserved messages are stored.
BufferSize int
// Time after which the reserved message is returned to the queue.
ReservationTimeout time.Duration
// Number of tries/releases after which the message fails permanently
// and is deleted.
RetryLimit int
// Minimum time between retries.
MinBackoff time.Duration
// Processing rate limit.
RateLimit timerate.Limit
// Redis client that is used for storing metadata.
Redis Redis
// Optional storage interface. The default is to use Redis.
Storage Storage
// Optional rate limiter interface. The default is to use Redis.
RateLimiter RateLimiter
inited bool
}
func (opt *Options) Init() {
if opt.inited {
return
}
opt.inited = true
if opt.GroupName == "" {
opt.GroupName = opt.Name
}
if opt.MaxWorkers > 0 {
opt.WorkerNumber = opt.MaxWorkers
}
if opt.WorkerNumber == 0 {
opt.WorkerNumber = 10 * runtime.NumCPU()
}
if opt.BufferSize == 0 {
opt.BufferSize = opt.WorkerNumber
if opt.BufferSize > 10 {
opt.BufferSize = 10
}
}
if opt.RateLimit == 0 {
opt.RateLimit = timerate.Inf
}
if opt.ReservationTimeout == 0 {
opt.ReservationTimeout = 300 * time.Second
}
if opt.RetryLimit == 0 {
opt.RetryLimit = 10
}
if opt.MinBackoff == 0 {
opt.MinBackoff = 3 * time.Second
}
if opt.Storage == nil {
opt.Storage = redisStorage{opt.Redis}
}
if opt.RateLimit != timerate.Inf && opt.RateLimiter == nil && opt.Redis != nil {
fallbackLimiter := timerate.NewLimiter(opt.RateLimit, 1)
opt.RateLimiter = rate.NewLimiter(opt.Redis, fallbackLimiter)
}
}