-
Notifications
You must be signed in to change notification settings - Fork 1
/
clonesrv5.go
152 lines (127 loc) · 3.87 KB
/
clonesrv5.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
//
// Clone server Model Five
//
package main
import (
zmq "github.com/pebbe/zmq3"
"github.com/pebbe/zmq3/examples/kvmsg"
"errors"
"fmt"
"log"
"strconv"
"strings"
"time"
)
// Our server is defined by these properties
type clonesrv_t struct {
kvmap map[string]*kvmsg.Kvmsg // Key-value store
port int // Main port we're working on
sequence int64 // How many updates we're at
snapshot *zmq.Socket // Handle snapshot requests
publisher *zmq.Socket // Publish updates to clients
collector *zmq.Socket // Collect updates from clients
}
func main() {
srv := &clonesrv_t{
port: 5556,
kvmap: make(map[string]*kvmsg.Kvmsg),
}
// Set up our clone server sockets
srv.snapshot, _ = zmq.NewSocket(zmq.ROUTER)
srv.snapshot.Bind(fmt.Sprint("tcp://*:", srv.port))
srv.publisher, _ = zmq.NewSocket(zmq.PUB)
srv.publisher.Bind(fmt.Sprint("tcp://*:", srv.port+1))
srv.collector, _ = zmq.NewSocket(zmq.PULL)
srv.collector.Bind(fmt.Sprint("tcp://*:", srv.port+2))
// Register our handlers with reactor
reactor := zmq.NewReactor()
reactor.AddSocket(srv.snapshot, zmq.POLLIN,
func(e zmq.State) error { return snapshots(srv) })
reactor.AddSocket(srv.collector, zmq.POLLIN,
func(e zmq.State) error { return collector(srv) })
reactor.AddChannelTime(time.Tick(1000*time.Millisecond), 1,
func(v interface{}) error { return flush_ttl(srv) })
log.Println(reactor.Run(100 * time.Millisecond)) // precision: .1 seconds
}
// This is the reactor handler for the snapshot socket; it accepts
// just the ICANHAZ? request and replies with a state snapshot ending
// with a KTHXBAI message:
func snapshots(srv *clonesrv_t) (err error) {
msg, err := srv.snapshot.RecvMessage(0)
if err != nil {
return
}
identity := msg[0]
// Request is in second frame of message
request := msg[1]
if request != "ICANHAZ?" {
err = errors.New("E: bad request, aborting")
return
}
subtree := msg[2]
// Send state socket to client
for _, kvmsg := range srv.kvmap {
if key, _ := kvmsg.GetKey(); strings.HasPrefix(key, subtree) {
srv.snapshot.Send(identity, zmq.SNDMORE)
kvmsg.Send(srv.snapshot)
}
}
// Now send END message with sequence number
log.Println("I: sending shapshot =", srv.sequence)
srv.snapshot.Send(identity, zmq.SNDMORE)
kvmsg := kvmsg.NewKvmsg(srv.sequence)
kvmsg.SetKey("KTHXBAI")
kvmsg.SetBody(subtree)
kvmsg.Send(srv.snapshot)
return
}
// We store each update with a new sequence number, and if necessary, a
// time-to-live. We publish updates immediately on our publisher socket:
func collector(srv *clonesrv_t) (err error) {
kvmsg, err := kvmsg.RecvKvmsg(srv.collector)
if err != nil {
return
}
srv.sequence++
kvmsg.SetSequence(srv.sequence)
kvmsg.Send(srv.publisher)
if ttls, e := kvmsg.GetProp("ttl"); e == nil {
// change duration into specific time, using the same property: ugly!
ttl, e := strconv.ParseInt(ttls, 10, 64)
if e != nil {
err = e
return
}
kvmsg.SetProp("ttl", fmt.Sprint(time.Now().Add(time.Duration(ttl)*time.Second).Unix()))
}
kvmsg.Store(srv.kvmap)
log.Println("I: publishing update =", srv.sequence)
return
}
// At regular intervals we flush ephemeral values that have expired. This
// could be slow on very large data sets:
func flush_ttl(srv *clonesrv_t) (err error) {
for _, kvmsg := range srv.kvmap {
// If key-value pair has expired, delete it and publish the
// fact to listening clients.
if ttls, e := kvmsg.GetProp("ttl"); e == nil {
ttl, e := strconv.ParseInt(ttls, 10, 64)
if e != nil {
err = e
continue
}
if time.Now().After(time.Unix(ttl, 0)) {
srv.sequence++
kvmsg.SetSequence(srv.sequence)
kvmsg.SetBody("")
e = kvmsg.Send(srv.publisher)
if e != nil {
err = e
}
kvmsg.Store(srv.kvmap)
log.Println("I: publishing delete =", srv.sequence)
}
}
}
return
}