forked from celery/celery
/
test_worker_controllers.py
71 lines (51 loc) · 1.66 KB
/
test_worker_controllers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import unittest2 as unittest
from Queue import Queue
from celery.utils import gen_unique_id
from celery.worker.controllers import Mediator
from celery.worker.state import revoked as revoked_tasks
class MockTask(object):
task_id = 1234
task_name = "mocktask"
acked = False
def __init__(self, value, **kwargs):
self.value = value
def on_ack(self):
self.acked = True
def revoked(self):
if self.task_id in revoked_tasks:
self.on_ack()
return True
return False
class test_Mediator(unittest.TestCase):
def test_mediator_start__stop(self):
ready_queue = Queue()
m = Mediator(ready_queue, lambda t: t)
m.start()
self.assertFalse(m._shutdown.isSet())
self.assertFalse(m._stopped.isSet())
m.stop()
m.join()
self.assertTrue(m._shutdown.isSet())
self.assertTrue(m._stopped.isSet())
def test_mediator_move(self):
ready_queue = Queue()
got = {}
def mycallback(value):
got["value"] = value.value
m = Mediator(ready_queue, mycallback)
ready_queue.put(MockTask("George Costanza"))
m.move()
self.assertEqual(got["value"], "George Costanza")
def test_mediator_move_revoked(self):
ready_queue = Queue()
got = {}
def mycallback(value):
got["value"] = value.value
m = Mediator(ready_queue, mycallback)
t = MockTask("Jerry Seinfeld")
t.task_id = gen_unique_id()
revoked_tasks.add(t.task_id)
ready_queue.put(t)
m.move()
self.assertNotIn("value", got)
self.assertTrue(t.acked)