forked from pebbe/zmq4
-
Notifications
You must be signed in to change notification settings - Fork 0
/
clone.go
304 lines (258 loc) · 8.23 KB
/
clone.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
// Clone client API stack (multithreaded).
package clone
import (
zmq "github.com/pebbe/zmq4"
"github.com/pebbe/zmq4/examples/kvmsg"
"fmt"
"strconv"
"time"
)
// =====================================================================
// Synchronous part, works in our application thread
// ---------------------------------------------------------------------
// Structure of our class
var (
	// pipe_nmb is a package-level counter used by New to generate a
	// unique inproc endpoint name per Clone instance.
	// NOTE(review): incremented without synchronization — assumes New
	// is not called from multiple goroutines concurrently; confirm.
	pipe_nmb uint64
)
// Clone is the synchronous frontend of the clone client. All requests
// are forwarded over an inproc pipe to a background agent goroutine.
type Clone struct {
	pipe *zmq.Socket // Pipe through to clone agent
}
// The real work happens in the backend agent goroutine (clone_agent,
// below), which the constructor spawns.
// New constructs a Clone frontend and spawns its backend agent
// goroutine. The two halves talk over an inproc PAIR pipe whose
// endpoint name is made unique by a package-level counter.
func New() (clone *Clone) {
	pipename := fmt.Sprint("inproc://pipe", pipe_nmb)
	pipe_nmb++
	clone = &Clone{}
	// Errors are ignored throughout this example, matching the zguide style.
	clone.pipe, _ = zmq.NewSocket(zmq.PAIR)
	clone.pipe.Bind(pipename)
	go clone_agent(pipename)
	return clone
}
// Subtree specifies the subtree for snapshots and updates. It must be
// called before Connect, because the subtree specification is the first
// command sent to a server. Forwards [SUBTREE][subtree] to the agent.
func (c *Clone) Subtree(subtree string) {
	c.pipe.SendMessage("SUBTREE", subtree)
}
// Connect registers a new server endpoint with the agent. At most two
// servers are supported. Forwards [CONNECT][endpoint][service].
func (c *Clone) Connect(address, service string) {
	c.pipe.SendMessage("CONNECT", address, service)
}
// Set stores a key-value pair (with a TTL in seconds) in the shared
// hashmap. Forwards [SET][key][value][ttl] to the agent, which does the
// actual work.
func (c *Clone) Set(key, value string, ttl int) {
	c.pipe.SendMessage("SET", key, value, ttl)
}
// Get looks up a value in the distributed hash table. It sends
// [GET][key] to the agent and blocks until the reply arrives. An
// unknown key yields an empty string; err is non-nil only if the pipe
// receive itself fails.
func (c *Clone) Get(key string) (value string, err error) {
	c.pipe.SendMessage("GET", key)
	var reply []string
	if reply, err = c.pipe.RecvMessage(0); err != nil {
		return "", err
	}
	return reply[0], nil
}
// The back-end agent manages a set of servers, which we implement using
// our simple class model:

// server_t bundles the two sockets used to talk to one server — a
// DEALER for snapshot requests and a SUB for the update stream — plus
// liveness bookkeeping.
type server_t struct {
	address    string      // Server address
	port       int         // Server port
	snapshot   *zmq.Socket // Snapshot socket
	subscriber *zmq.Socket // Incoming updates
	expiry     time.Time   // When server expires
	requests   int64       // How many snapshot requests made?
}
// server_new creates the socket pair for one server: the snapshot
// DEALER connects to port, the update SUB to port+1 (filtered by
// subtree). Socket errors are ignored, as everywhere in this example.
func server_new(address string, port int, subtree string) (server *server_t) {
	fmt.Printf("I: adding server %s:%d...\n", address, port)
	snapshot, _ := zmq.NewSocket(zmq.DEALER)
	snapshot.Connect(fmt.Sprintf("%s:%d", address, port))
	subscriber, _ := zmq.NewSocket(zmq.SUB)
	subscriber.Connect(fmt.Sprintf("%s:%d", address, port+1))
	subscriber.SetSubscribe(subtree)
	return &server_t{
		address:    address,
		port:       port,
		snapshot:   snapshot,
		subscriber: subscriber,
	}
}
// Here is the implementation of the back-end agent itself:

const (
	// Number of servers we will talk to
	server_MAX = 2
	// Server considered dead if silent for this long; after that the
	// agent fails over to the next server.
	server_TTL = 5000 * time.Millisecond
)
const (
	// States we can be in (the agent's state machine: INITIAL ->
	// SYNCING -> ACTIVE, falling back to INITIAL on failover)
	state_INITIAL = iota // Before asking server for state
	state_SYNCING        // Getting state from server
	state_ACTIVE         // Getting new updates from server
)
// agent_t is the backend agent's private state: the application pipe,
// the replicated key/value map, the known servers, and the state
// machine position.
type agent_t struct {
	pipe        *zmq.Socket             // Pipe back to application
	kvmap       map[string]*kvmsg.Kvmsg // Actual key/value table
	subtree     string                  // Subtree specification, if any
	server      [server_MAX]*server_t   // Connected servers
	nbr_servers int                     // 0 to SERVER_MAX
	state       int                     // Current state
	cur_server  int                     // If active, server 0 or 1
	sequence    int64                   // Last kvmsg processed
	publisher   *zmq.Socket             // Outgoing updates
}
// agent_new builds an agent around the given application pipe, with an
// empty key/value map and a PUB socket for pushing updates out. The
// agent starts in state_INITIAL with no subtree filter.
func agent_new(pipe *zmq.Socket) (agent *agent_t) {
	publisher, _ := zmq.NewSocket(zmq.PUB)
	return &agent_t{
		pipe:      pipe,
		kvmap:     make(map[string]*kvmsg.Kvmsg),
		subtree:   "",
		state:     state_INITIAL,
		publisher: publisher,
	}
}
// control_message reads one command from the frontend pipe and acts on
// it. Recognized commands: SUBTREE, CONNECT, SET, and GET.
func (agent *agent_t) control_message() (err error) {
	var msg []string
	if msg, err = agent.pipe.RecvMessage(0); err != nil {
		return err
	}
	command, args := msg[0], msg[1:]

	switch command {
	case "SUBTREE":
		agent.subtree = args[0]
	case "CONNECT":
		address, service := args[0], args[1]
		if agent.nbr_servers >= server_MAX {
			fmt.Printf("E: too many servers (max. %d)\n", server_MAX)
		} else {
			port, _ := strconv.Atoi(service)
			agent.server[agent.nbr_servers] = server_new(address, port, agent.subtree)
			agent.nbr_servers++
			// We broadcast updates to all known servers
			agent.publisher.Connect(fmt.Sprintf("%s:%d", address, port+2))
		}
	case "SET":
		// When we set a property, we push the new key-value pair onto
		// all our connected servers:
		key, value, ttl := args[0], args[1], args[2]
		// (local renamed from `kvmsg` to avoid shadowing the package)
		update := kvmsg.NewKvmsg(0)
		update.SetKey(key)
		update.SetUuid()
		update.SetBody(value)
		update.SetProp("ttl", ttl)
		update.Store(agent.kvmap)
		update.Send(agent.publisher)
	case "GET":
		// Reply with the cached body, or an empty string if unknown.
		value := ""
		if cached, ok := agent.kvmap[args[0]]; ok {
			value, _ = cached.GetBody()
		}
		agent.pipe.SendMessage(value)
	}
	return
}
// The asynchronous agent manages a server pool and handles the
// request/reply dialog when the application asks for it. It runs the
// state machine INITIAL -> SYNCING -> ACTIVE, failing over to the next
// server when the current one goes silent past server_TTL.
func clone_agent(pipename string) {
	pipe, _ := zmq.NewSocket(zmq.PAIR)
	pipe.Connect(pipename)
	agent := agent_new(pipe)
LOOP:
	for {
		// Rebuild the poll set each pass: which server socket we watch
		// depends on the current state.
		poller := zmq.NewPoller()
		poller.Add(pipe, zmq.POLLIN)
		server := agent.server[agent.cur_server] // nil until the first CONNECT
		switch agent.state {
		case state_INITIAL:
			// In this state we ask the server for a snapshot,
			// if we have a server to talk to...
			if agent.nbr_servers > 0 {
				fmt.Printf("I: waiting for server at %s:%d...\n", server.address, server.port)
				if server.requests < 2 {
					server.snapshot.SendMessage("ICANHAZ?", agent.subtree)
					server.requests++
				}
				server.expiry = time.Now().Add(server_TTL)
				agent.state = state_SYNCING
				poller.Add(server.snapshot, zmq.POLLIN)
			}
		case state_SYNCING:
			// In this state we read from snapshot and we expect
			// the server to respond, else we fail over.
			poller.Add(server.snapshot, zmq.POLLIN)
		case state_ACTIVE:
			// In this state we read from subscriber and we expect
			// the server to give hugz, else we fail over.
			// (A redundant `break` was removed here: Go switch cases
			// never fall through, so it had no effect.)
			poller.Add(server.subscriber, zmq.POLLIN)
		}
		// Block indefinitely (-1) while no server is known; otherwise
		// wake up when the current server's TTL expires.
		poll_timer := time.Duration(-1)
		if server != nil {
			poll_timer = time.Until(server.expiry)
			if poll_timer < 0 {
				poll_timer = 0
			}
		}
		// We're ready to process incoming messages; if nothing at all
		// comes from our server within the timeout, that means the
		// server is dead:
		polled, err := poller.Poll(poll_timer)
		if err != nil {
			break
		}
		if len(polled) > 0 {
			for _, item := range polled {
				switch socket := item.Socket; socket {
				case pipe:
					// Command from the application frontend.
					err = agent.control_message()
					if err != nil {
						break LOOP
					}
				default:
					// Traffic from the current server (snapshot or
					// update). Local renamed from `kvmsg` to avoid
					// shadowing the package.
					msg, e := kvmsg.RecvKvmsg(socket)
					if e != nil {
						err = e
						break LOOP
					}
					// Anything from server resets its expiry time
					server.expiry = time.Now().Add(server_TTL)
					if agent.state == state_SYNCING {
						// Store in snapshot until we're finished
						server.requests = 0
						if key, _ := msg.GetKey(); key == "KTHXBAI" {
							agent.sequence, _ = msg.GetSequence()
							agent.state = state_ACTIVE
							fmt.Printf("I: received from %s:%d snapshot=%d\n", server.address, server.port, agent.sequence)
						} else {
							msg.Store(agent.kvmap)
						}
					} else if agent.state == state_ACTIVE {
						// Discard out-of-sequence updates, incl. hugz
						if seq, _ := msg.GetSequence(); seq > agent.sequence {
							agent.sequence = seq
							msg.Store(agent.kvmap)
							fmt.Printf("I: received from %s:%d update=%d\n", server.address, server.port, agent.sequence)
						}
					}
				}
			}
		} else {
			// Server has died, failover to next
			fmt.Printf("I: server at %s:%d didn't give hugz\n", server.address, server.port)
			agent.cur_server = (agent.cur_server + 1) % agent.nbr_servers
			agent.state = state_INITIAL
		}
	}
}