forked from celery/celery
-
Notifications
You must be signed in to change notification settings - Fork 40
/
test_concurrency_base.py
63 lines (46 loc) · 1.78 KB
/
test_concurrency_base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import os
from itertools import count
from celery.concurrency.base import apply_target, BasePool
from celery.tests.utils import unittest
class test_BasePool(unittest.TestCase):
    """Tests for ``apply_target`` and the default ``BasePool`` interface."""

    def test_apply_target(self):
        """Check the order and arguments of the calls made by ``apply_target``.

        Every generated callback stores ``(sequence number, args)`` under its
        own name in ``scratch``, so the assertions can verify both the order
        in which the callables were invoked and what they were invoked with.
        """
        scratch = {}
        ticks = count(0)

        def counter():
            # Builtin ``next()`` instead of the Python 2-only ``.next`` bound
            # method, so the test runs on both Python 2.6+ and Python 3.
            return next(ticks)

        def gen_callback(name, retval=None):
            """Return a callable that records its call and returns *retval*."""

            def callback(*args):
                scratch[name] = (counter(), args)
                return retval

            return callback

        apply_target(gen_callback("target", 42),
                     args=(8, 16),
                     callback=gen_callback("callback"),
                     accept_callback=gen_callback("accept_callback"))

        # Per-key checks rather than ``assertDictContainsSubset``, which is
        # deprecated since Python 3.2 and removed in Python 3.12.
        self.assertEqual(scratch["target"], (1, (8, 16)))
        self.assertEqual(scratch["callback"], (2, (42, )))

        # The accept callback must have fired first (sequence number 0), with
        # the current pid as first argument and a truthy second argument.
        pa1 = scratch["accept_callback"]
        self.assertEqual(0, pa1[0])
        self.assertEqual(pa1[1][0], os.getpid())
        self.assertTrue(pa1[1][1])

        # No accept callback: only target and callback are recorded, and the
        # shared counter keeps running from the first part of the test.
        scratch.clear()
        apply_target(gen_callback("target", 42),
                     args=(8, 16),
                     callback=gen_callback("callback"),
                     accept_callback=None)
        self.assertDictEqual(scratch,
                             {"target": (3, (8, 16)),
                              "callback": (4, (42, ))})

    def test_interface_on_start(self):
        """``on_start`` can be called on a plain ``BasePool``."""
        BasePool(10).on_start()

    def test_interface_on_stop(self):
        """``on_stop`` can be called on a plain ``BasePool``."""
        BasePool(10).on_stop()

    def test_interface_on_apply(self):
        """``on_apply`` can be called on a plain ``BasePool``."""
        BasePool(10).on_apply()

    def test_interface_info(self):
        """``info`` of a plain ``BasePool`` is an empty dict."""
        self.assertDictEqual(BasePool(10).info, {})

    def test_active(self):
        """``active`` is false on a new pool and true once state is ``RUN``."""
        p = BasePool(10)
        self.assertFalse(p.active)
        p._state = p.RUN
        self.assertTrue(p.active)