Skip to content

Commit

Permalink
Merge pull request #6 from acsone/10.0-seq-channels-sbi
Browse files Browse the repository at this point in the history
10.0 seq channels sbi
  • Loading branch information
guewen committed Jan 6, 2017
2 parents e771761 + 1a60294 commit b47c9a6
Showing 1 changed file with 220 additions and 29 deletions.
249 changes: 220 additions & 29 deletions queue_job/jobrunner/channels.py
Expand Up @@ -127,7 +127,7 @@ class ChannelJob(object):
Channel jobs are comparable according to the following rules:
* jobs with an eta come before all other jobs
* then jobs with a smaller eta come first
* then jobs with smaller priority come first
* then jobs with a smaller priority come first
* then jobs with a smaller creation time come first
* then jobs with a smaller sequence come first
Expand Down Expand Up @@ -182,6 +182,11 @@ class ChannelJob(object):
False
>>> j0 == j0
True
Comparison excluding eta:
>>> j1.cmp_no_eta(j2)
-1
"""

def __init__(self, db_name, channel, uuid,
Expand All @@ -203,21 +208,26 @@ def __eq__(self, other):
def __hash__(self):
return id(self)

def cmp_no_eta(self, other):
return (cmp(self.priority, other.priority) or
cmp(self.date_created, other.date_created) or
cmp(self.seq, other.seq))

def __cmp__(self, other):
if self.eta and not other.eta:
return -1
elif not self.eta and other.eta:
return 1
else:
return (cmp(self.eta, other.eta) or
cmp(self.priority, other.priority) or
cmp(self.date_created, other.date_created) or
cmp(self.seq, other.seq))
self.cmp_no_eta(other))


class ChannelQueue(object):
"""A channel queue is a priority queue for jobs that returns
jobs with a past ETA first.
"""A channel queue is a priority queue for jobs.
Jobs with an eta are set aside until their eta is past due, at
which point they start competing normally with other jobs.
>>> q = ChannelQueue()
>>> j1 = ChannelJob(None, None, 1,
Expand Down Expand Up @@ -257,11 +267,49 @@ class ChannelQueue(object):
>>> q.get_wakeup_time()
0
>>> q.pop(now=13)
Observe that job with past eta still run after jobs with higher priority.
>>> j4 = ChannelJob(None, None, 4,
... seq=0, date_created=4, priority=10, eta=20)
>>> j5 = ChannelJob(None, None, 5,
... seq=0, date_created=5, priority=1, eta=None)
>>> q.add(j4)
>>> q.add(j5)
>>> q.get_wakeup_time()
20
>>> q.pop(21)
<ChannelJob 5>
>>> q.get_wakeup_time()
0
>>> q.pop(22)
<ChannelJob 4>
Test a sequential queue.
>>> sq = ChannelQueue(sequential=True)
>>> j6 = ChannelJob(None, None, 6,
... seq=0, date_created=6, priority=1, eta=None)
>>> j7 = ChannelJob(None, None, 7,
... seq=0, date_created=7, priority=1, eta=20)
>>> j8 = ChannelJob(None, None, 8,
... seq=0, date_created=8, priority=1, eta=None)
>>> sq.add(j6)
>>> sq.add(j7)
>>> sq.add(j8)
>>> sq.pop(10)
<ChannelJob 6>
>>> sq.pop(15)
>>> sq.pop(20)
<ChannelJob 7>
>>> sq.pop(30)
<ChannelJob 8>
"""

def __init__(self):
def __init__(self, sequential=False):
self._queue = PriorityQueue()
self._eta_queue = PriorityQueue()
self.sequential = sequential

def __len__(self):
return len(self._eta_queue) + len(self._queue)
Expand All @@ -280,10 +328,19 @@ def remove(self, job):
self._queue.remove(job)

def pop(self, now):
if len(self._eta_queue) and self._eta_queue[0].eta <= now:
return self._eta_queue.pop()
else:
return self._queue.pop()
while len(self._eta_queue) and self._eta_queue[0].eta <= now:
eta_job = self._eta_queue.pop()
eta_job.eta = None
self._queue.add(eta_job)
if self.sequential and len(self._eta_queue) and len(self._queue):
eta_job = self._eta_queue[0]
job = self._queue[0]
if eta_job.cmp_no_eta(job) < 0:
# eta ignored, the job with eta has higher priority
# than the job without eta; since it's a sequential
# queue we wait until eta
return
return self._queue.pop()

def get_wakeup_time(self, wakeup_time=0):
if len(self._eta_queue):
Expand Down Expand Up @@ -348,13 +405,21 @@ def __init__(self, name, parent, capacity=None, sequential=False,
if self.parent:
self.parent.children[name] = self
self.children = {}
self.capacity = capacity
self.sequential = sequential
self.throttle = throttle # seconds
self._queue = ChannelQueue()
self._running = SafeSet()
self._failed = SafeSet()
self._pause_until = 0 # utc seconds since the epoch
self.capacity = capacity
self.throttle = throttle # seconds
self.sequential = sequential

@property
def sequential(self):
return self._queue.sequential

@sequential.setter
def sequential(self, val):
self._queue.sequential = val

def configure(self, config):
""" Configure a channel from a dictionary.
Expand All @@ -363,6 +428,7 @@ def configure(self, config):
* capacity
* sequential
* throttle
"""
assert self.fullname.endswith(config['name'])
self.capacity = config.get('capacity', None)
Expand Down Expand Up @@ -447,6 +513,15 @@ def set_failed(self, job):
_logger.debug("job %s marked failed in channel %s",
job.uuid, self)

def has_capacity(self):
if self.sequential and self._failed:
# a sequential queue blocks on failed jobs
return False
if not self.capacity:
# unlimited capacity
return True
return len(self._running) < self.capacity

def get_jobs_to_run(self, now):
""" Get jobs that are ready to run in channel.
Expand All @@ -468,25 +543,20 @@ def get_jobs_to_run(self, now):
for job in child.get_jobs_to_run(now):
self._queue.add(job)
# is this channel paused?
if self.throttle:
if self.throttle and self._pause_until:
if now < self._pause_until:
if not self.capacity or len(self._running) < self.capacity:
_logger.debug("channel %s paused because of throttle "
"delay between jobs", self)
if self.has_capacity():
_logger.debug("channel %s paused until %s because "
"of throttle delay between jobs",
self, self._pause_until)
return
else:
# unpause, this is important to avoid perpetual wakeup
# while the channel is at full capacity
self._pause_until = 0
# sequential channels block when there are failed jobs
# TODO: this is probably not sufficient to ensure
# sequentiality because of the behaviour in presence
# of jobs with eta; plus: check if there are no
# race conditions.
if self.sequential and len(self._failed):
return
_logger.debug("channel %s unpaused at %s", self, now)
# yield jobs that are ready to run, while we have capacity
while not self.capacity or len(self._running) < self.capacity:
while self.has_capacity():
job = self._queue.pop(now)
if not job:
return
Expand All @@ -496,10 +566,12 @@ def get_jobs_to_run(self, now):
yield job
if self.throttle:
self._pause_until = now + self.throttle
_logger.debug("pausing channel %s until %s",
self, self._pause_until)
return

def get_wakeup_time(self, wakeup_time=0):
if self.capacity and len(self._running) >= self.capacity:
if not self.has_capacity():
# this channel is full, do not request timed wakeup, as
# a notification will wakeup the runner when a job finishes
return wakeup_time
Expand All @@ -518,14 +590,16 @@ def get_wakeup_time(self, wakeup_time=0):
wakeup_time = child.get_wakeup_time(wakeup_time)
return wakeup_time


def split_strip(s, sep, maxsplit=-1):
"""Split string and strip each component.
>>> ChannelManager.split_strip("foo: bar baz\\n: fred:", ":")
>>> split_strip("foo: bar baz\\n: fred:", ":")
['foo', 'bar baz', 'fred', '']
"""
return [x.strip() for x in s.split(sep, maxsplit)]


class ChannelManager(object):
""" High level interface for channels
Expand Down Expand Up @@ -613,6 +687,115 @@ class ChannelManager(object):
[<ChannelJob A2>]
>>> cm.get_wakeup_time()
104
Let's test throttling in combination with a queue reaching full capacity.
>>> cm = ChannelManager()
>>> cm.simple_configure('root:4,T:2:throttle=2')
>>> cm.notify(db, 'T', 'T1', 1, 0, 10, None, 'pending')
>>> cm.notify(db, 'T', 'T2', 2, 0, 10, None, 'pending')
>>> cm.notify(db, 'T', 'T3', 3, 0, 10, None, 'pending')
>>> pp(list(cm.get_jobs_to_run(now=100)))
[<ChannelJob T1>]
>>> pp(list(cm.get_jobs_to_run(now=102)))
[<ChannelJob T2>]
Channel is now full, so no job to run even though throttling
delay is over.
>>> pp(list(cm.get_jobs_to_run(now=103)))
[]
>>> cm.get_wakeup_time() # no wakeup time, since queue is full
0
>>> pp(list(cm.get_jobs_to_run(now=104)))
[]
>>> cm.get_wakeup_time() # queue is still full
0
>>> cm.notify(db, 'T', 'T1', 1, 0, 10, None, 'done')
>>> pp(list(cm.get_jobs_to_run(now=105)))
[<ChannelJob T3>]
>>> cm.get_wakeup_time() # queue is full
0
>>> cm.notify(db, 'T', 'T2', 1, 0, 10, None, 'done')
>>> cm.get_wakeup_time()
107
Test wakeup time behaviour in presence of eta.
>>> cm = ChannelManager()
>>> cm.simple_configure('root:4,E:1')
>>> cm.notify(db, 'E', 'E1', 1, 0, 10, None, 'pending')
>>> cm.notify(db, 'E', 'E2', 2, 0, 10, None, 'pending')
>>> cm.notify(db, 'E', 'E3', 3, 0, 10, None, 'pending')
>>> pp(list(cm.get_jobs_to_run(now=100)))
[<ChannelJob E1>]
>>> pp(list(cm.get_jobs_to_run(now=101)))
[]
>>> cm.notify(db, 'E', 'E1', 1, 0, 10, 105, 'pending')
>>> cm.get_wakeup_time() # wakeup at eta
105
>>> pp(list(cm.get_jobs_to_run(now=102))) # but there is capacity
[<ChannelJob E2>]
>>> pp(list(cm.get_jobs_to_run(now=106))) # no capacity anymore
[]
>>> cm.get_wakeup_time() # no timed wakeup because no capacity
0
>>> cm.notify(db, 'E', 'E2', 1, 0, 10, None, 'done')
>>> cm.get_wakeup_time()
105
>>> pp(list(cm.get_jobs_to_run(now=107))) # no capacity anymore
[<ChannelJob E1>]
>>> cm.get_wakeup_time()
0
Test wakeup time behaviour in a sequential queue.
>>> cm = ChannelManager()
>>> cm.simple_configure('root:4,S:1:sequential')
>>> cm.notify(db, 'S', 'S1', 1, 0, 10, None, 'pending')
>>> cm.notify(db, 'S', 'S2', 2, 0, 10, None, 'pending')
>>> cm.notify(db, 'S', 'S3', 3, 0, 10, None, 'pending')
>>> pp(list(cm.get_jobs_to_run(now=100)))
[<ChannelJob S1>]
>>> cm.notify(db, 'S', 'S1', 1, 0, 10, None, 'failed')
>>> pp(list(cm.get_jobs_to_run(now=101)))
[]
>>> cm.notify(db, 'S', 'S2', 2, 0, 10, 105, 'pending')
>>> pp(list(cm.get_jobs_to_run(now=102)))
[]
No wakeup time because due to eta, because the sequential queue
is waiting for a failed job.
>>> cm.get_wakeup_time()
0
>>> cm.notify(db, 'S', 'S1', 1, 0, 10, None, 'pending')
>>> cm.get_wakeup_time()
105
>>> pp(list(cm.get_jobs_to_run(now=102)))
[<ChannelJob S1>]
>>> pp(list(cm.get_jobs_to_run(now=103)))
[]
>>> cm.notify(db, 'S', 'S1', 1, 0, 10, None, 'done')
At this stage, we have S2 with an eta of 105 and since the
queue is sequential, we wait for it.
>>> pp(list(cm.get_jobs_to_run(now=103)))
[]
>>> pp(list(cm.get_jobs_to_run(now=105)))
[<ChannelJob S2>]
>>> cm.notify(db, 'S', 'S2', 2, 0, 10, 105, 'done')
>>> pp(list(cm.get_jobs_to_run(now=105)))
[<ChannelJob S3>]
>>> cm.notify(db, 'S', 'S3', 3, 0, 10, None, 'done')
>>> pp(list(cm.get_jobs_to_run(now=105)))
[]
"""

def __init__(self):
Expand Down Expand Up @@ -720,14 +903,22 @@ def simple_configure(self, config_string):
>>> c = cm.get_channel_by_name('root')
>>> c.capacity
1
>>> cm.simple_configure('root:4,autosub.sub:2')
>>> cm.simple_configure('root:4,autosub.sub:2,seq:1:sequential')
>>> cm.get_channel_by_name('root').capacity
4
>>> cm.get_channel_by_name('root').sequential
False
>>> cm.get_channel_by_name('root.autosub').capacity
>>> cm.get_channel_by_name('root.autosub.sub').capacity
2
>>> cm.get_channel_by_name('root.autosub.sub').sequential
False
>>> cm.get_channel_by_name('autosub.sub').capacity
2
>>> cm.get_channel_by_name('seq').capacity
1
>>> cm.get_channel_by_name('seq').sequential
True
"""
for config in ChannelManager.parse_simple_config(config_string):
self.get_channel_from_config(config)
Expand Down

0 comments on commit b47c9a6

Please sign in to comment.