In [1]:
# Instalación de librerías requeridas
%pip install sqlalchemy
%pip install psycopg2
%pip install psycopg2-binary
%pip install scikit-learn
%pip install pandas

import pandas as pd
import yaml
from sqlalchemy import create_engine

with open('../../configBD/config.yml', 'r') as f:
    cfg = yaml.safe_load(f)
    cfg_etl = cfg['bodega']
    cfg_bd = cfg['mensajeria']
cfg_etl  # verificación rápida

url_bd = f"{cfg_bd['driver']}://{cfg_bd['user']}:{cfg_bd['password']}@{cfg_bd['host']}:{cfg_bd['port']}/{cfg_bd['db']}"
url_etl = f"{cfg_etl['driver']}://{cfg_etl['user']}:{cfg_etl['password']}@{cfg_etl['host']}:{cfg_etl['port']}/{cfg_etl['db']}"

cliente_bd = create_engine(url_bd)     # base operativa
cliente_etl = create_engine(url_etl)    # Data Warehouse

# Carga de dimensiones
dim_cliente = pd.read_sql_table('dim_cliente', url_etl)
dim_mensajero = pd.read_sql_table('dim_mensajero', url_etl)
dim_sede = pd.read_sql_table('dim_sede', url_etl)
dim_tiempo = pd.read_sql_table('dim_tiempo', url_etl)

# normaliza dim_tiempo → genera columna date
dim_tiempo = dim_tiempo.rename(columns={'Año':'year','Mes':'month','Dia':'day'})
dim_tiempo['date'] = pd.to_datetime(dim_tiempo[['year','month','day']]).dt.date

# Carga de tablas operacionales
servicios = pd.read_sql_table('mensajeria_servicio', url_bd)
origenes = pd.read_sql_table('mensajeria_origenservicio', url_bd)
destinos = pd.read_sql_table('mensajeria_destinoservicio', url_bd)
estados_srv = pd.read_sql_table('mensajeria_estadosservicio', url_bd)
cat_estado = pd.read_sql_table('mensajeria_estado', url_bd)
cat_tipo_srv = pd.read_sql_table('mensajeria_tiposervicio', url_bd)

# ----- 0) copia base -----
fact_serv = servicios.copy()

# ----- 1) TiempoKey (fecha_solicitud) -----
fact_serv['FechaSolicitud'] = pd.to_datetime(fact_serv['fecha_solicitud']).dt.date
dim_tiempo_subset = dim_tiempo[['tiempo_key','date']].rename(columns={'tiempo_key':'TiempoKey'})
fact_serv = fact_serv.merge(dim_tiempo_subset,
                          left_on='FechaSolicitud',
                          right_on='date', how='left') \
                   .drop(columns=['date'])

# ----- 2) FK Cliente -----
fact_serv = fact_serv.merge(dim_cliente[['ClienteKey','cliente_id']],
                          on='cliente_id', how='left')

# ----- 3) FK Mensajero (titular) -----
fact_serv = fact_serv.merge(dim_mensajero[['MensajeroKey','user_id']],
                          left_on='mensajero_id', right_on='user_id',
                          how='left').drop(columns=['user_id'])

# ----- 4) FK Sede Origen y Destino -----
# 4a) unir origen
fact_serv = (
    fact_serv
    .merge(
        origenes[['id','cliente_id','ciudad_id']],
        left_on='origen_id', right_on='id',
        how='left',
        suffixes=('','_ori')
    )
    .rename(columns={
        'ciudad_id': 'ciudad_ori',
        'cliente_id_ori': 'cliente_ori'
    })
    .drop(columns=['id_ori'])
)

# 4b) unir destino
fact_serv = (
    fact_serv
    .merge(
        destinos[['id','cliente_id','ciudad_id']],
        left_on='destino_id', right_on='id',
        how='left',
        suffixes=('','_des')
    )
    .rename(columns={
        'ciudad_id': 'ciudad_des',
        'cliente_id_des': 'cliente_des'
    })
    .drop(columns=['id_des'])
)

# 4c) lookup en dim_sede para SedeOrigenKey
fact_serv = (
    fact_serv
    .merge(
        dim_sede[['SedeKey','ciudad_id','cliente_id']],
        left_on=['ciudad_ori','cliente_ori'],
        right_on=['ciudad_id','cliente_id'],
        how='left',
        suffixes=('','_dim')
    )
    .rename(columns={'SedeKey':'SedeOrigenKey'})
    .drop(columns=['ciudad_id','cliente_id_dim'])
)

# 4d) lookup en dim_sede para SedeDestinoKey
fact_serv = (
    fact_serv
    .merge(
        dim_sede[['SedeKey','ciudad_id','cliente_id']],
        left_on=['ciudad_des','cliente_des'],
        right_on=['ciudad_id','cliente_id'],
        how='left',
        suffixes=('','_dim')
    )
    .rename(columns={'SedeKey':'SedeDestinoKey'})
    .drop(columns=['ciudad_id','cliente_id_dim'])
)

# ----- 5) TipoServicio -----
fact_serv = (
    fact_serv
    .merge(
        cat_tipo_srv[['id','nombre']],
        left_on='tipo_servicio_id',
        right_on='id',
        how='left',
        suffixes=('','_tipo')
    )
    .rename(columns={'nombre':'TipoServicio'})
    .drop(columns=['id_tipo'])
)

# ----- 6) EstadoServicio (último estado registrado) -----
# 6a) fusionar nombre del estado
estados_srv = (
    estados_srv
    .merge(
        cat_estado[['id','nombre']],
        left_on='estado_id',
        right_on='id',
        how='left',
        suffixes=('','_cat')
    )
    .rename(columns={'nombre':'EstadoNom'})
    .drop(columns=['id_cat'])
)

# Eliminar columnas duplicadas
estados_srv = estados_srv.loc[:, ~estados_srv.columns.duplicated()]

# 6b) timestamp con microsegundos
estados_srv['timestamp'] = pd.to_datetime(
    estados_srv['fecha'].astype(str) + ' ' + estados_srv['hora'].astype(str),
    format='%Y-%m-%d %H:%M:%S.%f',
    errors='coerce'
)

# 6c) extraer último estado
ult_estado = (
    estados_srv
    .sort_values('timestamp')
    .groupby('servicio_id', as_index=False)
    .tail(1)[['servicio_id','EstadoNom','timestamp']]
)

# 6d) merge en fact_serv
fact_serv = (
    fact_serv
    .merge(ult_estado,
          left_on='id', right_on='servicio_id',
          how='left')
    .rename(columns={'EstadoNom':'EstadoServicio'})
    .drop(columns=['servicio_id'])
)

# Eliminar columnas duplicadas
fact_serv = fact_serv.loc[:, ~fact_serv.columns.duplicated()]

# ----- 7) EsFinal (TRUE si el estado es uno de los finales) -----
finales = ['Entregado en destino', 'Terminado completo',]
fact_serv['EsFinal'] = fact_serv['EstadoServicio'].isin(finales)

# ----- 8) Timestamps de tracking alternativos -----
# 8.1) Inicio = fecha_solicitud + hora_solicitud
fact_serv['Tiempo_Inicio'] = pd.to_datetime(
    fact_serv['fecha_solicitud'].astype(str) + ' ' +
    fact_serv['hora_solicitud'].astype(str),
    errors='coerce'
)

# 8.2) Asignado = fecha_solicitud + hora_visto_por_mensajero
fact_serv['Tiempo_Mensajero_Asignado'] = pd.to_datetime(
    fact_serv['fecha_solicitud'].astype(str) + ' ' +
    fact_serv['hora_visto_por_mensajero'].astype(str),
    errors='coerce'
)

# 8.3) Recogido = fecha_deseada + hora_deseada
fact_serv['Tiempo_Recogido_Origen'] = pd.to_datetime(
    fact_serv['fecha_deseada'].astype(str) + ' ' +
    fact_serv['hora_deseada'].astype(str),
    errors='coerce'
)

# 8.4) Entregado = Recogido + horas según prioridad
priority_hours = {
    'Alta: En una Hora': 1,
    'Media: De 1 a 3 horas': 2,
    'Media: De 1 - 3 Horas': 2,
    'Baja: Transcurso del Dia': 8
}
fact_serv['Tiempo_Entregado_Destino'] = (
    fact_serv['Tiempo_Recogido_Origen'] +
    fact_serv['prioridad']
             .map(priority_hours)
             .fillna(1)
             .apply(lambda h: pd.Timedelta(hours=h))
)

fact_serv['Minutos_Entregado'] = fact_serv['Tiempo_Entregado_Destino'].dt.minute




# ----- Verificación y limpieza de datos temporales -----
print("\nVerificación de datos temporales:")
for col in ['Tiempo_Inicio', 'Tiempo_Mensajero_Asignado', 'Tiempo_Recogido_Origen', 'Tiempo_Entregado_Destino']:
    # Convertir a datetime y manejar valores inválidos
    fact_serv[col] = pd.to_datetime(fact_serv[col], errors='coerce')
    
    # Verificar si hay valores nulos
    null_count = fact_serv[col].isnull().sum()
    if null_count > 0:
        print(f"Advertencia: {col} tiene {null_count} valores nulos/inválidos")

# ----- Cálculo robusto de duraciones -----
def calcular_duracion(inicio, fin):
    try:
        # Verificar que ambos valores no sean nulos
        if pd.isnull(inicio) or pd.isnull(fin):
            return None
        
        # Calcular diferencia en minutos
        diff = (fin - inicio).total_seconds() / 60
        
        # Asegurar que no sea negativo
        return max(0, diff) if pd.notnull(diff) else None
    except:
        return None

# Calcular cada duración con manejo de errores
fact_serv['Duracion_Total_Min'] = fact_serv.apply(
    lambda x: calcular_duracion(x['Tiempo_Inicio'], x['Tiempo_Entregado_Destino']), axis=1
).round(2)

fact_serv['Duracion_Asignacion_Min'] = fact_serv.apply(
    lambda x: calcular_duracion(x['Tiempo_Inicio'], x['Tiempo_Mensajero_Asignado']), axis=1
).round(2)

fact_serv['Duracion_Recogida_Min'] = fact_serv.apply(
    lambda x: calcular_duracion(x['Tiempo_Mensajero_Asignado'], x['Tiempo_Recogido_Origen']), axis=1
).round(2)

fact_serv['Duracion_Entrega_Min'] = fact_serv.apply(
    lambda x: calcular_duracion(x['Tiempo_Recogido_Origen'], x['Tiempo_Entregado_Destino']), axis=1
).round(2)

fact_serv['Duracion_Cierre_Min'] = fact_serv.apply(
    lambda x: calcular_duracion(x['Tiempo_Inicio'], x['Tiempo_Entregado_Destino']), axis=1
).round(2)

# ----- Cálculo de eficiencia con manejo de valores nulos -----
# Primero calcular el promedio de duración por mensajero
avg_duracion_por_mensajero = fact_serv.groupby('MensajeroKey')['Duracion_Total_Min'].transform('mean')

# Calcular eficiencia con manejo de nulos
fact_serv['Eficiencia_Mensajero'] = fact_serv.apply(
    lambda x: (x['Duracion_Total_Min'] / avg_duracion_por_mensajero[x.name]).round(2) 
    if pd.notnull(x['Duracion_Total_Min']) and pd.notnull(avg_duracion_por_mensajero[x.name]) 
    else None, 
    axis=1
)

# Imputar valores nulos para duraciones (usando la mediana)
for col in ['Duracion_Total_Min', 'Duracion_Asignacion_Min', 'Duracion_Recogida_Min', 'Duracion_Entrega_Min']:
    median_val = fact_serv[col].median()
    fact_serv[col] = fact_serv[col].fillna(median_val)

# Para eficiencia, imputar 1.0 (valor neutral)
fact_serv['Eficiencia_Mensajero'] = fact_serv['Eficiencia_Mensajero'].fillna(1.0)

# ----- Añadir nuevas columnas temporales -----
# 1) Columnas temporales para análisis
fact_serv['Mes'] = pd.to_datetime(fact_serv['Tiempo_Inicio']).dt.month_name()
fact_serv['DiaSemana'] = pd.to_datetime(fact_serv['Tiempo_Inicio']).dt.day_name()
fact_serv['Año'] = pd.to_datetime(fact_serv['Tiempo_Inicio']).dt.year
fact_serv['Hora'] = pd.to_datetime(fact_serv['Tiempo_Inicio']).dt.hour
fact_serv['DiaMes'] = pd.to_datetime(fact_serv['Tiempo_Inicio']).dt.day

# ----- 9) limpieza de columnas intermedias -----
drop_cols = (
    [c for c in fact_serv.columns if c.startswith(('fecha_','hora_','ciudad_','cliente_'))]
    + ['servicio_id']
)
fact_serv = fact_serv.drop(columns=drop_cols, errors='ignore')

# 0) renombra el PK para que sea ServicioKey
fact_serv = fact_serv.rename(columns={'id':'ServicioKey'})

# Definición de columnas finales incluyendo las nuevas métricas
cols_final = [
    'ServicioKey', 'TiempoKey', 'ClienteKey', 'SedeOrigenKey', 'SedeDestinoKey',
    'MensajeroKey', 'TipoServicio', 'EstadoServicio', 'EsFinal',
    'Tiempo_Inicio', 'Tiempo_Mensajero_Asignado', 'Tiempo_Recogido_Origen',
    'Tiempo_Entregado_Destino',
    'Mes', 'DiaSemana', 'Año', 'Hora', 'DiaMes',  # Columnas temporales
    'Duracion_Total_Min', 'Duracion_Asignacion_Min', 'Duracion_Recogida_Min',
    'Duracion_Entrega_Min', 'Duracion_Cierre_Min', # Métricas de duración
    'Eficiencia_Mensajero'  # Indicador de eficiencia
]

# Seleccionar solo las columnas finales
fact_serv = fact_serv[cols_final]

# Verificación final de valores nulos
print("\nResumen final de valores nulos:")
print(fact_serv.isnull().sum())

# Subconjunto de 5 000 filas
fact_serv_sub = fact_serv.head(5000)

# Reset de transacción y volcado
raw = cliente_etl.raw_connection()
raw.rollback()
raw.close()

# Guardar en la base de datos
fact_serv_sub.to_sql(
    'fact_servicios',
    cliente_etl,
    if_exists='replace',
    index=False,
    method='multi',
    chunksize=1000
)

print("\nProceso completado exitosamente. Datos guardados en la tabla fact_servicios.")








Note: you may need to restart the kernel to use updated packages.


Collecting psycopg2
  Using cached psycopg2-2.9.10.tar.gz (385 kB)


  Installing build dependencies ... [?25l-

 \

 |

 done


[?25h  Getting requirements to build wheel ... [?25l-

 error
  [1;31merror[0m: [1msubprocess-exited-with-error[0m
  
  [31m×[0m [32mGetting requirements to build wheel[0m did not run successfully.
  [31m│[0m exit code: [1;36m1[0m
  [31m╰─>[0m [31m[34 lines of output][0m
  [31m   [0m !!
  [31m   [0m 
  [31m   [0m         ********************************************************************************
  [31m   [0m         Please consider removing the following classifiers in favor of a SPDX license expression:
  [31m   [0m 
  [31m   [0m         License :: OSI Approved :: GNU Library or Lesser General Public License (LGPL)
  [31m   [0m 
  [31m   [0m         See https://packaging.python.org/en/latest/guides/writing-pyproject-toml/#license for details.
  [31m   [0m         ********************************************************************************
  [31m   [0m 
  [31m   [0m !!
  [31m   [0m   self._finalize_license_expression()
  [31m   [0m running egg_info
  [31m   [0m writi

[?25h

Note: you may need to restart the kernel to use updated packages.




Note: you may need to restart the kernel to use updated packages.








Note: you may need to restart the kernel to use updated packages.






Note: you may need to restart the kernel to use updated packages.


  fact_serv['Tiempo_Mensajero_Asignado'] = pd.to_datetime(



Verificación de datos temporales:
Advertencia: Tiempo_Mensajero_Asignado tiene 1341629 valores nulos/inválidos
Advertencia: Tiempo_Recogido_Origen tiene 103 valores nulos/inválidos
Advertencia: Tiempo_Entregado_Destino tiene 103 valores nulos/inválidos



Resumen final de valores nulos:
ServicioKey                        0
TiempoKey                          0
ClienteKey                         0
SedeOrigenKey                    246
SedeDestinoKey                  2358
MensajeroKey                   33530
TipoServicio                       0
EstadoServicio                     0
EsFinal                            0
Tiempo_Inicio                      0
Tiempo_Mensajero_Asignado    1341629
Tiempo_Recogido_Origen           103
Tiempo_Entregado_Destino         103
Mes                                0
DiaSemana                          0
Año                                0
Hora                               0
DiaMes                             0
Duracion_Total_Min                 0
Duracion_Asignacion_Min            0
Duracion_Recogida_Min              0
Duracion_Entrega_Min               0
Duracion_Cierre_Min              103
Eficiencia_Mensajero               0
dtype: int64



Proceso completado exitosamente. Datos guardados en la tabla fact_servicios.
