Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

HH-34878 on start -- wait for all workers to start serving, on stop -…

…- tune timeout for sigkill
  • Loading branch information...
commit 20dd58c37cc415528ae456e2b27fe885c57d78bf 1 parent c2d2135
@alekseyrybalkin alekseyrybalkin authored
Showing with 41 additions and 14 deletions.
  1. +41 −14 tornado_util/supervisor.py
View
55 tornado_util/supervisor.py
@@ -1,7 +1,7 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
-'''
+"""
This is module for creating of init.d scripts for tornado-based
services
@@ -23,12 +23,11 @@
script='/usr/bin/frontik_srv.py',
config='/etc/frontik/frontik.cfg'
)
-'''
+"""
import signal
import sys
import urllib2
-import httplib
import logging
import subprocess
import time
@@ -38,7 +37,6 @@
from functools import partial
import tornado.options
-import tornado_util.server
from tornado.options import options
tornado.options.define('port', 8000, int)
@@ -46,13 +44,14 @@
tornado.options.define('logfile_template', None, str)
tornado.options.define('pidfile_template', None, str)
-tornado.options.define('start_check_timeout', 3, int)
tornado.options.define('supervisor_sigterm_timeout', 4, int)
import os.path
import os
+starter_scripts = {}
+
def is_alive(port):
try:
@@ -61,19 +60,21 @@ def is_alive(port):
if os.path.exists("/proc/{0}".format(pid)):
return True
return False
- except Exception:
+ except IOError:
return False
def is_running(port):
try:
- urllib2.urlopen('http://localhost:%s/status/' % (port,))
- return True
- except urllib2.URLError:
+ response = urllib2.urlopen('http://localhost:%s/status/' % (port,))
+ for (header, value) in response.info().items():
+ if header == 'server' and value.startswith('TornadoServer'):
+ return True
return False
- except urllib2.HTTPError:
+ except urllib2.URLError:
return False
+
def start_worker(script, config, port):
if is_alive(port):
logging.warn("another process already started on %s", port)
@@ -88,7 +89,9 @@ def start_worker(script, config, port):
if options.logfile_template:
args.append('--logfile=%s' % (options.logfile_template % dict(port=port),))
- return subprocess.Popen(args)
+ starter_scripts[port] = subprocess.Popen(args)
+ return starter_scripts[port]
+
def stop_worker(port, signal_to_send=signal.SIGTERM):
logging.debug('stop worker %s', port)
@@ -105,17 +108,20 @@ def stop_worker(port, signal_to_send=signal.SIGTERM):
except ValueError:
pass
+
def rm_pidfile(port):
pid_path = options.pidfile_template % dict(port=port)
if os.path.exists(pid_path):
try:
os.remove(pid_path)
- except :
+ except:
logging.warning('failed to rm %s', pid_path)
+
def map_workers(f):
return map(f, [options.port + p for p in range(options.workers_count)])
+
def map_stale_workers(f):
ports = [str(options.port + p) for p in range(options.workers_count)]
stale_ports = []
@@ -130,9 +136,11 @@ def map_stale_workers(f):
stale_ports.append(port_match.group(1))
return map(f, stale_ports)
+
def map_all_workers(f):
return map_workers(f) + map_stale_workers(f)
+
def stop():
if any(map_all_workers(is_running)):
logging.warning('some of the workers are running; trying to kill')
@@ -140,7 +148,7 @@ def stop():
map_all_workers(lambda port: stop_worker(port, signal.SIGTERM))
time.sleep(int(options.supervisor_sigterm_timeout))
map_all_workers(lambda port: stop_worker(port, signal.SIGKILL) if is_alive(port) else rm_pidfile(port))
- time.sleep(1)
+ time.sleep(0.1 * options.workers_count)
map_all_workers(lambda port:
rm_pidfile(port) if not is_alive(port)
else logging.warning("failed to stop worker on port %d" % port))
@@ -148,9 +156,27 @@ def stop():
logging.warning('failed to stop workers')
sys.exit(1)
+
+def check_start_status(port):
+ alive = is_alive(port)
+ running = is_running(port)
+ shell_script_exited = starter_scripts.get(port, None) is None or starter_scripts[port].poll() is not None
+ if alive and running and shell_script_exited:
+ return True
+ if not alive and not running and shell_script_exited:
+ logging.error("worker on port %s failed to start" % port)
+ return True
+ logging.info('waiting for worker on port {0} to start'.format(port))
+ return False
+
+
def start(script, config):
map_workers(partial(start_worker, script, config))
- time.sleep(options.start_check_timeout)
+ time.sleep(1)
+ while not all(map_workers(check_start_status)):
+ time.sleep(1)
+ map_workers(lambda port: rm_pidfile(port) if not is_alive(port) else 0)
+
def status(expect=None):
res = map_stale_workers(is_running)
@@ -177,6 +203,7 @@ def status(expect=None):
logging.info('all workers are stopped')
return 0
+
def supervisor(script, config):
tornado.options.parse_config_file(config)
Please sign in to comment.
Something went wrong with that request. Please try again.