# Carga y Almacenamiento de Datos RAW en PostgreSQL 

In [1]:
import pandas as pd
from sqlalchemy import create_engine
import psycopg2
from tqdm import tqdm
import json
import ast
from dotenv import load_dotenv
import os

# Creación de la Base de datos

### Creación de la Base de Datos en PostgreSQL

En este bloque de código se establece una conexión con el servidor de PostgreSQL utilizando las credenciales almacenadas en un archivo `.env`. El objetivo principal es verificar si la base de datos necesaria para el proyecto (`DB_NAME`) ya existe. En caso contrario, el script la crea automáticamente.

#### Pasos principales:

- Se carga la configuración desde variables de entorno para evitar exponer credenciales sensibles en el código.
- Se conecta a la base de datos por defecto `postgres`, necesaria para poder crear nuevas bases de datos.
- Se consulta si la base de datos deseada (`DB_NAME`) ya existe en el sistema.
- Si no existe, se crea mediante un comando `CREATE DATABASE`.
- Finalmente, se cierra la conexión para liberar recursos.

Este paso es fundamental como parte del flujo ETL, ya que garantiza la existencia del entorno de almacenamiento antes de cargar los datos `raw` del ecommerce dataset. Facilita la automatización y evita errores por falta de preparación del entorno.


In [2]:
# Load .env file
load_dotenv(override=True)

# Configuración de conexión
DB_HOST = os.getenv("DB_HOST")
DB_NAME = os.getenv("DB_NAME")
DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")
DB_PORT = os.getenv("DB_PORT", "5432")  # Default port


In [3]:

# Crear base de datos si no existe
try:
    con = psycopg2.connect(dbname="postgres", user=DB_USER, password=DB_PASSWORD, host=DB_HOST, port=DB_PORT)
    con.autocommit = True
    cur = con.cursor()

    # Verificar si la base de datos ya existe
    cur.execute(f"SELECT 1 FROM pg_database WHERE datname = '{DB_NAME}';")
    exists = cur.fetchone()

    if not exists:
        cur.execute(f"CREATE DATABASE {DB_NAME};")
        print(f"✅ Base de datos '{DB_NAME}' creada exitosamente.")
    else:
        print(f"La base de datos '{DB_NAME}' ya existe.")

except psycopg2.Error as e:
    print("❌ Error al conectar o crear la base de datos:", e)

finally:
    if 'cur' in locals(): cur.close()
    if 'con' in locals(): con.close()
    print("Conexión cerrada.")

✅ Base de datos 'tfm_database' creada exitosamente.
Conexión cerrada.


### Función para Crear Tablas en PostgreSQL

Esta función `create_table` permite crear una tabla en una base de datos PostgreSQL a partir de una consulta SQL de creación (`CREATE TABLE`) proporcionada como argumento. Se utiliza cuando se desea definir de forma explícita la estructura de la tabla antes de cargar datos.

#### Parámetros:
- `bd`: Nombre de la base de datos donde se creará la tabla.
- `usuario`: Usuario de PostgreSQL con permisos para ejecutar operaciones DDL.
- `password`: Contraseña del usuario.
- `host`: Dirección del servidor donde se encuentra la base de datos.
- `puerto`: Puerto de conexión (por defecto, suele ser 5432).
- `nombre_tabla`: Nombre de la tabla que se va a crear (utilizado para imprimir mensajes de estado).
- `consulta`: Cadena SQL con la sentencia `CREATE TABLE`.

#### Flujo:
1. Se establece la conexión con la base de datos.
2. Se ejecuta la sentencia SQL proporcionada.
3. Se confirma la operación y se cierra la conexión.

In [4]:
def create_table(bd, usuario, password, host, puerto, nombre_tabla, consulta):
    con = psycopg2.connect(dbname=bd, user=usuario, password=password, host=host, port=puerto)
    cur = con.cursor()
    
    try:
        cur.execute(consulta)
        con.commit()
        print(f"✅ Tabla '{nombre_tabla}' creada exitosamente en PostgreSQL (o ya existía).")
    
    except Exception as e:
        print(f"❌ Error al crear la tabla '{nombre_tabla}': {e}")
    
    finally:
        cur.close()
        con.close()


# Creación/Importación de la tabla **"Products"**

### Creación e Importación de la Tabla `Products` 

En esta sección se realiza la carga de un archivo CSV que contiene el catálogo de productos (`product.csv`) y su posterior almacenamiento en una base de datos PostgreSQL.

#### 1. Carga del CSV con `pandas`
Se utiliza `pandas.read_csv()` para cargar los datos desde el archivo `product.csv`, restringiendo la lectura a las primeras 10 columnas con el parámetro `usecols=range(10)` para asegurar compatibilidad con la estructura de la tabla destino.

#### 2. Creación de la tabla `Products`
Mediante una sentencia SQL `CREATE TABLE IF NOT EXISTS`, se define la estructura de la tabla `Products` en PostgreSQL. Esta tabla contiene información categórica y descriptiva sobre cada producto, como:
- Género,
- Categorías principales y secundarias,
- Tipo de artículo,
- Color base,
- Temporada y año,
- Uso previsto,
- Nombre descriptivo del producto.

#### 3. Llamada a la función `create_table`
Se utiliza la función previamente definida para ejecutar la consulta de creación, asegurando que no se sobrescriba si la tabla ya existe.

> Este paso forma parte de la fase **Extract y Load** del proceso ETL, y sienta las bases para futuras operaciones de análisis y modelado sobre los productos.

- ## Carga del CSV

In [5]:
# Cargar el CSV con Pandas
df = pd.read_csv("../data/product.csv",on_bad_lines="warn",usecols=range(10))
df.head()

Unnamed: 0,id,gender,masterCategory,subCategory,articleType,baseColour,season,year,usage,productDisplayName
0,15970,Men,Apparel,Topwear,Shirts,Navy Blue,Fall,2011.0,Casual,Turtle Check Men Navy Blue Shirt
1,39386,Men,Apparel,Bottomwear,Jeans,Blue,Summer,2012.0,Casual,Peter England Men Party Blue Jeans
2,59263,Women,Accessories,Watches,Watches,Silver,Winter,2016.0,Casual,Titan Women Silver Watch
3,21379,Men,Apparel,Bottomwear,Track Pants,Black,Fall,2011.0,Casual,Manchester United Men Solid Black Track Pants
4,53759,Men,Apparel,Topwear,Tshirts,Grey,Summer,2012.0,Casual,Puma Men Grey T-shirt


- ## Creación de la tabla

In [6]:
# Consulta SQL para crear la tabla
create_product_table_sql = """
CREATE TABLE IF NOT EXISTS Products (
    id SERIAL PRIMARY KEY,
    gender VARCHAR(10),
    masterCategory VARCHAR(100),
    subCategory VARCHAR(100),
    articleType VARCHAR(100),
    baseColour VARCHAR(50),
    season VARCHAR(50),
    year INT,
    usage VARCHAR(100),
    productDisplayName TEXT
);
"""
table_name="Products"

create_table(DB_NAME, DB_USER, DB_PASSWORD, DB_HOST, DB_PORT, table_name, create_product_table_sql)

✅ Tabla 'Products' creada exitosamente en PostgreSQL (o ya existía).


- ## Carga de datos

### Verificación de Carga Previa antes de Insertar Datos

Para evitar cargas duplicadas y mejorar la eficiencia del proceso ETL, se implementa una verificación previa que consulta el número de registros existentes en la tabla `Products`. 

#### ¿Qué se hace?
- Se ejecuta un `SELECT COUNT(*)` para saber si la tabla ya contiene datos.
- Si ya existen registros, se **omite la carga** del archivo CSV.
- Si la tabla está vacía, se procede a insertar los datos usando un bucle `iterrows()`.

#### Beneficios:
- Previene recargas innecesarias que consumirían recursos y tiempo.
- Asegura que la carga de datos sea **idempotente** (segura de repetir sin efecto adverso).
- Funciona en conjunto con la cláusula `ON CONFLICT (id) DO NOTHING` como segunda capa de protección.

> Esta estrategia es altamente recomendable en flujos de datos repetibles o automatizados.


In [7]:
%%time

# Establecer conexión
conn = psycopg2.connect(dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD, host=DB_HOST, port=DB_PORT)
cur = conn.cursor()

# Verificar si la tabla ya contiene registros
cur.execute("SELECT COUNT(*) FROM Products;")
row_count = cur.fetchone()[0]

if row_count > 0:
    print(f"La tabla 'Products' ya contiene {row_count} registros. Se omite la carga.")
else:
    # Insertar datos si la tabla está vacía
    for _, row in df.iterrows():
        cur.execute("""
            INSERT INTO Products (id, gender, masterCategory, subCategory, articleType, baseColour, season, year, usage, productDisplayName)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (id) DO NOTHING;
        """, (
            row["id"], row["gender"], row["masterCategory"], row["subCategory"],
            row["articleType"], row["baseColour"], row["season"],
            int(row["year"]) if not pd.isna(row["year"]) else None,
            row["usage"], row["productDisplayName"]
        ))
    
    conn.commit()
    print("✅ Archivo 'product.csv' cargado exitosamente en PostgreSQL.")

# Cierre de conexión
cur.close()
conn.close()


✅ Archivo 'product.csv' cargado exitosamente en PostgreSQL.
CPU times: total: 4.56 s
Wall time: 7.29 s


# Creación/Importación de la tabla **"Customers"**

### Creación e Importación de la Tabla `Customers`

En esta sección se carga el archivo `customer.csv`, que contiene información de clientes, y se define su estructura en la base de datos PostgreSQL mediante una sentencia SQL `CREATE TABLE IF NOT EXISTS`.

#### 1. Carga del archivo `customer.csv`
Se utiliza `pandas.read_csv()` con el parámetro `on_bad_lines="warn"` para manejar posibles filas corruptas o mal formateadas durante la lectura.

#### 2. Creación de la tabla `Customers`
La estructura de la tabla incluye campos clave como:
- Identificador de cliente (`customer_id`),
- Nombre y apellidos,
- Identificador único (`username` y `device_id` en formato UUID),
- Género y fecha de nacimiento,
- Información del dispositivo,
- Ubicación geográfica,
- Fecha de incorporación a la plataforma (`first_join_date`).

Algunos campos están marcados como `UNIQUE` para garantizar la unicidad de ciertos atributos, como el email y el username.

#### 3. Uso de la función `create_table`
Se reutiliza la función previamente definida para ejecutar la sentencia de creación, garantizando que la tabla se crea si no existe.

> Este paso prepara la base de datos para almacenar la información de los clientes, que es fundamental para la segmentación, análisis de comportamiento y posterior personalización de recomendaciones.


- ## Carga del CSV

In [8]:
%%time
df = pd.read_csv("../data/customer.csv",on_bad_lines="warn")
df.head()

CPU times: total: 328 ms
Wall time: 346 ms


Unnamed: 0,customer_id,first_name,last_name,username,email,gender,birthdate,device_type,device_id,device_version,home_location_lat,home_location_long,home_location,home_country,first_join_date
0,2870,Lala,Maryati,671a0865-ac4e-4dc4-9c4f-c286a1176f7e,671a0865_ac4e_4dc4_9c4f_c286a1176f7e@startupca...,F,1996-06-14,iOS,c9c0de76-0a6c-4ac2-843f-65264ab9fe63,iPhone; CPU iPhone OS 14_2_1 like Mac OS X,-1.043345,101.360523,Sumatera Barat,Indonesia,2019-07-21
1,8193,Maimunah,Laksmiwati,83be2ba7-8133-48a4-bbcb-b46a2762473f,83be2ba7_8133_48a4_bbcb_b46a2762473f@zakyfound...,F,1993-08-16,Android,fb331c3d-f42e-40fe-afe2-b4b73a8a6e25,Android 2.2.1,-6.212489,106.81885,Jakarta Raya,Indonesia,2017-07-16
2,7279,Bakiman,Simanjuntak,3250e5a3-1d23-4675-a647-3281879d42be,3250e5a3_1d23_4675_a647_3281879d42be@startupca...,M,1989-01-23,iOS,d13dde0a-6ae1-43c3-83a7-11bbb922730b,iPad; CPU iPad OS 4_2_1 like Mac OS X,-8.631607,116.428436,Nusa Tenggara Barat,Indonesia,2020-08-23
3,88813,Cahyadi,Maheswara,df797edf-b465-4a80-973b-9fbb612260c2,df797edf_b465_4a80_973b_9fbb612260c2@zakyfound...,M,1991-01-05,iOS,f4c18515-c5be-419f-8142-f037be47c9cd,iPad; CPU iPad OS 14_2 like Mac OS X,1.299332,115.774934,Kalimantan Timur,Indonesia,2021-10-03
4,82542,Irnanto,Wijaya,36ab08e1-03de-42a8-9e3b-59528c798824,36ab08e1_03de_42a8_9e3b_59528c798824@startupca...,M,2000-07-15,iOS,e46e4c36-4630-4736-8fcf-663db29ca3b0,iPhone; CPU iPhone OS 10_3_3 like Mac OS X,-2.980807,114.924675,Kalimantan Selatan,Indonesia,2021-04-11


- ## Creación de la tabla

In [9]:
# Crear la tabla de clientes si no existe
create_customers_table_sql="""
    CREATE TABLE IF NOT EXISTS Customers (
        customer_id SERIAL PRIMARY KEY,
        first_name VARCHAR(50),
        last_name VARCHAR(50),
        username UUID UNIQUE,
        email VARCHAR(100) UNIQUE,
        gender CHAR(1),
        birthdate DATE,
        device_type VARCHAR(50),
        device_id UUID,
        device_version VARCHAR(50),
        home_location_lat FLOAT,
        home_location_long FLOAT,
        home_location VARCHAR(100),
        home_country VARCHAR(50),
        first_join_date DATE
    );
"""
table_name="Customers"

create_table(DB_NAME, DB_USER, DB_PASSWORD, DB_HOST, DB_PORT, table_name, create_customers_table_sql)

✅ Tabla 'Customers' creada exitosamente en PostgreSQL (o ya existía).


### Verificación de Carga Previa en la Tabla `Customers`

Antes de realizar la carga de datos, se implementa una verificación para determinar si la tabla `Customers` ya contiene registros. Si es así, se omite la carga para evitar reprocesamiento innecesario.

#### ¿Qué hace este bloque?
- Consulta la cantidad de registros existentes en la tabla con `SELECT COUNT(*)`.
- Si la tabla no está vacía, se muestra un mensaje y se evita recorrer el DataFrame.
- Si está vacía, se inserta cada registro utilizando `INSERT INTO` con control de duplicados vía `ON CONFLICT (customer_id) DO NOTHING`.

#### Ventajas de esta estrategia:
- Mejora el rendimiento evitando cargas repetidas.
- Protege contra duplicaciones sin necesidad de eliminar datos previos.
- Aporta trazabilidad y control sobre la ejecución del pipeline.

> Esta es una práctica recomendada en flujos ETL donde las cargas pueden ejecutarse varias veces o de forma programada.


- ## Carga de datos

In [10]:
%%time

# Conectar a la base de datos
conn = psycopg2.connect(dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD, host=DB_HOST, port=DB_PORT)
cur = conn.cursor()

# Verificar si la tabla ya contiene registros
cur.execute("SELECT COUNT(*) FROM Customers;")
row_count = cur.fetchone()[0]

if row_count > 0:
    print(f"La tabla 'Customers' ya contiene {row_count} registros. Se omite la carga.")
else:
    # Insertar datos si la tabla está vacía
    for _, row in df.iterrows():
        cur.execute("""
            INSERT INTO Customers (
                customer_id, first_name, last_name, username, email, gender, birthdate, device_type,
                device_id, device_version, home_location_lat, home_location_long,
                home_location, home_country, first_join_date
            )
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (customer_id) DO NOTHING;
        """, (
            row["customer_id"], row["first_name"], row["last_name"], row["username"],
            row["email"], row["gender"], row["birthdate"], row["device_type"], row["device_id"],
            row["device_version"], 
            float(row["home_location_lat"]) if not pd.isna(row["home_location_lat"]) else None,
            float(row["home_location_long"]) if not pd.isna(row["home_location_long"]) else None,
            row["home_location"], row["home_country"], row["first_join_date"]
        ))

    conn.commit()
    print("✅ Archivo 'customer.csv' cargado exitosamente en PostgreSQL.")

# Cerrar conexión
cur.close()
conn.close()


✅ Archivo 'customer.csv' cargado exitosamente en PostgreSQL.
CPU times: total: 13.1 s
Wall time: 23.1 s


# Creación/Importación de la tabla **"Transactions"**

### Creación e Importación de la Tabla `Transactions`

En esta sección se carga el archivo `transactions.csv` y se define su estructura en la base de datos PostgreSQL mediante una sentencia SQL `CREATE TABLE IF NOT EXISTS`.

#### Estructura de la tabla `Transactions`
Contiene información clave sobre las transacciones realizadas por los clientes, incluyendo:
- Fecha y hora de la transacción (`created_at`),
- Relación con el cliente (`customer_id`),
- Identificadores únicos (`booking_id`, `session_id`),
- Detalles del producto (`product_metadata` en formato `JSONB`),
- Información del pago y promociones,
- Costes de envío y ubicación de entrega,
- Importe total de la transacción.

Se incluye una clave foránea con la tabla `Customers` y se establece `session_id` como clave primaria.

#### Protección de integridad
- Se define la relación entre transacciones y clientes con una `FOREIGN KEY`.
- Se define el campo `session_id` como `PRIMARY KEY`, asegurando unicidad por sesión.

> Esta tabla es crítica para el análisis de comportamiento, patrones de compra, segmentación avanzada y recomendaciones basadas en transacciones reales.


- ## Carga del CSV

In [11]:
%%time
df = pd.read_csv("../data/transactions.csv",on_bad_lines="warn")
df.head()

CPU times: total: 3.19 s
Wall time: 3.21 s


Unnamed: 0,created_at,customer_id,booking_id,session_id,product_metadata,payment_method,payment_status,promo_amount,promo_code,shipment_fee,shipment_date_limit,shipment_location_lat,shipment_location_long,total_amount
0,2018-07-29T15:22:01.458193Z,5868,186e2bee-0637-4710-8981-50c2d737bc42,3abaa6ce-e320-4e51-9469-d9f3fa328e86,"[{'product_id': 54728, 'quantity': 1, 'item_pr...",Debit Card,Success,1415,WEEKENDSERU,10000,2018-08-03T05:07:24.812676Z,-8.227893,111.969107,199832
1,2018-07-30T12:40:22.365620Z,4774,caadb57b-e808-4f94-9e96-8a7d4c9898db,2ee5ead1-f13e-4759-92df-7ff48475e970,"[{'product_id': 16193, 'quantity': 1, 'item_pr...",Credit Card,Success,0,,10000,2018-08-03T01:29:03.415705Z,3.01347,107.802514,155526
2,2018-09-15T11:51:17.365620Z,4774,6000fffb-9c1a-4f4a-9296-bc8f6b622b50,93325fb6-eb00-4268-bb0e-6471795a0ad0,"[{'product_id': 53686, 'quantity': 4, 'item_pr...",OVO,Success,0,,10000,2018-09-18T08:41:49.422380Z,-2.579428,115.743885,550696
3,2018-11-01T11:23:48.365620Z,4774,f5e530a7-4350-4cd1-a3bc-525b5037bcab,bcad5a61-1b67-448d-8ff4-781d67bc56e4,"[{'product_id': 20228, 'quantity': 1, 'item_pr...",Credit Card,Success,0,,0,2018-11-05T17:42:27.954235Z,-3.602334,120.363824,271012
4,2018-12-18T11:20:30.365620Z,4774,0efc0594-dbbf-4f9a-b0b0-a488cfddf8a2,df1042ab-13e6-4072-b9d2-64a81974c51a,"[{'product_id': 55220, 'quantity': 1, 'item_pr...",Credit Card,Success,0,,0,2018-12-23T17:24:07.361785Z,-3.602334,120.363824,198753


- ## Creación de la tabla

In [12]:
# Crear la tabla de transacciones si no existe
create_transactions_table_sql="""
    CREATE TABLE IF NOT EXISTS Transactions (
        created_at TIMESTAMP WITH TIME ZONE, 
        customer_id INT REFERENCES Customers(customer_id),
        booking_id UUID, 
        session_id UUID PRIMARY KEY,  
        product_metadata JSONB,  
        payment_method VARCHAR(50), 
        payment_status VARCHAR(10),
        promo_amount FLOAT DEFAULT 0.0,
        promo_code VARCHAR(20), 
        shipment_fee FLOAT DEFAULT 0.0,
        shipment_date_limit TIMESTAMP WITH TIME ZONE,
        shipment_location_lat FLOAT,
        shipment_location_long FLOAT,
        total_amount FLOAT DEFAULT 0.0 
    );
"""
table_name="Transactions"

create_table(DB_NAME, DB_USER, DB_PASSWORD, DB_HOST, DB_PORT, table_name, create_transactions_table_sql)

✅ Tabla 'Transactions' creada exitosamente en PostgreSQL (o ya existía).


- ## Carga de datos

### Verificación de Carga Previa y Carga Controlada en la Tabla `Transactions`

Antes de realizar la inserción de registros, se comprueba si la tabla `Transactions` ya contiene datos mediante una sentencia `SELECT COUNT(*)`. Si existen registros, se evita la recarga.

En caso contrario, se procede a la carga línea por línea, incluyendo:
- Conversión segura de `product_metadata` desde string a formato `JSONB`.
- Prevención de errores mediante bloques `try/except`, que permiten continuar el proceso aún si hay registros con problemas de formato.
- Protección contra duplicados mediante la cláusula `ON CONFLICT (session_id) DO NOTHING`.

> Este proceso permite una carga segura, robusta y eficiente, adaptable a entornos reales donde los datos pueden contener inconsistencias o estructuras complejas como JSON.


In [13]:
%%time

# Conectar a la base de datos
conn = psycopg2.connect(dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD, host=DB_HOST, port=DB_PORT)
cur = conn.cursor()

# Verificar si la tabla ya contiene registros
cur.execute("SELECT COUNT(*) FROM Transactions;")
row_count = cur.fetchone()[0]

if row_count > 0:
    print(f"La tabla 'Transactions' ya contiene {row_count} registros. Se omite la carga.")
else:
    print("Cargando datos en la tabla 'Transactions'...")

    for _, row in df.iterrows():
        try:
            # Convertir product_metadata a JSON válido
            product_metadata_str = row["product_metadata"]
            if isinstance(product_metadata_str, str):
                product_metadata_json = json.dumps(ast.literal_eval(product_metadata_str.replace("'", "\"")))
            else:
                product_metadata_json = json.dumps([])  # Si está vacío o no es str

            cur.execute("""
                INSERT INTO Transactions (
                    created_at, customer_id, booking_id, session_id, product_metadata, 
                    payment_method, payment_status, promo_amount, promo_code, 
                    shipment_fee, shipment_date_limit, shipment_location_lat, 
                    shipment_location_long, total_amount
                )
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (session_id) DO NOTHING;
            """, (
                row.get("created_at", None),
                int(row["customer_id"]),
                row["booking_id"],
                row["session_id"],
                product_metadata_json,
                row.get("payment_method", None),
                row.get("payment_status", None),
                float(row["promo_amount"]) if not pd.isna(row["promo_amount"]) else 0.0,
                row.get("promo_code", None),
                float(row["shipment_fee"]) if not pd.isna(row["shipment_fee"]) else 0.0,
                row.get("shipment_date_limit", None),
                float(row["shipment_location_lat"]) if not pd.isna(row["shipment_location_lat"]) else None,
                float(row["shipment_location_long"]) if not pd.isna(row["shipment_location_long"]) else None,
                float(row["total_amount"]) if not pd.isna(row["total_amount"]) else 0.0
            ))
        except Exception as e:
            print(f"❌ Error al insertar fila con session_id {row['session_id']}: {e}")

    conn.commit()
    print("✅ Archivo 'transactions.csv' cargado exitosamente en PostgreSQL.")

# Cerrar la conexión
cur.close()
conn.close()


Cargando datos en la tabla 'Transactions'...
✅ Archivo 'transactions.csv' cargado exitosamente en PostgreSQL.
CPU times: total: 2min 48s
Wall time: 4min 31s


# Creación/Importación de la tabla **"Click_stream"**

### Creación e Importación de la Tabla `Click_Stream`

En esta sección se carga el archivo `click_stream.csv`, que contiene los eventos generados por los usuarios durante sus sesiones, y se crea su correspondiente tabla en PostgreSQL.

#### Estructura de la tabla `Click_Stream`
Esta tabla representa el flujo de navegación de los usuarios e incluye:
- `session_id`: ID de la sesión a la que pertenece el evento. **No todos los eventos están necesariamente asociados a una transacción**.
- `event_name`: Nombre del evento registrado (ej. click, view, purchase).
- `event_time`: Fecha y hora exacta del evento.
- `event_id`: Identificador único del evento (`PRIMARY KEY`).
- `traffic_source`: Canal desde el cual llegó el usuario (ej. email, direct, search).
- `event_metadata`: Información adicional del evento, almacenada como `JSONB`.

#### Características destacadas:
- No se establece una relación explícita de clave foránea entre `Click_Stream.session_id` y `Transactions.session_id`, ya que puede haber eventos sin una transacción asociada.
- Se utiliza `event_id` como clave primaria para asegurar unicidad.
- El uso de `JSONB` permite almacenar estructuras de datos flexibles, adaptándose a distintos tipos de eventos.

> Esta tabla es fundamental para analizar el comportamiento de navegación de los usuarios, la atribución multicanal y los puntos de contacto previos a la conversión, incluso cuando no se concreta una transacción.


- ## Carga del CSV

In [14]:
%%time
df = pd.read_csv("../data/click_stream.csv",on_bad_lines="warn")
df.head()

CPU times: total: 18.6 s
Wall time: 18.7 s


Unnamed: 0,session_id,event_name,event_time,event_id,traffic_source,event_metadata
0,fb0abf9e-fd1a-44dd-b5c0-2834d5a4b81c,HOMEPAGE,2019-09-06T15:54:32.821085Z,9c4388c4-c95b-4678-b5ca-e9cbc0734109,MOBILE,
1,fb0abf9e-fd1a-44dd-b5c0-2834d5a4b81c,SCROLL,2019-09-06T16:03:57.821085Z,4690e1f5-3f99-42d3-84a5-22c4c4d8500a,MOBILE,
2,7d440441-e67a-4d36-b324-80ffd636d166,HOMEPAGE,2019-09-01T12:05:10.322763Z,88aeaeb5-ec98-4859-852c-8abb483faf31,MOBILE,
3,7d440441-e67a-4d36-b324-80ffd636d166,ADD_TO_CART,2019-09-01T12:06:33.322763Z,934e306e-ecc6-472f-9ccb-12c8536910a2,MOBILE,"{'product_id': 15315, 'quantity': 4, 'item_pri..."
4,7d440441-e67a-4d36-b324-80ffd636d166,BOOKING,2019-09-01T12:15:29.425431Z,9f4767a1-40fa-4c9c-9524-dfad18634d56,MOBILE,{'payment_status': 'Success'}


- ## Creación de la tabla

In [15]:
# Crear la tabla de click_stream si no existe
create_click_stream_table_sql="""
CREATE TABLE IF NOT EXISTS Click_Stream (
    session_id UUID, 
    event_name VARCHAR(100),
    event_time TIMESTAMP WITH TIME ZONE,
    event_id UUID PRIMARY KEY,
    traffic_source VARCHAR(50),
    event_metadata JSONB
);
"""

table_name="click_stream"

create_table(DB_NAME, DB_USER, DB_PASSWORD, DB_HOST, DB_PORT, table_name, create_click_stream_table_sql)

✅ Tabla 'click_stream' creada exitosamente en PostgreSQL (o ya existía).


- ## Carga de datos

### Verificación de Carga Previa y Carga Controlada en la Tabla `Click_Stream`

Antes de insertar los datos del archivo `click_stream.csv`, se comprueba si la tabla `Click_Stream` ya contiene registros mediante una consulta `SELECT COUNT(*)`. Si se detecta que la tabla ya tiene información, se omite la carga para evitar duplicados y reprocesamientos innecesarios.

#### Proceso detallado:
- Se convierte el campo `event_metadata`, que puede venir como string representando un diccionario, a formato `JSONB` válido usando `ast.literal_eval()` y `json.dumps()`.
- Se utiliza la cláusula `ON CONFLICT (event_id) DO NOTHING` para evitar insertar eventos ya existentes.
- Se maneja cada inserción dentro de un bloque `try/except` para capturar errores y continuar con el resto de los datos.
- En los casos en que `event_metadata` está vacío o mal formado, se inserta un objeto JSON vacío `{}` por defecto.

#### Consideraciones:
- La tabla no impone relación directa con la tabla `Transactions`, ya que los eventos no siempre están asociados a una sesión transaccional.
- Esta verificación previa permite mantener la carga **idempotente** y segura, evitando procesamientos innecesarios.

> Este enfoque proporciona un control robusto sobre la fase de carga del flujo de eventos del usuario, clave para el análisis de comportamiento digital y customer journey.


In [16]:
%%time

# Conectar a la base de datos
conn = psycopg2.connect(dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD, host=DB_HOST, port=DB_PORT)
cur = conn.cursor()

# Verificar si la tabla ya contiene registros
cur.execute("SELECT COUNT(*) FROM Click_Stream;")
row_count = cur.fetchone()[0]

if row_count > 0:
    print(f"La tabla 'Click_Stream' ya contiene {row_count} registros. Se omite la carga.")
else:
    print("Cargando datos en la tabla 'Click_Stream'...")

    for _, row in df.iterrows():
        try:
            # Convertir event_metadata a JSON válido
            event_metadata_str = row["event_metadata"]
            if isinstance(event_metadata_str, str):
                event_metadata_json = json.dumps(ast.literal_eval(event_metadata_str.replace("'", "\"")))
            else:
                event_metadata_json = json.dumps({})  # Si está vacío, guardar objeto vacío

            cur.execute("""
                INSERT INTO Click_Stream (
                    session_id, event_name, event_time, event_id, 
                    traffic_source, event_metadata
                )
                VALUES (%s, %s, %s, %s, %s, %s)
                ON CONFLICT (event_id) DO NOTHING;
            """, (
                row["session_id"],
                row["event_name"],
                row["event_time"],
                row["event_id"],
                row["traffic_source"],
                event_metadata_json
            ))
        except Exception as e:
            print(f"❌ Error al insertar fila con event_id {row['event_id']}: {e}")

    conn.commit()
    print("✅ Archivo 'click_stream.csv' cargado exitosamente en PostgreSQL.")

# Cerrar conexión
cur.close()
conn.close()


Cargando datos en la tabla 'Click_Stream'...
✅ Archivo 'click_stream.csv' cargado exitosamente en PostgreSQL.
CPU times: total: 27min 20s
Wall time: 46min 34s


## Resumen

El proceso de carga se ha completado con éxito. A continuación se presenta un resumen con el número total de registros insertados en cada una de las tablas del modelo de datos:

| Tabla           | Registros cargados |
|----------------|--------------------|
| `Products`     | X registros      |
| `Customers`    | Y registros      |
| `Transactions` | Z registros      |
| `Click_Stream` | W registros      |

> *(Los valores X, Y, Z, W serán rellenados automáticamente desde el script Python)*

---

### Validación general:
- Todos los registros han sido validados y cargados correctamente.
- Se han aplicado verificaciones previas (`SELECT COUNT(*)`) para asegurar que los datos no se recargan si ya existen.
- Cada tabla refleja la estructura deseada, con claves primarias, integridad relacional y soporte para datos estructurados y semi-estructurados (`JSONB`).

¡El entorno está preparado para comenzar con el análisis exploratorio y la construcción del sistema de recomendación!


In [17]:
tablas = ["Products", "Customers", "Transactions", "Click_Stream"]
conteo_filas = {}

conn = psycopg2.connect(dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD, host=DB_HOST, port=DB_PORT)
cur = conn.cursor()

for tabla in tablas:
    cur.execute(f"SELECT COUNT(*) FROM {tabla};")
    conteo_filas[tabla] = cur.fetchone()[0]

cur.close()
conn.close()

for tabla, filas in conteo_filas.items():
    print(f" {tabla}: {filas} registros")

 Products: 44446 registros
 Customers: 100000 registros
 Transactions: 852584 registros
 Click_Stream: 12833602 registros


## Resumen General del Proceso ETL

Este notebook ha implementado con éxito el proceso de carga de datos (Extract & Load) para el sistema de recomendación basado en comportamiento del consumidor en entorno retail. A continuación, se detallan las acciones realizadas:

### Tecnología utilizada:
- **PostgreSQL** como sistema de almacenamiento relacional.
- **Python** para la orquestación del proceso ETL.
- **Pandas** para lectura de archivos CSV.
- **psycopg2 + SQLAlchemy** para conexión y operaciones sobre la base de datos.
- **JSONB** como tipo flexible para datos no estructurados en algunas columnas.

---

### Tablas creadas y cargadas:

#### `Products`
- Contiene el catálogo de productos disponibles.
- Estructura clara y categórica.
- Clave primaria: `id`.

#### `Customers`
- Información personal, técnica y geográfica de cada cliente.
- Incluye campos únicos como `username` y `email`.
- Clave primaria: `customer_id`.

#### `Transactions`
- Registros de compras realizadas por los clientes.
- Incluye datos temporales, ubicación de envío y metadatos del producto.
- Campo `product_metadata` en formato `JSONB`.
- Clave primaria: `session_id`.
- Clave foránea con `Customers(customer_id)`.

#### `Click_Stream`
- Trazabilidad de eventos de navegación (clicks, views, etc.).
- Incluye fuente de tráfico y metadatos del evento (`JSONB`).
- No requiere que todos los eventos estén asociados a una transacción.
- Clave primaria: `event_id`.

---

### Buenas prácticas aplicadas:

- **Carga condicional:** Se verifica si cada tabla ya tiene registros para evitar duplicados (`SELECT COUNT(*)`).
- **Control de duplicados en DB:** Uso de `ON CONFLICT DO NOTHING` en claves primarias.
- **Conversión robusta de JSON:** Validación y transformación de strings con `ast.literal_eval()` + `json.dumps()`.
- **Manejo de errores fila a fila:** Inserciones protegidas con `try/except` para garantizar continuidad.
- **Código modular y reutilizable:** Funciones para creación de tablas, verificación de carga y procesamiento generalizado.

---

### Estado final:
Todas las tablas han sido creadas e importadas correctamente desde archivos `.csv`. El entorno relacional está listo para comenzar con:
- **Análisis exploratorio (EDA)**,
- **Limpieza y transformación (Transform)**,
- **Modelado predictivo y sistemas de recomendación**.

> Este proceso marca la base del pipeline de analítica avanzada para el TFM: *“Análisis predictivo del comportamiento del consumidor a través del análisis de datos”*.
