Skip to content

Commit

Permalink
fix: respect disabled config
Browse files Browse the repository at this point in the history
  • Loading branch information
aaxelb committed May 11, 2023
1 parent e6d8afa commit c65bf01
Showing 1 changed file with 10 additions and 16 deletions.
26 changes: 10 additions & 16 deletions share/tasks/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def _current_versions(self, job):
"""
raise NotImplementedError

def consume(self, job_id=None, exhaust=True, ignore_disabled=False, superfluous=False, force=False, **kwargs):
def consume(self, job_id=None, exhaust=True, superfluous=False, force=False, **kwargs):
"""Consume the given job, or consume an available job if no job is specified.
Parameters:
Expand All @@ -53,22 +53,19 @@ def consume(self, job_id=None, exhaust=True, ignore_disabled=False, superfluous=
exhaust (bool, optional): If True and there are queued jobs, start another task. Defaults to True.
Used to prevent a backlog. If we have a valid job, spin off another task to eat through
the rest of the queue.
ignore_disabled (bool, optional): Consume jobs from disabled source configs and/or deleted sources. Defaults to False.
superfluous (bool, optional): Consuming a job should be idempotent, and subsequent runs may
skip doing work that has already been done. If superfluous=True, however, will do all
work whether or not it's already been done. Default False.
force (bool, optional):
Additional keyword arguments passed to _consume_job, along with superfluous and force
"""
with self._locked_job(job_id, ignore_disabled) as job:
with self._locked_job(job_id) as job:
if job is None:
if job_id is None:
logger.info('No %ss are currently available', self.Job.__name__)
return
else:
# If an id was given to us, we should have gotten a job
job = self.Job.objects.get(id=job_id) # Force the failure
raise Exception('Failed to load {} but then found {!r}.'.format(job_id, job)) # Should never be reached
logger.error('Could not load/lock/consume %s(id=%s)', self.Job.__name__, job_id)
return

assert self.task or not exhaust, 'Cannot pass exhaust=True unless running in an async context'
if exhaust and job_id is None:
Expand Down Expand Up @@ -122,20 +119,17 @@ def _filter_ready(self, qs):
claimed=True
)

def _locked_job(self, job_id, ignore_disabled=False):
qs = self.Job.objects.all()
def _locked_job(self, job_id):
qs = (
self.Job.objects.all()
.exclude(source_config__disabled=True)
.exclude(source_config__source__is_deleted=True)
)
if job_id is not None:
logger.debug('Loading %s %d', self.Job.__name__, job_id)
qs = qs.filter(id=job_id)
else:
logger.debug('job_id was not specified, searching for an available job.')

if not ignore_disabled:
qs = qs.exclude(
source_config__disabled=True,
).exclude(
source_config__source__is_deleted=True
)
qs = self._filter_ready(qs).unlocked(self.lock_field)

return qs.lock_first(self.lock_field)
Expand Down

0 comments on commit c65bf01

Please sign in to comment.