From dc667ff30acb7f8c3c8131e9099626d86ab034e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul=20=28ACSONE=29?= Date: Mon, 2 Jan 2017 21:59:18 +0100 Subject: [PATCH 1/2] [IMP] sequential channels from 9.0 --- queue_job/jobrunner/channels.py | 245 ++++++++++++++++++++++++++++---- 1 file changed, 217 insertions(+), 28 deletions(-) diff --git a/queue_job/jobrunner/channels.py b/queue_job/jobrunner/channels.py index 590dd4da5..1e4d25a84 100644 --- a/queue_job/jobrunner/channels.py +++ b/queue_job/jobrunner/channels.py @@ -127,7 +127,7 @@ class ChannelJob(object): Channel jobs are comparable according to the following rules: * jobs with an eta come before all other jobs * then jobs with a smaller eta come first - * then jobs with smaller priority come first + * then jobs with a smaller priority come first * then jobs with a smaller creation time come first * then jobs with a smaller sequence come first @@ -182,6 +182,11 @@ class ChannelJob(object): False >>> j0 == j0 True + + Comparison excluding eta: + + >>> j1.cmp_no_eta(j2) + -1 """ def __init__(self, db_name, channel, uuid, @@ -203,6 +208,11 @@ def __eq__(self, other): def __hash__(self): return id(self) + def cmp_no_eta(self, other): + return (cmp(self.priority, other.priority) or + cmp(self.date_created, other.date_created) or + cmp(self.seq, other.seq)) + def __cmp__(self, other): if self.eta and not other.eta: return -1 @@ -210,14 +220,14 @@ def __cmp__(self, other): return 1 else: return (cmp(self.eta, other.eta) or - cmp(self.priority, other.priority) or - cmp(self.date_created, other.date_created) or - cmp(self.seq, other.seq)) + self.cmp_no_eta(other)) class ChannelQueue(object): - """A channel queue is a priority queue for jobs that returns - jobs with a past ETA first. + """A channel queue is a priority queue for jobs. + + Jobs with an eta are set aside until their eta is past due, at + which point they start competing normally with other jobs. >>> q = ChannelQueue() >>> j1 = ChannelJob(None, None, 1, @@ -257,11 +267,49 @@ class ChannelQueue(object): >>> q.get_wakeup_time() 0 >>> q.pop(now=13) + + Observe that job with past eta still run after jobs with higher priority. + + >>> j4 = ChannelJob(None, None, 4, + ... seq=0, date_created=4, priority=10, eta=20) + >>> j5 = ChannelJob(None, None, 5, + ... seq=0, date_created=5, priority=1, eta=None) + >>> q.add(j4) + >>> q.add(j5) + >>> q.get_wakeup_time() + 20 + >>> q.pop(21) + + >>> q.get_wakeup_time() + 0 + >>> q.pop(22) + + + Test a sequential queue. + + >>> sq = ChannelQueue(sequential=True) + >>> j6 = ChannelJob(None, None, 6, + ... seq=0, date_created=6, priority=1, eta=None) + >>> j7 = ChannelJob(None, None, 7, + ... seq=0, date_created=7, priority=1, eta=20) + >>> j8 = ChannelJob(None, None, 8, + ... seq=0, date_created=8, priority=1, eta=None) + >>> sq.add(j6) + >>> sq.add(j7) + >>> sq.add(j8) + >>> sq.pop(10) + + >>> sq.pop(15) + >>> sq.pop(20) + + >>> sq.pop(30) + """ - def __init__(self): + def __init__(self, sequential=False): self._queue = PriorityQueue() self._eta_queue = PriorityQueue() + self.sequential = sequential def __len__(self): return len(self._eta_queue) + len(self._queue) @@ -280,10 +328,19 @@ def remove(self, job): self._queue.remove(job) def pop(self, now): - if len(self._eta_queue) and self._eta_queue[0].eta <= now: - return self._eta_queue.pop() - else: - return self._queue.pop() + while len(self._eta_queue) and self._eta_queue[0].eta <= now: + eta_job = self._eta_queue.pop() + eta_job.eta = None + self._queue.add(eta_job) + if self.sequential and len(self._eta_queue) and len(self._queue): + eta_job = self._eta_queue[0] + job = self._queue[0] + if eta_job.cmp_no_eta(job) < 0: + # eta ignored, the job with eta has higher priority + # than the job without eta; since it's a sequential + # queue we wait until eta + return + return self._queue.pop() def get_wakeup_time(self, wakeup_time=0): if len(self._eta_queue): @@ -348,13 +405,21 @@ def __init__(self, name, parent, capacity=None, sequential=False, if self.parent: self.parent.children[name] = self self.children = {} - self.capacity = capacity - self.sequential = sequential - self.throttle = throttle # seconds self._queue = ChannelQueue() self._running = SafeSet() self._failed = SafeSet() self._pause_until = 0 # utc seconds since the epoch + self.capacity = capacity + self.throttle = throttle # seconds + self.sequential = sequential + + @property + def sequential(self): + return self._queue.sequential + + @sequential.setter + def sequential(self, val): + self._queue.sequential = val def configure(self, config): """ Configure a channel from a dictionary. @@ -363,6 +428,7 @@ def configure(self, config): * capacity * sequential + * throttle """ assert self.fullname.endswith(config['name']) self.capacity = config.get('capacity', None) @@ -447,6 +513,15 @@ def set_failed(self, job): _logger.debug("job %s marked failed in channel %s", job.uuid, self) + def has_capacity(self): + if self.sequential and self._failed: + # a sequential queue blocks on failed jobs + return False + if not self.capacity: + # unlimited capacity + return True + return len(self._running) < self.capacity + def get_jobs_to_run(self, now): """ Get jobs that are ready to run in channel. @@ -468,25 +543,20 @@ def get_jobs_to_run(self, now): for job in child.get_jobs_to_run(now): self._queue.add(job) # is this channel paused? - if self.throttle: + if self.throttle and self._pause_until: if now < self._pause_until: - if not self.capacity or len(self._running) < self.capacity: - _logger.debug("channel %s paused because of throttle " - "delay between jobs", self) + if self.has_capacity(): + _logger.debug("channel %s paused until %s because " + "of throttle delay between jobs", + self, self._pause_until) return else: # unpause, this is important to avoid perpetual wakeup # while the channel is at full capacity self._pause_until = 0 - # sequential channels block when there are failed jobs - # TODO: this is probably not sufficient to ensure - # sequentiality because of the behaviour in presence - # of jobs with eta; plus: check if there are no - # race conditions. - if self.sequential and len(self._failed): - return + _logger.debug("channel %s unpaused at %s", self, now) # yield jobs that are ready to run, while we have capacity - while not self.capacity or len(self._running) < self.capacity: + while self.has_capacity(): job = self._queue.pop(now) if not job: return @@ -496,10 +566,12 @@ def get_jobs_to_run(self, now): yield job if self.throttle: self._pause_until = now + self.throttle + _logger.debug("pausing channel %s until %s", + self, self._pause_until) return def get_wakeup_time(self, wakeup_time=0): - if self.capacity and len(self._running) >= self.capacity: + if not self.has_capacity(): # this channel is full, do not request timed wakeup, as # a notification will wakeup the runner when a job finishes return wakeup_time @@ -613,6 +685,115 @@ class ChannelManager(object): [] >>> cm.get_wakeup_time() 104 + + Let's test throttling in combination with a queue reaching full capacity. + + >>> cm = ChannelManager() + >>> cm.simple_configure('root:4,T:2:throttle=2') + >>> cm.notify(db, 'T', 'T1', 1, 0, 10, None, 'pending') + >>> cm.notify(db, 'T', 'T2', 2, 0, 10, None, 'pending') + >>> cm.notify(db, 'T', 'T3', 3, 0, 10, None, 'pending') + + >>> pp(list(cm.get_jobs_to_run(now=100))) + [] + >>> pp(list(cm.get_jobs_to_run(now=102))) + [] + + Channel is now full, so no job to run even though throttling + delay is over. + + >>> pp(list(cm.get_jobs_to_run(now=103))) + [] + >>> cm.get_wakeup_time() # no wakeup time, since queue is full + 0 + >>> pp(list(cm.get_jobs_to_run(now=104))) + [] + >>> cm.get_wakeup_time() # queue is still full + 0 + + >>> cm.notify(db, 'T', 'T1', 1, 0, 10, None, 'done') + >>> pp(list(cm.get_jobs_to_run(now=105))) + [] + >>> cm.get_wakeup_time() # queue is full + 0 + >>> cm.notify(db, 'T', 'T2', 1, 0, 10, None, 'done') + >>> cm.get_wakeup_time() + 107 + + Test wakeup time behaviour in presence of eta. + + >>> cm = ChannelManager() + >>> cm.simple_configure('root:4,E:1') + >>> cm.notify(db, 'E', 'E1', 1, 0, 10, None, 'pending') + >>> cm.notify(db, 'E', 'E2', 2, 0, 10, None, 'pending') + >>> cm.notify(db, 'E', 'E3', 3, 0, 10, None, 'pending') + + >>> pp(list(cm.get_jobs_to_run(now=100))) + [] + >>> pp(list(cm.get_jobs_to_run(now=101))) + [] + >>> cm.notify(db, 'E', 'E1', 1, 0, 10, 105, 'pending') + >>> cm.get_wakeup_time() # wakeup at eta + 105 + >>> pp(list(cm.get_jobs_to_run(now=102))) # but there is capacity + [] + >>> pp(list(cm.get_jobs_to_run(now=106))) # no capacity anymore + [] + >>> cm.get_wakeup_time() # no timed wakeup because no capacity + 0 + >>> cm.notify(db, 'E', 'E2', 1, 0, 10, None, 'done') + >>> cm.get_wakeup_time() + 105 + >>> pp(list(cm.get_jobs_to_run(now=107))) # no capacity anymore + [] + >>> cm.get_wakeup_time() + 0 + + Test wakeup time behaviour in a sequential queue. + + >>> cm = ChannelManager() + >>> cm.simple_configure('root:4,S:1:sequential') + >>> cm.notify(db, 'S', 'S1', 1, 0, 10, None, 'pending') + >>> cm.notify(db, 'S', 'S2', 2, 0, 10, None, 'pending') + >>> cm.notify(db, 'S', 'S3', 3, 0, 10, None, 'pending') + + >>> pp(list(cm.get_jobs_to_run(now=100))) + [] + >>> cm.notify(db, 'S', 'S1', 1, 0, 10, None, 'failed') + >>> pp(list(cm.get_jobs_to_run(now=101))) + [] + >>> cm.notify(db, 'S', 'S2', 2, 0, 10, 105, 'pending') + >>> pp(list(cm.get_jobs_to_run(now=102))) + [] + + No wakeup time because due to eta, because the sequential queue + is waiting for a failed job. + + >>> cm.get_wakeup_time() + 0 + >>> cm.notify(db, 'S', 'S1', 1, 0, 10, None, 'pending') + >>> cm.get_wakeup_time() + 105 + >>> pp(list(cm.get_jobs_to_run(now=102))) + [] + >>> pp(list(cm.get_jobs_to_run(now=103))) + [] + >>> cm.notify(db, 'S', 'S1', 1, 0, 10, None, 'done') + + At this stage, we have S2 with an eta of 105 and since the + queue is sequential, we wait for it. + + >>> pp(list(cm.get_jobs_to_run(now=103))) + [] + >>> pp(list(cm.get_jobs_to_run(now=105))) + [] + >>> cm.notify(db, 'S', 'S2', 2, 0, 10, 105, 'done') + >>> pp(list(cm.get_jobs_to_run(now=105))) + [] + >>> cm.notify(db, 'S', 'S3', 3, 0, 10, None, 'done') + >>> pp(list(cm.get_jobs_to_run(now=105))) + [] + """ def __init__(self): @@ -720,14 +901,22 @@ def simple_configure(self, config_string): >>> c = cm.get_channel_by_name('root') >>> c.capacity 1 - >>> cm.simple_configure('root:4,autosub.sub:2') + >>> cm.simple_configure('root:4,autosub.sub:2,seq:1:sequential') >>> cm.get_channel_by_name('root').capacity 4 + >>> cm.get_channel_by_name('root').sequential + False >>> cm.get_channel_by_name('root.autosub').capacity >>> cm.get_channel_by_name('root.autosub.sub').capacity 2 + >>> cm.get_channel_by_name('root.autosub.sub').sequential + False >>> cm.get_channel_by_name('autosub.sub').capacity 2 + >>> cm.get_channel_by_name('seq').capacity + 1 + >>> cm.get_channel_by_name('seq').sequential + True """ for config in ChannelManager.parse_simple_config(config_string): self.get_channel_from_config(config) From 1a602945cd2245cd02e2ada3d76865cb9a443d1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul=20=28ACSONE=29?= Date: Fri, 6 Jan 2017 19:36:27 +0100 Subject: [PATCH 2/2] [FIX] pep8 plus trivial doctest fix --- queue_job/jobrunner/channels.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/queue_job/jobrunner/channels.py b/queue_job/jobrunner/channels.py index 1e4d25a84..85c73a5f3 100644 --- a/queue_job/jobrunner/channels.py +++ b/queue_job/jobrunner/channels.py @@ -590,14 +590,16 @@ def get_wakeup_time(self, wakeup_time=0): wakeup_time = child.get_wakeup_time(wakeup_time) return wakeup_time + def split_strip(s, sep, maxsplit=-1): """Split string and strip each component. - >>> ChannelManager.split_strip("foo: bar baz\\n: fred:", ":") + >>> split_strip("foo: bar baz\\n: fred:", ":") ['foo', 'bar baz', 'fred', ''] """ return [x.strip() for x in s.split(sep, maxsplit)] + class ChannelManager(object): """ High level interface for channels