# test_supervisor.py -- forked from celery/celery
# 66 lines (51 loc), 1.84 KB
import unittest
from celery.supervisor import OFASupervisor
from celery.supervisor import TimeoutError, MaxRestartsExceededError
def target_one(x, y, z):
    """Return the product of the three arguments, multiplied left to right."""
    product = x * y
    product = product * z
    return product
class MockProcess(object):
    """Stand-in process class used by the supervisor tests in this file.

    Nothing is actually launched: each lifecycle method only records that
    it was called by flipping a private flag, and ``is_alive`` answers
    from those flags plus two class-level switches.
    """

    # Lifecycle flags; each is set by the method of the matching name.
    _started = False
    _stopped = False
    _terminated = False
    _joined = False

    # Switches consulted by ``is_alive``. The tests in this file set them
    # on the class itself, so a change affects every instance.
    alive = True
    timeout_on_is_alive = False

    def __init__(self, target, args, kwargs):
        """Store the callable and its arguments; nothing is run."""
        self.target = target
        self.args = args
        self.kwargs = kwargs

    def start(self):
        """Mark the process as started (and no longer stopped)."""
        self._stopped = False
        self._started = True

    def stop(self):
        """Mark the process as stopped (and no longer started)."""
        self._stopped = True
        self._started = False

    def terminate(self):
        """Record that termination was requested."""
        # Previously assigned False, which is the class default, so a
        # terminate() call could never be observed.
        self._terminated = True

    def is_alive(self):
        """Return True when started and ``alive`` is set, else False.

        Raises ``TimeoutError`` instead of returning True when
        ``timeout_on_is_alive`` is set.
        """
        if self._started and self.alive:
            if self.timeout_on_is_alive:
                raise TimeoutError("Supervised: timed out.")
            return True
        return False

    def join(self, timeout=None):
        """Record that join was called; ``timeout`` is accepted but unused."""
        self._joined = True
class TestOFASupervisor(unittest.TestCase):
    """Tests for OFASupervisor with MockProcess installed as its Process."""

    def tearDown(self):
        # The tests flip switches on the MockProcess class itself. Restore
        # them here rather than at the end of each test, so that a failing
        # assertion cannot leak modified state into the next test.
        MockProcess.alive = True
        MockProcess.timeout_on_is_alive = False

    def test_init(self):
        """Smoke test: construction and swapping in MockProcess do not raise."""
        s = OFASupervisor(target=target_one, args=[2, 4, 8], kwargs={})
        s.Process = MockProcess

    def test_start(self):
        """start() raises MaxRestartsExceededError when the process is never alive."""
        MockProcess.alive = False  # is_alive() now always returns False
        s = OFASupervisor(target=target_one, args=[2, 4, 8], kwargs={},
                          max_restart_freq=0, max_restart_freq_time=0)
        s.Process = MockProcess
        self.assertRaises(MaxRestartsExceededError, s.start)

    def test_start_is_alive_timeout(self):
        """start() raises MaxRestartsExceededError when is_alive() times out."""
        MockProcess.alive = True
        MockProcess.timeout_on_is_alive = True  # is_alive() raises TimeoutError
        s = OFASupervisor(target=target_one, args=[2, 4, 8], kwargs={},
                          max_restart_freq=0, max_restart_freq_time=0)
        s.Process = MockProcess
        self.assertRaises(MaxRestartsExceededError, s.start)