forked from meebo/pistil
-
Notifications
You must be signed in to change notification settings - Fork 2
/
worker.py
132 lines (99 loc) · 3.36 KB
/
worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# -*- coding: utf-8 -
#
# This file is part of pistil released under the MIT license.
# See the NOTICE for more information.
from __future__ import with_statement
import logging
import os
import signal
import sys
import time
import traceback
from pistil import util
from pistil.workertmp import WorkerTmp
log = logging.getLogger(__name__)
class Worker(object):
_SIGNALS = map(
lambda x: getattr(signal, "SIG%s" % x),
"HUP QUIT INT TERM USR1 USR2 WINCH CHLD".split()
)
_PIPE = []
def __init__(self, conf, name=None, child_type="worker",
age=0, ppid=0, timeout=30):
if name is None:
name = self.__class__.__name__
self.name = name
self.child_type = child_type
self.age = age
self.ppid = ppid
self.timeout = timeout
self.conf = conf
# initialize
self.booted = False
self.alive = True
self.debug =self.conf.get("debug", False)
self.tmp = WorkerTmp(self.conf)
self.on_init(self.conf)
def on_init(self, conf):
pass
def pid(self):
return os.getpid()
pid = util.cached_property(pid)
def notify(self):
"""\
Your worker subclass must arrange to have this method called
once every ``self.timeout`` seconds. If you fail in accomplishing
this task, the master process will murder your workers.
"""
self.tmp.notify()
def handle(self):
raise NotImplementedError
def run(self):
"""\
This is the mainloop of a worker process. You should override
this method in a subclass to provide the intended behaviour
for your particular evil schemes.
"""
while True:
self.notify()
self.handle()
time.sleep(0.1)
def on_init_process(self):
""" method executed when we init a process """
pass
def init_process(self):
"""\
If you override this method in a subclass, the last statement
in the function should be to call this method with
super(MyWorkerClass, self).init_process() so that the ``run()``
loop is initiated.
"""
util.set_owner_process(self.conf.get("uid", os.geteuid()),
self.conf.get("gid", os.getegid()))
# Reseed the random number generator
util.seed()
# For waking ourselves up
self._PIPE = os.pipe()
map(util.set_non_blocking, self._PIPE)
map(util.close_on_exec, self._PIPE)
# Prevent fd inherientence
util.close_on_exec(self.tmp.fileno())
self.init_signals()
self.on_init_process()
# Enter main run loop
self.booted = True
self.run()
def init_signals(self):
map(lambda s: signal.signal(s, signal.SIG_DFL), self._SIGNALS)
signal.signal(signal.SIGQUIT, self.handle_quit)
signal.signal(signal.SIGTERM, self.handle_exit)
signal.signal(signal.SIGINT, self.handle_exit)
signal.signal(signal.SIGWINCH, self.handle_winch)
def handle_quit(self, sig, frame):
self.alive = False
def handle_exit(self, sig, frame):
self.alive = False
sys.exit(0)
def handle_winch(self, sig, fname):
# Ignore SIGWINCH in worker. Fixes a crash on OpenBSD.
return