/
supervisord.py
executable file
·386 lines (333 loc) · 14.4 KB
/
supervisord.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
#!/usr/bin/env python
"""supervisord -- run a set of applications as daemons.
Usage: %s [options]
Options:
-c/--configuration FILENAME -- configuration file
-n/--nodaemon -- run in the foreground (same as 'nodaemon true' in config file)
-h/--help -- print this usage message and exit
-v/--version -- print supervisord version number and exit
-u/--user USER -- run supervisord as this user (or numeric uid)
-m/--umask UMASK -- use this umask for daemon subprocess (default is 022)
-d/--directory DIRECTORY -- directory to chdir to when daemonized
-l/--logfile FILENAME -- use FILENAME as logfile path
-y/--logfile_maxbytes BYTES -- use BYTES to limit the max size of logfile
-z/--logfile_backups NUM -- number of backups to keep when max bytes reached
-e/--loglevel LEVEL -- use LEVEL as log level (debug,info,warn,error,critical)
-j/--pidfile FILENAME -- write a pid file for the daemon process to FILENAME
-i/--identifier STR -- identifier used for this instance of supervisord
-q/--childlogdir DIRECTORY -- the log directory for child process logs
-k/--nocleanup -- prevent the process from performing cleanup (removal of
old automatic child log files) at startup.
-a/--minfds NUM -- the minimum number of file descriptors for start success
-t/--strip_ansi -- strip ansi escape codes from process output
--minprocs NUM -- the minimum number of processes available for start success
--profile_options OPTIONS -- run supervisord under profiler and output
results based on OPTIONS, which is a comma-sep'd
list of 'cumulative', 'calls', and/or 'callers',
e.g. 'cumulative,callers'
"""
import os
import time
import errno
import select
import signal
from supervisor.medusa import asyncore_25 as asyncore
from supervisor.options import ServerOptions
from supervisor.options import signame
from supervisor import events
from supervisor.states import SupervisorStates
from supervisor.states import getProcessStateDescription
class Supervisor:
stopping = False # set after we detect that we are handling a stop request
lastshutdownreport = 0 # throttle for delayed process error reports at stop
process_groups = None # map of process group name to process group object
stop_groups = None # list used for priority ordered shutdown
def __init__(self, options):
self.options = options
self.process_groups = {}
self.ticks = {}
def main(self):
if not self.options.first:
# prevent crash on libdispatch-based systems, at least for the
# first request
self.options.cleanup_fds()
info_messages = []
critical_messages = []
warn_messages = []
setuid_msg = self.options.set_uid()
if setuid_msg:
critical_messages.append(setuid_msg)
if self.options.first:
rlimit_messages = self.options.set_rlimits()
info_messages.extend(rlimit_messages)
warn_messages.extend(self.options.parse_warnings)
# this sets the options.logger object
# delay logger instantiation until after setuid
self.options.make_logger(critical_messages, warn_messages,
info_messages)
if not self.options.nocleanup:
# clean up old automatic logs
self.options.clear_autochildlogdir()
self.run()
def run(self):
self.process_groups = {} # clear
self.stop_groups = None # clear
events.clear()
try:
for config in self.options.process_group_configs:
self.add_process_group(config)
self.options.process_environment()
self.options.openhttpservers(self)
self.options.setsignals()
if (not self.options.nodaemon) and self.options.first:
self.options.daemonize()
# writing pid file needs to come *after* daemonizing or pid
# will be wrong
self.options.write_pidfile()
self.options.load_subproc_pidfile(self.process_groups)
self.runforever()
finally:
self.options.cleanup()
def diff_to_active(self, new=None):
if not new:
new = self.options.process_group_configs
cur = [group.config for group in self.process_groups.values()]
curdict = dict(zip([cfg.name for cfg in cur], cur))
newdict = dict(zip([cfg.name for cfg in new], new))
added = [cand for cand in new if cand.name not in curdict]
removed = [cand for cand in cur if cand.name not in newdict]
changed = [cand for cand in new
if cand != curdict.get(cand.name, cand)]
return added, changed, removed
def add_process_group(self, config):
name = config.name
if name not in self.process_groups:
config.after_setuid()
self.process_groups[name] = config.make_group()
return True
return False
def remove_process_group(self, name):
if self.process_groups[name].get_unstopped_processes():
return False
del self.process_groups[name]
return True
def get_process_map(self):
process_map = {}
pgroups = self.process_groups.values()
for group in pgroups:
process_map.update(group.get_dispatchers())
return process_map
def shutdown_report(self):
unstopped = []
pgroups = self.process_groups.values()
for group in pgroups:
unstopped.extend(group.get_unstopped_processes())
if unstopped:
# throttle 'waiting for x to die' reports
now = time.time()
if now > (self.lastshutdownreport + 3): # every 3 secs
names = [ p.config.name for p in unstopped ]
namestr = ', '.join(names)
self.options.logger.info('waiting for %s to die' % namestr)
self.lastshutdownreport = now
for proc in unstopped:
state = getProcessStateDescription(proc.get_state())
self.options.logger.blather(
'%s state: %s' % (proc.config.name, state))
return unstopped
def ordered_stop_groups_phase_1(self):
if self.stop_groups:
# stop the last group (the one with the "highest" priority)
self.stop_groups[-1].stop_all()
def ordered_stop_groups_phase_2(self):
# after phase 1 we've transitioned and reaped, let's see if we
# can remove the group we stopped from the stop_groups queue.
if self.stop_groups:
# pop the last group (the one with the "highest" priority)
group = self.stop_groups.pop()
if group.get_unstopped_processes():
# if any processes in the group aren't yet in a
# stopped state, we're not yet done shutting this
# group down, so push it back on to the end of the
# stop group queue
self.stop_groups.append(group)
def runforever(self):
events.notify(events.SupervisorRunningEvent())
timeout = 1 # this cannot be fewer than the smallest TickEvent (5)
socket_map = self.options.get_socket_map()
while 1:
combined_map = {}
combined_map.update(socket_map)
combined_map.update(self.get_process_map())
pgroups = self.process_groups.values()
pgroups.sort()
if self.options.mood < SupervisorStates.RUNNING:
if self.options.subprocpidfile:
# 'subprocpidfile' option is set, which implies that all
# managed sub-processes should NOT be killed and would
# continue to run even supervisord exits. the supervisord
# would also continue to manage these sub-processes after
# it restarts.
self.options.logger.info('exiting without killing managed '
'sub-processes')
raise asyncore.ExitNow
if not self.stopping:
# first time, set the stopping flag, do a
# notification and set stop_groups
self.stopping = True
self.stop_groups = pgroups[:]
events.notify(events.SupervisorStoppingEvent())
self.ordered_stop_groups_phase_1()
if not self.shutdown_report():
# if there are no unstopped processes (we're done
# killing everything), it's OK to swtop or reload
raise asyncore.ExitNow
r, w, x = [], [], []
for fd, dispatcher in combined_map.items():
if dispatcher.readable():
r.append(fd)
if dispatcher.writable():
w.append(fd)
try:
r, w, x = self.options.select(r, w, x, timeout)
except select.error, err:
r = w = x = []
if err[0] == errno.EINTR:
self.options.logger.blather('EINTR encountered in select')
else:
raise
for fd in r:
if combined_map.has_key(fd):
try:
dispatcher = combined_map[fd]
self.options.logger.blather(
'read event caused by %(dispatcher)s',
dispatcher=dispatcher)
dispatcher.handle_read_event()
except asyncore.ExitNow:
raise
except:
combined_map[fd].handle_error()
for fd in w:
if combined_map.has_key(fd):
try:
dispatcher = combined_map[fd]
self.options.logger.blather(
'write event caused by %(dispatcher)s',
dispatcher=dispatcher)
dispatcher.handle_write_event()
except asyncore.ExitNow:
raise
except:
combined_map[fd].handle_error()
[ group.transition() for group in pgroups ]
self.reap()
self.handle_signal()
self.tick()
if self.options.mood < SupervisorStates.RUNNING:
self.ordered_stop_groups_phase_2()
if self.options.test:
break
def tick(self, now=None):
""" Send one or more 'tick' events when the timeslice related to
the period for the event type rolls over """
if now is None:
# now won't be None in unit tests
now = time.time()
for event in events.TICK_EVENTS:
period = event.period
last_tick = self.ticks.get(period)
if last_tick is None:
# we just started up
last_tick = self.ticks[period] = timeslice(period, now)
this_tick = timeslice(period, now)
if this_tick != last_tick:
self.ticks[period] = this_tick
events.notify(event(this_tick, self))
def reap(self, once=False):
pid, sts = self.options.waitpid()
if pid:
process = self.options.get_process(pid)
if process is None:
self.options.logger.critical('reaped unknown pid %s)' % pid)
else:
process.finish(pid, sts)
self.options.del_process(pid)
if not once:
self.reap() # keep reaping until no more kids to reap
def handle_signal(self):
sig = self.options.get_signal()
if sig:
if sig in (signal.SIGTERM, signal.SIGINT, signal.SIGQUIT):
self.options.logger.warn(
'received %s indicating exit request' % signame(sig))
self.options.mood = SupervisorStates.SHUTDOWN
elif sig == signal.SIGHUP:
self.options.logger.warn(
'received %s indicating restart request' % signame(sig))
self.options.mood = SupervisorStates.RESTARTING
elif sig == signal.SIGCHLD:
self.options.logger.debug(
'received %s indicating a child quit' % signame(sig))
elif sig == signal.SIGUSR2:
self.options.logger.info(
'received %s indicating log reopen request' % signame(sig))
self.options.reopenlogs()
for group in self.process_groups.values():
group.reopenlogs()
else:
self.options.logger.blather(
'received %s indicating nothing' % signame(sig))
def get_state(self):
return self.options.mood
def timeslice(period, when):
    """Return the start of the slice of length *period* that contains
    the timestamp *when*, as an integer."""
    offset_into_slice = when % period
    return int(when - offset_into_slice)
# profile entry point
def profile(cmd, globals, locals, sort_order, callers):
    """Execute the code string *cmd* under the profiler (cProfile when
    available) and print statistics to stdout.

    sort_order is a tuple of pstats sort keys (falsy means the default
    of cumulative, calls, time); if *callers* is true print caller
    information instead of plain stats.  The temporary stats file is
    always removed."""
    try:
        import cProfile as profile
    except ImportError:
        import profile # python < 2.5
    import pstats
    import tempfile
    fd, fn = tempfile.mkstemp()
    try:
        # mkstemp returns an open OS-level fd; close it immediately so it
        # does not leak -- runctx/pstats only need the path
        os.close(fd)
        profile.runctx(cmd, globals, locals, fn)
        stats = pstats.Stats(fn)
        stats.strip_dirs()
        # calls,time,cumulative and cumulative,calls,time are useful
        stats.sort_stats(*sort_order or ('cumulative', 'calls', 'time'))
        if callers:
            stats.print_callers(.3)
        else:
            stats.print_stats(.3)
    finally:
        os.remove(fn)
# Main program
def main(args=None, test=False):
    """Entry point: parse options and run a Supervisor, re-creating a
    fresh one after each SIGHUP-triggered restart."""
    assert os.name == "posix", "This code makes Unix-specific assumptions"
    # if we hup, restart by making a new Supervisor()
    first = True
    while True:
        options = ServerOptions()
        options.realize(args, doc=__doc__)
        options.first = first
        options.test = test
        profile_opts = options.profile_options
        if profile_opts:
            # run under the profiler instead of directly
            sort_order, callers = profile_opts
            profile('go(options)', globals(), locals(), sort_order, callers)
        else:
            go(options)
        mood = options.mood
        if test or (mood < SupervisorStates.RESTARTING):
            break
        # restarting: release servers/logger before the next iteration
        options.close_httpservers()
        options.close_logger()
        first = False
def go(options):
    """Build a Supervisor around *options* and run it to completion."""
    supervisor = Supervisor(options)
    try:
        supervisor.main()
    except asyncore.ExitNow:
        # raised by the event loop to request an orderly exit
        pass
# script entry point: run only when executed directly, not on import
if __name__ == "__main__":
    main()