Merge ea0b583 into 9948f01
TDu committed Jun 28, 2018
2 parents 9948f01 + ea0b583 commit 2c87083
Showing 4 changed files with 174 additions and 11 deletions.
119 changes: 110 additions & 9 deletions queue_job/job.py
@@ -4,6 +4,7 @@

import inspect
import functools
import hashlib
import logging
import uuid
import sys
@@ -52,13 +53,15 @@ class DelayableRecordset(object):
"""

def __init__(self, recordset, priority=None, eta=None,
max_retries=None, description=None, channel=None):
max_retries=None, description=None, channel=None,
identity_key=None):
self.recordset = recordset
self.priority = priority
self.eta = eta
self.max_retries = max_retries
self.description = description
self.channel = channel
self.identity_key = identity_key

def __getattr__(self, name):
if name in self.recordset:
@@ -82,7 +85,8 @@ def delay(*args, **kwargs):
max_retries=self.max_retries,
eta=self.eta,
description=self.description,
channel=self.channel)
channel=self.channel,
identity_key=self.identity_key)
return delay

def __str__(self):
@@ -97,6 +101,51 @@ def __unicode__(self):
__repr__ = __str__


def identity_exact(job_):
"""Identity function using the model, method and all arguments as key
When used, this identity key will have the effect that when a job should be
created and a pending job with the exact same recordset and arguments already
exists, the second one will not be created.
It should be used with the ``identity_key`` argument:
.. python::
from odoo.addons.queue_job.job import identity_exact
# [...]
delayable = self.with_delay(identity_key=identity_exact)
delayable.export_record(force=True)
Alternative identity keys can be built using the various fields of the job.
For example, you could compute a hash using only some arguments of
the job.
.. python::
def identity_example(job_):
hasher = hashlib.sha1()
hasher.update(job_.model_name)
hasher.update(job_.method_name)
hasher.update(str(sorted(job_.recordset.ids)))
hasher.update(unicode(job_.args[1]))
hasher.update(unicode(job_.kwargs.get('foo', '')))
return hasher.hexdigest()
Usually you will want to include at least the name of the
model and the method.
"""
hasher = hashlib.sha1()
hasher.update(job_.model_name)
hasher.update(job_.method_name)
hasher.update(str(sorted(job_.recordset.ids)))
hasher.update(unicode(job_.args))
hasher.update(unicode(sorted(job_.kwargs.items())))

return hasher.hexdigest()


class Job(object):
""" A Job is a task to execute.
@@ -185,21 +234,35 @@ class Job(object):
The complete name of the channel to use to process the job. If
provided it overrides the one defined on the job's function.
"""
.. attribute::identity_key
A key referencing the job; a new job with the same key will not
be added to a channel if an existing job with the same key is not yet
started or executed. The identity key can be a string or a function
with one parameter - the job - that returns the hash. Read the
documentation of :func:`identity_exact`.
"""
@classmethod
def load(cls, env, job_uuid):
""" Read a job from the Database"""
stored = cls.db_record_from_uuid(env, job_uuid)
if not stored:
raise NoSuchJobError(
'Job %s does no longer exist in the storage.' % job_uuid)
return cls._load_from_db_record(stored)

@classmethod
def _load_from_db_record(cls, job_db_record):
stored = job_db_record
env = job_db_record.env

args = stored.args
kwargs = stored.kwargs
method_name = stored.method_name

model = env[stored.model_name]

recordset = model.browse(stored.record_ids)
method = getattr(recordset, method_name)

@@ -210,7 +273,8 @@ def load(cls, env, job_uuid):

job_ = cls(method, args=args, kwargs=kwargs,
priority=stored.priority, eta=eta, job_uuid=stored.uuid,
description=stored.name, channel=stored.channel)
description=stored.name, channel=stored.channel,
identity_key=stored.identity_key)

if stored.date_created:
job_.date_created = dt_from_string(stored.date_created)
@@ -235,20 +299,36 @@ def load(cls, env, job_uuid):
job_.company_id = stored.company_id.id
return job_

def job_record_with_same_identity_key(self):
"""Check if a job to be executed with the same key exists."""
existing = self.env['queue.job'].search(
[('identity_key', '=', self.identity_key),
('state', 'in', [PENDING, ENQUEUED])],
limit=1
)
return existing

@classmethod
def enqueue(cls, func, args=None, kwargs=None,
priority=None, eta=None, max_retries=None, description=None,
channel=None):
"""Create a Job and enqueue it in the queue. Return the job uuid.
channel=None, identity_key=None):
"""Create a Job and enqueue it in the queue. Return the job.
This expects the arguments specific to the job to be already extracted
from the ones to pass to the job function.
If the identity key is the same as the one of a pending job,
no job is created and the existing job is returned.
"""
new_job = cls(func=func, args=args,
kwargs=kwargs, priority=priority, eta=eta,
max_retries=max_retries, description=description,
channel=channel)
channel=channel, identity_key=identity_key)
if new_job.identity_key:
existing = new_job.job_record_with_same_identity_key()
if existing:
return Job._load_from_db_record(existing)
new_job.store()
_logger.debug(
"enqueued %s:%s(*%r, **%r) with uuid: %s",
@@ -269,7 +349,7 @@ def db_record_from_uuid(env, job_uuid):
def __init__(self, func,
args=None, kwargs=None, priority=None,
eta=None, job_uuid=None, max_retries=None,
description=None, channel=None):
description=None, channel=None, identity_key=None):
""" Create a Job
:param func: function to execute
@@ -290,6 +370,9 @@ def __init__(self, func,
:param description: human description of the job. If None, description
is computed from the function doc or name
:param channel: The complete channel name to use to process the job.
:param identity_key: A hash to uniquely identify a job, or a function
that returns this hash (the function takes the job
as argument)
:param env: Odoo Environment
:type env: :class:`odoo.api.Environment`
"""
@@ -337,6 +420,15 @@ def __init__(self, func,
self.date_created = datetime.now()
self._description = description

if isinstance(identity_key, basestring):
self._identity_key = identity_key
self._identity_key_func = None
else:
# we'll compute the key on the fly when called
# from the function
self._identity_key = None
self._identity_key_func = identity_key

self.date_enqueued = None
self.date_started = None
self.date_done = None
@@ -399,8 +491,8 @@ def store(self):
'date_started': False,
'date_done': False,
'eta': False,
'identity_key': False,
}

dt_to_string = odoo.fields.Datetime.to_string
if self.date_enqueued:
vals['date_enqueued'] = dt_to_string(self.date_enqueued)
@@ -410,6 +502,8 @@ def store(self):
vals['date_done'] = dt_to_string(self.date_done)
if self.eta:
vals['eta'] = dt_to_string(self.eta)
if self.identity_key:
vals['identity_key'] = self.identity_key

db_record = self.db_record()
if db_record:
@@ -443,6 +537,13 @@ def func(self):
recordset = recordset.sudo(self.user_id)
return getattr(recordset, self.method_name)

@property
def identity_key(self):
if self._identity_key is None:
if self._identity_key_func:
self._identity_key = self._identity_key_func(self)
return self._identity_key

@property
def description(self):
if self._description:
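A minimal usage sketch of the new identity_key option (not part of the commit). It assumes an Odoo environment env and the test.queue.job model from the test_queue_job addon, and mirrors test_job_identity_key_no_duplicate further down in this diff.

from odoo.addons.queue_job.job import identity_exact

record = env['test.queue.job'].create({'name': 'test1'})

# the first call creates a pending job
job_1 = record.with_delay(identity_key='e294e8444453b09d59bdb6efbfec1323').mapped('name')
# a second call with the same key returns the existing job instead of creating one
job_2 = record.with_delay(identity_key='e294e8444453b09d59bdb6efbfec1323').mapped('name')
assert job_1.uuid == job_2.uuid

# alternatively, let identity_exact derive the key from model, method and arguments
record.with_delay(identity_key=identity_exact).mapped('name')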
8 changes: 6 additions & 2 deletions queue_job/models/base.py
@@ -28,7 +28,7 @@ def _register_hook(self):
@api.multi
def with_delay(self, priority=None, eta=None,
max_retries=None, description=None,
channel=None):
channel=None, identity_key=None):
""" Return a ``DelayableRecordset``
The returned instance allow to enqueue any method of the recordset's
@@ -54,6 +54,9 @@ def with_delay(self, priority=None, eta=None,
:param channel: the complete name of the channel to use to process
the function. If specified it overrides the one
defined on the function
:param identity_key: key uniquely identifying the job; if specified
and a job with the same key has not yet been run,
the new job will not be added.
:return: instance of a DelayableRecordset
:rtype: :class:`odoo.addons.queue_job.job.DelayableRecordset`
@@ -62,4 +65,5 @@
eta=eta,
max_retries=max_retries,
description=description,
channel=channel)
channel=channel,
identity_key=identity_key)
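A sketch of the callable form documented above (not part of the commit): an identity key function can hash only the fields that matter for deduplication, here the model, method and record ids, so jobs on the same records collapse regardless of their arguments. The helper name and the export_record call are illustrative only.

import hashlib

def identity_model_and_records(job_):
    # illustrative helper: deliberately ignores args and kwargs
    hasher = hashlib.sha1()
    hasher.update(job_.model_name)
    hasher.update(job_.method_name)
    hasher.update(str(sorted(job_.recordset.ids)))
    return hasher.hexdigest()

# records.with_delay(identity_key=identity_model_and_records).export_record()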
15 changes: 15 additions & 0 deletions queue_job/models/queue_job.py
@@ -84,6 +84,21 @@ class QueueJob(models.Model):
store=True,
index=True)

identity_key = fields.Char()

@api.model_cr
def init(self):
self._cr.execute(
'SELECT indexname FROM pg_indexes WHERE indexname = %s ',
('queue_job_identity_key_state_partial_index',)
)
if not self._cr.fetchone():
self._cr.execute(
"CREATE INDEX queue_job_identity_key_state_partial_index "
"ON queue_job (identity_key) WHERE state in ('pending', "
"'enqueued') AND identity_key IS NOT NULL;"
)

@api.multi
def _inverse_channel(self):
self.filtered(lambda a: not a.channel)._compute_channel()
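The partial index created in init() is shaped for the deduplication lookup done by Job.job_record_with_same_identity_key(): only pending or enqueued rows with a non-null identity_key are indexed. Roughly, the search it serves looks like this (illustrative, assuming an environment env):

# lookup issued before storing a new job carrying an identity key
env['queue.job'].search(
    [('identity_key', '=', 'e294e8444453b09d59bdb6efbfec1323'),
     ('state', 'in', ['pending', 'enqueued'])],
    limit=1,
)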
43 changes: 43 additions & 0 deletions test_queue_job/tests/test_job.py
@@ -2,6 +2,8 @@
# Copyright 2016 Camptocamp SA
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html)

import hashlib

from datetime import datetime, timedelta
import mock

@@ -21,6 +23,7 @@
STARTED,
DONE,
FAILED,
identity_exact,
)


@@ -295,6 +298,36 @@ def test_job_delay_model_method(self):
(('a',), {'k': 1})
)

def test_job_identity_key_str(self):
id_key = 'e294e8444453b09d59bdb6efbfec1323'
test_job_1 = Job(self.method,
priority=15,
description="Test I am the first one",
identity_key=id_key)
test_job_1.user_id = 1
test_job_1.store()
job1 = Job.load(self.env, test_job_1.uuid)
self.assertEqual(job1.identity_key, id_key)

def test_job_identity_key_func_exact(self):
hasher = hashlib.sha1()
hasher.update('test.queue.job')
hasher.update('testing_method')
hasher.update(str(sorted([])))
hasher.update(unicode((1, 'foo')))
hasher.update(unicode(sorted({'bar': 'baz'}.items())))
expected_key = hasher.hexdigest()

test_job_1 = Job(self.method,
args=[1, 'foo'],
kwargs={'bar': 'baz'},
identity_key=identity_exact)
self.assertEqual(test_job_1.identity_key, expected_key)
test_job_1.store()

job1 = Job.load(self.env, test_job_1.uuid)
self.assertEqual(job1.identity_key, expected_key)


class TestJobs(common.TransactionCase):
""" Test jobs on other methods or with different job configuration """
@@ -380,6 +413,16 @@ def test_job_delay_model_method_multi(self):
self.assertEquals(job_instance.method_name, 'mapped')
self.assertEquals(['test1', 'test2'], job_instance.perform())

def test_job_identity_key_no_duplicate(self):
""" If a job with same identity key in queue do not add a new one """
id_key = 'e294e8444453b09d59bdb6efbfec1323'
rec1 = self.env['test.queue.job'].create({'name': 'test1'})
job_1 = rec1.with_delay(identity_key=id_key).mapped('name')

self.assertTrue(job_1)
job_2 = rec1.with_delay(identity_key=id_key).mapped('name')
self.assertEqual(job_2.uuid, job_1.uuid)

def test_job_with_mutable_arguments(self):
""" Job with mutable arguments do not mutate on perform() """
delayable = self.env['test.queue.job'].with_delay()
