This repository has been archived by the owner on Mar 15, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 29
/
workers.go
123 lines (106 loc) · 2.81 KB
/
workers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package workers
import (
	"context"
	"errors"
	"sort"

	"github.com/chapsuk/worker"
	"github.com/go-redis/redis"
	"github.com/im-kulikov/helium/module"
	"github.com/spf13/viper"
	"go.uber.org/dig"
	"go.uber.org/zap"
)
// Module lists the constructors this package offers for dependency
// injection: NewWorkersGroup, which assembles the worker group, and
// NewWorkers, which builds the individual workers from configuration.
var Module = module.Module{
	{Constructor: NewWorkersGroup},
	{Constructor: NewWorkers},
}
// Result is the dig output of NewWorkers; it exposes the constructed
// workers slice to other constructors.
type Result struct {
	dig.Out

	Workers []*worker.Worker
}

// Params is the dig input of NewWorkers: everything required to build the
// workers slice.
type Params struct {
	dig.In

	Config *viper.Viper
	Logger *zap.Logger
	// Redis is optional for injection; workerByConfig rejects a nil client
	// only for workers whose configuration has a "lock" key.
	Redis *redis.Client `optional:"true"`
	// Jobs maps a worker name to its job. The name is also the key of the
	// worker's section below "workers." in the configuration.
	Jobs map[string]worker.Job
}

// options bundles the inputs workerByConfig needs to build one worker.
type options struct {
	Viper  *viper.Viper
	Redis  *redis.Client
	Logger *zap.SugaredLogger
	// CfgKey is the worker name, appended to "workers." to find its config.
	CfgKey string
	Job    worker.Job
}
// NewWorkersGroup creates a new worker group and adds every supplied worker
// to it.
func NewWorkersGroup(wrks []*worker.Worker) *worker.Group {
	group := worker.NewGroup()
	group.Add(wrks...)

	return group
}
// NewWorkers builds one worker per entry of p.Jobs, each configured from the
// "workers.<name>" section of p.Config.
//
// Job names are processed in sorted order. Go randomizes map iteration, so
// without sorting both the order of Result.Workers and the error reported
// when several configurations are invalid would change from run to run.
//
// Construction is all or nothing: the first failure returns an empty Result
// together with the error. With no jobs, an empty Result and a nil error are
// returned.
func NewWorkers(p Params) (Result, error) {
	if len(p.Jobs) == 0 {
		return Result{}, nil
	}

	names := make([]string, 0, len(p.Jobs))
	for name := range p.Jobs {
		names = append(names, name)
	}
	sort.Strings(names)

	// The sugared logger does not depend on the job, so derive it once
	// instead of on every iteration.
	logger := p.Logger.Sugar()

	workers := make([]*worker.Worker, 0, len(names))
	for _, name := range names {
		wrk, err := workerByConfig(options{
			Viper:  p.Config,
			Redis:  p.Redis,
			Logger: logger,
			CfgKey: name,
			Job:    p.Jobs[name],
		})
		if err != nil {
			// all or nothing
			return Result{}, err
		}
		workers = append(workers, wrk)
	}

	return Result{Workers: workers}, nil
}
// workerByConfig builds a single worker from the "workers.<CfgKey>" section
// of the configuration.
//
// Keys read below that section:
//   - disabled: when true, a worker with an empty job is returned instead
//   - timer, ticker, cron: passed to ByTimer, ByTicker and ByCronSpec
//   - lock: enables a Redis lock built from lock.key and lock.ttl; if
//     lock.retry.count is set, WithBsmRedisLock is used with
//     lock.retry.count and lock.retry.timeout, otherwise WithRedisLock
//   - immediately: passed to SetImmediately
//
// It returns an error when the section is missing, or when a lock is
// configured but opts.Redis is nil.
func workerByConfig(opts options) (*worker.Worker, error) {
	var (
		cfg  = opts.Viper
		root = "workers." + opts.CfgKey
		// at builds the full config key for a suffix such as ".timer".
		at = func(suffix string) string { return root + suffix }
	)

	if !cfg.IsSet(root) {
		return nil, errors.New("missing worker config key: " + root)
	}

	if disabled := at(".disabled"); cfg.IsSet(disabled) && cfg.GetBool(disabled) {
		// A disabled worker is replaced by one whose job does nothing.
		return worker.New(func(context.Context) {}), nil
	}

	wrk := worker.New(opts.Job)

	if timer := at(".timer"); cfg.IsSet(timer) {
		wrk.ByTimer(cfg.GetDuration(timer))
	}

	if ticker := at(".ticker"); cfg.IsSet(ticker) {
		wrk.ByTicker(cfg.GetDuration(ticker))
	}

	if cron := at(".cron"); cfg.IsSet(cron) {
		wrk.ByCronSpec(cfg.GetString(cron))
	}

	if cfg.IsSet(at(".lock")) {
		if opts.Redis == nil {
			return nil, errors.New("gotten nil redis client for exclusive worker: " + opts.CfgKey)
		}

		base := worker.RedisLockOptions{
			RedisCLI: opts.Redis,
			LockKey:  cfg.GetString(at(".lock.key")),
			LockTTL:  cfg.GetDuration(at(".lock.ttl")),
			Logger:   opts.Logger.With("worker", opts.CfgKey),
		}

		// The presence of a retry count selects the Bsm lock variant.
		retryCount := at(".lock.retry.count")
		if !cfg.IsSet(retryCount) {
			wrk.WithRedisLock(base)
		} else {
			wrk.WithBsmRedisLock(worker.BsmRedisLockOptions{
				RedisLockOptions: base,
				RetryCount:       cfg.GetInt(retryCount),
				RetryDelay:       cfg.GetDuration(at(".lock.retry.timeout")),
			})
		}
	}

	if immediately := at(".immediately"); cfg.IsSet(immediately) {
		wrk.SetImmediately(cfg.GetBool(immediately))
	}

	return wrk, nil
}