forked from celery/celery
-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_basic.py
64 lines (53 loc) · 2.22 KB
/
test_basic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import operator
import os
import sys
# funtest config: prepend the current working directory and then its parent
# directory to sys.path before the imports below run.  Because both inserts
# target index 0, the parent directory ends up first and the cwd second.
sys.path.insert(0, os.getcwd())
sys.path.insert(0, os.path.join(os.getcwd(), os.pardir))
import suite # noqa
from celery.five import range
from celery.tests.case import unittest
from celery.tests.functional import tasks
from celery.tests.functional.case import WorkerCase
class test_basic(WorkerCase):
    """Basic functional tests exercised through the ``WorkerCase`` helpers.

    Each test sends tasks from ``celery.tests.functional.tasks`` and checks
    either the task results or the replies returned by ``self.inspect()``.
    """

    def test_started(self):
        """Check that the worker is alive."""
        self.assertWorkerAlive()

    def test_roundtrip_simple_task(self):
        """Send 100 ``add`` tasks and verify every returned result."""
        # A single publisher is reused for all 100 apply_async calls.
        publisher = tasks.add.get_publisher()
        # Keep each argument pair next to its result so the expected sum
        # can be recomputed when the result is fetched.
        results = [(tasks.add.apply_async(args, publisher=publisher), args)
                   for args in zip(range(100), range(100))]
        for result, args in results:
            self.assertEqual(result.get(timeout=10), operator.add(*args))

    def test_dump_active(self, sleep=1):
        """Check the ``active`` inspect reply while two sleep tasks are sent.

        :param sleep: argument passed to each ``sleeptask`` call.
        """
        r1 = tasks.sleeptask.delay(sleep)
        tasks.sleeptask.delay(sleep)
        # Only the first task is waited on before querying the worker.
        self.ensure_accepted(r1.id)
        active = self.inspect().active(safe=True)
        # Fail with a clear assertion instead of a KeyError on an empty reply.
        self.assertTrue(active)
        active = active[self.worker.hostname]
        self.assertEqual(len(active), 2)
        self.assertEqual(active[0]['name'], tasks.sleeptask.name)
        self.assertEqual(active[0]['args'], [sleep])

    def test_dump_reserved(self, sleep=1):
        """Check the ``reserved`` inspect reply after sending four sleep tasks.

        :param sleep: argument passed to each ``sleeptask`` call.
        """
        r1 = tasks.sleeptask.delay(sleep)
        tasks.sleeptask.delay(sleep)
        tasks.sleeptask.delay(sleep)
        tasks.sleeptask.delay(sleep)
        # Only the first task is waited on before querying the worker.
        self.ensure_accepted(r1.id)
        reserved = self.inspect().reserved(safe=True)
        # Fail with a clear assertion instead of a KeyError on an empty reply.
        self.assertTrue(reserved)
        reserved = reserved[self.worker.hostname]
        self.assertEqual(reserved[0]['name'], tasks.sleeptask.name)
        self.assertEqual(reserved[0]['args'], [sleep])

    def test_dump_schedule(self, countdown=1):
        """Check the ``scheduled`` inspect reply for two countdown tasks.

        :param countdown: ``countdown`` value passed to each ``apply_async``.
        """
        r1 = tasks.add.apply_async((2, 2), countdown=countdown)
        tasks.add.apply_async((2, 2), countdown=countdown)
        self.ensure_scheduled(r1.id, interval=0.1)
        schedule = self.inspect().scheduled(safe=True)
        # Fail with a clear assertion instead of a KeyError on an empty reply.
        self.assertTrue(schedule)
        schedule = schedule[self.worker.hostname]
        # assertEqual, not assertTrue: assertTrue(x, 2) would treat ``2`` as
        # the failure message and pass for any non-empty schedule.
        self.assertEqual(len(schedule), 2)
        self.assertEqual(schedule[0]['request']['name'], tasks.add.name)
        self.assertEqual(schedule[0]['request']['args'], [2, 2])
# Script entry point: hand control to the unittest runner when this file is
# executed directly rather than imported.
if __name__ == '__main__':
    unittest.main()