Skip to content

Commit

Permalink
Merge 695e085 into e4a3fc2
Browse files Browse the repository at this point in the history
  • Loading branch information
lrromero committed Nov 23, 2018
2 parents e4a3fc2 + 695e085 commit ce281f1
Show file tree
Hide file tree
Showing 5 changed files with 274 additions and 43 deletions.
57 changes: 57 additions & 0 deletions docs/MANUAL.md
Expand Up @@ -444,6 +444,29 @@ Toma los siguientes parámetros:

Retorna el id en el nodo de destino de los datasets federados.

- **pydatajson.DataJson.restore_organization_to_ckan()**: Restaura los datasets de una organización al portal pasado por
parámetro. Toma los siguientes parámetros:
- **catalog**: El catálogo de origen que se restaura.
- **portal_url**: La URL del portal CKAN de destino.
- **apikey**: La apikey de un usuario con los permisos que le permitan crear o actualizar los dataset.
- **dataset_list**: Los ids de los datasets a restaurar. Si no se pasa una lista, todos los datasests se restauran.
- **owner_org**: La organización a la cual pertencen los datasets.
- **download_strategy**: Una función (catálogo, distribución)->bool. Sobre las distribuciones que evalúa True,
descarga el recurso en el downloadURL y lo sube al portal de destino. Por default no sube ninguna distribución.

Retorna la lista de ids de datasets subidos.

- **pydatajson.DataJson.restore_catalog_to_ckan()**: Restaura los datasets de un catálogo al portal pasado por parámetro.
Toma los siguientes parámetros:
- **catalog**: El catálogo de origen que se restaura.
- **origin_portal_url**: La URL del portal CKAN de origen.
- **destination_portal_url**: La URL del portal CKAN de destino.
- **apikey**: La apikey de un usuario con los permisos que le permitan crear o actualizar los dataset.
- **download_strategy**: Una función (catálogo, distribución)-> bool. Sobre las distribuciones que evalúa True,
descarga el recurso en el downloadURL y lo sube al portal de destino. Por default no sube ninguna distribución.

Retorna un diccionario con key organización y value la lista de ids de datasets subidos a esa organización

- **pydatajson.federation.resources_upload()**: Sube archivos de recursos a las distribuciones indicadas.
Toma los siguientes parámetros:
- **portal_url**: URL del portal de CKAN de destino.
Expand Down Expand Up @@ -716,3 +739,37 @@ OK | OK | 16 | 56
Por favor, consulte el informe [`datasets.csv`](datasets.csv).
```

## Anexo II: Restaurar un catálogo

El primer paso es replicar la estructura de organizaciones del catálogo original al catálogo destino. Asumiendo que
los nombres e ids de las organizaciones del original no se utilizan en el portal donde se replican:

```python
from pydatajson.federation import get_organizations_from_ckan, push_organization_tree_to_ckan
arbol_original = get_organizations_from_ckan('url_portal_original')
arbol_replicado = push_organization_tree_to_ckan('url_portal_destino', 'apikey', arbol_original)
```

Para cada organización en `arbol_replicado`, el campo `success` tiene un booleano que marca si fue subida exitosamente.
Con las organizaciones replicadas podemos restaurar la data y metadata del catálogo orginal:

```python
from pydatajson.core import DataJson
from pydatajson.helpers import is_local_andino_resource

original = DataJson('portal-original/data.json')
pushed_datasets =original.restore_catalog_to_ckan('portal-original-url', 'portal-destino-url', 'apikey',
download_strategy=is_local_andino_resource)
```

Si pasamos `download_strategy=None`, tan solo se restaura la metadata. `is_local_andino_resource` es una función
auxiliar que toma una distribución y un catálogo y realiza las siguientes validaciones:

-1: Chequea que el campo `type` sea `file.upload`

-2: Si la distribución no tiene campo `type`, chequea que el `downloadURL` comience con el `homepage` del catálogo

Si se cumple alguna de las condiciones, descarga el recurso y lo sube al portal de destino. Tambien es posible definir
una función propia como estrategia para carga y descarga de archivos. Esta función debe tomar una distribución, un
catálogo y devolver un booleano.
11 changes: 6 additions & 5 deletions pydatajson/ckan_utils.py
Expand Up @@ -90,11 +90,12 @@ def map_dataset_to_package(catalog, dataset, owner_org, catalog_id=None,
logger.exception('Theme no presente en catálogo.')
continue
else:
package['groups'] = package.get('groups', []) + [
{'name': title_to_name(theme, decode=False)}
for theme in themes
]

package.setdefault('groups', [])
for theme in themes:
theme_dict = catalog.get_theme(identifier=theme) or\
catalog.get_theme(label=theme)
if theme_dict:
package['groups'].append(map_theme_to_group(theme_dict))
return package


Expand Down
130 changes: 93 additions & 37 deletions pydatajson/federation.py
Expand Up @@ -7,7 +7,7 @@
from __future__ import print_function, unicode_literals
import logging
from ckanapi import RemoteCKAN
from ckanapi.errors import NotFound
from ckanapi.errors import NotFound, CKANAPIError
from .ckan_utils import map_dataset_to_package, map_theme_to_group
from .search import get_datasets
from .helpers import resource_files_download
Expand Down Expand Up @@ -265,40 +265,6 @@ def harvest_dataset_to_ckan(catalog, owner_org, dataset_origin_identifier,
download_strategy=download_strategy)


def restore_catalog_to_ckan(catalog, owner_org, portal_url, apikey,
dataset_list=None, download_strategy=None):
"""Restaura los datasets de un catálogo al portal pasado por parámetro.
Si hay temas presentes en el DataJson que no están en el portal de
CKAN, los genera.
Args:
catalog (DataJson): El catálogo de origen que se restaura.
portal_url (str): La URL del portal CKAN de destino.
apikey (str): La apikey de un usuario con los permisos que le
permitan crear o actualizar el dataset.
dataset_list(list(str)): Los ids de los datasets a restaurar. Si no
se pasa una lista, todos los datasests se restauran.
owner_org (str): La organización a la cual pertencen los datasets.
Si no se pasa, se utiliza el catalog_id.
download_strategy(callable): Una función (catálogo, distribución)->
bool. Sobre las distribuciones que evalúa True, descarga el
recurso en el downloadURL y lo sube al portal de destino.
Por default no sube ninguna distribución.
Returns:
str: El id del dataset en el catálogo de destino.
"""
push_new_themes(catalog, portal_url, apikey)
dataset_list = dataset_list or [ds['identifier']
for ds in catalog.datasets]
restored = []
for dataset_id in dataset_list:
restored_id = restore_dataset_to_ckan(catalog, owner_org, dataset_id,
portal_url, apikey,
download_strategy)
restored.append(restored_id)
return restored


def harvest_catalog_to_ckan(catalog, portal_url, apikey, catalog_id,
dataset_list=None, owner_org=None,
download_strategy=None):
Expand All @@ -323,10 +289,14 @@ def harvest_catalog_to_ckan(catalog, portal_url, apikey, catalog_id,
str: El id del dataset en el catálogo de destino.
"""
# Evitar entrar con valor falsy
harvested = []
if dataset_list is None:
dataset_list = [ds['identifier'] for ds in catalog.datasets]
try:
dataset_list = [ds['identifier'] for ds in catalog.datasets]
except KeyError:
logger.exception('Hay datasets sin identificadores')
return harvested
owner_org = owner_org or catalog_id
harvested = []
errors = {}
for dataset_id in dataset_list:
try:
Expand Down Expand Up @@ -477,3 +447,89 @@ def remove_organization_from_ckan(portal_url, apikey, organization_id):
except Exception as e:
logger.exception('Ocurrió un error borrando la organización {}: {}'
.format(organization_id, str(e)))


def restore_organization_to_ckan(catalog, owner_org, portal_url, apikey,
dataset_list=None, download_strategy=None):
"""Restaura los datasets de la organización de un catálogo al portal pasado
por parámetro. Si hay temas presentes en el DataJson que no están en el
portal de CKAN, los genera.
Args:
catalog (DataJson): El catálogo de origen que se restaura.
portal_url (str): La URL del portal CKAN de destino.
apikey (str): La apikey de un usuario con los permisos que le
permitan crear o actualizar el dataset.
dataset_list(list(str)): Los ids de los datasets a restaurar. Si no
se pasa una lista, todos los datasests se restauran.
owner_org (str): La organización a la cual pertencen los datasets.
download_strategy(callable): Una función (catálogo, distribución)->
bool. Sobre las distribuciones que evalúa True, descarga el
recurso en el downloadURL y lo sube al portal de destino.
Por default no sube ninguna distribución.
Returns:
list(str): La lista de ids de datasets subidos.
"""
push_new_themes(catalog, portal_url, apikey)
restored = []
if dataset_list is None:
try:
dataset_list = [ds['identifier'] for ds in catalog.datasets]
except KeyError:
logger.exception('Hay datasets sin identificadores')
return restored

for dataset_id in dataset_list:
try:
restored_id = restore_dataset_to_ckan(catalog, owner_org,
dataset_id, portal_url,
apikey, download_strategy)
restored.append(restored_id)
except (CKANAPIError, KeyError, AttributeError) as e:
logger.exception('Ocurrió un error restaurando el dataset {}: {}'
.format(dataset_id, str(e)))
return restored


def restore_catalog_to_ckan(catalog, origin_portal_url, destination_portal_url,
apikey, download_strategy=None):
"""Restaura los datasets de un catálogo original al portal pasado
por parámetro. Si hay temas presentes en el DataJson que no están en
el portal de CKAN, los genera.
Args:
catalog (DataJson): El catálogo de origen que se restaura.
origin_portal_url (str): La URL del portal CKAN de origen.
destination_portal_url (str): La URL del portal CKAN de
destino.
apikey (str): La apikey de un usuario con los permisos que le
permitan crear o actualizar el dataset.
download_strategy(callable): Una función
(catálogo, distribución)-> bool. Sobre las distribuciones
que evalúa True, descarga el recurso en el downloadURL y lo
sube al portal de destino. Por default no sube ninguna
distribución.
Returns:
dict: Diccionario con key organización y value la lista de ids
de datasets subidos a esa organización
"""
catalog['homepage'] = catalog.get('homepage') or origin_portal_url
res = {}
origin_portal = RemoteCKAN(origin_portal_url)
try:
org_list = origin_portal.action.organization_list()
except CKANAPIError as e:
logger.exception(
'Ocurrió un error buscando las organizaciones del portal {}: {}'
.format(origin_portal_url, str(e)))
return res

for org in org_list:
response = origin_portal.action.organization_show(
id=org, include_datasets=True)
datasets = [package['id'] for package in response['packages']]
pushed_datasets = restore_organization_to_ckan(
catalog, org, destination_portal_url, apikey,
dataset_list=datasets, download_strategy=download_strategy)
res[org] = pushed_datasets
return res
7 changes: 6 additions & 1 deletion pydatajson/helpers.py
Expand Up @@ -408,8 +408,13 @@ def resource_files_download(catalog, distributions, download_strategy):
download_strategy(catalog, dist)]
for dist in distributions:
try:
tmpfile = tempfile.NamedTemporaryFile(delete=False)
tmpdir = tempfile.mkdtemp()
tmpfile = tempfile.NamedTemporaryFile(delete=False, dir=tmpdir)
tmpfile.close()
file_name = dist.get('fileName') or \
dist['downloadURL'].split('/')[-1]
os.rename(tmpfile.name, os.path.join(tmpdir, file_name))
tmpfile.name = os.path.join(tmpdir, file_name)
download_to_file(dist['downloadURL'], tmpfile.name)
resource_files[dist['identifier']] = tmpfile.name
except Exception as e:
Expand Down
112 changes: 112 additions & 0 deletions tests/test_federation.py
Expand Up @@ -638,3 +638,115 @@ def test_remove_organization_logs_failures(self, mock_logger, mock_portal):
'organization_purge', data_dict={'id': 'test_id'})
mock_logger.assert_called_with(
'Ocurrió un error borrando la organización test_id: test')


@patch('pydatajson.federation.push_dataset_to_ckan')
class RestoreToCKANTestCase(FederationSuite):

@classmethod
def setUpClass(cls):
cls.catalog = pydatajson.DataJson(cls.get_sample('full_data.json'))
cls.catalog_id = cls.catalog.get('identifier', re.sub(
r'[^a-z-_]+', '', cls.catalog['title']).lower())
cls.dataset = cls.catalog.datasets[0]
cls.dataset_id = cls.dataset['identifier']

def test_restore_dataset_to_ckan(self, mock_push):
def test_strategy(_catalog, _dist):
return False
restore_dataset_to_ckan(self.catalog, 'owner_org', self.dataset_id,
'portal', 'apikey', test_strategy)
mock_push.assert_called_with(self.catalog, 'owner_org',
self.dataset_id, 'portal', 'apikey', None,
False, False, test_strategy)

@patch('pydatajson.federation.push_new_themes')
def test_restore_organization_to_ckan(self, mock_push_thm, mock_push_dst):
identifiers = [ds['identifier'] for ds in self.catalog.datasets]
mock_push_dst.side_effect = identifiers
pushed = restore_organization_to_ckan(self.catalog, 'owner_org',
'portal', 'apikey', identifiers)
self.assertEqual(identifiers, pushed)
mock_push_thm.assert_called_with(self.catalog, 'portal', 'apikey')
for identifier in identifiers:
mock_push_dst.assert_any_call(self.catalog, 'owner_org',
identifier, 'portal', 'apikey', None,
False, False, None)

@patch('pydatajson.federation.push_new_themes')
def test_restore_failing_organization_to_ckan(self, mock_push_thm,
mock_push_dst):
# Continua subiendo el segundo dataset a pesar que el primero falla
effects = [CKANAPIError('broken dataset'),
self.catalog.datasets[1]['identifier']]
mock_push_dst.side_effect = effects
identifiers = [ds['identifier'] for ds in self.catalog.datasets]
pushed = restore_organization_to_ckan(self.catalog, 'owner_org',
'portal', 'apikey', identifiers)
self.assertEqual([identifiers[1]], pushed)
mock_push_thm.assert_called_with(self.catalog, 'portal', 'apikey')
mock_push_dst.assert_called_with(self.catalog, 'owner_org',
identifiers[1], 'portal', 'apikey',
None, False, False, None)

@patch('pydatajson.federation.push_new_themes')
@patch('ckanapi.remoteckan.ActionShortcut')
def test_restore_catalog_to_ckan(self, mock_action, mock_push_thm,
mock_push_dst):
identifiers = [ds['identifier'] for ds in self.catalog.datasets]
mock_action.return_value.organization_list.return_value = \
['org_1', 'org_2']
mock_action.return_value.organization_show.side_effect = [
{'packages': [{'id': identifiers[0]}]},
{'packages': [{'id': identifiers[1]}]},
]
mock_push_dst.side_effect = (lambda *args, **kwargs: args[2])
pushed = restore_catalog_to_ckan(self.catalog, 'origin',
'destination', 'apikey')
mock_push_dst.assert_any_call(self.catalog, 'org_1',
identifiers[0], 'destination', 'apikey',
None, False, False, None)
mock_push_dst.assert_any_call(self.catalog, 'org_2',
identifiers[1], 'destination', 'apikey',
None, False, False, None)
expected = {'org_1': [identifiers[0]],
'org_2': [identifiers[1]]}
self.assertDictEqual(expected, pushed)

@patch('pydatajson.federation.push_new_themes')
@patch('ckanapi.remoteckan.ActionShortcut')
def test_restore_catalog_failing_origin_portal(
self, mock_action, mock_push_thm, mock_push_dst):
mock_action.return_value.organization_list.side_effect = \
CKANAPIError('Broken origin portal')
pushed = restore_catalog_to_ckan(self.catalog, 'origin',
'destination', 'apikey')
self.assertDictEqual({}, pushed)
mock_push_thm.assert_not_called()
mock_push_dst.assert_not_called()

@patch('pydatajson.federation.push_new_themes')
@patch('ckanapi.remoteckan.ActionShortcut')
def test_restore_catalog_failing_destination_portal(
self, mock_action, mock_push_thm, mock_push_dst):

identifiers = [ds['identifier'] for ds in self.catalog.datasets]
mock_action.return_value.organization_list.return_value = \
['org_1', 'org_2']
mock_action.return_value.organization_show.side_effect = [
{'packages': [{'id': identifiers[0]}]},
{'packages': [{'id': identifiers[1]}]},
]
mock_push_dst.side_effect = CKANAPIError('Broken destination portal')

pushed = restore_catalog_to_ckan(self.catalog, 'origin',
'destination', 'apikey')
mock_push_dst.assert_any_call(self.catalog, 'org_1',
identifiers[0], 'destination', 'apikey',
None, False, False, None)
mock_push_dst.assert_any_call(self.catalog, 'org_2',
identifiers[1], 'destination', 'apikey',
None, False, False, None)
expected = {'org_1': [],
'org_2': []}
self.assertDictEqual(expected, pushed)

0 comments on commit ce281f1

Please sign in to comment.