## **EXTRACCIÓN, TRANSFORMACIÓN Y CARGA**

- **Objetivos:**
    - **Extraer la información de dos orígenes de datos**
    - **Realizar la correcta transformación y limpieza**
    - **Modelar en dos tablas de hechos y sus correspondientes dimensiones**
    - **Concretar la carga al servidor donde está alojada la base de datos**

**DEPENDENCIAS**

In [None]:
# DATOS
import pandas as pd
import numpy as np
import re

# BASE DE DATOS NEON
from sqlalchemy import create_engine, text
from contextlib import contextmanager
from sqlalchemy.orm import sessionmaker

**CONECTOR Y CONSULTOR DE BASE DE DATOS**

In [None]:
# MÓDULO PARA CONECTAR E INTERACTUAR CON LA BASE DE DATOS
"""
Módulo para gestionar la conexión a PostgreSQL (Neon).

Este módulo provee una clase simple para conectarse a una base Neon
utilizando SQLAlchemy, y ejecutar consultas dentro de un contexto seguro.
"""

# Credenciales explícitas (educativo, no recomendado en producción)
NEON_DATABASE_URL = (
    "postgresql://neondb_owner:npg_3IfJpa0gQNqT"
    "@ep-round-hall-aegfmhdo-pooler.c-2.us-east-2.aws.neon.tech/neondb"
    "?sslmode=require&channel_binding=require"
)


class ConnectorNeon:
    """
    Clase para gestionar la conexión a PostgreSQL Neon.
    """

    def __init__(self):
        """
        Inicializa el conector utilizando la URL completa de conexión.
        """
        self.db_url = NEON_DATABASE_URL

    def engine(self):
        """
        Crea y retorna un engine SQLAlchemy para PostgreSQL.

        Returns
        -------
        sqlalchemy.engine.Engine
            Motor de conexión a Neon.
        """
        engine = create_engine(self.db_url, isolation_level="AUTOCOMMIT")
        return engine

    @contextmanager
    def db_session(self):
        """
        Context manager para manejar sesiones con commit/rollback automático.

        Yields
        ------
        sqlalchemy.orm.Session
            Sesión activa de base de datos.
        """
        engine = self.engine()
        Session = sessionmaker(bind=engine)
        session = Session()

        try:
            print("Sesión a Neon establecida...")
            yield session
            session.commit()
        except Exception as e:
            session.rollback()
            print(f"Error en la consulta: {e}")
            raise e
        finally:
            print("Cerrando sesión de Neon...")
            session.close()


In [None]:
# COMPROBANDO CONEXIÓN
con = ConnectorNeon()

with con.db_session() as s:
    result = s.execute(text("SELECT 'Hola Neon'"))
    print(result.fetchone())


Sesión a Neon establecida...
('Hola Neon',)
Cerrando sesión de Neon...


## **EXTRACCIÓN**

In [None]:
# COMO FUENTE DE DATOS TENEMOS SOLICITUDES SOBRE DECKS EN CABA Y SEGUNDA FUENTE LAS SOLICITUDES DE BA COLABORATIVA PARA DICIEMBRE 2023
url = "https://raw.githubusercontent.com/NicolasCH24/Decks/refs/heads/main/decks.xlsx"
url2 = "https://raw.githubusercontent.com/NicolasCH24/Decks/refs/heads/main/bac-2023.xlsx"

**EXTRACCIÓN Y TRANSFORMACIÓN DE BA COLABORATIVA**

In [129]:
# PRIMERA LECTURA
df_bac = pd.read_excel(url2)

In [130]:
# VALORES FALTANTES
df_bac.isnull().any()

Unnamed: 0         False
nro_solicitud      False
periodo            False
categoria          False
prestacion         False
tipo               False
fecha_ingreso      False
hora_ingreso       False
comuna              True
barrio              True
calle               True
altura              True
esquina_proxima     True
canal              False
x                   True
y                   True
genero             False
estado_general     False
año                False
mes                False
dtype: bool

In [132]:
# DIMENSIONES
df_bac.shape

(67222, 20)

In [134]:
# FUNCION DE TRANSFORMACIÓN Y LIMPIEZA
def extract_and_clean_bac(url):
    # LECTURA
    df_bac = pd.read_excel(url)
    # LIMPIEZA Y FILTRADO
    df_bac_no_null = df_bac[~df_bac['comuna'].isna() & (~df_bac['calle'].isna()) & (~df_bac['altura'].isna())]
    df_filtered = df_bac_no_null[['nro_solicitud', 'categoria', 'prestacion', 'fecha_ingreso', 'comuna', 'barrio', 'calle',
       'altura', 'canal',
       'estado_general']].copy()
    df_filtered['altura'] = df_filtered['altura'].astype(int)

    df_filtered["comuna"] = df_filtered["comuna"].str.replace("_", " ", regex=False)

    # MÉTODO APPLY PARA CAMBIAR APP BA 147 POR APP BA COLABORATIVA
    def change_channel(channel):
        if channel == 'App BA 147':
            return "App BA Colaborativa"
        else:
            return channel
   
    df_filtered['canal'] = df_filtered['canal'].apply(change_channel)

    return df_filtered.sort_values(by='fecha_ingreso', ascending=True)

df_filtered_bac = extract_and_clean_bac(url2)

In [135]:
# PRIMERAS 4 FILAS
df_filtered_bac.head(4)

Unnamed: 0,nro_solicitud,categoria,prestacion,fecha_ingreso,comuna,barrio,calle,altura,canal,estado_general
10755,00948637/23,Higiene,Retiro de restos de poda o jardinería domicili...,2023-12-01,COMUNA 11,Villa Del Parque,"CARRANZA, ADOLFO P.",2425,App BA Colaborativa,Cerrado
43942,00950843/23,Higiene,Retiro de escombros/restos de obra,2023-12-01,COMUNA 2,Recoleta,RODRIGUEZ PEÑA,1416,GCS Web,Cerrado
43943,00950879/23,Denuncia Vial,Vehículo mal estacionado,2023-12-01,COMUNA 3,Balvanera,"URQUIZA, GRAL.",381,Boti,Cerrado
43945,00950962/23,Higiene,Retiro de escombros/restos de obra,2023-12-01,COMUNA 14,Palermo,CHARCAS,4312,GCS Web,Cerrado


**EXTRACCIÓN Y TRANSFORMACIÓN DECKS**

In [136]:
df = pd.read_excel(url)

In [137]:
# INDICA LAS DIMENSIONES DE LA FUENTE DE DATOS EN (FILAS, COLUMNAS)
df.shape

(789, 13)

In [138]:
# OBTENEMOS EL TIMPO DE DATO POR COLUMNA
df.dtypes

anio                                 float64
expediente                            object
solicitante                           object
tipo_documento                        object
nro_documento                          int64
calle                                 object
altura                                object
barrio                                object
comuna                                 int64
estado                                object
nro_disposicion_resolucion            object
fecha_inicio                  datetime64[ns]
fecha_vencimiento             datetime64[ns]
dtype: object

In [139]:
# VEMOS LAS COLUMNAS
df.columns

Index(['anio', 'expediente', 'solicitante', 'tipo_documento', 'nro_documento',
       'calle', 'altura', 'barrio', 'comuna', 'estado',
       'nro_disposicion_resolucion', 'fecha_inicio', 'fecha_vencimiento'],
      dtype='object')

In [140]:
# DESCRIPCION GENERAL DE LA SERIE NUMÉRICA
df.describe()

Unnamed: 0,anio,nro_documento,comuna,fecha_inicio,fecha_vencimiento
count,788.0,789.0,789.0,789,789
mean,2022.426396,27294140000.0,11.2218,2023-05-29 04:59:18.935361024,2028-04-23 03:28:03.650190080
min,2019.0,14038100.0,1.0,2020-10-16 00:00:00,2025-07-21 00:00:00
25%,2022.0,27215220000.0,10.0,2022-09-19 00:00:00,2027-08-08 00:00:00
50%,2023.0,30715160000.0,13.0,2023-07-19 00:00:00,2028-06-07 00:00:00
75%,2023.0,30717180000.0,14.0,2024-01-08 00:00:00,2028-11-30 00:00:00
max,2025.0,33718220000.0,15.0,2025-10-24 00:00:00,2030-07-11 00:00:00
std,1.26568,6925793000.0,4.297057,,


In [141]:
# CIUDADANOS UNICOS
df['nro_documento'].nunique()

755

In [142]:
# OBSERVAMOS DATOS FALTANTES SOLO EN EL AÑO, LO CUAL NO NOS IMPORTA PORQUE NOS VAMOS A BASAR EN LA FECHA DE INICIO DE LA SOLICITUD
df.isnull().any()

anio                           True
expediente                    False
solicitante                   False
tipo_documento                False
nro_documento                 False
calle                         False
altura                        False
barrio                        False
comuna                        False
estado                        False
nro_disposicion_resolucion    False
fecha_inicio                  False
fecha_vencimiento             False
dtype: bool

In [143]:
# TENEMOS CUITS Y DNIS LOS CAULES DECIDIMOS PASAR A FORMATO DNI
# EL ESTADO UNICO QUE EXISTE ES EL OTORGADO
print(df['tipo_documento'].unique())
print("")
print(df['estado'].unique())

['CUIT' 'DNI']

['OTORGADO']


In [144]:
# UNA VISTA DE LOS ÚLTIMOS REGISTROS
df.tail(4)

Unnamed: 0,anio,expediente,solicitante,tipo_documento,nro_documento,calle,altura,barrio,comuna,estado,nro_disposicion_resolucion,fecha_inicio,fecha_vencimiento
785,2019.0,EX-2019-8928504-GCABA-DGOEP,FRESCOLLI Y BATATA S.R.L.,CUIT,30716094894,EL SALVADOR,4676,Palermo,14,OTORGADO,RS-2021-02257974-GCABA-COMUNA14,2021-09-03,2026-09-03
786,2019.0,EX-2019-30635550-GCABA-DGOEP,RUBJOR S.R.L,CUIT,33716296429,ARCOS,1785,Belgrano,13,OTORGADO,RS-2021-9205719-GCABA-COMUNA13,2021-03-19,2026-03-19
787,2019.0,EX-2019-26919035-GCABA-DGOEP,CARECHUR S.R.L,CUIT,33716426829,AMENABAR,2363,Belgrano,13,OTORGADO,RS-2022-36725897-GCABA-COMUNA13,2022-10-13,2027-10-13
788,,EX-2022-30276443- -GCABA-DGCCT- EX-2022-315629...,BUSTOS ROSENDO MATEO,CUIT,20246590797,AGUIRRE,484,Villa Crespo,15,OTORGADO,RS-2023-05998793-GCABA-COMUNA15,2023-02-03,2028-02-03


In [145]:
def extract_and_clean_decks(url):
    df_decks = pd.read_excel(url)
    # NORMALIZAMOS LOS DOCUMENTOS
    def get_documento(nro_dni_cuit):
        nro_txt = str(nro_dni_cuit)
        if len(nro_txt) == 11:
            return nro_txt[2:-1]
        elif len(nro_txt) == 8:
            return nro_txt
        else:
            return "Documento erróneo"
        
    df_decks['documento'] = df_decks['nro_documento'].apply(get_documento)
    # NORMALIZAMOS ALTURA
    def limpiar_altura(valor):
        if pd.isna(valor):
            return np.nan
        valor_str = str(valor)

        match = re.search(r'\d{1,}', valor_str)
        if match:
            return int(match.group())
        return np.nan 

    df_decks['altura_corregida'] = df_decks['altura'].apply(limpiar_altura)
    df_decks['altura_corregida'] = df_decks['altura_corregida'].astype(str)

    # ORDENAMOS LAS COLUMNAS A GUSTO Y DE MAS ANTIGUA A MAS RECIENTE FECHA DE INICIO
    df_filtered_decks = df_decks[['fecha_inicio', 'fecha_vencimiento', 'solicitante', 'documento', 'calle', 'altura_corregida', 'barrio', 'comuna', 'estado', 'expediente', 'nro_disposicion_resolucion']].sort_values(by='fecha_inicio', ascending=True).copy()
    df_filtered_decks = df_filtered_decks.rename(columns={'altura_corregida':'altura'})

    df_filtered_decks['año'] = df_filtered_decks['fecha_inicio'].dt.year

    return df_filtered_decks

df_filtered_decks = extract_and_clean_decks(url)


In [146]:
df_filtered_decks

Unnamed: 0,fecha_inicio,fecha_vencimiento,solicitante,documento,calle,altura,barrio,comuna,estado,expediente,nro_disposicion_resolucion,año
765,2020-10-16,2025-10-16,BUENA HUERTA SRL,71609774,CERVIÑO AV.,3889,Palermo,14,OTORGADO,EX-2020-09491688-GCABA-DGCCT,RS-2020-25074837-GCABA-COMUNA14 RS-2022-386534...,2020
782,2020-10-16,2025-10-16,KOI BAR S.R.L,71543324,LAVALLEJA,1387,Palermo,14,OTORGADO,EX-2019-12236586-GCABA-DGOEP,RS-2020-25076833-GCABA-COMUNA14,2020
740,2020-10-19,2025-10-19,BOTAFRIA BRASSERIE S.R.L,71172124,RIVADAVIA AV.,3401,Almagro,5,OTORGADO,EX-2020-21123690- -GCABA-DGCCT,RS-2020-25186983-GCABA-COMUNA5,2020
773,2020-11-09,2025-11-09,PURA MASA S.R.L,71677247,MIÑONES,1886,Belgrano,13,OTORGADO,EX-2020-19425941- -GCABA-DGCDPU,RS-2020-27123653-GCABA-COMUNA13,2020
756,2020-11-10,2025-11-10,PAMPANITO S.A,71567564,"RIVADAVIA MARTIN, COMODORO",1694,Núñez,13,OTORGADO,EX-2020-22334563- -GCABA-DGCCT,RS-2020-27311791-GCABA-COMUNA13,2020
...,...,...,...,...,...,...,...,...,...,...,...,...
5,2025-06-27,2030-06-27,MATEUS MARTINEZ ANGELICA MARIA,95569901,LAPRIDA,1344,Recoleta,2,OTORGADO,EX-2025-08244233- -GCABA-DGPF,DI-2025-1858-GCABA-DGPF,2025
103,2025-06-30,2030-06-30,REENCUENTRO 6401 SRL.,71773059,DORREGO AV.,1194,Chacarita,15,OTORGADO,EX-2024-48524971- -GCABA-DGPF,DI-2025-1924-GCABA-DGPF,2025
81,2025-07-11,2030-07-11,Cooperativa de Trabajo Centro Cultural Nueva U...,71790979,URIARTE,1289,Palermo,14,OTORGADO,EX-2024-41678969- -GCABA-DGPF,DI-2025-2044-GCABA-DGPF\n,2025
1,2025-07-21,2025-07-21,IGNACIO ROBERTO GOMEZ,39626712,ARENALES,2434,Recoleta,2,OTORGADO,EX-2025-13053524- -GCABA-DGPF,DI-2025-2142-GCABA-DGPF,2025


**MODELADO ESTRELLA BA COLABORATIVA**

- **Para ello vamos a crear una tabla de hechos y la dimension de categoria/prestacion, comunas/barrios, canal y estados**

In [160]:
# DIVIDIMOS DATASET EN 1 Y 2 PARA SIMULAR UNA PRIMERA CARGA Y PRIMERA ACTUALIZACIÓN POR LOTES
df_filtered_bac['dia_del_año'] = df_filtered_bac['fecha_ingreso'].dt.dayofyear

In [None]:
# FUENTES DIVIDIDAS
# ANTEÚLTIMO Y ÚLTIMO DÍA DEL AÑO
df_filtered_bac1 = df_filtered_bac[df_filtered_bac['dia_del_año'] == 364]
df_filtered_bac2 = df_filtered_bac[df_filtered_bac['dia_del_año'] == 365]

In [165]:
def modeling_bac_data(df_filtered_bac):
    # FCT HECHOS
    fct_bac = df_filtered_bac.drop(columns='semana', axis=1)

    # DIM CATEGORIA PRESTACION
    dim_categoria_prestacion = fct_bac[['categoria', 'prestacion']]
    dim_categoria_prestacion = dim_categoria_prestacion.drop_duplicates(subset='prestacion')

    # DIM COMUNAS BARRIOS
    dim_comunas_barrios = fct_bac[['comuna', 'barrio']]
    dim_comunas_barrios = dim_comunas_barrios.drop_duplicates(subset='barrio')

    # DIM CANALES
    dim_canales = fct_bac[['canal']].drop_duplicates()

    # DIM ESTADOS
    dim_estados = fct_bac[['estado_general']].drop_duplicates()

    return fct_bac, dim_categoria_prestacion, dim_comunas_barrios, dim_canales, dim_estados

fct_bac, dim_categoria_prestacion, dim_comunas_barrios, dim_canales, dim_estados = modeling_bac_data(df_filtered_bac1)

In [None]:
# CREAMOS PRIMERO LAS TABLAS SEGUN EL MODELADO (SOLO EJECUTAR SI LAS TABLAS NO ESTAN CREADAS)
def create_schema_bac():
    con = ConnectorNeon()

    create_tables_script = text("""

    -- Tabla de hechos
    CREATE TABLE IF NOT EXISTS fct_bac (
        id SERIAL PRIMARY KEY,
        nro_solicitud TEXT,
        categoria TEXT,
        prestacion TEXT,
        fecha_ingreso TIMESTAMP,
        comuna TEXT,
        barrio TEXT,
        calle TEXT,
        altura INTEGER,
        canal TEXT,
        estado_general TEXT
    );

    -- Dimensión: categoría y prestación
    CREATE TABLE IF NOT EXISTS dim_bac_categoria_prestacion (
        id SERIAL PRIMARY KEY,
        categoria TEXT,
        prestacion TEXT
    );

    -- Dimensión: comunas y barrios
    CREATE TABLE IF NOT EXISTS dim_bac_comunas_barrios (
        id SERIAL PRIMARY KEY,
        comuna TEXT,
        barrio TEXT
    );

    -- Dimensión: canales
    CREATE TABLE IF NOT EXISTS dim_bac_canales (
        id SERIAL PRIMARY KEY,
        canal TEXT
    );

    -- Dimensión: estados generales
    CREATE TABLE IF NOT EXISTS dim_bac_estados (
        id SERIAL PRIMARY KEY,
        estado_general TEXT
    );

    """)

    with con.db_session() as s:
        s.execute(create_tables_script)

    return "Todas las tablas fueron creadas correctamente."

#created_schema = create_schema_bac()

Sesión a Neon establecida...
Cerrando sesión de Neon...


**CARGA DE DIMENSIONES Y HECHOS BA COLABORATIVA**

In [157]:
# CARGA DE FCT
def get_max_fecha_fct(conector):
    """
    Retorna la última fecha_ingreso cargada en fct_bac.
    """
    query = text("SELECT MAX(fecha_ingreso) AS max_fecha FROM fct_bac")
    with conector.db_session() as s:
        result = s.execute(query).fetchone()
        return result[0]  # puede ser None si la tabla está vacía

def load_fct_bac(fct_bac_df, conector):
    """
    Inserta datos nuevos en la fact table fct_bac.
    Solo sube registros con fecha_ingreso mayor a la última cargada.
    """
    # Obtener la última fecha_ingreso de la tabla
    fecha_minima = get_max_fecha_fct(conector)
    
    if fecha_minima is not None:
        fct_bac_df = fct_bac_df[fct_bac_df['fecha_ingreso'] > fecha_minima]
    
    if fct_bac_df.empty:
        print("No hay registros nuevos para insertar en fct_bac.")
        return
    
    insert_query = text("""
    INSERT INTO fct_bac (
        nro_solicitud, categoria, prestacion, fecha_ingreso,
        comuna, barrio, calle, altura, canal, estado_general
    )
    VALUES (
        :nro_solicitud, :categoria, :prestacion, :fecha_ingreso,
        :comuna, :barrio, :calle, :altura, :canal, :estado_general
    )
    """)
    
    with conector.db_session() as s:
        for _, row in fct_bac_df.iterrows():
            s.execute(
                insert_query,
                {
                    "nro_solicitud": row["nro_solicitud"],
                    "categoria": row["categoria"],
                    "prestacion": row["prestacion"],
                    "fecha_ingreso": row["fecha_ingreso"],
                    "comuna": row["comuna"],
                    "barrio": row["barrio"],
                    "calle": row["calle"],
                    "altura": int(row["altura"]) if not pd.isna(row["altura"]) else None,
                    "canal": row["canal"],
                    "estado_general": row["estado_general"]
                }
            )
    print(f"{len(fct_bac_df)} registros insertados en fct_bac")

In [158]:
# CARGA DE DIMENSIONALES
def load_dim_categoria_prestacion(df, conector):
    insert_query = text("""
    INSERT INTO dim_bac_categoria_prestacion (categoria, prestacion)
    SELECT :categoria, :prestacion
    WHERE NOT EXISTS (
        SELECT 1 FROM dim_bac_categoria_prestacion 
        WHERE categoria = :categoria AND prestacion = :prestacion
    )
    """)
    
    with conector.db_session() as s:
        for _, row in df.iterrows():
            s.execute(insert_query, {
                "categoria": row["categoria"],
                "prestacion": row["prestacion"]
            })
    print(f"{len(df)} intentos de insert en dim_bac_categoria_prestacion")

def load_dim_comunas_barrios(df, conector):
    insert_query = text("""
    INSERT INTO dim_bac_comunas_barrios (comuna, barrio)
    SELECT :comuna, :barrio
    WHERE NOT EXISTS (
        SELECT 1 FROM dim_bac_comunas_barrios 
        WHERE comuna = :comuna AND barrio = :barrio
    )
    """)
    
    with conector.db_session() as s:
        for _, row in df.iterrows():
            s.execute(insert_query, {
                "comuna": row["comuna"],
                "barrio": row["barrio"]
            })
    print(f"{len(df)} intentos de insert en dim_bac_comunas_barrios")

def load_dim_canales(df, conector):
    insert_query = text("""
    INSERT INTO dim_bac_canales (canal)
    SELECT :canal
    WHERE NOT EXISTS (
        SELECT 1 FROM dim_bac_canales WHERE canal = :canal
    )
    """)
    
    with conector.db_session() as s:
        for _, row in df.iterrows():
            s.execute(insert_query, {"canal": row["canal"]})
    print(f"{len(df)} intentos de insert en dim_bac_canales")

def load_dim_estados(df, conector):
    insert_query = text("""
    INSERT INTO dim_bac_estados (estado_general)
    SELECT :estado_general
    WHERE NOT EXISTS (
        SELECT 1 FROM dim_bac_estados WHERE estado_general = :estado_general
    )
    """)
    
    with conector.db_session() as s:
        for _, row in df.iterrows():
            s.execute(insert_query, {"estado_general": row["estado_general"]})
    print(f"{len(df)} intentos de insert en dim_estados")

In [None]:
# ORQUESTADOR DE PROCESO, PRIMERO CARGA LAS DIMENSIONES Y LUEGO LA TABLA DE HECHOS
def load_data_bac(
    fct_bac_df,
    dim_categoria_prestacion_df,
    dim_comunas_barrios_df,
    dim_canales_df,
    dim_estados_df,
    conector
):
    """
    Función completa de carga de datos BAC.
    Inserta nuevas filas en dimensiones y fact table,
    mostrando prints de cantidad de registros insertados.
    """
    # Dimensiones

    print("Cargando dim_categoria_prestacion...")
    load_dim_categoria_prestacion(dim_categoria_prestacion_df, conector)
    print("Cargando dim_comunas_barrios...")
    load_dim_comunas_barrios(dim_comunas_barrios_df, conector)
    print("Cargando dim_canales...")
    load_dim_canales(dim_canales_df, conector)
    print("Cargando dim_estados...")
    load_dim_estados(dim_estados_df, conector)

    # Fact table
    
    print("Cargando fct_bac...")
    # Obtener el número de registros antes de insertar
    with conector.db_session() as s:
        result = s.execute(text("SELECT COUNT(*) FROM fct_bac")).fetchone()
        filas_antes = result[0] if result else 0

    # Ejecutar la carga de la fact table
    load_fct_bac(fct_bac_df, conector)

    # Número de registros después
    with conector.db_session() as s:
        result = s.execute(text("SELECT COUNT(*) FROM fct_bac")).fetchone()
        filas_despues = result[0] if result else 0

    print(f"Filas insertadas en fct_bac: {filas_despues - filas_antes}")

#load_data_bac(fct_bac, dim_categoria_prestacion, dim_comunas_barrios, dim_canales, dim_estados, con)


Cargando dim_categoria_prestacion...
Sesión a Neon establecida...
Cerrando sesión de Neon...
72 intentos de insert en dim_bac_categoria_prestacion
Cargando dim_comunas_barrios...
Sesión a Neon establecida...
Cerrando sesión de Neon...
47 intentos de insert en dim_bac_comunas_barrios
Cargando dim_canales...
Sesión a Neon establecida...
Cerrando sesión de Neon...
4 intentos de insert en dim_bac_canales
Cargando dim_estados...
Sesión a Neon establecida...
Cerrando sesión de Neon...
2 intentos de insert en dim_estados
Cargando fct_bac...
Sesión a Neon establecida...
Cerrando sesión de Neon...
Sesión a Neon establecida...
Cerrando sesión de Neon...
Sesión a Neon establecida...
Cerrando sesión de Neon...
881 registros insertados en fct_bac
Sesión a Neon establecida...
Cerrando sesión de Neon...
Filas insertadas en fct_bac: 881


**MODELADO ESTRELLA DECKS**

- **Para ello vamos a crear una tabla de hechos y la dimension de categoria/prestacion, comunas/barrios, canal y estados**

In [167]:
# DIVIDIMOS DATASET EN 1 Y 2 PARA SIMULAR UNA PRIMERA CARGA Y PRIMERA ACTUALIZACIÓN POR LOTES
# ANTERIOR A 2022 Y POSTERIOR O IGUAL A 2022
df_filtered_decks1 = df_filtered_decks[df_filtered_decks['año'] < 2022]
df_filtered_decks2 = df_filtered_decks[df_filtered_decks['año'] >= 2022]

In [168]:
def modeling_decks_data(df_filtered_decks):
    # TABLA DE HECHOS
    fct_decks = df_filtered_decks[['fecha_inicio', 'fecha_vencimiento', 'solicitante', 'documento',
       'calle', 'altura', 'barrio', 'comuna', 'estado', 'expediente',
       'nro_disposicion_resolucion']]

    # DIM SOLICITANTES ÚNICOS
    dim_solicitantes = fct_decks[['solicitante', 'documento']].drop_duplicates(subset='documento')

    # DIM COMUNAS BARRIOS
    dim_comunas_barrios = fct_decks[['comuna', 'barrio']].drop_duplicates(subset='barrio')

    return fct_decks, dim_solicitantes, dim_comunas_barrios

fct_decks, dim_solicitantes, dim_comunas_barrios = modeling_decks_data(df_filtered_decks1)

In [None]:
def create_schema_decks():
    """
    Crea las tablas necesarias para almacenar los datos de Decks.
    Solo se ejecuta si las tablas no existen.
    """
    con = ConnectorNeon()

    create_tables_script = text("""

    -- Tabla de hechos
    CREATE TABLE IF NOT EXISTS fct_decks (
        id SERIAL PRIMARY KEY,
        fecha_inicio TIMESTAMP,
        fecha_vencimiento TIMESTAMP,
        solicitante TEXT,
        documento TEXT,
        calle TEXT,
        altura INTEGER,
        barrio TEXT,
        comuna INTEGER,
        estado TEXT,
        expediente TEXT,
        nro_disposicion_resolucion TEXT
    );

    -- Dimensión: solicitantes
    CREATE TABLE IF NOT EXISTS dim_decks_solicitantes (
        id SERIAL PRIMARY KEY,
        solicitante TEXT,
        documento TEXT
    );

    -- Dimensión: comunas y barrios
    CREATE TABLE IF NOT EXISTS dim_decks_comunas_barrios (
        id SERIAL PRIMARY KEY,
        comuna INTEGER,
        barrio TEXT
    );

    """)

    with con.db_session() as s:
        s.execute(create_tables_script)

    return "Todas las tablas de Decks fueron creadas correctamente."

#created_schema = create_schema_decks()

Sesión a Neon establecida...
Cerrando sesión de Neon...


In [170]:
# CARGA DE FCT
def get_max_fecha_fct_decks(conector):
    """
    Retorna la última fecha_inicio cargada en fct_decks.
    """
    query = text("SELECT MAX(fecha_inicio) AS max_fecha FROM fct_decks")
    with conector.db_session() as s:
        result = s.execute(query).fetchone()
        return result[0]  # Puede ser None si la tabla está vacía

def load_fct_decks(fct_decks_df, conector):
    """
    Inserta datos nuevos en la fact table fct_decks.
    Solo sube registros con fecha_inicio mayor a la última cargada.
    """
    # Obtener la última fecha_inicio de la tabla
    fecha_minima = get_max_fecha_fct_decks(conector)
    
    if fecha_minima is not None:
        fct_decks_df = fct_decks_df[fct_decks_df['fecha_inicio'] > fecha_minima]
    
    if fct_decks_df.empty:
        print("No hay registros nuevos para insertar en fct_decks.")
        return
    
    insert_query = text("""
    INSERT INTO fct_decks (
        fecha_inicio, fecha_vencimiento, solicitante, documento,
        calle, altura, barrio, comuna, estado, expediente, nro_disposicion_resolucion
    )
    VALUES (
        :fecha_inicio, :fecha_vencimiento, :solicitante, :documento,
        :calle, :altura, :barrio, :comuna, :estado, :expediente, :nro_disposicion_resolucion
    )
    """)
    
    with conector.db_session() as s:
        for _, row in fct_decks_df.iterrows():
            s.execute(
                insert_query,
                {
                    "fecha_inicio": row["fecha_inicio"],
                    "fecha_vencimiento": row["fecha_vencimiento"],
                    "solicitante": row["solicitante"],
                    "documento": row["documento"],
                    "calle": row["calle"],
                    "altura": int(row["altura"]) if not pd.isna(row["altura"]) else None,
                    "barrio": row["barrio"],
                    "comuna": int(row["comuna"]) if not pd.isna(row["comuna"]) else None,
                    "estado": row["estado"],
                    "expediente": row["expediente"],
                    "nro_disposicion_resolucion": row["nro_disposicion_resolucion"]
                }
            )
    print(f"{len(fct_decks_df)} registros insertados en fct_decks")

In [172]:
# CARGA DE DIMENSIONALES
def load_dim_solicitantes(df, conector):
    """
    Inserta datos nuevos en dim_solicitantes si no existen ya.
    """
    insert_query = text("""
    INSERT INTO dim_decks_solicitantes (solicitante, documento)
    SELECT :solicitante, :documento
    WHERE NOT EXISTS (
        SELECT 1 FROM dim_decks_solicitantes 
        WHERE solicitante = :solicitante AND documento = :documento
    )
    """)
    
    with conector.db_session() as s:
        for _, row in df.iterrows():
            s.execute(insert_query, {
                "solicitante": row["solicitante"],
                "documento": row["documento"]
            })
    print(f"{len(df)} intentos de insert en dim_solicitantes")


def load_dim_comunas_barrios_decks(df, conector):
    """
    Inserta datos nuevos en dim_comunas_barrios para decks si no existen ya.
    """
    insert_query = text("""
    INSERT INTO dim_decks_comunas_barrios (comuna, barrio)
    SELECT :comuna, :barrio
    WHERE NOT EXISTS (
        SELECT 1 FROM dim_decks_comunas_barrios 
        WHERE comuna = :comuna AND barrio = :barrio
    )
    """)
    
    with conector.db_session() as s:
        for _, row in df.iterrows():
            s.execute(insert_query, {
                "comuna": int(row["comuna"]) if not pd.isna(row["comuna"]) else None,
                "barrio": row["barrio"]
            })
    print(f"{len(df)} intentos de insert en dim_comunas_barrios (Decks)")

In [None]:
# ORQUESTADOR DE PROCESO PARA DECKS
def load_data_decks(
    fct_decks_df,
    dim_solicitantes_df,
    dim_comunas_barrios_df,
    conector
):
    """
    Función completa de carga de datos Decks.
    Inserta nuevas filas en dimensiones y fact table,
    mostrando prints de cantidad de registros insertados.
    """
    # Dimensiones
    print("Cargando dim_solicitantes...")
    load_dim_solicitantes(dim_solicitantes_df, conector)
    
    print("Cargando dim_comunas_barrios (Decks)...")
    load_dim_comunas_barrios_decks(dim_comunas_barrios_df, conector)
    
    # Fact table
    print("Cargando fct_decks...")
    # Número de registros antes
    with conector.db_session() as s:
        result = s.execute(text("SELECT COUNT(*) FROM fct_decks")).fetchone()
        filas_antes = result[0] if result else 0
    
    # Ejecutar la carga de la fact table
    load_fct_decks(fct_decks_df, conector)
    
    # Número de registros después
    with conector.db_session() as s:
        result = s.execute(text("SELECT COUNT(*) FROM fct_decks")).fetchone()
        filas_despues = result[0] if result else 0
    
    print(f"Filas insertadas en fct_decks: {filas_despues - filas_antes}")

#load_data_decks(fct_decks, dim_solicitantes, dim_comunas_barrios, con)

Cargando dim_solicitantes...
Sesión a Neon establecida...
Cerrando sesión de Neon...
114 intentos de insert en dim_solicitantes
Cargando dim_comunas_barrios (Decks)...
Sesión a Neon establecida...
Cerrando sesión de Neon...
22 intentos de insert en dim_comunas_barrios (Decks)
Cargando fct_decks...
Sesión a Neon establecida...
Cerrando sesión de Neon...
Sesión a Neon establecida...
Cerrando sesión de Neon...
Sesión a Neon establecida...
Cerrando sesión de Neon...
116 registros insertados en fct_decks
Sesión a Neon establecida...
Cerrando sesión de Neon...
Filas insertadas en fct_decks: 116
