[SHARE-993][Feature] Capture ingest errors (#732)
* Use custom exceptions for transform errors

* Allow running sharectl transform on RawDatums

* Update exception expectations.

* Move disambiguate task into IngestJobConsumer

* More friendly display of things

* Some cleanup

* Fix up push endpoints

* Consolidate migrations

* Add Ingester util

* Update management commands

* Allow claiming jobs when explicitly starting tasks

* Update migration

* Plac8 flake8

* Better migration

* Fix tests

* Responding to review

* Fix more tests
aaxelb committed Jan 30, 2018
1 parent b56e15f commit 6b6355f
Showing 69 changed files with 1,071 additions and 710 deletions.
30 changes: 30 additions & 0 deletions api/ingestjobs/serializers.py
@@ -0,0 +1,30 @@
from rest_framework_json_api import serializers

from share import models

from api.base import ShareSerializer
from api.fields import ShareIdentityField


class IngestJobSerializer(ShareSerializer):
    # link to self
    url = ShareIdentityField(view_name='api:ingestjob-detail')

    status = serializers.SerializerMethodField()

    class Meta:
        model = models.IngestJob
        fields = (
            'status',
            'message',
            'completions',
            'date_started',
            'date_created',
            'date_modified',
            'raw',
            'source_config',
            'url'
        )

    def get_status(self, job):
        return job.STATUS[job.status]
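In get_status above, IngestJob.STATUS acts as an integer-to-label mapping (likely a django-model-utils Choices, which supports this kind of indexing), so the API exposes readable labels instead of raw status integers. A minimal sketch of the lookup, with placeholder values since the actual status choices are not shown in this diff:

    # Placeholder mapping -- the real labels live on share.models.IngestJob.STATUS.
    STATUS = {0: 'created', 1: 'started', 2: 'failed', 3: 'succeeded'}

    def get_status(job_status):
        # Same lookup the serializer performs: integer column -> display label
        return STATUS[job_status]

    assert get_status(3) == 'succeeded'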
7 changes: 7 additions & 0 deletions api/ingestjobs/urls.py
@@ -0,0 +1,7 @@
from rest_framework.routers import SimpleRouter
from api.ingestjobs import views


router = SimpleRouter()
router.register(r'ingestjobs', views.IngestJobViewSet, base_name='ingestjob')
urlpatterns = router.urls
17 changes: 17 additions & 0 deletions api/ingestjobs/views.py
@@ -0,0 +1,17 @@
from rest_framework import viewsets

from share.models.jobs import IngestJob

from api.base.views import ShareViewSet
from api.pagination import CursorPagination
from api.ingestjobs.serializers import IngestJobSerializer


class IngestJobViewSet(ShareViewSet, viewsets.ReadOnlyModelViewSet):
    ordering = ('-id', )

    serializer_class = IngestJobSerializer
    pagination_class = CursorPagination

    def get_queryset(self):
        return IngestJob.objects.all()
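Together, the serializer, router, and viewset expose a read-only, cursor-paginated JSON:API endpoint for ingest jobs. A hypothetical client interaction (host, path prefix, and response shape are assumptions, not taken from this diff):

    import requests

    # Hypothetical: the router registers 'ingestjobs' under the API root.
    resp = requests.get('https://share.example.org/api/v2/ingestjobs/')
    page = resp.json()
    for job in page['data']:
        print(job['id'], job['attributes']['status'])

    # CursorPagination exposes opaque next/prev links rather than page numbers.
    next_url = page['links'].get('next')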
30 changes: 23 additions & 7 deletions api/normalizeddata/views.py
@@ -1,16 +1,20 @@
+from django.db import transaction
+
 from rest_framework import status
 from rest_framework import generics
 from rest_framework.response import Response
 
 from share import models
-from share.tasks import disambiguate
+from share.tasks import ingest
 from share.util import IDObfuscator
+from share.util.ingester import Ingester
 
 from api.base.views import ShareViewSet
 from api.normalizeddata.serializers import BasicNormalizedDataSerializer
 from api.normalizeddata.serializers import FullNormalizedDataSerializer
 from api.pagination import CursorPagination
 from api.permissions import ReadOnlyOrTokenHasScopeOrIsAuthenticated
+from api.util import absolute_reverse
 
 
 class NormalizedDataViewSet(ShareViewSet, generics.ListCreateAPIView, generics.RetrieveAPIView):
@@ -59,13 +63,25 @@ def get_queryset(self):
 
     def create(self, request, *args, **kwargs):
         serializer = self.get_serializer_class()(data=request.data, context={'request': request})
-        if serializer.is_valid(raise_exception=True):
-            nm_instance = serializer.save()
-            # TODO create an IngestJob, respond with a link to a job detail endpoint (SHARE-1003)
-            async_result = disambiguate.delay(nm_instance.id)
-        # TODO Fix Me
+        serializer.is_valid(raise_exception=True)
+        with transaction.atomic():
+            # Hack for back-compat: Ingest halfway synchronously, then apply changes asynchronously
+            ingester = Ingester(serializer.validated_data['data']).as_user(request.user).ingest(apply_changes=False)
+
+            nm_instance = models.NormalizedData.objects.filter(
+                raw=ingester.raw,
+                ingest_job=ingester.job
+            ).order_by('-created_at').first()
+
+            ingester.job.reschedule(claim=True)
+            async_result = ingest.delay(job_id=ingester.job.id, exhaust=False)
+
+        # TODO Use an actual serializer
         return Response({
             'id': IDObfuscator.encode(nm_instance),
             'type': 'NormalizedData',
-            'attributes': {'task': async_result.id}
+            'attributes': {
+                'task': async_result.id,
+                'ingest_job': absolute_reverse('api:ingestjob-detail', args=[IDObfuscator.encode(ingester.job)]),
+            }
         }, status=status.HTTP_202_ACCEPTED)
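The new create() flow is deliberately "halfway synchronous": transform and regulate run in-process so a malformed submission fails the request immediately, while applying the resulting changes is deferred to the ingest task; reschedule(claim=True) keeps another worker from picking the job up in the meantime. A condensed sketch of that control flow, using only the Ingester methods visible in this diff (treat exact signatures as assumptions):

    # Sketch only -- the real API lives in share/util/ingester.py.
    def push_normalized_data(data, user):
        ingester = Ingester(data).as_user(user)
        ingester.ingest(apply_changes=False)  # transform + regulate now; errors surface here
        ingester.job.reschedule(claim=True)   # keep the job claimed for the task below
        async_result = ingest.delay(job_id=ingester.job.id, exhaust=False)  # apply changes later
        return ingester.job, async_result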
1 change: 1 addition & 0 deletions api/urls.py
@@ -12,6 +12,7 @@
 urlpatterns = [
     url('^$', RootView.as_view()),
     url('^', include('api.banners.urls')),
+    url('^', include('api.ingestjobs.urls')),
     url('^', include('api.normalizeddata.urls')),
     url('^', include('api.rawdata.urls')),
     url('^', include('api.shareobjects.urls')),
45 changes: 8 additions & 37 deletions api/views/workflow.py
@@ -1,23 +1,21 @@
 import jsonschema
 
 from django.db import transaction
-from django.utils import timezone
 
 from rest_framework import views, status
 from rest_framework.exceptions import ParseError
 from rest_framework.parsers import JSONParser
 from rest_framework.renderers import JSONRenderer
 from rest_framework.response import Response
 
 from share import models
-from share.harvest.base import FetchResult
-from share.harvest.serialization import DictSerializer
-from share.tasks import disambiguate
 from share.util import IDObfuscator
+from share.util.ingester import Ingester
 
 from api import v1_schemas
 from api.authentication import APIV1TokenBackPortAuthentication
 from api.permissions import ReadOnlyOrTokenHasScopeOrIsAuthenticated
-from api.normalizeddata.serializers import BasicNormalizedDataSerializer
+from api.util import absolute_reverse
 
 
 __all__ = ('V1DataView', )
@@ -113,36 +111,9 @@ def post(self, request, *args, **kwargs):
         except KeyError:
             return Response({'errors': 'Canonical URI not found in uris.', 'data': prelim_data}, status=status.HTTP_400_BAD_REQUEST)
 
-        config = self._get_source_config(request.user)
-        raw = models.RawDatum.objects.store_data(config, FetchResult(doc_id, DictSerializer(pretty=False).serialize(prelim_data), timezone.now()))
+        ingester = Ingester(prelim_data, doc_id).as_user(request.user, 'v1_push').ingest_async()
 
-        # TODO create an IngestJob, respond with a link to a job detail endpoint
-        transformed_graph = config.get_transformer().transform(raw.datum)
-        data = {'data': {'@graph': transformed_graph.to_jsonld(in_edges=False)}}
-        serializer = BasicNormalizedDataSerializer(data=data, context={'request': request})
-
-        if serializer.is_valid():
-            nm_instance = serializer.save()
-            async_result = disambiguate.delay(nm_instance.id)
-            return Response({'task_id': async_result.id}, status=status.HTTP_202_ACCEPTED)
-        return Response({'errors': serializer.errors, 'data': prelim_data}, status=status.HTTP_400_BAD_REQUEST)
-
-    def _get_source_config(self, user):
-        config_label = '{}.v1_push'.format(user.username)
-        try:
-            return models.SourceConfig.objects.get(label=config_label)
-        except models.SourceConfig.DoesNotExist:
-            source, _ = models.Source.objects.get_or_create(
-                user=user,
-                defaults={
-                    'name': user.username,
-                    'long_title': user.username,
-                }
-            )
-            config = models.SourceConfig(
-                label=config_label,
-                source=source,
-                transformer=models.Transformer.objects.get(key='v1_push'),
-            )
-            config.save()
-            return config
+        return Response({
+            'task_id': ingester.async_task.id,
+            'ingest_job': absolute_reverse('api:ingestjob-detail', args=[IDObfuscator.encode(ingester.job)]),
+        }, status=status.HTTP_202_ACCEPTED)
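On the v1 push endpoint the whole pipeline is deferred via ingest_async(), and the response now links to the IngestJob so submitters can check whether ingestion later failed. A hypothetical client call (host, path, and payload shape are assumptions, not taken from this diff):

    import requests

    resp = requests.post(
        'https://share.example.org/api/v1/share/data/',  # assumed URL
        json={'jsonData': {'uris': {'canonicalUri': 'https://example.com/work/1'}}},  # minimal assumed payload
        headers={'Authorization': 'Bearer <token>'},
    )
    assert resp.status_code == 202
    resp.json()['task_id']     # celery task performing the ingest
    resp.json()['ingest_job']  # URL to poll for job status and captured errors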
5 changes: 2 additions & 3 deletions dev-requirements.txt
@@ -1,16 +1,15 @@
 -r requirements.txt
 
 behave==1.2.5
+coveralls==1.2.0
 factory-boy==2.8.1
 fake-factory==0.7.2
+httpretty==0.8.14
 ipdb
 ipython
 pytest-benchmark==3.0.0
-pytest==3.0.6
-httpretty==0.8.14
 pytest-cov==2.5.1
-coveralls==1.2.0
 pytest==3.0.6
 
 # pulling from github because master has django_assert_num_queries context mgr
 git+https://github.com/pytest-dev/pytest-django.git@master
7 changes: 6 additions & 1 deletion project/settings.py
@@ -80,6 +80,7 @@ def split(string, delim):
     'corsheaders',
     'revproxy',
     'graphene_django',
+    'prettyjson',
 
     'allauth',
     'allauth.account',
@@ -394,6 +395,11 @@ def split(string, delim):
         'task': 'bots.elasticsearch.tasks.elasticsearch_janitor',
         'schedule': crontab(hour=23, minute=30),
     },
+    # Executes daily at 10:30 P.M
+    'IngestJob Janitor': {
+        'task': 'share.janitor.tasks.ingestjob_janitor',
+        'schedule': crontab(hour=22, minute=30),
+    },
 }
 
 if not DEBUG:
@@ -434,7 +440,6 @@ def split(string, delim):
     'bots.elasticsearch.*': {'priority': 50, 'queue': 'elasticsearch'},
     'share.tasks.harvest': {'priority': 0, 'queue': 'harvest'},
     'share.tasks.ingest': {'priority': 20, 'queue': 'ingest'},
-    'share.tasks.disambiguate': {'priority': 35, 'queue': 'disambiguate'},
 }
 
 CELERY_TASK_QUEUES = {q['queue']: {} for q in CELERY_TASK_ROUTES.values()}
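As a worked example, the queue set is derived directly from the routes, so dropping the disambiguate route above also drops its queue:

    CELERY_TASK_ROUTES = {
        'bots.elasticsearch.*': {'priority': 50, 'queue': 'elasticsearch'},
        'share.tasks.harvest': {'priority': 0, 'queue': 'harvest'},
        'share.tasks.ingest': {'priority': 20, 'queue': 'ingest'},
    }
    CELERY_TASK_QUEUES = {q['queue']: {} for q in CELERY_TASK_ROUTES.values()}
    assert CELERY_TASK_QUEUES == {'elasticsearch': {}, 'harvest': {}, 'ingest': {}}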
1 change: 1 addition & 0 deletions requirements.txt
@@ -13,6 +13,7 @@ django-filter==1.0.2 # BSD
 django-include==0.2.1 # MIT
 django-model-utils==2.6.1 # BSD
 django-oauth-toolkit==0.12.0 # BSD
+django-prettyjson==0.3.0 # BSD 3 Clause
 django-revproxy==0.9.13 # MPL 2.0
 django-typed-models==0.6.0 # BSD 3 Clause
 django==1.11.4 # BSD 3 Clause
1 change: 1 addition & 0 deletions setup.py
@@ -62,6 +62,7 @@
             'org.socarxiv = share.transformers.org_socarxiv:SocarxivTransformer',
             'org.swbiodiversity = share.transformers.org_swbiodiversity:SWTransformer',
             'v1_push = share.transformers.v1_push:V1Transformer',
+            'v2_push = share.transformers.v2_push:V2PushTransformer',
         ],
         'share.harvesters': [
             'ca.lwbin = share.harvesters.ca_lwbin:LWBINHarvester',
14 changes: 13 additions & 1 deletion share/admin/__init__.py
@@ -1,3 +1,5 @@
+from prettyjson import PrettyJSONWidget
+
 from django import forms
 from django.conf.urls import url
 from django.contrib import admin
@@ -23,6 +25,7 @@
 from share.models.change import ChangeSet
 from share.models.core import NormalizedData, ShareUser
 from share.models.creative import AbstractCreativeWork
+from share.models.fields import DateTimeAwareJSONField
 from share.models.ingest import RawDatum, Source, SourceConfig, Harvester, Transformer
 from share.models.jobs import HarvestJob
 from share.models.jobs import IngestJob
@@ -40,6 +43,15 @@ class NormalizedDataAdmin(admin.ModelAdmin):
     date_hierarchy = 'created_at'
     list_filter = ['source', ]
     raw_id_fields = ('raw', 'tasks',)
+    formfield_overrides = {
+        DateTimeAwareJSONField: {
+            'widget': PrettyJSONWidget(attrs={
+                'initial': 'parsed',
+                'cols': 120,
+                'rows': 20
+            })
+        }
+    }
 
 
 class ChangeSetSubmittedByFilter(SimpleListFilter):
@@ -161,7 +173,7 @@ def harvest(self, request, config_id):
        if request.method == 'POST':
            form = HarvestForm(request.POST)
            if form.is_valid():
-                for job in HarvestScheduler(config).range(form.cleaned_data['start'], form.cleaned_data['end']):
+                for job in HarvestScheduler(config, claim_jobs=True).range(form.cleaned_data['start'], form.cleaned_data['end']):
                    tasks.harvest.apply_async((), {'job_id': job.id, 'superfluous': form.cleaned_data['superfluous']})
 
                self.message_user(request, 'Started harvesting {}!'.format(config.label))
22 changes: 20 additions & 2 deletions share/admin/jobs.py
@@ -1,10 +1,12 @@
 from furl import furl
+from prettyjson import PrettyJSONWidget
 
 from django.contrib import admin
 from django.urls import reverse
 from django.utils.html import format_html
 
 from share.admin.util import FuzzyPaginator
+from share.models.fields import DateTimeAwareJSONField
 from share.models.ingest import SourceConfig
 from share.models.jobs import AbstractBaseJob
 
@@ -45,7 +47,7 @@ class BaseJobAdmin(admin.ModelAdmin):
     list_filter = ('status', SourceConfigFilter, )
     list_select_related = ('source_config', )
     actions = ('restart_tasks', )
-    readonly_fields = ('task_id', 'context', 'completions', 'date_started', 'source_config_version', )
+    readonly_fields = ('task_id', 'error_type', 'error_message', 'error_context', 'completions', 'date_started', 'source_config_version', )
     show_full_result_count = False
     paginator = FuzzyPaginator

@@ -86,7 +88,23 @@ def harvest_job_actions(self, obj):
 class IngestJobAdmin(BaseJobAdmin):
     list_display = ('id', 'source_config_', 'suid_', 'status_', 'date_started', 'share_version', )
     list_select_related = BaseJobAdmin.list_select_related + ('suid',)
-    readonly_fields = BaseJobAdmin.readonly_fields + ('suid', 'transformed_data', 'regulated_data', 'raw', 'transformer_version', 'regulator_version', )
+    readonly_fields = BaseJobAdmin.readonly_fields + ('suid', 'raw', 'transformer_version', 'regulator_version', )
+    fake_readonly_fields = ('transformed_data', 'regulated_data')
+    formfield_overrides = {
+        DateTimeAwareJSONField: {
+            'widget': PrettyJSONWidget(attrs={
+                'initial': 'parsed',
+                'cols': 120,
+                'rows': 20
+            })
+        }
+    }
+
+    def get_form(self, *args, **kwargs):
+        form = super().get_form(*args, **kwargs)
+        for field_name in self.fake_readonly_fields:
+            form.base_fields[field_name].disabled = True
+        return form
 
     def suid_(self, obj):
         return obj.suid.identifier
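The fake_readonly_fields trick above exists because Django admin renders readonly_fields as plain text, bypassing form widgets entirely; disabling the form field instead keeps the PrettyJSONWidget rendering while still rejecting edits. A minimal standalone sketch of the same idea (plain Django forms, no admin required):

    from django import forms

    class ExampleForm(forms.Form):
        payload = forms.CharField(widget=forms.Textarea)  # stand-in for PrettyJSONWidget

    form = ExampleForm()
    form.fields['payload'].disabled = True  # still rendered with its widget, but submitted values are ignored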
7 changes: 6 additions & 1 deletion share/bin/harvest.py
@@ -132,9 +132,11 @@ def schedule(args, argv):
         'ingest': not args.get('--no-ingest'),
     }.items() if v is not None}
 
+    claim_jobs = args['--run'] or args['--tasks']
+
     jobs = []
     for config in configs:
-        scheduler = HarvestScheduler(config)
+        scheduler = HarvestScheduler(config, claim_jobs=claim_jobs)
 
         if not (args['<date>'] or args['--start'] or args['--end']):
             jobs.append(scheduler.today())
@@ -143,6 +145,9 @@
         else:
             jobs.extend(scheduler.range(pendulum.parse(args['--start']), pendulum.parse(args['--end'])))
 
+    if not claim_jobs:
+        return
+
     for job in jobs:
         if args['--run']:
             tasks.harvest.apply((), {'job_id': job.id, **kwargs}, retry=False, throw=True)
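Why claiming matters here: jobs are claimed up front only when they are about to be run inline (--run) or queued (--tasks); otherwise the command stops after scheduling and the unclaimed jobs wait for the regular harvest machinery. A condensed sketch of that split (assumed semantics; imports and the --tasks branch are inferred, not shown in this diff):

    # Assumed module paths -- adjust to the real ones in the SHARE codebase.
    from share import tasks
    from share.harvest.scheduler import HarvestScheduler

    def schedule_and_dispatch(config, start, end, run_inline=False, queue_tasks=False):
        claim_jobs = run_inline or queue_tasks
        jobs = HarvestScheduler(config, claim_jobs=claim_jobs).range(start, end)
        if not claim_jobs:
            return  # scheduled only; workers claim and run these on their own
        for job in jobs:
            if run_inline:
                tasks.harvest.apply((), {'job_id': job.id}, retry=False, throw=True)
            else:
                tasks.harvest.apply_async((), {'job_id': job.id})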
