diff --git a/api_tests/preprints/views/test_preprint_contributors_list.py b/api_tests/preprints/views/test_preprint_contributors_list.py index dc2a5f20af37..00db7c091d1d 100644 --- a/api_tests/preprints/views/test_preprint_contributors_list.py +++ b/api_tests/preprints/views/test_preprint_contributors_list.py @@ -1511,7 +1511,7 @@ def test_add_unregistered_contributor_without_email_no_email( assert mock_mail.call_count == 0 @mock.patch('framework.auth.views.mails.send_mail') - @mock.patch('website.preprints.tasks.on_preprint_updated.s') + @mock.patch('osf.models.preprint.update_or_enqueue_on_preprint_updated') def test_publishing_preprint_sends_emails_to_contributors( self, mock_update, mock_mail, app, user, url_preprint_contribs, preprint_unpublished): url = '/{}preprints/{}/'.format(API_BASE, preprint_unpublished._id) diff --git a/api_tests/preprints/views/test_preprint_list.py b/api_tests/preprints/views/test_preprint_list.py index 3d9147e2bf6c..cd75cbe8377d 100644 --- a/api_tests/preprints/views/test_preprint_list.py +++ b/api_tests/preprints/views/test_preprint_list.py @@ -682,7 +682,7 @@ def test_create_preprint_adds_log_if_published(self): assert_equal(log.action, 'published') assert_equal(log.params.get('preprint'), preprint._id) - @mock.patch('website.preprints.tasks.on_preprint_updated.s') + @mock.patch('osf.models.preprint.update_or_enqueue_on_preprint_updated') def test_create_preprint_from_project_published_hits_update( self, mock_on_preprint_updated): private_project_payload = build_preprint_create_payload( @@ -699,7 +699,7 @@ def test_create_preprint_from_project_published_hits_update( assert_true(mock_on_preprint_updated.called) - @mock.patch('website.preprints.tasks.on_preprint_updated.s') + @mock.patch('osf.models.preprint.update_or_enqueue_on_preprint_updated') def test_create_preprint_from_project_unpublished_does_not_hit_update( self, mock_on_preprint_updated): private_project_payload = build_preprint_create_payload( @@ -711,7 +711,7 @@ def test_create_preprint_from_project_unpublished_does_not_hit_update( auth=self.user.auth) assert not mock_on_preprint_updated.called - @mock.patch('website.preprints.tasks.on_preprint_updated.s') + @mock.patch('osf.models.preprint.update_or_enqueue_on_preprint_updated') def test_setting_is_published_with_moderated_provider_fails( self, mock_on_preprint_updated): self.provider.reviews_workflow = 'pre-moderation' diff --git a/framework/postcommit_tasks/handlers.py b/framework/postcommit_tasks/handlers.py index b23f8a67574b..4ef52f640d83 100644 --- a/framework/postcommit_tasks/handlers.py +++ b/framework/postcommit_tasks/handlers.py @@ -11,6 +11,7 @@ from celery.canvas import Signature from celery.local import PromiseProxy from gevent.pool import Pool +from flask import _app_ctx_stack as context_stack from website import settings @@ -71,20 +72,24 @@ def enqueue_postcommit_task(fn, args, kwargs, celery=False, once_per_request=Tru ''' Any task queued with this function where celery=True will be run asynchronously. ''' - # make a hash of the pertinent data - raw = [fn.__name__, fn.__module__, args, kwargs] - m = hashlib.md5() - m.update('-'.join([x.__repr__() for x in raw])) - key = m.hexdigest() - - if not once_per_request: - # we want to run it once for every occurrence, add a random string - key = '{}:{}'.format(key, binascii.hexlify(os.urandom(8))) - - if celery and isinstance(fn, PromiseProxy): - postcommit_celery_queue().update({key: fn.si(*args, **kwargs)}) + if context_stack.top and context_stack.top.app.testing: + # For testing purposes only: run fn directly + fn(**kwargs) else: - postcommit_queue().update({key: functools.partial(fn, *args, **kwargs)}) + # make a hash of the pertinent data + raw = [fn.__name__, fn.__module__, args, kwargs] + m = hashlib.md5() + m.update('-'.join([x.__repr__() for x in raw])) + key = m.hexdigest() + + if not once_per_request: + # we want to run it once for every occurrence, add a random string + key = '{}:{}'.format(key, binascii.hexlify(os.urandom(8))) + + if celery and isinstance(fn, PromiseProxy): + postcommit_celery_queue().update({key: fn.si(*args, **kwargs)}) + else: + postcommit_queue().update({key: functools.partial(fn, *args, **kwargs)}) handlers = { 'before_request': postcommit_before_request, diff --git a/osf/models/mixins.py b/osf/models/mixins.py index 426fa2bb8a44..99eb508f2aba 100644 --- a/osf/models/mixins.py +++ b/osf/models/mixins.py @@ -915,13 +915,12 @@ def add_contributor(self, contributor, permissions=None, visible=True, if save: self.save() - self.update_search() if self._id and contrib_to_add: project_signals.contributor_added.send(self, contributor=contributor, auth=auth, email_template=send_email, permissions=permissions) - # enqueue on_node_updated to update DOI metadata when a contributor is added + # enqueue on_node_updated/on_preprint_updated to update DOI metadata when a contributor is added if self.get_identifier_value('doi'): request, user_id = get_request_and_user_id() self.update_or_enqueue_on_resource_updated(user_id, first_save=False, saved_fields={'contributors'}) @@ -1157,11 +1156,10 @@ def remove_contributor(self, contributor, auth, log=True): ) self.save() - self.update_search() # send signal to remove this user from project subscriptions project_signals.contributor_removed.send(self, user=contributor) - # enqueue on_node_updated to update DOI metadata when a contributor is removed + # enqueue on_node_updated/on_preprint_updated to update DOI metadata when a contributor is removed if self.get_identifier_value('doi'): request, user_id = get_request_and_user_id() self.update_or_enqueue_on_resource_updated(user_id, first_save=False, saved_fields={'contributors'}) @@ -1213,7 +1211,10 @@ def move_contributor(self, contributor, auth, index, save=False): ) if save: self.save() - self.update_search() + # enqueue on_node_updated/on_preprint_updated to update DOI metadata when a contributor is moved + if self.get_identifier_value('doi'): + request, user_id = get_request_and_user_id() + self.update_or_enqueue_on_resource_updated(user_id, first_save=False, saved_fields={'contributors'}) # TODO: Optimize me def manage_contributors(self, user_dicts, auth, save=False): @@ -1368,7 +1369,10 @@ def set_visible(self, user, visible, log=True, auth=None, save=False): ) if save: self.save() - self.update_search() + # enqueue on_node_updated/on_preprint_updated to update DOI metadata when a contributor is hidden/made visible + if self.get_identifier_value('doi'): + request, user_id = get_request_and_user_id() + self.update_or_enqueue_on_resource_updated(user_id, first_save=False, saved_fields={'contributors'}) def has_permission(self, user, permission, check_parent=True): """Check whether user has permission. diff --git a/osf/models/preprint.py b/osf/models/preprint.py index e474b43835b2..38c1867fe408 100644 --- a/osf/models/preprint.py +++ b/osf/models/preprint.py @@ -104,7 +104,9 @@ class Preprint(DirtyFieldsMixin, GuidMixin, IdentifierMixin, ReviewableMixin, Ba 'is_public', 'deleted', 'subjects', - '_contributors' + 'primary_file', + 'contributors', + 'tags', } # Setting for ContributorMixin @@ -483,7 +485,6 @@ def set_primary_file(self, preprint_file, auth, save=False): if save: self.save() - self.update_search() update_or_enqueue_on_preprint_updated(preprint_id=self._id, saved_fields={'primary_file'}) def set_published(self, published, auth, save=False): @@ -541,10 +542,9 @@ def set_preprint_license(self, license_detail, auth, save=False): auth=auth, save=False ) - update_or_enqueue_on_preprint_updated(preprint_id=self._id, saved_fields={'license'}) - if save: self.save() + update_or_enqueue_on_preprint_updated(preprint_id=self._id, saved_fields={'license'}) def set_identifier_values(self, doi, save=False): self.set_identifier_value('doi', doi) @@ -667,7 +667,7 @@ def add_tag_log(self, tag, auth): # Override Taggable def on_tag_added(self, tag): - self.update_search() + update_or_enqueue_on_preprint_updated(preprint_id=self._id, saved_fields={'tags'}) def remove_tag(self, tag, auth, save=True): if not tag: @@ -688,7 +688,7 @@ def remove_tag(self, tag, auth, save=True): ) if save: self.save() - self.update_search() + update_or_enqueue_on_preprint_updated(preprint_id=self._id, saved_fields={'tags'}) return True def set_supplemental_node(self, node, auth, save=False): diff --git a/osf_tests/test_elastic_search.py b/osf_tests/test_elastic_search.py index 70c6cce60c69..a570f13adb8a 100644 --- a/osf_tests/test_elastic_search.py +++ b/osf_tests/test_elastic_search.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- from __future__ import absolute_import, division, print_function, unicode_literals +import mock import time import unittest import logging @@ -25,12 +26,15 @@ CollectedGuidMetadata, ) from addons.osfstorage.models import OsfStorageFile +from framework.postcommit_tasks.handlers import postcommit_queue from scripts.populate_institutions import main as populate_institutions from osf_tests import factories from tests.base import OsfTestCase from tests.test_features import requires_search +from framework.postcommit_tasks.handlers import postcommit_after_request + from tests.utils import mock_archive, run_celery_tasks @@ -394,53 +398,62 @@ def setUp(self): def test_new_preprint_unsubmitted(self): # Verify that an unsubmitted preprint is not present in Elastic Search. - docs = query(self.preprint.title)['results'] + title = 'Apple' + self.preprint.title = title + self.preprint.save() + docs = query(title)['results'] assert_equal(len(docs), 0) def test_new_preprint_unpublished(self): # Verify that an unpublished preprint is not present in Elastic Search. - self.preprint = factories.PreprintFactory(creator=self.user, is_published=False) - docs = query(self.preprint.title)['results'] + title = 'Banana' + self.preprint = factories.PreprintFactory(creator=self.user, is_published=False, title=title) + assert self.preprint.title == title + docs = query(title)['results'] assert_equal(len(docs), 0) def test_unsubmitted_preprint_primary_file(self): # Unpublished preprint's primary_file not showing up in Elastic Search - with run_celery_tasks(): - self.preprint.set_primary_file(self.file, auth=Auth(self.user), save=True) - docs = query(self.preprint.title)['results'] + title = 'Cantaloupe' + self.preprint.title = title + self.preprint.set_primary_file(self.file, auth=Auth(self.user), save=True) + assert self.preprint.title == title + docs = query(title)['results'] assert_equal(len(docs), 0) - @unittest.skip("Elasticsearch latency seems to be causing theses tests to fail randomly.") def test_publish_preprint(self): - self.preprint = factories.PreprintFactory(creator=self.user, is_published=False) - with run_celery_tasks(): - self.preprint.set_published(True, auth=Auth(self.preprint.creator), save=True) - docs = query(self.preprint.title)['results'] + title = 'Date' + self.preprint = factories.PreprintFactory(creator=self.user, is_published=False, title=title) + self.preprint.set_published(True, auth=Auth(self.preprint.creator), save=True) + assert self.preprint.title == title + docs = query(title)['results'] # Both preprint and primary_file showing up in Elastic assert_equal(len(docs), 2) def test_preprint_title_change(self): + title_original = self.published_preprint.title new_title = 'My new preprint title' - with run_celery_tasks(): - self.published_preprint.set_title(new_title, auth=Auth(self.user), save=True) - docs = query(new_title)['results'] - # Both preprint and primary_file showing up in Elastic - assert_equal(len(docs), 2) + self.published_preprint.set_title(new_title, auth=Auth(self.user), save=True) + docs = query('category:preprint AND ' + title_original)['results'] + assert_equal(len(docs), 0) + + docs = query('category:preprint AND ' + new_title)['results'] + assert_equal(len(docs), 1) def test_preprint_description_change(self): - with run_celery_tasks(): - new_abstract = 'My preprint abstract' - self.published_preprint.set_description(new_abstract, auth=Auth(self.user), save=True) + description_original = self.published_preprint.description + new_abstract = 'My preprint abstract' + self.published_preprint.set_description(new_abstract, auth=Auth(self.user), save=True) docs = query(self.published_preprint.title)['results'] - # Both preprint and primary_file showing up in Elastic - assert_equal(len(docs), 2) - assert_equal(docs[0]['description'], new_abstract) + docs = query('category:preprint AND ' + description_original)['results'] + assert_equal(len(docs), 0) + + docs = query('category:preprint AND ' + new_abstract)['results'] + assert_equal(len(docs), 1) - @unittest.skip("Elasticsearch latency seems to be causing theses tests to fail randomly.") def test_set_preprint_private(self): # Not currently an option for users, but can be used for spam - with run_celery_tasks(): - self.published_preprint.set_privacy('private', auth=Auth(self.user), save=True) + self.published_preprint.set_privacy('private', auth=Auth(self.user), save=True) docs = query(self.published_preprint.title)['results'] # Both preprint and primary_file showing up in Elastic assert_equal(len(docs), 0) @@ -453,9 +466,7 @@ def test_set_primary_file(self): name='panda.txt', materialized_path='/panda.txt') self.file.save() - - with run_celery_tasks(): - self.published_preprint.set_primary_file(self.file, auth=Auth(self.user), save=True) + self.published_preprint.set_primary_file(self.file, auth=Auth(self.user), save=True) docs = query(self.published_preprint.title)['results'] assert_equal(len(docs), 2) assert_equal(docs[1]['name'], self.file.name) @@ -466,9 +477,11 @@ def test_set_license(self): 'year': '2015', 'copyrightHolders': ['Iron Man'] } - with run_celery_tasks(): - self.published_preprint.set_preprint_license(license_details, Auth(self.user), save=True) - docs = query(self.published_preprint.title)['results'] + title = 'Elderberry' + self.published_preprint.title = title + self.published_preprint.set_preprint_license(license_details, Auth(self.user), save=True) + assert self.published_preprint.title == title + docs = query(title)['results'] assert_equal(len(docs), 2) assert_equal(docs[0]['license']['copyright_holders'][0], 'Iron Man') assert_equal(docs[0]['license']['name'], 'No license') @@ -477,11 +490,10 @@ def test_add_tags(self): tags = ['stonecoldcrazy', 'just a poor boy', 'from-a-poor-family'] - with run_celery_tasks(): - for tag in tags: - docs = query('tags:"{}"'.format(tag))['results'] - assert_equal(len(docs), 0) - self.published_preprint.add_tag(tag, Auth(self.user), save=True) + for tag in tags: + docs = query('tags:"{}"'.format(tag))['results'] + assert_equal(len(docs), 0) + self.published_preprint.add_tag(tag, Auth(self.user), save=True) for tag in tags: docs = query('tags:"{}"'.format(tag))['results'] @@ -504,8 +516,8 @@ def test_add_contributor(self): docs = query('category:preprint AND "{}"'.format(user2.fullname))['results'] assert_equal(len(docs), 0) - with run_celery_tasks(): - self.published_preprint.add_contributor(user2, save=True) + # with run_celery_tasks(): + self.published_preprint.add_contributor(user2, save=True) docs = query('category:preprint AND "{}"'.format(user2.fullname))['results'] assert_equal(len(docs), 1) @@ -524,21 +536,31 @@ def test_remove_contributor(self): def test_hide_contributor(self): user2 = factories.UserFactory(fullname='Brian May') self.published_preprint.add_contributor(user2) - with run_celery_tasks(): - self.published_preprint.set_visible(user2, False, save=True) + self.published_preprint.set_visible(user2, False, save=True) docs = query('category:preprint AND "{}"'.format(user2.fullname))['results'] assert_equal(len(docs), 0) - with run_celery_tasks(): - self.published_preprint.set_visible(user2, True, save=True) + self.published_preprint.set_visible(user2, True, save=True) docs = query('category:preprint AND "{}"'.format(user2.fullname))['results'] assert_equal(len(docs), 1) + def test_move_contributor(self): + user2 = factories.UserFactory(fullname='Brian May') + self.published_preprint.add_contributor(user2, save=True) + docs = query('category:preprint AND "{}"'.format(user2.fullname))['results'] + assert_equal(len(docs), 1) + docs[0]['contributors'][0]['fullname'] == self.user.fullname + docs[0]['contributors'][1]['fullname'] == user2.fullname + self.published_preprint.move_contributor(user2, Auth(self.user), 0) + docs = query('category:preprint AND "{}"'.format(user2.fullname))['results'] + assert_equal(len(docs), 1) + docs[0]['contributors'][0]['fullname'] == user2.fullname + docs[0]['contributors'][1]['fullname'] == self.user.fullname + def test_tag_aggregation(self): tags = ['stonecoldcrazy', 'just a poor boy', 'from-a-poor-family'] - with run_celery_tasks(): - for tag in tags: - self.published_preprint.add_tag(tag, Auth(self.user), save=True) + for tag in tags: + self.published_preprint.add_tag(tag, Auth(self.user), save=True) docs = query(self.published_preprint.title)['tags'] assert len(docs) == 3 diff --git a/tests/test_preprints.py b/tests/test_preprints.py index d842a9fe6af6..6de4ee0346a3 100644 --- a/tests/test_preprints.py +++ b/tests/test_preprints.py @@ -19,7 +19,7 @@ from website.util.share import format_user from framework.auth import Auth, cas, signing from framework.celery_tasks import handlers -from framework.celery_tasks.handlers import enqueue_task, get_task_from_queue +from framework.postcommit_tasks.handlers import enqueue_postcommit_task, get_task_from_postcommit_queue from framework.exceptions import PermissionsError, HTTPError from framework.auth.core import Auth from addons.osfstorage.models import OsfStorageFile @@ -1853,7 +1853,7 @@ def test_update_or_enqueue_on_preprint_updated(self): old_subjects=second_subjects, saved_fields={'title': 'Hello'} ) - updated_task = get_task_from_queue( + updated_task = get_task_from_postcommit_queue( 'website.preprints.tasks.on_preprint_updated', predicate=lambda task: task.kwargs['preprint_id'] == self.preprint._id ) @@ -2137,7 +2137,7 @@ def setUp(self): self.file = api_test_utils.create_test_file(self.project, self.admin, 'second_place.pdf') self.preprint = PreprintFactory(creator=self.admin, filename='second_place.pdf', provider=self.provider, subjects=[[self.subject._id]], project=self.project, is_published=False) - @mock.patch('website.preprints.tasks.on_preprint_updated.si') + @mock.patch('osf.models.preprint.update_or_enqueue_on_preprint_updated') def test_save_unpublished_not_called(self, mock_on_preprint_updated): self.preprint.save() assert not mock_on_preprint_updated.called diff --git a/tests/utils.py b/tests/utils.py index 77c115088fbe..008ab71cf9e2 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -10,6 +10,7 @@ from framework.auth import Auth from framework.celery_tasks.handlers import celery_teardown_request +from framework.postcommit_tasks.handlers import postcommit_after_request from osf.models import Sanction from tests.base import get_default_metaschema from website.archiver import ARCHIVER_SUCCESS diff --git a/website/preprints/tasks.py b/website/preprints/tasks.py index c5d0fa5ff0f7..e6a8591110cf 100644 --- a/website/preprints/tasks.py +++ b/website/preprints/tasks.py @@ -7,7 +7,8 @@ from framework.exceptions import HTTPError from framework.celery_tasks import app as celery_app -from framework.celery_tasks.handlers import enqueue_task, get_task_from_queue +from framework.postcommit_tasks.handlers import enqueue_postcommit_task, get_task_from_postcommit_queue + from framework import sentry from website import settings, mails @@ -27,6 +28,7 @@ def on_preprint_updated(preprint_id, update_share=True, share_type=None, old_sub old_subjects = [] need_update = bool(preprint.SEARCH_UPDATE_FIELDS.intersection(saved_fields or {})) + if need_update: preprint.update_search() @@ -57,21 +59,23 @@ def update_or_create_preprint_identifiers(preprint): sentry.log_message(err.args[0]) def update_or_enqueue_on_preprint_updated(preprint_id, update_share=True, share_type=None, old_subjects=None, saved_fields=None): - task = get_task_from_queue( + task = get_task_from_postcommit_queue( 'website.preprints.tasks.on_preprint_updated', predicate=lambda task: task.kwargs['preprint_id'] == preprint_id ) if task: old_subjects = old_subjects or [] task_subjects = task.kwargs['old_subjects'] or [] - saved_fields = saved_fields or {} task.kwargs['update_share'] = update_share or task.kwargs['update_share'] task.kwargs['share_type'] = share_type or task.kwargs['share_type'] task.kwargs['old_subjects'] = old_subjects + task_subjects task.kwargs['saved_fields'] = list(set(task.kwargs['saved_fields']).union(saved_fields)) else: - enqueue_task( - on_preprint_updated.s(preprint_id=preprint_id, old_subjects=old_subjects, update_share=update_share, share_type=share_type, saved_fields=saved_fields) + enqueue_postcommit_task( + on_preprint_updated, + (), + {'preprint_id': preprint_id, 'old_subjects': old_subjects, 'update_share': update_share, 'share_type': share_type, 'saved_fields': saved_fields}, + celery=True ) def update_preprint_share(preprint, old_subjects=None, share_type=None):