From 10b28db24718c4ffb260b84760d0ccaba8c2d3ab Mon Sep 17 00:00:00 2001 From: Thierry Ducrest Date: Mon, 19 Mar 2018 17:58:16 +0100 Subject: [PATCH 1/2] [10.0] Add identity key on job to allow limiting redundant execution --- queue_job/job.py | 41 +++++++++++++++++++++++++++----- queue_job/models/base.py | 5 ++-- queue_job/models/queue_job.py | 2 ++ test_queue_job/tests/test_job.py | 20 ++++++++++++++++ 4 files changed, 60 insertions(+), 8 deletions(-) diff --git a/queue_job/job.py b/queue_job/job.py index b34a6f6b4..ad90bd69f 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -52,13 +52,15 @@ class DelayableRecordset(object): """ def __init__(self, recordset, priority=None, eta=None, - max_retries=None, description=None, channel=None): + max_retries=None, description=None, channel=None, + identity_key=None): self.recordset = recordset self.priority = priority self.eta = eta self.max_retries = max_retries self.description = description self.channel = channel + self.identity_key = identity_key def __getattr__(self, name): if name in self.recordset: @@ -82,7 +84,8 @@ def delay(*args, **kwargs): max_retries=self.max_retries, eta=self.eta, description=self.description, - channel=self.channel) + channel=self.channel, + identity_key=self.identity_key) return delay def __str__(self): @@ -185,6 +188,11 @@ class Job(object): The complete name of the channel to use to process the job. If provided it overrides the one defined on the job's function. + .. attribute::identity_key + + A key referencing the job, multiple job with the same key will not + be added to a channel. + """ @classmethod @@ -233,22 +241,38 @@ def load(cls, env, job_uuid): job_.max_retries = stored.max_retries if stored.company_id: job_.company_id = stored.company_id.id + job_.identity_key = stored.identity_key return job_ + @classmethod + def exist_duplicate_job(cls, env, identity_key): + """Check if a job to be executed with the same key exists.""" + jobs = env['queue.job'].search( + [('identity_key', '=', identity_key), + ('state', 'in', [PENDING, ENQUEUED, STARTED])], + limit=1 + ) + return bool(len(jobs)) + @classmethod def enqueue(cls, func, args=None, kwargs=None, priority=None, eta=None, max_retries=None, description=None, - channel=None): + channel=None, identity_key=None): """Create a Job and enqueue it in the queue. Return the job uuid. This expects the arguments specific to the job to be already extracted from the ones to pass to the job function. """ + + recordset = func.im_self + env = recordset.env + if identity_key and cls.exist_duplicate_job(env, identity_key): + return new_job = cls(func=func, args=args, kwargs=kwargs, priority=priority, eta=eta, max_retries=max_retries, description=description, - channel=channel) + channel=channel, identity_key=identity_key) new_job.store() _logger.debug( "enqueued %s:%s(*%r, **%r) with uuid: %s", @@ -269,7 +293,7 @@ def db_record_from_uuid(env, job_uuid): def __init__(self, func, args=None, kwargs=None, priority=None, eta=None, job_uuid=None, max_retries=None, - description=None, channel=None): + description=None, channel=None, identity_key=None): """ Create a Job :param func: function to execute @@ -290,6 +314,7 @@ def __init__(self, func, :param description: human description of the job. If None, description is computed from the function doc or name :param channel: The complete channel name to use to process the job. + :param identity_key: A hash to uniquely identify a job. :param env: Odoo Environment :type env: :class:`odoo.api.Environment` """ @@ -337,6 +362,8 @@ def __init__(self, func, self.date_created = datetime.now() self._description = description + self.identity_key = identity_key + self.date_enqueued = None self.date_started = None self.date_done = None @@ -399,8 +426,8 @@ def store(self): 'date_started': False, 'date_done': False, 'eta': False, + 'identity_key': False, } - dt_to_string = odoo.fields.Datetime.to_string if self.date_enqueued: vals['date_enqueued'] = dt_to_string(self.date_enqueued) @@ -410,6 +437,8 @@ def store(self): vals['date_done'] = dt_to_string(self.date_done) if self.eta: vals['eta'] = dt_to_string(self.eta) + if self.identity_key: + vals['identity_key'] = self.identity_key db_record = self.db_record() if db_record: diff --git a/queue_job/models/base.py b/queue_job/models/base.py index 9bb5c0d44..fa8a4ad37 100644 --- a/queue_job/models/base.py +++ b/queue_job/models/base.py @@ -26,7 +26,7 @@ def _register_hook(self): @api.multi def with_delay(self, priority=None, eta=None, max_retries=None, description=None, - channel=None): + channel=None, identity_key=None): """ Return a ``DelayableRecordset`` The returned instance allow to enqueue any method of the recordset's @@ -60,4 +60,5 @@ def with_delay(self, priority=None, eta=None, eta=eta, max_retries=max_retries, description=description, - channel=channel) + channel=channel, + identity_key=identity_key) diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index 961dd7dff..e07aeca98 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -83,6 +83,8 @@ class QueueJob(models.Model): store=True, index=True) + identity_key = fields.Char() + @api.multi def _inverse_channel(self): self.filtered(lambda a: not a.channel)._compute_channel() diff --git a/test_queue_job/tests/test_job.py b/test_queue_job/tests/test_job.py index 2d19d7916..3422800d2 100644 --- a/test_queue_job/tests/test_job.py +++ b/test_queue_job/tests/test_job.py @@ -295,6 +295,17 @@ def test_job_delay_model_method(self): (('a',), {'k': 1}) ) + def test_job_identity_key(self): + id_key = 'e294e8444453b09d59bdb6efbfec1323' + test_job_1 = Job(self.method, + priority=15, + description="Test I am the first one", + identity_key=id_key) + test_job_1.user_id = 1 + test_job_1.store() + job1 = Job.load(self.env, test_job_1.uuid) + self.assertEqual(job1.identity_key, id_key) + class TestJobs(common.TransactionCase): """ Test jobs on other methods or with different job configuration """ @@ -380,6 +391,15 @@ def test_job_delay_model_method_multi(self): self.assertEquals(job_instance.method_name, 'mapped') self.assertEquals(['test1', 'test2'], job_instance.perform()) + def test_job_no_duplicate(self): + """ If a job with same identity key in queue do not add a new one """ + id_key = 'e294e8444453b09d59bdb6efbfec1323' + rec1 = self.env['test.queue.job'].create({'name': 'test1'}) + job_1 = rec1.with_delay(identity_key=id_key).mapped('name') + self.assertTrue(job_1) + job_2 = rec1.with_delay(identity_key=id_key).mapped('name') + self.assertFalse(job_2) + def test_job_with_mutable_arguments(self): """ Job with mutable arguments do not mutate on perform() """ delayable = self.env['test.queue.job'].with_delay() From 5d902f91b5d8c7eaa01823cf5ea42a9922caf14d Mon Sep 17 00:00:00 2001 From: Thierry Ducrest Date: Thu, 24 May 2018 15:33:50 +0200 Subject: [PATCH 2/2] fixup! [10.0] Add identity key on job to allow limiting redundant execution --- queue_job/job.py | 5 +++-- queue_job/models/base.py | 3 +++ queue_job/models/queue_job.py | 13 +++++++++++++ 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/queue_job/job.py b/queue_job/job.py index ad90bd69f..e045312d0 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -191,7 +191,8 @@ class Job(object): .. attribute::identity_key A key referencing the job, multiple job with the same key will not - be added to a channel. + be added to a channel if the existing job with the same key is not yet + started or executed. """ @@ -249,7 +250,7 @@ def exist_duplicate_job(cls, env, identity_key): """Check if a job to be executed with the same key exists.""" jobs = env['queue.job'].search( [('identity_key', '=', identity_key), - ('state', 'in', [PENDING, ENQUEUED, STARTED])], + ('state', 'in', [PENDING, ENQUEUED])], limit=1 ) return bool(len(jobs)) diff --git a/queue_job/models/base.py b/queue_job/models/base.py index fa8a4ad37..f7cf49dee 100644 --- a/queue_job/models/base.py +++ b/queue_job/models/base.py @@ -52,6 +52,9 @@ def with_delay(self, priority=None, eta=None, :param channel: the complete name of the channel to use to process the function. If specified it overrides the one defined on the function + :param identity_key: key uniquely identifying the job, if specified + and a job with the same key has not yet been run, + the new job will not be added. :return: instance of a DelayableRecordset :rtype: :class:`odoo.addons.queue_job.job.DelayableRecordset` diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index e07aeca98..10dcd565c 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -85,6 +85,19 @@ class QueueJob(models.Model): identity_key = fields.Char() + @api.model_cr + def init(self): + self._cr.execute( + 'SELECT indexname FROM pg_indexes WHERE indexname = %s ', + ('queue_job_identity_key_state_partial_index',) + ) + if not self._cr.fetchone(): + self._cr.execute( + "CREATE INDEX queue_job_identity_key_state_partial_index " + "ON queue_job (identity_key) WHERE state in ('pending', " + "'enqueued');" + ) + @api.multi def _inverse_channel(self): self.filtered(lambda a: not a.channel)._compute_channel()