Skip to content

Commit

Permalink
add --source-config param to format_metadata_records
Browse files Browse the repository at this point in the history
  • Loading branch information
aaxelb committed Jan 13, 2021
1 parent 1cdb16b commit 5dde563
Showing 1 changed file with 38 additions and 9 deletions.
47 changes: 38 additions & 9 deletions share/management/commands/format_metadata_records.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
from django.db import connection

from share.management.commands import BaseShareCommand
from share.models.ingest import SourceConfig
from share.models.jobs import IngestJob
from share.tasks import ingest
from share.util.extensions import Extensions


# find most recent job for each suid, set its status back to `created`,
# and return the job and suid ids
# TODO clean up significantly once ingest jobs and suids are one-to-one
# TODO rewrite with django orm once ingest jobs and suids are one-to-one
update_ingest_job_sql = '''
WITH latest_job_per_suid AS (
SELECT DISTINCT ON (latest_job.suid_id) latest_job.id
FROM share_ingestjob AS latest_job
WHERE latest_job.suid_id >= %(suid_start_id)s
JOIN share_sourceuniqueidentifier suid ON latest_job.suid_id = suid.id
WHERE latest_job.suid_id >= %(suid_start_id)s AND suid.source_config_id IN %(source_config_ids)s
ORDER BY latest_job.suid_id ASC, latest_job.date_started DESC NULLS LAST, latest_job.date_created DESC
LIMIT %(chunk_size)s
)
Expand All @@ -29,32 +31,59 @@

class Command(BaseShareCommand):
def add_arguments(self, parser):
    """Register the command-line arguments for this command.

    Arguments:
        metadata_formats: one or more metadata format names (positional).
        --suid-start-id / -s: suid id to start from (default 0), so an
            interrupted run can be resumed.
        --source-config / -c: source config to format; may be repeated.
        --all-source-configs / -a: format data from every source config.

    Exactly one of --source-config and --all-source-configs must be given.
    """
    # Each option is registered exactly once: registering the same option
    # string (e.g. `--suid-start-id`) twice makes argparse raise a
    # conflicting-option error.
    parser.add_argument('metadata_formats', nargs='+', help='metadata format name (see entry points in setup.py)')
    parser.add_argument('--suid-start-id', '-s', type=int, default=0, help="resume based on the previous run's last successful suid")

    # required + mutually exclusive: the caller must choose explicitly
    # between naming source configs and asking for all of them
    source_config_group = parser.add_mutually_exclusive_group(required=True)
    source_config_group.add_argument('--source-config', '-c', action='append', help='format data from these source configs')
    source_config_group.add_argument('--all-source-configs', '-a', action='store_true', help='format data from *all* source configs')

def handle(self, *args, **options):
    """Validate the requested formats and source configs, then enqueue jobs in chunks.

    Reads `metadata_formats` and `suid_start_id` from `options`, checks the
    format names against the registered `share.metadata_formats` extensions,
    resolves the source config ids via `get_source_config_ids`, and then
    calls `enqueue_job_chunk` repeatedly until it returns None.

    Problems are reported on stdout/stderr and end the command early;
    nothing is raised for invalid input.
    """
    metadata_formats = options['metadata_formats']
    suid_start_id = options['suid_start_id']

    valid_formats = Extensions.get_names('share.metadata_formats')
    if not metadata_formats:
        # nothing requested: show what could have been requested
        self.stdout.write(f'Valid metadata formats: {valid_formats}')
        return

    invalid_formats = set(metadata_formats).difference(valid_formats)
    if invalid_formats:
        self.stderr.write(f'Invalid metadata format(s): {invalid_formats}. Valid formats: {valid_formats}')
        return

    source_config_ids = self.get_source_config_ids(options)
    if not source_config_ids:
        # None means invalid labels (already reported by the helper); an
        # empty tuple means no source config matched. Either way there is
        # nothing to enqueue.
        # NOTE(review): an empty tuple would presumably also render as an
        # invalid `IN ()` clause in the chunk query -- confirm.
        return

    with connection.cursor() as cursor:
        while True:
            last_successful_suid = self.enqueue_job_chunk(cursor, suid_start_id, metadata_formats, source_config_ids)
            if last_successful_suid is None:
                break
            # the next chunk starts just past the last suid handled
            suid_start_id = int(last_successful_suid) + 1

def enqueue_job_chunk(self, cursor, suid_start_id, metadata_formats):
def get_source_config_ids(self, options):
    """Resolve the source-config options into a tuple of SourceConfig ids.

    With `all_source_configs` set, returns the ids of every source config
    that is not disabled and whose source is not deleted.

    Otherwise looks up the labels given in `source_config` (restricted to
    non-deleted sources). If any given label has no match, writes the
    unmatched labels to stderr and returns None.
    """
    requested_labels = options['source_config']
    use_all = options['all_source_configs']

    if use_all:
        enabled_ids = SourceConfig.objects.filter(
            disabled=False,
            source__is_deleted=False,
        ).values_list('id', flat=True)
        return tuple(enabled_ids)

    matches = SourceConfig.objects.filter(
        label__in=requested_labels,
        source__is_deleted=False,
    ).values('id', 'label')

    given_labels = set(requested_labels)
    found_labels = {match['label'] for match in matches}
    if found_labels == given_labels:
        return tuple(match['id'] for match in matches)

    # at least one requested label did not resolve to a usable source config
    self.stderr.write(f'Invalid source configs: {given_labels - found_labels}')
    return None

def enqueue_job_chunk(self, cursor, suid_start_id, metadata_formats, source_config_ids):
cursor.execute(update_ingest_job_sql, {
'suid_start_id': suid_start_id or 0,
'source_config_ids': source_config_ids,
'chunk_size': CHUNK_SIZE,
'set_job_status': IngestJob.STATUS.created,
'in_progress_status': IngestJob.STATUS.started,
Expand Down

0 comments on commit 5dde563

Please sign in to comment.