Skip to content

Commit

Permalink
Merge ca3598b into bdc7e8f
Browse files Browse the repository at this point in the history
  • Loading branch information
lrromero committed Nov 6, 2018
2 parents bdc7e8f + ca3598b commit d7dfaa9
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 16 deletions.
18 changes: 17 additions & 1 deletion docs/MANUAL.md
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,10 @@ Toma los siguientes parámetros:
como groups de CKAN.
- **demote_themes** (opcional, default: True): Si está en true, los labels de los themes del dataset, se escriben como
tags de CKAN; sino,se pasan como grupo.
- **download_strategy** (opcional, default None): La referencia a una función que toma (catalog, distribution) de
entrada y devuelve un booleano. Esta función se aplica sobre todas las distribuciones del dataset. Si devuelve `True`,
se descarga el archivo indicado en el `downloadURL` de la distribución y se lo sube al portal de destino. Si es None,
se omite esta operación.


Retorna el id en el nodo de destino del dataset federado.
Expand Down Expand Up @@ -400,6 +404,10 @@ Toma los siguientes parámetros:
organización pasada como parámetro.
- **catalog_id**: El prefijo que va a preceder el id y name del dataset en el portal
destino, separado por un guión.
- **download_strategy** (opcional, default None): La referencia a una función que toma (catalog, distribution) de
entrada y devuelve un booleano. Esta función se aplica sobre todas las distribuciones del dataset. Si devuelve `True`,
se descarga el archivo indicado en el `downloadURL` de la distribución y se lo sube al portal de destino. Si es None,
se omite esta operación.


Retorna el id en el nodo de destino del dataset federado.
Expand All @@ -411,6 +419,10 @@ Toma los siguientes parámetros:
- **portal_url**: URL del portal de CKAN de destino.
- **apikey**: La apikey de un usuario del portal de destino con los permisos para crear el dataset bajo la
organización pasada como parámetro.
- **download_strategy** (opcional, default None): La referencia a una función que toma (catalog, distribution) de
entrada y devuelve un booleano. Esta función se aplica sobre todas las distribuciones del dataset. Si devuelve `True`,
se descarga el archivo indicado en el `downloadURL` de la distribución y se lo sube al portal de destino. Si es None,
se omite esta operación.

Retorna el id del dataset restaurado.

Expand All @@ -424,7 +436,11 @@ Toma los siguientes parámetros:
- **dataset_list** (opcional, default: None): Lista de ids de los datasets a federar. Si no se pasa, se federan todos
los datasets del catálogo.
- **owner_org** (opcional, default: None): La organización a la que pertence el dataset. Debe encontrarse en el
portal de destino. Si no se pasa, se toma como organización el catalog_id
portal de destino. Si no se pasa, se toma como organización el catalog_id.
- **download_strategy** (opcional, default None): La referencia a una función que toma (catalog, distribution) de
entrada y devuelve un booleano. Esta función se aplica sobre todas las distribuciones del catálogo. Si devuelve
`True`, se descarga el archivo indicado en el `downloadURL` de la distribución y se lo sube al portal de destino. Si
es None, se omite esta operación.

Retorna el id en el nodo de destino de los datasets federados.

Expand Down
69 changes: 54 additions & 15 deletions pydatajson/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@
from ckanapi.errors import NotFound
from .ckan_utils import map_dataset_to_package, map_theme_to_group
from .search import get_datasets
from .helpers import resource_files_download

logger = logging.getLogger('pydatajson.federation')


def push_dataset_to_ckan(catalog, owner_org, dataset_origin_identifier,
portal_url, apikey, catalog_id=None,
demote_superThemes=True, demote_themes=True):
demote_superThemes=True, demote_themes=True,
download_strategy=None):
"""Escribe la metadata de un dataset en el portal pasado por parámetro.
Args:
Expand All @@ -33,6 +35,10 @@ def push_dataset_to_ckan(catalog, owner_org, dataset_origin_identifier,
themes del dataset, se propagan como grupo.
demote_themes(bool): Si está en true, los labels de los themes
del dataset, pasan a ser tags. Sino, se pasan como grupo.
download_strategy(callable): Una función (catálogo, distribución)->
bool. Sobre las distribuciones que evalúa True, descarga el
recurso en el downloadURL y lo sube al portal de destino.
Por default no sube ninguna distribución.
Returns:
str: El id del dataset en el catálogo de destino.
"""
Expand Down Expand Up @@ -64,11 +70,17 @@ def push_dataset_to_ckan(catalog, owner_org, dataset_origin_identifier,
pushed_package = ckan_portal.call_action(
'package_create', data_dict=package)

if download_strategy:
with resource_files_download(catalog, dataset.get('distribution', []),
download_strategy) as resource_files:
resources_upload(portal_url, apikey, resource_files,
catalog_id=catalog_id)

ckan_portal.close()
return pushed_package['id']


def resources_upload(portal_url, apikey, resource_files):
def resources_upload(portal_url, apikey, resource_files, catalog_id=None):
"""Sube archivos locales a sus distribuciones correspondientes en el portal
pasado por parámetro.
Expand All @@ -78,15 +90,18 @@ def resources_upload(portal_url, apikey, resource_files):
permitan crear o actualizar el dataset.
resource_files(dict): Diccionario con entradas
id_de_distribucion:path_al_recurso a subir
catalog_id(str): prependea el id al id del recurso para
encontrarlo antes de subirlo
Returns:
list: los ids de los recursos modificados
"""
ckan_portal = RemoteCKAN(portal_url, apikey=apikey)
res = []
for resource in resource_files:
resource_id = catalog_id + '_' + resource if catalog_id else resource
try:
pushed = ckan_portal.action.resource_patch(
id=resource,
id=resource_id,
resource_type='file.upload',
upload=open(resource_files[resource], 'rb'))
res.append(pushed['id'])
Expand Down Expand Up @@ -199,7 +214,7 @@ def push_theme_to_ckan(catalog, portal_url, apikey,


def restore_dataset_to_ckan(catalog, owner_org, dataset_origin_identifier,
portal_url, apikey):
portal_url, apikey, download_strategy=None):
"""Restaura la metadata de un dataset en el portal pasado por parámetro.
Args:
Expand All @@ -210,15 +225,22 @@ def restore_dataset_to_ckan(catalog, owner_org, dataset_origin_identifier,
portal_url (str): La URL del portal CKAN de destino.
apikey (str): La apikey de un usuario con los permisos que le
permitan crear o actualizar el dataset.
download_strategy(callable): Una función (catálogo, distribución)->
bool. Sobre las distribuciones que evalúa True, descarga el
recurso en el downloadURL y lo sube al portal de destino.
Por default no sube ninguna distribución.
Returns:
str: El id del dataset restaurado.
"""
return push_dataset_to_ckan(catalog, owner_org, dataset_origin_identifier,
portal_url, apikey, None, False, False)

return push_dataset_to_ckan(catalog, owner_org,
dataset_origin_identifier, portal_url,
apikey, None, False, False, download_strategy)


def harvest_dataset_to_ckan(catalog, owner_org, dataset_origin_identifier,
portal_url, apikey, catalog_id):
portal_url, apikey, catalog_id,
download_strategy=None):
"""Federa la metadata de un dataset en el portal pasado por parámetro.
Args:
Expand All @@ -229,17 +251,22 @@ def harvest_dataset_to_ckan(catalog, owner_org, dataset_origin_identifier,
portal_url (str): La URL del portal CKAN de destino.
apikey (str): La apikey de un usuario con los permisos que le
permitan crear o actualizar el dataset.
catalog_id(str): El id que prep
catalog_id(str): El id que prependea al dataset y recursos
download_strategy(callable): Una función (catálogo, distribución)->
bool. Sobre las distribuciones que evalúa True, descarga el
recurso en el downloadURL y lo sube al portal de destino.
Por default no sube ninguna distribución.
Returns:
str: El id del dataset restaurado.
"""

return push_dataset_to_ckan(catalog, owner_org, dataset_origin_identifier,
portal_url, apikey, catalog_id=catalog_id)
portal_url, apikey, catalog_id=catalog_id,
download_strategy=download_strategy)


def restore_catalog_to_ckan(catalog, owner_org, portal_url, apikey,
dataset_list=None):
dataset_list=None, download_strategy=None):
"""Restaura los datasets de un catálogo al portal pasado por parámetro.
Si hay temas presentes en el DataJson que no están en el portal de
CKAN, los genera.
Expand All @@ -253,6 +280,10 @@ def restore_catalog_to_ckan(catalog, owner_org, portal_url, apikey,
se pasa una lista, todos los datasests se restauran.
owner_org (str): La organización a la cual pertencen los datasets.
Si no se pasa, se utiliza el catalog_id.
download_strategy(callable): Una función (catálogo, distribución)->
bool. Sobre las distribuciones que evalúa True, descarga el
recurso en el downloadURL y lo sube al portal de destino.
Por default no sube ninguna distribución.
Returns:
str: El id del dataset en el catálogo de destino.
"""
Expand All @@ -261,14 +292,16 @@ def restore_catalog_to_ckan(catalog, owner_org, portal_url, apikey,
for ds in catalog.datasets]
restored = []
for dataset_id in dataset_list:
restored_id = restore_dataset_to_ckan(
catalog, owner_org, dataset_id, portal_url, apikey)
restored_id = restore_dataset_to_ckan(catalog, owner_org, dataset_id,
portal_url, apikey,
download_strategy)
restored.append(restored_id)
return restored


def harvest_catalog_to_ckan(catalog, portal_url, apikey, catalog_id,
dataset_list=None, owner_org=None):
dataset_list=None, owner_org=None,
download_strategy=None):
"""Federa los datasets de un catálogo al portal pasado por parámetro.
Args:
Expand All @@ -282,6 +315,10 @@ def harvest_catalog_to_ckan(catalog, portal_url, apikey, catalog_id,
se pasa una lista, todos los datasests se federan.
owner_org (str): La organización a la cual pertencen los datasets.
Si no se pasa, se utiliza el catalog_id.
download_strategy(callable): Una función (catálogo, distribución)->
bool. Sobre las distribuciones que evalúa True, descarga el
recurso en el downloadURL y lo sube al portal de destino.
Por default no sube ninguna distribución.
Returns:
str: El id del dataset en el catálogo de destino.
"""
Expand All @@ -293,8 +330,10 @@ def harvest_catalog_to_ckan(catalog, portal_url, apikey, catalog_id,
errors = {}
for dataset_id in dataset_list:
try:
harvested_id = harvest_dataset_to_ckan(
catalog, owner_org, dataset_id, portal_url, apikey, catalog_id)
harvested_id = harvest_dataset_to_ckan(catalog, owner_org,
dataset_id, portal_url,
apikey, catalog_id,
download_strategy)
harvested.append(harvested_id)
except Exception as e:
msg = "Error federando catalogo: %s, dataset: %s al portal: %s\n"\
Expand Down
43 changes: 43 additions & 0 deletions pydatajson/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,20 @@
import os
import json
import re
import logging
import tempfile

from contextlib import contextmanager
from openpyxl import load_workbook
from six.moves.urllib_parse import urlparse

from six import string_types, iteritems
from unidecode import unidecode

from pydatajson.download import download_to_file

logger = logging.getLogger('pydatajson.helpers')

ABSOLUTE_PROJECT_DIR = os.path.dirname(os.path.abspath(__file__))
ABSOLUTE_SCHEMA_DIR = os.path.join(ABSOLUTE_PROJECT_DIR, "schemas")
STOP_WORDS = [
Expand Down Expand Up @@ -392,3 +399,39 @@ def pprint(result):
result, indent=4, separators=(",", ": "),
ensure_ascii=False
)))


@contextmanager
def resource_files_download(catalog, distributions, download_strategy):
resource_files = {}
distributions = [dist for dist in distributions if
download_strategy(catalog, dist)]
for dist in distributions:
try:
tmpfile = tempfile.NamedTemporaryFile(delete=False)
tmpfile.close()
download_to_file(dist['downloadURL'], tmpfile.name)
resource_files[dist['identifier']] = tmpfile.name
except Exception as e:
logger.exception(
"Error descargando el recurso {} de la distribución {}: {}"
.format(dist.get('downloadURL'),
dist.get('identifier'), str(e))
)
continue
try:
yield resource_files

finally:
for resource in resource_files:
os.remove(resource_files[resource])


def is_local_andino_resource(catalog, distribution):
dist_type = distribution.get('type')
if dist_type is not None:
return dist_type == 'file.upload'
homepage = catalog.get('homepage')
if homepage is not None:
return distribution.get('downloadURL', '').startswith(homepage)
return False
37 changes: 37 additions & 0 deletions tests/test_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,43 @@ def test_resource_upload_error(self, mock_portal):
)
self.assertEqual([], res)

@patch('pydatajson.helpers.download_to_file')
def test_push_dataset_upload_strategy(self, mock_download, mock_portal):
def mock_call_action(action, data_dict=None):
if action == 'package_update':
return data_dict
else:
return []
mock_portal.return_value.call_action = mock_call_action
push_dataset_to_ckan(
self.catalog,
'owner',
self.dataset_id,
'portal',
'key',
download_strategy=(lambda _, x: x['identifier'] == '1.1'))
mock_portal.return_value.action.resource_patch.assert_called_with(
id='1.1',
resource_type='file.upload',
upload=ANY
)

def test_push_dataset_upload_empty_strategy(self, mock_portal):
def mock_call_action(action, data_dict=None):
if action == 'package_update':
return data_dict
else:
return []
mock_portal.return_value.call_action = mock_call_action
push_dataset_to_ckan(
self.catalog,
'owner',
self.dataset_id,
'portal',
'key',
download_strategy=(lambda _, __: False))
mock_portal.return_value.action.resource_patch.not_called()


class RemoveDatasetTestCase(FederationSuite):

Expand Down

0 comments on commit d7dfaa9

Please sign in to comment.