<a href="https://colab.research.google.com/github/Mondin0/ANGULARAZURE/blob/main/Segunda_entrega_Gabriel_mondino_UTN_DATA_ENG.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Extracción y almacenamiento de datos - Gabriel Mondino

En el presente trabajo realizo un trabajo de recolecciń de datos y guardado en un Data lakehouse, tomando datos de la [API de transporte oficial del Gobierno de CABA](https://api-transporte.buenosaires.gob.ar/)

En resumen, lo que hago es solicitar datos de las *estaciones de bicicletas [(ECO-BICI)](https://baecobici.com.ar/)*

Los requerimientos de la entrega parcial eran:

- Implementar técnicas de extracción de datos por medio del lenguaje de programación Python.
- Implementar técnicas de almacenamiento de datos, con el formato Delta lake.

La consigna de la primera entrega era:

Desarrollar un programa en Python que realice:
1. extracción de una API, como fuente de datos,
2. convierta los datos obtenidos como DataFrames de Pandas
3. y los guarde de forma cruda, sin transformaciones o con leves transformaciones, en formato Delta lake.

Por una cuestion de convención y para respetar buenas practicas, mantengo las credenciales en un archivo `credenciales.conf`.
que contiene la siguiente estructura:
```bash
[API]
client_id = ClientIdQueDaCABA
client_secret = ClientSecretQueDaCABA
url_station_information = https://apitransporte.buenosaires.gob.ar/ecobici/gbfs/stationInformation
url_station_status = https://apitransporte.buenosaires.gob.ar/ecobici/gbfs/stationStatus
```

La elección de esos dos endpoints en particular es que justo cumple con los dos requerimientos solicitados.

`Uno de los endpoints debe devolver datos temporales, que se actualicen periódicamente (mínimo
una vez al día), como por ejemplo: valores meteorológicos, cotizaciones de monedas o acciones de
compañías, variaciones de índices económicos, estadísticas deportivas, etc, El otro endpoint debe
ofrecer datos estáticos o metadatos, como por ejemplo campos o atributos...`

El endpoint `/ecobici/gbfs/stationInformation` trae un listado estático de todas las estaciones, sus capacidades y ubicaciones.


Por otro lado, el endpoint `/ecobici/gbfs/stationStatus` trae un listado del número de bicicletas y anclajes disponibles en cada estación y disponibilidad de estación(Datos temporales).


Uso extracción full debido a que no descubri un filtro dentro de la API de alguna fecha en particular. Lo obtenido hasta ahora estar


## Pipeline de Procesamiento de Datos para Estaciones de Bicicletas  
**Arquitectura:** Data Lakehouse con capas Raw, Bronze, Silver y Gold.  


### **Capa Bronze**  
**Funcionalidad:**  
- **Limpieza básica:**  
  - Reemplaza `None` por `NaN`.  
  - Rellena `NaN` con `0` en columnas numéricas.  
- **Almacenamiento:** Datos limpios en Delta Lake (`append`).  

---

### **Capa Silver**  
**Funcionalidad:**  
- **Transformaciones avanzadas:**  
  - Renombra columnas para claridad.  
  - Convierte tipos (ej: `ultimo_reporte` a datetime).  
  - Crea columnas derivadas (ej: `disponibilidad`).
  - Desgloce de año, mes y dia en la parte de status, particionado por capas.  
- **Almacenamiento:** Datos transformados en Delta Lake (`append` en status y `overwrite` en info).  

---

### **Capa Gold**  
**Funcionalidad:**  
- **Métricas clave:**  
  - **Diarias por estación:** Promedio de bicicletas disponibles, total de operaciones.  
  - **Generales por estación:** Capacidad máxima, disponibilidad promedio, tasa operativa.  
- **Almacenamiento:** Resultados en Delta Lake (`overwrite` para actualizaciones).  

---

### **Requerimientos Cumplidos**  
1. **Estructura de capas:** Organización clara (Raw → Bronze → Silver → Gold).  
2. **Extracción:** Uso de `requests` para API.  
3. **Transformación:** Limpieza, normalización y agregación.  
4. **Almacenamiento:** Delta Lake para ACID transactions y schema evolution.  
5. **Datos incrementales:** Modo `append` en Raw, Bronze y Silver.  
6. **Seguridad:** Configuración de credenciales en `credenciales.conf`.  

---

### **Instalación y Ejecución**  
#### **Dependencias**  


`!pip install requests deltalake pandas numpy pyarrow==14.0.0`


#### **Archivo de Credenciales (`credenciales.conf`)**  

```bash
[API]
client_id = TU_CLIENT_ID
client_secret = TU_CLIENT_SECRET
url_station_information = URL_DE_INFORMACION_DE_ESTACIONES
url_station_status = URL_DE_ESTADO_DE_ESTACIONES
```

**Nota:** No subir a repositorios públicos.  

#### **Pasos de Ejecución**  
1. Ejecutar celdas en orden en Google Colab.  
2. Verificar resultados con `DeltaTable` y `pandas`.  

Ejemplo de lectura de datos
```python
from deltalake import DeltaTable
df = DeltaTable("ruta_tabla").to_pandas()
```



In [None]:
!pip install requests
!pip install deltalake
!pip install pandas

Collecting deltalake
  Downloading deltalake-0.25.4-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.5 kB)
Downloading deltalake-0.25.4-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (44.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.4/44.4 MB[0m [31m10.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: deltalake
Successfully installed deltalake-0.25.4


In [None]:
#importación de librerías
import configparser
import requests
import pandas as pd
import numpy as np
from datetime import datetime
from deltalake import DeltaTable, write_deltalake

In [None]:
# Configuración de rutas para las capas
BRONZE_STATION_INFO_PATH = "Datahouse/bronze/api_stations_info"
BRONZE_STATION_STATUS_PATH = "Datahouse/bronze/api_stations_status"
SILVER_PATH = "Datahouse/silver"
GOLD_PATH = "Datahouse/gold"

In [None]:
# Archivo de configuración de credenciales (NO SUBIR A REPOSITORIOS PÚBLICOS)
CREDENCIALES_PATH = 'credenciales.conf'

In [None]:
def obtener_datos_api():
    """Obtiene datos de la API."""
    config = configparser.ConfigParser()
    config.read('credenciales.conf')
    client_id = config['API']['client_id']
    client_secret = config['API']['client_secret']
    url_station_information = config['API']['url_station_information']
    url_station_status = config['API']['url_station_status']

    params = {
        "client_id": client_id,
        "client_secret": client_secret
    }

    try:
        response_info = requests.get(url_station_information, params=params)
        response_status = requests.get(url_station_status, params=params)

        if response_info.status_code == 200 and response_status.status_code == 200:
            print("Solicitudes exitosas. Datos obtenidos")
            datos_info = response_info.json()
            datos_status = response_status.json()
            return datos_info, datos_status
        else:
            print(f"Error en la solicitud: {response_info.status_code}, {response_status.status_code}")
            return None, None
    except Exception as e:
        print(f"Ocurrió un error: {e}")
        return None, None



---
# Capa Bronze - Datos de la API
---

In [None]:
def bronze_layer_processing():
    """Capa Bronze: Almacenamiento de datos crudos."""
    datos_info, datos_status = obtener_datos_api()

    if datos_info and datos_status:
        df_bronze_info = pd.json_normalize(datos_info['data']['stations'])
        df_bronze_status = pd.json_normalize(datos_status['data']['stations'])

        # **Paso 1: Reemplazar None con NaN**
        df_bronze_info = df_bronze_info.replace({None: np.nan})
        df_bronze_status = df_bronze_status.replace({None: np.nan})

        # **Paso 2: Imputar NaN por tipo de columna**
        for col in df_bronze_info.columns:
          if pd.api.types.is_numeric_dtype(df_bronze_info[col]):
              df_bronze_info[col] = df_bronze_info[col].fillna(0).astype(float)
          else:
              df_bronze_info[col] = df_bronze_info[col].fillna('').astype(str)

        for col in df_bronze_status.columns:
            if pd.api.types.is_numeric_dtype(df_bronze_status[col]):
                df_bronze_status[col] = df_bronze_status[col].fillna(0).astype(float)
            else:
                df_bronze_status[col] = df_bronze_status[col].fillna('').astype(str)

        # **Paso 3: Convertir tipos explícitamente**
        df_bronze_info = df_bronze_info.convert_dtypes()
        df_bronze_status = df_bronze_status.convert_dtypes()

        # **Paso 4: Añadir metadatos**
        df_bronze_info['ingestion_timestamp'] = datetime.now()
        df_bronze_status['ingestion_timestamp'] = datetime.now()

        # **Paso 5: Escribir en Delta Lake (APPEND)**
        try:
            # Intenta cargar las tablas Delta
            dt_info = DeltaTable(BRONZE_STATION_INFO_PATH)
            dt_status = DeltaTable(BRONZE_STATION_STATUS_PATH)

            # Escribe en modo 'append'
            write_deltalake(dt_info.table_uri, df_bronze_info, mode='append')
            write_deltalake(dt_status.table_uri, df_bronze_status, mode='append')

            print("Datos añadidos a las tablas Delta existentes (append).")

        except Exception as e:
            # Si las tablas no existen, crea nuevas tablas con 'overwrite'
            print(f"Tablas Delta no encontradas, creando nuevas tablas...: {e}")
            write_deltalake(BRONZE_STATION_INFO_PATH, df_bronze_info, mode='overwrite')
            write_deltalake(BRONZE_STATION_STATUS_PATH, df_bronze_status, mode='overwrite')
            print("Nuevas tablas Delta creadas (overwrite).")

        # **Paso 6: Verificación final**
        if df_bronze_info.isnull().values.any():
            print("¡Advertencia! Bronze Info contiene valores nulos")
        else:
            print("Bronze Info: Sin valores nulos")

        if df_bronze_status.isnull().values.any():
            print("¡Advertencia! Bronze Status contiene valores nulos")
        else:
            print("Bronze Status: Sin valores nulos")

        return df_bronze_info, df_bronze_status

    return None, None



---
#CAPA SILVER, TRANSFORMACIÓN DE DATOS
---

In [None]:
def silver_layer_processing(df_bronze_info, df_bronze_status):
    """Capa Silver: Limpieza y transformación de datos."""
    if not _validate_inputs(df_bronze_info, df_bronze_status):
        return None, None

    df_silver_info = _process_info(df_bronze_info)
    df_silver_status = _process_status(df_bronze_status)

    # Escribir en Silver Layer
    _write_to_silver_layer(df_silver_info, df_silver_status)

    return df_silver_info, df_silver_status


def _validate_inputs(df_info, df_status):
    """Valida que los DataFrames no sean None"""
    if df_info is None or df_status is None:
        print("Error: DataFrames de la capa Bronze son None.")
        return False
    return True


def _process_info(df):
    """Procesamiento específico para station_info"""
    df = df.copy()
    df = _rename_columns(df, COLUMN_MAPPING_INFO)

    # Conversión de tipos y manejo de nulos
    df = _convert_capacidad(df)
    df = _expand_rental_methods(df)

    return df


def _process_status(df):
    """Procesamiento específico para station_status"""
    df = df.copy()
    df = _rename_columns(df, COLUMN_MAPPING_STATUS)

    # Conversión de tipos y manejo de nulos
    df = _convert_fecha_reporte(df)
    df = _create_disponibilidad(df)
    df = _segmentar_fecha(df)

    return df


def _rename_columns(df, mapping):
    """Renombra columnas según el mapeo especificado"""
    return df.rename(columns=mapping, errors='ignore')


COLUMN_MAPPING_INFO = {
    'station_id': 'id_estacion',
    'name': 'nombre',
    'address': 'direccion',
    'lat': 'latitud',
    'lon': 'longitud',
    'capacity': 'capacidad',
    'rental_methods': 'metodos_renta'
}

COLUMN_MAPPING_STATUS = {
    'station_id': 'id_estacion',
    'num_bikes_available': 'bicicletas_disponibles',
    'num_docks_available': 'anclajes_disponibles',
    'is_installed': 'operativa',
    'is_renting': 'permitido_retirar',
    'is_returning': 'permitido_devolver',
    'last_reported': 'ultimo_reporte'
}

METODOS_RENTA = ['KEY', 'TRANSITCARD', 'PHONE']

def _convert_capacidad(df):
    """Convierte capacidad a entero y maneja nulos"""
    return df.assign(
        capacidad=df['capacidad'].fillna(0).astype(int, errors='ignore')
    )

def _expand_rental_methods(df):
    """Desglosa metodos de renta en columnas individuales"""
    if 'metodos_renta' not in df.columns:
        print("Advertencia: La columna 'metodos_renta' no existe en df_silver_info.")
        for metodo in METODOS_RENTA:
            df[f'renta_{metodo.lower()}'] = 0
        return df

    df['metodos_renta'] = df['metodos_renta'].apply(
        lambda x: [y.strip().upper() for y in x.strip("[]").replace("'", "").split(", ")]
        if isinstance(x, str) else []
    )

    for metodo in METODOS_RENTA:
        df[f'renta_{metodo.lower()}'] = df['metodos_renta'].apply(
            lambda x: 1 if metodo in x else 0
        )

    return df.drop(columns=['metodos_renta'], errors='ignore')

def _convert_fecha_reporte(df):
    """Convierte ultimo_reporte a datetime"""
    if 'ultimo_reporte' not in df.columns:
        print("Advertencia: La columna 'ultimo_reporte' no existe en df_silver_status.")
        return df

    return df.assign(
        ultimo_reporte=pd.to_datetime(df['ultimo_reporte'], unit='s', errors='coerce')
    )

def _create_disponibilidad(df):
    """Crea columna de disponibilidad"""
    if 'operativa' not in df.columns:
        print("Advertencia: La columna 'operativa' no existe en df_silver_status.")
        df['disponibilidad'] = 'Desconocida'
        return df

    return df.assign(
        disponibilidad=np.where(df['operativa'] == 1, 'Operativa', 'No operativa')
    )

def _segmentar_fecha(df):
    """Segmenta la columna ultimo_reporte en componentes temporales"""
    if 'ultimo_reporte' not in df.columns:
        print("Advertencia: La columna 'ultimo_reporte' no existe en el DataFrame.")
        return df

    # Convertir a datetime si no está ya convertido
    df['ultimo_reporte'] = pd.to_datetime(df['ultimo_reporte'], errors='coerce')

    # Crear columnas segmentadas
    df['anio'] = df['ultimo_reporte'].dt.year
    df['mes'] = df['ultimo_reporte'].dt.month
    df['dia'] = df['ultimo_reporte'].dt.day
    df['hora'] = df['ultimo_reporte'].dt.hour

    return df

def _write_to_silver_layer(df_info, df_status):
    """Escribe los DataFrames procesados en Silver Layer"""
    write_deltalake(f"{SILVER_PATH}/station_info", df_info, mode='overwrite')
    print("Actualizando silver/sation_info: overwrite")
    write_deltalake(f"{SILVER_PATH}/station_status", df_status, mode='append',partition_by=['anio','mes','dia'])
    print("Actualizando silver/station_status: append")



---
# CAPA GOLD - PARA STAKEHOLDERS
---

In [None]:
def gold_layer_processing(df_silver_info, df_silver_status):
    """Capa Gold: Datos agregados para consumo final."""
    if df_silver_info is None or df_silver_status is None:
        print("Error: DataFrames de la capa Silver son None.")
        return None, None

    # Unir datasets
    df_merged = pd.merge(
        df_silver_status,
        df_silver_info[['id_estacion', 'nombre', 'capacidad']],
        on='id_estacion',
        how='left'
    )

    # Agregación diaria
    df_daily = df_merged.groupby(
        [pd.Grouper(key='ultimo_reporte', freq='D'), 'id_estacion', 'nombre']
    ).agg(
        promedio_bicicletas=('bicicletas_disponibles', 'mean'),
        total_operaciones=('bicicletas_disponibles', 'count')
    ).reset_index()

    # Métricas clave
    df_metrics = df_merged.groupby('id_estacion').agg(
        nombre=('nombre', 'first'),
        capacidad_max=('capacidad', 'max'),
        disponibilidad_promedio=('bicicletas_disponibles', 'mean'),
        tasa_operativa=('operativa', 'mean')
    ).reset_index()

    # Escribir en Gold Layer
    write_deltalake(f"{GOLD_PATH}/daily_metrics", df_daily, mode='overwrite')
    write_deltalake(f"{GOLD_PATH}/station_metrics", df_metrics, mode='overwrite')

    return df_daily, df_metrics

In [None]:
if __name__ == "__main__":
    # Ejecutar capa Bronze
    print("Procesando capa Bronze...")
    df_bronze_info, df_bronze_status = bronze_layer_processing()

    if df_bronze_info is not None and df_bronze_status is not None:
        # Ejecutar capa Silver
        print("Procesando capa Silver...")
        df_silver_info, df_silver_status = silver_layer_processing(df_bronze_info, df_bronze_status)

        if df_silver_info is not None and df_silver_status is not None:
            # Ejecutar capa Gold
            print("Procesando capa Gold...")
            df_gold_daily, df_gold_metrics = gold_layer_processing(df_silver_info, df_silver_status)

            # Mostrar resultados
            if df_gold_daily is not None and df_gold_metrics is not None:
                print("\nMuestra de datos Gold - Métricas Diarias:")
                print(df_gold_daily.head())

                print("\nMuestra de datos Gold - Métricas por Estación:")
                print(df_gold_metrics.head())
            else:
                print("Error al procesar la capa Gold.")
        else:
            print("Error al procesar la capa Silver.")
    else:
        print("Error al procesar la capa Bronze.")


Procesando capa Bronze...
Solicitudes exitosas. Datos obtenidos
Tablas Delta no encontradas, creando nuevas tablas...: no log files
Nuevas tablas Delta creadas (overwrite).
Bronze Info: Sin valores nulos
Bronze Status: Sin valores nulos
Procesando capa Silver...
Actualizando silver/sation_info: overwrite
Actualizando silver/station_status: append
Procesando capa Gold...


  df_bronze_status = df_bronze_status.replace({None: np.nan})



Muestra de datos Gold - Métricas Diarias:
  ultimo_reporte id_estacion                   nombre  promedio_bicicletas  \
0     2025-03-28         101           101 - Fitz Roy                 10.0   
1     2025-03-28         102             102 - ROSETI                  1.0   
2     2025-03-28         104   104 - Federico Lacroze                 12.0   
3     2025-03-28         107  107 - HOSPITAL GARRAHAN                  0.0   
4     2025-03-28         111     111 - MACACHA GUEMES                 13.0   

   total_operaciones  
0                  1  
1                  1  
2                  1  
3                  1  
4                  1  

Muestra de datos Gold - Métricas por Estación:
  id_estacion                   nombre  capacidad_max  \
0         101           101 - Fitz Roy             20   
1         102             102 - ROSETI             20   
2         104   104 - Federico Lacroze             30   
3         107  107 - HOSPITAL GARRAHAN             16   
4         111    

In [None]:
df =DeltaTable("Datahouse/silver/station_status").to_pandas()
#df =DeltaTable("Datahouse/bronze/api_stations_info").to_pandas()
df

Unnamed: 0,id_estacion,bicicletas_disponibles,num_bikes_disabled,anclajes_disponibles,num_docks_disabled,ultimo_reporte,is_charging_station,status,operativa,permitido_retirar,permitido_devolver,traffic,num_bikes_available_types.mechanical,num_bikes_available_types.ebike,ingestion_timestamp,disponibilidad,anio,mes,dia,hora
0,2,2,1,37,0,2025-03-28 16:39:31,0,IN_SERVICE,1,1,1,0,2,0,2025-03-28 16:40:17.682875,Operativa,2025,3,28,16
1,3,6,2,20,0,2025-03-28 16:39:00,0,IN_SERVICE,1,1,1,0,6,0,2025-03-28 16:40:17.682875,Operativa,2025,3,28,16
2,4,9,2,9,0,2025-03-28 16:39:35,0,IN_SERVICE,1,1,1,0,9,0,2025-03-28 16:40:17.682875,Operativa,2025,3,28,16
3,5,0,1,41,0,2025-03-28 16:40:00,0,IN_SERVICE,1,1,1,0,0,0,2025-03-28 16:40:17.682875,Operativa,2025,3,28,16
4,6,5,1,14,0,2025-03-28 16:38:54,0,IN_SERVICE,1,1,1,0,5,0,2025-03-28 16:40:17.682875,Operativa,2025,3,28,16
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
394,587,2,0,18,0,2025-03-28 16:37:56,0,IN_SERVICE,1,1,1,0,2,0,2025-03-28 16:40:17.682875,Operativa,2025,3,28,16
395,588,3,2,11,0,2025-03-28 16:38:50,0,IN_SERVICE,1,1,1,0,3,0,2025-03-28 16:40:17.682875,Operativa,2025,3,28,16
396,589,0,3,13,0,2025-03-28 16:38:50,0,IN_SERVICE,1,1,1,0,0,0,2025-03-28 16:40:17.682875,Operativa,2025,3,28,16
397,590,2,2,12,0,2025-03-28 16:37:49,0,IN_SERVICE,1,1,1,0,2,0,2025-03-28 16:40:17.682875,Operativa,2025,3,28,16


In [None]:
import os
import shutil

# Elimina la carpeta y su contenido
shutil.rmtree('Datahouse')
