-
Notifications
You must be signed in to change notification settings - Fork 13
/
dispatcher-service.go
124 lines (110 loc) · 2.86 KB
/
dispatcher-service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package process_master
import (
"context"
"encoding/json"
"github.com/eolinker/eosc"
"github.com/eolinker/eosc/log"
"github.com/eolinker/eosc/process-master/extender"
"sync"
"github.com/eolinker/eosc/common/dispatcher"
"github.com/eolinker/eosc/service"
)
// DispatcherServer is the master-side dispatcher service. Listen streams
// events from its dispatch center to a connected worker, and Dispatch feeds
// events into that same center.
type DispatcherServer struct {
// Presumably the generated gRPC stub that supplies default implementations
// of the service methods; confirm against the service package.
service.UnimplementedMasterDispatcherServer
// datacenter is the dispatch center used by both Listen (via Listener) and
// Dispatch (via Send).
datacenter dispatcher.IDispatchCenter
// ctxManager supplies the context whose cancellation ends Listen calls.
ctxManager *CtxManager
// currentStatus is read and cleared by Update. It is accessed without any
// locking in this file.
currentStatus bool
}
// Update reacts to an extender status report. When the report is a failure
// and the server was previously marked healthy, the healthy flag is cleared
// and the context named "" is stopped, which ends every Listen call that is
// waiting on it. The es argument is not inspected.
//
// NOTE(review): nothing in this file ever sets currentStatus to true, so
// unless another file in the package does, the stop path below cannot be
// reached. Confirm whether a successful report is meant to set the flag.
func (d *DispatcherServer) Update(es []*extender.Status, success bool) {
	if success || !d.currentStatus {
		return
	}
	d.currentStatus = false
	d.ctxManager.Stop("")
}
// NewDispatcherServer builds a DispatcherServer backed by a new data dispatch
// center and a new context manager. currentStatus starts at its zero value
// (false).
func NewDispatcherServer() *DispatcherServer {
	srv := new(DispatcherServer)
	srv.datacenter = dispatcher.NewDataDispatchCenter()
	srv.ctxManager = NewCtxManager()
	return srv
}
// CtxWidthCancel couples a context with the function that cancels it.
type CtxWidthCancel struct {
	ctx        context.Context
	cancelFunc context.CancelFunc
}

// CtxManager hands out named, cancellable contexts that are all children of
// one root context. A single name can be cancelled with Stop; every context
// handed out so far can be cancelled at once with Close.
type CtxManager struct {
	root           context.Context
	rootCancelFunc context.CancelFunc
	// lock guards root, rootCancelFunc and cancelHandlers.
	lock           sync.Mutex
	cancelHandlers map[string]*CtxWidthCancel
}

// NewCtxManager returns a manager with a fresh root context and no named
// contexts registered.
func NewCtxManager() *CtxManager {
	m := &CtxManager{cancelHandlers: map[string]*CtxWidthCancel{}}
	m.root, m.rootCancelFunc = context.WithCancel(context.Background())
	return m
}

// Close cancels the current root context, and with it every context
// previously returned by Get, then installs a new root and an empty registry
// so the manager can keep being used. It always returns nil.
func (c *CtxManager) Close() error {
	freshRoot, freshCancel := context.WithCancel(context.Background())

	c.lock.Lock()
	staleCancel := c.rootCancelFunc
	c.root, c.rootCancelFunc = freshRoot, freshCancel
	c.cancelHandlers = make(map[string]*CtxWidthCancel)
	c.lock.Unlock()

	// The old root is cancelled only after the lock has been released.
	staleCancel()
	return nil
}

// Get returns the context registered under name, creating it as a child of
// the current root when no such entry exists yet. Repeated calls with the
// same name return the same context until that name is stopped or the manager
// is closed.
func (c *CtxManager) Get(name string) context.Context {
	c.lock.Lock()
	defer c.lock.Unlock()

	if entry, ok := c.cancelHandlers[name]; ok {
		return entry.ctx
	}

	ctx, cancel := context.WithCancel(c.root)
	c.cancelHandlers[name] = &CtxWidthCancel{ctx: ctx, cancelFunc: cancel}
	return ctx
}

// Stop cancels and forgets the context registered under namespace. It does
// nothing when no context is registered under that name. A later Get with the
// same name creates a new, live context.
func (c *CtxManager) Stop(namespace string) {
	c.lock.Lock()
	defer c.lock.Unlock()

	entry, ok := c.cancelHandlers[namespace]
	if !ok {
		return
	}
	delete(c.cancelHandlers, namespace)
	entry.cancelFunc()
}
// Listen streams dispatch-center events to a worker until the stream is
// finished.
//
// It joins the dispatch center as a listener (leaving again on return) and
// forwards every received event through server.Send. For reset and init
// events the payload is the JSON encoding of e.All(); for every other event
// it is e.Data().
//
// Listen returns nil when the listener's event channel is closed or when the
// context named "" in the context manager is cancelled (see Update and
// Close). It returns a non-nil error when encoding a reset/init payload fails
// or when server.Send fails. The request argument is not used.
func (d *DispatcherServer) Listen(request *service.EmptyRequest, server service.MasterDispatcher_ListenServer) error {
	ctx := d.ctxManager.Get("")
	log.Debug("worker listen start")

	listener := d.datacenter.Listener()
	defer listener.Leave()

	for {
		select {
		case e, ok := <-listener.Event():
			if !ok {
				return nil
			}

			command := e.Event()
			event := &service.Event{
				Namespace: e.Namespace(),
				Command:   command,
				Key:       e.Key(),
			}

			if command == eosc.EventReset || command == eosc.EventInit {
				// Abort rather than send a reset/init event with an empty
				// payload if the full data set cannot be encoded.
				data, err := json.Marshal(e.All())
				if err != nil {
					return err
				}
				event.Data = data
			} else {
				event.Data = e.Data()
			}

			err := server.Send(event)
			log.Debug("send listen to worker:", event.String())
			if err != nil {
				return err
			}
		case <-ctx.Done():
			return nil
		}
	}
}
// Dispatch forwards event to the server's dispatch center by calling its Send
// method.
func (d *DispatcherServer) Dispatch(event dispatcher.IEvent) {
d.datacenter.Send(event)
}
// Close closes the server's context manager and returns its result.
// CtxManager.Close cancels the manager's current root context, so a Listen
// call waiting on a context obtained from it returns.
func (d *DispatcherServer) Close() error {
return d.ctxManager.Close()
}