## Ejercicio 2: Diseño y construcción de modelo data warehouse. (4pts)
**Descripción**: Se plantea diseñar un modelo data warehouse sencillo a partir del resultado del ejercicio 1, construido directamente con notebooks jupyter, para ello el modelo debe satisfacer las siguientes necesidades:
- Productos más vendidos
- Clientes con el mayor número de pedidos
- Corresponsal con el mayor número de pedidos
- Total de pagos diario y mensual por productos
- Total de pagos diario y mensual por clientes
- Total de pagos diario y mensual por corresponsal

**Requisitos:**
- Diseñar un modelo de data warehouse de tipo estrella, incluir tablas de hechos y dimensiones que satisfagan los reportes de BI
- Construcción de diagrama data warehouse.

**Entregables:**
Incluir dentro del proyecto:
- Diagrama de data warehouse (1p)
- Notebook Jupyter que realice lo siguiente:
    - Limpieza de datos: (2p)
        - Nombres en mayúsculas, sin tildes, sin ñ, sin guiones bajos y medios.
        - Valores numéricos con solo dos decimales redondeados al inmediato superior.
    - Integración de los datos: (1p)
        - Scripts con la creación de las tablas de dimensiones
        - Scripts con la creación de las tablas de hechos

In [18]:
!pip install sqlalchemy psycopg2-binary pandas



In [19]:
#Instalacion de libreriras
from pyspark.sql import SparkSession
from sqlalchemy import create_engine, text
import pandas as pd

In [20]:
# Crear sesión de Spark
spark = SparkSession.builder \
    .appName("DataWarehouseDW") \
    .config("spark.jars", "postgresql-42.7.5.jar") \
    .config("spark.driver.extraClassPath", "postgresql-42.7.5.jar") \
    .getOrCreate()
# Verifica que la sesión de Spark se haya creado
print(spark)

<pyspark.sql.session.SparkSession object at 0x7f089ae4ffa0>


In [21]:
#Conexión a la base de datos de postgres
usuario = 'postgres'       
contrasena = 'postgres'  
host = 'localhost'         
puerto = '5432'           
bd = 'mp3_db'   

conexion = f'postgresql+psycopg2://{usuario}:{contrasena}@{host}:{puerto}/{bd}'

engine = create_engine(conexion)

In [22]:
# carga de datos de clientes
df_clientes = pd.read_sql('SELECT * FROM clientes', engine)
df_clientes.head()

Unnamed: 0,id_cliente,nombre,email,telefono,direccion,fecha_registro
0,1,Rosalinda del Vigil,maxi91@gmail.com,+34 696 109 211,Urbanización Ileana Jara 88 Apt. 84 Guadalaja...,2025-04-29 21:07:17.855258
1,2,Xavier Zaragoza Alarcón,cuestafortunato@gmail.com,+34737892325,"Pasadizo Ángel Acevedo 58 Apt. 55 Baleares, 3...",2025-04-29 21:07:17.855258
2,3,Micaela Borrego Borrell,ana-belenarrieta@rubio-querol.es,+34 717 22 88 8,"Pasadizo Edmundo Reguera 83 Apt. 68 Alicante,...",2025-04-29 21:07:17.855258
3,4,Claudio Gutierrez Salinas,aaron86@cruz.com,+34 962 07 59 1,"Glorieta de Arturo Amo 17 León, 79725",2025-04-29 21:07:17.855258
4,5,Vasco Parra,violeta58@yahoo.com,+34709764488,"Vial Roldán Solís 53 Navarra, 65772",2025-04-29 21:07:17.855258


## *Limpieza de datos: (2p)*

- Nombres en mayúsculas, sin tildes, sin ñ, sin guiones bajos y medios.
- Valores numéricos con solo dos decimales redondeados al inmediato superior.

In [23]:
#Importación de librerias

import unicodedata
import numpy as np

In [24]:
# Limpieza de texto
def limpiar_texto(texto):
    if pd.isnull(texto): return texto
    texto = texto.upper().replace("_", " ").replace("-", " ")
    texto = unicodedata.normalize('NFKD', texto).encode('ASCII', 'ignore').decode('utf-8')
    texto = texto.replace('Ñ', 'N')
    return texto

In [25]:
# Limpieza de teléfono
def limpiar_telefono(telefono):
    if pd.isnull(telefono):
        return telefono
    telefono = telefono.replace(" ", "")
    return telefono

In [26]:
# Limpieza a Clientes
df_clientes = pd.read_sql('SELECT * FROM Clientes', engine)
df_clientes['nombre'] = df_clientes['nombre'].apply(limpiar_texto)
df_clientes['email'] = df_clientes['email'].apply(limpiar_texto)
df_clientes['telefono'] = df_clientes['telefono'].apply(limpiar_telefono)

df_clientes.head()


Unnamed: 0,id_cliente,nombre,email,telefono,direccion,fecha_registro
0,1,ROSALINDA DEL VIGIL,MAXI91@GMAIL.COM,34696109211,Urbanización Ileana Jara 88 Apt. 84 Guadalaja...,2025-04-29 21:07:17.855258
1,2,XAVIER ZARAGOZA ALARCON,CUESTAFORTUNATO@GMAIL.COM,34737892325,"Pasadizo Ángel Acevedo 58 Apt. 55 Baleares, 3...",2025-04-29 21:07:17.855258
2,3,MICAELA BORREGO BORRELL,ANA BELENARRIETA@RUBIO QUEROL.ES,3471722888,"Pasadizo Edmundo Reguera 83 Apt. 68 Alicante,...",2025-04-29 21:07:17.855258
3,4,CLAUDIO GUTIERREZ SALINAS,AARON86@CRUZ.COM,3496207591,"Glorieta de Arturo Amo 17 León, 79725",2025-04-29 21:07:17.855258
4,5,VASCO PARRA,VIOLETA58@YAHOO.COM,34709764488,"Vial Roldán Solís 53 Navarra, 65772",2025-04-29 21:07:17.855258


In [27]:
# Limpieza a Productos
df_productos = pd.read_sql('SELECT * FROM Productos', engine)
df_productos['nombre'] = df_productos['nombre'].apply(limpiar_texto)
df_productos['descripcion'] = df_productos['descripcion'].apply(limpiar_texto)
df_productos.head()

Unnamed: 0,id_producto,nombre,descripcion,precio,stock
0,1,LAPTOP LENOVO THINKPAD X1 N CARBON,PRODUCTO TECNOLOGICO DE ALTA CALIDAD: LAPTOP L...,442.47849,91
1,2,SMARTPHONE SAMSUNG GALAXY N S24,PRODUCTO TECNOLOGICO DE ALTA CALIDAD: SMARTPHO...,272.04082,96
2,3,TABLET APPLE IPAD N PRO,PRODUCTO TECNOLOGICO DE ALTA CALIDAD: TABLET A...,182.13936,20
3,4,MONITOR LG ULTRAWIDE N 34,PRODUCTO TECNOLOGICO DE ALTA CALIDAD: MONITOR ...,232.92347,81
4,5,AURICULARES SONY WH 1000XM5 N WIRELESS,PRODUCTO TECNOLOGICO DE ALTA CALIDAD: AURICULA...,496.58918,79


In [28]:
# Redondear valores numéricos de precio a dos decimales
df_productos['precio'] = np.ceil(df_productos['precio'] * 100) / 100
df_productos.head()

Unnamed: 0,id_producto,nombre,descripcion,precio,stock
0,1,LAPTOP LENOVO THINKPAD X1 N CARBON,PRODUCTO TECNOLOGICO DE ALTA CALIDAD: LAPTOP L...,442.48,91
1,2,SMARTPHONE SAMSUNG GALAXY N S24,PRODUCTO TECNOLOGICO DE ALTA CALIDAD: SMARTPHO...,272.05,96
2,3,TABLET APPLE IPAD N PRO,PRODUCTO TECNOLOGICO DE ALTA CALIDAD: TABLET A...,182.14,20
3,4,MONITOR LG ULTRAWIDE N 34,PRODUCTO TECNOLOGICO DE ALTA CALIDAD: MONITOR ...,232.93,81
4,5,AURICULARES SONY WH 1000XM5 N WIRELESS,PRODUCTO TECNOLOGICO DE ALTA CALIDAD: AURICULA...,496.59,79


In [29]:
# Se realzia tambien para Detalles_Orden y Ordenes
df_detalles = pd.read_sql('SELECT * FROM Detalles_Orden', engine)
df_ordenes = pd.read_sql('SELECT * FROM Ordenes', engine)


In [30]:
for col in ['precio_unitario', 'subtotal']:
    df_detalles[col] = np.ceil(df_detalles[col] * 100) / 100

df_ordenes['total'] = np.ceil(df_ordenes['total'] * 100) / 100

df_detalles.head()

Unnamed: 0,id_detalle,id_orden,id_producto,cantidad,precio_unitario,subtotal
0,1,1,25,2,259.72,519.43
1,2,1,14,1,383.64,383.64
2,3,2,26,2,380.97,761.94
3,4,2,23,4,338.53,1354.09
4,5,3,28,4,184.53,738.11


In [31]:
df_ordenes.head()

Unnamed: 0,id_orden,id_cliente,id_corresponsal,fecha_orden,total
0,1,1,6,2025-04-25 01:12:43,903.07
1,2,1,1,2025-02-11 15:05:19,2116.03
2,3,1,4,2025-01-13 15:15:20,852.63
3,4,2,3,2025-03-24 04:18:00,1557.02
4,5,2,2,2025-02-09 22:28:48,1385.16


In [32]:
# carga de datos de clientes
df_corresponsales = pd.read_sql('SELECT * FROM corresponsales', engine)
df_corresponsales.head()

Unnamed: 0,id_corresponsal,nombre,direccion
0,1,Banco Pichincha,Av. Amazonas y Naciones Unidas
1,2,Western Union,Calle Bolívar y Vargas
2,3,Cooperativa Andalucía,Av. 6 de Diciembre
3,4,Banco Guayaquil,Malecón Simón Bolívar
4,5,Servipagos,Av. De Los Shyris


## **Creación de la base de datos para el DatawereHouse**

In [33]:
#Credenciales para el datawerehouse        
bd_dw = 'postgres'   
conexion_dw = f'postgresql+psycopg2://{usuario}:{contrasena}@{host}:{puerto}/{bd_dw}'
engine_dw=create_engine(conexion_dw)

In [34]:
# 4. Crear tablas en PostgreSQL
script_sql = """

CREATE TABLE IF NOT EXISTS th_ordenes (
    id_th SERIAL PRIMARY KEY,
    id_fecha INT NOT NULL,
    id_detalle INT NOT NULL,
    id_corresponsal INT NOT NULL,
    id_cliente INT NOT NULL,
    id_producto INT NOT NULL,
    id_orden INT NOT NULL,
    precio_unitario NUMERIC(10, 5) NOT NULL,
    cantidad INT NOT NULL,
    subtotal NUMERIC(10, 5) NOT NULL
);

CREATE TABLE IF NOT EXISTS Dim_Clientes (
    id_cliente INT PRIMARY KEY,
    nombre VARCHAR(100),
    email VARCHAR(100),
    telefono VARCHAR(15),
    direccion TEXT
);
CREATE TABLE IF NOT EXISTS Dim_Productos (
    id_producto INT PRIMARY KEY,
    nombre VARCHAR(100),
    descripcion TEXT,
    precio NUMERIC(10, 5)
);
CREATE TABLE IF NOT EXISTS Dim_Corresponsales (
    id_corresponsal INT PRIMARY KEY,
    nombre VARCHAR(100),
    direccion TEXT
);

CREATE TABLE IF NOT EXISTS Dim_Fecha (
    id_fecha INT PRIMARY KEY,
    anio INT,
    mes INT,
    dia INT
);

"""
# 5. Ejecutar el script SQL en PostgreSQL
with engine_dw.connect() as conn:
    conn.execute(text(script_sql))
    conn.commit()

print("✔ Tablas creadas exitosamente en el Data Warehouse.")

✔ Tablas creadas exitosamente en el Data Warehouse.


In [35]:
# Carga de datos de en las dimesiones

# Insertar los datos en la tabla 'Dim_Clientes'
df_clientes_selected = df_clientes[['id_cliente', 'nombre', 'email','telefono', 'direccion']]
df_clientes_selected.to_sql('dim_clientes', con=engine_dw, index=False, if_exists='append')


30

In [36]:
# Insertar los datos en la tabla 'Dim_Productos'
df_productos_selected = df_productos[['id_producto', 'nombre', 'descripcion','precio']]
df_productos_selected.to_sql('dim_productos', con=engine_dw, index=False, if_exists='append')

30

In [37]:
# Insertar los datos en la tabla 'Dim_Corresponsales'
df_corresponsales_selected = df_corresponsales[['id_corresponsal', 'nombre', 'direccion']]
df_corresponsales_selected.to_sql('dim_corresponsales', con=engine_dw, index=False, if_exists='append')

7

In [38]:
# Generacion del los datos para la dimesion de fecha
# Leer fechas únicas desde la base de datos
df_fechas_tmp = pd.read_sql('''SELECT 
    ROW_NUMBER() OVER (ORDER BY DATE(o.fecha_orden)) AS id_fecha,
    DATE(o.fecha_orden) AS fecha_orden
FROM 
    ordenes o
GROUP BY 
    DATE(o.fecha_orden)
ORDER BY 
    fecha_orden ASC''', engine)

# Convertir a tipo datetime
df_fechas_tmp['fecha_orden'] = pd.to_datetime(df_fechas_tmp['fecha_orden'])

# Separar en año, mes y día
df_fechas_tmp['anio'] = df_fechas_tmp['fecha_orden'].dt.year
df_fechas_tmp['mes'] = df_fechas_tmp['fecha_orden'].dt.month
df_fechas_tmp['dia'] = df_fechas_tmp['fecha_orden'].dt.day

# Visualizar el resultado
df_fechas_tmp.head()



Unnamed: 0,id_fecha,fecha_orden,anio,mes,dia
0,1,2025-01-01,2025,1,1
1,2,2025-01-02,2025,1,2
2,3,2025-01-03,2025,1,3
3,4,2025-01-07,2025,1,7
4,5,2025-01-08,2025,1,8


In [39]:
# Insertar los datos en la tabla 'Dim_Fecha'
df_fecha_selected = df_fechas_tmp[['id_fecha', 'anio', 'mes','dia']]
df_fecha_selected.to_sql('dim_fecha', con=engine_dw, index=False, if_exists='append')


51

In [40]:
#Lectura de datos para la tabla de hechos

df_th_temp = pd.read_sql('''

select
	corr.id_corresponsal,
	c.id_cliente,
	o.id_orden,
	deco.id_detalle,
	p.id_producto,
	DATE(o.fecha_orden) as fecha_orden ,
	deco.precio_unitario,
	deco.cantidad,
	(deco.precio_unitario * deco.cantidad) subtotal
from
	ordenes o,
	clientes c ,
	corresponsales corr,
	detalles_orden deco,
	productos p
where
	o.id_cliente = c.id_cliente
	and 
o.id_corresponsal = corr.id_corresponsal
	and
deco.id_orden = o.id_orden
	and
deco.id_producto = p.id_producto

''', engine)


# Visualizar el resultado
df_th_temp.head()

Unnamed: 0,id_corresponsal,id_cliente,id_orden,id_detalle,id_producto,fecha_orden,precio_unitario,cantidad,subtotal
0,6,1,1,1,25,2025-04-25,259.71175,2,519.4235
1,6,1,1,2,14,2025-04-25,383.63912,1,383.63912
2,1,1,2,3,26,2025-02-11,380.96887,2,761.93774
3,1,1,2,4,23,2025-02-11,338.52229,4,1354.08916
4,4,1,3,5,28,2025-01-13,184.52701,4,738.10804


In [41]:
# Asegurar de que ambas columnas estén en formato fecha sin la hora
df_th_temp['fecha_orden'] = pd.to_datetime(df_th_temp['fecha_orden']).dt.date
df_fechas_tmp['fecha_orden'] = pd.to_datetime(df_fechas_tmp['fecha_orden']).dt.date

#union de los datos de la TH con dl dimesion de fecha para obtener el id_fecha
df_join_th_fechas_tmp = pd.merge(df_th_temp, df_fechas_tmp, on='fecha_orden', how='inner')

# Visualizar
df_join_th_fechas_tmp.head()

Unnamed: 0,id_corresponsal,id_cliente,id_orden,id_detalle,id_producto,fecha_orden,precio_unitario,cantidad,subtotal,id_fecha,anio,mes,dia
0,6,1,1,1,25,2025-04-25,259.71175,2,519.4235,51,2025,4,25
1,6,1,1,2,14,2025-04-25,383.63912,1,383.63912,51,2025,4,25
2,1,1,2,3,26,2025-02-11,380.96887,2,761.93774,22,2025,2,11
3,1,1,2,4,23,2025-02-11,338.52229,4,1354.08916,22,2025,2,11
4,4,1,3,5,28,2025-01-13,184.52701,4,738.10804,7,2025,1,13


In [42]:
#Seleccion de datos a guardar
df_join_th_fechas_tmp['precio_unitario'] = np.ceil(df_join_th_fechas_tmp['precio_unitario'] * 100) / 100
df_join_th_fechas_tmp['subtotal'] = np.ceil(df_join_th_fechas_tmp['subtotal'] * 100) / 100

df_th = df_join_th_fechas_tmp[['id_fecha','id_detalle','id_corresponsal', 'id_cliente', 'id_producto', 'id_orden','precio_unitario','cantidad', 'subtotal']]
df_th.head()

Unnamed: 0,id_fecha,id_detalle,id_corresponsal,id_cliente,id_producto,id_orden,precio_unitario,cantidad,subtotal
0,51,1,6,1,25,1,259.72,2,519.43
1,51,2,6,1,14,1,383.64,1,383.64
2,22,3,1,1,26,2,380.97,2,761.94
3,22,4,1,1,23,2,338.53,4,1354.09
4,7,5,4,1,28,3,184.53,4,738.11


In [43]:
# Insertar los datos en la tabla de hechos
df_th.to_sql('th_ordenes', con=engine_dw, index=False, if_exists='append')

150