Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
added TotoService for creating background services that interact with…
… toto workers and events
- Loading branch information
Showing
1 changed file
with
123 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
import os | ||
import tornado | ||
from tornado.options import define, options | ||
import logging | ||
from multiprocessing import Process, cpu_count | ||
|
||
define("daemon", metavar='start|stop|restart', help="Start, stop or restart this script as a daemon process. Use this setting in conf files, the shorter start, stop, restart aliases as command line arguments. Requires the multiprocessing module.") | ||
define("processes", default=-1, help="The number of daemon processes to run") | ||
define("pidfile", default="toto.daemon.pid", help="The path to the pidfile for daemon processes will be named <path>.<num>.pid (toto.daemon.pid -> toto.daemon.0.pid)") | ||
define("remote_event_receivers", type=str, help="A comma separated list of remote event address that this event manager should connect to. e.g.: 'tcp://192.168.1.2:8889'", multiple=True) | ||
define("event_init_module", default=None, type=str, help="If defined, this module's 'invoke' function will be called with the EventManager instance after the main event handler is registered (e.g.: myevents.setup)") | ||
define("start", default=False, help="Alias for daemon=start for command line usage - overrides daemon setting.") | ||
define("stop", default=False, help="Alias for daemon=start for command line usage - overrides daemon setting.") | ||
define("restart", default=False, help="Alias for daemon=start for command line usage - overrides daemon setting.") | ||
define("nodaemon", default=False, help="Alias for daemon='' for command line usage - overrides daemon setting.") | ||
define("debug", default=False, help="Set this to true to prevent Toto from nicely formatting generic errors. With debug=True, errors will print to the command line") | ||
define("event_port", default=8999, help="The address to listen to event connections on - due to message queuing, servers use the next higher port as well") | ||
define("worker_address", default='', help="This is the address that toto.workerconnection.invoke(method, params) will send tasks too (As specified in the worker conf file)") | ||
|
||
#convert p to the absolute path, insert ".i" before the last "." or at the end of the path | ||
def pid_path_with_id(p, i): | ||
(d, f) = os.path.split(os.path.abspath(p)) | ||
components = f.rsplit('.', 1) | ||
f = '%s.%s' % (components[0], i) | ||
if len(components) > 1: | ||
f += "." + components[1] | ||
return os.path.join(d, f) | ||
|
||
class TotoService(): | ||
|
||
def __load_options(self, conf_file=None, **kwargs): | ||
for k in kwargs: | ||
options[k].set(kwargs[k]) | ||
if conf_file: | ||
tornado.options.parse_config_file(conf_file) | ||
tornado.options.parse_command_line() | ||
if options.start: | ||
options['daemon'].set('start') | ||
elif options.stop: | ||
options['daemon'].set('stop') | ||
elif options.restart: | ||
options['daemon'].set('restart') | ||
elif options.nodaemon: | ||
options['daemon'].set('') | ||
|
||
def __init__(self, conf_file=None, **kwargs): | ||
if options.log_file_prefix: | ||
root_logger = logging.getLogger() | ||
for handler in [h for h in root_logger.handlers]: | ||
root_logger.removeHandler(handler) | ||
self.__load_options(conf_file, **kwargs) | ||
|
||
def __run_service(self): | ||
|
||
def start_server_process(pidfile): | ||
self.main_loop() | ||
os.remove(pidfile) | ||
count = options.processes if options.processes >= 0 else cpu_count() | ||
processes = [] | ||
pidfiles = options.daemon and [pid_path_with_id(options.pidfile, i) for i in xrange(1, count + 1)] or [] | ||
for i in xrange(count): | ||
proc = Process(target=start_server_process, args=(pidfiles and pidfiles[i],)) | ||
proc.daemon = True | ||
processes.append(proc) | ||
proc.start() | ||
else: | ||
print "Starting %s %s process%s." % (count, self.__class__.__name__, count > 1 and 'es' or '') | ||
if options.daemon: | ||
i = 1 | ||
for proc in processes: | ||
with open(pidfiles[i - 1], 'w') as f: | ||
f.write(str(proc.pid)) | ||
i += 1 | ||
for proc in processes: | ||
proc.join() | ||
|
||
def run(self): | ||
if options.daemon: | ||
import multiprocessing | ||
import signal, re | ||
|
||
pattern = pid_path_with_id(options.pidfile, r'\d+').replace('.', r'\.') | ||
piddir = os.path.dirname(pattern) | ||
existing_pidfiles = [pidfile for pidfile in (os.path.join(piddir, fn) for fn in os.listdir(os.path.dirname(pattern))) if re.match(pattern, pidfile)] | ||
|
||
if options.daemon == 'stop' or options.daemon == 'restart': | ||
for pidfile in existing_pidfiles: | ||
with open(pidfile, 'r') as f: | ||
pid = int(f.read()) | ||
try: | ||
os.kill(pid, signal.SIGTERM) | ||
except OSError as e: | ||
if e.errno != 3: | ||
raise | ||
print "Stopped %s %s" % (self.__class__.__name__, pid) | ||
os.remove(pidfile) | ||
|
||
if options.daemon == 'start' or options.daemon == 'restart': | ||
if existing_pidfiles: | ||
print "Not starting %s, pidfile%s exist%s at %s" % (self.__class__.__name__, len(existing_pidfiles) > 1 and 's' or '', len(existing_pidfiles) == 1 and 's' or '', ', '.join(existing_pidfiles)) | ||
return | ||
pidfile = pid_path_with_id(options.pidfile, 0) | ||
#fork and only continue on child process | ||
if not os.fork(): | ||
#detach from controlling terminal | ||
os.setsid() | ||
#fork again and write pid to pidfile from parent, run server on child | ||
pid = os.fork() | ||
if pid: | ||
with open(pidfile, 'w') as f: | ||
f.write(str(pid)) | ||
else: | ||
self.__run_service() | ||
|
||
if options.daemon not in ('start', 'stop', 'restart'): | ||
print "Invalid daemon option: " + options.daemon | ||
|
||
else: | ||
self.__run_service() | ||
|
||
def main_loop(self): | ||
print 'Subclass TotoService and override main_loop with your desired functionality/' | ||
pass |