Skip to content

Commit

Permalink
better start/stop process (and fix restart issues)
Browse files Browse the repository at this point in the history
  • Loading branch information
thefab committed Oct 2, 2013
1 parent b1087fc commit ebec3e5
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 67 deletions.
125 changes: 78 additions & 47 deletions circus/arbiter.py
Expand Up @@ -7,8 +7,8 @@
from time import sleep
import select
import socket
import functools
from tornado import gen
import time

import zmq
from zmq.eventloop import ioloop
Expand Down Expand Up @@ -108,13 +108,18 @@ def __init__(self, watchers, endpoint, pubsub_endpoint, check_delay=.5,
self.fqdn = fqdn

self.ctrl = self.loop = None
self._provided_loop = False
self.socket_event = False
if loop is not None:
self._provided_loop = True
self.loop = loop

# initialize zmq context
self._init_context(context)
self.pid = os.getpid()
self._watchers_names = {}
self._stopping = False
self._restarting = False
self.debug = debug
self._exclusive_running_command = None
if self.debug:
Expand Down Expand Up @@ -201,8 +206,9 @@ def __init__(self, watchers, endpoint, pubsub_endpoint, check_delay=.5,

def _init_context(self, context):
self.context = context or zmq.Context.instance()
ioloop.install()
self.loop = ioloop.IOLoop.instance()
if self.loop is None:
ioloop.install()
self.loop = ioloop.IOLoop.instance()
self.ctrl = Controller(self.endpoint, self.multicast_endpoint,
self.context, self.loop, self, self.check_delay)

Expand Down Expand Up @@ -374,7 +380,7 @@ def reload_from_config(self, config_file=None):
return False

@classmethod
def load_from_config(cls, config_file):
def load_from_config(cls, config_file, loop=None):
cfg = get_config(config_file)
watchers = []
for watcher in cfg.get('watchers', []):
Expand Down Expand Up @@ -404,6 +410,7 @@ def load_from_config(cls, config_file):
plugins=cfg.get('plugins'), sockets=sockets,
warmup_delay=cfg.get('warmup_delay', 0),
httpd=httpd,
loop=loop,
httpd_host=cfg.get('httpd_host', 'localhost'),
httpd_port=cfg.get('httpd_port', 8080),
debug=cfg.get('debug', False),
Expand Down Expand Up @@ -456,70 +463,90 @@ def start_watcher(self, watcher):

@gen.coroutine
@debuglog
def start(self, start_ioloop=True):
def start(self):
"""Starts all the watchers.
The start command is an infinite loop that waits
for any command from a client and that watches all the
processes and restarts them if needed.
If the ioloop has been provided during __init__() call,
starts all watchers as a standard coroutine
If the ioloop hasn't been provided during __init__() call (default),
starts all watchers and the eventloop (and blocks here). In this mode
the method MUST NOT yield anything because it's called as a standard
method.
"""
logger.info("Starting master on pid %s", self.pid)
self.initialize()

# start controller
self.ctrl.start()
self._restarting = False
try:
# initialize processes
logger.debug('Initializing watchers')
if start_ioloop:
for watcher in self.iter_watchers():
self.loop.add_future(self.start_watcher(watcher), lambda x: None) # FIXME : enchainement ?
if self._provided_loop:
yield self.start_watchers()
else:
for watcher in self.iter_watchers():
yield self.start_watcher(watcher)

# start_watchers will be called just after the start_io_loop()
self.loop.add_future(self.start_watchers(), lambda x: None)
logger.info('Arbiter now waiting for commands')
if start_ioloop:

while True:
try:
self.loop.start()
except zmq.ZMQError as e:
if e.errno == errno.EINTR:
continue
else:
raise
else:
break
if not self._provided_loop:
# If an event loop is not provided, block at this line
self.start_io_loop()
finally:
if start_ioloop:
self.ctrl.stop()
self.evpub_socket.close()
if len(self.sockets) > 0:
self.sockets.close_all()
if not self._provided_loop:
# If an event loop is not provided, do some cleaning
self.stop_controller_and_close_sockets()
raise gen.Return(self._restarting)

def stop_controller_and_close_sockets(self):
self.ctrl.stop()
self.evpub_socket.close()
if len(self.sockets) > 0:
self.sockets.close_all()

def start_io_loop(self):
"""Starts the ioloop and wait inside it
"""
while True:
try:
self.loop.start()
except zmq.ZMQError as e:
if e.errno == errno.EINTR:
continue
else:
raise
else:
break

@synchronized("arbiter_stop")
@gen.coroutine
def stop(self, stop_ioloop=True):
yield self._stop(stop_ioloop=stop_ioloop)
def stop(self):
yield self._stop()

@gen.coroutine
def _stop(self, stop_ioloop=True):
def _emergency_stop(self):
"""Emergency and fast stop, to use only in circusd
"""
for watcher in self.iter_watchers():
watcher.graceful_timeout = 0
yield self._stop_watchers()
self.stop_controller_and_close_sockets()

@gen.coroutine
def _stop(self):
logger.info('Arbiter exiting')
self._stopping = True
yield self._stop_watchers()
if stop_ioloop:
self.loop.add_callback(self._stop_cb)
if self._provided_loop:
cb = self.stop_controller_and_close_sockets
self.loop.add_callback(cb)
else:
self.ctrl.stop()
self.evpub_socket.close()
if len(self.sockets) > 0:
self.sockets.close_all()
self.loop.add_timeout(time.time() + 1, self._stop_cb)

def _stop_cb(self):
# this will stop the loop and the closing
# will finish in .start()
self.loop.stop()
# stop_controller_and_close_sockets will be
# called in the end of start() method

def reap_processes(self):
# map watcher to pids
Expand Down Expand Up @@ -677,14 +704,18 @@ def stop_watchers(self):
yield self._stop_watchers()

@gen.coroutine
def _restart(self, restart_ioloop=True):
yield self._stop(stop_ioloop=restart_ioloop)
yield self._start(start_ioloop=restart_ioloop)
def _restart(self, inside_circusd=False):
if inside_circusd:
self._restarting = True
yield self._stop()
else:
yield self._stop_watchers()
yield self._start_watchers()

@synchronized("arbiter_restart")
@gen.coroutine
def restart(self, restart_ioloop=True):
yield self._restart(restart_ioloop=restart_ioloop)
def restart(self, inside_circusd=False):
yield self._restart(inside_circusd=inside_circusd)


class ThreadedArbiter(Arbiter, Thread):
Expand Down
49 changes: 30 additions & 19 deletions circus/circusd.py
Expand Up @@ -5,10 +5,11 @@
import resource

from circus import logger
from circus.arbiter import Arbiter, ReloadArbiterException
from circus.arbiter import Arbiter
from circus.pidfile import Pidfile
from circus import __version__
from circus.util import MAXFD, REDIRECT_TO, configure_logger, LOG_LEVELS
from circus.util import check_future_exception_and_log


def get_maxfd():
Expand Down Expand Up @@ -117,25 +118,35 @@ def main():
logoutput = args.logoutput or arbiter.logoutput or '-'
configure_logger(logger, loglevel, logoutput)

try:
restart_after_stop = True
while restart_after_stop:
while True:
try:
arbiter = Arbiter.load_from_config(args.config)
arbiter.start()
restart_after_stop = False
except ReloadArbiterException:
restart_after_stop = True
else:
break
except KeyboardInterrupt:
pass
finally:
arbiter._stop()
if pidfile is not None:
pidfile.unlink()
# configure the main loop
#ioloop.install()
#loop = ioloop.IOLoop.instance()
#cb = functools.partial(manage_restart, loop, arbiter)
#periodic = tornado.ioloop.PeriodicCallback(cb, 1000, loop)
#periodic.start()

# schedule the arbiter start
#arbiter = Arbiter.load_from_config(args.config, loop=loop)
#loop.add_future(arbiter.start(), _arbiter_start_cb)

# Main loop
restart = True
while restart:
try:
arbiter = Arbiter.load_from_config(args.config)
future = arbiter.start()
restart = False
if check_future_exception_and_log(future) is None:
restart = arbiter._restarting
except Exception as e:
# emergency stop
arbiter.loop.run_sync(arbiter._emergency_stop)
raise(e)
except KeyboardInterrupt:
pass
finally:
if pidfile is not None:
pidfile.unlink()
sys.exit(0)


Expand Down
2 changes: 1 addition & 1 deletion circus/commands/restart.py
Expand Up @@ -67,4 +67,4 @@ def execute(self, arbiter, props):
watcher = self._get_watcher(arbiter, props['name'])
return watcher.restart()
else:
return arbiter.restart()
return arbiter.restart(inside_circusd=True)

0 comments on commit ebec3e5

Please sign in to comment.