<a href="https://colab.research.google.com/github/SELF-msselve/UTN-DataEngineering/blob/main/CEL_Data_Eng_Almacenamiento_data_warehouse.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Preparación de entorno
En primer lugar, vamos a instalar librerías y definir funciones para este proceso

In [None]:
!pip install sqlalchemy
!pip install psycopg2-binary

Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.9-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.0/3.0 MB[0m [31m13.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.9.9


In [None]:
from configparser import ConfigParser

from sqlalchemy import create_engine, text

def connect_to_db(config_file, section, driverdb):
    """
    Crea una conexión a la base de datos especificada en el archivo de configuración.

    Parámetros:
    config_file (str): La ruta del archivo de configuración.
    section (str): La sección del archivo de configuración que contiene los datos de la base de datos.
    driverdb (str): El driver de la base de datos a la que se conectará.

    Retorna:
    Un objeto de conexión a la base de datos.
    """
    try:
        # Lectura del archivo de configuración
        parser = ConfigParser()
        parser.read(config_file)

        # Creación de un diccionario
        # donde cargaremos los parámetros de la base de datos
        db = {}
        if parser.has_section(section):
            params = parser.items(section)
            db = {param[0]: param[1] for param in params}

            # Creación de la conexión a la base de datos
            engine = create_engine(
                f"{driverdb}://{db['user']}:{db['pwd']}@{db['host']}:{db['port']}/{db['dbname']}"
            )
            return engine

        else:
            print(
                f"Sección {section} no encontrada en el archivo de configuración.")
            return None
    except Exception as e:
        print(f"Error al conectarse a la base de datos: {e}")
        return None

Antes de continuar, es importante que:
1. Crees una instancia de Postgres en Aiven
2. y subas o crees en esta sesión de Google un **archivo de configuración** con los datos de conexión y credenciales a Postgres.

Te dejo una "plantilla", para que crees el archivo e ingreses tus datos de conexión.

```
[postgres]
host=****.aivencloud.com
port=15191
user=avnadmin
pwd=****
dbname=defaultdb
```

# **Almacenamiento de datos**
## Data warehouse

En ingeniería de datos, el almacenamiento juega un rol fundamental ya que los resultados de las extracciones y las transformaciones de los datos se deben almacenar en un lugar accesible para su posterior explotación.
El sistema donde vamos a almacenar los datos debe ser capaz de soportar grandes volúmenes de información, ser escalable, seguro y eficiente. Los dos principales *sistemas de almacenamiento* que utilizaremos como Data Engineers son el *Data Lake* y el *Data Warehouse*.

El data warehouse se caracteriza por contener datos **estructurados**, es decir, datos que **se organizan en tablas** y que se pueden consultar mediante lenguaje SQL. Por ende, uno de los componentes principales de un data warehouse es un motor de base de datos, orientado a la analítica de datos.

Dado que el data warehouse contiene datos estructurados, organizados en tablas, debe seguir un modelo de datos que defina la estructura de las tablas y las relaciones entre ellas. El modelo utilizado en la mayoría de los data warehouses es el modelo ***dimensional***.

El modelo dimensional se utiliza para facilitar la consulta y el análisis de los datos. En este modelo, los datos se organizan en dos tipos de tablas:
- tablas de hechos (fact tables): contienen datos numéricos que se pueden sumarizar (cuantitativos). Por ejemplo, el monto de una venta, la cantidad de productos vendidos, etc.
- tablas de dimensiones (dimension tables): contienen datos descriptivos (cualitativos) que se utilizan para filtrar y agrupar los datos de las tablas de hechos. Por ejemplo, datos de clientes, productos, sucursal, fechas, etc.

El modelo dimensional se caracteriza además por ser denormalizado, es decir que las tablas de dimensiones contienen redundancia. Esto se hace para evitar la necesidad de hacer demasiados *joins* entre las tablas de hechos y las tablas de dimensiones, lo que agiliza las consultas. La redundancia se sacrifica en pos de la eficiencia en la consulta.

Probablemente estés familiarizado con el *modelo relacional* y términos como *normalización* y *formas normales*. En los siguientes enlaces se ilustran las diferencias entre el modelo relacional y el modelo dimensional, en el contexto de un e-commerce.
- [Modelo relacional](https://miro.medium.com/v2/resize:fit:1100/format:webp/1*JQLas0Gca3VcP4Y9b-CRvQ.jpeg)
- [Modelo dimensional](https://miro.medium.com/v2/resize:fit:1100/format:webp/1*NXuoqqnI6mLsi_sqa8dBow.jpeg)

Aparte de tablas de hechos y dimensiones, en un data warehouse contaremos con un área de *staging* donde se almacenarán los datos crudos de forma temporal. Será una capa intermedia, antes de consolidar los datos en el modelo dimensional.
[Áreas de un data warehouse](https://media.geeksforgeeks.org/wp-content/uploads/ETL.jpg)

En esta notebook, vamos a ver como iniciar la implementación de un data warehouse. Para ello, vamos a utilizar el motor de base de datos *PostgreSQL* y vamos a crear un modelo dimensional para almacenar los datos de un sistema de ventas (clientes y pagos).

Utilizaremos la librería `sqlalchemy` para interactuar con la base de datos desde Python.

*Como vamos a interactuar con base de datos Postgres, es necesario tambien el driver de Postgres para Python: `pip install psycopg2-binary` (ya lo instalamos mas arriba)*

In [None]:
engine = connect_to_db(
    "pipeline.conf",
    "postgres",
    "postgresql+psycopg2"
    )

conn = engine.connect()

### Tablas de dimensiones
#### Slowly Changing Dimensions (SCD)

Con respecto a las tablas de dimensiones, es importante tener en cuenta que los datos de las dimensiones pueden cambiar con el tiempo. Por ejemplo, un cliente puede cambiar su dirección, un producto puede cambiar de categoría, etc. Por lo tanto, es necesario tener en cuenta cómo vamos a manejar estos cambios en las tablas de dimensiones.

Existen diferentes estrategias para manejar los cambios en las dimensiones y se implementan a través de lo que se conoce como *Slowly Changing Dimensions* (SCD). Existen diferentes tipos de SCD, las mas importantes son:
- Tipo 0: No se capturan los cambios. Los datos se mantienen tal como se cargaron inicialmente.
- Tipo 1: Se sobrescriben los datos. Los cambios se sobrescriben en la tabla de dimensiones.
- Tipo 2: Se agregan nuevas filas. Se mantiene un historial de los cambios. Se suman columnas adicionales para identificar la vigencia de cada fila.

##### Comencemos trabajando con las de tipo 0 y 1

Este data warehouse tendrá un área de staging, que contendrá una tabla `payments` y otra `customers`.
En el modelo dimensional, tendremos:
- Tabla de hechos: `payments_fact`
- Tablas de dimensiones: `customer_dim` y `date_dim`

Vamos a inicializar la base de datos y a crear las tablas de staging y las tablas de dimensiones.

In [None]:
create_query = text(
        """
        BEGIN;

        CREATE SCHEMA IF NOT EXISTS stage;
        CREATE TABLE IF NOT EXISTS stage.customers(
            customer_id INT PRIMARY KEY,
            customer_fullname VARCHAR(50),
            customer_email VARCHAR(50),
            customer_phone VARCHAR(50),
            customer_address VARCHAR(50)
        );
        CREATE TABLE IF NOT EXISTS stage.payments(
            payment_id INT PRIMARY KEY,
            customer_id INT,
            amount FLOAT,
            payment_date DATE
        );

        CREATE SCHEMA IF NOT EXISTS datawarehouse;
        CREATE TABLE IF NOT EXISTS datawarehouse.customer_dim(
            customer_id INT PRIMARY KEY,
            customer_fullname VARCHAR(50),
            customer_email VARCHAR(50),
            customer_phone VARCHAR(50),
            customer_address VARCHAR(50)
        );
        CREATE TABLE IF NOT EXISTS datawarehouse.date_dim(
            date_id INT PRIMARY KEY,
            day INT,
            month INT,
            year INT,
            quarter INT,
            day_of_week INT,
            day_of_month INT
        );
        CREATE TABLE IF NOT EXISTS datawarehouse.payment_fact(
            payment_id INT PRIMARY KEY,
            customer_id INT,
            amount FLOAT,
            payment_date_id INT
        );

        COMMIT;
        """
    )

with engine.connect() as conn:
    conn.execute(create_query)

Vamos a cargar datos sobre la tabla calendario `date_dim`. Tendrá todas las fechas del año 2023. Se puede actualizar anualmente.

In [None]:
insert_query = text(
        """
        BEGIN;
        INSERT INTO datawarehouse.date_dim
        SELECT
            TO_CHAR(payment_date, 'yyyymmdd')::INT AS date_id,
            EXTRACT(DAY FROM payment_date) AS day,
            EXTRACT(MONTH FROM payment_date) AS month,
            EXTRACT(YEAR FROM payment_date) AS year,
            EXTRACT(QUARTER FROM payment_date) AS quarter,
            EXTRACT(DOW FROM payment_date) AS day_of_week,
            EXTRACT(DAY FROM payment_date) AS day_of_month
        FROM generate_series('2023-01-01'::date, '2023-12-31'::date, '1 day'::interval) AS payment_date;
        COMMIT;
        """
        )

with engine.connect() as conn:
    conn.execute(insert_query)

Vamos a cargar datos sobre la tabla `customer_dim`, a partir de la tabla `customers` de staging. La estrategia de SCD que vamos a utilizar es la de tipo 0, es decir que no vamos a capturar los cambios.

In [None]:
# Carga de una dimension SCD tipo 0
insert_query = text(
        """
        BEGIN;

        TRUNCATE TABLE stage.customers;
        INSERT INTO stage.customers VALUES
          (1, 'Emilio Ravenna', 'ravenna@simulacro.com', '111111', 'Libertador 1234'),
          (2, 'Mario Santos', 'santos@simulacro.com', '222222', 'Corrientes 5432'),
          (3, 'Gabriel Medina', 'medina@simulacro.com', '999999', 'Santa Fe 9876'),
          (4, 'Molero', 'molero@simulacro.com', '99999', 'Bv de los sueños rotos 999');

        INSERT INTO datawarehouse.customer_dim
        SELECT
            stg.customer_id,
            stg.customer_fullname,
            stg.customer_email,
            stg.customer_phone,
            stg.customer_address
        FROM stage.customers AS stg
        -- Obtener los registros que no existen en la tabla de dimension
        LEFT JOIN datawarehouse.customer_dim AS dim
        ON stg.customer_id = dim.customer_id
        WHERE dim.customer_id IS NULL;
        COMMIT;
        """
        )

with engine.connect() as conn:
    conn.execute(insert_query)

Vamos a repetir la carga de datos en la tabla `customer_dim` pero utilizando la estrategia de SCD de tipo 1, es decir que vamos a sobrescribir los datos.

In [None]:
# SCD tipo 1
insert_query = text(
    """
    BEGIN;

    TRUNCATE TABLE stage.customers;
    INSERT INTO stage.customers VALUES
          (1, 'Emilio Ravenna', 'ravenna@simulacro.com', '111111', 'Libertador 1234'),
          (2, 'Mario Santos', 'santos@simulacro.com', '222222', 'Corrientes 5432'),
          (3, 'Gabriel Medina', 'medina@simulacro.com', '99999999', 'Santa Fe 9876'),
          (4, 'Molero', 'molero@simulacro.com', 99999, 'Bv de los sueños rotos 345'),
          (5, 'Milazzo', 'milazzo@simulacro.com', 3833838, 'Calle Valentia y Fuerza 17')
          ;

    MERGE INTO datawarehouse.customer_dim AS dim
    USING stage.customers AS stg
    ON dim.customer_id = stg.customer_id
    WHEN MATCHED THEN
        UPDATE SET
            customer_email = stg.customer_email,
            customer_phone = stg.customer_phone,
            customer_address = stg.customer_address
    WHEN NOT MATCHED THEN
        INSERT (customer_id, customer_fullname, customer_email, customer_phone, customer_address)
        VALUES (stg.customer_id, stg.customer_fullname, stg.customer_email, stg.customer_phone, stg.customer_address);
    COMMIT;
    """
    )

with engine.connect() as conn:
    conn.execute(insert_query)

Por último, vamos a cargar datos sobre la tabla `payments_fact` a partir de la tabla `payments` de staging.

In [None]:
insert_query = text(
    """
        BEGIN;

        TRUNCATE TABLE stage.payments;
        INSERT INTO stage.payments VALUES
            (1, 1, 100, '2023-01-01'),
            (2, 2, 200, '2023-01-02'),
            (3, 3, 300, '2023-01-03'),
            (4, 4, 400, '2023-01-04'),
            (5, 5, 500, '2023-01-05'),
            (6, 5, 7000, '2023-01-05'),
            (7, 5, 70000, '2023-01-06'),
            (8, 5, 70000, '2023-01-06')
            ;

        INSERT INTO datawarehouse.payment_fact
        SELECT
            stg.payment_id,
            stg.customer_id,
            stg.amount,
            TO_CHAR(payment_date, 'yyyymmdd')::INT AS payment_date_id
        FROM stage.payments AS stg
        -- Obtener los registros que no existen en la tabla de hechos
        LEFT JOIN datawarehouse.payment_fact AS fact
        ON stg.payment_id = fact.payment_id
        WHERE fact.payment_id IS NULL;
        COMMIT;
    """
    )

with engine.connect() as conn:
    conn.execute(insert_query)

#### Implementación de SCD Tipo 2
Para poder implementar la SCD de tipo 2, es necesario re-estructurar el esquema de la base de datos. Por ello, vamos a crear nuevas tablas de hechos y dimensiones. Concretamente, vamos a crear nuevas versiones para `payment_fact` y `customer_dim`. El área de staging se mantienen igual, al igual que `date_dim`.

In [None]:
create_query = text(
    """
    BEGIN;
    CREATE SCHEMA IF NOT EXISTS datawarehouse;
    -- Crear dim SCD tipo 2
    CREATE TABLE IF NOT EXISTS datawarehouse.customer_dim_v2(
        customer_surrogate_key SERIAL PRIMARY KEY,
        customer_id INT,
        customer_fullname VARCHAR(50),
        customer_email VARCHAR(50),
        customer_phone VARCHAR(50),
        customer_address VARCHAR(50),
        start_date DATE,
        end_date DATE,
        is_current BOOLEAN
    );
    CREATE TABLE IF NOT EXISTS datawarehouse.payment_fact_v2(
        payment_id INT PRIMARY KEY,
        customer_surrogate_key INT,
        amount FLOAT,
        payment_date_id INT
    );
    COMMIT
    """
    )

with engine.connect() as conn:
    conn.execute(create_query)

Vamos a cargar datos sobre la tabla `customer_dim_v2`, a partir de la tabla `customers` de staging. La estrategia de SCD que vamos a utilizar es la de tipo 2, es decir que vamos a agregar nuevas filas.

In [None]:
insert_query = text(
    """
    BEGIN;

    TRUNCATE TABLE stage.customers;
    INSERT INTO stage.customers VALUES
          (1, 'Emilio Ravenna', 'tortugamaritima@simulacro.com', '999999', 'Libertador 1234'),
          (2, 'Mario Santos', 'santos@simulacro.com', '0000000', 'Corrientes 5432'),
          (3, 'Gabriel Medina', 'medina@simulacro.com', '90000000', 'Jugueteria Simon 39'),
          (4, 'Molero', 'molero@simulacro.com', 93939339, 'Bv de los sueños rotos 93')
        ;

    MERGE INTO datawarehouse.customer_dim_v2 AS dim
    USING stage.customers AS stg
    ON dim.customer_id = stg.customer_id
    -- Marcar el ultimo registro del cliente como finalizado
    WHEN MATCHED
    AND (dim.customer_email <> stg.customer_email
    OR dim.customer_phone <> stg.customer_phone
    OR dim.customer_address <> stg.customer_address)
    AND dim.is_current = TRUE THEN
        UPDATE SET
            end_date = CURRENT_DATE,
            is_current = FALSE
    -- Ingresar registro de un nuevo cliente
    WHEN NOT MATCHED THEN
        INSERT (customer_id, customer_fullname, customer_email, customer_phone, customer_address, start_date, end_date, is_current)
        VALUES (stg.customer_id, stg.customer_fullname, stg.customer_email, stg.customer_phone, stg.customer_address, CURRENT_DATE, NULL, TRUE);

    -- Ingresar actualización de un cliente ya existente
    INSERT INTO datawarehouse.customer_dim_v2(customer_id, customer_fullname, customer_email, customer_phone, customer_address, start_date, end_date, is_current)
    SELECT
        stg.customer_id,
        stg.customer_fullname,
        stg.customer_email,
        stg.customer_phone,
        stg.customer_address,
        CURRENT_DATE, -- start_date
        NULL, -- end_date
        TRUE -- is_current
    FROM stage.customers AS stg
    LEFT JOIN datawarehouse.customer_dim_v2 AS dim
    ON stg.customer_id = dim.customer_id
    WHERE dim.is_current = FALSE
    AND (stg.customer_email <> dim.customer_email
    OR stg.customer_phone <> dim.customer_phone
    OR stg.customer_address <> dim.customer_address);
    COMMIT;
    """
    )

with engine.connect() as conn:
    conn.execute(insert_query)

In [None]:
# Cargar datos sobre la tabla fact
insert_query = text(
    """
    BEGIN;

    TRUNCATE TABLE stage.payments;
    INSERT INTO stage.payments VALUES
        (1, 1, 100, '2023-01-01'),
        (2, 1, 200, '2023-01-02'),
        (3, 1, 300, '2023-01-03'),
        (4, 2, 400, '2023-01-04'),
        (5, 2, 500, '2023-01-05'),
        (6, 3, 600, '2023-01-06'),
        (7, 3, 700, '2023-01-07'),
        (8, 3, 800, '2023-01-08'),
        (9, 1, 1000, '2023-01-09'),
        (10, 2, 10000, '2023-01-09')
        ;

    INSERT INTO datawarehouse.payment_fact_v2
    SELECT
        stg.payment_id,
        dim.customer_surrogate_key, -- En vez de ingresar el customer_id, se ingresa customer_surrogate_key
        stg.amount,
        TO_CHAR(payment_date, 'yyyymmdd')::INT AS payment_date_id
    FROM stage.payments AS stg
    -- Obtener pagos nuevos, que no existen en la tabla fact
    LEFT JOIN datawarehouse.payment_fact_v2 AS fact
    ON fact.payment_id = stg.payment_id
    -- Recuperar la clave surrogada customer_surrogate_key actual
    JOIN datawarehouse.customer_dim_v2 AS dim
    ON stg.customer_id = dim.customer_id
    WHERE dim.is_current = TRUE -- Recuperar clave surrogada actual
    AND fact.payment_id IS NULL; -- Obtener solo pagos nuevos
    COMMIT;
    """
    )

with engine.connect() as conn:
    conn.execute(insert_query)