henryr / pyzk-recipes

Collection of recipes demonstrating how to use ZooKeeper and Python

This URL has Read+Write access

pyzk-recipes / queue.py
100644 86 lines (76 sloc) 3.052 kb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import zookeeper, threading, sys
# ACL entry handed to the zookeeper.create() calls below: scheme "world",
# id "anyone", permission mask 0x1f (all five low bits set).
ZOO_OPEN_ACL_UNSAFE = {
    "perms": 0x1f,
    "scheme": "world",
    "id": "anyone",
}
 
class ZooKeeperQueue(object):
    """Queue recipe stored as child znodes of a single parent znode.

    Each item is a child created with the ``zookeeper.SEQUENCE`` flag under
    ``/<queuename>``.  Consumers list the children, sort them by name and try
    to read-and-delete them in that order; whichever consumer deletes a node
    successfully owns the item.

    Attributes:
        connected: set to True by the session watcher once it has fired.
        queuename: absolute path of the parent znode ("/" + name).
        cv: condition variable shared by the session watcher and the
            child watcher used in block_dequeue().
        handle: ZooKeeper handle returned by ``zookeeper.init``.
    """

    def __init__(self, queuename):
        """Connect to localhost:2181 and make sure the queue znode exists.

        Waits up to 10 seconds for the session watcher to fire.  If it does
        not, prints a diagnostic and exits the process with status 1.

        Args:
            queuename: name of the queue; the parent znode is "/" + queuename.

        Raises:
            IOError: if creating the parent znode fails for any reason other
                than the node already existing.
        """
        self.connected = False
        self.queuename = "/" + queuename
        self.cv = threading.Condition()
        # Silence the client library's own logging.
        # NOTE(review): the file is opened in the default read mode; confirm
        # whether the binding needs a writable stream here.
        zookeeper.set_log_stream(open("/dev/null"))

        def watcher(handle, event_type, state, path):
            # Fired by the client library; wakes the constructor below.
            print("Connected")
            with self.cv:
                self.connected = True
                self.cv.notify()

        # Hold the condition while calling init() so the watcher cannot
        # notify before we are waiting; `with` also guarantees the lock is
        # released if init() raises or we exit.
        with self.cv:
            self.handle = zookeeper.init("localhost:2181", watcher, 10000, 0)
            self.cv.wait(10.0)
            if not self.connected:
                print("Connection to ZooKeeper cluster timed out - "
                      "is a server running on localhost:2181?")
                # Non-zero status: failing to connect is an error.
                sys.exit(1)

        try:
            zookeeper.create(self.handle, self.queuename, "queue top level",
                             [ZOO_OPEN_ACL_UNSAFE], 0)
        except IOError as e:
            # An existing parent znode is fine; anything else is re-raised
            # with its original traceback.
            if e.message == zookeeper.zerror(zookeeper.NODEEXISTS):
                print("Queue already exists")
            else:
                raise

    def enqueue(self, val):
        """Add *val* to the queue as a new sequential child znode.

        Args:
            val: data to store in the new znode.
        """
        zookeeper.create(self.handle, self.queuename + "/item", val,
                         [ZOO_OPEN_ACL_UNSAFE], zookeeper.SEQUENCE)

    def dequeue(self):
        """Remove and return the first available item without blocking.

        Children are tried in sorted-name order.  A child that disappears
        before we can delete it (another consumer won the race) is skipped
        and the next one is tried; if every listed child was lost, the
        children are listed again.

        Returns:
            The data of the removed item (which may be an empty string), or
            None if the queue has no children.
        """
        while True:
            children = sorted(
                zookeeper.get_children(self.handle, self.queuename, None))
            if not children:
                return None
            for child in children:
                data = self.get_and_delete(self.queuename + "/" + child)
                # Compare against None explicitly so an item whose data is
                # the empty string is still returned rather than dropped.
                if data is not None:
                    return data

    def get_and_delete(self, node):
        """Read *node* and delete it at the version that was read.

        Args:
            node: absolute path of the znode to consume.

        Returns:
            The znode's data, or None if the znode no longer exists when it
            is read or deleted.

        Raises:
            IOError: for any ZooKeeper error other than NONODE.
        """
        try:
            (data, stat) = zookeeper.get(self.handle, node, None)
            zookeeper.delete(self.handle, node, stat["version"])
            return data
        except IOError as e:
            if e.message == zookeeper.zerror(zookeeper.NONODE):
                return None
            raise

    def block_dequeue(self):
        """Remove and return the first available item, waiting if necessary.

        Registers a child watcher on the queue znode with each listing and,
        when no item could be taken, waits on the condition variable until a
        watcher notifies it, then lists the children again.

        Returns:
            The data of the removed item.
        """
        def queue_watcher(handle, event, state, path):
            # Wake the consumer waiting below.
            with self.cv:
                self.cv.notify()

        while True:
            # The condition is held from before the watch is registered until
            # wait() releases it, so a notification cannot slip in between
            # the listing and the wait.  `with` releases it on return or on
            # an exception from a ZooKeeper call.
            with self.cv:
                children = sorted(zookeeper.get_children(
                    self.handle, self.queuename, queue_watcher))
                for child in children:
                    data = self.get_and_delete(self.queuename + "/" + child)
                    if data is not None:
                        return data
                self.cv.wait()
              
if __name__ == '__main__':
    # Demo: push three items onto "myfirstqueue", then drain the queue and
    # print each item as it is consumed.
    zk = ZooKeeperQueue("myfirstqueue")

    print("Enqueuing three items")
    for item in ("queue item 1", "queue item 2", "queue item 3"):
        zk.enqueue(item)
    print("Done")

    print("Consuming all items in queue")
    # iter(callable, sentinel) keeps calling dequeue() until it returns None.
    for v in iter(zk.dequeue, None):
        print(v)
    print("Done")