# Proyecto Final del curso de Ingeniería de Datos 

Se propone crear un pipeline que extraiga datos de una API pública de forma constante combinándolos con información extraída de una base de datos y colocándolos en un Data Warehouse.

## Setup

### Instalación de librerias

In [1]:
# Instalacion de la libreria para interactuar con la base de datos, especificamente con Postgres
#%pip install sqlalchemy psycopg2-binary

### Importación de librerias

In [21]:
# Libreria para interactuar con APIs
import requests

import pandas as pd

# Libreria para interactuar con la base de datos
import sqlalchemy as sa
from configparser import ConfigParser

### Definición de funciones

In [3]:
def build_conn_string(config_path, config_section):
    """
    Construye la cadena de conexión a la base de datos
    a partir de un archivo de configuración.

    Parametros:
    config_path: ruta del archivo de configuración
    config_section: sección del archivo de configuración que contiene
    los datos de conexión a la base de datos

    Retorna:
    conn_string: cadena de conexión a la base de datos
    """

    # Lee el archivo de configuración
    parser = ConfigParser()
    parser.read(config_path)

    # Lee la sección de configuración de PostgreSQL
    config = parser[config_section]
    host = config['host']
    port = config['port']
    dbname = config['dbname']
    username = config['user']
    pwd = config['pwd']

    # Construye la cadena de conexión
    conn_string = f'postgresql://{username}:{pwd}@{host}:{port}/{dbname}?sslmode=require'
    
    return conn_string

In [22]:
def connect_to_db(conn_string):
    """
    Crea una conexión a la base de datos.

    Parametros:
    conn_string: cadena de conexión a la base de datos

    Retorna:
    conn: objeto de conexión a la base de datos
    """
    engine = sa.create_engine(conn_string)
    conn = engine.connect()
    return conn

## Conexion con la API

Extraccion de datos de la API de transporte de Buenos Aires

In [5]:
base_url = "https://apitransporte.buenosaires.gob.ar"

### Extracción de datos de posiciones de los bus

In [37]:
# Obtencion de la posición de los vehículos monitoreados actualizada cada 30 segundos. 
# Si no se pasan parámetros de entrada, retorna la posición actual de todos los vehículos monitoreados.

endpoint_busPositions = "/colectivos/vehiclePositions"
full_url_busPositions = f"{base_url}/{endpoint_busPositions}"

parametros_busPositions = {
    "json" : 1 ,
    "client_id" : "33987e066c34484585f7d1c725e12831",
    "client_secret" : "B72247DC28B7432BBCa552ED24E894C5"
}

response = requests.get(full_url_busPositions, params=parametros_busPositions)

In [38]:
# Obtencion de la posición de los vehículos monitoreados actualizada cada 30 segundos. 
# Si no se pasan parámetros de entrada, retorna la posición actual de todos los vehículos monitoreados.

endpoint_busPositions = "colectivos/vehiclePositions?json=1&client_id=33987e066c34484585f7d1c725e12831&client_secret=B72247DC28B7432BBCa552ED24E894C5"

full_url_busPositions = f"{base_url}/{endpoint_busPositions}"

r_busPositions = requests.get(full_url_busPositions)

In [39]:
r_busPositions.status_code

200

In [40]:
json_busData = r_busPositions.json()
json_busData

{'_entity': [{'_alert': None,
   '_id': '1',
   '_is_deleted': False,
   '_trip_update': None,
   '_vehicle': {'_congestion_level': 0,
    '_current_status': 2,
    '_current_stop_sequence': 0,
    '_occupancy_status': 0,
    '_position': {'_bearing': 0,
     '_latitude': -34.70555,
     '_longitude': -58.67154,
     '_odometer': 76,
     '_speed': 3.055555,
     'extensionObject': None},
    '_stop_id': None,
    '_timestamp': 1706810598,
    '_trip': None,
    '_vehicle': {'_id': '1830',
     '_label': '2829-922',
     '_license_plate': None,
     'extensionObject': None},
    'extensionObject': None},
   'extensionObject': None},
  {'_alert': None,
   '_id': '2',
   '_is_deleted': False,
   '_trip_update': None,
   '_vehicle': {'_congestion_level': 0,
    '_current_status': 2,
    '_current_stop_sequence': 0,
    '_occupancy_status': 0,
    '_position': {'_bearing': 0,
     '_latitude': -34.7057,
     '_longitude': -58.6708946,
     '_odometer': 39066,
     '_speed': 0,
     'extens

In [41]:
type(json_busData)

dict

In [42]:
json_busData.keys()

dict_keys(['_entity', '_header', 'extensionObject'])

Para pasar a un dataframe la data


In [70]:
# Para pasar el json a una dataframe
df_busPositions = pd.json_normalize(json_busData['_entity'])
df_busPositions.sample(n=10)

Unnamed: 0,_alert,_id,_is_deleted,_trip_update,extensionObject,_vehicle._congestion_level,_vehicle._current_status,_vehicle._current_stop_sequence,_vehicle._occupancy_status,_vehicle._position._bearing,...,_vehicle._vehicle._license_plate,_vehicle._vehicle.extensionObject,_vehicle.extensionObject,_vehicle._trip._direction_id,_vehicle._trip._route_id,_vehicle._trip._schedule_relationship,_vehicle._trip._start_date,_vehicle._trip._start_time,_vehicle._trip._trip_id,_vehicle._trip.extensionObject
9796,,9797,False,,,0,2,0,0,0,...,,,,,,,,,,
9754,,9755,False,,,0,2,0,0,0,...,,,,,,,,,,
1602,,1603,False,,,0,2,0,0,0,...,,,,,,,,,,
1897,,1898,False,,,0,2,112,0,0,...,,,,1.0,307.0,0.0,20240201.0,13:55:30,23328-1,
3498,,3499,False,,,0,2,5,0,0,...,,,,1.0,299.0,0.0,20240201.0,15:48:00,22564-1,
4256,,4257,False,,,0,2,47,0,0,...,,,,1.0,1807.0,0.0,20240201.0,14:54:00,118544-1,
9026,,9027,False,,,0,2,0,0,0,...,,,,,,,,,,
1800,,1801,False,,,0,2,18,0,0,...,,,,0.0,304.0,0.0,20240201.0,15:37:00,22912-1,
6762,,6763,False,,,0,2,23,0,0,...,,,,1.0,1515.0,0.0,20240201.0,15:40:00,97881-1,
5291,,5292,False,,,0,2,45,0,0,...,,,,0.0,1256.0,0.0,20240201.0,14:00:00,80778-1,


### Extracción de datos del estado de las estaciones de las ecobicis

In [32]:
# Obtencion del número de bicicletas y anclajes disponibles en cada estación y disponibilidad de estación.

endpoint_ecobiciSS = "ecobici/gbfs/stationStatus"
full_url_ecobiciSS = f"{base_url}/{endpoint_ecobiciSS}"

parametros_ecobiciSS = {
    "client_id" : "33987e066c34484585f7d1c725e12831",
    "client_secret" : "B72247DC28B7432BBCa552ED24E894C5"
}

r_ecobiciSS = requests.get(full_url_ecobiciSS, params=parametros_ecobiciSS)

In [33]:
r_ecobiciSS.status_code

200

In [34]:
json_ecobiciSS = r_ecobiciSS.json()
json_ecobiciSS

{'last_updated': 1706813739,
 'ttl': 19,
 'data': {'stations': [{'station_id': '2',
    'num_bikes_available': 3,
    'num_bikes_available_types': {'mechanical': 3, 'ebike': 0},
    'num_bikes_disabled': 0,
    'num_docks_available': 37,
    'num_docks_disabled': 0,
    'last_reported': 1706813492,
    'is_charging_station': False,
    'status': 'IN_SERVICE',
    'is_installed': 1,
    'is_renting': 1,
    'is_returning': 1,
    'traffic': None},
   {'station_id': '3',
    'num_bikes_available': 11,
    'num_bikes_available_types': {'mechanical': 11, 'ebike': 0},
    'num_bikes_disabled': 1,
    'num_docks_available': 16,
    'num_docks_disabled': 0,
    'last_reported': 1706813570,
    'is_charging_station': False,
    'status': 'IN_SERVICE',
    'is_installed': 1,
    'is_renting': 1,
    'is_returning': 1,
    'traffic': None},
   {'station_id': '4',
    'num_bikes_available': 4,
    'num_bikes_available_types': {'mechanical': 4, 'ebike': 0},
    'num_bikes_disabled': 2,
    'num_do

In [35]:
json_ecobiciSS.keys()

dict_keys(['last_updated', 'ttl', 'data'])

In [53]:
# Para pasar el json a una dataframe

data_ecobiciSS= json_ecobiciSS['data']['stations']
df_ecobiciSS = pd.DataFrame(data_ecobiciSS)

df_ecobiciSS.sample(n=10)

Unnamed: 0,station_id,num_bikes_available,num_bikes_available_types,num_bikes_disabled,num_docks_available,num_docks_disabled,last_reported,is_charging_station,status,is_installed,is_renting,is_returning,traffic
23,34,0,"{'mechanical': 0, 'ebike': 0}",0,0,0,,False,END_OF_LIFE,1,0,0,
14,22,0,"{'mechanical': 0, 'ebike': 0}",2,18,0,1706814000.0,False,IN_SERVICE,1,1,1,
191,275,0,"{'mechanical': 0, 'ebike': 0}",5,15,0,1706814000.0,False,IN_SERVICE,1,1,1,
106,155,11,"{'mechanical': 11, 'ebike': 0}",1,1,0,1706814000.0,False,IN_SERVICE,1,1,1,
233,360,0,"{'mechanical': 0, 'ebike': 0}",0,0,0,,False,END_OF_LIFE,1,0,0,
355,525,2,"{'mechanical': 2, 'ebike': 0}",1,13,0,1706814000.0,False,IN_SERVICE,1,1,1,
76,104,5,"{'mechanical': 5, 'ebike': 0}",1,25,0,1706814000.0,False,IN_SERVICE,1,1,1,
64,89,2,"{'mechanical': 2, 'ebike': 0}",2,16,0,1706814000.0,False,IN_SERVICE,1,1,1,
20,29,0,"{'mechanical': 0, 'ebike': 0}",1,29,0,1706814000.0,False,IN_SERVICE,1,1,1,
230,357,0,"{'mechanical': 0, 'ebike': 0}",0,0,0,,False,END_OF_LIFE,1,0,0,


## Conexión con base de datos

In [19]:
# Conexión a Redshift

# Obtener string de conexion
config_dir = "config/pipeline.conf"
conn_string = build_conn_string(config_dir, "RedShift")
conn_string
#conn = connect_to_db(conn_string)

'postgresql://camilagonzalezalejo02_coderhouse:XX30wl6NtL@data-engineer-cluster.cyhh5bfevlmn.us-east-1.redshift.amazonaws.com:5439/data-engineer-database?sslmode=require'

In [23]:
# Conexion a la base de datos
conn = connect_to_db(conn_string)

ProgrammingError: (psycopg2.errors.UndefinedObject) unrecognized configuration parameter "standard_conforming_strings"

[SQL: show standard_conforming_strings]
(Background on this error at: https://sqlalche.me/e/20/f405)

In [None]:
type(conn) #deberia ser de tipo .Connection

In [None]:
# Cargar los datasets a la base de datos

# dfs es el dataframe 
# table_names hay que definirlo
# dfs = [df_venue, df_sales]
# tbl_names = ["venue", "sales"]

for df, tbl_name in zip(dfs, tbl_names):
    df.to_sql(
        name=tbl_name,
        con=conn,
        schema="stage",
        if_exists="replace",
        method="multi",
        index=False,
    )