diff --git a/api/share/utils.py b/api/share/utils.py index 583b148cb9e..b8e919f6f20 100644 --- a/api/share/utils.py +++ b/api/share/utils.py @@ -4,8 +4,10 @@ """ from http import HTTPStatus import logging +from rdflib import Graph from django.apps import apps +from django.db.models import Q from celery.utils.time import get_exponential_backoff_interval import requests @@ -14,6 +16,7 @@ from framework.encryption import ensure_bytes from framework.sentry import log_exception from osf.external.gravy_valet.exceptions import GVException +from osf.metadata.rdfutils import OSF from osf.metadata.osf_gathering import ( OsfmapPartition, pls_get_magic_metadata_basket, @@ -64,6 +67,86 @@ def _enqueue_update_share(osfresource): enqueue_task(task__update_share.s(_osfguid_value)) +def retry_shtrove_request(self_celery_task, _response): + try: + _response.raise_for_status() + except Exception as e: + log_exception(e) + if _response.status_code == HTTPStatus.TOO_MANY_REQUESTS: + retry_after = _response.headers.get('Retry-After') + try: + countdown = int(retry_after) + except (TypeError, ValueError): + retries = getattr(self_celery_task.request, 'retries', 0) + countdown = get_exponential_backoff_interval( + factor=4, + retries=retries, + maximum=2 * 60, + full_jitter=True, + ) + raise self_celery_task.retry(exc=e, countdown=countdown) + + raise self_celery_task.retry(exc=e) + + +def cedar_record_to_turtle(referent, cedar_record): + graph = Graph() + iri = referent.get_semantic_iri() + full_metadata = { + '@id': iri, + OSF.hasCedarRecord: cedar_record.metadata, + } + graph.parse(data=full_metadata, format='json-ld') + + return graph.serialize(format='turtle') + + +@celery_app.task(bind=True) +def share_update_cedar_metadata_record(self, referent_id, cedar_record_pk): + from osf.models import Guid, CedarMetadataRecord + + guid = Guid.load(referent_id) + referent = guid.referent + cedar_record = CedarMetadataRecord.objects.filter(pk=cedar_record_pk).first() + if not cedar_record: + 
return + + serialized_data = cedar_record_to_turtle(referent, cedar_record) + response = requests.post( + shtrove_ingest_url(), + params={ + 'focus_iri': referent.get_semantic_iri(), + 'record_identifier': _shtrove_cedar_record_identifier(cedar_record._id, cedar_record.template.cedar_id), + 'is_supplementary': True, + }, + headers={ + 'Content-Type': 'text/turtle; charset=utf-8', + **_shtrove_auth_headers(referent), + }, + data=ensure_bytes(serialized_data), + ) + retry_shtrove_request(self, response) + + +@celery_app.task(bind=True) +def share_delete_cedar_metadata_record( + self, + cedar_referent___id, + cedar_record___id, + cedar_template_cedar_id, +): + from osf.models import Guid + referent = Guid.load(cedar_referent___id).referent + response = requests.delete( + shtrove_ingest_url(), + params={ + 'record_identifier': _shtrove_cedar_record_identifier(cedar_record___id, cedar_template_cedar_id), + }, + headers=_shtrove_auth_headers(referent), + ) + retry_shtrove_request(self, response) + + @celery_app.task( bind=True, acks_late=True, @@ -94,36 +177,35 @@ def task__update_share(self, guid: str, is_backfill=False, osfmap_partition_name log_exception(e) raise self.retry(exc=e) - try: - _response.raise_for_status() - except Exception as e: - log_exception(e) - if _response.status_code == HTTPStatus.TOO_MANY_REQUESTS: - retry_after = _response.headers.get('Retry-After') - try: - countdown = int(retry_after) - except (TypeError, ValueError): - retries = getattr(self.request, 'retries', 0) - countdown = get_exponential_backoff_interval( - factor=4, - retries=retries, - maximum=2 * 60, - full_jitter=True, - ) - raise self.retry(exc=e, countdown=countdown) - - if HTTPStatus(_response.status_code).is_server_error: - raise self.retry(exc=e) - else: # success response - if not _is_deletion: - # enqueue followup task for supplementary metadata - _next_partition = _next_osfmap_partition(_osfmap_partition) - if _next_partition is not None: - task__update_share.delay( - guid, 
- is_backfill=is_backfill, - osfmap_partition_name=_next_partition.name, - ) + retry_shtrove_request(self, _response) + # success response + if _is_deletion: + return + + # enqueue followup task for supplementary metadata + _next_partition = _next_osfmap_partition(_osfmap_partition) + if _next_partition is not None: + task__update_share.delay( + guid, + is_backfill=is_backfill, + osfmap_partition_name=_next_partition.name, + ) + for cedar_record in _osfid_instance.cedar_metadata_records.filter( + is_published=True, + template__should_index_for_search=True, + ): + enqueue_task(share_update_cedar_metadata_record.s(_osfid_instance._id, cedar_record.pk)) + + for cedar_record in _osfid_instance.cedar_metadata_records.filter( + Q(is_published=False) | Q(template__should_index_for_search=False), + ): + enqueue_task( + share_delete_cedar_metadata_record.s( + cedar_record.guid._id, + cedar_record._id, + cedar_record.template.cedar_id, + ), + ) def pls_send_trove_record(osf_item, *, is_backfill: bool, osfmap_partition: OsfmapPartition): @@ -179,6 +261,10 @@ def _shtrove_record_identifier(osf_item, osfmap_partition: OsfmapPartition): ) +def _shtrove_cedar_record_identifier(cedar_record___id, template_cedar_id) -> str: + return f'{cedar_record___id}/CedarMetadataRecord:{template_cedar_id}' + + def _shtrove_auth_headers(osf_item): _nonfile_item = ( osf_item.target diff --git a/api_tests/share/test_share_preprint.py b/api_tests/share/test_share_preprint.py index 118abf3105b..95993042532 100644 --- a/api_tests/share/test_share_preprint.py +++ b/api_tests/share/test_share_preprint.py @@ -134,12 +134,20 @@ def test_call_async_update_on_500_failure(self, mock_share_responses, preprint, with expect_preprint_ingest_request(mock_share_responses, preprint, count=5): preprint.update_search() - def test_no_call_async_update_on_400_failure(self, mock_share_responses, preprint, auth): + @mock.patch('api.share.utils.task__update_share.delay') + def 
test_no_call_async_update_on_400_failure(self, share_delay, mock_share_responses, preprint, auth): with capture_notifications(): mock_share_responses.replace(responses.POST, shtrove_ingest_url(), status=400) preprint.set_published(True, auth=auth, save=True) with expect_preprint_ingest_request(mock_share_responses, preprint, count=1, error_response=True): - preprint.update_search() + try: + preprint.update_search() + except Exception as err: + share_delay.assert_not_called() + assert str(err).startswith("Retry in 180s: HTTPError('400 Client Error:") + assert len(mock_share_responses.calls) == 1 + else: + pytest.fail('Expected Retry(HTTPError) to be raised') def test_delete_from_share(self, mock_share_responses): preprint = PreprintFactory() diff --git a/osf/models/provider.py b/osf/models/provider.py index 8984abf9f55..dd32a88ff59 100644 --- a/osf/models/provider.py +++ b/osf/models/provider.py @@ -167,23 +167,6 @@ def update_or_create_from_json(cls, provider_data, user): related_name='required_by_providers', ) - def validate_required_metadata(self, obj): - """ - Raises ValidationError if obj does not have a CedarMetadataRecord for - this provider's required_metadata_template. - Does nothing when required_metadata_template is not set. - """ - if not self.required_metadata_template_id: - return - guid = obj.guids.first() - if guid is None or not guid.cedar_metadata_records.filter( - template_id=self.required_metadata_template_id - ).exists(): - raise ValidationError( - f'Submitted object must have a CEDAR metadata record for template ' - f'"{self.required_metadata_template.schema_name}" to be submitted to this collection.' 
- ) - def __repr__(self): return ('(name={self.name!r}, default_license={self.default_license!r}, ' 'allow_submissions={self.allow_submissions!r}) with id {self.id!r}').format(self=self) diff --git a/osf_tests/test_collection_submission.py b/osf_tests/test_collection_submission.py index 2b661f00873..54d282b7690 100644 --- a/osf_tests/test_collection_submission.py +++ b/osf_tests/test_collection_submission.py @@ -1,4 +1,9 @@ +from contextlib import suppress +from unittest import mock import pytest +from requests import Response +from requests.exceptions import HTTPError + from osf_tests.factories import ( AuthUserFactory, @@ -8,11 +13,13 @@ from osf_tests.factories import NodeFactory, CollectionFactory, CollectionProviderFactory -from osf.models import CollectionSubmission, NotificationTypeEnum +from osf.models import CollectionSubmission, NotificationTypeEnum, CedarMetadataRecord, CedarMetadataTemplate from osf.utils.workflows import CollectionSubmissionStates from framework.exceptions import PermissionsError from api_tests.utils import UserRoles +from api.share.utils import cedar_record_to_turtle, _shtrove_cedar_record_identifier from django.utils import timezone +from website import settings from tests.utils import capture_notifications @@ -147,6 +154,38 @@ def configure_test_auth(node, user_role, provider=None): return user +@pytest.fixture() +def unmoderated_collection_submission_public(node, unmoderated_collection): + unmoderated_collection.is_public = True + unmoderated_collection.save() + collection_submission = CollectionSubmission( + guid=node.guids.first(), + collection=unmoderated_collection, + creator=node.creator, + ) + with capture_notifications(): + collection_submission.save() + assert not collection_submission.is_moderated + return collection_submission + + +@pytest.fixture() +def cedar_template_json(): + return {'t_key_1': 't_value_1', 't_key_2': 't_value_2', 't_key_3': 't_value_3'} + + +@pytest.fixture() +def cedar_template(cedar_template_json): 
+ return CedarMetadataTemplate.objects.create( + schema_name='cedar_test_schema_name', + cedar_id='cedar_test_id', + template_version=1, + template=cedar_template_json, + active=True, + should_index_for_search=True + ) + + @pytest.mark.django_db class TestModeratedCollectionSubmission: @@ -574,3 +613,388 @@ def test_cancel_succeeds(self, node, hybrid_moderated_collection_submission): with capture_notifications(): hybrid_moderated_collection_submission.cancel(user=user, comment='Test Comment') assert hybrid_moderated_collection_submission.state == CollectionSubmissionStates.IN_PROGRESS + + +@pytest.mark.django_db +@pytest.mark.enable_enqueue_task +@mock.patch.object(settings, 'SHARE_ENABLED', True) +@mock.patch.object(settings, 'USE_CELERY', False) # run tasks synchronously +class TestCollectionSubmissionWithCedarRecord: + + @mock.patch('api.share.utils.pls_send_trove_record') + @mock.patch('api.share.utils.share_update_cedar_metadata_record') + @mock.patch('api.share.utils.share_delete_cedar_metadata_record') + def test_unindexable_template_and_unpublished_record_calls_records_deletion( + self, + mock_delete, + mock_create, + mock_pls, + unmoderated_collection_submission_public, + cedar_template, + cedar_template_json + ): + cedar_template.should_index_for_search = False + cedar_template.save() + record = CedarMetadataRecord.objects.create( + guid=unmoderated_collection_submission_public.guid, + template=cedar_template, + metadata=cedar_template_json, + is_published=False, + ) + obj = mock.Mock() + obj.status_code = 200 + mock_pls.return_value = obj + unmoderated_collection_submission_public.save() + + assert not mock_create.s.called + assert mock_delete.s.called + mock_delete.s.assert_called_with( + record.guid._id, + record._id, + record.template.cedar_id + ) + + @mock.patch('api.share.utils.pls_send_trove_record') + @mock.patch('api.share.utils.share_update_cedar_metadata_record') + @mock.patch('api.share.utils.share_delete_cedar_metadata_record') + def 
test_indexable_template_and_unpublished_record_calls_records_deletion( + self, + mock_delete, + mock_create, + mock_pls, + unmoderated_collection_submission_public, + cedar_template, + cedar_template_json + ): + cedar_template.should_index_for_search = True + cedar_template.save() + record = CedarMetadataRecord.objects.create( + guid=unmoderated_collection_submission_public.guid, + template=cedar_template, + metadata=cedar_template_json, + is_published=False, + ) + obj = mock.Mock() + obj.status_code = 200 + mock_pls.return_value = obj + unmoderated_collection_submission_public.save() + + assert not mock_create.s.called + assert mock_delete.s.called + mock_delete.s.assert_called_with( + record.guid._id, + record._id, + record.template.cedar_id + ) + + @mock.patch('api.share.utils.pls_send_trove_record') + @mock.patch('api.share.utils.share_update_cedar_metadata_record') + @mock.patch('api.share.utils.share_delete_cedar_metadata_record') + def test_unindexable_template_and_published_record_calls_records_deletion( + self, + mock_delete, + mock_create, + mock_pls, + unmoderated_collection_submission_public, + cedar_template, + cedar_template_json + ): + cedar_template.should_index_for_search = False + cedar_template.save() + record = CedarMetadataRecord.objects.create( + guid=unmoderated_collection_submission_public.guid, + template=cedar_template, + metadata=cedar_template_json, + is_published=True, + ) + obj = mock.Mock() + obj.status_code = 200 + mock_pls.return_value = obj + unmoderated_collection_submission_public.save() + + assert not mock_create.s.called + assert mock_delete.s.called + mock_delete.s.assert_called_with( + record.guid._id, + record._id, + record.template.cedar_id + ) + + @mock.patch('api.share.utils.pls_send_trove_record') + @mock.patch('api.share.utils.share_update_cedar_metadata_record') + @mock.patch('api.share.utils.share_delete_cedar_metadata_record') + def test_indexable_template_and_published_record_call_shtrove( + self, + mock_delete, + 
mock_create, + mock_pls, + unmoderated_collection_submission_public, + cedar_template, + cedar_template_json + ): + cedar_template.should_index_for_search = True + cedar_template.save() + record = CedarMetadataRecord.objects.create( + guid=unmoderated_collection_submission_public.guid, + template=cedar_template, + metadata=cedar_template_json, + is_published=True, + ) + obj = mock.Mock() + obj.status_code = 200 + mock_pls.return_value = obj + unmoderated_collection_submission_public.save() + + assert mock_create.s.called + mock_create.s.assert_called_with(unmoderated_collection_submission_public.guid._id, record.pk) + assert not mock_delete.s.called + + def test_share_update_cedar_metadata_record(self, unmoderated_collection_submission_public, cedar_template): + metadata = { + '@context': { + 'pav': 'http://purl.org/pav/', + 'url': 'http://schema.org/url', + 'xsd': 'http://www.w3.org/2001/XMLSchema#', + 'name': 'http://schema.org/name', + 'oslc': 'http://open-services.net/ns/core#', + 'rdfs': 'http://www.w3.org/2000/01/rdf-schema#', + 'skos': 'http://www.w3.org/2004/02/skos/core#', + 'author': 'http://schema.org/author', + 'funder': 'https://schema.metadatacenter.org/properties/c35f0660-2072-46a3-8e0d-532e40d94919', + 'schema': 'http://schema.org/', + 'license': 'http://schema.org/license', + 'citation': 'http://schema.org/citation', + 'keywords': 'http://schema.org/keywords', + 'identifier': 'http://schema.org/identifier', + 'rdfs:label': { + '@type': 'xsd:string' + }, + 'description': 'http://schema.org/description', + 'schema:name': { + '@type': 'xsd:string' + }, + 'pav:createdBy': { + '@type': '@id' + }, + 'pav:createdOn': { + '@type': 'xsd:dateTime' + }, + 'skos:notation': { + '@type': 'xsd:string' + }, + 'oslc:modifiedBy': { + '@type': '@id' + }, + 'pav:derivedFrom': { + '@type': '@id' + }, + 'schema:isBasedOn': { + '@type': '@id' + }, + 'variableMeasured': 'http://schema.org/variableMeasured', + 'pav:lastUpdatedOn': { + '@type': 'xsd:dateTime' + }, + 
'schema:description': { + '@type': 'xsd:string' + }, + 'About this template': 'https://repo.metadatacenter.org/template-fields/bc66544c-e100-439e-9e80-9b35537368e5' + }, + 'name': { + '@value': 'name' + }, + 'description': { + '@value': 'description' + }, + 'variableMeasured': [ + { + '@value': 'variable' + } + ], + 'author': [ + { + '@value': None + } + ], + 'citation': { + '@value': None + }, + 'license': { + '@value': None + }, + 'funder': [ + { + '@value': '1111111' + } + ], + 'url': {}, + 'keywords': { + '@value': None + }, + 'identifier': {} + } + with mock.patch('api.share.utils.pls_send_trove_record'): + record = CedarMetadataRecord.objects.create( + guid=unmoderated_collection_submission_public.guid, + template=cedar_template, + metadata=metadata, + is_published=True, + ) + + result = cedar_record_to_turtle(record.guid.referent, record) + vocab_url = '<https://osf.io/vocab/2022/>' + schema_url = '<http://schema.org/>' + schema_metadata_url = '<https://schema.metadatacenter.org/properties/>' + urls_to_find = { + vocab_url: None, + schema_url: None, + schema_metadata_url: None + } + for url in urls_to_find.keys(): + urls_to_find[url] = result[result.index(url) - 3] + + # fetch urls from result and assign ns prefixes based on order of appearance in result to make test resilient to changes in order of namespace declaration in turtle output + ns1 = list(filter(lambda url: urls_to_find[url] == '1', urls_to_find.keys()))[0] + ns2 = list(filter(lambda url: urls_to_find[url] == '2', urls_to_find.keys()))[0] + ns3 = list(filter(lambda url: urls_to_find[url] == '3', urls_to_find.keys()))[0] + vocab_n = urls_to_find[vocab_url] + schema_n = urls_to_find[schema_url] + schema_metadata_n = urls_to_find[schema_metadata_url] + # compose expected result dynamically based on ordering of prefixes + # however ns attributes are strictly attached to specific prefix + expected = ( + f'@prefix ns1: {ns1} .\n' + f'@prefix ns2: {ns2} .\n' + f'@prefix ns3: {ns3} .\n\n' + f'<{record.guid.referent.get_semantic_iri()}> ns{vocab_n}:hasCedarRecord [ ns{schema_n}:description "description" ;\n' + f' ns{schema_n}:identifier [ ] 
;\n' + f' ns{schema_n}:name "name" ;\n' + f' ns{schema_n}:url [ ] ;\n' + f' ns{schema_n}:variableMeasured "variable" ;\n' + f' ns{schema_metadata_n}:c35f0660-2072-46a3-8e0d-532e40d94919 "1111111" ] .\n\n' + ) + + assert result == expected + + @mock.patch('api.share.utils.pls_send_trove_record') + @mock.patch('api.share.utils.share_delete_cedar_metadata_record') + def test_cedar_record_identifier_on_create(self, mock_delete, mock_pls, unmoderated_collection_submission_public): + template = CedarMetadataTemplate.objects.create(schema_name='http://google.com', cedar_id='http26', template_version=1) + template.should_index_for_search = True + template.save() + with mock.patch('api.share.utils.share_update_cedar_metadata_record'): + to_create_record = CedarMetadataRecord.objects.create( + guid=unmoderated_collection_submission_public.guid, + template=template, + metadata=template.template, + is_published=True, + ) + + with mock.patch('api.share.utils.requests.post'): + with mock.patch('api.share.utils._shtrove_cedar_record_identifier') as mock_identifier: + unmoderated_collection_submission_public.save() + mock_identifier.assert_called_with( + to_create_record._id, + to_create_record.template.cedar_id + ) + assert ( + _shtrove_cedar_record_identifier(to_create_record._id, to_create_record.template.cedar_id) == + f'{to_create_record._id}/CedarMetadataRecord:http26' + ) + + @mock.patch('api.share.utils.pls_send_trove_record') + @mock.patch('api.share.utils.share_update_cedar_metadata_record') + def test_cedar_record_identifier_on_delete(self, mock_update, mock_pls, unmoderated_collection_submission_public): + template = CedarMetadataTemplate.objects.create(schema_name='http://google.com', cedar_id='http25', template_version=1) + with mock.patch('api.share.utils.share_delete_cedar_metadata_record'): + to_delete_record = CedarMetadataRecord.objects.create( + guid=unmoderated_collection_submission_public.guid, + template=template, + metadata=template.template, + 
is_published=False, + ) + + with mock.patch('api.share.utils.requests.delete'): + with mock.patch('api.share.utils._shtrove_cedar_record_identifier') as mock_identifier: + unmoderated_collection_submission_public.save() + mock_identifier.assert_called_with(to_delete_record._id, to_delete_record.template.cedar_id) + assert ( + _shtrove_cedar_record_identifier(to_delete_record._id, to_delete_record.template.cedar_id) == + f'{to_delete_record._id}/CedarMetadataRecord:http25' + ) + + @mock.patch('api.share.utils.share_update_cedar_metadata_record') + @mock.patch('api.share.utils.share_delete_cedar_metadata_record') + def test_cedar_record_create_retry( + self, + mock_delete, + mock_create, + unmoderated_collection_submission_public, + cedar_template, + cedar_template_json + ): + cedar_template.should_index_for_search = True + cedar_template.save() + with mock.patch('api.share.utils.pls_send_trove_record') as mock_pls: + mock_pls.return_value = Response() + mock_pls.return_value.status_code = 400 + with suppress(Exception): + CedarMetadataRecord.objects.create( + guid=unmoderated_collection_submission_public.guid, + template=cedar_template, + metadata=cedar_template_json, + is_published=True, + ) + + def mock_raise_for_status(*args, **kwargs): + raise HTTPError('Retry error') + + mock_pls.return_value = Response() + mock_pls.return_value.status_code = 400 + mock_pls.return_value.raise_for_status = mock_raise_for_status + try: + unmoderated_collection_submission_public.save() + except Exception as err: + assert str(err) == "Retry in 180s: HTTPError('Retry error')" + assert not mock_create.s.called + assert not mock_delete.s.called + else: + pytest.fail('Expected Retry(HTTPError) to be raised') + + @mock.patch('api.share.utils.share_update_cedar_metadata_record') + @mock.patch('api.share.utils.share_delete_cedar_metadata_record') + def test_cedar_record_delete_retry( + self, + mock_delete, + mock_create, + unmoderated_collection_submission_public, + cedar_template, + 
cedar_template_json + ): + cedar_template.should_index_for_search = False + cedar_template.save() + with mock.patch('api.share.utils.pls_send_trove_record') as mock_pls: + mock_pls.return_value = Response() + mock_pls.return_value.status_code = 400 + with suppress(Exception): + CedarMetadataRecord.objects.create( + guid=unmoderated_collection_submission_public.guid, + template=cedar_template, + metadata=cedar_template_json, + is_published=True, + ) + + def mock_raise_for_status(*args, **kwargs): + raise HTTPError('Retry error') + + mock_pls.return_value = Response() + mock_pls.return_value.status_code = 400 + mock_pls.return_value.raise_for_status = mock_raise_for_status + try: + unmoderated_collection_submission_public.save() + except Exception as err: + assert str(err) == "Retry in 180s: HTTPError('Retry error')" + assert not mock_create.s.called + assert not mock_delete.s.called + else: + pytest.fail('Expected Retry(HTTPError) to be raised')