Skip to content

Commit

Permalink
[#2] Add resource data to DataStore
Browse files Browse the repository at this point in the history
If `push_resources_to_datastore: true`, add streaming resources to the
CKAN DataStore.
  • Loading branch information
brew committed Oct 30, 2017
1 parent 3014462 commit 32f0efa
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 24 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ A processor to save a datapackage and resources to a specified CKAN instance.
ckan-host: http://demo.ckan.org
ckan-api-key: env:CKAN_API_KEY
overwrite_existing: true
push_resources_to_datastore: true
dataset-properties:
name: test-dataset-010203
state: draft
Expand All @@ -59,6 +60,7 @@ A processor to save a datapackage and resources to a specified CKAN instance.
- `ckan-host`: The base url (and scheme) for the CKAN instance (e.g. http://demo.ckan.org).
- `ckan-api-key`: Either a CKAN user api key or, if in the format `env:CKAN_API_KEY_NAME`, an env var that defines an api key.
- `overwrite_existing`: If `true`, if the CKAN dataset already exists, it will be overwritten by the datapackage. Optional, and default is `false`.
- `push_resources_to_datastore`: If `true`, newly created resources will be pushed to the CKAN DataStore. Optional, and default is `false`.
- `dataset-properties`: An optional object, the properties of which will be used to set properties of the CKAN dataset.

##### CKAN dataset from datapackage
Expand All @@ -68,3 +70,5 @@ The processor first creates a CKAN dataset from the datapackage specification, u
##### CKAN resources from datapackage resources

If the CKAN dataset was successfully created or updated, the dataset resources will be created for each resource in the datapackage, using [`resource_create`](http://docs.ckan.org/en/latest/api/#ckan.logic.action.create.resource_create). If datapackage resources are marked for streaming (they have the `dpp:streaming=True` property), resource files will be uploaded to the CKAN filestore. For example, remote resources may be marked for streaming by the inclusion of the `stream_remote_resources` processor earlier in the pipeline.

Additionally, if `push_resources_to_datastore` is `True`, the processor will push resources marked for streaming to the CKAN DataStore using [`datastore_create`](https://ckan.readthedocs.io/en/latest/maintaining/datastore.html#ckanext.datastore.logic.action.datastore_create) and [`datastore_upsert`](https://ckan.readthedocs.io/en/latest/maintaining/datastore.html#ckanext.datastore.logic.action.datastore_upsert).
58 changes: 37 additions & 21 deletions datapackage_pipelines_ckan/processors/dump/to_ckan.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
import json
import hashlib

from tabulator import Stream
import datapackage as datapackage_lib
from ckan_datapackage_tools import converter
from datapackage_pipelines.lib.dump.dumper_base import FileDumper, DumperBase
from tableschema_ckan_datastore import Storage

from datapackage_pipelines_ckan.utils import make_ckan_request, get_ckan_error

Expand All @@ -16,11 +18,16 @@ class CkanDumper(FileDumper):

def initialize(self, parameters):
super(CkanDumper, self).initialize(parameters)
self.ckan_base_url = '{ckan_host}/api/3/action/'.format(
ckan_host=parameters['ckan-host'])
self.ckan_api_key = parameters.get('ckan-api-key')
self.dataset_resources = []
self.dataset_id = None

base_path = "/api/3/action"
self.__base_url = parameters['ckan-host']
self.__base_endpoint = self.__base_url + base_path

self.__ckan_api_key = parameters.get('ckan-api-key')
self.__dataset_resources = []
self.__dataset_id = None
self.__push_to_datastore = \
parameters.get('push_resources_to_datastore', False)

def handle_resources(self, datapackage,
resource_iterator,
Expand Down Expand Up @@ -50,7 +57,7 @@ def handle_resources(self, datapackage,
for resource in datapackage['resources']:
if not resource.get('dpp:streaming', False):
resource_metadata = {
'package_id': self.dataset_id,
'package_id': self.__dataset_id,
'url': resource['dpp:streamedFrom'],
'name': resource['name'],
}
Expand Down Expand Up @@ -101,46 +108,44 @@ def handle_datapackage(self, datapackage, parameters, stats):
dp = datapackage_lib.DataPackage(datapackage)
dataset.update(converter.datapackage_to_dataset(dp))

self.dataset_resources = dataset.get('resources', [])
if self.dataset_resources:
self.__dataset_resources = dataset.get('resources', [])
if self.__dataset_resources:
del dataset['resources']

# Merge dataset-properties from parameters into dataset.
dataset_props_from_params = parameters.get('dataset-properties')
if dataset_props_from_params:
dataset.update(dataset_props_from_params)

package_create_url = \
'{ckan_base_url}package_create'.format(
ckan_base_url=self.ckan_base_url)
package_create_url = '{}/package_create'.format(self.__base_endpoint)

response = make_ckan_request(package_create_url,
method='POST',
json=dataset,
api_key=self.ckan_api_key)
api_key=self.__ckan_api_key)

ckan_error = get_ckan_error(response)
if ckan_error \
and parameters.get('overwrite_existing') \
and 'That URL is already in use.' in ckan_error.get('name', []):

package_update_url = '{ckan_base_url}package_update'.format(
ckan_base_url=self.ckan_base_url)
package_update_url = \
'{}/package_update'.format(self.__base_endpoint)

log.info('CKAN dataset with url already exists. '
'Attempting package_update.')
response = make_ckan_request(package_update_url,
method='POST',
json=dataset,
api_key=self.ckan_api_key)
api_key=self.__ckan_api_key)
ckan_error = get_ckan_error(response)

if ckan_error:
log.exception('CKAN returned an error: ' + json.dumps(ckan_error))
raise Exception

if response['success']:
self.dataset_id = response['result']['id']
self.__dataset_id = response['result']['id']

def rows_processor(self, resource, spec, temp_file, writer, fields,
datapackage):
Expand Down Expand Up @@ -170,7 +175,7 @@ def rows_processor(self, resource, spec, temp_file, writer, fields,
temp_file.close()

resource_metadata = {
'package_id': self.dataset_id,
'package_id': self.__dataset_id,
'url': 'url',
'url_type': 'upload',
'name': spec['name'],
Expand All @@ -189,18 +194,28 @@ def rows_processor(self, resource, spec, temp_file, writer, fields,
'files': resource_files
}
try:
self._create_ckan_resource(request_params)
# Create the CKAN resource
create_result = self._create_ckan_resource(request_params)
if self.__push_to_datastore:
# Create the DataStore resource
storage = Storage(base_url=self.__base_url,
dataset_id=self.__dataset_id,
api_key=self.__ckan_api_key)
resource_id = create_result['id']
storage.create(resource_id, spec['schema'])
storage.write(resource_id,
Stream(temp_file.name, format='csv').open(),
method='insert')
except Exception as e:
raise e
finally:
os.unlink(filename)

def _create_ckan_resource(self, request_params):
resource_create_url = '{ckan_base_url}resource_create'.format(
ckan_base_url=self.ckan_base_url)
resource_create_url = '{}/resource_create'.format(self.__base_endpoint)

create_response = make_ckan_request(resource_create_url,
api_key=self.ckan_api_key,
api_key=self.__ckan_api_key,
method='POST',
**request_params)

Expand All @@ -209,6 +224,7 @@ def _create_ckan_resource(self, request_params):
log.exception('CKAN returned an error when creating '
'a resource: ' + json.dumps(ckan_error))
raise Exception
return create_response['result']


if __name__ == '__main__':
Expand Down
2 changes: 1 addition & 1 deletion examples/pipeline-spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ example-dump-to-ckan:
parameters:
ckan-host: https://demo.ckan.org
ckan-api-key: env:CKAN_API_KEY
push_resources_to_datastore: false
push_resources_to_datastore: true
overwrite_existing: true
dataset-properties:
name: test-dataset-010203
Expand Down
5 changes: 3 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ def read(*paths):
NAME = PACKAGE.replace('_', '-')
INSTALL_REQUIRES = [
'datapackage-pipelines>=1.0,<2.0',
'ckan-datapackage-tools'
'ckan-datapackage-tools',
'tableschema-ckan-datastore'
]
TESTS_REQUIRE = [
'pylama',
Expand Down Expand Up @@ -52,7 +53,7 @@ def read(*paths):
'data', 'ckan'
],
classifiers=[
'Development Status :: 5 - Alpha',
'Development Status :: 3 - Alpha',
'Environment :: Web Environment',
'Intended Audience :: Developers',
'License :: OSI Approved :: MIT License',
Expand Down
128 changes: 128 additions & 0 deletions tests/test_dump_to_ckan.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,3 +369,131 @@ def test_dump_to_ckan_package_create_streaming_resource_fail(self,
with self.assertRaises(Exception):
for r in spew_res_iter:
list(r) # iterate the row to yield it

@requests_mock.mock()
def test_dump_to_ckan_package_create_streaming_resource_datastore(self, mock_request):  # noqa
    '''Create package with streaming resource, and pushing to datastore.

    Mocks every CKAN API endpoint involved, runs the dump.to_ckan
    processor with ``push_resources_to_datastore: True``, and verifies
    the exact sequence of HTTP calls: package_create, two
    resource_create calls, then package_show, datastore_search,
    datastore_create and datastore_upsert for the streamed resource.
    '''

    # CKAN API endpoints expected to be hit during the run.
    package_id = 'ckan-package-id'
    base_url = 'https://demo.ckan.org/api/3/action/'
    package_create_url = '{}package_create'.format(base_url)
    resource_create_url = '{}resource_create'.format(base_url)
    package_show_url = '{}package_show?id={}'.format(base_url, package_id)
    datastore_search_url = \
        '{}datastore_search?resource_id=_table_metadata'.format(base_url)
    datastore_create_url = '{}datastore_create'.format(base_url)
    datastore_upsert_url = '{}datastore_upsert'.format(base_url)

    # Successful responses for dataset and resource creation.
    mock_request.post(package_create_url,
                      json={
                          'success': True,
                          'result': {'id': package_id}})
    mock_request.post(resource_create_url,
                      json={
                          'success': True,
                          'result': {'id': 'ckan-resource-id'}})
    # package_show response: a dataset holding one non-streamed and one
    # streamed resource. NOTE(review): this call appears in the request
    # history between resource_create and datastore_search, presumably
    # issued by the tableschema-ckan-datastore Storage backend — confirm.
    mock_request.get(package_show_url,
                     json={
                         'success': True,
                         'result': {
                             'id': '7766839b-face-4336-8e1a-3c51c5e7634d',
                             'resources': [
                                 {
                                     'name': 'co2-mm-mlo_csv_not_streamed',
                                     'format': 'CSV',
                                     'url': 'https://pkgstore.datahub.io/core/co2-ppm:co2-mm-mlo_csv/data/co2-mm-mlo_csv.csv',
                                     'datastore_active': False,
                                     'cache_last_updated': None,
                                     'package_id': '7766839b-face-4336-8e1a-3c51c5e7634d',
                                     'id': '329e4271-8cc3-48c9-a219-c8eab52acc65',
                                 }, {
                                     'name': 'co2-mm-mlo_csv_streamed',
                                     'encoding': 'utf-8',
                                     'url': 'https://demo.ckan.org/dataset/7766839b-face-4336-8e1a-3c51c5e7634d/resource/723380d7-688a-465f-b0bd-ff6d1ec25680/download/co2-mm-mlo_csv_streamed.csv',
                                     'datastore_active': False,
                                     'format': 'CSV',
                                     'package_id': '7766839b-face-4336-8e1a-3c51c5e7634d',
                                     'id': '723380d7-688a-465f-b0bd-ff6d1ec25680',
                                 }
                             ],
                             'num_resources': 2,
                             'name': 'test-dataset-010203',
                             'title': 'Test Dataset'
                         }
                     })

    # DataStore endpoints: empty table metadata, then successful
    # create and upsert responses.
    mock_request.get(datastore_search_url,
                     json={
                         'success': True,
                         'result': {
                             'resource_id': '_table_metadata',
                             'records': []
                         }})
    mock_request.post(datastore_create_url,
                      json={
                          'success': True,
                          'result': {
                              'resource_id': '7564690e-86ec-44de-a3f5-2cff9cbb521f'
                          }
                      })
    mock_request.post(datastore_upsert_url,
                      json={
                          'success': True
                      })
    # input arguments used by our mock `ingest`
    datapackage = {
        'name': 'my-datapackage',
        'project': 'my-project',
        'resources': [{
            # Streamed resource: has dpp:streaming=True and a schema,
            # so it is uploaded and pushed to the DataStore.
            "dpp:streamedFrom": "https://example.com/file.csv",
            "dpp:streaming": True,
            "name": "resource_streamed.csv",
            "path": "data/file.csv",
            'schema': {'fields': [
                {'name': 'first', 'type': 'string'},
                {'name': 'last', 'type': 'string'}
            ]}
        }, {
            # Non-streamed resource: created as a link-only CKAN resource.
            "dpp:streamedFrom": "https://example.com/file_02.csv",
            "name": "resource_not_streamed.csv",
            "path": "."
        }]
    }
    params = {
        'ckan-host': 'https://demo.ckan.org',
        'ckan-api-key': 'my-api-key',
        'overwrite_existing': True,
        'force-format': True,
        'push_resources_to_datastore': True
    }

    # Path to the processor we want to test
    processor_dir = \
        os.path.dirname(datapackage_pipelines_ckan.processors.__file__)
    processor_path = os.path.join(processor_dir, 'dump/to_ckan.py')

    # Trigger the processor with our mock `ingest` and capture what it will
    # returned to `spew`. Only the first (streamed) resource gets row data.
    json_file = {'first': 'Fred', 'last': 'Smith'}
    json_file = json.dumps(json_file)
    spew_args, _ = mock_dump_test(
        processor_path,
        (params, datapackage,
         iter([ResourceIterator(io.StringIO(json_file),
                                datapackage['resources'][0],
                                {'schema': {'fields': []}})
               ])))

    spew_res_iter = spew_args[1]
    for r in spew_res_iter:
        list(r)  # iterate the row to yield it

    # Verify the exact call sequence the processor made against CKAN.
    requests = mock_request.request_history
    assert len(requests) == 7
    assert requests[0].url == package_create_url
    assert requests[1].url == resource_create_url
    assert requests[2].url == resource_create_url
    assert requests[3].url == package_show_url
    assert requests[4].url.startswith(datastore_search_url)
    assert requests[5].url == datastore_create_url
    assert requests[6].url == datastore_upsert_url

0 comments on commit 32f0efa

Please sign in to comment.