In [None]:
import pandas as pd
import numpy as np
from sqlalchemy import create_engine, text
import urllib.parse
import sys
import warnings

# Silenciar advertencias
warnings.simplefilter(action='ignore', category=FutureWarning)

# 1. CONEXIÃ“N
USER = "postgres"
PASSWORD = "as52"
HOST = "localhost"
PORT = "5432"
DB_NAME = "Historico_Hechos_Movilidad"

password_safe = urllib.parse.quote_plus(PASSWORD)
user_safe = urllib.parse.quote_plus(USER)
db_connection_str = f'postgresql+psycopg2://{user_safe}:{password_safe}@{HOST}:{PORT}/{DB_NAME}'
engine = create_engine(db_connection_str)

# FUNCIÃ“N AUXILIAR PARA ENCONTRAR COLUMNAS CON BOM
def buscar_columna_flexible(columnas, objetivo):
    """Busca una columna que contenga la palabra objetivo, ignorando basura (BOM)."""
    for col in columnas:
        if objetivo in col:
            return col
    return objetivo # Devuelve el original si no encuentra match (para que falle con nombre claro)

# 2. FUNCIONES DE CARGA

def carga_hechos_transito():
    print("\n[1/4] Procesando: Hechos de TrÃ¡nsito...")
    ruta = r"Data_Tratado\Hechos_Transito_Enriquecido.csv"
    
    try:
        df = pd.read_csv(ruta, encoding='latin-1', low_memory=False)
        df.columns = [c.strip().lower() for c in df.columns]
        
        # BÃºsqueda flexible de la columna fecha
        col_fecha = buscar_columna_flexible(df.columns, 'fecha_evento')
        print(f"Columna fecha detectada: '{col_fecha}'")
        
        fechas = pd.to_datetime(df[col_fecha], dayfirst=True, errors='coerce')
        df['fecha_evento'] = fechas
        df['fk_tiempo'] = fechas
        
        # GeometrÃ­a
        if 'longitud' in df.columns and 'latitud' in df.columns:
            mask = df['longitud'].notnull() & df['latitud'].notnull()
            df.loc[mask, 'geometria'] = df[mask].apply(
                lambda x: f"SRID=4326;POINT({x['longitud']} {x['latitud']})", axis=1
            )
        
        # Mapeo y Filtrado
        cols_bd = [
            'fecha_evento', 'hora_evento', 'tipo_evento', 'alcaldia', 'colonia', 
            'latitud', 'longitud', 'geometria', 'score_atlas', 'flag_historico', 
            'nivel_riesgo_estatico', 'fk_tiempo'
        ]
        df_final = df[[c for c in cols_bd if c in df.columns]].copy()
        
        df_final.to_sql('fact_hechos_transito', engine, if_exists='append', index=False, method='multi', chunksize=10000)
        print(f"Hechos cargados: {len(df_final)} filas.")

    except Exception as e:
        print(f"Error en Hechos: {e}")
        sys.exit(1)

def carga_inviales():
    print("\n[2/4] Procesando: Incidentes Viales (Inviales)...")
    ruta = r"Data_Tratado\Inviales_Enriquecido.csv"
    
    try:
        df = pd.read_csv(ruta, encoding='latin-1', low_memory=False)
        df.columns = [c.strip().lower() for c in df.columns]
        
        # BÃºsqueda flexible
        col_fecha = buscar_columna_flexible(df.columns, 'fecha_creacion')
        print(f"Columna fecha detectada: '{col_fecha}'")

        fechas = pd.to_datetime(df[col_fecha], dayfirst=True, errors='coerce')
        df['fecha_creacion'] = fechas
        df['fk_tiempo'] = fechas
        
        # GeometrÃ­a
        mask = df['longitud'].notnull() & df['latitud'].notnull()
        df.loc[mask, 'geometria'] = df[mask].apply(
            lambda x: f"SRID=4326;POINT({x['longitud']} {x['latitud']})", axis=1
        )
        
        # Renombrado seguro
        cols_map = {
            'tipo_incidente': 'tipo_incidente',
            'alcaldia': 'alcaldia',
            'hora_creacion': 'hora_creacion',
            'score_atlas': 'score_atlas', 
            'flag_historico': 'flag_historico', 
            'nivel_riesgo_estatico': 'nivel_riesgo_estatico'
        }
        df = df.rename(columns=cols_map)
        
        cols_bd = list(cols_map.values()) + ['fecha_creacion', 'fk_tiempo', 'latitud', 'longitud', 'geometria']
        df_final = df[[c for c in cols_bd if c in df.columns]].copy()
        
        df_final.to_sql('fact_inviales', engine, if_exists='append', index=False, method='multi', chunksize=10000)
        print(f"Inviales cargados: {len(df_final)} filas.")
        
    except Exception as e:
        print(f"Error en Inviales: {e}")
        sys.exit(1)

def carga_metro():
    print("\n[3/4] Procesando: Afluencia Metro...")
    ruta = r"Data_Tratado\Afluencia_Metro_Enriquecido.csv"
    
    try:
        df = pd.read_csv(ruta, encoding='latin-1', low_memory=False)
        df.columns = [c.strip().lower() for c in df.columns]
        
        # BÃºsqueda flexible (evita error 'fecha' vs 'Ã¯Â»Â¿fecha')
        col_fecha = buscar_columna_flexible(df.columns, 'fecha')
        print(f"Columna fecha detectada: '{col_fecha}'")
        
        fechas = pd.to_datetime(df[col_fecha], errors='coerce')
        df['fecha'] = fechas
        df['fk_tiempo'] = fechas
        
        df = df.rename(columns={'estacion_normalizada': 'nombre_estacion'})
        
        # GeometrÃ­a
        mask = df['longitud'].notnull() & df['latitud'].notnull()
        df.loc[mask, 'geometria'] = df[mask].apply(
            lambda x: f"SRID=4326;POINT({x['longitud']} {x['latitud']})", axis=1
        )
        
        cols_bd = ['fecha', 'nombre_estacion', 'linea', 'afluencia', 'latitud', 'longitud', 
                   'geometria', 'score_atlas', 'flag_historico', 'nivel_riesgo_estatico', 'fk_tiempo']
        
        df_final = df[[c for c in cols_bd if c in df.columns]].copy()
        
        df_final.to_sql('fact_afluencia_metro', engine, if_exists='append', index=False, method='multi', chunksize=10000)
        print(f"Metro cargado: {len(df_final)} filas.")
        
    except Exception as e:
        print(f"Error en Metro: {e}")
        sys.exit(1)

def carga_pluviales_interpolada():
    print("\n[4/4] Procesando: Pluviales...")
    ruta = r"Data_Tratado\Pluviales_DF.csv"
    
    try:
        df = pd.read_csv(ruta)
        
        df['fecha_ref'] = pd.to_datetime(
            df['ANIO'].astype(str) + '-' + df['MES'].astype(str).str.zfill(2) + '-15'
        )
        
        cols_base = ['CLAVE', 'NOMBRE', 'LON', 'LAT', 'PRECIPITACION', 'fecha_ref']
        df = df[cols_base].sort_values('fecha_ref')
        
        dfs_simulados = []
        estaciones = df['CLAVE'].unique()
        
        print(f"Interpolando {len(estaciones)} estaciones...")
        
        for estacion in estaciones:
            dfe = df[df['CLAVE'] == estacion].copy()
            dfe = dfe.drop_duplicates(subset='fecha_ref').set_index('fecha_ref')
            
            dfe_diario = dfe.resample('D').interpolate(method='time')
            
            for col in ['CLAVE', 'NOMBRE', 'LON', 'LAT']:
                dfe_diario[col] = dfe_diario[col].ffill().bfill()
            
            dfe_diario = dfe_diario.reset_index().rename(columns={'fecha_ref': 'fecha'})
            dfs_simulados.append(dfe_diario)
            
        df_final = pd.concat(dfs_simulados, ignore_index=True)
        df_final = df_final.dropna(subset=['PRECIPITACION'])
        
        df_final = df_final.rename(columns={
            'CLAVE': 'clave_estacion',
            'NOMBRE': 'nombre_estacion',
            'PRECIPITACION': 'precipitacion_simulada',
            'LON': 'longitud',
            'LAT': 'latitud',
            'fecha': 'fk_tiempo'
        })
        df_final['fecha'] = df_final['fk_tiempo']
        
        df_final['geometria'] = df_final.apply(
            lambda x: f"SRID=4326;POINT({x['longitud']} {x['latitud']})", axis=1
        )
        
        df_final.to_sql('fact_pluviales', engine, if_exists='append', index=False, method='multi', chunksize=10000)
        print(f"Pluviales cargados: {len(df_final)} registros.")
    
    except Exception as e:
        print(f"Error en Pluviales: {e}")
        sys.exit(1)

def finalizar_proceso():
    print("\nRefrescando Vista Maestra...")
    with engine.connect() as con:
        try:
            con.execute(text("REFRESH MATERIALIZED VIEW mv_hechos_con_clima;"))
            con.commit()
            count = con.execute(text("SELECT COUNT(*) FROM mv_hechos_con_clima")).scalar()
            print(f"Vista actualizada. Registros listos: {count}")
            print("\nINGESTA TOTAL COMPLETADA.")
            
        except Exception as e:
            print(f"Error refrescando vista: {e}")

# EJECUCIÃ“N
if __name__ == "__main__":
    carga_hechos_transito()
    carga_inviales()
    carga_metro()
    carga_pluviales_interpolada()
    finalizar_proceso()


[1/4] Procesando: Hechos de TrÃ¡nsito...
Columna fecha detectada: 'Ã¯Â»Â¿fecha_evento'
Hechos cargados: 132065 filas.

ðŸš§ [2/4] Procesando: Incidentes Viales (Inviales)...
Columna fecha detectada: 'fecha_creacion'
Inviales cargados: 11559010 filas.

[3/4] Procesando: Afluencia Metro...
Columna fecha detectada: 'Ã¯Â»Â¿fecha'
Metro cargado: 5139574 filas.

[4/4] Procesando: Pluviales...
Interpolando 265 estaciones...
Pluviales cargados: 433747 registros.

Refrescando Vista Maestra...
Vista actualizada. Registros listos: 53439

INGESTA TOTAL COMPLETADA.
