Skip to content

Commit

Permalink
Merge pull request #637 from mmirate/patch-1
Browse files Browse the repository at this point in the history
Python 3 compatibility for Clone pattern models 6 and 5
  • Loading branch information
hintjens committed Apr 27, 2016
2 parents d6f5558 + 1620bf4 commit 94527c1
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 68 deletions.
28 changes: 14 additions & 14 deletions examples/Python/clone.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,27 +52,27 @@ def subtree(self):
def subtree(self, subtree):
"""Sends [SUBTREE][subtree] to the agent"""
self._subtree = subtree
self.pipe.send_multipart(["SUBTREE", subtree])
self.pipe.send_multipart([b"SUBTREE", subtree])

def connect(self, address, port):
"""Connect to new server endpoint
Sends [CONNECT][address][port] to the agent
"""
self.pipe.send_multipart(["CONNECT", address, str(port)])
self.pipe.send_multipart([b"CONNECT", (address.encode() if isinstance(address, str) else address), b'%d' % port])

def set(self, key, value, ttl=0):
"""Set new value in distributed hash table
Sends [SET][key][value][ttl] to the agent
"""
self.pipe.send_multipart(["SET", key, value, str(ttl)])
self.pipe.send_multipart([b"SET", key, value, b'%i' % ttl])

def get(self, key):
"""Lookup value in distributed hash table
Sends [GET][key] to the agent and waits for a value response
If there is no clone available, will eventually return None.
"""

self.pipe.send_multipart(["GET", key])
self.pipe.send_multipart([b"GET", key])
try:
reply = self.pipe.recv_multipart()
except KeyboardInterrupt:
Expand Down Expand Up @@ -100,11 +100,11 @@ def __init__(self, ctx, address, port, subtree):
self.port = port
self.snapshot = ctx.socket(zmq.DEALER)
self.snapshot.linger = 0
self.snapshot.connect("%s:%i" % (address,port))
self.snapshot.connect("%s:%i" % (address.decode(),port))
self.subscriber = ctx.socket(zmq.SUB)
self.subscriber.setsockopt(zmq.SUBSCRIBE, subtree)
self.subscriber.setsockopt(zmq.SUBSCRIBE, b'HUGZ')
self.subscriber.connect("%s:%i" % (address,port+1))
self.subscriber.connect("%s:%i" % (address.decode(),port+1))
self.subscriber.linger = 0


Expand Down Expand Up @@ -141,29 +141,29 @@ def control_message (self):
msg = self.pipe.recv_multipart()
command = msg.pop(0)

if command == "CONNECT":
if command == b"CONNECT":
address = msg.pop(0)
port = int(msg.pop(0))
if len(self.servers) < SERVER_MAX:
self.servers.append(CloneServer(self.ctx, address, port, self.subtree))
self.publisher.connect("%s:%i" % (address,port+2))
self.publisher.connect("%s:%i" % (address.decode(),port+2))
else:
logging.error("E: too many servers (max. %i)", SERVER_MAX)
elif command == "SET":
elif command == b"SET":
key,value,sttl = msg
ttl = int(sttl)

# Send key-value pair on to server
kvmsg = KVMsg(0, key=key, body=value)
kvmsg.store(self.kvmap)
if ttl:
kvmsg["ttl"] = ttl
kvmsg[b"ttl"] = sttl
kvmsg.send(self.publisher)
elif command == "GET":
elif command == b"GET":
key = msg[0]
value = self.kvmap.get(key)
self.pipe.send(value.body if value else '')
elif command == "SUBTREE":
elif command == b"SUBTREE":
self.subtree = msg[0]


Expand All @@ -189,7 +189,7 @@ def clone_agent(ctx, pipe):
logging.info ("I: waiting for server at %s:%d...",
server.address, server.port)
if (server.requests < 2):
server.snapshot.send_multipart(["ICANHAZ?", agent.subtree])
server.snapshot.send_multipart([b"ICANHAZ?", agent.subtree])
server.requests += 1
server.expiry = time.time() + SERVER_TTL
agent.state = STATE_SYNCING
Expand Down Expand Up @@ -228,7 +228,7 @@ def clone_agent(ctx, pipe):
if (agent.state == STATE_SYNCING):
# Store in snapshot until we're finished
server.requests = 0
if kvmsg.key == "KTHXBAI":
if kvmsg.key == b"KTHXBAI":
agent.sequence = kvmsg.sequence
agent.state = STATE_ACTIVE
logging.info ("I: received from %s:%d snapshot=%d",
Expand Down
18 changes: 9 additions & 9 deletions examples/Python/clonecli5.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def main():
snapshot.connect("tcp://localhost:5556")
subscriber = ctx.socket(zmq.SUB)
subscriber.linger = 0
subscriber.setsockopt(zmq.SUBSCRIBE, SUBTREE)
subscriber.setsockopt(zmq.SUBSCRIBE, SUBTREE.encode())
subscriber.connect("tcp://localhost:5557")
publisher = ctx.socket(zmq.PUSH)
publisher.linger = 0
Expand All @@ -33,17 +33,17 @@ def main():

# Get state snapshot
sequence = 0
snapshot.send_multipart(["ICANHAZ?", SUBTREE])
snapshot.send_multipart(["ICANHAZ?", SUBTREE.encode()])
while True:
try:
kvmsg = KVMsg.recv(snapshot)
except:
raise
return # Interrupted

if kvmsg.key == "KTHXBAI":
if kvmsg.key == b"KTHXBAI":
sequence = kvmsg.sequence
print "I: Received snapshot=%d" % sequence
print("I: Received snapshot=%d" % sequence)
break # Done
kvmsg.store(kvmap)

Expand All @@ -66,19 +66,19 @@ def main():
sequence = kvmsg.sequence
kvmsg.store(kvmap)
action = "update" if kvmsg.body else "delete"
print "I: received %s=%d" % (action, sequence)
print("I: received %s=%d" % (action, sequence))

# If we timed-out, generate a random kvmsg
if time.time() >= alarm:
kvmsg = KVMsg(0)
kvmsg.key = SUBTREE + "%d" % random.randint(1,10000)
kvmsg.body = "%d" % random.randint(1,1000000)
kvmsg['ttl'] = random.randint(0,30)
kvmsg.key = SUBTREE + b"%d" % random.randint(1,10000)
kvmsg.body = b"%d" % random.randint(1,1000000)
kvmsg[b'ttl'] = random.randint(0,30)
kvmsg.send(publisher)
kvmsg.store(kvmap)
alarm = time.time() + 1.

print " Interrupted\n%d messages in" % sequence
print(" Interrupted\n%d messages in" % sequence)

if __name__ == '__main__':
main()
6 changes: 3 additions & 3 deletions examples/Python/clonecli6.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
def main():
# Create and connect clone
clone = Clone()
clone.subtree = SUBTREE
clone.subtree = SUBTREE.encode()
clone.connect("tcp://localhost", 5556)
clone.connect("tcp://localhost", 5566)

try:
while True:
# Distribute as key-value message
key = "%d" % random.randint(1,10000)
value = "%d" % random.randint(1,1000000)
key = b"%d" % random.randint(1,10000)
value = b"%d" % random.randint(1,1000000)
clone.set(key, value, random.randint(0,30))
time.sleep(1)
except KeyboardInterrupt:
Expand Down
17 changes: 9 additions & 8 deletions examples/Python/clonesrv5.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ def start(self):

def handle_snapshot(self, msg):
"""snapshot requests"""
if len(msg) != 3 or msg[1] != "ICANHAZ?":
print "E: bad request, aborting"
if len(msg) != 3 or msg[1] != b"ICANHAZ?":
print("E: bad request, aborting")
dump(msg)
self.loop.stop()
return
Expand All @@ -99,7 +99,7 @@ def handle_snapshot(self, msg):
logging.info("I: Sending state shapshot=%d" % self.sequence)
self.snapshot.send(identity, zmq.SNDMORE)
kvmsg = KVMsg(self.sequence)
kvmsg.key = "KTHXBAI"
kvmsg.key = b"KTHXBAI"
kvmsg.body = subtree
kvmsg.send(self.snapshot)

Expand All @@ -109,23 +109,24 @@ def handle_collect(self, msg):
self.sequence += 1
kvmsg.sequence = self.sequence
kvmsg.send(self.publisher)
ttl = float(kvmsg.get('ttl', 0))
ttl = float(kvmsg.get(b'ttl', 0))
if ttl:
kvmsg['ttl'] = time.time() + ttl
kvmsg[b'ttl'] = b'%f' % (time.time() + ttl)
kvmsg.store(self.kvmap)
logging.info("I: publishing update=%d", self.sequence)

def flush_ttl(self):
"""Purge ephemeral values that have expired"""
for key,kvmsg in self.kvmap.items():
for key,kvmsg in list(self.kvmap.items()):
# used list() to exhaust the iterator before deleting from the dict
self.flush_single(kvmsg)

def flush_single(self, kvmsg):
"""If key-value pair has expired, delete it and publish the fact
to listening clients."""
ttl = float(kvmsg.get('ttl', 0))
ttl = float(kvmsg.get(b'ttl', 0))
if ttl and ttl <= time.time():
kvmsg.body = ""
kvmsg.body = b""
self.sequence += 1
kvmsg.sequence = self.sequence
kvmsg.send(self.publisher)
Expand Down
31 changes: 16 additions & 15 deletions examples/Python/clonesrv6.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def start(self):

def handle_snapshot(self, socket, msg):
"""snapshot requests"""
if msg[1] != "ICANHAZ?" or len(msg) != 3:
if msg[1] != b"ICANHAZ?" or len(msg) != 3:
logging.error("E: bad request, aborting")
dump(msg)
self.bstar.loop.stop()
Expand All @@ -127,7 +127,7 @@ def handle_snapshot(self, socket, msg):
logging.info("I: Sending state shapshot=%d" % self.sequence)
socket.send(identity, zmq.SNDMORE)
kvmsg = KVMsg(self.sequence)
kvmsg.key = "KTHXBAI"
kvmsg.key = b"KTHXBAI"
kvmsg.body = subtree
kvmsg.send(socket)

Expand All @@ -142,9 +142,9 @@ def handle_collect(self, msg):
self.sequence += 1
kvmsg.sequence = self.sequence
kvmsg.send(self.publisher)
ttl = float(kvmsg.get('ttl', 0))
ttl = float(kvmsg.get(b'ttl', 0))
if ttl:
kvmsg['ttl'] = time.time() + ttl
kvmsg[b'ttl'] = b'%f' % (time.time() + ttl)
kvmsg.store(self.kvmap)
logging.info("I: publishing update=%d", self.sequence)
else:
Expand All @@ -169,26 +169,27 @@ def was_pending(self, kvmsg):
def flush_ttl(self):
"""Purge ephemeral values that have expired"""
if self.kvmap:
for key,kvmsg in self.kvmap.items():
for key,kvmsg in list(self.kvmap.items()):
self.flush_single(kvmsg)

def flush_single(self, kvmsg):
"""If key-value pair has expired, delete it and publish the fact
to listening clients."""
ttl = float(kvmsg.get('ttl', 0))
ttl = float(kvmsg.get(b'ttl', 0))
if ttl and ttl <= time.time():
kvmsg.body = ""
kvmsg.body = b""
self.sequence += 1
kvmsg.sequence = self.sequence
logging.info("I: preparing to publish delete=%s", kvmsg.properties)
kvmsg.send(self.publisher)
del self.kvmap[kvmsg.key]
logging.info("I: publishing delete=%d", self.sequence)

def send_hugz(self):
"""Send hugz to anyone listening on the publisher socket"""
kvmsg = KVMsg(self.sequence)
kvmsg.key = "HUGZ"
kvmsg.body = ""
kvmsg.key = b"HUGZ"
kvmsg.body = b""
kvmsg.send(self.publisher)

# ---------------------------------------------------------------------
Expand Down Expand Up @@ -239,15 +240,15 @@ def handle_subscriber(self, msg):

logging.info ("I: asking for snapshot from: tcp://localhost:%d",
self.peer)
snapshot.send_multipart(["ICANHAZ?", ''])
snapshot.send_multipart([b"ICANHAZ?", b''])
while True:
try:
kvmsg = KVMsg.recv(snapshot)
except KeyboardInterrupt:
# Interrupted
self.bstar.loop.stop()
return
if kvmsg.key == "KTHXBAI":
if kvmsg.key == b"KTHXBAI":
self.sequence = kvmsg.sequence
break # Done
kvmsg.store(self.kvmap)
Expand All @@ -257,11 +258,11 @@ def handle_subscriber(self, msg):
# Find and remove update off pending list
kvmsg = KVMsg.from_msg(msg)
# update float ttl -> timestamp
ttl = float(kvmsg.get('ttl', 0))
ttl = float(kvmsg.get(b'ttl', 0))
if ttl:
kvmsg['ttl'] = time.time() + ttl
kvmsg[b'ttl'] = b'%f' % (time.time() + ttl)

if kvmsg.key != "HUGZ":
if kvmsg.key != b"HUGZ":
if not self.was_pending(kvmsg):
# If master update came before client update, flip it
# around, store master update (with sequence) on pending
Expand All @@ -282,7 +283,7 @@ def main():
elif '-b' in sys.argv:
primary = False
else:
print "Usage: clonesrv6.py { -p | -b }"
print("Usage: clonesrv6.py { -p | -b }")
sys.exit(1)
clone = CloneServer(primary)
clone.start()
Expand Down
Loading

0 comments on commit 94527c1

Please sign in to comment.