# Pruebas ETL de series de tiempo

In [2]:
import os
from copy import deepcopy
import pandas as pd
import pysftp
import requests
import yaml
from pydatajson import DataJson
import time 
from bs4 import BeautifulSoup
import dataset
from pprint import pprint

from xlseries import XlSeries
from pydatajson import readers
from sqlalchemy import create_engine

from scrape_datasets import scrape_distribution
from scrape_datasets import scrape_dataset, scrape_file
from paths import *

catalog_xlsx_path = os.path.join(DATOS_DIR, "catalogo-sspm.xlsx")
catalog_json_path = os.path.join(CATALOGS_DIR, "catalog", "sspm", "data.json")
test_catalog_json_path = os.path.join(DATOS_DIR, "tests", "catalog", "sspm", "data.json")
etl_params_path = os.path.join(DATOS_DIR, "params", "etl_params.csv")

etl_params = pd.read_csv(etl_params_path, dtype={"distribution_identifier": "str"})
catalog = readers.read_catalog(catalog_json_path)

cnopts = pysftp.CnOpts()
# cnopts.hostkeys.load('/Users/abenassi/.ssh/known_hosts')
cnopts.hostkeys = None
config_ind_path="config/config_ind.yaml"

with open(config_ind_path, 'r') as f:
        ind_params = yaml.load(f)

In [3]:
db_con_url = 'postgresql://{user}:{password}@{host}:{port}/{db}'.format(
        host=ind_params["db"]["host"],
        port=ind_params["db"]["port"],
        user=ind_params["db"]["user"],
        password=ind_params["db"]["pass"],
        db=ind_params["db"]["db"]
)
engine = create_engine(db_con_url)
db = dataset.connect(db_con_url)

In [4]:
db.tables

[u'archivo_permitidos',
 u'permisos_catalog',
 u'dataset',
 u'archivo',
 u'databasechangelog',
 u'ad_esp_permitidos',
 u'jhi_authority',
 u'ad_esp',
 u'field',
 u'jhi_persistent_audit_evt_data',
 u'jhi_user_authority',
 u'ad_esp_tecnica',
 u'catalog_tematica_global',
 u'catalog_permitidos',
 u'ad_attachment',
 u'dataset_tematica_especifica',
 u'ad_status',
 u'databasechangeloglock',
 u'dataset_permitidos',
 u'catalog_tematica_especifica',
 u'jhi_user',
 u'catalog',
 u'jhi_persistent_audit_event',
 u'dataset_tematica_global',
 u'distribution']

## Upload data y metadata

In [20]:
def do_ind_api_request(distribution_id_ind=None, req_type="data", 
                       config_ind="config/config_ind.yaml"):
    
    if isinstance(config_ind, dict):
        ind_params = config_ind
    else:
        with open(config_ind, 'r') as f:
            ind_params = yaml.load(f)

    # request original
    headers = {"Authorization": ind_params["api"]["auth_header"]}
    if req_type == "metadata":
        url = ind_params["api"][req_type]["url"]
    elif req_type == "data":
        url = ind_params["api"][req_type]["url"].format(distribution_id_ind)
    else:
        raise Exception("{} no es un tipo de request valido".format(
            req_type))
    print(url)

    res = requests.get(url, headers=headers)
    print(res.content)
    
    req_id = BeautifulSoup(res.content).find("id").get_text()
    
    # request de status con bloqueo hasta finalizar
    job_completed = False
    while not job_completed:
        headers_status = {
            "Authorization": ind_params["api"]["auth_header"]
        }
        url_status = ind_params["api"]["job_status"]["url"].format(req_id)
        res_status = requests.get(url_status, headers=headers_status)
        bs = BeautifulSoup(res_status.content)
        
        # chequea el estado para saber si seguir esperando o terminar
        try:
            status = bs.find("status_desc").get_text()
            if status == "Running":
                time.sleep(0.3)
            else:
                job_completed = True
        except:
            job_completed = True
    
    return bs

In [21]:
def upload_file_to_ind(local_path, remote_dir=None, file_name=None, config_ind="config/config_ind.yaml"):
    """Carga de un archivo a la infraestructura."""

    if isinstance(config_ind, dict):
        ind_params = config_ind
    else:
        with open(config_ind, 'r') as f:
            ind_params = yaml.load(f)

    remote_dir = remote_dir or '/home/{}'.format(ind_params["api"]["user"])

    with pysftp.Connection(ind_params["api"]["host"], username=ind_params["api"]["user"], 
                           password=ind_params["api"]["pass"], cnopts=cnopts) as sftp:
    
        with sftp.cd(remote_dir):
            if file_name:
                remote_path = os.path.join(remote_dir, file_name)

                print("Local: {} / Remote: {}".format(local_path, remote_path))
                res = sftp.put(local_path, remote_path)

                while not sftp.exists(remote_path):
                    time.sleep(0.2)

                return res
            else:
                res = sftp.put(local_path)

                return res

In [22]:
def upload_datajson_to_ind(local_path, config_ind_path="config/config_ind.yaml"):
    
    with open(config_ind_path, 'r') as f:
        ind_params = yaml.load(f)
        
    upload_file_to_ind(local_path, config_ind=ind_params)

    status = do_ind_api_request(req_type="metadata", config_ind=ind_params)

    return status

In [23]:
def upload_distribution_to_ind(local_path, distribution_id, config_ind_path="config/config_ind.yaml"):
    
    with open(config_ind_path, 'r') as f:
        ind_params = yaml.load(f)
        
    file_name = "{}.csv".format(distribution_id)
    upload_file_to_ind(local_path, file_name=file_name, config_ind=ind_params)

    status = do_ind_api_request(distribution_id_ind=distribution_id,
        req_type="data", config_ind=ind_params)

    return status

In [24]:
def get_distribution_ind_ids(config_ind_path="config/config_ind.yaml"):
    with open(config_ind_path, 'r') as f:
        ind_params = yaml.load(f)
        
    query = """
        SELECT
            ct.id_catalog, ct.nombre, ds.id_distribution, ds.identificador
        FROM distribution ds
            inner join dataset dt on (dt.id_dataset = ds.id_dataset)
            inner join catalog ct on (dt.id_catalog= ct.id_catalog)
        WHERE 
            ds.vigente = TRUE and 
            dt.vigente = TRUE and 
            ct.vigente = TRUE;
    """

    engine = create_engine('postgresql://{user}:{password}@{host}:{port}/modernizacion'.format(
        host=ind_params["db"]["host"],
        port=ind_params["db"]["port"],
        user=ind_params["db"]["user"],
        password=ind_params["db"]["pass"]
    ))
    
    index_cols = ["nombre", "identificador"]
    df_distribs = pd.read_sql_query(query, engine).dropna().sort_values(index_cols)
    df = df_distribs.drop_duplicates(index_cols, keep="last").set_index(index_cols)
    df.sort_index(level=index_cols, ascending=[1, 0], inplace=True)
    
    return df

In [25]:
def upload_distributions(config_ind_path="config/config_ind.yaml"):
    df = get_distribution_ind_ids(config_ind_path)
    
    status_uploads = {}
    for dataset in catalog["dataset"]:
        for distribution in dataset["distribution"]:
            try:
                id_distribution = df.loc[catalog["title"], distribution["identifier"]]["id_distribution"]

                distribution_path = os.path.join(
                    DATASETS_DIR, dataset["identifier"], "{}.csv".format(distribution["identifier"]))

                # renamed_path = "/Users/abenassi/github/series-tiempo/catalogo/datos/datasets_ind/{}.csv".format(id_distribution)
                # shutil.copyfile(distribution_path, renamed_path)
                status = upload_distribution_to_ind(distribution_path, id_distribution, config_ind_path)
                
                if not status in status_uploads:
                    status_uploads[status] = []
                status_uploads[status].append({
                    "dataset_identifier": dataset["identifier"],
                    "distribution_identifier": distribution["identifier"],
                    "distribution_id_ind": id_distribution
                })
                print(dataset["identifier"], distribution["identifier"], id_distribution, status)

            except Exception as e:
                if not str(e) in status_uploads:
                    status_uploads[str(e)] = []
                status_uploads[str(e)].append({
                    "dataset_identifier": dataset["identifier"],
                    "distribution_identifier": distribution["identifier"],
                    "distribution_id_ind": id_distribution
                })
                print(dataset["identifier"], distribution["identifier"], id_distribution, e)
                continue

    return status_uploads

In [14]:
catalog = readers.read_catalog(test_catalog_json_path)
catalog_json_path

u'/Users/abenassi/github/series-tiempo/data/output/catalog/sspm/data.json'

In [27]:
upload_datajson_to_ind(test_catalog_json_path)

http://192.168.150.221:9080/pentaho-di/kettle/runJob/?job=public%2Fmetadata-dinamica%2Fjb_bulkLoad_json&ID_USUARIO=n7/evuGi92aapvp5XgLnnQ==
<webresult>
  <result>OK</result>
  <message>Job started</message>
  <id>4ac6bcef-4b91-42fa-9c36-d5d00fa7dd1a</id>
</webresult>




<?xml version="1.0" encoding="UTF-8"?><html><body><jobstatus>\n<jobname></jobname>\n<id>4ac6bcef-4b91-42fa-9c36-d5d00fa7dd1a</id>\n<status_desc>Finished</status_desc>\n<error_desc></error_desc>\n<logging_string>&lt;![CDATA[H4sIAAAAAAAAAAMAAAAAAAAAAAA=]]&gt;</logging_string>\n<first_log_line_nr>0</first_log_line_nr>\n<last_log_line_nr>21</last_log_line_nr>\n</jobstatus>\n</body></html>

In [35]:
# sube distribuciones
status_distributions_2 = upload_distributions("config/config_ind.yaml")
for key, value in status_distributions_2.iteritems():
    print(key, len(value))

ProgrammingError: (psycopg2.ProgrammingError) permission denied for relation distribution
 [SQL: '\n        SELECT\n            ct.id_catalog, ct.nombre, ds.id_distribution, ds.identificador\n        FROM distribution ds\n            inner join dataset dt on (dt.id_dataset = ds.id_dataset)\n            inner join catalog ct on (dt.id_catalog= ct.id_catalog)\n        WHERE \n            ds.vigente = TRUE and \n            dt.vigente = TRUE and \n            ct.vigente = TRUE;\n    ']

In [30]:
# sube distribuciones
status_distributions = upload_distributions("config/config_ind.yaml")
for key, value in status_distributions.iteritems():
    print(key, len(value))

Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/1/1.1.csv / Remote: /home/abenassi/1016.csv
http://192.168.150.211:9080/pentaho-di/kettle/runJob/?job=public%2Fmetadata-dinamica%2Fjb_invocacion_api_ext&ID_DISTRIBUTION=1016&TOKEN=YWJlbmFzc2k6OjokMmEkMTAkWkJvOHJEOG9nbkJDSDB3dGFHREYxLkVLcVdoWUxPQzc1QXh3UDQxcFRtOFB4ZUVZdkdqNm0=
(u'1', u'1.1', 1016, u'Finished')
Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/1/1.2.csv / Remote: /home/abenassi/1017.csv
http://192.168.150.211:9080/pentaho-di/kettle/runJob/?job=public%2Fmetadata-dinamica%2Fjb_invocacion_api_ext&ID_DISTRIBUTION=1017&TOKEN=YWJlbmFzc2k6OjokMmEkMTAkWkJvOHJEOG9nbkJDSDB3dGFHREYxLkVLcVdoWUxPQzc1QXh3UDQxcFRtOFB4ZUVZdkdqNm0=
(u'1', u'1.2', 1017, u'Finished')
Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/2/2.1.csv / Remote: /home/abenassi/1039.csv
http://192.168.150.211:9080/pentaho-di/kettle/runJob/?job=public%2Fmetadata-dinamica%2Fjb_invocacion_api_ext&ID_DISTRIBUTION=1039&

(u'9', u'9.2', 1091, u'Stopped (with errors)')
Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/10/10.1.csv / Remote: /home/abenassi/1018.csv
http://192.168.150.211:9080/pentaho-di/kettle/runJob/?job=public%2Fmetadata-dinamica%2Fjb_invocacion_api_ext&ID_DISTRIBUTION=1018&TOKEN=YWJlbmFzc2k6OjokMmEkMTAkWkJvOHJEOG9nbkJDSDB3dGFHREYxLkVLcVdoWUxPQzc1QXh3UDQxcFRtOFB4ZUVZdkdqNm0=
(u'10', u'10.1', 1018, u'Stopped (with errors)')
Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/10/10.2.csv / Remote: /home/abenassi/1019.csv
http://192.168.150.211:9080/pentaho-di/kettle/runJob/?job=public%2Fmetadata-dinamica%2Fjb_invocacion_api_ext&ID_DISTRIBUTION=1019&TOKEN=YWJlbmFzc2k6OjokMmEkMTAkWkJvOHJEOG9nbkJDSDB3dGFHREYxLkVLcVdoWUxPQzc1QXh3UDQxcFRtOFB4ZUVZdkdqNm0=
(u'10', u'10.2', 1019, u'Finished')
Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/10/10.3.csv / Remote: /home/abenassi/1020.csv
http://192.168.150.211:9080/pentaho-di/kettle/runJob/?job=pu

(u'20', u'20.1', 1043, u'Stopped (with errors)')
Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/20/20.2.csv / Remote: /home/abenassi/1044.csv
http://192.168.150.211:9080/pentaho-di/kettle/runJob/?job=public%2Fmetadata-dinamica%2Fjb_invocacion_api_ext&ID_DISTRIBUTION=1044&TOKEN=YWJlbmFzc2k6OjokMmEkMTAkWkJvOHJEOG9nbkJDSDB3dGFHREYxLkVLcVdoWUxPQzc1QXh3UDQxcFRtOFB4ZUVZdkdqNm0=
(u'20', u'20.2', 1044, u'Stopped (with errors)')
Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/21/21.1.csv / Remote: /home/abenassi/1045.csv
http://192.168.150.211:9080/pentaho-di/kettle/runJob/?job=public%2Fmetadata-dinamica%2Fjb_invocacion_api_ext&ID_DISTRIBUTION=1045&TOKEN=YWJlbmFzc2k6OjokMmEkMTAkWkJvOHJEOG9nbkJDSDB3dGFHREYxLkVLcVdoWUxPQzc1QXh3UDQxcFRtOFB4ZUVZdkdqNm0=
(u'21', u'21.1', 1045, u'Stopped (with errors)')
Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/21/21.2.csv / Remote: /home/abenassi/1046.csv
http://192.168.150.211:9080/pentaho-di/kettle

(u'30', u'30.2', 1066, u'Stopped (with errors)')
Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/30/30.3.csv / Remote: /home/abenassi/1067.csv
http://192.168.150.211:9080/pentaho-di/kettle/runJob/?job=public%2Fmetadata-dinamica%2Fjb_invocacion_api_ext&ID_DISTRIBUTION=1067&TOKEN=YWJlbmFzc2k6OjokMmEkMTAkWkJvOHJEOG9nbkJDSDB3dGFHREYxLkVLcVdoWUxPQzc1QXh3UDQxcFRtOFB4ZUVZdkdqNm0=
(u'30', u'30.3', 1067, u'Stopped (with errors)')
Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/31/31.2.csv / Remote: /home/abenassi/1068.csv
http://192.168.150.211:9080/pentaho-di/kettle/runJob/?job=public%2Fmetadata-dinamica%2Fjb_invocacion_api_ext&ID_DISTRIBUTION=1068&TOKEN=YWJlbmFzc2k6OjokMmEkMTAkWkJvOHJEOG9nbkJDSDB3dGFHREYxLkVLcVdoWUxPQzc1QXh3UDQxcFRtOFB4ZUVZdkdqNm0=
(u'31', u'31.2', 1068, u'Halting')
Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/31/31.3.csv / Remote: /home/abenassi/1069.csv
http://192.168.150.211:9080/pentaho-di/kettle/runJob/?job=p

Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/39/39.2.csv / Remote: /home/abenassi/1183.csv
http://192.168.150.211:9080/pentaho-di/kettle/runJob/?job=public%2Fmetadata-dinamica%2Fjb_invocacion_api_ext&ID_DISTRIBUTION=1183&TOKEN=YWJlbmFzc2k6OjokMmEkMTAkWkJvOHJEOG9nbkJDSDB3dGFHREYxLkVLcVdoWUxPQzc1QXh3UDQxcFRtOFB4ZUVZdkdqNm0=
(u'39', u'39.2', 1183, u'Stopped (with errors)')
Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/39/39.3.csv / Remote: /home/abenassi/1184.csv
http://192.168.150.211:9080/pentaho-di/kettle/runJob/?job=public%2Fmetadata-dinamica%2Fjb_invocacion_api_ext&ID_DISTRIBUTION=1184&TOKEN=YWJlbmFzc2k6OjokMmEkMTAkWkJvOHJEOG9nbkJDSDB3dGFHREYxLkVLcVdoWUxPQzc1QXh3UDQxcFRtOFB4ZUVZdkdqNm0=
(u'39', u'39.3', 1184, u'Stopped (with errors)')
Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/40/40.1.csv / Remote: /home/abenassi/1185.csv
http://192.168.150.211:9080/pentaho-di/kettle/runJob/?job=public%2Fmetadata-dinamica%2Fjb_invo

(u'48', u'48.2', 1204, u'Stopped (with errors)')
Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/49/49.1.csv / Remote: /home/abenassi/1205.csv
http://192.168.150.211:9080/pentaho-di/kettle/runJob/?job=public%2Fmetadata-dinamica%2Fjb_invocacion_api_ext&ID_DISTRIBUTION=1205&TOKEN=YWJlbmFzc2k6OjokMmEkMTAkWkJvOHJEOG9nbkJDSDB3dGFHREYxLkVLcVdoWUxPQzc1QXh3UDQxcFRtOFB4ZUVZdkdqNm0=
(u'49', u'49.1', 1205, u'Stopped (with errors)')
Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/49/49.2.csv / Remote: /home/abenassi/1206.csv
http://192.168.150.211:9080/pentaho-di/kettle/runJob/?job=public%2Fmetadata-dinamica%2Fjb_invocacion_api_ext&ID_DISTRIBUTION=1206&TOKEN=YWJlbmFzc2k6OjokMmEkMTAkWkJvOHJEOG9nbkJDSDB3dGFHREYxLkVLcVdoWUxPQzc1QXh3UDQxcFRtOFB4ZUVZdkdqNm0=
(u'49', u'49.2', 1206, u'Stopped (with errors)')
Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/50/50.1.csv / Remote: /home/abenassi/1207.csv
http://192.168.150.211:9080/pentaho-di/kettle

(u'63', u'63.1', 1226, u'Stopped (with errors)')
Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/64/64.1.csv / Remote: /home/abenassi/1227.csv
http://192.168.150.211:9080/pentaho-di/kettle/runJob/?job=public%2Fmetadata-dinamica%2Fjb_invocacion_api_ext&ID_DISTRIBUTION=1227&TOKEN=YWJlbmFzc2k6OjokMmEkMTAkWkJvOHJEOG9nbkJDSDB3dGFHREYxLkVLcVdoWUxPQzc1QXh3UDQxcFRtOFB4ZUVZdkdqNm0=
(u'64', u'64.1', 1227, u'Stopped (with errors)')
Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/65/65.1.csv / Remote: /home/abenassi/1228.csv
http://192.168.150.211:9080/pentaho-di/kettle/runJob/?job=public%2Fmetadata-dinamica%2Fjb_invocacion_api_ext&ID_DISTRIBUTION=1228&TOKEN=YWJlbmFzc2k6OjokMmEkMTAkWkJvOHJEOG9nbkJDSDB3dGFHREYxLkVLcVdoWUxPQzc1QXh3UDQxcFRtOFB4ZUVZdkdqNm0=
(u'65', u'65.1', 1228, u'Stopped (with errors)')
Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/66/66.1.csv / Remote: /home/abenassi/1229.csv
http://192.168.150.211:9080/pentaho-di/kettle

(u'74', u'74.3', 1247, u'Stopped (with errors)')
Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/75/75.1.csv / Remote: /home/abenassi/1248.csv
http://192.168.150.211:9080/pentaho-di/kettle/runJob/?job=public%2Fmetadata-dinamica%2Fjb_invocacion_api_ext&ID_DISTRIBUTION=1248&TOKEN=YWJlbmFzc2k6OjokMmEkMTAkWkJvOHJEOG9nbkJDSDB3dGFHREYxLkVLcVdoWUxPQzc1QXh3UDQxcFRtOFB4ZUVZdkdqNm0=
(u'75', u'75.1', 1248, u'Stopped (with errors)')
Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/75/75.2.csv / Remote: /home/abenassi/1249.csv
http://192.168.150.211:9080/pentaho-di/kettle/runJob/?job=public%2Fmetadata-dinamica%2Fjb_invocacion_api_ext&ID_DISTRIBUTION=1249&TOKEN=YWJlbmFzc2k6OjokMmEkMTAkWkJvOHJEOG9nbkJDSDB3dGFHREYxLkVLcVdoWUxPQzc1QXh3UDQxcFRtOFB4ZUVZdkdqNm0=
(u'75', u'75.2', 1249, u'Stopped (with errors)')
Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/75/75.3.csv / Remote: /home/abenassi/1250.csv
http://192.168.150.211:9080/pentaho-di/kettle

(u'84', u'84.1', 1268, u'Stopped (with errors)')
Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/84/84.2.csv / Remote: /home/abenassi/1269.csv
http://192.168.150.211:9080/pentaho-di/kettle/runJob/?job=public%2Fmetadata-dinamica%2Fjb_invocacion_api_ext&ID_DISTRIBUTION=1269&TOKEN=YWJlbmFzc2k6OjokMmEkMTAkWkJvOHJEOG9nbkJDSDB3dGFHREYxLkVLcVdoWUxPQzc1QXh3UDQxcFRtOFB4ZUVZdkdqNm0=
(u'84', u'84.2', 1269, u'Stopped (with errors)')
Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/86/86.1.csv / Remote: /home/abenassi/1270.csv
http://192.168.150.211:9080/pentaho-di/kettle/runJob/?job=public%2Fmetadata-dinamica%2Fjb_invocacion_api_ext&ID_DISTRIBUTION=1270&TOKEN=YWJlbmFzc2k6OjokMmEkMTAkWkJvOHJEOG9nbkJDSDB3dGFHREYxLkVLcVdoWUxPQzc1QXh3UDQxcFRtOFB4ZUVZdkdqNm0=
(u'86', u'86.1', 1270, u'Stopped (with errors)')
Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/87/87.1.csv / Remote: /home/abenassi/1271.csv
http://192.168.150.211:9080/pentaho-di/kettle

Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/98/98.3.csv / Remote: /home/abenassi/1291.csv
http://192.168.150.211:9080/pentaho-di/kettle/runJob/?job=public%2Fmetadata-dinamica%2Fjb_invocacion_api_ext&ID_DISTRIBUTION=1291&TOKEN=YWJlbmFzc2k6OjokMmEkMTAkWkJvOHJEOG9nbkJDSDB3dGFHREYxLkVLcVdoWUxPQzc1QXh3UDQxcFRtOFB4ZUVZdkdqNm0=
(u'98', u'98.3', 1291, u'Stopped (with errors)')
(u'99', u'99.1', 1291, KeyError((u'Datos Programaci\xf3n Macroecon\xf3mica', u'99.1'),))
(u'99', u'99.2', 1291, KeyError((u'Datos Programaci\xf3n Macroecon\xf3mica', u'99.2'),))
Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/99/99.3.csv / Remote: /home/abenassi/1292.csv
http://192.168.150.211:9080/pentaho-di/kettle/runJob/?job=public%2Fmetadata-dinamica%2Fjb_invocacion_api_ext&ID_DISTRIBUTION=1292&TOKEN=YWJlbmFzc2k6OjokMmEkMTAkWkJvOHJEOG9nbkJDSDB3dGFHREYxLkVLcVdoWUxPQzc1QXh3UDQxcFRtOFB4ZUVZdkdqNm0=
(u'99', u'99.3', 1292, u'Stopped (with errors)')
Local: /Users/abenassi/github/

(u'110', u'110.1', 1109, u'Stopped (with errors)')
Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/110/110.2.csv / Remote: /home/abenassi/1110.csv
http://192.168.150.211:9080/pentaho-di/kettle/runJob/?job=public%2Fmetadata-dinamica%2Fjb_invocacion_api_ext&ID_DISTRIBUTION=1110&TOKEN=YWJlbmFzc2k6OjokMmEkMTAkWkJvOHJEOG9nbkJDSDB3dGFHREYxLkVLcVdoWUxPQzc1QXh3UDQxcFRtOFB4ZUVZdkdqNm0=
(u'110', u'110.2', 1110, u'Stopped (with errors)')
Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/111/111.1.csv / Remote: /home/abenassi/1111.csv
http://192.168.150.211:9080/pentaho-di/kettle/runJob/?job=public%2Fmetadata-dinamica%2Fjb_invocacion_api_ext&ID_DISTRIBUTION=1111&TOKEN=YWJlbmFzc2k6OjokMmEkMTAkWkJvOHJEOG9nbkJDSDB3dGFHREYxLkVLcVdoWUxPQzc1QXh3UDQxcFRtOFB4ZUVZdkdqNm0=
(u'111', u'111.1', 1111, u'Stopped (with errors)')
Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/111/111.2.csv / Remote: /home/abenassi/1112.csv
http://192.168.150.211:9080/penta

(u'120', u'120.1', 1130, u'Stopped (with errors)')
(u'121', u'121.1', 1130, KeyError((u'Datos Programaci\xf3n Macroecon\xf3mica', u'121.1'),))
(u'121', u'121.2', 1130, KeyError((u'Datos Programaci\xf3n Macroecon\xf3mica', u'121.2'),))
Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/121/121.3.csv / Remote: /home/abenassi/1131.csv
http://192.168.150.211:9080/pentaho-di/kettle/runJob/?job=public%2Fmetadata-dinamica%2Fjb_invocacion_api_ext&ID_DISTRIBUTION=1131&TOKEN=YWJlbmFzc2k6OjokMmEkMTAkWkJvOHJEOG9nbkJDSDB3dGFHREYxLkVLcVdoWUxPQzc1QXh3UDQxcFRtOFB4ZUVZdkdqNm0=
(u'121', u'121.3', 1131, u'Stopped (with errors)')
(u'122', u'122.1', 1131, KeyError((u'Datos Programaci\xf3n Macroecon\xf3mica', u'122.1'),))
Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/122/122.2.csv / Remote: /home/abenassi/1132.csv
http://192.168.150.211:9080/pentaho-di/kettle/runJob/?job=public%2Fmetadata-dinamica%2Fjb_invocacion_api_ext&ID_DISTRIBUTION=1132&TOKEN=YWJlbmFzc2k6OjokMmEkM

Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/134/134.3.csv / Remote: /home/abenassi/1150.csv
http://192.168.150.211:9080/pentaho-di/kettle/runJob/?job=public%2Fmetadata-dinamica%2Fjb_invocacion_api_ext&ID_DISTRIBUTION=1150&TOKEN=YWJlbmFzc2k6OjokMmEkMTAkWkJvOHJEOG9nbkJDSDB3dGFHREYxLkVLcVdoWUxPQzc1QXh3UDQxcFRtOFB4ZUVZdkdqNm0=
(u'134', u'134.3', 1150, u'Stopped (with errors)')
Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/134/134.4.csv / Remote: /home/abenassi/1151.csv
http://192.168.150.211:9080/pentaho-di/kettle/runJob/?job=public%2Fmetadata-dinamica%2Fjb_invocacion_api_ext&ID_DISTRIBUTION=1151&TOKEN=YWJlbmFzc2k6OjokMmEkMTAkWkJvOHJEOG9nbkJDSDB3dGFHREYxLkVLcVdoWUxPQzc1QXh3UDQxcFRtOFB4ZUVZdkdqNm0=
(u'134', u'134.4', 1151, u'Stopped (with errors)')
Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/135/135.1.csv / Remote: /home/abenassi/1152.csv
http://192.168.150.211:9080/pentaho-di/kettle/runJob/?job=public%2Fmetadata-dinamica

(u'143', u'143.1', 1170, u'Stopped (with errors)')
Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/143/143.2.csv / Remote: /home/abenassi/1171.csv
http://192.168.150.211:9080/pentaho-di/kettle/runJob/?job=public%2Fmetadata-dinamica%2Fjb_invocacion_api_ext&ID_DISTRIBUTION=1171&TOKEN=YWJlbmFzc2k6OjokMmEkMTAkWkJvOHJEOG9nbkJDSDB3dGFHREYxLkVLcVdoWUxPQzc1QXh3UDQxcFRtOFB4ZUVZdkdqNm0=
(u'143', u'143.2', 1171, u'Stopped (with errors)')
Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/143/143.3.csv / Remote: /home/abenassi/1172.csv
http://192.168.150.211:9080/pentaho-di/kettle/runJob/?job=public%2Fmetadata-dinamica%2Fjb_invocacion_api_ext&ID_DISTRIBUTION=1172&TOKEN=YWJlbmFzc2k6OjokMmEkMTAkWkJvOHJEOG9nbkJDSDB3dGFHREYxLkVLcVdoWUxPQzc1QXh3UDQxcFRtOFB4ZUVZdkdqNm0=
(u'143', u'143.3', 1172, u'Stopped (with errors)')
Local: /Users/abenassi/github/series-tiempo/catalogo/datos/datasets/144/144.1.csv / Remote: /home/abenassi/1294.csv
(u'144', u'144.1', 1294, OSError(

In [34]:
for key in status_distributions:
    print(key, len(status_distributions[key]))

("(u'Datos Programaci\\xf3n Macroecon\\xf3mica', u'11.2')", 1)
("[Errno 2] No such file or directory: '/Users/abenassi/github/series-tiempo/catalogo/datos/datasets/42/42.2.csv'", 1)
("(u'Datos Programaci\\xf3n Macroecon\\xf3mica', u'124.2')", 1)
("[Errno 2] No such file or directory: '/Users/abenassi/github/series-tiempo/catalogo/datos/datasets/42/42.1.csv'", 1)
("(u'Datos Programaci\\xf3n Macroecon\\xf3mica', u'79.2')", 1)
("(u'Datos Programaci\\xf3n Macroecon\\xf3mica', u'121.2')", 1)
("[Errno 2] No such file or directory: '/Users/abenassi/github/series-tiempo/catalogo/datos/datasets/144/144.1.csv'", 1)
("(u'Datos Programaci\\xf3n Macroecon\\xf3mica', u'126.1')", 1)
(u'Stopped (with errors)', 245)
(u'Finished', 25)
("[Errno 2] No such file or directory: '/Users/abenassi/github/series-tiempo/catalogo/datos/datasets/144/144.2.csv'", 1)
("[Errno 2] No such file or directory: '/Users/abenassi/github/series-tiempo/catalogo/datos/datasets/19/19.1.csv'", 1)
("[Errno 2] No such file or direc

In [29]:
bs = upload_distribution_to_ind(
    "/Users/abenassi/github/series-tiempo/data/output/catalog/sspm/dataset/4/distribution/4.4/download/oferta-demanda-globales-precios-corrientes-valores-trimestrales-base-2004.csv", 
    "1081")

Local: /Users/abenassi/github/series-tiempo/data/output/catalog/sspm/dataset/4/distribution/4.4/download/oferta-demanda-globales-precios-corrientes-valores-trimestrales-base-2004.csv / Remote: /home/abenassi/1081.csv
http://192.168.150.211:9080/pentaho-di/kettle/runJob/?job=public%2Fmetadata-dinamica%2Fjb_invocacion_api_ext&ID_DISTRIBUTION=1081&TOKEN=YWJlbmFzc2k6OjokMmEkMTAkWkJvOHJEOG9nbkJDSDB3dGFHREYxLkVLcVdoWUxPQzc1QXh3UDQxcFRtOFB4ZUVZdkdqNm0=


In [30]:
print(bs)

<?xml version="1.0" encoding="UTF-8"?><html><body><jobstatus>
<jobname></jobname>
<id>0256b302-8358-434d-b852-6e7262ecef9e</id>
<status_desc>Stopped (with errors)</status_desc>
<error_desc></error_desc>
<logging_string>&lt;![CDATA[H4sIAAAAAAAAALVXbW/aMBD+3l9xH1sJQhKqviChiRVaZeqgIqB+qKrISQx1l8SZ7bCyfzXtJ/SP7QxtCCuhUKiEIsWxn3vu7rnzYZvWac08q9k2WOcN86RxfAZVePQ9lkx4QALGE4+kzKNPCtddRYQCPoJH7gN9okGmcMOBvT0IS8ZAEyWmcIf7JFWUeBMiGPEjKr1U8CkNFL8vhy47ssJAvglCGsFu2JcsYfKBhvMIrMWHQ0FlFqnmnRIZvT/aMUxKeCRT+MICUsa9uAdfrzkJNYISJJEjLmKi0wUjwWMQNOWSKf4GGlgCIRPaBf2tlmZ+xIJaTBUJiSJVRCQxbqzlRzTxDRm1mUyJCh40K6kdxEgisf8ZbuQs9/MUGWYxNZiBgEqpbRw6TbMCPf3oN60K3OrHUL92mmZZRkIqaRIIlio+A+6lisXs95xaRCeYZFQIoB7OjRIIlngRQwdD7mUyQ5ZcLnMUdJ6anxkV0woEEZ/xDXiSYOTR0IeRl72v2/Xcf1P7P1vYNgJ7C21KhKQ5cy+IyGTPyRtFZLzQzgzbpeqlkCZM8CTGT5DXrGGURXo1Un4SLq+vvNZw0OkOnIuWVsOERBmFu26ZYN8DdNpe23EHfefrcOD0ugVIyzyzPoo6dIetvtMroBGfJgRD/FHEjjtotYuAN4Ljekx0A4RCW3j+W9oX3rPx3b3yOv1+r18w0xECW8UbEwYM53KCaY3rCsJmIunzH4K1EnChO1lpD4l+bFRFCxnW7eMNhfiqc

In [None]:
# sube catalogo
print(upload_datajson_to_ind(catalog_json_path))

IOError: [Errno 13] Permission denied

> [0;32m/Users/abenassi/anaconda/envs/series-tiempo/lib/python2.7/site-packages/paramiko/sftp_client.py[0m(850)[0;36m_convert_status[0;34m()[0m
[0;32m    848 [0;31m            [0;32mraise[0m [0mIOError[0m[0;34m([0m[0merrno[0m[0;34m.[0m[0mENOENT[0m[0;34m,[0m [0mtext[0m[0;34m)[0m[0;34m[0m[0m
[0m[0;32m    849 [0;31m        [0;32melif[0m [0mcode[0m [0;34m==[0m [0mSFTP_PERMISSION_DENIED[0m[0;34m:[0m[0;34m[0m[0m
[0m[0;32m--> 850 [0;31m            [0;32mraise[0m [0mIOError[0m[0;34m([0m[0merrno[0m[0;34m.[0m[0mEACCES[0m[0;34m,[0m [0mtext[0m[0;34m)[0m[0;34m[0m[0m
[0m[0;32m    851 [0;31m        [0;32melse[0m[0;34m:[0m[0;34m[0m[0m
[0m[0;32m    852 [0;31m            [0;32mraise[0m [0mIOError[0m[0;34m([0m[0mtext[0m[0;34m)[0m[0;34m[0m[0m
[0m
ipdb> u
> [0;32m/Users/abenassi/anaconda/envs/series-tiempo/lib/python2.7/site-packages/paramiko/sftp_client.py[0m(819)[0;36m_read_response[0;34m()[0m
[0;32m    8