Skip to content

Commit

Permalink
Merge pull request #131 from acsone/12.0-under-pressure-sbi
Browse files Browse the repository at this point in the history
[12.0][FIX] under pressure
  • Loading branch information
guewen committed Mar 28, 2019
2 parents 962e738 + d3e6e92 commit 490f246
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 41 deletions.
56 changes: 20 additions & 36 deletions queue_job/controllers/main.py
Expand Up @@ -13,11 +13,7 @@
from odoo.service.model import PG_CONCURRENCY_ERRORS_TO_RETRY

from ..job import Job, ENQUEUED
from ..exception import (NoSuchJobError,
NotReadableJobError,
RetryableJobError,
FailedJobError,
NothingToDoJob)
from ..exception import (RetryableJobError, FailedJobError, NothingToDoJob)

_logger = logging.getLogger(__name__)

Expand All @@ -26,42 +22,17 @@

class RunJobController(http.Controller):

def _load_job(self, env, job_uuid):
"""Reload a job from the backend"""
try:
job = Job.load(env, job_uuid)
except NoSuchJobError:
# just skip it
job = None
except NotReadableJobError:
_logger.exception('Could not read job: %s', job_uuid)
raise
return job

def _try_perform_job(self, env, job):
"""Try to perform the job."""

# if the job has been manually set to DONE or PENDING,
# or if something tries to run a job that is not enqueued
# before its execution, stop
if job.state != ENQUEUED:
_logger.warning('job %s is in state %s '
'instead of enqueued in /runjob',
job.uuid, job.state)
return

# TODO: set_started should be done atomically with
# update queue_job set=state=started
# where state=enqueid and id=
job.set_started()
job.store()
http.request.env.cr.commit()

env.cr.commit()
_logger.debug('%s started', job)

job.perform()
job.set_done()
job.store()
http.request.env.cr.commit()
env.cr.commit()
_logger.debug('%s done', job)

@http.route('/queue_job/session', type='http', auth="none")
Expand All @@ -87,10 +58,23 @@ def retry_postpone(job, message, seconds=None):
job.store()
env.cr.commit()

job = self._load_job(env, job_uuid)
if job is None:
# ensure the job to run is in the correct state and lock the record
env.cr.execute(
"SELECT state FROM queue_job "
"WHERE uuid=%s AND state=%s "
"FOR UPDATE",
(job_uuid, ENQUEUED)
)
if not env.cr.fetchone():
_logger.warn(
"was requested to run job %s, but it does not exist, "
"or is not in state %s",
job_uuid, ENQUEUED
)
return ""
env.cr.commit()

job = Job.load(env, job_uuid)
assert job and job.state == ENQUEUED

try:
try:
Expand Down
4 changes: 0 additions & 4 deletions queue_job/exception.py
Expand Up @@ -14,10 +14,6 @@ class NoSuchJobError(JobError):
"""The job does not exist."""


class NotReadableJobError(JobError):
"""The job cannot be read from the storage."""


class FailedJobError(JobError):
"""A job had an error having to be resolved."""

Expand Down
8 changes: 7 additions & 1 deletion queue_job/jobrunner/runner.py
Expand Up @@ -224,8 +224,14 @@ def set_job_pending():
cr.execute(
"UPDATE queue_job SET state=%s, "
"date_enqueued=NULL, date_started=NULL "
"WHERE uuid=%s and state=%s", (PENDING, job_uuid, ENQUEUED)
"WHERE uuid=%s and state=%s "
"RETURNING uuid", (PENDING, job_uuid, ENQUEUED)
)
if cr.fetchone():
_logger.warning(
"state of job %s was reset from %s to %s",
job_uuid, ENQUEUED, PENDING,
)

# TODO: better way to HTTP GET asynchronously (grequest, ...)?
# if this was python3 I would be doing this with
Expand Down

0 comments on commit 490f246

Please sign in to comment.