Skip to content

Commit

Permalink
Merge 10b28db into 1e77b25
Browse files Browse the repository at this point in the history
  • Loading branch information
TDu committed Apr 26, 2018
2 parents 1e77b25 + 10b28db commit cf45cda
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 8 deletions.
41 changes: 35 additions & 6 deletions queue_job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,15 @@ class DelayableRecordset(object):
"""

def __init__(self, recordset, priority=None, eta=None,
max_retries=None, description=None, channel=None):
max_retries=None, description=None, channel=None,
identity_key=None):
self.recordset = recordset
self.priority = priority
self.eta = eta
self.max_retries = max_retries
self.description = description
self.channel = channel
self.identity_key = identity_key

def __getattr__(self, name):
if name in self.recordset:
Expand All @@ -82,7 +84,8 @@ def delay(*args, **kwargs):
max_retries=self.max_retries,
eta=self.eta,
description=self.description,
channel=self.channel)
channel=self.channel,
identity_key=self.identity_key)
return delay

def __str__(self):
Expand Down Expand Up @@ -185,6 +188,11 @@ class Job(object):
The complete name of the channel to use to process the job. If
provided it overrides the one defined on the job's function.
.. attribute::identity_key
A key referencing the job, multiple job with the same key will not
be added to a channel.
"""

@classmethod
Expand Down Expand Up @@ -233,22 +241,38 @@ def load(cls, env, job_uuid):
job_.max_retries = stored.max_retries
if stored.company_id:
job_.company_id = stored.company_id.id
job_.identity_key = stored.identity_key
return job_

@classmethod
def exist_duplicate_job(cls, env, identity_key):
"""Check if a job to be executed with the same key exists."""
jobs = env['queue.job'].search(
[('identity_key', '=', identity_key),
('state', 'in', [PENDING, ENQUEUED, STARTED])],
limit=1
)
return bool(len(jobs))

@classmethod
def enqueue(cls, func, args=None, kwargs=None,
priority=None, eta=None, max_retries=None, description=None,
channel=None):
channel=None, identity_key=None):
"""Create a Job and enqueue it in the queue. Return the job uuid.
This expects the arguments specific to the job to be already extracted
from the ones to pass to the job function.
"""

recordset = func.im_self
env = recordset.env
if identity_key and cls.exist_duplicate_job(env, identity_key):
return
new_job = cls(func=func, args=args,
kwargs=kwargs, priority=priority, eta=eta,
max_retries=max_retries, description=description,
channel=channel)
channel=channel, identity_key=identity_key)
new_job.store()
_logger.debug(
"enqueued %s:%s(*%r, **%r) with uuid: %s",
Expand All @@ -269,7 +293,7 @@ def db_record_from_uuid(env, job_uuid):
def __init__(self, func,
args=None, kwargs=None, priority=None,
eta=None, job_uuid=None, max_retries=None,
description=None, channel=None):
description=None, channel=None, identity_key=None):
""" Create a Job
:param func: function to execute
Expand All @@ -290,6 +314,7 @@ def __init__(self, func,
:param description: human description of the job. If None, description
is computed from the function doc or name
:param channel: The complete channel name to use to process the job.
:param identity_key: A hash to uniquely identify a job.
:param env: Odoo Environment
:type env: :class:`odoo.api.Environment`
"""
Expand Down Expand Up @@ -337,6 +362,8 @@ def __init__(self, func,
self.date_created = datetime.now()
self._description = description

self.identity_key = identity_key

self.date_enqueued = None
self.date_started = None
self.date_done = None
Expand Down Expand Up @@ -399,8 +426,8 @@ def store(self):
'date_started': False,
'date_done': False,
'eta': False,
'identity_key': False,
}

dt_to_string = odoo.fields.Datetime.to_string
if self.date_enqueued:
vals['date_enqueued'] = dt_to_string(self.date_enqueued)
Expand All @@ -410,6 +437,8 @@ def store(self):
vals['date_done'] = dt_to_string(self.date_done)
if self.eta:
vals['eta'] = dt_to_string(self.eta)
if self.identity_key:
vals['identity_key'] = self.identity_key

db_record = self.db_record()
if db_record:
Expand Down
5 changes: 3 additions & 2 deletions queue_job/models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def _register_hook(self):
@api.multi
def with_delay(self, priority=None, eta=None,
max_retries=None, description=None,
channel=None):
channel=None, identity_key=None):
""" Return a ``DelayableRecordset``
The returned instance allow to enqueue any method of the recordset's
Expand Down Expand Up @@ -62,4 +62,5 @@ def with_delay(self, priority=None, eta=None,
eta=eta,
max_retries=max_retries,
description=description,
channel=channel)
channel=channel,
identity_key=identity_key)
2 changes: 2 additions & 0 deletions queue_job/models/queue_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ class QueueJob(models.Model):
store=True,
index=True)

identity_key = fields.Char()

@api.multi
def _inverse_channel(self):
self.filtered(lambda a: not a.channel)._compute_channel()
Expand Down
20 changes: 20 additions & 0 deletions test_queue_job/tests/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,17 @@ def test_job_delay_model_method(self):
(('a',), {'k': 1})
)

def test_job_identity_key(self):
id_key = 'e294e8444453b09d59bdb6efbfec1323'
test_job_1 = Job(self.method,
priority=15,
description="Test I am the first one",
identity_key=id_key)
test_job_1.user_id = 1
test_job_1.store()
job1 = Job.load(self.env, test_job_1.uuid)
self.assertEqual(job1.identity_key, id_key)


class TestJobs(common.TransactionCase):
""" Test jobs on other methods or with different job configuration """
Expand Down Expand Up @@ -380,6 +391,15 @@ def test_job_delay_model_method_multi(self):
self.assertEquals(job_instance.method_name, 'mapped')
self.assertEquals(['test1', 'test2'], job_instance.perform())

def test_job_no_duplicate(self):
""" If a job with same identity key in queue do not add a new one """
id_key = 'e294e8444453b09d59bdb6efbfec1323'
rec1 = self.env['test.queue.job'].create({'name': 'test1'})
job_1 = rec1.with_delay(identity_key=id_key).mapped('name')
self.assertTrue(job_1)
job_2 = rec1.with_delay(identity_key=id_key).mapped('name')
self.assertFalse(job_2)

def test_job_with_mutable_arguments(self):
""" Job with mutable arguments do not mutate on perform() """
delayable = self.env['test.queue.job'].with_delay()
Expand Down

0 comments on commit cf45cda

Please sign in to comment.