In [10]:
# %load etl.py
# airflow related
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
# other packages
import requests
import pandas as pd
from datetime import datetime
from datetime import date


def extract_api_data(api_url, resource, client_id, client_secret):
    """
    Función para obtener datos en formato JSON de la API de transporte de CABA
    """
    url = api_url + resource
    url += "?client_id=" + client_id + "&client_secret=" + client_secret
    r = requests.get(url)
    return r.json()["data"]


def json_to_df(data, name):
    """
    Función para crear un DataFrame a partir de los datos en formato JSON
    y el nombre de la key del JSON que tiene como value una lista de
    JSON objects donde cada uno representa una instancia en particular
    """
    return pd.DataFrame(data[name])


def change_type_column(df, **kwargs):
    """
    Función para convertir el tipo de datos de una columna
    donde cada nombre de los parámetros a pasar será una columna
    del DataFrame y su respectivo valor será el nuevo tipo de dato
    para esa columna.
    Retorna el mismo DataFrame con las columnas con sus nuevos tipos
    de datos
    """
    for col, a_type in kwargs.items():
        df[col] = df[col].astype(a_type)
    return df


def split_column(df, column):
    types = list(df[column][0].keys())
    for a_type in types:
        df["num_"+a_type+"_bikes_available"] = df[column].apply(lambda r: r[a_type])
    return df.drop(column, axis=1)


def merge_df(df_left, df_right, col_join):
    return pd.merge(df_left, df_right, on=col_join, how="inner")


def save_data(df, path):
    df.to_csv(path, index=False, mode='a+')


def etl(api_url, status_resource, client_id, client_secret, path):
    # extract data
    stations_data = extract_api_data(api_url, status_resource, client_id, client_secret)
    df_stations = json_to_df(stations_data, 'stations')

    # transform
    df_stations = change_type_column(df_stations, station_id='int64', is_returning='bool', is_renting='bool', is_installed='bool')
    df_stations = split_column(df_stations, 'num_bikes_available_types')

    # load
    save_data(df_stations, path)


api_url = "https://apitransporte.buenosaires.gob.ar/"
status_resource = "ecobici/gbfs/stationStatus"
with open("/home/guido/datadev/ecobici/credenciales.txt", "r") as file:
    client_id = file.readline().rstrip()
    client_secret = file.readline().rstrip()

path = '~/datadev/ecobici/ecobici.csv'


In [12]:
etl(api_url, status_resource, client_id, client_secret, path)