Skip to content

Commit

Permalink
handle missing jobs in format_metadata_records
Browse files Browse the repository at this point in the history
add `-j` flag to `python manage.py format_metadata_records` that will
create an ingest job for each relevant suid that needs one -- the query
for scheduling ingest assumes every suid already has an ingest job,
which is true for new data but might not be true for data on old/messy
environments like staging
  • Loading branch information
aaxelb committed Feb 16, 2021
1 parent 1927578 commit d8351f6
Showing 1 changed file with 20 additions and 1 deletion.
21 changes: 20 additions & 1 deletion share/management/commands/format_metadata_records.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from django.db import connection
from django.db.models import Exists, OuterRef

from share.ingest.scheduler import IngestScheduler
from share.management.commands import BaseShareCommand
from share.models.ingest import SourceConfig
from share.models.ingest import SourceConfig, SourceUniqueIdentifier
from share.models.jobs import IngestJob
from share.tasks import ingest
from share.util.extensions import Extensions
Expand Down Expand Up @@ -33,13 +35,15 @@ class Command(BaseShareCommand):
def add_arguments(self, parser):
parser.add_argument('metadata_formats', nargs='+', help='metadata format name (see entry points in setup.py)')
parser.add_argument('--suid-start-id', '-s', type=int, default=0, help='resume based on the previous run\'s last successful suid')
parser.add_argument('--ensure-ingest-jobs', '-j', action='store_true', help='before starting, ensure that all relevant suids have ingest jobs')
source_config_group = parser.add_mutually_exclusive_group(required=True)
source_config_group.add_argument('--source-config', '-c', action='append', help='format data from these source configs')
source_config_group.add_argument('--all-source-configs', '-a', action='store_true', help='format data from *all* source configs')

def handle(self, *args, **options):
metadata_formats = options['metadata_formats']
suid_start_id = options['suid_start_id']
ensure_ingest_jobs = options['ensure_ingest_jobs']

valid_formats = Extensions.get_names('share.metadata_formats')
if any(mf not in valid_formats for mf in metadata_formats):
Expand All @@ -51,6 +55,9 @@ def handle(self, *args, **options):
if not source_config_ids:
return

if ensure_ingest_jobs:
self.ensure_ingest_jobs_exist(source_config_ids)

with connection.cursor() as cursor:
while True:
last_successful_suid = self.enqueue_job_chunk(cursor, suid_start_id, metadata_formats, source_config_ids)
Expand Down Expand Up @@ -80,6 +87,18 @@ def get_source_config_ids(self, options):
return None
return tuple(il['id'] for il in ids_and_labels)

def ensure_ingest_jobs_exist(self, source_config_ids):
self.stdout.write(f'creating ingest jobs as needed for source configs {source_config_ids}...')
unjobbed_suids = (
SourceUniqueIdentifier.objects
.filter(source_config__in=source_config_ids)
.annotate(
has_ingest_job=Exists(IngestJob.objects.filter(suid_id=OuterRef('id')))
)
.filter(has_ingest_job=False)
)
IngestScheduler().bulk_schedule(unjobbed_suids)

def enqueue_job_chunk(self, cursor, suid_start_id, metadata_formats, source_config_ids):
cursor.execute(update_ingest_job_sql, {
'suid_start_id': suid_start_id or 0,
Expand Down

0 comments on commit d8351f6

Please sign in to comment.