-
Notifications
You must be signed in to change notification settings - Fork 1
/
clonesrv2.go
119 lines (105 loc) · 3.01 KB
/
clonesrv2.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
//
// Clone server Model Two
//
// In the original C example, the client misses updates between snapshot
// and further updates. Sometimes, it even misses the END message of
// the snapshot, so it waits for it forever.
// This Go implementation has some modifications to improve this, but it
// is still not fully reliable.
package main
import (
zmq "github.com/pebbe/zmq3"
"github.com/pebbe/zmq3/examples/kvsimple"
"fmt"
"math/rand"
"time"
)
func main() {
// Prepare our context and sockets
publisher, _ := zmq.NewSocket(zmq.PUB)
publisher.Bind("tcp://*:5557")
sequence := int64(0)
rand.Seed(time.Now().UnixNano())
// Start state manager and wait for synchronization signal
updates, _ := zmq.NewSocket(zmq.PAIR)
updates.Bind("inproc://pipe")
go state_manager()
updates.RecvMessage(0) // "READY"
for {
// Distribute as key-value message
sequence++
kvmsg := kvsimple.NewKvmsg(sequence)
kvmsg.SetKey(fmt.Sprint(rand.Intn(10000)))
kvmsg.SetBody(fmt.Sprint(rand.Intn(1000000)))
if kvmsg.Send(publisher) != nil {
break
}
if kvmsg.Send(updates) != nil {
break
}
}
fmt.Printf("Interrupted\n%d messages out\n", sequence)
}
// The state manager task maintains the state and handles requests from
// clients for snapshots:
func state_manager() {
kvmap := make(map[string]*kvsimple.Kvmsg)
pipe, _ := zmq.NewSocket(zmq.PAIR)
pipe.Connect("inproc://pipe")
pipe.SendMessage("READY")
snapshot, _ := zmq.NewSocket(zmq.ROUTER)
snapshot.Bind("tcp://*:5556")
poller := zmq.NewPoller()
poller.Add(pipe, zmq.POLLIN)
poller.Add(snapshot, zmq.POLLIN)
sequence := int64(0) // Current snapshot version number
LOOP:
for {
polled, err := poller.Poll(-1)
if err != nil {
break // Context has been shut down
}
for _, item := range polled {
switch socket := item.Socket; socket {
case pipe:
// Apply state update from main thread
kvmsg, err := kvsimple.RecvKvmsg(pipe)
if err != nil {
break LOOP // Interrupted
}
sequence, _ = kvmsg.GetSequence()
kvmsg.Store(kvmap)
case snapshot:
// Execute state snapshot request
msg, err := snapshot.RecvMessage(0)
if err != nil {
break LOOP // Interrupted
}
identity := msg[0]
// Request is in second frame of message
request := msg[1]
if request != "ICANHAZ?" {
fmt.Println("E: bad request, aborting")
break LOOP
}
// Send state snapshot to client
// For each entry in kvmap, send kvmsg to client
for _, kvmsg := range kvmap {
snapshot.Send(identity, zmq.SNDMORE)
kvmsg.Send(snapshot)
}
// Give client some time to deal with it.
// This reduces the risk that the client won't see
// the END message, but it doesn't eliminate the risk.
time.Sleep(100 * time.Millisecond)
// Now send END message with sequence number
fmt.Printf("Sending state shapshot=%d\n", sequence)
snapshot.Send(identity, zmq.SNDMORE)
kvmsg := kvsimple.NewKvmsg(sequence)
kvmsg.SetKey("KTHXBAI")
kvmsg.SetBody("")
kvmsg.Send(snapshot)
}
}
}
}