## üîÑ Definici√≥n de las Fases ETL

### 1. **EXTRACT (Extracci√≥n)**
- **Fuentes de datos**:
  - Archivo CSV con datos de ventas diarias
  - Archivo JSON con informaci√≥n de productos
  - Archivo CSV con datos de clientes
- **Reto**: Diferentes formatos y estructuras de datos

### 2. **TRANSFORM (Transformaci√≥n)**
- Limpieza de datos (valores nulos, duplicados)
- Estandarizaci√≥n de formatos (fechas, monedas)
- C√°lculos de m√©tricas derivadas (totales, promedios)
- Joins entre las diferentes fuentes
- Agregaciones y agrupaciones

### 3. **LOAD (Carga)**
- Almacenamiento en formato Parquet para optimizaci√≥n
- Particionamiento por fecha para consultas eficientes
- Creaci√≥n de tablas temporales para an√°lisis

In [47]:
# Reiniciar entorno para evitar conflictos
import importlib
import sys

# Limpiar imports previos si existen
modules_to_remove = [mod for mod in sys.modules.keys() if mod.startswith('pyspark')]
for mod in modules_to_remove:
    if mod in sys.modules:
        del sys.modules[mod]

# Importaciones b√°sicas de Python (sin conflictos)
import pandas as pd
import json
import random
import os
from datetime import datetime, timedelta

print("‚úÖ Entorno limpio - listo para PySpark")

‚úÖ Entorno limpio - listo para PySpark


In [48]:
# Importar PySpark de forma segura
from pyspark.sql import SparkSession
import pyspark.sql.functions as spark_functions
from pyspark.sql.types import *

# Crear sesi√≥n Spark
spark = SparkSession.builder \
    .appName("ETL_Ventas_Ecommerce") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

print(f"‚úÖ Spark Version: {spark.version}")
print("‚úÖ Sesi√≥n de Spark iniciada correctamente")

‚úÖ Spark Version: 3.5.1
‚úÖ Sesi√≥n de Spark iniciada correctamente


In [49]:
# Generar datos de ventas - usando solo Python nativo
print("üìä Generando datos de ventas...")

ventas_lista = []
fecha_base = datetime(2024, 1, 1)

for i in range(1000):
    dias_random = random.randint(0, 90)
    fecha_venta = fecha_base + timedelta(days=dias_random)

    # Crear registro usando solo funciones Python nativas
    venta = {
        'venta_id': 'V' + str(i+1).zfill(4),
        'cliente_id': 'C' + str(random.randint(1, 200)).zfill(3),
        'producto_id': 'P' + str(random.randint(1, 50)).zfill(3),
        'cantidad': random.randint(1, 5),
        'precio_unitario': float(f"{random.uniform(10.99, 299.99):.2f}"),
        'fecha_venta': fecha_venta.strftime('%Y-%m-%d'),
        'canal_venta': random.choice(['online', 'tienda_fisica', 'movil'])
    }
    ventas_lista.append(venta)

# Convertir a DataFrame pandas y guardar
ventas_df = pd.DataFrame(ventas_lista)
ventas_df.to_csv('ventas.csv', index=False)

print(f"‚úÖ {len(ventas_lista)} registros de ventas generados")
print("Primeras 5 filas:")
print(ventas_df.head())

üìä Generando datos de ventas...
‚úÖ 1000 registros de ventas generados
Primeras 5 filas:
  venta_id cliente_id producto_id  cantidad  precio_unitario fecha_venta  \
0    V0001       C149        P029         4           244.86  2024-02-19   
1    V0002       C174        P013         4           203.86  2024-01-19   
2    V0003       C178        P017         2           244.17  2024-03-05   
3    V0004       C165        P047         5            70.75  2024-03-21   
4    V0005       C114        P045         2           165.38  2024-03-15   

     canal_venta  
0         online  
1          movil  
2          movil  
3  tienda_fisica  
4  tienda_fisica  


In [50]:
# Generar datos de productos
print("üõçÔ∏è Generando datos de productos...")

productos_lista = []
categorias = ['Electr√≥nicos', 'Ropa', 'Hogar', 'Deportes', 'Libros']
marcas = ['BrandA', 'BrandB', 'BrandC', 'BrandD', 'BrandE']

for i in range(50):
    producto = {
        'producto_id': 'P' + str(i+1).zfill(3),
        'nombre': f'Producto {i+1}',
        'categoria': random.choice(categorias),
        'marca': random.choice(marcas),
        'precio_catalogo': float(f"{random.uniform(15.99, 399.99):.2f}"),
        'estado': random.choice(['activo', 'descontinuado'])
    }
    productos_lista.append(producto)

# Guardar como JSON
with open('productos.json', 'w', encoding='utf-8') as archivo:
    json.dump(productos_lista, archivo, indent=2, ensure_ascii=False)

print(f"‚úÖ {len(productos_lista)} registros de productos generados")
print("Primeros 3 productos:")
for i in range(3):
    print(f"  {productos_lista[i]}")

üõçÔ∏è Generando datos de productos...
‚úÖ 50 registros de productos generados
Primeros 3 productos:
  {'producto_id': 'P001', 'nombre': 'Producto 1', 'categoria': 'Libros', 'marca': 'BrandC', 'precio_catalogo': 199.02, 'estado': 'descontinuado'}
  {'producto_id': 'P002', 'nombre': 'Producto 2', 'categoria': 'Hogar', 'marca': 'BrandE', 'precio_catalogo': 189.55, 'estado': 'activo'}
  {'producto_id': 'P003', 'nombre': 'Producto 3', 'categoria': 'Deportes', 'marca': 'BrandC', 'precio_catalogo': 321.58, 'estado': 'activo'}


In [51]:
# Generar datos de clientes
print("üë• Generando datos de clientes...")

clientes_lista = []
ciudades = ['Santiago', 'Valpara√≠so', 'Concepci√≥n', 'Temuco', 'Antofagasta']
tipos = ['regular', 'premium', 'vip']

for i in range(200):
    dias_registro = random.randint(-365, 0)
    fecha_registro = fecha_base + timedelta(days=dias_registro)

    cliente = {
        'cliente_id': 'C' + str(i+1).zfill(3),
        'nombre': f'Cliente {i+1}',
        'email': f'cliente{i+1}@email.com',
        'ciudad': random.choice(ciudades),
        'tipo_cliente': random.choice(tipos),
        'fecha_registro': fecha_registro.strftime('%Y-%m-%d')
    }
    clientes_lista.append(cliente)

# Guardar como CSV
clientes_df = pd.DataFrame(clientes_lista)
clientes_df.to_csv('clientes.csv', index=False)

print(f"‚úÖ {len(clientes_lista)} registros de clientes generados")
print("Primeras 5 filas:")
print(clientes_df.head())

üë• Generando datos de clientes...
‚úÖ 200 registros de clientes generados
Primeras 5 filas:
  cliente_id     nombre               email       ciudad tipo_cliente  \
0       C001  Cliente 1  cliente1@email.com  Antofagasta          vip   
1       C002  Cliente 2  cliente2@email.com   Valpara√≠so          vip   
2       C003  Cliente 3  cliente3@email.com     Santiago      premium   
3       C004  Cliente 4  cliente4@email.com   Valpara√≠so          vip   
4       C005  Cliente 5  cliente5@email.com  Antofagasta      premium   

  fecha_registro  
0     2023-01-15  
1     2023-08-04  
2     2023-04-17  
3     2023-04-26  
4     2023-09-08  


In [52]:
print("="*60)
print("üîç FASE EXTRACT: Cargando datos con Spark")
print("="*60)

# Cargar ventas
print("\nüìä Cargando ventas.csv...")
df_ventas_spark = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("ventas.csv")

print(f"‚úÖ Ventas cargadas: {df_ventas_spark.count()} registros")

# Cargar productos
print("\nüõçÔ∏è Cargando productos.json...")
df_productos_spark = spark.read \
    .option("multiLine", "true") \
    .json("productos.json")

print(f"‚úÖ Productos cargados: {df_productos_spark.count()} registros")

# Cargar clientes
print("\nüë• Cargando clientes.csv...")
df_clientes_spark = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("clientes.csv")

print(f"‚úÖ Clientes cargados: {df_clientes_spark.count()} registros")

# Mostrar esquemas
print("\nüìã Esquemas de datos:")
print("\nVENTAS:")
df_ventas_spark.printSchema()
print("\nPRODUCTOS:")
df_productos_spark.printSchema()
print("\nCLIENTES:")
df_clientes_spark.printSchema()

üîç FASE EXTRACT: Cargando datos con Spark

üìä Cargando ventas.csv...
‚úÖ Ventas cargadas: 1000 registros

üõçÔ∏è Cargando productos.json...
‚úÖ Productos cargados: 50 registros

üë• Cargando clientes.csv...
‚úÖ Clientes cargados: 200 registros

üìã Esquemas de datos:

VENTAS:
root
 |-- venta_id: string (nullable = true)
 |-- cliente_id: string (nullable = true)
 |-- producto_id: string (nullable = true)
 |-- cantidad: integer (nullable = true)
 |-- precio_unitario: double (nullable = true)
 |-- fecha_venta: date (nullable = true)
 |-- canal_venta: string (nullable = true)


PRODUCTOS:
root
 |-- categoria: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- marca: string (nullable = true)
 |-- nombre: string (nullable = true)
 |-- precio_catalogo: double (nullable = true)
 |-- producto_id: string (nullable = true)


CLIENTES:
root
 |-- cliente_id: string (nullable = true)
 |-- nombre: string (nullable = true)
 |-- email: string (nullable = true)
 |-- ciudad: strin

In [53]:
# Mostrar muestras de los datos cargados
print("üìã MUESTRAS DE DATOS CARGADOS:")

print("\nüìä Ventas (5 primeras filas):")
df_ventas_spark.show(5)

print("\nüõçÔ∏è Productos (5 primeras filas):")
df_productos_spark.show(5)

print("\nüë• Clientes (5 primeras filas):")
df_clientes_spark.show(5)

üìã MUESTRAS DE DATOS CARGADOS:

üìä Ventas (5 primeras filas):
+--------+----------+-----------+--------+---------------+-----------+-------------+
|venta_id|cliente_id|producto_id|cantidad|precio_unitario|fecha_venta|  canal_venta|
+--------+----------+-----------+--------+---------------+-----------+-------------+
|   V0001|      C149|       P029|       4|         244.86| 2024-02-19|       online|
|   V0002|      C174|       P013|       4|         203.86| 2024-01-19|        movil|
|   V0003|      C178|       P017|       2|         244.17| 2024-03-05|        movil|
|   V0004|      C165|       P047|       5|          70.75| 2024-03-21|tienda_fisica|
|   V0005|      C114|       P045|       2|         165.38| 2024-03-15|tienda_fisica|
+--------+----------+-----------+--------+---------------+-----------+-------------+
only showing top 5 rows


üõçÔ∏è Productos (5 primeras filas):
+---------+-------------+------+----------+---------------+-----------+
|categoria|       estado| marca| 

In [54]:
print("="*60)
print("‚öôÔ∏è FASE TRANSFORM: Limpieza y transformaciones")
print("="*60)

# Usar funciones de PySpark con alias expl√≠cito
from pyspark.sql.functions import col, isnan, isnull

print("üßπ Limpiando datos nulos...")

# Limpiar ventas
df_ventas_limpio = df_ventas_spark.filter(
    col("venta_id").isNotNull() &
    col("cliente_id").isNotNull() &
    col("producto_id").isNotNull() &
    col("cantidad").isNotNull() &
    col("precio_unitario").isNotNull()
)

# Limpiar productos
df_productos_limpio = df_productos_spark.filter(
    col("producto_id").isNotNull() &
    col("nombre").isNotNull()
)

# Limpiar clientes
df_clientes_limpio = df_clientes_spark.filter(
    col("cliente_id").isNotNull() &
    col("email").isNotNull()
)

print(f"‚úÖ Ventas despu√©s de limpieza: {df_ventas_limpio.count()}")
print(f"‚úÖ Productos despu√©s de limpieza: {df_productos_limpio.count()}")
print(f"‚úÖ Clientes despu√©s de limpieza: {df_clientes_limpio.count()}")

‚öôÔ∏è FASE TRANSFORM: Limpieza y transformaciones
üßπ Limpiando datos nulos...
‚úÖ Ventas despu√©s de limpieza: 1000
‚úÖ Productos despu√©s de limpieza: 50
‚úÖ Clientes despu√©s de limpieza: 200


In [55]:
# Importar funciones necesarias para transformaciones
from pyspark.sql.functions import to_date, year, month, quarter, current_date, datediff

print("üîß Aplicando transformaciones...")

# Transformar ventas - agregar campos calculados
df_ventas_transformado = df_ventas_limpio \
    .withColumn("fecha_venta", to_date(col("fecha_venta"), "yyyy-MM-dd")) \
    .withColumn("total_venta", col("cantidad") * col("precio_unitario")) \
    .withColumn("anio", year(col("fecha_venta"))) \
    .withColumn("mes", month(col("fecha_venta"))) \
    .withColumn("trimestre", quarter(col("fecha_venta")))

# Transformar clientes - calcular antig√ºedad
df_clientes_transformado = df_clientes_limpio \
    .withColumn("fecha_registro", to_date(col("fecha_registro"), "yyyy-MM-dd")) \
    .withColumn("dias_como_cliente",
                datediff(current_date(), col("fecha_registro")))

print("‚úÖ Transformaciones aplicadas correctamente")

# Mostrar resultado
print("\nVentas transformadas (muestra):")
df_ventas_transformado.select(
    "venta_id", "total_venta", "anio", "mes", "fecha_venta"
).show(5)

print("\nClientes transformados (muestra):")
df_clientes_transformado.select(
    "cliente_id", "nombre", "dias_como_cliente"
).show(5)

üîß Aplicando transformaciones...
‚úÖ Transformaciones aplicadas correctamente

Ventas transformadas (muestra):
+--------+-----------+----+---+-----------+
|venta_id|total_venta|anio|mes|fecha_venta|
+--------+-----------+----+---+-----------+
|   V0001|     979.44|2024|  2| 2024-02-19|
|   V0002|     815.44|2024|  1| 2024-01-19|
|   V0003|     488.34|2024|  3| 2024-03-05|
|   V0004|     353.75|2024|  3| 2024-03-21|
|   V0005|     330.76|2024|  3| 2024-03-15|
+--------+-----------+----+---+-----------+
only showing top 5 rows


Clientes transformados (muestra):
+----------+---------+-----------------+
|cliente_id|   nombre|dias_como_cliente|
+----------+---------+-----------------+
|      C001|Cliente 1|              983|
|      C002|Cliente 2|              782|
|      C003|Cliente 3|              891|
|      C004|Cliente 4|              882|
|      C005|Cliente 5|              747|
+----------+---------+-----------------+
only showing top 5 rows



In [56]:
print("üîó Realizando joins entre tablas...")

# PASO 1: Renombrar columnas para evitar ambig√ºedad
print("üè∑Ô∏è Renombrando columnas para evitar conflictos...")

# Renombrar columnas en productos
df_productos_renombrado = df_productos_limpio \
    .withColumnRenamed("nombre", "nombre_producto")

# Renombrar columnas en clientes
df_clientes_renombrado = df_clientes_transformado \
    .withColumnRenamed("nombre", "nombre_cliente")

print("‚úÖ Columnas renombradas:")
print("  - productos.nombre ‚Üí nombre_producto")
print("  - clientes.nombre ‚Üí nombre_cliente")

# PASO 2: Realizar joins
df_datos_completos = df_ventas_transformado \
    .join(df_productos_renombrado, "producto_id", "inner") \
    .join(df_clientes_renombrado, "cliente_id", "inner")

total_registros = df_datos_completos.count()
print(f"‚úÖ Datos consolidados: {total_registros} registros")

# Mostrar estructura final
print("\nEsquema de datos consolidados:")
df_datos_completos.printSchema()

# Mostrar muestra (ahora sin ambig√ºedad)
print("\nDatos consolidados (muestra):")
df_datos_completos.select(
    "venta_id", "cliente_id", "producto_id",
    "nombre_producto", "nombre_cliente", "categoria",
    "total_venta", "fecha_venta"
).show(5)

üîó Realizando joins entre tablas...
üè∑Ô∏è Renombrando columnas para evitar conflictos...
‚úÖ Columnas renombradas:
  - productos.nombre ‚Üí nombre_producto
  - clientes.nombre ‚Üí nombre_cliente
‚úÖ Datos consolidados: 1000 registros

Esquema de datos consolidados:
root
 |-- cliente_id: string (nullable = true)
 |-- producto_id: string (nullable = true)
 |-- venta_id: string (nullable = true)
 |-- cantidad: integer (nullable = true)
 |-- precio_unitario: double (nullable = true)
 |-- fecha_venta: date (nullable = true)
 |-- canal_venta: string (nullable = true)
 |-- total_venta: double (nullable = true)
 |-- anio: integer (nullable = true)
 |-- mes: integer (nullable = true)
 |-- trimestre: integer (nullable = true)
 |-- categoria: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- marca: string (nullable = true)
 |-- nombre_producto: string (nullable = true)
 |-- precio_catalogo: double (nullable = true)
 |-- nombre_cliente: string (nullable = true)
 |-- email: st

In [57]:
# Importar funciones de agregaci√≥n
from pyspark.sql.functions import sum, avg, count, max as spark_max

print("üìà Calculando m√©tricas por cliente...")

# M√©tricas por cliente (usando nombres corregidos)
df_metricas_cliente = df_datos_completos \
    .groupBy("cliente_id", "nombre_cliente", "ciudad", "tipo_cliente") \
    .agg(
        count("venta_id").alias("total_compras"),
        sum("total_venta").alias("valor_total_compras"),
        avg("total_venta").alias("ticket_promedio"),
        spark_max("fecha_venta").alias("ultima_compra")
    )

total_clientes = df_metricas_cliente.count()
print(f"‚úÖ M√©tricas calculadas para {total_clientes} clientes")

print("\nTop 10 clientes por valor de compras:")
df_metricas_cliente \
    .orderBy(col("valor_total_compras").desc()) \
    .show(10)

üìà Calculando m√©tricas por cliente...
‚úÖ M√©tricas calculadas para 196 clientes

Top 10 clientes por valor de compras:
+----------+--------------+-----------+------------+-------------+-------------------+------------------+-------------+
|cliente_id|nombre_cliente|     ciudad|tipo_cliente|total_compras|valor_total_compras|   ticket_promedio|ultima_compra|
+----------+--------------+-----------+------------+-------------+-------------------+------------------+-------------+
|      C055|    Cliente 55|     Temuco|         vip|           18|            8573.15| 476.2861111111111|   2024-03-22|
|      C117|   Cliente 117|Antofagasta|     regular|            8|  6204.580000000001| 775.5725000000001|   2024-03-15|
|      C141|   Cliente 141| Valpara√≠so|         vip|            6|            6101.51|1016.9183333333334|   2024-02-27|
|      C067|    Cliente 67|     Temuco|         vip|            8|            5416.29|         677.03625|   2024-03-15|
|      C028|    Cliente 28| Valpara√

In [58]:
print("üì¶ Calculando m√©tricas por producto...")

# M√©tricas por producto (usando nombres corregidos)
df_metricas_producto = df_datos_completos \
    .groupBy("producto_id", "nombre_producto", "categoria", "marca") \
    .agg(
        count("venta_id").alias("total_ventas"),
        sum("cantidad").alias("unidades_vendidas"),
        sum("total_venta").alias("ingresos_totales"),
        avg("precio_unitario").alias("precio_promedio")
    )

total_productos = df_metricas_producto.count()
print(f"‚úÖ M√©tricas calculadas para {total_productos} productos")

print("\nTop 10 productos por ingresos:")
df_metricas_producto \
    .orderBy(col("ingresos_totales").desc()) \
    .show(10)

üì¶ Calculando m√©tricas por producto...
‚úÖ M√©tricas calculadas para 50 productos

Top 10 productos por ingresos:
+-----------+---------------+------------+------+------------+-----------------+------------------+------------------+
|producto_id|nombre_producto|   categoria| marca|total_ventas|unidades_vendidas|  ingresos_totales|   precio_promedio|
+-----------+---------------+------------+------+------------+-----------------+------------------+------------------+
|       P046|    Producto 46|    Deportes|BrandE|          26|               86|14348.080000000002|165.68499999999997|
|       P010|    Producto 10|        Ropa|BrandE|          29|               89|14203.330000000002|158.89758620689653|
|       P033|    Producto 33|       Hogar|BrandA|          21|               71|          13928.27| 187.4180952380952|
|       P025|    Producto 25|    Deportes|BrandE|          26|               85|13834.500000000002|170.47153846153844|
|       P015|    Producto 15|        Ropa|BrandD| 

In [59]:
# Importar countDistinct para clientes √∫nicos
from pyspark.sql.functions import countDistinct

print("üìÖ Calculando m√©tricas mensuales...")

# M√©tricas por mes
df_metricas_mes = df_datos_completos \
    .groupBy("anio", "mes") \
    .agg(
        count("venta_id").alias("num_ventas"),
        sum("total_venta").alias("ingresos_mes"),
        avg("total_venta").alias("ticket_promedio_mes"),
        countDistinct("cliente_id").alias("clientes_unicos")
    ) \
    .orderBy("anio", "mes")

print("‚úÖ M√©tricas mensuales calculadas")
print("\nTendencia mensual:")
df_metricas_mes.show()

üìÖ Calculando m√©tricas mensuales...
‚úÖ M√©tricas mensuales calculadas

Tendencia mensual:
+----+---+----------+------------------+-------------------+---------------+
|anio|mes|num_ventas|      ingresos_mes|ticket_promedio_mes|clientes_unicos|
+----+---+----------+------------------+-------------------+---------------+
|2024|  1|       352|152628.12000000002|  433.6026136363637|            159|
|2024|  2|       298|         142334.38| 477.63214765100673|            149|
|2024|  3|       350|163699.71000000005|  467.7134571428573|            157|
+----+---+----------+------------------+-------------------+---------------+



In [60]:
print("="*60)
print("üíæ FASE LOAD: Guardando datos procesados")
print("="*60)

# Crear directorio de salida
directorio_salida = "data_warehouse"
if not os.path.exists(directorio_salida):
    os.makedirs(directorio_salida)
    print(f"‚úÖ Directorio '{directorio_salida}' creado")

# Guardar tabla de hechos (particionada)
print("üíæ Guardando tabla de hechos de ventas...")
df_datos_completos.write \
    .mode("overwrite") \
    .partitionBy("anio", "mes") \
    .parquet(f"{directorio_salida}/fact_ventas")

# Guardar dimensiones
print("üíæ Guardando dimensi√≥n de clientes...")
df_metricas_cliente.write \
    .mode("overwrite") \
    .parquet(f"{directorio_salida}/dim_clientes")

print("üíæ Guardando dimensi√≥n de productos...")
df_metricas_producto.write \
    .mode("overwrite") \
    .parquet(f"{directorio_salida}/dim_productos")

print("üíæ Guardando m√©tricas mensuales...")
df_metricas_mes.write \
    .mode("overwrite") \
    .parquet(f"{directorio_salida}/fact_metricas_mes")

print("‚úÖ Todos los datos guardados exitosamente")

üíæ FASE LOAD: Guardando datos procesados
üíæ Guardando tabla de hechos de ventas...
üíæ Guardando dimensi√≥n de clientes...
üíæ Guardando dimensi√≥n de productos...
üíæ Guardando m√©tricas mensuales...
‚úÖ Todos los datos guardados exitosamente


In [61]:
print("üîç Creando vistas temporales para an√°lisis SQL...")

# Crear vistas temporales
df_datos_completos.createOrReplaceTempView("ventas_completas")
df_metricas_cliente.createOrReplaceTempView("metricas_cliente")
df_metricas_producto.createOrReplaceTempView("metricas_producto")
df_metricas_mes.createOrReplaceTempView("metricas_mes")

print("‚úÖ Vistas temporales creadas:")
print("  - ventas_completas")
print("  - metricas_cliente")
print("  - metricas_producto")
print("  - metricas_mes")

# Verificar archivos guardados
print("\nüìÅ Archivos en el data warehouse:")
for archivo in ["fact_ventas", "dim_clientes", "dim_productos", "fact_metricas_mes"]:
    try:
        df_temp = spark.read.parquet(f"{directorio_salida}/{archivo}")
        print(f"  - {archivo}: {df_temp.count()} registros")
    except Exception as e:
        print(f"  - {archivo}: Error al leer")

üîç Creando vistas temporales para an√°lisis SQL...
‚úÖ Vistas temporales creadas:
  - ventas_completas
  - metricas_cliente
  - metricas_producto
  - metricas_mes

üìÅ Archivos en el data warehouse:
  - fact_ventas: 1000 registros
  - dim_clientes: 196 registros
  - dim_productos: 50 registros
  - fact_metricas_mes: 3 registros


In [62]:
print("="*60)
print("üìä AN√ÅLISIS DE DATOS CON SQL")
print("="*60)

print("üèÜ TOP 10 CLIENTES POR VALOR DE COMPRAS:")
resultado_clientes = spark.sql("""
    SELECT
        nombre_cliente as nombre,
        ciudad,
        tipo_cliente,
        ROUND(valor_total_compras, 2) as valor_total,
        total_compras,
        ROUND(ticket_promedio, 2) as ticket_promedio
    FROM metricas_cliente
    ORDER BY valor_total_compras DESC
    LIMIT 10
""")

resultado_clientes.show()

üìä AN√ÅLISIS DE DATOS CON SQL
üèÜ TOP 10 CLIENTES POR VALOR DE COMPRAS:
+-----------+-----------+------------+-----------+-------------+---------------+
|     nombre|     ciudad|tipo_cliente|valor_total|total_compras|ticket_promedio|
+-----------+-----------+------------+-----------+-------------+---------------+
| Cliente 55|     Temuco|         vip|    8573.15|           18|         476.29|
|Cliente 117|Antofagasta|     regular|    6204.58|            8|         775.57|
|Cliente 141| Valpara√≠so|         vip|    6101.51|            6|        1016.92|
| Cliente 67|     Temuco|         vip|    5416.29|            8|         677.04|
| Cliente 28| Valpara√≠so|     regular|     5362.4|           12|         446.87|
|Cliente 130|Antofagasta|     regular|    5311.75|            7|         758.82|
|  Cliente 9| Concepci√≥n|     premium|    5289.39|            6|         881.56|
| Cliente 16|     Temuco|         vip|    4870.48|            9|         541.16|
| Cliente 44|Antofagasta|     p

In [63]:
print("üõçÔ∏è TOP PRODUCTOS POR CATEGOR√çA:")
resultado_productos = spark.sql("""
    SELECT
        categoria,
        nombre_producto as nombre,
        marca,
        unidades_vendidas,
        ROUND(ingresos_totales, 2) as ingresos_totales
    FROM metricas_producto
    ORDER BY categoria, ingresos_totales DESC
""")

resultado_productos.show(15)

print("\nüõí AN√ÅLISIS POR CANAL DE VENTA:")
resultado_canales = spark.sql("""
    SELECT
        canal_venta,
        COUNT(*) as num_ventas,
        ROUND(SUM(total_venta), 2) as ingresos_totales,
        ROUND(AVG(total_venta), 2) as ticket_promedio
    FROM ventas_completas
    GROUP BY canal_venta
    ORDER BY ingresos_totales DESC
""")

resultado_canales.show()

üõçÔ∏è TOP PRODUCTOS POR CATEGOR√çA:
+------------+-----------+------+-----------------+----------------+
|   categoria|     nombre| marca|unidades_vendidas|ingresos_totales|
+------------+-----------+------+-----------------+----------------+
|    Deportes|Producto 46|BrandE|               86|        14348.08|
|    Deportes|Producto 25|BrandE|               85|         13834.5|
|    Deportes|Producto 47|BrandE|               58|        10937.13|
|    Deportes|Producto 20|BrandC|               63|        10307.45|
|    Deportes|Producto 42|BrandE|               61|         9814.97|
|    Deportes|Producto 24|BrandD|               67|         9412.09|
|    Deportes| Producto 8|BrandA|               60|         9113.54|
|    Deportes|Producto 34|BrandA|               62|         9081.62|
|    Deportes|Producto 38|BrandB|               51|         7740.18|
|    Deportes| Producto 3|BrandC|               56|          6224.5|
|    Deportes|Producto 45|BrandD|               47|         6079.

In [64]:
print("="*60)
print("üìã RESUMEN DEL PIPELINE ETL")
print("="*60)

# Calcular estad√≠sticas finales
total_ventas = df_datos_completos.count()
total_ingresos = df_datos_completos.agg(sum("total_venta")).collect()[0][0]
fecha_min = df_datos_completos.agg(spark_functions.min("fecha_venta")).collect()[0][0]
fecha_max = df_datos_completos.agg(spark_functions.max("fecha_venta")).collect()[0][0]
clientes_unicos = df_datos_completos.select("cliente_id").distinct().count()
productos_unicos = df_datos_completos.select("producto_id").distinct().count()

print(f"""
‚úÖ PIPELINE ETL COMPLETADO EXITOSAMENTE

üìä ESTAD√çSTICAS GENERALES:
   ‚Ä¢ Total de registros procesados: {total_ventas:,}
   ‚Ä¢ Ingresos totales: ${total_ingresos:,.2f}
   ‚Ä¢ Per√≠odo de datos: {fecha_min} a {fecha_max}
   ‚Ä¢ Clientes √∫nicos: {clientes_unicos:,}
   ‚Ä¢ Productos √∫nicos: {productos_unicos:,}

üóÇÔ∏è TABLAS CREADAS:
   ‚Ä¢ fact_ventas (tabla de hechos principal)
   ‚Ä¢ dim_clientes (m√©tricas por cliente)
   ‚Ä¢ dim_productos (m√©tricas por producto)
   ‚Ä¢ fact_metricas_mes (agregaciones mensuales)

‚ö° OPTIMIZACIONES:
   ‚Ä¢ Particionamiento por a√±o y mes
   ‚Ä¢ Formato Parquet para consultas eficientes
   ‚Ä¢ Vistas SQL para an√°lisis ad-hoc

üéØ PROCESO ETL:
   ‚úì EXTRACT: Datos de m√∫ltiples fuentes (CSV, JSON)
   ‚úì TRANSFORM: Limpieza, joins, agregaciones
   ‚úì LOAD: Data warehouse optimizado
""")

print("üéâ ¬°Pipeline ETL ejecutado exitosamente!")

üìã RESUMEN DEL PIPELINE ETL

‚úÖ PIPELINE ETL COMPLETADO EXITOSAMENTE

üìä ESTAD√çSTICAS GENERALES:
   ‚Ä¢ Total de registros procesados: 1,000
   ‚Ä¢ Ingresos totales: $458,662.21
   ‚Ä¢ Per√≠odo de datos: 2024-01-01 a 2024-03-31
   ‚Ä¢ Clientes √∫nicos: 196
   ‚Ä¢ Productos √∫nicos: 50

üóÇÔ∏è TABLAS CREADAS:
   ‚Ä¢ fact_ventas (tabla de hechos principal)
   ‚Ä¢ dim_clientes (m√©tricas por cliente)
   ‚Ä¢ dim_productos (m√©tricas por producto)
   ‚Ä¢ fact_metricas_mes (agregaciones mensuales)

‚ö° OPTIMIZACIONES:
   ‚Ä¢ Particionamiento por a√±o y mes
   ‚Ä¢ Formato Parquet para consultas eficientes
   ‚Ä¢ Vistas SQL para an√°lisis ad-hoc

üéØ PROCESO ETL:
   ‚úì EXTRACT: Datos de m√∫ltiples fuentes (CSV, JSON)
   ‚úì TRANSFORM: Limpieza, joins, agregaciones
   ‚úì LOAD: Data warehouse optimizado

üéâ ¬°Pipeline ETL ejecutado exitosamente!
