diff --git a/ckan/config/deployment.ini_tmpl b/ckan/config/deployment.ini_tmpl
index 2f91158ea49..67d369af28d 100644
--- a/ckan/config/deployment.ini_tmpl
+++ b/ckan/config/deployment.ini_tmpl
@@ -79,6 +79,7 @@ ckan.site_id = default
## Plugins Settings
# Note: Add ``datastore`` to enable the CKAN DataStore
+# Add ``datapusher`` to enable DataPusher
# Add ``pdf_preview`` to enable the resource preview for PDFs
# Add ``resource_proxy`` to enable resorce proxying and get around the
# same origin policy
@@ -146,8 +147,8 @@ ckan.feeds.author_link =
# Make sure you have set up the DataStore
-datapusher.formats = csv
-datapusher.url = http://datapusher.ckan.org/
+ckan.datapusher.formats = csv
+ckan.datapusher.url = http://datapusher.ckan.org/
## Activity Streams Settings
diff --git a/ckan/logic/action/get.py b/ckan/logic/action/get.py
index 37a5ca297c0..70326e75317 100644
--- a/ckan/logic/action/get.py
+++ b/ckan/logic/action/get.py
@@ -1868,11 +1868,11 @@ def task_status_show(context, data_dict):
context['task_status'] = task_status
+ _check_access('task_status_show', context, data_dict)
+
if task_status is None:
raise NotFound
- _check_access('task_status_show', context, data_dict)
-
task_status_dict = model_dictize.task_status_dictize(task_status, context)
return task_status_dict
diff --git a/ckan/logic/schema.py b/ckan/logic/schema.py
index 41e3c724656..84b7c5e6df7 100644
--- a/ckan/logic/schema.py
+++ b/ckan/logic/schema.py
@@ -85,6 +85,7 @@ def default_resource_schema():
'cache_last_updated': [ignore_missing, isodate],
'webstore_last_updated': [ignore_missing, isodate],
'tracking_summary': [ignore_missing],
+ 'datastore_active': [ignore],
'__extras': [ignore_missing, extras_unicode_convert, keep_extras],
}
diff --git a/ckan/model/meta.py b/ckan/model/meta.py
index 795094965c5..2b6cb4581ba 100644
--- a/ckan/model/meta.py
+++ b/ckan/model/meta.py
@@ -162,5 +162,5 @@ def engine_is_sqlite(sa_engine=None):
def engine_is_pg(sa_engine=None):
# Returns true iff the engine is connected to a postgresql database.
# According to http://docs.sqlalchemy.org/en/latest/core/engines.html#postgresql
- # all Postgres driver names start with `postgresql`
- return (sa_engine or engine).url.drivername.startswith('postgresql')
+ # all Postgres driver names start with `postgres`
+ return (sa_engine or engine).url.drivername.startswith('postgres')
diff --git a/ckan/plugins/core.py b/ckan/plugins/core.py
index 8f3752117ea..f39c8062917 100644
--- a/ckan/plugins/core.py
+++ b/ckan/plugins/core.py
@@ -18,7 +18,7 @@
'PluginNotFoundException', 'Plugin', 'SingletonPlugin',
'load', 'load_all', 'unload', 'unload_all',
'get_plugin', 'plugins_update',
- 'use_plugin',
+ 'use_plugin', 'plugin_loaded',
]
log = logging.getLogger(__name__)
@@ -210,6 +210,15 @@ def unload(*plugins):
plugins_update()
+def plugin_loaded(name):
+    '''
+    Tell whether the plugin called ``name`` is loaded.
+
+    :returns: ``True`` when ``name`` is found in ``_PLUGINS``, else ``False``
+    '''
+    return name in _PLUGINS
+
+
def find_system_plugins():
'''
Return all plugins in the ckan.system_plugins entry point group.
diff --git a/ckan/plugins/interfaces.py b/ckan/plugins/interfaces.py
index 302ed2ee09b..14d64b01146 100644
--- a/ckan/plugins/interfaces.py
+++ b/ckan/plugins/interfaces.py
@@ -443,7 +443,6 @@ def before_view(self, pkg_dict):
class IResourceController(Interface):
"""
Hook into the resource controller.
- (see IGroupController)
"""
def before_show(self, resource_dict):
diff --git a/ckan/public/base/javascript/main.js b/ckan/public/base/javascript/main.js
index e307fe8403c..42c76dfd15f 100644
--- a/ckan/public/base/javascript/main.js
+++ b/ckan/public/base/javascript/main.js
@@ -36,6 +36,7 @@ this.ckan = this.ckan || {};
ckan.i18n.load(data);
ckan.module.initialize();
});
+ jQuery('[data-target="popover"]').popover();
};
/* Returns a full url for the current site with the provided path appended.
diff --git a/ckan/public/base/less/activity.less b/ckan/public/base/less/activity.less
index dc75ede5e37..6071fabc5f0 100644
--- a/ckan/public/base/less/activity.less
+++ b/ckan/public/base/less/activity.less
@@ -56,6 +56,9 @@
.border-radius(100px);
.box-shadow(0 1px 2px rgba(0, 0, 0, 0.2));
}
+ &.no-avatar p {
+ margin-left: 40px;
+ }
}
.load-less {
margin-bottom: 15px;
@@ -76,11 +79,26 @@
float: right;
text-decoration: none;
}
+ .popover-content {
+ font-size: @baseFontSize;
+ line-height: @baseLineHeight;
+ color: @layoutTextColor;
+ word-break: break-all;
+ dl {
+ margin: 0;
+ dd {
+ margin-left: 0;
+ margin-bottom: 10px;
+ }
+ }
+ }
}
// colors
.activity .item {
& .icon { background-color: @activityColorBlank; } // Non defined
+ &.failure .icon { background-color: @activityColorDelete; }
+ &.success .icon { background-color: @activityColorNew; }
&.added-tag .icon { background-color: spin(@activityColorNew, 60); }
&.changed-group .icon { background-color: @activityColorModify; }
&.changed-package .icon { background-color: spin(@activityColorModify, 20); }
diff --git a/ckan/public/base/less/ckan.less b/ckan/public/base/less/ckan.less
index f7c108f8734..f842bd6dbae 100644
--- a/ckan/public/base/less/ckan.less
+++ b/ckan/public/base/less/ckan.less
@@ -20,6 +20,7 @@
@import "activity.less";
@import "dropdown.less";
@import "dashboard.less";
+@import "datapusher.less";
body {
// Using the masthead/footer gradient prevents the color from changing
diff --git a/ckan/public/base/less/datapusher.less b/ckan/public/base/less/datapusher.less
new file mode 100644
index 00000000000..f147c0c3f6c
--- /dev/null
+++ b/ckan/public/base/less/datapusher.less
@@ -0,0 +1,18 @@
+.datapusher-status-link:hover {
+ text-decoration: none;
+}
+
+.datapusher-status {
+ &.status-unknown {
+ color: #bbb;
+ }
+ &.status-pending {
+ color: #FFCC00;
+ }
+ &.status-error {
+ color: red;
+ }
+ &.status-complete {
+ color: #009900;
+ }
+}
\ No newline at end of file
diff --git a/ckan/templates/package/base_form_page.html b/ckan/templates/package/base_form_page.html
index a738a8716c8..b4d7dc440c0 100644
--- a/ckan/templates/package/base_form_page.html
+++ b/ckan/templates/package/base_form_page.html
@@ -2,8 +2,11 @@
{% block primary_content %}
+ {% block page_header %}{% endblock %}
- {% block form %}{{ c.form | safe }}{% endblock %}
+ {% block primary_content_inner %}
+ {% block form %}{{ c.form | safe }}{% endblock %}
+ {% endblock %}
{% endblock %}
diff --git a/ckan/templates/package/edit.html b/ckan/templates/package/edit.html
index 565623f7ebd..c3a7a5074e5 100644
--- a/ckan/templates/package/edit.html
+++ b/ckan/templates/package/edit.html
@@ -1,7 +1,5 @@
{% extends 'package/edit_base.html' %}
-{% block subtitle %}{{ _('Edit') }} - {{ h.dataset_display_name(pkg) }}{% endblock %}
-
{% block primary_content_inner %}
- {% block form %}{{ c.form | safe }}{% endblock %}
+ {% block form %}{{ c.form | safe }}{% endblock %}
{% endblock %}
diff --git a/ckan/templates/package/edit_base.html b/ckan/templates/package/edit_base.html
index e8b785712b3..10c1aa977ee 100644
--- a/ckan/templates/package/edit_base.html
+++ b/ckan/templates/package/edit_base.html
@@ -1,6 +1,7 @@
{% extends 'package/base.html' %}
{% set pkg = c.pkg_dict %}
+{% set pkg_dict = c.pkg_dict %}
{% block breadcrumb_content_selected %}{% endblock %}
diff --git a/ckan/templates/package/new_resource.html b/ckan/templates/package/new_resource.html
index 49d02387768..edd34bb5213 100644
--- a/ckan/templates/package/new_resource.html
+++ b/ckan/templates/package/new_resource.html
@@ -13,12 +13,7 @@
{% block form %}{% snippet 'package/snippets/resource_form.html', data=data, errors=errors, error_summary=error_summary, include_metadata=false, pkg_name=pkg_name, stage=stage, allow_upload=g.ofs_impl and logged_in %}{% endblock %}
{% block secondary_content %}
-
- {{ _('What\'s a resource?') }}
-
-
{{ _('A resource can be any file or link to a file containing useful data.') }}
-
-
+ {% snippet 'package/snippets/resource_help.html' %}
{% endblock %}
{% block scripts %}
diff --git a/ckan/templates/package/new_resource_not_draft.html b/ckan/templates/package/new_resource_not_draft.html
index a24f37ce459..7e1638c51e7 100644
--- a/ckan/templates/package/new_resource_not_draft.html
+++ b/ckan/templates/package/new_resource_not_draft.html
@@ -3,6 +3,18 @@
{% block subtitle %}{{ _('Add resource') }} - {{ h.dataset_display_name(pkg) }}{% endblock %}
{% block form_title %}{{ _('Add resource') }}{% endblock %}
+{% block breadcrumb_content %}
+
{{ _('Add New Resource') }}
+{% endblock %}
+
{% block form %}
{% snippet 'package/snippets/resource_form.html', data=data, errors=errors, error_summary=error_summary, include_metadata=false, pkg_name=pkg_name, stage=stage, allow_upload=g.ofs_impl and logged_in %}
{% endblock %}
+
+{% block content_primary_nav %}
+ {{ _('New resource') }}
+{% endblock %}
+
+{% block secondary_content %}
+ {% snippet 'package/snippets/resource_help.html' %}
+{% endblock %}
diff --git a/ckan/templates/package/resource_data.html b/ckan/templates/package/resource_data.html
new file mode 100644
index 00000000000..05bd66fdfee
--- /dev/null
+++ b/ckan/templates/package/resource_data.html
@@ -0,0 +1,68 @@
+{% extends "package/resource_edit_base.html" %}
+
+{% block subtitle %}{{ h.dataset_display_name(pkg) }} - {{ h.resource_display_name(res) }}{% endblock %}
+
+{% block primary_content_inner %}
+
+ {% set action = h.url_for(controller='ckanext.datapusher.plugin:ResourceDataController', action='resource_data', id=pkg.name, resource_id=res.id) %}
+ {% set show_table = true %}
+
+
+
+ {% if status.error and status.error.message %}
+ {% set show_table = false %}
+
+ {{ _('Upload error:') }} {{ status.error.message }}
+
+ {% elif status.task_info and status.task_info.error %}
+ {% set show_table = false %}
+
+ {{ _('Error:') }} {{ status.task_info.error }}
+
+ {% endif %}
+
+
+
+
+
+
+
+ {{ _('Status') }} |
+ {{ status.status.capitalize() if status.status else _('Not Uploaded Yet') }} |
+
+
+ {{ _('Last updated') }} |
+ {{ h.time_ago_from_timestamp(status.last_updated) if status.status else _('Never') }} |
+
+
+
+ {% if status.status and status.task_info and show_table %}
+ {{ _('Upload Log') }}
+
+ {% endif %}
+
+{% endblock %}
diff --git a/ckan/templates/package/resource_edit.html b/ckan/templates/package/resource_edit.html
index 3d761185230..5e335d9114b 100644
--- a/ckan/templates/package/resource_edit.html
+++ b/ckan/templates/package/resource_edit.html
@@ -1,9 +1,7 @@
{% extends "package/resource_edit_base.html" %}
-{% set res = c.resource %}
-
{% block subtitle %}{{ _('Edit') }} - {{ h.resource_display_name(res) }} - {{ h.dataset_display_name(pkg) }}{% endblock %}
{% block form %}
- {{ h.snippet('package/snippets/resource_edit_form.html', data=data, errors=errors, error_summary=error_summary, pkg_name=pkg.name, form_action=c.form_action, allow_upload=g.ofs_impl and logged_in) }}
+ {% snippet 'package/snippets/resource_edit_form.html', data=data, errors=errors, error_summary=error_summary, pkg_name=pkg.name, form_action=c.form_action, allow_upload=g.ofs_impl and logged_in %}
{% endblock %}
diff --git a/ckan/templates/package/resource_edit_base.html b/ckan/templates/package/resource_edit_base.html
index e6c98ad7ca3..0682cbbf014 100644
--- a/ckan/templates/package/resource_edit_base.html
+++ b/ckan/templates/package/resource_edit_base.html
@@ -1,26 +1,28 @@
{% extends "package/base.html" %}
{% set logged_in = true if c.userobj else false %}
+{% set res = c.resource %}
{% block breadcrumb_content_selected %}{% endblock %}
{% block breadcrumb_content %}
{{ super() }}
{% link_for h.resource_display_name(res)|truncate(30), controller='package', action='resource_read', id=pkg.name, resource_id=res.id %}
- Edit
+ {{ _('Edit') }}
{% endblock %}
{% block content_action %}
{% link_for _('All resources'), controller='package', action='resources', id=pkg.name, class_='btn', icon='arrow-left' %}
- {% link_for _('View resource'), controller='package', action='resource_read', id=pkg.name, resource_id=res.id, class_='btn', icon='eye-open' %}
+ {% if res %}
+ {% link_for _('View resource'), controller='package', action='resource_read', id=pkg.name, resource_id=res.id, class_='btn', icon='eye-open' %}
+ {% endif %}
{% endblock %}
{% block content_primary_nav %}
{{ h.build_nav_icon('resource_edit', _('Edit resource'), id=pkg.name, resource_id=res.id) }}
-{% endblock %}
-
-{% block secondary_content %}
- {% snippet 'package/snippets/resource_info.html', res=res %}
+ {% if 'datapusher' in g.plugins %}
+ {{ h.build_nav_icon('resource_data', _('Resource Data'), id=pkg.name, resource_id=res.id) }}
+ {% endif %}
{% endblock %}
{% block primary_content_inner %}
@@ -28,6 +30,10 @@ {% block form_title %}{{ _('Edit resource') }}{% endblo
{% block form %}{% endblock %}
{% endblock %}
+{% block secondary_content %}
+ {% snippet 'package/snippets/resource_info.html', res=res %}
+{% endblock %}
+
{% block scripts %}
{{ super() }}
{% resource 'vendor/fileupload' %}
diff --git a/ckan/templates/package/snippets/info.html b/ckan/templates/package/snippets/info.html
index b12ce3f6998..39ad092ff94 100644
--- a/ckan/templates/package/snippets/info.html
+++ b/ckan/templates/package/snippets/info.html
@@ -8,19 +8,23 @@
{% snippet "package/snippets/info.html", pkg=pkg %}
#}
-
-
-
{{ pkg.title or pkg.name }}
-
-
- - {{ _('Followers') }}
- - {{ h.SI_number_span(h.get_action('dataset_follower_count', {'id': pkg.id})) }}
-
-
- {% if not hide_follow_button %}
-
+
+
+{% endif %}
diff --git a/ckan/templates/package/snippets/resource_help.html b/ckan/templates/package/snippets/resource_help.html
new file mode 100644
index 00000000000..d1724956909
--- /dev/null
+++ b/ckan/templates/package/snippets/resource_help.html
@@ -0,0 +1,6 @@
+
+ {{ _('What\'s a resource?') }}
+
+
{{ _('A resource can be any file or link to a file containing useful data.') }}
+
+
diff --git a/ckan/templates/snippets/datapusher_status.html b/ckan/templates/snippets/datapusher_status.html
new file mode 100644
index 00000000000..3aa8908aef8
--- /dev/null
+++ b/ckan/templates/snippets/datapusher_status.html
@@ -0,0 +1,14 @@
+{# Datapusher status indicator
+
+resource: the resource
+
+#}
+{% if resource.datastore_active %}
+ {% set job = h.datapusher_status(resource.id) %}
+ {% set title = _('Datapusher status: {status}.').format(status=job.status) %}
+ {% if job.status == 'unknown' %}
+
+ {% else %}
+
+ {% endif %}
+{% endif %}
diff --git a/ckanext/datapusher/__init__.py b/ckanext/datapusher/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/ckanext/datapusher/helpers.py b/ckanext/datapusher/helpers.py
new file mode 100644
index 00000000000..d5291425b28
--- /dev/null
+++ b/ckanext/datapusher/helpers.py
@@ -0,0 +1,11 @@
+import ckan.plugins.toolkit as toolkit
+
+
+def datapusher_status(resource_id):
+    '''Return the ``datapusher_status`` action result for a resource,
+    or ``{'status': 'unknown'}`` when it raises ``ObjectNotFound``.'''
+    data_dict = {'resource_id': resource_id}
+    try:
+        return toolkit.get_action('datapusher_status')({}, data_dict)
+    except toolkit.ObjectNotFound:
+        return {'status': 'unknown'}
diff --git a/ckanext/datapusher/logic/__init__.py b/ckanext/datapusher/logic/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/ckanext/datapusher/logic/action.py b/ckanext/datapusher/logic/action.py
new file mode 100644
index 00000000000..1872a1ef265
--- /dev/null
+++ b/ckanext/datapusher/logic/action.py
@@ -0,0 +1,209 @@
+import logging
+import json
+import urlparse
+import datetime
+
+import pylons
+import requests
+
+import ckan.lib.navl.dictization_functions
+import ckan.logic as logic
+import ckan.plugins as p
+import ckanext.datapusher.logic.schema as dpschema
+
+log = logging.getLogger(__name__)
+_get_or_bust = logic.get_or_bust
+_validate = ckan.lib.navl.dictization_functions.validate
+
+
+def datapusher_submit(context, data_dict):
+    ''' Submit a job to the datapusher. The datapusher is a service that
+    imports tabular data into the datastore.
+
+    :param resource_id: The resource id of the resource that the data
+        should be imported in. The resource's URL will be used to get the data.
+    :type resource_id: string
+    :param set_url_type: If set to True, the ``url_type`` of the resource will
+        be set to ``datastore`` and the resource URL will automatically point
+        to the :ref:`datastore dump <dump>` URL. (optional, default: False)
+    :type set_url_type: bool
+
+    Returns ``True`` if the job has been submitted and ``False`` if the job
+    has not been submitted, i.e. when the datapusher is not configured.
+
+    :rtype: bool
+    '''
+
+    schema = context.get('schema', dpschema.datapusher_submit_schema())
+    data_dict, errors = _validate(data_dict, schema, context)
+    if errors:
+        raise p.toolkit.ValidationError(errors)
+
+    res_id = data_dict['resource_id']
+
+    p.toolkit.check_access('datapusher_submit', context, data_dict)
+
+    datapusher_url = pylons.config.get('ckan.datapusher.url')
+    if not datapusher_url:
+        # nothing to submit to: give the documented ``False`` result
+        return False
+
+    callback_url = p.toolkit.url_for(
+        controller='api', action='action', logic_function='datapusher_hook',
+        ver=3, qualified=True)
+
+    user = p.toolkit.get_action('user_show')(context, {'id': context['user']})
+
+    task = {
+        'entity_id': res_id,
+        'entity_type': 'resource',
+        'task_type': 'datapusher',
+        'last_updated': str(datetime.datetime.now()),
+        'state': 'submitting',
+        'key': 'datapusher',
+        'value': '{}',
+        'error': '{}',
+    }
+    try:
+        task_id = p.toolkit.get_action('task_status_show')(context, {
+            'entity_id': res_id,
+            'task_type': 'datapusher',
+            'key': 'datapusher'
+        })['id']
+        task['id'] = task_id
+    except logic.NotFound:
+        pass
+
+    context['ignore_auth'] = True
+    result = p.toolkit.get_action('task_status_update')(context, task)
+    task_id = result['id']
+
+    try:
+        r = requests.post(
+            urlparse.urljoin(datapusher_url, 'job'),
+            headers={'Content-Type': 'application/json'},
+            data=json.dumps({
+                'api_key': user['apikey'],
+                'job_type': 'push_to_datastore',
+                'result_url': callback_url,
+                'metadata': {
+                    'ckan_url': pylons.config['ckan.site_url'],
+                    'resource_id': res_id,
+                    'set_url_type': data_dict.get('set_url_type', False)
+                }
+            }))
+        r.raise_for_status()
+    except requests.exceptions.ConnectionError as e:
+        error = {'message': 'Could not connect to DataPusher.',
+                 'details': str(e)}
+        task['error'] = json.dumps(error)
+        task['state'] = 'error'
+        task['last_updated'] = str(datetime.datetime.now())
+        p.toolkit.get_action('task_status_update')(context, task)
+        raise p.toolkit.ValidationError(error)
+    except requests.exceptions.HTTPError as e:
+        m = 'An Error occurred while sending the job: {0}'.format(e.message)
+        try:
+            body = e.response.json()
+        except ValueError:
+            body = e.response.text
+        error = {'message': m,
+                 'details': body,
+                 'status_code': r.status_code}
+        task['error'] = json.dumps(error)
+        task['state'] = 'error'
+        task['last_updated'] = str(datetime.datetime.now())
+        p.toolkit.get_action('task_status_update')(context, task)
+        raise p.toolkit.ValidationError(error)
+
+    value = json.dumps({'job_id': r.json()['job_id'],
+                        'job_key': r.json()['job_key']})
+
+    task['value'] = value
+    task['state'] = 'pending'
+    task['last_updated'] = str(datetime.datetime.now())
+    p.toolkit.get_action('task_status_update')(context, task)
+
+    return True
+
+
+def datapusher_hook(context, data_dict):
+    ''' Update datapusher task. This action is typically called by the
+    datapusher whenever the status of a job changes.
+
+    :param metadata: metadata produced by the datapusher service; its
+        ``resource_id`` entry selects the task to update.
+    :type metadata: dict
+    :param status: status of the job, stored as the state of the task
+    :type status: string
+    '''
+
+    metadata, status = _get_or_bust(data_dict, ['metadata', 'status'])
+
+    p.toolkit.check_access('datapusher_submit', context, data_dict)
+
+    # look up the existing datapusher task of the resource
+    lookup = {
+        'entity_id': metadata.get('resource_id'),
+        'task_type': 'datapusher',
+        'key': 'datapusher'
+    }
+    task = p.toolkit.get_action('task_status_show')(context, lookup)
+
+    task.update({'state': status,
+                 'last_updated': str(datetime.datetime.now())})
+
+    p.toolkit.get_action('task_status_update')(context, task)
+
+
+def datapusher_status(context, data_dict):
+    ''' Get the status of a datapusher job for a certain resource.
+
+    :param resource_id: The resource id of the resource that you want the
+        datapusher status for.
+    :type resource_id: string
+    :returns: a dict with the keys ``status``, ``job_id``, ``job_url``,
+        ``last_updated``, ``job_key``, ``task_info`` and ``error``
+    :rtype: dict
+    '''
+
+    p.toolkit.check_access('datapusher_status', context, data_dict)
+
+    # ``id`` is accepted as an alias and overrides ``resource_id``
+    if 'id' in data_dict:
+        data_dict['resource_id'] = data_dict['id']
+    res_id = _get_or_bust(data_dict, 'resource_id')
+
+    task = p.toolkit.get_action('task_status_show')(context, {
+        'entity_id': res_id,
+        'task_type': 'datapusher',
+        'key': 'datapusher'
+    })
+
+    datapusher_url = pylons.config.get('ckan.datapusher.url')
+    if not datapusher_url:
+        raise p.toolkit.ValidationError(
+            {'configuration': ['ckan.datapusher.url not in config file']})
+
+    stored = json.loads(task['value'])
+    job_id = stored.get('job_id')
+    job_key = stored.get('job_key')
+    job_url = None
+    task_info = None
+
+    if job_id:
+        job_url = urlparse.urljoin(datapusher_url, 'job/' + job_id)
+        request_headers = {'Content-Type': 'application/json',
+                           'Authorization': job_key}
+        try:
+            response = requests.get(job_url, headers=request_headers)
+            response.raise_for_status()
+            task_info = response.json()
+        except (requests.exceptions.ConnectionError,
+                requests.exceptions.HTTPError):
+            task_info = {'error': 'cannot connect to datapusher'}
+
+    return dict(
+        status=task['state'], job_id=job_id, job_url=job_url,
+        last_updated=task['last_updated'], job_key=job_key,
+        task_info=task_info, error=json.loads(task['error']))
diff --git a/ckanext/datapusher/logic/auth.py b/ckanext/datapusher/logic/auth.py
new file mode 100644
index 00000000000..55a7e832488
--- /dev/null
+++ b/ckanext/datapusher/logic/auth.py
@@ -0,0 +1,9 @@
+import ckanext.datastore.logic.auth as auth
+
+
+def datapusher_submit(context, data_dict):  # delegates to datastore_auth
+ return auth.datastore_auth(context, data_dict)
+
+
+def datapusher_status(context, data_dict):  # delegates to datastore_auth
+ return auth.datastore_auth(context, data_dict)
diff --git a/ckanext/datapusher/logic/schema.py b/ckanext/datapusher/logic/schema.py
new file mode 100644
index 00000000000..07e8a36aec8
--- /dev/null
+++ b/ckanext/datapusher/logic/schema.py
@@ -0,0 +1,25 @@
+import ckan.plugins as p
+import ckanext.datastore.logic.schema as dsschema
+
+get_validator = p.toolkit.get_validator
+
+not_missing = get_validator('not_missing')
+not_empty = get_validator('not_empty')
+resource_id_exists = get_validator('resource_id_exists')
+package_id_exists = get_validator('package_id_exists')
+ignore_missing = get_validator('ignore_missing')
+empty = get_validator('empty')
+boolean_validator = get_validator('boolean_validator')
+int_validator = get_validator('int_validator')
+OneOf = get_validator('OneOf')
+
+
+def datapusher_submit_schema():
+    # a fresh dict is built on every call; the ``__before`` entry holds
+    # ``dsschema.rename('id', 'resource_id')`` (``id`` looks like an alias)
+    return {
+        'resource_id': [not_missing, not_empty, unicode],
+        'id': [ignore_missing],
+        'set_url_type': [ignore_missing, boolean_validator],
+        '__junk': [empty],
+        '__before': [dsschema.rename('id', 'resource_id')]}
diff --git a/ckanext/datapusher/plugin.py b/ckanext/datapusher/plugin.py
new file mode 100644
index 00000000000..c53dd14790d
--- /dev/null
+++ b/ckanext/datapusher/plugin.py
@@ -0,0 +1,131 @@
+import logging
+
+import ckan.plugins as p
+import ckan.lib.base as base
+import ckan.lib.helpers as core_helpers
+import ckanext.datapusher.logic.action as action
+import ckanext.datapusher.logic.auth as auth
+import ckanext.datapusher.helpers as helpers
+import ckan.logic as logic
+import ckan.model as model
+import ckan.plugins.toolkit as toolkit
+
+log = logging.getLogger(__name__)
+_get_or_bust = logic.get_or_bust
+
+DEFAULT_FORMATS = ['csv', 'xls', 'application/csv', 'application/vnd.ms-excel']
+
+
+class DatastoreException(Exception):
+    '''Exception type declared by the datapusher plugin module.'''
+
+
+class ResourceDataController(base.BaseController):
+    '''Controller behind the "Resource Data" page of a resource.'''
+
+    def resource_data(self, id, resource_id):
+        '''On POST submit the resource to the datapusher and redirect back
+        to this page; otherwise render the resource's datapusher status.'''
+        if toolkit.request.method == 'POST':
+            try:
+                p.toolkit.get_action('datapusher_submit')(
+                    None, {'resource_id': resource_id})
+            except logic.ValidationError:
+                # deliberately ignored: the user is sent back to the
+                # status page whether or not the submission worked
+                pass
+
+            base.redirect(core_helpers.url_for(
+                controller='ckanext.datapusher.plugin:ResourceDataController',
+                action='resource_data', id=id, resource_id=resource_id))
+
+        try:
+            toolkit.c.pkg_dict = p.toolkit.get_action('package_show')(
+                None, {'id': id}
+            )
+            toolkit.c.resource = p.toolkit.get_action('resource_show')(
+                None, {'id': resource_id}
+            )
+        except logic.NotFound:
+            base.abort(404, toolkit._('Resource not found'))
+        except logic.NotAuthorized:
+            base.abort(401, toolkit._('Unauthorized to edit this resource'))
+
+        try:
+            datapusher_status = p.toolkit.get_action('datapusher_status')(
+                None, {'resource_id': resource_id}
+            )
+        except logic.NotFound:
+            datapusher_status = {}
+
+        return base.render('package/resource_data.html',
+                           extra_vars={'status': datapusher_status})
+
+
+class DatapusherPlugin(p.SingletonPlugin):
+    '''Registers the datapusher actions, auth functions, helper and route.'''
+    p.implements(p.IConfigurable, inherit=True)
+    p.implements(p.IActions)
+    p.implements(p.IAuthFunctions)
+    p.implements(p.IResourceUrlChange)
+    p.implements(p.IDomainObjectModification, inherit=True)
+    p.implements(p.ITemplateHelpers)
+    p.implements(p.IRoutes, inherit=True)
+
+    legacy_mode = False
+    resource_show_action = None
+
+    def configure(self, config):
+        '''Store the accepted formats; require ``ckan.datapusher.url``.'''
+        self.config = config
+        configured = config.get('ckan.datapusher.formats', '').lower().split()
+        self.datapusher_formats = configured or DEFAULT_FORMATS
+        if not config.get('ckan.datapusher.url'):
+            raise Exception(
+                'Config option `ckan.datapusher.url` has to be set.')
+
+    def notify(self, entity, operation=None):
+        '''Submit new/URL-changed public resources in accepted formats.'''
+        if not isinstance(entity, model.Resource):
+            return
+        # if operation is None, resource URL has been changed, as the notify
+        # function in IResourceUrlChange only takes 1 parameter
+        created = operation == model.domain_object.DomainObjectOperation.new
+        if operation and not created:
+            return
+
+        context = {'model': model, 'ignore_auth': True, 'defer_commit': True}
+        show = p.toolkit.get_action('package_show')
+        package = show(context, {'id': entity.get_package_id()})
+        if package['private'] or not entity.format:
+            return
+        if (entity.format.lower() not in self.datapusher_formats or
+                entity.url_type == 'datapusher'):
+            return
+
+        try:
+            p.toolkit.get_action('datapusher_submit')(
+                context, {'resource_id': entity.id})
+        except p.toolkit.ValidationError as e:
+            # If datapusher is offline want to catch error instead
+            # of raising otherwise resource save will fail with 500
+            log.critical(e)
+
+    def before_map(self, m):
+        m.connect(
+            'resource_data', '/dataset/{id}/resource_data/{resource_id}',
+            controller='ckanext.datapusher.plugin:ResourceDataController',
+            action='resource_data', ckan_icon='cloud-upload')
+        return m
+
+    def get_actions(self):
+        return dict(datapusher_submit=action.datapusher_submit,
+                    datapusher_hook=action.datapusher_hook,
+                    datapusher_status=action.datapusher_status)
+
+    def get_auth_functions(self):
+        return dict(datapusher_submit=auth.datapusher_submit,
+                    datapusher_status=auth.datapusher_status)
+
+    def get_helpers(self):
+        return {'datapusher_status': helpers.datapusher_status}
diff --git a/ckanext/datapusher/tests/__init__.py b/ckanext/datapusher/tests/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/ckanext/datapusher/tests/test.py b/ckanext/datapusher/tests/test.py
new file mode 100644
index 00000000000..2ec2df42d21
--- /dev/null
+++ b/ckanext/datapusher/tests/test.py
@@ -0,0 +1,184 @@
+import json
+import httpretty
+import nose
+import sys
+import datetime
+
+import pylons
+from pylons import config
+import sqlalchemy.orm as orm
+import paste.fixture
+
+import ckan.plugins as p
+import ckan.lib.create_test_data as ctd
+import ckan.model as model
+import ckan.tests as tests
+import ckan.config.middleware as middleware
+
+import ckanext.datastore.db as db
+from ckanext.datastore.tests.helpers import rebuild_all_dbs, set_url_type
+
+
+# avoid hanging tests https://github.com/gabrielfalcao/HTTPretty/issues/34
+if sys.version_info < (2, 7, 0):
+ import socket
+ socket.setdefaulttimeout(1)
+
+
+class TestDatastoreCreate(tests.WsgiAppCase):
+    '''Datapusher actions and their integration with datastore_create.'''
+    sysadmin_user = None
+    normal_user = None
+
+    @classmethod
+    def setup_class(cls):
+
+        wsgiapp = middleware.make_app(config['global_conf'], **config)
+        cls.app = paste.fixture.TestApp(wsgiapp)
+        if not tests.is_datastore_supported():
+            raise nose.SkipTest("Datastore not supported")
+        p.load('datastore')
+        p.load('datapusher')
+        ctd.CreateTestData.create()
+        cls.sysadmin_user = model.User.get('testsysadmin')
+        cls.normal_user = model.User.get('annafan')
+        engine = db._get_engine(
+            {'connection_url': pylons.config['ckan.datastore.write_url']})
+        cls.Session = orm.scoped_session(orm.sessionmaker(bind=engine))
+        set_url_type(
+            model.Package.get('annakarenina').resources, cls.sysadmin_user)
+
+    @classmethod
+    def teardown_class(cls):
+        rebuild_all_dbs(cls.Session)
+        p.unload('datastore')
+        p.unload('datapusher')
+
+    def test_create_ckan_resource_in_package(self):
+        # a resource created without a URL points at the datastore dump
+        package = model.Package.get('annakarenina')
+        data = {
+            'resource': {'package_id': package.id}
+        }
+        postparams = '%s=1' % json.dumps(data)
+        auth = {'Authorization': str(self.sysadmin_user.apikey)}
+        res = self.app.post('/api/action/datastore_create', params=postparams,
+                            extra_environ=auth, status=200)
+        res_dict = json.loads(res.body)
+
+        assert 'resource_id' in res_dict['result']
+        assert len(model.Package.get('annakarenina').resources) == 3
+
+        res = tests.call_action_api(
+            self.app, 'resource_show', id=res_dict['result']['resource_id'])
+        assert res['url'] == '/datastore/dump/' + res['id'], res
+
+    @httpretty.activate
+    def test_providing_res_with_url_calls_datapusher_correctly(self):
+        # datastore_create with a resource URL posts a datapusher job;
+        # the option set here is the one that datapusher_submit reads
+        pylons.config['ckan.datapusher.url'] = 'http://datapusher.ckan.org'
+        httpretty.HTTPretty.register_uri(
+            httpretty.HTTPretty.POST,
+            'http://datapusher.ckan.org/job',
+            content_type='application/json',
+            body=json.dumps({'job_id': 'foo', 'job_key': 'bar'}))
+
+        package = model.Package.get('annakarenina')
+
+        tests.call_action_api(
+            self.app, 'datastore_create', apikey=self.sysadmin_user.apikey,
+            resource=dict(package_id=package.id, url='demo.ckan.org'))
+
+        assert len(package.resources) == 4, len(package.resources)
+        resource = package.resources[3]
+        data = json.loads(httpretty.last_request().body)
+        assert data['metadata']['resource_id'] == resource.id, data
+        assert data['result_url'].endswith('/action/datapusher_hook'), data
+        assert data['result_url'].startswith('http://'), data
+
+    def test_cant_provide_resource_and_resource_id(self):
+        # ``resource_id`` together with ``resource`` is a validation error
+        package = model.Package.get('annakarenina')
+        resource = package.resources[0]
+        data = {
+            'resource_id': resource.id,
+            'resource': {'package_id': package.id}
+        }
+        postparams = '%s=1' % json.dumps(data)
+        auth = {'Authorization': str(self.sysadmin_user.apikey)}
+        res = self.app.post('/api/action/datastore_create', params=postparams,
+                            extra_environ=auth, status=409)
+        res_dict = json.loads(res.body)
+
+        assert res_dict['error']['__type'] == 'Validation Error'
+
+    @httpretty.activate
+    def test_send_datapusher_creates_task(self):
+        # a successful submit leaves the task status in state ``pending``
+        httpretty.HTTPretty.register_uri(
+            httpretty.HTTPretty.POST,
+            'http://datapusher.ckan.org/job',
+            content_type='application/json',
+            body=json.dumps({'job_id': 'foo', 'job_key': 'bar'}))
+
+        package = model.Package.get('annakarenina')
+        resource = package.resources[0]
+
+        context = {
+            'ignore_auth': True,
+            'user': self.sysadmin_user.name
+        }
+
+        p.toolkit.get_action('datapusher_submit')(context, {
+            'resource_id': resource.id
+        })
+
+        context.pop('task_status', None)
+
+        task = p.toolkit.get_action('task_status_show')(context, {
+            'entity_id': resource.id,
+            'task_type': 'datapusher',
+            'key': 'datapusher'
+        })
+
+        assert task['state'] == 'pending', task
+
+    def test_datapusher_hook(self):
+        # the hook stores the reported status on the existing task status
+        package = model.Package.get('annakarenina')
+        resource = package.resources[0]
+
+        context = {
+            'user': self.sysadmin_user.name
+        }
+
+        p.toolkit.get_action('task_status_update')(context, {
+            'entity_id': resource.id,
+            'entity_type': 'resource',
+            'task_type': 'datapusher',
+            'key': 'datapusher',
+            'value': '{"job_id": "my_id", "job_key":"my_key"}',
+            'last_updated': str(datetime.datetime.now()),
+            'state': 'pending'
+        })
+
+        data = {
+            'status': 'success',
+            'metadata': {
+                'resource_id': resource.id
+            }
+        }
+        postparams = '%s=1' % json.dumps(data)
+        auth = {'Authorization': str(self.sysadmin_user.apikey)}
+        res = self.app.post('/api/action/datapusher_hook', params=postparams,
+                            extra_environ=auth, status=200)
+        res_dict = json.loads(res.body)
+
+        assert res_dict['success'] is True
+
+        task = tests.call_action_api(
+            self.app, 'task_status_show', entity_id=resource.id,
+            task_type='datapusher', key='datapusher')
+
+        assert task['state'] == 'success', task
diff --git a/ckanext/datastore/logic/action.py b/ckanext/datastore/logic/action.py
index d9d291b9df3..804fec1a197 100644
--- a/ckanext/datastore/logic/action.py
+++ b/ckanext/datastore/logic/action.py
@@ -1,10 +1,6 @@
import logging
-import json
-import urlparse
-import datetime
import pylons
-import requests
import sqlalchemy
import ckan.lib.navl.dictization_functions
@@ -39,6 +35,8 @@ def datastore_create(context, data_dict):
:param resource_id: resource id that the data is going to be stored against.
:type resource_id: string
+ :param force: set to True to edit a read-only resource
+ :type force: bool (optional, default: False)
:param resource: resource dictionary that is passed to
:meth:`~ckan.logic.action.create.resource_create`.
Use instead of ``resource_id`` (optional)
@@ -90,22 +88,32 @@ def datastore_create(context, data_dict):
if 'resource' in data_dict:
has_url = 'url' in data_dict['resource']
- data_dict['resource'].setdefault('url', '_tmp')
+ # A datastore only resource does not have a url in the db
+ data_dict['resource'].setdefault('url', '_datastore_only_resource')
res = p.toolkit.get_action('resource_create')(context,
data_dict['resource'])
data_dict['resource_id'] = res['id']
# create resource from file
if has_url:
+ if not p.plugin_loaded('datapusher'):
+ raise p.toolkit.ValidationError({'resource': [
+ 'The datapusher has to be enabled.']})
p.toolkit.get_action('datapusher_submit')(context, {
'resource_id': res['id'],
- 'set_url_to_dump': True
+ 'set_url_type': True
})
+ # since we'll overwrite the datastore resource anyway, we
+ # don't need to create it here
+ return
+
# create empty resource
else:
# no need to set the full url because it will be set in before_show
res['url_type'] = 'datastore'
p.toolkit.get_action('resource_update')(context, res)
+ else:
+ _check_read_only(context, data_dict)
data_dict['connection_url'] = pylons.config['ckan.datastore.write_url']
@@ -153,6 +161,8 @@ def datastore_upsert(context, data_dict):
:param resource_id: resource id that the data is going to be stored under.
:type resource_id: string
+ :param force: set to True to edit a read-only resource
+ :type force: bool (optional, default: False)
:param records: the data, eg: [{"dob": "2005", "some_stuff": ["a","b"]}] (optional)
:type records: list of dictionaries
:param method: the method to use to put the data into the datastore.
@@ -173,6 +183,10 @@ def datastore_upsert(context, data_dict):
if errors:
raise p.toolkit.ValidationError(errors)
+ p.toolkit.check_access('datastore_upsert', context, data_dict)
+
+ _check_read_only(context, data_dict)
+
data_dict['connection_url'] = pylons.config['ckan.datastore.write_url']
res_id = data_dict['resource_id']
@@ -186,8 +200,6 @@ def datastore_upsert(context, data_dict):
u'Resource "{0}" was not found.'.format(res_id)
))
- p.toolkit.check_access('datastore_upsert', context, data_dict)
-
result = db.upsert(context, data_dict)
result.pop('id', None)
result.pop('connection_url')
@@ -199,6 +211,8 @@ def datastore_delete(context, data_dict):
:param resource_id: resource id that the data will be deleted from. (optional)
:type resource_id: string
+ :param force: set to True to edit a read-only resource
+ :type force: bool (optional, default: False)
:param filters: filters to apply before deleting (eg {"name": "fred"}).
If missing delete whole table and all dependent views. (optional)
:type filters: dictionary
@@ -217,6 +231,10 @@ def datastore_delete(context, data_dict):
if errors:
raise p.toolkit.ValidationError(errors)
+ p.toolkit.check_access('datastore_delete', context, data_dict)
+
+ _check_read_only(context, data_dict)
+
data_dict['connection_url'] = pylons.config['ckan.datastore.write_url']
res_id = data_dict['resource_id']
@@ -230,8 +248,6 @@ def datastore_delete(context, data_dict):
u'Resource "{0}" was not found.'.format(res_id)
))
- p.toolkit.check_access('datastore_delete', context, data_dict)
-
result = db.delete(context, data_dict)
result.pop('id', None)
result.pop('connection_url')
@@ -427,129 +443,8 @@ def datastore_make_public(context, data_dict):
db.make_public(context, data_dict)
-def datapusher_submit(context, data_dict):
- ''' Submit a job to the datapusher. The datapusher is a service that
- imports tabular data into the datastore.
-
- :param resource_id: The resource id of the resource that the data
- should be imported in. The resource's URL will be used to get the data.
- :type resource_id: string
- :param set_url_type: If set to true, the ``url_type`` of the resource will
- be set to ``datastore`` and the resource URL will automatically point
- to the :ref:`datastore dump ` URL. (optional, default: False)
- :type set_url_type: boolean
-
- Returns ``True`` if the job has been submitted and ``False`` if the job
- has not been submitted, i.e. when the datapusher is not configured.
-
- :rtype: boolean
- '''
-
- if 'id' in data_dict:
- data_dict['resource_id'] = data_dict['id']
- res_id = _get_or_bust(data_dict, 'resource_id')
-
- p.toolkit.check_access('datapusher_submit', context, data_dict)
-
- datapusher_url = pylons.config.get('datapusher.url')
-
- # no datapusher url means the datapusher should not be used
- if not datapusher_url:
- return False
-
- callback_url = p.toolkit.url_for(
- controller='api', action='action', logic_function='datapusher_hook',
- ver=3, qualified=True)
-
- user = p.toolkit.get_action('user_show')(context, {'id': context['user']})
- try:
- r = requests.post(
- urlparse.urljoin(datapusher_url, 'job'),
- headers={
- 'Content-Type': 'application/json'
- },
- data=json.dumps({
- 'api_key': user['apikey'],
- 'job_type': 'push_to_datastore',
- 'result_url': callback_url,
- 'metadata': {
- 'ckan_url': pylons.config['ckan.site_url'],
- 'resource_id': res_id,
- 'set_url_type': data_dict.get('set_url_type', False)
- }
- }))
- r.raise_for_status()
- except requests.exceptions.ConnectionError, e:
- raise p.toolkit.ValidationError({'datapusher': {
- 'message': 'Could not connect to DataPusher.',
- 'details': str(e)}})
- except requests.exceptions.HTTPError, e:
- m = 'An Error occurred while sending the job: {0}'.format(e.message)
- try:
- body = e.response.json()
- except ValueError:
- body = e.response.text
- raise p.toolkit.ValidationError({'datapusher': {
- 'message': m,
- 'details': body,
- 'status_code': r.status_code}})
-
- empty_task = {
- 'entity_id': res_id,
- 'entity_type': 'resource',
- 'task_type': 'datapusher',
- 'last_updated': str(datetime.datetime.now()),
- 'state': 'pending'
- }
-
- tasks = []
- for (k, v) in [('job_id', r.json()['job_id']),
- ('job_key', r.json()['job_key'])]:
- t = empty_task.copy()
- t['key'] = k
- t['value'] = v
- tasks.append(t)
- p.toolkit.get_action('task_status_update_many')(context, {'data': tasks})
-
- return True
-
-
-def datapusher_hook(context, data_dict):
- """ Update datapusher task. This action is typically called by the
- datapusher whenever the status of a job changes.
-
- Expects a job with ``status`` and ``metadata`` with a ``resource_id``.
- """
-
- # TODO: use a schema to validate
-
- p.toolkit.check_access('datapusher_submit', context, data_dict)
-
- res_id = data_dict['metadata']['resource_id']
-
- task_id = p.toolkit.get_action('task_status_show')(context, {
- 'entity_id': res_id,
- 'task_type': 'datapusher',
- 'key': 'job_id'
- })
-
- task_key = p.toolkit.get_action('task_status_show')(context, {
- 'entity_id': res_id,
- 'task_type': 'datapusher',
- 'key': 'job_key'
- })
-
- tasks = [task_id, task_key]
-
- for task in tasks:
- task['state'] = data_dict['status']
- task['last_updated'] = str(datetime.datetime.now())
-
- p.toolkit.get_action('task_status_update_many')(context, {'data': tasks})
-
-
def _resource_exists(context, data_dict):
- # Returns true if the resource exists in CKAN and in the datastore
+ ''' Returns true if the resource exists in CKAN and in the datastore '''
model = _get_or_bust(context, 'model')
res_id = _get_or_bust(data_dict, 'resource_id')
if not model.Resource.get(res_id):
@@ -559,3 +454,18 @@ def _resource_exists(context, data_dict):
WHERE name = :id AND alias_of IS NULL''')
results = db._get_engine(data_dict).execute(resources_sql, id=res_id)
return results.rowcount > 0
+
+
+def _check_read_only(context, data_dict):
+    ''' Raises exception if the resource is read-only.
+
+    A resource is treated as read-only unless its ``url_type`` is
+    ``datastore``. The check is skipped entirely when ``force`` is set to
+    a true value in ``data_dict``.
+
+    Make sure the resource id is in ``data_dict['resource_id']``.
+
+    :raises ValidationError: if the resource is read-only and ``force``
+        was not passed
+    '''
+    if data_dict.get('force'):
+        return
+    res = p.toolkit.get_action('resource_show')(
+        context, {'id': data_dict['resource_id']})
+    if res.get('url_type') != 'datastore':
+        # Adjacent literals are concatenated as-is, so the first one needs
+        # a trailing space to keep the words apart in the final message.
+        raise p.toolkit.ValidationError({
+            'read-only': ['Cannot edit read-only resource. Either pass '
+                          '"force=True" or change url-type to "datastore"']
+        })
diff --git a/ckanext/datastore/logic/auth.py b/ckanext/datastore/logic/auth.py
index ff410536e21..f99d7f45ae3 100644
--- a/ckanext/datastore/logic/auth.py
+++ b/ckanext/datastore/logic/auth.py
@@ -1,7 +1,7 @@
import ckan.plugins as p
-def _datastore_auth(context, data_dict, privilege='resource_update'):
+def datastore_auth(context, data_dict, privilege='resource_update'):
if not 'id' in data_dict:
data_dict['id'] = data_dict.get('resource_id')
user = context.get('user')
@@ -19,25 +19,21 @@ def _datastore_auth(context, data_dict, privilege='resource_update'):
def datastore_create(context, data_dict):
- return _datastore_auth(context, data_dict)
+ return datastore_auth(context, data_dict)
def datastore_upsert(context, data_dict):
- return _datastore_auth(context, data_dict)
+ return datastore_auth(context, data_dict)
def datastore_delete(context, data_dict):
- return _datastore_auth(context, data_dict)
+ return datastore_auth(context, data_dict)
@p.toolkit.auth_allow_anonymous_access
def datastore_search(context, data_dict):
- return _datastore_auth(context, data_dict, 'resource_show')
-
-
-def datapusher_submit(context, data_dict):
- return _datastore_auth(context, data_dict)
+ return datastore_auth(context, data_dict, 'resource_show')
def datastore_change_permissions(context, data_dict):
- return _datastore_auth(context, data_dict)
+ return datastore_auth(context, data_dict)
diff --git a/ckanext/datastore/logic/schema.py b/ckanext/datastore/logic/schema.py
index 018eb249d79..6f0a74723ca 100644
--- a/ckanext/datastore/logic/schema.py
+++ b/ckanext/datastore/logic/schema.py
@@ -68,6 +68,7 @@ def json_validator(value, context):
def datastore_create_schema():
schema = {
'resource_id': [ignore_missing, unicode, resource_id_exists],
+ 'force': [ignore_missing, boolean_validator],
'id': [ignore_missing],
'aliases': [ignore_missing, list_of_strings_or_string],
'fields': {
@@ -85,6 +86,7 @@ def datastore_create_schema():
def datastore_upsert_schema():
schema = {
'resource_id': [not_missing, not_empty, unicode],
+ 'force': [ignore_missing, boolean_validator],
'id': [ignore_missing],
'method': [ignore_missing, unicode, OneOf(
['upsert', 'insert', 'update'])],
@@ -97,6 +99,7 @@ def datastore_upsert_schema():
def datastore_delete_schema():
schema = {
'resource_id': [not_missing, not_empty, unicode],
+ 'force': [ignore_missing, boolean_validator],
'id': [ignore_missing],
'__junk': [empty],
'__before': [rename('id', 'resource_id')]
diff --git a/ckanext/datastore/plugin.py b/ckanext/datastore/plugin.py
index 04f201f9eb0..f91310ecdec 100644
--- a/ckanext/datastore/plugin.py
+++ b/ckanext/datastore/plugin.py
@@ -61,9 +61,9 @@ def configure(self, config):
else:
self.read_url = self.config['ckan.datastore.read_url']
- read_engine = db._get_engine(
+ self.read_engine = db._get_engine(
{'connection_url': self.read_url})
- if not model.engine_is_pg(read_engine):
+ if not model.engine_is_pg(self.read_engine):
log.warn('We detected that you do not use a PostgreSQL '
'database. The DataStore will NOT work and DataStore '
'tests will be skipped.')
@@ -75,63 +75,15 @@ def configure(self, config):
'of _table_metadata are skipped.')
else:
self._check_urls_and_permissions()
-
self._create_alias_table()
- # update the resource_show action to have datastore_active property
- if self.resource_show_action is None:
- resource_show = p.toolkit.get_action('resource_show')
-
- @logic.side_effect_free
- def new_resource_show(context, data_dict):
- new_data_dict = resource_show(context, data_dict)
- try:
- connection = read_engine.connect()
- result = connection.execute(
- 'SELECT 1 FROM "_table_metadata" WHERE name = %s AND alias_of IS NULL',
- new_data_dict['id']
- ).fetchone()
- if result:
- new_data_dict['datastore_active'] = True
- else:
- new_data_dict['datastore_active'] = False
- finally:
- connection.close()
- return new_data_dict
-
- self.resource_show_action = new_resource_show
def notify(self, entity, operation=None):
- '''
- if not isinstance(entity, model.Resource):
- return
- if operation:
- if operation == model.domain_object.DomainObjectOperation.new:
- self._create_datastorer_task(entity)
- else:
- # if operation is None, resource URL has been changed, as the
- # notify function in IResourceUrlChange only takes 1 parameter
- self._create_datastorer_task(entity)
- '''
- context = {'model': model, 'ignore_auth': True}
- if isinstance(entity, model.Resource):
- if (operation == model.domain_object.DomainObjectOperation.new
- or not operation):
- # if operation is None, resource URL has been changed, as
- # the notify function in IResourceUrlChange only takes
- # 1 parameter
- package = p.toolkit.get_action('package_show')(context, {
- 'id': entity.get_package_id()
- })
- if (not package['private'] and
- entity.format in self.datapusher_formats):
- p.toolkit.get_action('datapusher_submit')(context, {
- 'resource_id': entity.id
- })
if not isinstance(entity, model.Package) or self.legacy_mode:
return
# if a resource is new, it cannot have a datastore resource, yet
if operation == model.domain_object.DomainObjectOperation.changed:
+ context = {'model': model, 'ignore_auth': True}
if entity.private:
func = p.toolkit.get_action('datastore_make_private')
else:
@@ -158,7 +110,7 @@ def _check_urls_and_permissions(self):
self._log_or_raise('CKAN and DataStore database '
'cannot be the same.')
- # in legacy mode, the read and write url are ths same (both write url)
+ # in legacy mode, the read and write url are the same (both write url)
# consequently the same url check and and write privilege check
# don't make sense
if not self.legacy_mode:
@@ -256,9 +208,6 @@ def get_actions(self):
'datastore_upsert': action.datastore_upsert,
'datastore_delete': action.datastore_delete,
'datastore_search': action.datastore_search,
- 'datapusher_submit': action.datapusher_submit,
- 'datapusher_hook': action.datapusher_hook,
- 'resource_show': self.resource_show_action,
}
if not self.legacy_mode:
actions.update({
@@ -272,8 +221,7 @@ def get_auth_functions(self):
'datastore_upsert': auth.datastore_upsert,
'datastore_delete': auth.datastore_delete,
'datastore_search': auth.datastore_search,
- 'datastore_change_permissions': auth.datastore_change_permissions,
- 'datapusher_submit': auth.datapusher_submit}
+ 'datastore_change_permissions': auth.datastore_change_permissions}
def before_map(self, m):
m.connect('/datastore/dump/{resource_id}',
@@ -282,11 +230,23 @@ def before_map(self, m):
return m
def before_show(self, resource_dict):
- ''' Modify the resource url of datastore resources so that
- they link to the datastore dumps.
- '''
+ # Modify the resource url of datastore resources so that
+ # they link to the datastore dumps.
if resource_dict.get('url_type') == 'datastore':
resource_dict['url'] = p.toolkit.url_for(
controller='ckanext.datastore.controller:DatastoreController',
action='dump', resource_id=resource_dict['id'])
+
+        # Open the connection before entering ``try``: if connect() itself
+        # fails there is nothing to close, and the ``finally`` clause would
+        # otherwise trip over an unbound ``connection`` and hide the real
+        # error.
+        connection = self.read_engine.connect()
+        try:
+            result = connection.execute(
+                'SELECT 1 FROM "_table_metadata" WHERE name = %s AND alias_of IS NULL',
+                resource_dict['id']
+            ).fetchone()
+            # A row is returned only when a non-alias entry with this
+            # resource id is listed in _table_metadata.
+            resource_dict['datastore_active'] = bool(result)
+        finally:
+            connection.close()
return resource_dict
diff --git a/ckanext/datastore/tests/helpers.py b/ckanext/datastore/tests/helpers.py
index cf83f57378a..3ee89cdda20 100644
--- a/ckanext/datastore/tests/helpers.py
+++ b/ckanext/datastore/tests/helpers.py
@@ -1,6 +1,8 @@
import ckan.model as model
import ckan.lib.cli as cli
+import ckan.plugins as p
+
def extract(d, keys):
return dict((k, d[k]) for k in keys if k in d)
@@ -29,3 +31,12 @@ def rebuild_all_dbs(Session):
model.repo.tables_created_and_initialised = False
clear_db(Session)
model.repo.rebuild_db()
+
+
+def set_url_type(resources, user):
+    ''' Set ``url_type`` to ``datastore`` on each of the given resources.
+
+    Every resource is read with ``resource_show`` and written back with
+    ``resource_update``, both called as ``user``.
+    '''
+    context = {'user': user.name}
+    for resource_obj in resources:
+        lookup = {'id': resource_obj.id}
+        resource_dict = p.toolkit.get_action('resource_show')(context, lookup)
+        resource_dict['url_type'] = 'datastore'
+        p.toolkit.get_action('resource_update')(context, resource_dict)
diff --git a/ckanext/datastore/tests/test_create.py b/ckanext/datastore/tests/test_create.py
index d7b893235ed..f2db232af8a 100644
--- a/ckanext/datastore/tests/test_create.py
+++ b/ckanext/datastore/tests/test_create.py
@@ -17,7 +17,7 @@
import ckan.config.middleware as middleware
import ckanext.datastore.db as db
-from ckanext.datastore.tests.helpers import rebuild_all_dbs
+from ckanext.datastore.tests.helpers import rebuild_all_dbs, set_url_type
# avoid hanging tests https://github.com/gabrielfalcao/HTTPretty/issues/34
@@ -44,6 +44,8 @@ def setup_class(cls):
engine = db._get_engine(
{'connection_url': pylons.config['ckan.datastore.write_url']})
cls.Session = orm.scoped_session(orm.sessionmaker(bind=engine))
+ set_url_type(
+ model.Package.get('annakarenina').resources, cls.sysadmin_user)
@classmethod
def teardown_class(cls):
@@ -536,144 +538,6 @@ def test_create_basic(self):
assert res_dict['success'] is True, res_dict
- def test_create_ckan_resource_in_package(self):
- package = model.Package.get('annakarenina')
- data = {
- 'resource': {'package_id': package.id}
- }
- postparams = '%s=1' % json.dumps(data)
- auth = {'Authorization': str(self.sysadmin_user.apikey)}
- res = self.app.post('/api/action/datastore_create', params=postparams,
- extra_environ=auth, status=200)
- res_dict = json.loads(res.body)
-
- assert 'resource_id' in res_dict['result']
- assert len(model.Package.get('annakarenina').resources) == 3
-
- res = tests.call_action_api(
- self.app, 'resource_show', id=res_dict['result']['resource_id'])
- assert res['url'] == '/datastore/dump/' + res['id'], res
-
- @httpretty.activate
- def test_providing_res_with_url_calls_datapusher_correctly(self):
- pylons.config['datapusher.url'] = 'http://datapusher.ckan.org'
- httpretty.HTTPretty.register_uri(
- httpretty.HTTPretty.POST,
- 'http://datapusher.ckan.org/job',
- content_type='application/json',
- body=json.dumps({'job_id': 'foo', 'job_key': 'bar'}))
-
- package = model.Package.get('annakarenina')
-
- tests.call_action_api(
- self.app, 'datastore_create', apikey=self.sysadmin_user.apikey,
- resource=dict(package_id=package.id, url='demo.ckan.org'))
-
- assert len(package.resources) == 4, len(package.resources)
- resource = package.resources[3]
- data = json.loads(httpretty.last_request().body)
- assert data['metadata']['resource_id'] == resource.id, data
- assert data['result_url'].endswith('/action/datapusher_hook'), data
- assert data['result_url'].startswith('http://'), data
-
- def test_cant_provide_resource_and_resource_id(self):
- package = model.Package.get('annakarenina')
- resource = package.resources[0]
- data = {
- 'resource_id': resource.id,
- 'resource': {'package_id': package.id}
- }
- postparams = '%s=1' % json.dumps(data)
- auth = {'Authorization': str(self.sysadmin_user.apikey)}
- res = self.app.post('/api/action/datastore_create', params=postparams,
- extra_environ=auth, status=409)
- res_dict = json.loads(res.body)
-
- assert res_dict['error']['__type'] == 'Validation Error'
-
- @httpretty.activate
- def test_send_datapusher_creates_task(self):
- httpretty.HTTPretty.register_uri(
- httpretty.HTTPretty.POST,
- 'http://datapusher.ckan.org/job',
- content_type='application/json',
- body=json.dumps({'job_id': 'foo', 'job_key': 'bar'}))
-
- package = model.Package.get('annakarenina')
- resource = package.resources[0]
-
- context = {
- 'ignore_auth': True,
- 'user': self.sysadmin_user.name
- }
-
- p.toolkit.get_action('datapusher_submit')(context, {
- 'resource_id': resource.id
- })
-
- task = p.toolkit.get_action('task_status_show')(context, {
- 'entity_id': resource.id,
- 'task_type': 'datapusher',
- 'key': 'job_id'
- })
-
- assert task['state'] == 'pending', task
-
- def test_datapusher_hook(self):
- package = model.Package.get('annakarenina')
- resource = package.resources[0]
-
- context = {
- 'user': self.sysadmin_user.name
- }
-
- p.toolkit.get_action('task_status_update')(context, {
- 'entity_id': resource.id,
- 'entity_type': 'resource',
- 'task_type': 'datapusher',
- 'key': 'job_id',
- 'value': 'my_id',
- 'last_updated': str(datetime.datetime.now()),
- 'state': 'pending'
- })
-
- p.toolkit.get_action('task_status_update')(context, {
- 'entity_id': resource.id,
- 'entity_type': 'resource',
- 'task_type': 'datapusher',
- 'key': 'job_key',
- 'value': 'my_key',
- 'last_updated': str(datetime.datetime.now()),
- 'state': 'pending'
- })
-
- data = {
- 'status': 'success',
- 'metadata': {
- 'resource_id': resource.id
- }
- }
- postparams = '%s=1' % json.dumps(data)
- auth = {'Authorization': str(self.sysadmin_user.apikey)}
- res = self.app.post('/api/action/datapusher_hook', params=postparams,
- extra_environ=auth, status=200)
- print res.body
- res_dict = json.loads(res.body)
-
- assert res_dict['success'] is True
-
- task = tests.call_action_api(
- self.app, 'task_status_show', entity_id=resource.id,
- task_type='datapusher', key='job_id')
-
- assert task['state'] == 'success', task
-
- task = tests.call_action_api(
- self.app, 'task_status_show', entity_id=resource.id,
- task_type='datapusher', key='job_key')
-
- assert task['state'] == 'success', task
-
def test_guess_types(self):
resource = model.Package.get('annakarenina').resources[1]
diff --git a/ckanext/datastore/tests/test_delete.py b/ckanext/datastore/tests/test_delete.py
index ce1b02efdb1..bb7db39217a 100644
--- a/ckanext/datastore/tests/test_delete.py
+++ b/ckanext/datastore/tests/test_delete.py
@@ -11,7 +11,7 @@
import ckan.tests as tests
import ckanext.datastore.db as db
-from ckanext.datastore.tests.helpers import rebuild_all_dbs
+from ckanext.datastore.tests.helpers import rebuild_all_dbs, set_url_type
class TestDatastoreDelete(tests.WsgiAppCase):
@@ -43,6 +43,8 @@ def setup_class(cls):
engine = db._get_engine(
{'connection_url': pylons.config['ckan.datastore.write_url']})
cls.Session = orm.scoped_session(orm.sessionmaker(bind=engine))
+ set_url_type(
+ model.Package.get('annakarenina').resources, cls.sysadmin_user)
@classmethod
def teardown_class(cls):
diff --git a/ckanext/datastore/tests/test_dump.py b/ckanext/datastore/tests/test_dump.py
index 041105af986..6061535f4ba 100644
--- a/ckanext/datastore/tests/test_dump.py
+++ b/ckanext/datastore/tests/test_dump.py
@@ -32,6 +32,7 @@ def setup_class(cls):
resource = model.Package.get('annakarenina').resources[0]
cls.data = {
'resource_id': resource.id,
+ 'force': True,
'aliases': 'books',
'fields': [{'id': u'b\xfck', 'type': 'text'},
{'id': 'author', 'type': 'text'},
diff --git a/ckanext/datastore/tests/test_search.py b/ckanext/datastore/tests/test_search.py
index 21cb19de4e2..1b7c9486da7 100644
--- a/ckanext/datastore/tests/test_search.py
+++ b/ckanext/datastore/tests/test_search.py
@@ -30,6 +30,7 @@ def setup_class(cls):
cls.resource = cls.dataset.resources[0]
cls.data = {
'resource_id': cls.resource.id,
+ 'force': True,
'aliases': 'books3',
'fields': [{'id': u'b\xfck', 'type': 'text'},
{'id': 'author', 'type': 'text'},
@@ -116,7 +117,7 @@ def test_search_private_dataset(self):
context,
{'name': 'privatedataset',
'private': True,
- 'owner_org' : self.organization['id'],
+ 'owner_org': self.organization['id'],
'groups': [{
'id': group.id
}]})
@@ -128,6 +129,7 @@ def test_search_private_dataset(self):
postparams = '%s=1' % json.dumps({
'resource_id': resource['id'],
+ 'force': True
})
auth = {'Authorization': str(self.sysadmin_user.apikey)}
res = self.app.post('/api/action/datastore_create', params=postparams,
@@ -425,6 +427,7 @@ def setup_class(cls):
resource = model.Package.get('annakarenina').resources[0]
cls.data = dict(
resource_id=resource.id,
+ force=True,
fields=[
{'id': 'id'},
{'id': 'date', 'type':'date'},
@@ -499,6 +502,7 @@ def setup_class(cls):
resource = cls.dataset.resources[0]
cls.data = {
'resource_id': resource.id,
+ 'force': True,
'aliases': 'books4',
'fields': [{'id': u'b\xfck', 'type': 'text'},
{'id': 'author', 'type': 'text'},
@@ -517,7 +521,7 @@ def setup_class(cls):
extra_environ=auth)
res_dict = json.loads(res.body)
assert res_dict['success'] is True
-
+
# Make an organization, because private datasets must belong to one.
cls.organization = tests.call_action_api(
cls.app, 'organization_create',
@@ -669,6 +673,7 @@ def test_new_datastore_table_from_private_resource(self):
postparams = '%s=1' % json.dumps({
'resource_id': resource['id'],
+ 'force': True
})
auth = {'Authorization': str(self.sysadmin_user.apikey)}
res = self.app.post('/api/action/datastore_create', params=postparams,
@@ -708,7 +713,9 @@ def test_making_resource_private_makes_datastore_private(self):
'package_id': package['id']})
postparams = '%s=1' % json.dumps({
- 'resource_id': resource['id']})
+ 'resource_id': resource['id'],
+ 'force': True
+ })
auth = {'Authorization': str(self.sysadmin_user.apikey)}
res = self.app.post('/api/action/datastore_create', params=postparams,
extra_environ=auth)
diff --git a/ckanext/datastore/tests/test_upsert.py b/ckanext/datastore/tests/test_upsert.py
index e27f8b9c523..9d65700198a 100644
--- a/ckanext/datastore/tests/test_upsert.py
+++ b/ckanext/datastore/tests/test_upsert.py
@@ -11,7 +11,7 @@
import ckan.tests as tests
import ckanext.datastore.db as db
-from ckanext.datastore.tests.helpers import rebuild_all_dbs
+from ckanext.datastore.tests.helpers import rebuild_all_dbs, set_url_type
class TestDatastoreUpsert(tests.WsgiAppCase):
@@ -26,6 +26,8 @@ def setup_class(cls):
ctd.CreateTestData.create()
cls.sysadmin_user = model.User.get('testsysadmin')
cls.normal_user = model.User.get('annafan')
+ set_url_type(
+ model.Package.get('annakarenina').resources, cls.sysadmin_user)
resource = model.Package.get('annakarenina').resources[0]
cls.data = {
'resource_id': resource.id,
@@ -249,6 +251,8 @@ def setup_class(cls):
ctd.CreateTestData.create()
cls.sysadmin_user = model.User.get('testsysadmin')
cls.normal_user = model.User.get('annafan')
+ set_url_type(
+ model.Package.get('annakarenina').resources, cls.sysadmin_user)
resource = model.Package.get('annakarenina').resources[0]
cls.data = {
'resource_id': resource.id,
@@ -349,6 +353,8 @@ def setup_class(cls):
ctd.CreateTestData.create()
cls.sysadmin_user = model.User.get('testsysadmin')
cls.normal_user = model.User.get('annafan')
+ set_url_type(
+ model.Package.get('annakarenina').resources, cls.sysadmin_user)
resource = model.Package.get('annakarenina').resources[0]
hhguide = u"hitchhiker's guide to the galaxy"
cls.data = {
diff --git a/doc/configuration.rst b/doc/configuration.rst
index 2a6e3aa592e..0b623c5e656 100644
--- a/doc/configuration.rst
+++ b/doc/configuration.rst
@@ -1098,23 +1098,23 @@ Secret Access Key.
DataPusher Settings
-------------------
-.. _datapusher.formats:
+.. _ckan.datapusher.formats:
-datapusher.formats
+ckan.datapusher.formats
^^^^^^^^^^^^^^^^^^
Example::
- datapusher.formats = csv xls xlsx
+ ckan.datapusher.formats = csv xls xlsx
.. todo:: Expand
-.. _datapusher.url:
+.. _ckan.datapusher.url:
-datapusher.url
-^^^^^^^^^^^^^^
+ckan.datapusher.url
+^^^^^^^^^^^^^^^^^^^
Example::
- datapusher.url = http://datapusher.ckan.org/
+ ckan.datapusher.url = http://datapusher.ckan.org/
.. todo:: Expand
diff --git a/setup.py b/setup.py
index 3f5115bfa8f..cfe3bc63cda 100644
--- a/setup.py
+++ b/setup.py
@@ -115,6 +115,7 @@
organizations=ckanext.organizations.forms:OrganizationForm
organizations_dataset=ckanext.organizations.forms:OrganizationDatasetForm
datastore=ckanext.datastore.plugin:DatastorePlugin
+ datapusher=ckanext.datapusher.plugin:DatapusherPlugin
test_tag_vocab_plugin=ckanext.test_tag_vocab_plugin:MockVocabTagsPlugin
resource_proxy=ckanext.resourceproxy.plugin:ResourceProxy
text_preview=ckanext.textpreview.plugin:TextPreview
diff --git a/test-core.ini b/test-core.ini
index c4278e6540c..ac493b5461f 100644
--- a/test-core.ini
+++ b/test-core.ini
@@ -25,6 +25,8 @@ sqlalchemy.url = postgresql://ckan_default:pass@localhost/ckan_test
ckan.datastore.write_url = postgresql://ckan_default:pass@localhost/datastore_test
ckan.datastore.read_url = postgresql://datastore_default:pass@localhost/datastore_test
+ckan.datapusher.url = http://datapusher.ckan.org/
+
## Solr support
solr_url = http://127.0.0.1:8983/solr