Skip to content

Commit

Permalink
allow reingesting an entire source in the admin
Browse files Browse the repository at this point in the history
  • Loading branch information
aaxelb committed May 4, 2023
1 parent 8b53f2f commit fc272e3
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 16 deletions.
35 changes: 28 additions & 7 deletions share/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,29 +149,34 @@ def get_urls(self):
r'^(?P<config_id>.+)/harvest/$',
self.admin_site.admin_view(self.harvest),
name='source-config-harvest'
),
url(
r'^(?P<config_id>.+)/ingest/$',
self.admin_site.admin_view(self.start_ingest),
name='source-config-ingest'
)
] + super().get_urls()

def source_config_actions(self, obj):
    """Render the action buttons shown for a source config in the admin.

    A "Harvest" link is included only when ``obj.harvester_id`` is truthy,
    and an "Ingest" link only when ``obj.disabled`` is falsy. The links that
    apply are joined with a single space and rendered via ``format_html``.
    """
    # Each entry is either a link template with a named placeholder or an
    # empty string; a placeholder that does not end up in the joined
    # template is simply not substituted.
    button_templates = (
        ('<a class="button" href="{harvest_href}">Harvest</a>' if obj.harvester_id else ''),
        ('<a class="button" href="{ingest_href}">Ingest</a>' if not obj.disabled else ''),
    )
    return format_html(
        ' '.join(button_templates),
        harvest_href=reverse('admin:source-config-harvest', args=[obj.pk]),
        ingest_href=reverse('admin:source-config-ingest', args=[obj.pk]),
    )
source_config_actions.short_description = 'Actions'

def harvest(self, request, config_id):
config = self.get_object(request, config_id)
if config.harvester_id is None:
raise ValueError('You need a harvester to harvest.')

if request.method == 'POST':
form = HarvestForm(request.POST)
if form.is_valid():
for job in HarvestScheduler(config, claim_jobs=True).range(form.cleaned_data['start'], form.cleaned_data['end']):
tasks.harvest.apply_async((), {'job_id': job.id, 'superfluous': form.cleaned_data['superfluous']})

self.message_user(request, 'Started harvesting {}!'.format(config.label))
url = reverse('admin:share_harvestjob_changelist', current_app=self.admin_site.name)
return HttpResponseRedirect(url)
Expand All @@ -181,14 +186,30 @@ def harvest(self, request, config_id):
if field in request.GET:
initial[field] = request.GET[field]
form = HarvestForm(initial=initial)

context = self.admin_site.each_context(request)
context['opts'] = self.model._meta
context['form'] = form
context['source_config'] = config
context['title'] = 'Harvest {}'.format(config.label)
return TemplateResponse(request, 'admin/harvest.html', context)

def start_ingest(self, request, config_id):
    """Admin view for scheduling a re-ingest of one source config.

    On any method other than POST, renders ``admin/start-ingest.html`` with
    the source config in the template context. On POST, queues
    ``tasks.schedule_reingest`` for the config and redirects to the source
    config changelist.
    """
    config = self.get_object(request, config_id)
    if request.method != 'POST':
        context = self.admin_site.each_context(request)
        context['source_config'] = config
        return TemplateResponse(request, 'admin/start-ingest.html', context)

    # A ticked checkbox is submitted as a non-empty string and an unticked
    # one is absent from the POST data; coerce to real booleans so the task
    # receives True/False instead of the raw form value.
    tasks.schedule_reingest.apply_async((config.pk,), {
        'pls_renormalize': bool(request.POST.get('pls_renormalize')),
        'pls_reformat': bool(request.POST.get('pls_reformat')),
    })
    url = reverse(
        'admin:share_sourceconfig_changelist',
        current_app=self.admin_site.name,
    )
    return HttpResponseRedirect(url)


@linked_fk('user')
class SourceAdmin(admin.ModelAdmin):
Expand Down
21 changes: 12 additions & 9 deletions share/management/commands/format_metadata_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

class Command(BaseShareCommand):
def add_arguments(self, parser):
parser.add_argument('metadata_formats', nargs='+', help='metadata format name (see entry points in setup.py)')
parser.add_argument('metadata_formats', nargs='*', help='metadata format name (see entry points in setup.py)')

source_config_group = parser.add_mutually_exclusive_group(required=True)
source_config_group.add_argument('--source-config', '-c', action='append', help='format data from these source configs')
Expand All @@ -33,13 +33,16 @@ def handle(self, *args, **options):
suid_start_id = options['suid_start_id']
pls_ensure_ingest_jobs = options['pls_ensure_ingest_jobs']
pls_reformat = options['pls_reformat']
pls_reingest = options['pls_reingest']
pls_renormalize = options['pls_renormalize']

valid_formats = Extensions.get_names('share.metadata_formats')
if any(mf not in valid_formats for mf in metadata_formats):
invalid_formats = set(metadata_formats).difference(valid_formats)
self.stderr.write(f'Invalid metadata format(s): {invalid_formats}. Valid formats: {valid_formats}')
return
if metadata_formats:
if any(mf not in valid_formats for mf in metadata_formats):
invalid_formats = set(metadata_formats).difference(valid_formats)
self.stderr.write(f'Invalid metadata format(s): {invalid_formats}. Valid formats: {valid_formats}')
return
else:
metadata_formats = list(valid_formats) # all

source_config_ids = self.get_source_config_ids(options)
if not source_config_ids:
Expand All @@ -51,7 +54,7 @@ def handle(self, *args, **options):
self.ensure_ingest_jobs_exist(base_suid_qs)

while True:
last_successful_suid = self.enqueue_job_chunk(base_suid_qs, suid_start_id, metadata_formats, pls_reingest)
last_successful_suid = self.enqueue_job_chunk(base_suid_qs, suid_start_id, metadata_formats, pls_renormalize)
if last_successful_suid is None:
break
suid_start_id = int(last_successful_suid) + 1
Expand Down Expand Up @@ -123,7 +126,7 @@ def ensure_ingest_jobs_exist(self, base_suid_qs):
unjobbed_suids = base_suid_qs.filter(latest_ingest_job_id=None)
IngestScheduler().bulk_schedule(unjobbed_suids)

def enqueue_job_chunk(self, base_suid_qs, suid_start_id, metadata_formats, pls_reingest):
def enqueue_job_chunk(self, base_suid_qs, suid_start_id, metadata_formats, pls_renormalize):
result_chunk = tuple(
base_suid_qs
.filter(id__gte=suid_start_id)
Expand All @@ -148,7 +151,7 @@ def enqueue_job_chunk(self, base_suid_qs, suid_start_id, metadata_formats, pls_r
for job_id in job_ids:
ingest.delay(
job_id=job_id,
superfluous=pls_reingest, # whether to start from RawDatum or NormalizedDatum
superfluous=pls_renormalize, # whether to start from RawDatum or NormalizedDatum
metadata_formats=metadata_formats,
)
self.stdout.write(f'queued tasks for {len(result_chunk)} IngestJobs (last suid: {last_suid_id})...')
Expand Down
16 changes: 16 additions & 0 deletions share/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,22 @@ def ingest(self, only_canonical=None, **kwargs):
IngestJobConsumer(task=self, only_canonical=only_canonical).consume(**kwargs)


@celery.shared_task()
def schedule_reingest(source_config_pk, pls_renormalize=False, pls_reformat=False):
    """Run the ``format_metadata_records`` command for a single source config.

    Args:
        source_config_pk: primary key of the ``SourceConfig`` to reingest.
        pls_renormalize: forwarded to the command's ``pls_renormalize`` option.
        pls_reformat: forwarded to the command's ``pls_reformat`` option.

    Raises:
        ValueError: if the source config is disabled or its source is deleted.
    """
    source_config = db.SourceConfig.objects.get(pk=source_config_pk)
    # Explicit raises instead of `assert`: assert statements are removed when
    # Python runs with -O, which would silently skip these guards.
    if source_config.disabled:
        raise ValueError('Cannot reingest a disabled source config.')
    if source_config.source.is_deleted:
        raise ValueError('Cannot reingest a source config whose source is deleted.')
    # TODO: something nice like IndexBackfill, instead of this
    from django.core import management
    management.call_command(
        'format_metadata_records',
        source_config=[source_config.label],
        pls_ensure_ingest_jobs=True,
        pls_renormalize=pls_renormalize,
        pls_reformat=pls_reformat,
    )


@celery.shared_task(bind=True)
def schedule_index_backfill(self, index_backfill_pk):
index_backfill = db.IndexBackfill.objects.get(pk=index_backfill_pk)
Expand Down
34 changes: 34 additions & 0 deletions templates/admin/start-ingest.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{% extends "admin/base_site.html" %}
{% load i18n %}
{# Confirmation form for scheduling an ingest of a single source config. #}

{% block extrastyle %}
<style>
  section, table {
    padding-left: 2em;
  }
  form {
    display: flex;
    flex-direction: column;
  }
</style>
{% endblock %}

{% block content %}
<h1>{% trans "schedule ingest" %}</h1>
<section>
  <h2>for source config "{{ source_config.label }}"</h2>
  <form method="post">
    {% csrf_token %}
    {# The value must be quoted: an unquoted attribute value ends at the first whitespace. #}
    <input type="hidden" name="source_config_label" value="{{ source_config.label }}" />
    <label>
      <input type="checkbox" name="pls_reformat" />
      {% trans "pls reformat (even if already formatted)" %}
    </label>
    <label>
      <input type="checkbox" name="pls_renormalize" />
      {% trans "pls renormalize (even if already normalized)" %}
    </label>
    <input type="submit" value="{% trans "yum yum" %}" />
  </form>
</section>
{% endblock %}
26 changes: 26 additions & 0 deletions tests/share/tasks/test_schedule_reingest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from unittest import mock

import pytest

from share.tasks import schedule_reingest


@pytest.mark.django_db
def test_schedule_reingest(source_config):
    """Check the task forwards both flags to ``format_metadata_records``.

    Every combination of ``pls_renormalize`` / ``pls_reformat`` is run with
    ``call_command`` patched out, and the patched call is compared against
    the expected command name and options.
    """
    flag_pairs = [
        (renormalize, reformat)
        for renormalize in (True, False)
        for reformat in (True, False)
    ]
    for renormalize, reformat in flag_pairs:
        task_kwargs = {
            'pls_renormalize': renormalize,
            'pls_reformat': reformat,
        }
        with mock.patch('django.core.management.call_command') as patched_command:
            schedule_reingest.apply((source_config.pk,), task_kwargs)
        # The mock keeps its recorded calls after the patch is undone.
        patched_command.assert_called_once_with(
            'format_metadata_records',
            source_config=[source_config.label],
            pls_ensure_ingest_jobs=True,
            pls_renormalize=renormalize,
            pls_reformat=reformat,
        )

0 comments on commit fc272e3

Please sign in to comment.