Skip to content

Commit

Permalink
Merge 20452a3 into 66f9a32
Browse files Browse the repository at this point in the history
  • Loading branch information
guewen committed Dec 18, 2019
2 parents 66f9a32 + 20452a3 commit ea4b08d
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 40 deletions.
85 changes: 51 additions & 34 deletions queue_job/jobrunner/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)

import logging
import os
from threading import Thread
import time

Expand Down Expand Up @@ -36,23 +35,7 @@ class QueueJobRunnerThread(Thread):
def __init__(self):
    """Create the daemon thread wrapping a :class:`QueueJobRunner`.

    Connection settings are resolved by
    ``QueueJobRunner.from_environ_or_config()`` — ``ODOO_QUEUE_JOB_*``
    environment variables first, then the Odoo configuration, then
    built-in defaults.
    """
    Thread.__init__(self)
    # Daemon thread: it must not keep the process alive at shutdown.
    self.daemon = True
    # Bug fix: the inline scheme/host/port/user/password resolution that
    # used to live here duplicated from_environ_or_config() and its
    # QueueJobRunner(...) result was immediately overwritten by the line
    # below; the factory classmethod is the single source of truth.
    self.runner = QueueJobRunner.from_environ_or_config()

def run(self):
# sleep a bit to let the workers start at ease
Expand All @@ -63,6 +46,28 @@ def stop(self):
self.runner.stop()


class WorkerJobRunner(server.Worker):
    """Dedicated Odoo worker process hosting the job runner loop."""

    def __init__(self, multi):
        super(WorkerJobRunner, self).__init__(multi)
        # Build the runner first; settings come from the environment or
        # the Odoo configuration file.
        self.runner = QueueJobRunner.from_environ_or_config()
        # NOTE(review): presumably disables the master's worker watchdog,
        # since process_work() blocks in runner.run() — confirm against
        # server.Worker.
        self.watchdog_timeout = None

    def sleep(self):
        # No inter-cycle sleep: runner.run() already blocks.
        pass

    def signal_handler(self, sig, frame):
        _logger.debug("WorkerJobRunner (%s) received signal %s", self.pid, sig)
        super(WorkerJobRunner, self).signal_handler(sig, frame)
        # Ask the runner loop to exit so the worker can terminate.
        self.runner.stop()

    def process_work(self):
        _logger.debug("WorkerJobRunner (%s) starting up", self.pid)
        # Give the regular HTTP workers a head start before connecting.
        time.sleep(START_DELAY)
        self.runner.run()


runner_thread = None


Expand All @@ -79,31 +84,42 @@ def _start_runner_thread(server_type):
runner_thread = QueueJobRunnerThread()
runner_thread.start()
else:
_logger.info("jobrunner thread (in %s) NOT started, " \
_logger.info("jobrunner thread (in %s) NOT started, "
"because the root channel's capacity is set to 0",
server_type)


# References to the original (unpatched) server methods; the patched
# versions defined below delegate to these.
orig_prefork_start = server.PreforkServer.start
orig_prefork_stop = server.PreforkServer.stop
orig_prefork__init__ = server.PreforkServer.__init__
orig_prefork_process_spawn = server.PreforkServer.process_spawn
orig_prefork_worker_pop = server.PreforkServer.worker_pop
orig_threaded_start = server.ThreadedServer.start
orig_threaded_stop = server.ThreadedServer.stop


def prefork_start(server, *args, **kwargs):
res = orig_prefork_start(server, *args, **kwargs)
_start_runner_thread("prefork server")
def prefork__init__(server, app):
    """Patched ``PreforkServer.__init__``: add the jobrunner registry.

    ``server.jobrunner`` maps pid -> worker for the jobrunner worker
    class, used by ``prefork_process_spawn`` / ``prefork_worker_pop``.
    """
    result = orig_prefork__init__(server, app)
    server.jobrunner = {}
    return result


def prefork_stop(server, graceful=True):
    """Patched ``PreforkServer.stop``: also stop the jobrunner thread.

    The runner is asked to stop before the server shuts down, and joined
    afterwards so the thread has fully terminated before the module-level
    reference is dropped.
    """
    global runner_thread
    if runner_thread:
        runner_thread.stop()
    res = orig_prefork_stop(server, graceful)
    if runner_thread:
        runner_thread.join()
    runner_thread = None
    # Bug fix: `res` was captured but never returned, silently changing
    # the return value of PreforkServer.stop() to None for all callers.
    return res
def prefork_process_spawn(server):
    """Patched ``PreforkServer.process_spawn``: spawn the jobrunner worker.

    A single ``WorkerJobRunner`` is started (when the root channel has
    capacity) in addition to the regular workers.
    """
    orig_prefork_process_spawn(server)
    try:
        registry = server.jobrunner
    except AttributeError:
        # 'queue_job' is not in the server wide modules: PreforkServer
        # was not initialized with a 'jobrunner' attribute, skip this.
        return
    if not registry and _is_runner_enabled():
        server.worker_spawn(WorkerJobRunner, server.jobrunner)


def prefork_worker_pop(server, pid):
    """Patched ``PreforkServer.worker_pop``: forget a dead jobrunner worker."""
    result = orig_prefork_worker_pop(server, pid)
    # 'jobrunner' only exists when queue_job is loaded as a server wide
    # module; otherwise leave PreforkServer untouched.
    registry = getattr(server, 'jobrunner', None)
    if registry is not None:
        # No-op when pid belongs to a regular worker.
        registry.pop(pid, None)
    return result


Expand All @@ -124,7 +140,8 @@ def threaded_stop(server):
return res


# Monkey-patch the Odoo server classes so the jobrunner participates in
# the server lifecycle (prefork: dedicated WorkerJobRunner worker;
# threaded: daemon QueueJobRunnerThread).
server.PreforkServer.start = prefork_start
server.PreforkServer.stop = prefork_stop
server.PreforkServer.__init__ = prefork__init__
server.PreforkServer.process_spawn = prefork_process_spawn
server.PreforkServer.worker_pop = prefork_worker_pop
server.ThreadedServer.start = threaded_start
server.ThreadedServer.stop = threaded_stop
39 changes: 33 additions & 6 deletions queue_job/jobrunner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
How does it work?
-----------------
* It starts as a thread in the Odoo main process
* It starts as a thread in the Odoo main process or as a new worker
* It receives postgres NOTIFY messages each time jobs are
added or updated in the queue_job table.
* It maintains an in-memory priority queue of jobs that
Expand Down Expand Up @@ -366,6 +366,29 @@ def __init__(self,
self._stop = False
self._stop_pipe = os.pipe()

@classmethod
def from_environ_or_config(cls):
    """Alternate constructor resolving connection settings.

    Resolution order for each setting: ``ODOO_QUEUE_JOB_*`` environment
    variable, then the ``queue_job`` section of the Odoo configuration,
    then the server's own http settings / hard-coded defaults
    (http://localhost:8069, no HTTP auth).
    """
    env = os.environ
    cfg = queue_job_config
    scheme = env.get('ODOO_QUEUE_JOB_SCHEME') or cfg.get("scheme") or 'http'
    host = (env.get('ODOO_QUEUE_JOB_HOST')
            or cfg.get("host")
            or config['http_interface']
            or 'localhost')
    port = (env.get('ODOO_QUEUE_JOB_PORT')
            or cfg.get("port")
            or config['http_port']
            or 8069)
    user = (env.get('ODOO_QUEUE_JOB_HTTP_AUTH_USER')
            or cfg.get("http_auth_user"))
    password = (env.get('ODOO_QUEUE_JOB_HTTP_AUTH_PASSWORD')
                or cfg.get("http_auth_password"))
    return cls(
        scheme=scheme,
        host=host,
        port=port,
        user=user,
        password=password,
    )

def get_db_names(self):
if config['db_name']:
db_names = config['db_name'].split(',')
Expand Down Expand Up @@ -480,10 +503,14 @@ def run(self):
self.wait_notification()
except KeyboardInterrupt:
self.stop()
except Exception:
_logger.exception("exception: sleeping %ds and retrying",
ERROR_RECOVERY_DELAY)
self.close_databases()
time.sleep(ERROR_RECOVERY_DELAY)
except Exception as e:
# Interrupted system call, i.e. KeyboardInterrupt during select
if isinstance(e, select.error) and e[0] == 4:
self.stop()
else:
_logger.exception("exception: sleeping %ds and retrying",
ERROR_RECOVERY_DELAY)
self.close_databases()
time.sleep(ERROR_RECOVERY_DELAY)
self.close_databases(remove_jobs=False)
_logger.info("stopped")
3 changes: 3 additions & 0 deletions queue_job/readme/HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
Next
~~~~

* [ADD] Run jobrunner as a worker process instead of a thread in the main
process (when running with --workers > 0)

12.0.1.1.0 (2019-11-01)
~~~~~~~~~~~~~~~~~~~~~~~

Expand Down

0 comments on commit ea4b08d

Please sign in to comment.