diff --git a/queue_job/__manifest__.py b/queue_job/__manifest__.py index f6cf427ba5..6ead595e93 100644 --- a/queue_job/__manifest__.py +++ b/queue_job/__manifest__.py @@ -15,6 +15,7 @@ "views/queue_job_views.xml", "views/queue_job_channel_views.xml", "views/queue_job_function_views.xml", + "wizards/queue_job_queue_jobs_pause_channel_views.xml", "wizards/queue_jobs_to_done_views.xml", "wizards/queue_jobs_to_cancelled_views.xml", "wizards/queue_requeue_job_views.xml", diff --git a/queue_job/data/queue_data.xml b/queue_job/data/queue_data.xml index 55bcb3f5fc..bd32db3a02 100644 --- a/queue_job/data/queue_data.xml +++ b/queue_job/data/queue_data.xml @@ -35,5 +35,9 @@ root + + pause + + diff --git a/queue_job/job.py b/queue_job/job.py index 920a8a0781..3d4689d291 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -37,6 +37,7 @@ DEFAULT_PRIORITY = 10 # used by the PriorityQueue to sort the jobs DEFAULT_MAX_RETRIES = 5 RETRY_INTERVAL = 10 * 60 # seconds +PAUSE_CHANNEL = "root.pause" _logger = logging.getLogger(__name__) @@ -624,6 +625,8 @@ def _store_values(self, create=False): vals["eta"] = self.eta if self.identity_key: vals["identity_key"] = self.identity_key + if self.channel: + vals["channel"] = self.channel dependencies = { "depends_on": [parent.uuid for parent in self.depends_on], @@ -840,6 +843,9 @@ def set_failed(self, **kw): if v is not None: setattr(self, k, v) + def change_job_channel(self, to_channel): + self.channel = to_channel + def __repr__(self): return "" % (self.uuid, self.priority) diff --git a/queue_job/jobrunner/channels.py b/queue_job/jobrunner/channels.py index c06f7b49d8..0d343f0caa 100644 --- a/queue_job/jobrunner/channels.py +++ b/queue_job/jobrunner/channels.py @@ -7,7 +7,16 @@ from weakref import WeakValueDictionary from ..exception import ChannelNotFound -from ..job import CANCELLED, DONE, ENQUEUED, FAILED, PENDING, STARTED, WAIT_DEPENDENCIES +from ..job import ( + CANCELLED, + DONE, + ENQUEUED, + FAILED, + PAUSE_CHANNEL, + PENDING, + STARTED, + WAIT_DEPENDENCIES, +) NOT_DONE = (WAIT_DEPENDENCIES, PENDING, ENQUEUED, STARTED, FAILED) @@ -451,7 +460,13 @@ def get_subchannel_by_name(self, subchannel_name): return self.children.get(subchannel_name) def __str__(self): - capacity = "∞" if self.capacity is None else str(self.capacity) + if not self.capacity: + if self.name == PAUSE_CHANNEL: + capacity = "0" + else: + capacity = "∞" + else: + capacity = str(self.capacity) return "%s(C:%s,Q:%d,R:%d,F:%d)" % ( self.fullname, capacity, @@ -517,7 +532,7 @@ def has_capacity(self): if self.sequential and self._failed: # a sequential queue blocks on failed jobs return False - if not self.capacity: + if not self.capacity and self.fullname != PAUSE_CHANNEL: # unlimited capacity return True return len(self._running) < self.capacity @@ -873,6 +888,10 @@ def parse_simple_config(cls, config_string): capacity = config_items[1] try: config["capacity"] = int(capacity) + if name == PAUSE_CHANNEL and config["capacity"] != 0: + raise Exception( + "Channel 'pause' must be capacity equal to zero" + ) except Exception as ex: raise ValueError( "Invalid channel config %s: " @@ -896,7 +915,10 @@ def parse_simple_config(cls, config_string): ) config[k] = v else: - config["capacity"] = 1 + if name == PAUSE_CHANNEL: + config["capacity"] = 0 + else: + config["capacity"] = 1 res.append(config) return res diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index 8af7468b7c..f1b2eab7a6 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -18,6 +18,7 @@ CANCELLED, DONE, FAILED, + PAUSE_CHANNEL, PENDING, STARTED, STATES, @@ -506,3 +507,31 @@ def _test_job(self, failure_rate=0): _logger.info("Running test job.") if random.random() <= failure_rate: raise JobError("Job failed") + + def _change_job_pause_channel(self): + """Change the state of the `Job` object + Changing the channel of the Job will automatically change some fields + (date, result, ...). + """ + for record in self: + job_ = Job.load(record.env, record.uuid) + to_channel = "" + if record.channel == PAUSE_CHANNEL: + # Get original channel + to_channel = record.job_function_id.channel + record.channel = record.job_function_id.channel + else: + to_channel = PAUSE_CHANNEL + record.channel = to_channel + job_.change_job_channel(to_channel) + job_.store() + + def _validate_state_jobs(self): + if any(job.state in ("done", "started") for job in self): + raise exceptions.ValidationError( + _("Some selected jobs are in invalid states to pause.") + ) + + def set_channel_pause(self): + self._change_job_pause_channel() + return True diff --git a/queue_job/security/ir.model.access.csv b/queue_job/security/ir.model.access.csv index 634daf8ede..5a2dfe3b17 100644 --- a/queue_job/security/ir.model.access.csv +++ b/queue_job/security/ir.model.access.csv @@ -5,3 +5,4 @@ access_queue_job_channel_manager,queue job channel manager,queue_job.model_queue access_queue_requeue_job,queue requeue job manager,queue_job.model_queue_requeue_job,queue_job.group_queue_job_manager,1,1,1,1 access_queue_jobs_to_done,queue jobs to done manager,queue_job.model_queue_jobs_to_done,queue_job.group_queue_job_manager,1,1,1,1 access_queue_jobs_to_cancelled,queue jobs to cancelled manager,queue_job.model_queue_jobs_to_cancelled,queue_job.group_queue_job_manager,1,1,1,1 +access_queue_channel_pause,access_queue_channel_pause,model_queue_channel_pause,queue_job.group_queue_job_manager,1,1,1,1 diff --git a/queue_job/wizards/__init__.py b/queue_job/wizards/__init__.py index 06c0bd8572..ea8b0767bf 100644 --- a/queue_job/wizards/__init__.py +++ b/queue_job/wizards/__init__.py @@ -1,3 +1,4 @@ from . import queue_requeue_job from . import queue_jobs_to_done from . import queue_jobs_to_cancelled +from . import queue_jobs_pause_channel diff --git a/queue_job/wizards/queue_job_queue_jobs_pause_channel_views.xml b/queue_job/wizards/queue_job_queue_jobs_pause_channel_views.xml new file mode 100644 index 0000000000..ff180cbe82 --- /dev/null +++ b/queue_job/wizards/queue_job_queue_jobs_pause_channel_views.xml @@ -0,0 +1,34 @@ + + + + + Pause Jobs + queue.channel.pause + +
+ + + +
+
+
+
+
+ + + Pause/Resume Jobs + queue.channel.pause + form + + new + + + +
diff --git a/queue_job/wizards/queue_jobs_pause_channel.py b/queue_job/wizards/queue_jobs_pause_channel.py new file mode 100644 index 0000000000..2f82cd377b --- /dev/null +++ b/queue_job/wizards/queue_jobs_pause_channel.py @@ -0,0 +1,22 @@ +from odoo import fields, models + + +class QueueChannelPause(models.TransientModel): + _name = "queue.channel.pause" + _description = "Wizard to change jobs to channel paused" + + job_ids = fields.Many2many( + comodel_name="queue.job", string="Jobs", default=lambda r: r._default_job_ids() + ) + + def _default_job_ids(self): + res = False + context = self.env.context + if context.get("active_model") == "queue.job" and context.get("active_ids"): + res = context["active_ids"] + return res + + def set_channel_paused(self): + self.job_ids._validate_state_jobs() + self.job_ids.set_channel_pause() + return {"type": "ir.actions.act_window_close"}