# SmartWaste ETL

Este notebook muestra cómo conectar a la base de datos real **smartwaste** (MySQL) usando SQLAlchemy, ejecutar los procesos ETL (lecturas de sensores y bitácoras) y exportar resultados a CSV. **ATENCIÓN:** Ejecuta este notebook en tu máquina local o en un entorno que pueda alcanzar `localhost:3306`.

---




## 1) Instalación de dependencias
Ejecuta la siguiente celda si aún no tienes las dependencias instaladas (esto instalará `pandas`, `sqlalchemy` y `pymysql`).

In [9]:
import pandas as pd
import numpy as np
from sqlalchemy.orm import Session
from datetime import datetime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Float, DateTime, Boolean, ForeignKey
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

## 2) Configuración de la conexión
Puedes dejar las credenciales directamente (no recomendado para entornos compartidos) o exportarlas como variables de entorno. Aquí dejamos las credenciales que proporcionaste como ejemplo.

In [11]:

# Configuración (modifica si prefieres usar variables de entorno)
DB_USERNAME = "root"
DB_PASSWORD = "12345"
DB_HOST = "localhost"
DB_PORT = 3306
DB_NAME = "smartwaste"

# Construir URL de conexión
DB_URL = f"mysql+pymysql://{DB_USERNAME}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
print("DB URL preparada (no se mostrará la contraseña por seguridad en logs).")


DB URL preparada (no se mostrará la contraseña por seguridad en logs).


## 3) Definición de modelos (SQLAlchemy Declarative)
Definimos los modelos tal como los proporcionaste para que el ORM pueda mapear las tablas. Si las tablas ya existen, no se crearán de nuevo (a menos que lo hagas explícitamente).

In [12]:



Base = declarative_base()

class BitacoraContenedor(Base):
    __tablename__ = 'tbd_bitacora_contenedor'
    Bitacora_Id = Column(Integer, ForeignKey("tbd_bitacora_recoleccion.ID"), primary_key=True)
    Contenedor_Id = Column(Integer, ForeignKey("tbb_contenedores.ID"), primary_key=True)

    Fecha_Registro = Column(DateTime, nullable=False)
    Estado_Contenedor = Column(String(100), nullable=True)
    Porcentaje_Llenado = Column(Integer, nullable=True)
    Recolectado = Column(Boolean, default=False)

class BitacoraRecoleccion(Base):
    __tablename__ = 'tbd_bitacora_recoleccion'
    ID = Column(Integer, primary_key=True)
    Fecha_Registro = Column(DateTime, nullable=False)
    Ruta_Id = Column(Integer, ForeignKey("tbb_rutas_recoleccion.ID"))

    Observaciones = Column(String(255), nullable=True)
    Tiempo_Duracion = Column(Integer, nullable=True)
    Cantidad_Contenedores = Column(Integer, nullable=True)

class LecturaSensor(Base):
    __tablename__ = 'tbd_lecturas_sensores'
    ID = Column(Integer, primary_key=True)
    Valor = Column(Float)
    Fecha = Column(DateTime)
    Sensor_Id = Column(Integer, ForeignKey("tbb_sensores.ID"), nullable=False)

print('Modelos definidos.')


Modelos definidos.


  Base = declarative_base()


## 4) Crear engine y Session
Se crea el engine SQLAlchemy y una fábrica de sesiones. Ejecuta esta celda para conectarte a la base de datos real. Si la conexión falla, revisa que MySQL corra y que las credenciales sean correctas.

In [13]:


engine = create_engine(DB_URL, echo=False, pool_pre_ping=True)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Probar conexión simple (intenta abrir una conexión y cerrarla)
try:
    conn = engine.connect()
    conn.close()
    print("Conexión a la DB exitosa (localhost).")
except Exception as e:
    print("Error conectando a la base de datos:", e)
    raise


Conexión a la DB exitosa (localhost).


## 5) Funciones ETL adaptadas para el notebook
Incluimos `exportar_todas_lecturas` y `etl_bitacoras` que usan `Session` para leer desde la DB y producir DataFrames/CSV.

In [14]:



def exportar_todas_lecturas(ruta_csv: str, db_session: Session):
    try:
        lecturas = db_session.query(LecturaSensor).all()

        data = [{
            "ID": l.ID,
            "Sensor_Id": l.Sensor_Id,
            "Valor": l.Valor,
            "Fecha": l.Fecha,
        } for l in lecturas]

        df = pd.DataFrame(data)

        if df.empty:
            print("No hay lecturas para exportar.")
            return pd.DataFrame()

        df['Fecha'] = pd.to_datetime(df['Fecha'])
        df = df.sort_values(['Sensor_Id', 'Fecha'])
        df = df.drop_duplicates(subset=['ID'], keep='first')
        df = df[df['Valor'].notna()]
        df['Valor'] = df['Valor'].clip(lower=0, upper=100)
        df = df.set_index('Fecha')

        resampled_list = []
        for sensor_id, g in df.groupby('Sensor_Id'):
            g_resampled = g[['Valor']].resample('5min').mean().interpolate(method='time')
            g_resampled['Sensor_Id'] = sensor_id
            resampled_list.append(g_resampled)

        if not resampled_list:
            print('No se generaron series resampleadas.')
            return pd.DataFrame()

        df_resampled = pd.concat(resampled_list).reset_index()
        df_resampled['Valor'] = df_resampled['Valor'].round(2)
        df_resampled['Hora'] = df_resampled['Fecha'].dt.hour
        df_resampled['DiaSemana'] = df_resampled['Fecha'].dt.day_name()

        df_resampled.to_csv(ruta_csv, index=False, encoding='utf-8')
        print(f"CSV ETL exportado a {ruta_csv} con {len(df_resampled)} registros.")
        return df_resampled

    except Exception as e:
        print('Error en ETL de exportación:', e)
        raise RuntimeError(f'Error en ETL de exportación: {e}')

def etl_bitacoras(db_session: Session):
    try:
        recolecciones = db_session.query(BitacoraRecoleccion).all()
        contenedores = db_session.query(BitacoraContenedor).all()

        data_recoleccion = []
        for r in recolecciones:
            if r.Fecha_Registro is None or r.Ruta_Id is None:
                continue

            duracion = r.Tiempo_Duracion if r.Tiempo_Duracion and r.Tiempo_Duracion <= 500 else None
            data_recoleccion.append({
                "ID": r.ID,
                "Fecha_Registro": r.Fecha_Registro.strftime("%Y-%m-%d %H:%M:%S") if r.Fecha_Registro else None,
                "Ruta_Id": r.Ruta_Id,
                "Observaciones": r.Observaciones or "Sin observaciones",
                "Tiempo_Duracion": duracion,
                "Cantidad_Contenedores": r.Cantidad_Contenedores or 0
            })

        data_contenedor = []
        for c in contenedores:
            if c.Fecha_Registro is None or c.Contenedor_Id is None:
                continue

            porcentaje = c.Porcentaje_Llenado
            if porcentaje is not None and (porcentaje < 0 or porcentaje > 100):
                continue

            data_contenedor.append({
                "Bitacora_Id": c.Bitacora_Id,
                "Contenedor_Id": c.Contenedor_Id,
                "Fecha_Registro": c.Fecha_Registro.strftime("%Y-%m-%d %H:%M:%S") if c.Fecha_Registro else None,
                "Estado_Contenedor": c.Estado_Contenedor or "Desconocido",
                "Porcentaje_Llenado": porcentaje,
                "Recolectado": c.Recolectado
            })

        df_recoleccion = pd.DataFrame(data_recoleccion)
        df_contenedor = pd.DataFrame(data_contenedor)

        print('ETL de bitácoras completado.')
        return df_recoleccion, df_contenedor

    except Exception as e:
        print('Error en ETL:', e)
        return pd.DataFrame(), pd.DataFrame()


## 6) Ejecutar ETL y guardar resultados
La siguiente celda abre una sesión, ejecuta los ETLs y muestra los resultados. Cambia las rutas de salida si deseas.

In [15]:

# Ejecutar ETL (usa rutas locales para los CSV)
csv_lecturas = 'export_lecturas_sensor.csv'
csv_recoleccion = 'export_bitacora_recoleccion.csv'
csv_contenedor = 'export_bitacora_contenedor.csv'

session = SessionLocal()
try:
    df_lecturas = exportar_todas_lecturas(csv_lecturas, session)
    df_recoleccion, df_contenedor = etl_bitacoras(session)

    # Guardar bitácoras si no están vacías
    if not df_recoleccion.empty:
        df_recoleccion.to_csv(csv_recoleccion, index=False, encoding='utf-8')
        print(f"Bitacora recolección exportada a {csv_recoleccion} ({len(df_recoleccion)} registros)")
    else:
        print("No hay registros de recolección exportados.")

    if not df_contenedor.empty:
        df_contenedor.to_csv(csv_contenedor, index=False, encoding='utf-8')
        print(f"Bitacora contenedor exportada a {csv_contenedor} ({len(df_contenedor)} registros)")
    else:
        print("No hay registros de contenedor exportados.")

    # Mostrar vistas previas
    display(df_lecturas.head() if not df_lecturas.empty else df_lecturas)
    display(df_recoleccion.head() if not df_recoleccion.empty else df_recoleccion)
    display(df_contenedor.head() if not df_contenedor.empty else df_contenedor)

finally:
    session.close()


CSV ETL exportado a export_lecturas_sensor.csv con 1000 registros.
ETL de bitácoras completado.
Bitacora recolección exportada a export_bitacora_recoleccion.csv (446162 registros)
Bitacora contenedor exportada a export_bitacora_contenedor.csv (1339052 registros)


Unnamed: 0,Fecha,Valor,Sensor_Id,Hora,DiaSemana
0,2025-08-16 00:50:00,70.42,12,0,Saturday
1,2025-08-16 00:55:00,94.74,12,0,Saturday
2,2025-08-16 01:00:00,97.51,12,1,Saturday
3,2025-08-16 01:05:00,54.61,12,1,Saturday
4,2025-08-16 01:10:00,48.45,12,1,Saturday


Unnamed: 0,ID,Fecha_Registro,Ruta_Id,Observaciones,Tiempo_Duracion,Cantidad_Contenedores
0,1,2025-08-19 12:12:11,7,Demora por tráfico.,97,2
1,2,2025-08-18 12:12:11,6,Demora por tráfico.,52,5
2,3,2025-08-17 12:12:11,6,Demora por tráfico.,71,3
3,4,2025-08-16 12:12:11,7,Demora por tráfico.,104,5
4,5,2025-08-15 12:12:11,7,Derrame en contenedor.,70,3


Unnamed: 0,Bitacora_Id,Contenedor_Id,Fecha_Registro,Estado_Contenedor,Porcentaje_Llenado,Recolectado
0,1,8,2025-08-19 12:12:11,Vaciado,88,False
1,1,9,2025-08-19 12:12:11,Vaciado,48,False
2,2,6,2025-08-18 12:12:11,Lleno,54,False
3,2,7,2025-08-18 12:12:11,Lleno,29,True
4,2,8,2025-08-18 12:12:11,Dañado,31,True
