forked from celery/celery
-
Notifications
You must be signed in to change notification settings - Fork 40
/
test_datastructures.py
140 lines (107 loc) · 3.63 KB
/
test_datastructures.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import sys
import unittest2 as unittest
from Queue import Queue
from celery.datastructures import PositionQueue, ExceptionInfo, LocalCache
from celery.datastructures import LimitedSet, SharedCounter, consume_queue
class TestPositionQueue(unittest.TestCase):
def test_position_queue_unfilled(self):
q = PositionQueue(length=10)
for position in q.data:
self.assertIsInstance(position, q.UnfilledPosition)
self.assertListEqual(q.filled, [])
self.assertEqual(len(q), 0)
self.assertFalse(q.full())
def test_position_queue_almost(self):
q = PositionQueue(length=10)
q[3] = 3
q[6] = 6
q[9] = 9
self.assertListEqual(q.filled, [3, 6, 9])
self.assertEqual(len(q), 3)
self.assertFalse(q.full())
def test_position_queue_full(self):
q = PositionQueue(length=10)
for i in xrange(10):
q[i] = i
self.assertListEqual(q.filled, list(xrange(10)))
self.assertEqual(len(q), 10)
self.assertTrue(q.full())
class TestExceptionInfo(unittest.TestCase):
def test_exception_info(self):
try:
raise LookupError("The quick brown fox jumps...")
except LookupError:
exc_info = sys.exc_info()
einfo = ExceptionInfo(exc_info)
self.assertEqual(str(einfo), einfo.traceback)
self.assertIsInstance(einfo.exception, LookupError)
self.assertTupleEqual(einfo.exception.args,
("The quick brown fox jumps...", ))
self.assertTrue(einfo.traceback)
r = repr(einfo)
self.assertTrue(r)
class TestUtilities(unittest.TestCase):
def test_consume_queue(self):
x = Queue()
it = consume_queue(x)
self.assertRaises(StopIteration, it.next)
x.put("foo")
it = consume_queue(x)
self.assertEqual(it.next(), "foo")
self.assertRaises(StopIteration, it.next)
class TestSharedCounter(unittest.TestCase):
def test_initial_value(self):
self.assertEqual(int(SharedCounter(10)), 10)
def test_increment(self):
c = SharedCounter(10)
c.increment()
self.assertEqual(int(c), 11)
c.increment(2)
self.assertEqual(int(c), 13)
def test_decrement(self):
c = SharedCounter(10)
c.decrement()
self.assertEqual(int(c), 9)
c.decrement(2)
self.assertEqual(int(c), 7)
def test_iadd(self):
c = SharedCounter(10)
c += 10
self.assertEqual(int(c), 20)
def test_isub(self):
c = SharedCounter(10)
c -= 20
self.assertEqual(int(c), -10)
def test_repr(self):
self.assertIn("<SharedCounter:", repr(SharedCounter(10)))
class TestLimitedSet(unittest.TestCase):
def test_add(self):
s = LimitedSet(maxlen=2)
s.add("foo")
s.add("bar")
for n in "foo", "bar":
self.assertIn(n, s)
s.add("baz")
for n in "bar", "baz":
self.assertIn(n, s)
self.assertNotIn("foo", s)
def test_iter(self):
s = LimitedSet(maxlen=2)
items = "foo", "bar"
map(s.add, items)
l = list(iter(items))
for item in items:
self.assertIn(item, l)
def test_repr(self):
s = LimitedSet(maxlen=2)
items = "foo", "bar"
map(s.add, items)
self.assertIn("LimitedSet(", repr(s))
class TestLocalCache(unittest.TestCase):
def test_expires(self):
limit = 100
x = LocalCache(limit=limit)
slots = list(range(limit * 2))
for i in slots:
x[i] = i
self.assertListEqual(x.keys(), slots[limit:])