-
Notifications
You must be signed in to change notification settings - Fork 46
/
globals.go
85 lines (73 loc) · 2.54 KB
/
globals.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package taskqueueworker
import (
"context"
"reflect"
"time"
"github.com/golangid/candi/candiutils"
"github.com/golangid/candi/codebase/factory"
"github.com/golangid/candi/config/env"
)
var (
// externalWorkerHost is the worker host used when adding a job. When it is
// not empty, adding a job defaults to sending an HTTP request.
externalWorkerHost string
// engine is the core task queue worker engine of this package; initEngine
// assigns it.
engine *taskQueueWorker
)
// initEngine builds the task queue worker, stores it in the package-level
// engine variable and returns that same pointer.
//
// It works in three steps: default option values are filled in, every
// OptionFunc in opts is applied in the order given, and finally a persistent
// store and a queue are picked from the service dependencies when the options
// left them unset.
func initEngine(service factory.ServiceFactory, opts ...OptionFunc) *taskQueueWorker {
	// Baseline settings; each of them may be replaced by the option funcs below.
	cfg := option{
		maxClientSubscriber:      5,
		autoRemoveClientInterval: 30 * time.Minute,
		dashboardPort:            8080,
		debugMode:                true,
		secondaryPersistent:      &noopPersistent{},
		dashboardBanner: ` _________ _ ______ ____
/ ____/ | / | / / __ \/ _/
/ / / /| | / |/ / / / // /
/ /___/ ___ |/ /| / /_/ // /
\____/_/ |_/_/ |_/_____/___/ `,
	}

	// The default locker is Redis-backed when a Redis pool is available and a
	// no-op locker otherwise.
	if pool := service.GetDependency().GetRedisPool(); pool == nil {
		cfg.locker = &candiutils.NoopLocker{}
	} else {
		cfg.locker = candiutils.NewRedisLocker(pool.WritePool())
	}

	// Caller-supplied options override the defaults above.
	for i := range opts {
		opts[i](&cfg)
	}

	// Fall back to a persistent store only when no option provided one:
	// MongoDB first, then SQL, then the no-op implementation.
	if cfg.persistent == nil {
		mongoDB := service.GetDependency().GetMongoDatabase()
		if mongoDB != nil {
			cfg.persistent = NewMongoPersistent(mongoDB.WriteDB())
		} else if sqlDB := service.GetDependency().GetSQLDatabase(); sqlDB != nil {
			cfg.persistent = NewSQLPersistent(sqlDB.WriteDB())
		} else {
			cfg.persistent = NewNoopPersistent()
		}
	}

	// Same idea for the queue: Redis when available, in-memory otherwise.
	if cfg.queue == nil {
		if pool := service.GetDependency().GetRedisPool(); pool == nil {
			cfg.queue = NewInMemQueue()
		} else {
			cfg.queue = NewRedisQueue(pool.WritePool())
		}
	}

	// The package-level engine is assigned before the subscriber is created,
	// so anything reading the global from this point on sees the new worker.
	engine = &taskQueueWorker{
		service:                   service,
		ready:                     make(chan struct{}),
		shutdown:                  make(chan struct{}, 1),
		refreshWorkerNotif:        make(chan struct{}),
		opt:                       &cfg,
		configuration:             initConfiguration(&cfg),
		registeredTaskWorkerIndex: make(map[string]int),
		runningWorkerIndexTask:    make(map[int]*Task),
		globalSemaphore:           make(chan struct{}, env.BaseEnv().MaxGoroutines),
	}
	engine.subscriber = initSubscriber(engine.configuration, &cfg)

	ctx, cancel := context.WithCancel(context.Background())
	engine.ctx, engine.ctxCancelFunc = ctx, cancel

	// The refresh-worker notification channel always occupies the first
	// select case.
	refreshCase := reflect.SelectCase{
		Dir:  reflect.SelectRecv,
		Chan: reflect.ValueOf(engine.refreshWorkerNotif),
	}
	engine.workerChannels = append(engine.workerChannels, refreshCase)

	return engine
}