Skip to content

Commit

Permalink
Merge 038a70e into ebec539
Browse files Browse the repository at this point in the history
  • Loading branch information
lrromero committed Dec 5, 2018
2 parents ebec539 + 038a70e commit 586ab0d
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 68 deletions.
26 changes: 21 additions & 5 deletions docs/MANUAL.md
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,10 @@ Toma los siguientes parámetros:
- **download_strategy** (opcional, default None): La referencia a una función que toma (catalog, distribution) de
entrada y devuelve un booleano. Esta función se aplica sobre todas las distribuciones del dataset. Si devuelve `True`,
se descarga el archivo indicado en el `downloadURL` de la distribución y se lo sube al portal de destino. Si es None,
se omite esta operación.
se omite esta operación.
- **generate_new_access_url** (opcional, default None): Se pasan los ids de las distribuciones cuyo accessURL se regenerar en el portal de
destino. Para el resto, el portal debe mantiene el valor pasado en el DataJson.



Retorna el id en el nodo de destino del dataset federado.
Expand Down Expand Up @@ -423,6 +426,9 @@ Toma los siguientes parámetros:
entrada y devuelve un booleano. Esta función se aplica sobre todas las distribuciones del dataset. Si devuelve `True`,
se descarga el archivo indicado en el `downloadURL` de la distribución y se lo sube al portal de destino. Si es None,
se omite esta operación.
- **generate_new_access_url** (opcional, default None): Se pasan los ids de las distribuciones cuyo accessURL se regenerar en el portal de
destino. Para el resto, el portal debe mantiene el valor pasado en el DataJson.


Retorna el id del dataset restaurado.

Expand Down Expand Up @@ -453,6 +459,8 @@ parámetro. Toma los siguientes parámetros:
- **owner_org**: La organización a la cual pertencen los datasets.
- **download_strategy**: Una función (catálogo, distribución)->bool. Sobre las distribuciones que evalúa True,
descarga el recurso en el downloadURL y lo sube al portal de destino. Por default no sube ninguna distribución.
- **generate_new_access_url** (opcional, default None): Se pasan los ids de las distribuciones cuyo accessURL se regenerar en el portal de
destino. Para el resto, el portal debe mantiene el valor pasado en el DataJson.

Retorna la lista de ids de datasets subidos.

Expand All @@ -463,19 +471,27 @@ Toma los siguientes parámetros:
- **destination_portal_url**: La URL del portal CKAN de destino.
- **apikey**: La apikey de un usuario con los permisos que le permitan crear o actualizar los dataset.
- **download_strategy**: Una función (catálogo, distribución)-> bool. Sobre las distribuciones que evalúa True,
descarga el recurso en el downloadURL y lo sube al portal de destino. Por default no sube ninguna distribución.
descarga el recurso en el downloadURL y lo sube al portal de destino. Por default no sube ninguna distribución.
- **generate_new_access_url** (opcional, default None): Se pasan los ids de las distribuciones cuyo accessURL se regenerar en el portal de
destino. Para el resto, el portal debe mantiene el valor pasado en el DataJson.

Retorna un diccionario con key organización y value la lista de ids de datasets subidos a esa organización

- **pydatajson.federation.resources_upload()**: Sube archivos de recursos a las distribuciones indicadas.
Toma los siguientes parámetros:
- **pydatajson.federation.resources_update()**: Sube archivos de recursos a las distribuciones indicadas y regenera los
accessURL en las distribuciones indicadas. Toma los siguientes parámetros:
- **portal_url**: URL del portal de CKAN de destino.
- **apikey**: La apikey de un usuario del portal de destino con los permisos para modificar la distribución.
- **distributions**: Lista de distribuciones posibles para actualizar.
- **resource_files** Diccionario con el id de las distribuciones y un path al recurso correspondiente a subir.
- **generate_new_access_url** (opcional, default None): Lista de ids de distribuciones a las cuales se actualizará el
accessURL con los valores generados por el portal de destino.
- **catalog_id** (opcional, default None): prependea el id al id del recurso para encontrarlo antes de subirlo si es
necesario.


Retorna una lista con los ids de las distribuciones modificadas exitosamente.

**Advertencia**: La función `resources_upload()` cambia el `resource_type` de las distribuciones a `file.upload`.
**Advertencia**: La función `resources_update()` cambia el `resource_type` de las distribuciones a `file.upload`.

### Métodos para manejo de organizaciones

Expand Down
97 changes: 70 additions & 27 deletions pydatajson/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
def push_dataset_to_ckan(catalog, owner_org, dataset_origin_identifier,
portal_url, apikey, catalog_id=None,
demote_superThemes=True, demote_themes=True,
download_strategy=None):
download_strategy=None, generate_new_access_url=None):
"""Escribe la metadata de un dataset en el portal pasado por parámetro.
Args:
Expand All @@ -39,6 +39,10 @@ def push_dataset_to_ckan(catalog, owner_org, dataset_origin_identifier,
bool. Sobre las distribuciones que evalúa True, descarga el
recurso en el downloadURL y lo sube al portal de destino.
Por default no sube ninguna distribución.
generate_new_access_url(list): Se pasan los ids de las
distribuciones cuyo accessURL se regenerar en el portal de
destino. Para el resto, el portal debe mantiene el valor pasado
en el DataJson.
Returns:
str: El id del dataset en el catálogo de destino.
"""
Expand Down Expand Up @@ -70,46 +74,65 @@ def push_dataset_to_ckan(catalog, owner_org, dataset_origin_identifier,
pushed_package = ckan_portal.call_action(
'package_create', data_dict=package)

if download_strategy:
with resource_files_download(catalog, dataset.get('distribution', []),
download_strategy) as resource_files:
resources_upload(portal_url, apikey, resource_files,
catalog_id=catalog_id)
with resource_files_download(catalog, dataset.get('distribution', []),
download_strategy) as resource_files:
resources_update(portal_url, apikey, dataset.get('distribution', []),
resource_files, generate_new_access_url, catalog_id)

ckan_portal.close()
return pushed_package['id']


def resources_upload(portal_url, apikey, resource_files, catalog_id=None):
def resources_update(portal_url, apikey, distributions,
resource_files, generate_new_access_url=None,
catalog_id=None):
"""Sube archivos locales a sus distribuciones correspondientes en el portal
pasado por parámetro.
Args:
portal_url (str): La URL del portal CKAN de destino.
apikey (str): La apikey de un usuario con los permisos que le
permitan crear o actualizar el dataset.
distributions(list): Lista de distribuciones posibles para
actualizar.
resource_files(dict): Diccionario con entradas
id_de_distribucion:path_al_recurso a subir
generate_new_access_url(list): Lista de ids de distribuciones a
las cuales se actualizará el accessURL con los valores
generados por el portal de destino
catalog_id(str): prependea el id al id del recurso para
encontrarlo antes de subirlo
Returns:
list: los ids de los recursos modificados
"""
ckan_portal = RemoteCKAN(portal_url, apikey=apikey)
res = []
for resource in resource_files:
resource_id = catalog_id + '_' + resource if catalog_id else resource
try:
pushed = ckan_portal.action.resource_patch(
id=resource_id,
resource_type='file.upload',
upload=open(resource_files[resource], 'rb'))
res.append(pushed['id'])
except Exception as e:
logger.exception(
"Error subiendo recurso {} a la distribución {}: {}"
.format(resource_files[resource], resource_files, str(e)))
return res
result = []
generate_new_access_url = generate_new_access_url or []
for distribution in distributions:
updated = False
resource_id = catalog_id + '_' + distribution['identifier']\
if catalog_id else distribution['identifier']
fields = {'id': resource_id}
if distribution['identifier'] in generate_new_access_url:
fields.update({'accessURL': ''})
updated = True
if distribution['identifier'] in resource_files:
fields.update({'resource_type': 'file.upload',
'upload':
open(resource_files[distribution['identifier']],
'rb')
})
updated = True
if updated:
try:
pushed = ckan_portal.action.resource_patch(**fields)
result.append(pushed['id'])
except CKANAPIError as e:
logger.exception(
"Error subiendo recurso {} a la distribución {}: {}"
.format(resource_files[distribution['identifier']],
resource_files, str(e)))
return result


def remove_dataset_from_ckan(identifier, portal_url, apikey):
Expand Down Expand Up @@ -214,7 +237,8 @@ def push_theme_to_ckan(catalog, portal_url, apikey,


def restore_dataset_to_ckan(catalog, owner_org, dataset_origin_identifier,
portal_url, apikey, download_strategy=None):
portal_url, apikey, download_strategy=None,
generate_new_access_url=None):
"""Restaura la metadata de un dataset en el portal pasado por parámetro.
Args:
Expand All @@ -229,13 +253,18 @@ def restore_dataset_to_ckan(catalog, owner_org, dataset_origin_identifier,
bool. Sobre las distribuciones que evalúa True, descarga el
recurso en el downloadURL y lo sube al portal de destino.
Por default no sube ninguna distribución.
generate_new_access_url(list): Se pasan los ids de las
distribuciones cuyo accessURL se regenerar en el portal de
destino. Para el resto, el portal debe mantiene el valor
pasado en el DataJson.
Returns:
str: El id del dataset restaurado.
"""

return push_dataset_to_ckan(catalog, owner_org,
dataset_origin_identifier, portal_url,
apikey, None, False, False, download_strategy)
apikey, None, False, False, download_strategy,
generate_new_access_url)


def harvest_dataset_to_ckan(catalog, owner_org, dataset_origin_identifier,
Expand Down Expand Up @@ -450,7 +479,9 @@ def remove_organization_from_ckan(portal_url, apikey, organization_id):


def restore_organization_to_ckan(catalog, owner_org, portal_url, apikey,
dataset_list=None, download_strategy=None):
dataset_list=None, download_strategy=None,
generate_new_access_url=None
):
"""Restaura los datasets de la organización de un catálogo al portal pasado
por parámetro. Si hay temas presentes en el DataJson que no están en el
portal de CKAN, los genera.
Expand All @@ -467,6 +498,10 @@ def restore_organization_to_ckan(catalog, owner_org, portal_url, apikey,
bool. Sobre las distribuciones que evalúa True, descarga el
recurso en el downloadURL y lo sube al portal de destino.
Por default no sube ninguna distribución.
generate_new_access_url(list): Se pasan los ids de las
distribuciones cuyo accessURL se regenerar en el portal de
destino. Para el resto, el portal debe mantiene el valor
pasado en el DataJson.
Returns:
list(str): La lista de ids de datasets subidos.
"""
Expand All @@ -483,7 +518,8 @@ def restore_organization_to_ckan(catalog, owner_org, portal_url, apikey,
try:
restored_id = restore_dataset_to_ckan(catalog, owner_org,
dataset_id, portal_url,
apikey, download_strategy)
apikey, download_strategy,
generate_new_access_url)
restored.append(restored_id)
except (CKANAPIError, KeyError, AttributeError) as e:
logger.exception('Ocurrió un error restaurando el dataset {}: {}'
Expand All @@ -492,7 +528,8 @@ def restore_organization_to_ckan(catalog, owner_org, portal_url, apikey,


def restore_catalog_to_ckan(catalog, origin_portal_url, destination_portal_url,
apikey, download_strategy=None):
apikey, download_strategy=None,
generate_new_access_url=None):
"""Restaura los datasets de un catálogo original al portal pasado
por parámetro. Si hay temas presentes en el DataJson que no están en
el portal de CKAN, los genera.
Expand All @@ -509,6 +546,10 @@ def restore_catalog_to_ckan(catalog, origin_portal_url, destination_portal_url,
que evalúa True, descarga el recurso en el downloadURL y lo
sube al portal de destino. Por default no sube ninguna
distribución.
generate_new_access_url(list): Se pasan los ids de las
distribuciones cuyo accessURL se regenerar en el portal de
destino. Para el resto, el portal debe mantiene el valor
pasado en el DataJson.
Returns:
dict: Diccionario con key organización y value la lista de ids
de datasets subidos a esa organización
Expand All @@ -530,6 +571,8 @@ def restore_catalog_to_ckan(catalog, origin_portal_url, destination_portal_url,
datasets = [package['id'] for package in response['packages']]
pushed_datasets = restore_organization_to_ckan(
catalog, org, destination_portal_url, apikey,
dataset_list=datasets, download_strategy=download_strategy)
dataset_list=datasets, download_strategy=download_strategy,
generate_new_access_url=generate_new_access_url
)
res[org] = pushed_datasets
return res
41 changes: 21 additions & 20 deletions pydatajson/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,26 +404,27 @@ def pprint(result):
@contextmanager
def resource_files_download(catalog, distributions, download_strategy):
resource_files = {}
distributions = [dist for dist in distributions if
download_strategy(catalog, dist)]
for dist in distributions:
try:
tmpdir = tempfile.mkdtemp()
tmpfile = tempfile.NamedTemporaryFile(delete=False, dir=tmpdir)
tmpfile.close()
file_name = dist.get('fileName') or \
dist['downloadURL'].split('/')[-1]
os.rename(tmpfile.name, os.path.join(tmpdir, file_name))
tmpfile.name = os.path.join(tmpdir, file_name)
download_to_file(dist['downloadURL'], tmpfile.name)
resource_files[dist['identifier']] = tmpfile.name
except Exception as e:
logger.exception(
"Error descargando el recurso {} de la distribución {}: {}"
.format(dist.get('downloadURL'),
dist.get('identifier'), str(e))
)
continue
if download_strategy is not None:
distributions = [dist for dist in distributions if
download_strategy(catalog, dist)]
for dist in distributions:
try:
tmpdir = tempfile.mkdtemp()
tmpfile = tempfile.NamedTemporaryFile(delete=False, dir=tmpdir)
tmpfile.close()
file_name = dist.get('fileName') or \
dist['downloadURL'].split('/')[-1]
os.rename(tmpfile.name, os.path.join(tmpdir, file_name))
tmpfile.name = os.path.join(tmpdir, file_name)
download_to_file(dist['downloadURL'], tmpfile.name)
resource_files[dist['identifier']] = tmpfile.name
except Exception as e:
logger.exception(
"Error descargando el recurso {} de la distribución {}: {}"
.format(dist.get('downloadURL'),
dist.get('identifier'), str(e))
)
continue
try:
yield resource_files

Expand Down

0 comments on commit 586ab0d

Please sign in to comment.