In [None]:
# ==========================================
# ETL OLTP (db_gestion) -> DW (dw_soporte)
# Con cálculo correcto de métricas por EQUIPO
# ==========================================

import mysql.connector
import pandas as pd
import datetime

# --------------------------------------------------
# 1. CONEXIÓN A LA BD DE GESTIÓN (OLTP)
# --------------------------------------------------
oltp_conn = mysql.connector.connect(
    host="192.168.100.3",
    port=3306,
    user="etl_user",
    password="TuPasswordFuerte",
    database="db_gestion"
)
oltp_cursor = oltp_conn.cursor(dictionary=True)
print("Conectado a OLTP (db_gestion).")



In [None]:
# --------------------------------------------------
# 2. EXTRACCIÓN: PROCEDIMIENTO obtener_todo()
# --------------------------------------------------
oltp_cursor.callproc("obtener_todo")

result_sets = []
for res in oltp_cursor.stored_results():
    rows = res.fetchall()
    result_sets.append(pd.DataFrame(rows))

# Asignar cada DataFrame
cliente_df, equipo_df, empleado_df, estadistica_df, proyecto_df, tarea_df, asignacion_df, incidente_df = result_sets

oltp_cursor.close()
oltp_conn.close()
print("Conexión OLTP cerrada.\nDatos extraídos correctamente.")


def safe_strip_cols(df):
    """Limpia los nombres de columna quitando espacios extra."""
    df.columns = [str(col).strip() for col in df.columns]
    return df

def safe_index(df, colname):
    """Crea índice robusto."""
    if colname in df.columns and not df.empty:
        df = df.dropna(subset=[colname])
        df = df.loc[~df[colname].duplicated()]
        return df.set_index(colname, drop=False)
    return pd.DataFrame()

def safe_lookup(df_indexed, key, fallback):
    """Acceso seguro por índice."""
    try:
        if key in df_indexed.index:
            return df_indexed.loc[key]
        else:
            return fallback
    except Exception:
        return fallback

def safe_int(val):
    """Convierte robustamente a int."""
    try:
        return int(val)
    except Exception:
        return 0

def safe_float(val):
    """Convierte robustamente a float."""
    try:
        return float(val)
    except Exception:
        return 0.0

# ------------------------------
# Limpieza y organización
# ------------------------------
for df in [cliente_df, equipo_df, empleado_df, proyecto_df, tarea_df, asignacion_df, incidente_df]:
    safe_strip_cols(df)

cliente_by_id = safe_index(cliente_df, "idCliente")
equipo_by_id = safe_index(equipo_df, "idEquipo")
empleado_by_id = safe_index(empleado_df, "idEmpleado")
proyecto_by_id = safe_index(proyecto_df, "idProyecto")
tarea_by_id = safe_index(tarea_df, "idTarea")

# Fallbacks
cliente_fallback = {"idCliente": 0, "Nombre": "N/A", "Email": "", "Telefono": "", "Industria": "", "MetricaClienteInicial": 0.0}
equipo_fallback = {"idEquipo": 0, "Nombre": "Equipo N/A", "Activo": 0}
empleado_fallback = {"idEmpleado": 0, "Nombre": "N/A", "Email": "", "Salario": 0.0, "SalarioxHora": 0.0, "Equipo_idEquipo": 0}
proyecto_fallback = {"idProyecto": 0, "Nombre": "N/A", "Descripcion": "", "Tipo": "", "Fecha_inicio": None, "Fecha_fin_estimada": None, "Fecha_fin_real": None, "Estado": "", "Presupuesto": 0.0, "Costo_real": 0.0, "Cliente_idCliente": 0, "MetricaClienteFinal": 0.0, "CertificacionSeguridad": 0}
tarea_fallback = {"idTarea": 0, "Titulo": "N/A", "Descripcion": "", "Fecha_creacion": None, "Fecha_fin_estimada": None, "Fecha_fin_real": None, "Estado": "", "Prioridad": "", "EsAutomatizacion": 0, "EsReutilizado": 0}

# ==========================
# PRE-CÁLCULO DE MÉTRICAS POR EQUIPO
# ==========================
print("\n=== CALCULANDO MÉTRICAS POR EQUIPO ===")

# Crear DataFrame con asignaciones + info de tarea
if not asignacion_df.empty and not tarea_df.empty:
    asignacion_completa = asignacion_df.merge(
        tarea_df[['idTarea', 'Estado', 'EsAutomatizacion', 'EsReutilizado']], 
        left_on='Tarea_idTarea', 
        right_on='idTarea', 
        how='left'
    )
else:
    asignacion_completa = pd.DataFrame()

# Agregar info del empleado (para obtener el equipo)
if not asignacion_completa.empty and not empleado_df.empty:
    asignacion_completa = asignacion_completa.merge(
        empleado_df[['idEmpleado', 'Equipo_idEquipo']], 
        left_on='Empleado_idEmpleado', 
        right_on='idEmpleado', 
        how='left'
    )

# Agregar info de incidentes para costo_defecto
if not incidente_df.empty and not proyecto_df.empty:
    incidente_completo = incidente_df.merge(
        proyecto_df[['idProyecto']], 
        left_on='Proyecto_idProyecto', 
        right_on='idProyecto', 
        how='left'
    )
else:
    incidente_completo = pd.DataFrame()

# Función para calcular métricas por PROYECTO (sin filtrar por equipo)
def calcular_metricas_proyecto(proyecto_id):
    """Calcula todas las métricas para un proyecto específico."""
    metricas = {
        'tareas_auto_total': 0,
        'tareas_reutilizadas_total': 0,
        'costo_defecto': 0.0,
        'avance': 0.0,
        'horas_estimadas_total': 0.0,
        'horas_reales_total': 0.0
    }
    
    if asignacion_completa.empty:
        return metricas
    
    # Filtrar SOLO por proyecto (todas las tareas del proyecto, sin importar equipo)
    asig_proyecto = asignacion_completa[
        asignacion_completa['Proyecto_idProyecto'] == proyecto_id
    ]
    
    if asig_proyecto.empty:
        return metricas
    
    metricas['tareas_auto_total'] = safe_int(asig_proyecto[asig_proyecto['EsAutomatizacion'] == 1].shape[0])
    
    metricas['tareas_reutilizadas_total'] = safe_int(asig_proyecto[asig_proyecto['EsReutilizado'] == 1].shape[0])
    
    metricas['horas_estimadas_total'] = safe_float(asig_proyecto['Horas_estimadas'].sum())
    
    metricas['horas_reales_total'] = safe_float(asig_proyecto['Horas_reales'].sum())
    
    # Avance del proyecto (tareas completadas / tareas totales * 100)
    tareas_totales = asig_proyecto.shape[0]
    if tareas_totales > 0:
        tareas_completadas = asig_proyecto[asig_proyecto['Estado'] == 'COMPLETADA'].shape[0]
        metricas['avance'] = safe_float((tareas_completadas / tareas_totales) * 100)
    
    # Costo por defecto (TODOS los incidentes del proyecto)
    if not incidente_completo.empty:
        incidentes_proyecto = incidente_completo[
            incidente_completo['Proyecto_idProyecto'] == proyecto_id
        ]
        metricas['costo_defecto'] = safe_float(incidentes_proyecto['CostoCorreccion'].sum())
    
    return metricas

metricas_por_proyecto = {}
print("\n=== Métricas calculadas por Proyecto ===")
for proyecto_id in asignacion_completa['Proyecto_idProyecto'].unique():
    metricas_por_proyecto[proyecto_id] = calcular_metricas_proyecto(proyecto_id)
    print(f"Proyecto {proyecto_id}: {metricas_por_proyecto[proyecto_id]}")



In [None]:
# ==========================
# CONEXIÓN AL DW Y CARGA
# ==========================
dw_conn = mysql.connector.connect(
    host="192.168.100.3",
    port=3306,
    user="etl_user",
    password="TuPasswordFuerte",
    database="db_soporte"
)
dw_cursor = dw_conn.cursor()
print("\nConectado al DW (db_soporte).")

if incidente_df.empty:
    print("No hay incidentes en OLTP. No se ha cargado nada al DW.")
else:
    print(f"\nCargando {len(incidente_df)} incidentes y datos relacionados al DW...")

    for _, inc in incidente_df.iterrows():
        pid = inc.get("Proyecto_idProyecto", 0)
        tid = inc.get("idTarea", 0)

        proy = safe_lookup(proyecto_by_id, pid, proyecto_fallback)
        cli = safe_lookup(cliente_by_id, proy.get("Cliente_idCliente", 0), cliente_fallback)
        emp = empleado_fallback
        eq = equipo_fallback

        # Buscar empleado asignado
        asign_task_proj = asignacion_df[
            (asignacion_df["Proyecto_idProyecto"] == pid) &
            (asignacion_df["Tarea_idTarea"] == tid)
        ] if not asignacion_df.empty else pd.DataFrame()
        
        if not asign_task_proj.empty:
            emp_id = asign_task_proj.iloc[0].get("Empleado_idEmpleado", 0)
            emp = safe_lookup(empleado_by_id, emp_id, empleado_fallback)
        else:
            if not empleado_df.empty:
                emp = empleado_df.iloc[0]
        
        eq = safe_lookup(equipo_by_id, emp.get("Equipo_idEquipo", 0), equipo_fallback)
        equipo_id = safe_int(eq.get("idEquipo", 0))

        # Estado proyecto
        idEstado, nombreEstado = 1, proy.get("Estado", "SinEstado")
        
        # Tiempo
        tinfo = {
            "idTiempo": 1,
            "fecha": proy.get("Fecha_fin_estimada"),
            "anio": 2025, "trimestre": 1, "mes": 1, "semana": 1, "dia": 1
        }
        
        # Calidad
        cinfo = {
            "idCalidad": 1, 
            "severidad_defecto": str(inc.get("Severidad", "")), 
            "tipo_incidente": "INCIDENTE", 
            "cert_calidad": 0
        }
        
        # Contar defectos por proyecto
        defectos_por_proyecto = incidente_df[incidente_df["Proyecto_idProyecto"] == pid]
        num_defectos = len(defectos_por_proyecto)

        metricas_proyecto = metricas_por_proyecto.get(pid, {
            'tareas_auto_total': 0,
            'tareas_reutilizadas_total': 0,
            'costo_defecto': 0.0,
            'avance': 0.0,
            'horas_estimadas_total': 0.0,
            'horas_reales_total': 0.0
        })

        # Construir diccionario de métricas con valores calculados por PROYECTO
        m = {
            "presupuesto": safe_float(proy.get("Presupuesto", 0.0)),
            "costo_real": safe_float(proy.get("Costo_real", 0.0)),
            "desviacion_presupuestal": safe_float(proy.get("Presupuesto", 0.0)) - safe_float(proy.get("Costo_real", 0.0)),
            "metrica_base_roi": safe_float(cli.get("MetricaClienteInicial", 0.0)),
            "metrica_final_roi": safe_float(proy.get("MetricaClienteFinal", 0.0)),
            "tareas_auto_total": metricas_proyecto['tareas_auto_total'],
            "tareas_reutilizadas_total": metricas_proyecto['tareas_reutilizadas_total'],
            "defectos_reportados": num_defectos,
            "costo_defecto": metricas_proyecto['costo_defecto'],
            "avance": metricas_proyecto['avance'],
            "horas_estimadas_total": metricas_proyecto['horas_estimadas_total'],
            "horas_reales_total": metricas_proyecto['horas_reales_total']
        }

        tarea = safe_lookup(tarea_by_id, tid, tarea_fallback)

        # ------------- PARÁMETROS -------------
        params = [
            int(cli.get("idCliente", 0)), str(cli.get("Nombre", "")), str(cli.get("Email", "")), 
            str(cli.get("Telefono", "")), str(cli.get("Industria", "")), safe_float(cli.get("MetricaClienteInicial", 0.0)),
            
            int(eq.get("idEquipo", 0)), str(eq.get("Nombre", "")), safe_int(eq.get("Activo", 0)),
            
            int(emp.get("idEmpleado", 0)), str(emp.get("Nombre", "")), str(emp.get("Email", "")), 
            safe_float(emp.get("Salario", 0.0)), safe_float(emp.get("SalarioxHora", 0.0)), int(emp.get("Equipo_idEquipo", 0)),
            
            int(idEstado), str(nombreEstado),
            
            int(proy.get("idProyecto", 0)), str(proy.get("Nombre", "")), str(proy.get("Tipo", "")), 
            str(proy.get("Descripcion", "")), safe_float(proy.get("Presupuesto", 0.0)), safe_float(proy.get("Costo_real", 0.0)), 
            safe_float(proy.get("MetricaClienteFinal", 0.0)), safe_int(proy.get("CertificacionSeguridad", 0)), 
            proy.get("Fecha_inicio", None), proy.get("Fecha_fin_estimada", None), proy.get("Fecha_fin_real", None), 
            int(proy.get("Cliente_idCliente", 0)), int(emp.get("Equipo_idEquipo", 0)), int(idEstado),
            
            int(tarea.get("idTarea", 0)), str(tarea.get("Titulo", "")), str(tarea.get("Descripcion", "")), 
            tarea.get("Fecha_creacion", None), tarea.get("Fecha_fin_estimada", None), tarea.get("Fecha_fin_real", None), 
            str(tarea.get("Prioridad", "")), safe_int(tarea.get("EsAutomatizacion", 0)), safe_int(tarea.get("EsReutilizado", 0)), 
            int(pid),
            
            int(tinfo["idTiempo"]), tinfo["fecha"], int(tinfo["anio"]), int(tinfo["trimestre"]), 
            int(tinfo["mes"]), int(tinfo["semana"]), int(tinfo["dia"]),
            
            int(cinfo["idCalidad"]), str(cinfo["severidad_defecto"]), str(cinfo["tipo_incidente"]), int(cinfo["cert_calidad"]),
            
            int(pid), int(pid), int(cli.get("idCliente", 0)), int(emp.get("Equipo_idEquipo", 0)), 
            int(tinfo["idTiempo"]), int(idEstado),
            
            safe_float(m["presupuesto"]), safe_float(m["costo_real"]), safe_float(m["desviacion_presupuestal"]), 
            safe_float(m["metrica_base_roi"]), safe_float(m["metrica_final_roi"]),
            safe_int(m["tareas_auto_total"]), safe_int(m["tareas_reutilizadas_total"]), safe_int(m["defectos_reportados"]),
            safe_float(m["costo_defecto"]), safe_float(m["avance"]), safe_float(m["horas_estimadas_total"]), 
            safe_float(m["horas_reales_total"]),
            
            int(inc.get("idIncidente", 0)), int(pid), int(tid), int(cinfo["idCalidad"]), 
            inc.get("Fecha_reporte", None), str(inc.get("Severidad", "")), str(inc.get("Estado", "")), 
            safe_float(inc.get("CostoCorreccion", 0.0))
        ]
        
        try:
            dw_cursor.callproc("cargar_todo_dw", params)
        except Exception as e:
            print(f"[ERROR] Al cargar incidente {inc.get('idIncidente', 'N/A')}: {e}")

    dw_conn.commit()
    print(f"\nSe cargaron {len(incidente_df)} incidentes y hechos relacionados al DW.")

dw_cursor.close()
dw_conn.close()
print("Conexión DW cerrada. ETL finalizado correctamente.")