-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
clonesrv2.py
110 lines (89 loc) · 3.05 KB
/
clonesrv2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
"""
Clone server Model Two
Author: Min RK <benjaminrk@gmail.com>
"""
import random
import threading
import time
import zmq
from kvsimple import KVMsg
from zhelpers import zpipe
def main():
    """Publish an endless stream of random key-value updates.

    Binds a PUB socket on tcp://*:5557, starts ``state_manager`` in a daemon
    thread connected to this thread through a ``zpipe`` pair, and then loops
    forever: every iteration builds a ``KVMsg`` with the next sequence number
    and a random key/body, and sends it both to the publisher socket and to
    the state manager's pipe.

    The loop only ends on ``KeyboardInterrupt`` (Ctrl-C), after which the
    number of messages sent is printed.
    """
    # Prepare our context and publisher socket
    ctx = zmq.Context()
    publisher = ctx.socket(zmq.PUB)
    publisher.bind("tcp://*:5557")

    # `updates` stays in this thread; `peer` is handed to the state manager.
    updates, peer = zpipe(ctx)

    manager_thread = threading.Thread(target=state_manager, args=(ctx, peer))
    # Daemon thread: it must not keep the process alive after main() returns.
    manager_thread.daemon = True
    manager_thread.start()

    sequence = 0
    random.seed(time.time())

    try:
        while True:
            # Distribute as key-value message
            sequence += 1
            kvmsg = KVMsg(sequence)
            # NOTE(review): key/body are text strings; if KVMsg hands them to
            # pyzmq unchanged they would need to be bytes on Python 3 --
            # confirm against kvsimple before running there.
            kvmsg.key = "%d" % random.randint(1, 10000)
            kvmsg.body = "%d" % random.randint(1, 1000000)
            # Same message goes to subscribers and to the state manager.
            kvmsg.send(publisher)
            kvmsg.send(updates)
    except KeyboardInterrupt:
        # Single-argument print(...) produces the same output on Python 2
        # and Python 3; the bare print statement only parses on Python 2.
        print(" Interrupted\n%d messages out" % sequence)
class Route:
    """Simple struct with the routing information for a key-value snapshot.

    socket   -- ROUTER socket to send to
    identity -- identity of the peer who requested state
    """

    def __init__(self, socket, identity):
        self.socket, self.identity = socket, identity
def send_single(key, kvmsg, route):
    """Deliver one snapshot key-value message to the peer named by *route*.

    The peer's identity is written first with SNDMORE, so the frames that
    ``kvmsg.send`` writes afterwards continue the same multipart message on
    ``route.socket``.

    *key* is part of the signature but is not used here; *kvmsg* already is
    the complete message object, ready to send.
    """
    sock = route.socket
    # Identity frame of the recipient goes out before the payload
    sock.send(route.identity, zmq.SNDMORE)
    kvmsg.send(sock)
def state_manager(ctx, pipe):
    """Maintain the key-value state and serve snapshot requests from clients.

    Intended to run in its own thread.  Updates arrive as ``KVMsg`` objects
    on *pipe* and are stored in an in-memory map; the sequence number of the
    most recent update is remembered as the snapshot version.  Clients ask
    for the whole map through a ROUTER socket bound to tcp://*:5556 and
    receive every stored message followed by a terminating ``KTHXBAI``
    message carrying that version.

    Args:
        ctx: zmq context used to create the snapshot socket.
        pipe: socket connected to the main thread; a ``READY`` frame is sent
            on it first, then updates are received from it.

    The loop ends when polling raises ``zmq.ZMQError`` or
    ``KeyboardInterrupt``, or when a client sends any request other than
    ``ICANHAZ?``.  The snapshot socket is closed on every exit path.
    """
    kvmap = {}
    # pyzmq sends bytes; b"READY" is the same object type as "READY" on
    # Python 2 and the required type on Python 3.
    pipe.send(b"READY")
    snapshot = ctx.socket(zmq.ROUTER)
    try:
        snapshot.bind("tcp://*:5556")

        poller = zmq.Poller()
        poller.register(pipe, zmq.POLLIN)
        poller.register(snapshot, zmq.POLLIN)

        sequence = 0  # Current snapshot version number
        while True:
            try:
                items = dict(poller.poll())
            except (zmq.ZMQError, KeyboardInterrupt):
                break  # interrupt/context shutdown

            # Apply state update from main thread
            if pipe in items:
                kvmsg = KVMsg.recv(pipe)
                sequence = kvmsg.sequence
                kvmsg.store(kvmap)

            # Execute state snapshot request
            if snapshot in items:
                msg = snapshot.recv_multipart()
                # ROUTER prepends the sender's identity to what it sent
                identity = msg[0]
                request = msg[1]
                # Frames are received as bytes, so compare against bytes
                if request != b"ICANHAZ?":
                    # Any unknown request stops the manager for all clients
                    # (behaviour kept from the original).
                    print("E: bad request, aborting")
                    break

                # Send state snapshot to client
                route = Route(snapshot, identity)

                # For each entry in kvmap, send kvmsg to client
                for k, v in kvmap.items():
                    send_single(k, v, route)

                # Now send END message with sequence number
                print("Sending state snapshot=%d" % sequence)
                snapshot.send(identity, zmq.SNDMORE)
                kvmsg = KVMsg(sequence)
                # NOTE(review): key/body left as text strings as in the
                # original; confirm what kvsimple expects on Python 3.
                kvmsg.key = "KTHXBAI"
                kvmsg.body = ""
                kvmsg.send(snapshot)
    finally:
        # Release the socket explicitly so a context shutdown is not held
        # up waiting for it.
        snapshot.close()
# Start the server only when this file is executed as a script
if __name__ == "__main__":
    main()