From 2a340f85a21e7e8c563deedba675f967040dfbb3 Mon Sep 17 00:00:00 2001 From: amercader Date: Fri, 24 Jan 2020 14:32:41 +0100 Subject: [PATCH] [#5137] [#5137] Fix Datapusher exceptions on 2.9 Fixes #5137, #4751 This was originally reported in the issues above. On 2.9, enabling the DataPusher causes an exception at the end of the dataset creation process, when submitting the resources. The stack trace looks like this: ``` File "/home/adria/dev/pyenvs/ckan/lib/python2.7/site-packages/flask/views.py", line 163, in dispatch_request return meth(*args, **kwargs) File "/home/adria/dev/pyenvs/ckan/src/ckan/ckan/views/resource.py", line 242, in post get_action(u'resource_create')(context, data) File "/home/adria/dev/pyenvs/ckan/src/ckan/ckan/logic/__init__.py", line 472, in wrapped result = _action(context, data_dict, **kw) File "/home/adria/dev/pyenvs/ckan/src/ckan/ckan/logic/action/create.py", line 330, in resource_create model.repo.commit() File "/home/adria/dev/pyenvs/ckan/lib/python2.7/site-packages/sqlalchemy/orm/scoping.py", line 162, in do return getattr(self.registry(), name)(*args, **kwargs) File "/home/adria/dev/pyenvs/ckan/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1027, in commit self.transaction.commit() File "/home/adria/dev/pyenvs/ckan/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 494, in commit self._prepare_impl() File "/home/adria/dev/pyenvs/ckan/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 464, in _prepare_impl stx = self.session.transaction AttributeError: 'NoneType' object has no attribute 'transaction' ``` As with all SQLAlchemy session issues and DataPusher ones this one took a while to track, but essentially it boiled down to what @davidread explained [here](https://stackoverflow.com/a/56094135/235993) The `datapusher` plugin used the `notify` hook to listen to resource changes. This was fired from `resource_create` which had started a transaction. The plugin then called `datapusher_submit` which in turns calls `task_status_update`, which commits and clears the Session causing troubles later on. How this had been working up until 2.8 I don't know and I'm not keen on finding out to be honest. This PR refactors the `datapusher` plugin to use the `IResourceController.after_create` and `IResourceUrlChange.notify` hooks, which are called after commit. It adds two tests that failed before the patch as well. --- ckanext/datapusher/plugin.py | 128 ++++++++++++++++++------------- ckanext/datapusher/tests/test.py | 68 ++++++++++++++++ 2 files changed, 142 insertions(+), 54 deletions(-) diff --git a/ckanext/datapusher/plugin.py b/ckanext/datapusher/plugin.py index a4c55524a08..a1f6c3ef421 100644 --- a/ckanext/datapusher/plugin.py +++ b/ckanext/datapusher/plugin.py @@ -37,7 +37,7 @@ class DatapusherPlugin(p.SingletonPlugin): p.implements(p.IActions) p.implements(p.IAuthFunctions) p.implements(p.IResourceUrlChange) - p.implements(p.IDomainObjectModification, inherit=True) + p.implements(p.IResourceController, inherit=True) p.implements(p.ITemplateHelpers) p.implements(p.IBlueprint) @@ -65,60 +65,80 @@ def configure(self, config): format(config_option) ) - def notify(self, entity, operation=None): - if isinstance(entity, model.Resource): - if ( - operation == model.domain_object.DomainObjectOperation.new - or not operation - ): - # if operation is None, resource URL has been changed, as - # the notify function in IResourceUrlChange only takes - # 1 parameter - context = { - u'model': model, - u'ignore_auth': True, - u'defer_commit': True + # IResourceUrlChange + + def notify(self, resource): + context = { + u'model': model, + u'ignore_auth': True, + } + resource_dict = toolkit.get_action(u'resource_show')( + context, { + u'id': resource.id, + } + ) + self._submit_to_datapusher(resource_dict) + + # IResourceController + + def after_create(self, context, resource_dict): + + self._submit_to_datapusher(resource_dict) + + def _submit_to_datapusher(self, resource_dict): + + context = { + u'model': model, + u'ignore_auth': True, + u'defer_commit': True + } + + resource_format = resource_dict.get('format') + + submit = ( + resource_format + and resource_format.lower() in self.datapusher_formats + and resource_dict.get('url_type') != u'datapusher' + ) + + if not submit: + return + + try: + task = toolkit.get_action(u'task_status_show')( + context, { + u'entity_id': resource_dict['id'], + u'task_type': u'datapusher', + u'key': u'datapusher' + } + ) + + if task.get(u'state') in (u'pending', u'submitting'): + # There already is a pending DataPusher submission, + # skip this one ... + log.debug( + u'Skipping DataPusher submission for ' + u'resource {0}'.format(resource_dict['id']) + ) + return + except toolkit.ObjectNotFound: + pass + + try: + log.debug( + u'Submitting resource {0}'.format(resource_dict['id']) + + u' to DataPusher' + ) + toolkit.get_action(u'datapusher_submit')( + context, { + u'resource_id': resource_dict['id'] } - if ( - entity.format - and entity.format.lower() in self.datapusher_formats - and entity.url_type != u'datapusher' - ): - - try: - task = toolkit.get_action(u'task_status_show')( - context, { - u'entity_id': entity.id, - u'task_type': u'datapusher', - u'key': u'datapusher' - } - ) - if task.get(u'state') == u'pending': - # There already is a pending DataPusher submission, - # skip this one ... - log.debug( - u'Skipping DataPusher submission for ' - u'resource {0}'.format(entity.id) - ) - return - except toolkit.ObjectNotFound: - pass - - try: - log.debug( - u'Submitting resource {0}'.format(entity.id) + - u' to DataPusher' - ) - toolkit.get_action(u'datapusher_submit')( - context, { - u'resource_id': entity.id - } - ) - except toolkit.ValidationError as e: - # If datapusher is offline want to catch error instead - # of raising otherwise resource save will fail with 500 - log.critical(e) - pass + ) + except toolkit.ValidationError as e: + # If datapusher is offline want to catch error instead + # of raising otherwise resource save will fail with 500 + log.critical(e) + pass def get_actions(self): return { diff --git a/ckanext/datapusher/tests/test.py b/ckanext/datapusher/tests/test.py index a3b993704e4..47b527e48dc 100644 --- a/ckanext/datapusher/tests/test.py +++ b/ckanext/datapusher/tests/test.py @@ -11,6 +11,7 @@ import ckan.model as model import ckan.plugins as p import ckan.tests.legacy as tests +from ckan.tests import factories import ckanext.datastore.backend.postgres as db from ckan.common import config from ckanext.datastore.tests.helpers import set_url_type @@ -286,3 +287,70 @@ def test_custom_callback_url_base(self, app): data["result_url"] == "https://ckan.example.com/api/3/action/datapusher_hook" ) + + @responses.activate + @pytest.mark.ckan_config( + "ckan.datapusher.callback_url_base", "https://ckan.example.com" + ) + @pytest.mark.ckan_config( + "ckan.datapusher.url", "http://datapusher.ckan.org" + ) + @pytest.mark.ckan_config("ckan.plugins", "datastore datapusher") + @pytest.mark.usefixtures("with_plugins") + def test_create_resource_hooks(self, app): + + responses.add( + responses.POST, + "http://datapusher.ckan.org/job", + content_type="application/json", + body=json.dumps({"job_id": "foo", "job_key": "barloco"}), + ) + responses.add_passthru(config["solr_url"]) + + dataset = factories.Dataset() + resource = tests.call_action_api( + app, + "resource_create", + apikey=self.sysadmin_user.apikey, + package_id=dataset['id'], + format='CSV', + ) + + @responses.activate + @pytest.mark.ckan_config( + "ckan.datapusher.callback_url_base", "https://ckan.example.com" + ) + @pytest.mark.ckan_config( + "ckan.datapusher.url", "http://datapusher.ckan.org" + ) + @pytest.mark.ckan_config("ckan.plugins", "datastore datapusher") + @pytest.mark.usefixtures("with_plugins") + def test_update_resource_url_hooks(self, app): + + responses.add( + responses.POST, + "http://datapusher.ckan.org/job", + content_type="application/json", + body=json.dumps({"job_id": "foo", "job_key": "barloco"}), + ) + responses.add_passthru(config["solr_url"]) + + dataset = factories.Dataset() + resource = tests.call_action_api( + app, + "resource_create", + apikey=self.sysadmin_user.apikey, + package_id=dataset['id'], + url='http://example.com/old.csv', + format='CSV', + ) + + resource = tests.call_action_api( + app, + "resource_update", + apikey=self.sysadmin_user.apikey, + id=resource['id'], + url='http://example.com/new.csv', + format='CSV', + ) + assert resource