/
test_zmq.py
114 lines (82 loc) · 2.26 KB
/
test_zmq.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# -*- coding: utf-8 -
#
# This file is part of uzmq. See the NOTICE for more information.
import time
import pyuv
import zmq
from zmq.tests import BaseZMQTestCase
from uzmq import ZMQ
def wait():
time.sleep(.25)
class TestZMQStream(BaseZMQTestCase):
def test_simple(self):
req, rep = self.create_bound_pair(zmq.REQ, zmq.REP)
wait()
loop = pyuv.Loop.default_loop()
s = ZMQ(loop, rep)
r = []
def cb(stream, msg, err):
r.append(msg[0])
s.start_read(cb)
req.send(b'test')
wait()
def stop(handle):
s.stop()
t = pyuv.Timer(loop)
t.start(stop, 0.8, 0.0)
loop.run()
assert r == [b'test']
def test_echo(self):
req, rep = self.create_bound_pair(zmq.REQ, zmq.REP)
wait()
loop = pyuv.Loop.default_loop()
s = ZMQ(loop, rep)
s1 = ZMQ(loop, req)
r = []
def cb(stream, msg, err):
r.append(msg[0])
stream.write(msg[0])
r1 = []
def cb1(stream, msg, err):
r1.append(msg[0])
s.stop()
s1.stop()
s.start_read(cb)
s1.start_read(cb1)
s1.write(b'echo')
loop.run()
assert r == [b'echo']
assert r1 == [b'echo']
def test_pubsub(self):
pub, sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
sub.setsockopt(zmq.SUBSCRIBE,b'')
wait()
loop = pyuv.Loop.default_loop()
s = ZMQ(loop, sub)
s1 = ZMQ(loop, pub)
r = []
def cb(stream, msg, err):
r.append(msg[0])
s.stop()
s1.stop()
s.start_read(cb)
s1.write(b"message")
loop.run()
assert r == [b'message']
def test_pubsub_topic(self):
pub, sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
sub.setsockopt(zmq.SUBSCRIBE,b'x')
wait()
loop = pyuv.Loop.default_loop()
s = ZMQ(loop, sub)
s1 = ZMQ(loop, pub)
r = []
def cb(stream, msg, err):
r.append(msg[0])
s.stop()
s1.stop()
s.start_read(cb)
s1.write(b"message")
s1.write(b"xmessage")
loop.run()
assert r == [b'xmessage']