Skip to content

Commit

Permalink
[#5137] Fix DataPusher exceptions on 2.9
Browse files Browse the repository at this point in the history
Fixes #5137, #4751

This was originally reported in the issues above. On 2.9, enabling the
DataPusher causes an exception at the end of the dataset creation
process, when submitting the resources. The stack trace looks like this:

```
  File "/home/adria/dev/pyenvs/ckan/lib/python2.7/site-packages/flask/views.py", line 163, in dispatch_request
    return meth(*args, **kwargs)
  File "/home/adria/dev/pyenvs/ckan/src/ckan/ckan/views/resource.py", line 242, in post
    get_action(u'resource_create')(context, data)
  File "/home/adria/dev/pyenvs/ckan/src/ckan/ckan/logic/__init__.py", line 472, in wrapped
    result = _action(context, data_dict, **kw)
  File "/home/adria/dev/pyenvs/ckan/src/ckan/ckan/logic/action/create.py", line 330, in resource_create
    model.repo.commit()
  File "/home/adria/dev/pyenvs/ckan/lib/python2.7/site-packages/sqlalchemy/orm/scoping.py", line 162, in do
    return getattr(self.registry(), name)(*args, **kwargs)
  File "/home/adria/dev/pyenvs/ckan/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1027, in commit
    self.transaction.commit()
  File "/home/adria/dev/pyenvs/ckan/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 494, in commit
    self._prepare_impl()
  File "/home/adria/dev/pyenvs/ckan/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 464, in _prepare_impl
    stx = self.session.transaction
AttributeError: 'NoneType' object has no attribute 'transaction'
```

As with all SQLAlchemy session issues, and DataPusher ones, this one took
a while to track down, but essentially it boiled down to what @davidread
explained [here](https://stackoverflow.com/a/56094135/235993).

The `datapusher` plugin used the `notify` hook to listen to resource
changes. This was fired from `resource_create`, which had started a
transaction. The plugin then called `datapusher_submit`, which in turn
calls `task_status_update`, which commits and clears the Session, causing
trouble later on.

How this had been working up until 2.8 I don't know, and I'm not keen on
finding out, to be honest. This PR refactors the `datapusher` plugin to
use the `IResourceController.after_create` and
`IResourceUrlChange.notify` hooks, which are called after commit.

It also adds two tests that failed before the patch.
  • Loading branch information
amercader committed Jan 24, 2020
1 parent 6257297 commit 2a340f8
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 54 deletions.
128 changes: 74 additions & 54 deletions ckanext/datapusher/plugin.py
Expand Up @@ -37,7 +37,7 @@ class DatapusherPlugin(p.SingletonPlugin):
p.implements(p.IActions)
p.implements(p.IAuthFunctions)
p.implements(p.IResourceUrlChange)
p.implements(p.IDomainObjectModification, inherit=True)
p.implements(p.IResourceController, inherit=True)
p.implements(p.ITemplateHelpers)
p.implements(p.IBlueprint)

Expand Down Expand Up @@ -65,60 +65,80 @@ def configure(self, config):
format(config_option)
)

def notify(self, entity, operation=None):
if isinstance(entity, model.Resource):
if (
operation == model.domain_object.DomainObjectOperation.new
or not operation
):
# if operation is None, resource URL has been changed, as
# the notify function in IResourceUrlChange only takes
# 1 parameter
context = {
u'model': model,
u'ignore_auth': True,
u'defer_commit': True
# IResourceUrlChange

def notify(self, resource):
    '''Submit a resource to the DataPusher when its URL changes.

    This is the ``IResourceUrlChange.notify`` hook. Only the ``id``
    attribute of ``resource`` is read here; the full resource dict is
    fetched through the ``resource_show`` action and then handed to
    ``_submit_to_datapusher``.

    :param resource: object exposing the ``id`` of the changed resource
    '''
    # Authorization is bypassed: the lookup is made on behalf of the
    # plugin, not on behalf of a user.
    context = {
        u'model': model,
        u'ignore_auth': True,
    }
    resource_show = toolkit.get_action(u'resource_show')
    resource_dict = resource_show(context, {u'id': resource.id})
    self._submit_to_datapusher(resource_dict)

# IResourceController

def after_create(self, context, resource_dict):
    '''Submit a newly created resource to the DataPusher.

    This is the ``IResourceController.after_create`` hook. The
    submission logic lives in ``_submit_to_datapusher``, which builds
    its own context, so ``context`` is not used here.

    :param context: context passed in by the caller (unused)
    :param resource_dict: dict describing the created resource
    '''
    self._submit_to_datapusher(resource_dict)

def _submit_to_datapusher(self, resource_dict):

context = {
u'model': model,
u'ignore_auth': True,
u'defer_commit': True
}

resource_format = resource_dict.get('format')

submit = (
resource_format
and resource_format.lower() in self.datapusher_formats
and resource_dict.get('url_type') != u'datapusher'
)

if not submit:
return

try:
task = toolkit.get_action(u'task_status_show')(
context, {
u'entity_id': resource_dict['id'],
u'task_type': u'datapusher',
u'key': u'datapusher'
}
)

if task.get(u'state') in (u'pending', u'submitting'):
# There already is a pending DataPusher submission,
# skip this one ...
log.debug(
u'Skipping DataPusher submission for '
u'resource {0}'.format(resource_dict['id'])
)
return
except toolkit.ObjectNotFound:
pass

try:
log.debug(
u'Submitting resource {0}'.format(resource_dict['id']) +
u' to DataPusher'
)
toolkit.get_action(u'datapusher_submit')(
context, {
u'resource_id': resource_dict['id']
}
if (
entity.format
and entity.format.lower() in self.datapusher_formats
and entity.url_type != u'datapusher'
):

try:
task = toolkit.get_action(u'task_status_show')(
context, {
u'entity_id': entity.id,
u'task_type': u'datapusher',
u'key': u'datapusher'
}
)
if task.get(u'state') == u'pending':
# There already is a pending DataPusher submission,
# skip this one ...
log.debug(
u'Skipping DataPusher submission for '
u'resource {0}'.format(entity.id)
)
return
except toolkit.ObjectNotFound:
pass

try:
log.debug(
u'Submitting resource {0}'.format(entity.id) +
u' to DataPusher'
)
toolkit.get_action(u'datapusher_submit')(
context, {
u'resource_id': entity.id
}
)
except toolkit.ValidationError as e:
# If datapusher is offline want to catch error instead
# of raising otherwise resource save will fail with 500
log.critical(e)
pass
)
except toolkit.ValidationError as e:
# If datapusher is offline want to catch error instead
# of raising otherwise resource save will fail with 500
log.critical(e)
pass

def get_actions(self):
return {
Expand Down
68 changes: 68 additions & 0 deletions ckanext/datapusher/tests/test.py
Expand Up @@ -11,6 +11,7 @@
import ckan.model as model
import ckan.plugins as p
import ckan.tests.legacy as tests
from ckan.tests import factories
import ckanext.datastore.backend.postgres as db
from ckan.common import config
from ckanext.datastore.tests.helpers import set_url_type
Expand Down Expand Up @@ -286,3 +287,70 @@ def test_custom_callback_url_base(self, app):
data["result_url"]
== "https://ckan.example.com/api/3/action/datapusher_hook"
)

@responses.activate
@pytest.mark.ckan_config(
"ckan.datapusher.callback_url_base", "https://ckan.example.com"
)
@pytest.mark.ckan_config(
"ckan.datapusher.url", "http://datapusher.ckan.org"
)
@pytest.mark.ckan_config("ckan.plugins", "datastore datapusher")
@pytest.mark.usefixtures("with_plugins")
def test_create_resource_hooks(self, app):

responses.add(
responses.POST,
"http://datapusher.ckan.org/job",
content_type="application/json",
body=json.dumps({"job_id": "foo", "job_key": "barloco"}),
)
responses.add_passthru(config["solr_url"])

dataset = factories.Dataset()
resource = tests.call_action_api(
app,
"resource_create",
apikey=self.sysadmin_user.apikey,
package_id=dataset['id'],
format='CSV',
)

@responses.activate
@pytest.mark.ckan_config(
"ckan.datapusher.callback_url_base", "https://ckan.example.com"
)
@pytest.mark.ckan_config(
"ckan.datapusher.url", "http://datapusher.ckan.org"
)
@pytest.mark.ckan_config("ckan.plugins", "datastore datapusher")
@pytest.mark.usefixtures("with_plugins")
def test_update_resource_url_hooks(self, app):

responses.add(
responses.POST,
"http://datapusher.ckan.org/job",
content_type="application/json",
body=json.dumps({"job_id": "foo", "job_key": "barloco"}),
)
responses.add_passthru(config["solr_url"])

dataset = factories.Dataset()
resource = tests.call_action_api(
app,
"resource_create",
apikey=self.sysadmin_user.apikey,
package_id=dataset['id'],
url='http://example.com/old.csv',
format='CSV',
)

resource = tests.call_action_api(
app,
"resource_update",
apikey=self.sysadmin_user.apikey,
id=resource['id'],
url='http://example.com/new.csv',
format='CSV',
)
assert resource

0 comments on commit 2a340f8

Please sign in to comment.