Merge pull request #69 from acsone/8.0-runnerstop-sbi
[IMP] connector-runner: graceful stop mechanism
guewen committed Jun 9, 2015
2 parents 918d8b6 + ce882d0 commit cbf270f
Showing 2 changed files with 74 additions and 31 deletions.
68 changes: 47 additions & 21 deletions connector/jobrunner/__init__.py
@@ -49,49 +49,75 @@
enable = os.environ.get('ODOO_CONNECTOR_CHANNELS')


def run():
# sleep a bit to let the workers start at ease
time.sleep(START_DELAY)
port = os.environ.get('ODOO_CONNECTOR_PORT') or config['xmlrpc_port']
channels = os.environ.get('ODOO_CONNECTOR_CHANNELS')
runner = ConnectorRunner(port or 8069, channels or 'root:1')
runner.run_forever()
class ConnectorRunnerThread(Thread):

def __init__(self):
Thread.__init__(self)
self.daemon = True
port = os.environ.get('ODOO_CONNECTOR_PORT') or config['xmlrpc_port']
channels = os.environ.get('ODOO_CONNECTOR_CHANNELS')
self.runner = ConnectorRunner(port or 8069, channels or 'root:1')

def run(self):
# sleep a bit to let the workers start at ease
time.sleep(START_DELAY)
self.runner.run()

def stop(self):
self.runner.stop()


runner_thread = None

orig_prefork_start = server.PreforkServer.start
orig_prefork_stop = server.PreforkServer.stop
orig_threaded_start = server.ThreadedServer.start
orig_gevent_start = server.GeventServer.start
orig_threaded_stop = server.ThreadedServer.stop


def prefork_start(server, *args, **kwargs):
global runner_thread
res = orig_prefork_start(server, *args, **kwargs)
if enable and not config['stop_after_init']:
_logger.info("starting jobrunner thread (in prefork server)")
thread = Thread(target=run)
thread.daemon = True
thread.start()
runner_thread = ConnectorRunnerThread()
runner_thread.start()
return res


def prefork_stop(server, graceful=True):
global runner_thread
if runner_thread:
runner_thread.stop()
res = orig_prefork_stop(server, graceful)
if runner_thread:
runner_thread.join()
runner_thread = None
return res


def threaded_start(server, *args, **kwargs):
global runner_thread
res = orig_threaded_start(server, *args, **kwargs)
if enable and not config['stop_after_init']:
_logger.info("starting jobrunner thread (in threaded server)")
thread = Thread(target=run)
thread.daemon = True
thread.start()
runner_thread = ConnectorRunnerThread()
runner_thread.start()
return res


def gevent_start(server, *args, **kwargs):
res = orig_gevent_start(server, *args, **kwargs)
if enable and not config['stop_after_init']:
_logger.info("starting jobrunner thread (in gevent server)")
# TODO: gevent spawn?
raise RuntimeError("not implemented")
def threaded_stop(server):
global runner_thread
if runner_thread:
runner_thread.stop()
res = orig_threaded_stop(server)
if runner_thread:
runner_thread.join()
runner_thread = None
return res


server.PreforkServer.start = prefork_start
server.PreforkServer.stop = prefork_stop
server.ThreadedServer.start = threaded_start
server.GeventServer.start = gevent_start
server.ThreadedServer.stop = threaded_stop
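The hooks above wrap the original PreforkServer and ThreadedServer start/stop methods so the jobrunner thread is started together with the server, asked to stop, and then joined before the server process exits. A minimal standalone sketch of that wrap-and-join pattern follows, with a hypothetical Server class and a generic WorkerThread standing in for Odoo's server classes and ConnectorRunnerThread (names and timings here are illustrative only, not part of the patch):

# Standalone sketch of the wrap-and-join pattern used above.
import threading
import time


class Server(object):
    def start(self):
        print("server started")

    def stop(self):
        print("server stopping")


class WorkerThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self._stop_requested = threading.Event()

    def run(self):
        while not self._stop_requested.is_set():
            time.sleep(0.1)  # stand-in for real work

    def stop(self):
        self._stop_requested.set()


worker_thread = None

orig_start = Server.start
orig_stop = Server.stop


def patched_start(self, *args, **kwargs):
    global worker_thread
    res = orig_start(self, *args, **kwargs)
    worker_thread = WorkerThread()
    worker_thread.start()
    return res


def patched_stop(self, *args, **kwargs):
    global worker_thread
    if worker_thread:
        worker_thread.stop()   # ask the thread to finish
    res = orig_stop(self, *args, **kwargs)
    if worker_thread:
        worker_thread.join()   # wait until it has really exited
        worker_thread = None
    return res


Server.start = patched_start
Server.stop = patched_stop

if __name__ == "__main__":
    s = Server()
    s.start()
    time.sleep(0.3)
    s.stop()

As in the patch, the thread is signalled before the original stop runs and joined afterwards, so the process only exits once the worker has actually finished.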
37 changes: 27 additions & 10 deletions connector/jobrunner/runner.py
@@ -79,6 +79,7 @@

from contextlib import closing
import logging
import os
import re
import select
import threading
@@ -209,6 +210,8 @@ def __init__(self, port=8069, channel_config_string='root:1'):
self.channel_manager = ChannelManager()
self.channel_manager.simple_configure(channel_config_string)
self.db_by_name = {}
self._stop = False
self._stop_pipe = os.pipe()

def get_db_names(self):
if openerp.tools.config['db_name']:
@@ -220,15 +223,18 @@ def get_db_names(self):
db_names = [d for d in db_names if re.match(dbfilter, d)]
return db_names

def initialize_databases(self):
def close_databases(self, remove_jobs=True):
for db_name, db in self.db_by_name.items():
try:
self.channel_manager.remove_db(db_name)
if remove_jobs:
self.channel_manager.remove_db(db_name)
db.close()
except:
_logger.warning('error closing database %s',
db_name, exc_info=True)
self.db_by_name = {}

def initialize_databases(self):
for db_name in self.get_db_names():
db = Database(db_name)
if not db.has_connector:
@@ -242,6 +248,8 @@ def initialize_databases(self):
def run_jobs(self):
now = openerp.fields.Datetime.now()
for job in self.channel_manager.get_jobs_to_run(now):
if self._stop:
break
_logger.info("asking Odoo to run job %s on db %s",
job.uuid, job.db_name)
self.db_by_name[job.db_name].set_job_enqueued(job.uuid)
@@ -252,6 +260,8 @@ def process_notifications(self):
def process_notifications(self):
for db in self.db_by_name.values():
while db.conn.notifies:
if self._stop:
break
notification = db.conn.notifies.pop()
uuid = notification.payload
job_datas = db.select_jobs('uuid = %s', (uuid,))
@@ -266,16 +276,21 @@ def wait_notification(self):
return
# wait for something to happen in the queue_job tables
conns = [db.conn for db in self.db_by_name.values()]
conns.append(self._stop_pipe[0])
conns, _, _ = select.select(conns, [], [], SELECT_TIMEOUT)
if conns:
if conns and not self._stop:
for conn in conns:
conn.poll()
else:
_logger.debug("select timeout")

def run_forever(self):
def stop(self):
_logger.info("graceful stop requested")
self._stop = True
# wakeup the select() in wait_notification
os.write(self._stop_pipe[1], '.')

def run(self):
_logger.info("starting")
while True:
while not self._stop:
# outer loop does exception recovery
try:
_logger.info("initializing database connections")
@@ -284,14 +299,16 @@ def run_forever(self):
self.initialize_databases()
_logger.info("database connections ready")
# inner loop does the normal processing
while True:
while not self._stop:
self.process_notifications()
self.run_jobs()
self.wait_notification()
except KeyboardInterrupt:
_logger.info("stopping")
break
self.stop()
except:
_logger.exception("exception: sleeping %ds and retrying",
ERROR_RECOVERY_DELAY)
self.close_databases()
time.sleep(ERROR_RECOVERY_DELAY)
self.close_databases(remove_jobs=False)
_logger.info("stopped")
