-
Notifications
You must be signed in to change notification settings - Fork 1
/
serve.go
178 lines (154 loc) · 4.3 KB
/
serve.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
package workers
import (
"fmt"
"time"
"github.com/Odinman/omq/utils"
zmq "github.com/pebbe/zmq4"
)
// Node records one responser worker in the broker's LRU queue,
// keyed by its ZeroMQ identity and tracked for heartbeat expiry.
type Node struct {
	identity  string    // Identity of worker (raw ZeroMQ routing frame)
	id_string string    // Printable identity (%q-quoted form of identity)
	expire    time.Time // Node is considered dead after this instant unless a heartbeat renews it
}
/* {{{ func newNode(identity string) node
 * Construct a new worker node from its ZeroMQ identity frame.
 * The node starts with a fresh expiry of HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS.
 */
func newNode(identity string) Node {
	n := Node{identity: identity}
	n.id_string = fmt.Sprintf("%q", identity)
	n.expire = time.Now().Add(HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS)
	return n
}
/* }}} */
/* {{{ func nodeReady(self Node, nodes []Node) []Node
 * Move a ready node to the back of the queue (LRU: front = least busy,
 * back = most recently heard from).
 *
 * If an entry with the same identity already exists it is removed first,
 * so each worker appears at most once. The single append-based delete
 * covers the first, middle, and last positions uniformly — the original
 * three-way branch was redundant.
 */
func nodeReady(self Node, nodes []Node) []Node {
	for i, worker := range nodes {
		if self.identity == worker.identity {
			// Delete element i in place; append handles i == 0 and
			// i == len(nodes)-1 the same as any interior index.
			nodes = append(nodes[:i], nodes[i+1:]...)
			break
		}
	}
	return append(nodes, self)
}
/* }}} */
/* {{{ func purgeNodes(nodes []Node) []Node
 * Drop expired nodes from the front of the list. Nodes are held
 * oldest-first, so scanning stops at the first one still alive.
 */
func purgeNodes(nodes []Node) []Node {
	now := time.Now()
	for i := range nodes {
		if nodes[i].expire.After(now) {
			// This node is alive; everything after it expires even later.
			return nodes[i:]
		}
	}
	// Every node has expired.
	return nodes[0:0]
}
/* }}} */
/* {{{ func (w *OmqWorker) serve() {
 * Broker loop (Paranoid-Pirate style): binds a client-facing ROUTER
 * (frontend) and an in-process ROUTER (backend), spawns responser
 * goroutines, and load-balances client requests to the least-busy
 * responser via an LRU node queue. Idle nodes and subscribers receive
 * periodic heartbeats; expired nodes are purged each iteration.
 */
func (w *OmqWorker) serve() {
	// Socket that accepts client requests.
	frontend, _ := zmq.NewSocket(zmq.ROUTER)
	defer frontend.Close()
	frontend.Bind(fmt.Sprint("tcp://*:", basePort))
	w.Debug("frontend bind port: %v", basePort)
	// backend: in-process socket the responser goroutines talk to.
	backend, _ := zmq.NewSocket(zmq.ROUTER)
	defer backend.Close()
	backend.Bind("inproc://backend")
	// Available-node list, LRU: the least-busy node stays at the front.
	nodes := make([]Node, 0)
	// Heartbeat ticker.
	heartbeat_at := time.Tick(HEARTBEAT_INTERVAL)
	// poller1 watches backend only — used while no node is available,
	// so client requests queue in ZeroMQ instead of being read with
	// nobody to route them to. poller2 watches both sockets.
	poller1 := zmq.NewPoller()
	poller1.Add(backend, zmq.POLLIN)
	poller2 := zmq.NewPoller()
	poller2.Add(backend, zmq.POLLIN)
	poller2.Add(frontend, zmq.POLLIN)
	// spawn responser goroutines.
	w.Info("create %d responsers", responseNodes)
	for i := 1; i <= responseNodes; i++ {
		go w.newResponser(i)
	}
	// loop
	for {
		// Poll frontend only if we have available nodes
		var sockets []zmq.Polled
		var err error
		if nl := len(nodes); nl > 0 {
			//w.Info("nodes len: %d", nl)
			sockets, err = poller2.Poll(HEARTBEAT_INTERVAL)
		} else {
			w.Info("nodes empty")
			sockets, err = poller1.Poll(HEARTBEAT_INTERVAL)
		}
		if err != nil {
			w.Critical("big wrong: %s", err)
			break // Interrupted
		}
		for _, socket := range sockets {
			switch socket.Socket {
			case backend:
				// Handle worker activity on backend
				// Use worker identity for load-balancing
				msg, err := backend.RecvMessage(0)
				if err != nil {
					w.Error("backend wrong: %s", err)
					// NOTE(review): this break exits only the switch, not
					// the for loop, so the loop keeps running after a recv
					// error — confirm whether "Interrupted" intended to
					// stop the broker here.
					break // Interrupted
				}
				// Any sign of life from worker means it's ready:
				// (re)queue it at the back of the LRU list.
				identity, msg := utils.Unwrap(msg)
				nodes = nodeReady(newNode(identity), nodes)
				// Validate control message, or return reply to client
				if len(msg) == 1 {
					// Control frame: must be ready or heartbeat.
					if msg[0] != PPP_READY && msg[0] != PPP_HEARTBEAT {
						w.Info("Error: invalid message from worker, %s", msg)
					}
				} else {
					// Completed-task reply (still carrying the client
					// envelope) — forward straight back to the frontend.
					w.Trace("backend recv: %q", msg)
					frontend.SendMessage(msg)
				}
			case frontend:
				// Now get next client request, route to next worker
				msg, err := frontend.RecvMessage(0)
				if err != nil {
					w.Error("frontend wrong: %s", err)
					// NOTE(review): as above, this only exits the switch.
					break // Interrupted
				}
				w.Trace("frontend recv: %q", msg)
				// Route to the front (least-busy) node, envelope intact.
				// nodes is non-empty here because poller2 is only used
				// when len(nodes) > 0. In future, multiple backend groups
				// could handle different task types.
				backend.SendMessage(nodes[0].identity, msg)
				w.Trace("send to backend: %q", nodes[0].identity)
				// Node is now busy: pop it from the available list.
				nodes = nodes[1:]
			}
		}
		// We handle heartbeating after any socket activity. First we send
		// heartbeats to any idle nodes if it's time. Then we purge any
		// dead nodes:
		select {
		case <-heartbeat_at:
			// Heartbeat every idle node.
			for _, worker := range nodes {
				backend.SendMessage(worker.identity, PPP_HEARTBEAT)
			}
			// Heartbeat subscribers as well.
			publisher.SendMessage(PPP_HEARTBEAT)
		default:
		}
		nodes = purgeNodes(nodes)
	}
}
/* }}} */