format_metadata_records command improvements
- replace raw SQL with queryset
- new default behavior: skip scheduling ingest tasks for suids that
  already have a FormattedMetadataRecord of each requested format
- add `--pls-reformat`/`-r` flag to explicitly restore the old behavior
  (reformat everything, even when unnecessary or redundant)
aaxelb committed Feb 16, 2021
1 parent d8351f6 commit 5dd2cc8
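
For reference, a hypothetical invocation sketch of the changed command via Django's `call_command` (the format and source-config names below are placeholders, not from this commit):

    from django.core.management import call_command

    # New default: suids that already have a FormattedMetadataRecord in every
    # requested format are skipped.
    call_command('format_metadata_records', 'some_format', source_config=['some_source_config'])

    # Old behavior, restored explicitly: reformat everything, even when redundant.
    call_command('format_metadata_records', 'some_format', all_source_configs=True, pls_reformat=True)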
Showing 1 changed file with 72 additions and 53 deletions.
share/management/commands/format_metadata_records.py
@@ -1,49 +1,35 @@
-from django.db import connection
-from django.db.models import Exists, OuterRef
+from django.db.models import Exists, OuterRef, Subquery
+from django.db.models.functions import Coalesce
 
 from share.ingest.scheduler import IngestScheduler
 from share.management.commands import BaseShareCommand
+from share.models.core import FormattedMetadataRecord
 from share.models.ingest import SourceConfig, SourceUniqueIdentifier
 from share.models.jobs import IngestJob
 from share.tasks import ingest
 from share.util.extensions import Extensions
 
-# find most recent job for each suid, set its status back to `created`,
-# and return the job and suid ids
-# TODO rewrite with django orm once ingest jobs and suids are one-to-one
-update_ingest_job_sql = '''
-WITH latest_job_per_suid AS (
-    SELECT DISTINCT ON (latest_job.suid_id) latest_job.id
-    FROM share_ingestjob AS latest_job
-    JOIN share_sourceuniqueidentifier suid ON latest_job.suid_id = suid.id
-    WHERE latest_job.suid_id >= %(suid_start_id)s AND suid.source_config_id IN %(source_config_ids)s
-    ORDER BY latest_job.suid_id ASC, latest_job.date_started DESC NULLS LAST, latest_job.date_created DESC
-    LIMIT %(chunk_size)s
-)
-UPDATE share_ingestjob AS job
-SET status=%(set_job_status)s
-FROM latest_job_per_suid
-WHERE job.id = latest_job_per_suid.id AND job.status != %(in_progress_status)s
-RETURNING job.id, job.suid_id
-'''
 
 CHUNK_SIZE = 2000
 
 
 class Command(BaseShareCommand):
     def add_arguments(self, parser):
         parser.add_argument('metadata_formats', nargs='+', help='metadata format name (see entry points in setup.py)')
-        parser.add_argument('--suid-start-id', '-s', type=int, default=0, help='resume based on the previous run\'s last successful suid')
-        parser.add_argument('--ensure-ingest-jobs', '-j', action='store_true', help='before starting, ensure that all relevant suids have ingest jobs')
 
         source_config_group = parser.add_mutually_exclusive_group(required=True)
         source_config_group.add_argument('--source-config', '-c', action='append', help='format data from these source configs')
         source_config_group.add_argument('--all-source-configs', '-a', action='store_true', help='format data from *all* source configs')
 
+        parser.add_argument('--suid-start-id', '-s', type=int, default=0, help='resume based on the previous run\'s last successful suid')
+        parser.add_argument('--pls-ensure-ingest-jobs', '-j', action='store_true', help='before starting, ensure that all relevant suids have ingest jobs')
+        parser.add_argument('--pls-reformat', '-r', action='store_true', help='re-ingest records that are already in these formats')
+
     def handle(self, *args, **options):
         metadata_formats = options['metadata_formats']
         suid_start_id = options['suid_start_id']
-        ensure_ingest_jobs = options['ensure_ingest_jobs']
+        pls_ensure_ingest_jobs = options['pls_ensure_ingest_jobs']
+        pls_reformat = options['pls_reformat']
 
         valid_formats = Extensions.get_names('share.metadata_formats')
         if any(mf not in valid_formats for mf in metadata_formats):
@@ -55,15 +41,16 @@ def handle(self, *args, **options):
         if not source_config_ids:
             return
 
-        if ensure_ingest_jobs:
-            self.ensure_ingest_jobs_exist(source_config_ids)
+        base_suid_qs = self._base_suid_qs(metadata_formats, source_config_ids, pls_reformat)
+
+        if pls_ensure_ingest_jobs:
+            self.ensure_ingest_jobs_exist(base_suid_qs)
 
-        with connection.cursor() as cursor:
-            while True:
-                last_successful_suid = self.enqueue_job_chunk(cursor, suid_start_id, metadata_formats, source_config_ids)
-                if last_successful_suid is None:
-                    break
-                suid_start_id = int(last_successful_suid) + 1
+        while True:
+            last_successful_suid = self.enqueue_job_chunk(base_suid_qs, suid_start_id, metadata_formats)
+            if last_successful_suid is None:
+                break
+            suid_start_id = int(last_successful_suid) + 1
 
     def get_source_config_ids(self, options):
         source_config_labels = options['source_config']
@@ -87,39 +74,71 @@ def get_source_config_ids(self, options):
             return None
         return tuple(il['id'] for il in ids_and_labels)
 
-    def ensure_ingest_jobs_exist(self, source_config_ids):
-        self.stdout.write(f'creating ingest jobs as needed for source configs {source_config_ids}...')
-        unjobbed_suids = (
+    def _base_suid_qs(self, metadata_formats, source_config_ids, pls_reformat):
+        suid_qs = (
             SourceUniqueIdentifier.objects
             .filter(source_config__in=source_config_ids)
-            .annotate(
-                has_ingest_job=Exists(IngestJob.objects.filter(suid_id=OuterRef('id')))
-            )
-            .filter(has_ingest_job=False)
+            .annotate(latest_ingest_job_id=Subquery(
+                IngestJob.objects
+                .filter(suid_id=OuterRef('id'))
+                .order_by(Coalesce('date_started', 'date_created').desc(nulls_last=True))
+                .values('id')
+                [:1]
+            ))
         )
+        if not pls_reformat:
+            fmr_exists_annotations = {
+                f'fmr_exists_{metadata_format}': Exists(
+                    FormattedMetadataRecord.objects.filter(
+                        record_format=metadata_format,
+                        suid_id=OuterRef('id'),
+                    )
+                )
+                for metadata_format in metadata_formats
+            }
+            fmr_exists_for_all_formats = {
+                fmr_exists: True
+                for fmr_exists in fmr_exists_annotations.keys()
+            }
+            suid_qs = (
+                suid_qs
+                .annotate(**fmr_exists_annotations)
+                .exclude(**fmr_exists_for_all_formats)
+            )
+        return suid_qs.order_by('id')
+
+    def ensure_ingest_jobs_exist(self, base_suid_qs):
+        unjobbed_suids = base_suid_qs.filter(latest_ingest_job_id=None)
         IngestScheduler().bulk_schedule(unjobbed_suids)
 
-    def enqueue_job_chunk(self, cursor, suid_start_id, metadata_formats, source_config_ids):
-        cursor.execute(update_ingest_job_sql, {
-            'suid_start_id': suid_start_id or 0,
-            'source_config_ids': source_config_ids,
-            'chunk_size': CHUNK_SIZE,
-            'set_job_status': IngestJob.STATUS.created,
-            'in_progress_status': IngestJob.STATUS.started,
-        })
-        result_rows = cursor.fetchall()
-        if not result_rows:
+    def enqueue_job_chunk(self, base_suid_qs, suid_start_id, metadata_formats):
+        result_chunk = tuple(
+            base_suid_qs
+            .filter(id__gte=suid_start_id)
+            .values('id', 'latest_ingest_job_id')
+            [:CHUNK_SIZE]
+        )
+
+        if not result_chunk:
             self.stdout.write('all done!')
             return None
 
-        last_suid_id = result_rows[-1][1]
-        for result_row in result_rows:
-            job_id = result_row[0]
+        last_suid_id = result_chunk[-1]['id']
+        job_ids = tuple(result['latest_ingest_job_id'] for result in result_chunk)
+
+        jobs_to_update = (
+            IngestJob.objects
+            .filter(id__in=job_ids)
+            .exclude(status=IngestJob.STATUS.started)
+        )
+        jobs_to_update.update(status=IngestJob.STATUS.created)
+
+        for job_id in job_ids:
             ingest.delay(
                 job_id=job_id,
                 apply_changes=False,  # skip the whole ShareObject mess
                 pls_format_metadata=True,  # definitely don't skip this command's namesake
                 metadata_formats=metadata_formats,
             )
-        self.stdout.write(f'queued tasks for {len(result_rows)} IngestJobs (last suid: {last_suid_id})...')
+        self.stdout.write(f'queued tasks for {len(result_chunk)} IngestJobs (last suid: {last_suid_id})...')
        return last_suid_id
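
A note on the `.exclude(**fmr_exists_for_all_formats)` call above: `exclude()` ANDs its keyword arguments, so a suid is dropped only when a FormattedMetadataRecord already exists for *every* requested format; a suid missing even one format stays in the queryset. A minimal standalone sketch of that pattern, fixed to two illustrative format names (not taken from this commit):

    from django.db.models import Exists, OuterRef

    from share.models.core import FormattedMetadataRecord
    from share.models.ingest import SourceUniqueIdentifier


    def fmr_exists(record_format):
        # True for a suid iff a formatted record in this format already exists.
        return Exists(
            FormattedMetadataRecord.objects.filter(
                record_format=record_format,
                suid_id=OuterRef('id'),
            )
        )


    # Keep only suids missing at least one of the two requested formats.
    suids_needing_work = (
        SourceUniqueIdentifier.objects
        .annotate(has_a=fmr_exists('format_a'), has_b=fmr_exists('format_b'))
        .exclude(has_a=True, has_b=True)
    )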

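Likewise, the resume behavior (`--suid-start-id` plus the `suid_start_id = int(last_successful_suid) + 1` loop in `handle`) amounts to keyset pagination: the base queryset is ordered by `id`, each chunk filters `id__gte`, and the next pass starts one past the last id seen. A generic sketch of that loop, assuming a queryset already ordered by `id` (`iter_id_chunks` is a hypothetical helper, not in this commit):

    CHUNK_SIZE = 2000


    def iter_id_chunks(base_qs, start_id=0, chunk_size=CHUNK_SIZE):
        # base_qs must already be .order_by('id') for this resume scheme to work.
        while True:
            chunk = tuple(base_qs.filter(id__gte=start_id).values('id')[:chunk_size])
            if not chunk:
                return  # nothing left past start_id
            yield chunk
            start_id = chunk[-1]['id'] + 1  # resume just past the last id seen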