In [1]:
from datetime import date
import numpy as np
import pandas as pd
import yaml
from sqlalchemy import create_engine

In [2]:
# Configuración de las conexiones a las bases de datos
with open('./config.yml', 'r') as f:
    config = yaml.safe_load(f)
    config_ryf = config['RAPIDO_Y_FURIOSO']
    config_etl = config['ETL']

url_ryf = (f"{config_ryf['drivername']}://{config_ryf['user']}:{config_ryf['password']}@{config_ryf['host']}:"
        f"{config_ryf['port']}/{config_ryf['dbname']}")
url_etl = (f"{config_etl['drivername']}://{config_etl['user']}:{config_etl['password']}@{config_etl['host']}:"
        f"{config_etl['port']}/{config_etl['dbname']}")

ryf = create_engine(url_ryf)
etl_conn = create_engine(url_etl)

In [3]:
# Extraer datos necesarios de la base de datos operacional
estados_servicio = pd.read_sql_query("""
    SELECT 
        id as estado_servicio_id,
        servicio_id,
        estado_id,
        fecha_hora_estado,
        usuario_registro
    FROM estados_servicio
""", ryf)

# Obtener las dimensiones relacionadas de la bodega de datos
dim_servicio = pd.read_sql_table('fact_servicio', etl_conn)
dim_estado = pd.read_sql_table('estado', etl_conn)
dim_fecha = pd.read_sql_table('fecha', etl_conn)

In [4]:
# Convertir fecha_hora_estado a datetime si no lo está ya
estados_servicio['fecha_hora_estado'] = pd.to_datetime(estados_servicio['fecha_hora_estado'])

# Crear columnas de fecha y hora para los joins
estados_servicio['fecha'] = estados_servicio['fecha_hora_estado'].dt.date
estados_servicio['hora'] = estados_servicio['fecha_hora_estado'].dt.hour

In [5]:
# Realizar los joins con las dimensiones
fact_estados = estados_servicio.merge(
    dim_servicio,
    left_on='servicio_id',
    right_on='servicio_key',
    how='left'
).merge(
    dim_estado,
    left_on='estado_id',
    right_on='estado_key',
    how='left'
).merge(
    dim_fecha,
    left_on='fecha',
    right_on='fecha_valor',
    how='left'
)

In [6]:
# Limpiar y preparar los datos
fact_estados.replace({
    np.nan: 'no aplica',
    'NaT': 'no aplica'
}, inplace=True)

# Convertir tipos de datos según sea necesario
fact_estados['servicio_key'] = fact_estados['servicio_key'].astype('Int64')
fact_estados['estado_key'] = fact_estados['estado_key'].astype('Int64')
fact_estados['fecha_key'] = fact_estados['fecha_key'].astype('Int64')
fact_estados['hora'] = fact_estados['hora'].astype('Int64')

In [7]:
# Seleccionar solo las columnas necesarias para la tabla de hechos
fact_estados = fact_estados[[
    'estado_servicio_id',
    'servicio_key',
    'estado_key',
    'fecha_hora_estado',
    'fecha_key',
    'hora',
    'usuario_registro'
]]

In [8]:
# Agregar columna de fecha de carga
fact_estados['fecha_carga'] = date.today()

In [9]:
# Cargar los datos en la tabla de hechos
fact_estados.to_sql(
    'fact_estados_servicio',
    etl_conn,
    if_exists='replace',
    index=False
)