In [None]:
#Fase 2

In [1]:
# Import libraries and start the Spark session


import sqlite3
import time

import pyspark
from pyspark.sql import Row, SparkSession
# NOTE: `sum` and `round` intentionally shadow the Python builtins with Spark's versions.
from pyspark.sql.functions import (
    avg,
    col,
    count,
    desc,
    isnan,
    lit,
    max as spark_max,
    monotonically_increasing_id,
    regexp_extract,
    regexp_replace,
    round,
    row_number,
    sum,
    to_date,
    trim,
    when,
)
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructField, StructType
from pyspark.sql.window import Window



# Start (or reuse, via getOrCreate) the Spark session used by the whole notebook.
spark = (
    SparkSession.builder
    .appName("ecommerce_warehouse")
    # Arrow speeds up the Spark -> pandas conversions done later with toPandas().
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    # Preferred UI port only: if it is busy Spark retries on the next ports (4041, ...).
    .config("spark.ui.port", "4040")
    .getOrCreate()
)

print("✓ SparkSession creada")
# Report the address the UI actually bound to instead of assuming the configured port.
print(f"\n Spark UI disponible en: {spark.sparkContext.uiWebUrl}")
print(f"   Application ID: {spark.sparkContext.applicationId}")

✓ SparkSession creada

 Spark UI disponible en: http://localhost:4040
   Application ID: local-1765674723708


In [2]:
# Load the raw orders file; the header row supplies column names and Spark
# infers each column's type by scanning the data.
lector_csv = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
)
df = lector_csv.csv("ecommerce_orders.csv")

total_registros = df.count()
print(f"✓ Dataset cargado: {total_registros:,} registros")
df.printSchema()


✓ Dataset cargado: 10,000 registros
root
 |-- order_id: double (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- quantity: double (nullable = true)
 |-- unit_price: string (nullable = true)
 |-- total_price: double (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- status: string (nullable = true)
 |-- update_time: timestamp (nullable = true)



In [None]:
#FASE 4

In [4]:
# Convertir a int
df = df.withColumn('order_id', col('order_id').cast('double'))

# Obtener el máximo ID actual
max_id = df.agg(spark_max('order_id')).collect()[0][0]

# Crear una columna de números 
window_spec = Window.orderBy(lit(1))
df = df.withColumn(
    'row_num',
    row_number().over(window_spec)
)

# Reemplazar nulos con IDs
df = df.withColumn(
    'order_id',
    when(isnan(col('order_id')) | col('order_id').isNull(), 
         col('row_num') + max_id).otherwise(col('order_id'))
)

# Convertir a entero 
df = df.withColumn('order_id', col('order_id').cast('int')).drop('row_num')

In [None]:
# Normalise order_date into a proper date column, using Spark (df is a Spark DataFrame):
#  1. unify the separator ('/' -> '-');
#  2. parse year-first (yyyy-M-d) or month-first (M-d-yyyy, i.e. dayfirst=False) layouts;
#  3. anything matching neither layout becomes null (the equivalent of errors='coerce').
# NOTE(review): the accepted layouts are an assumption about the raw data — check the
# distinct values of order_date. Any trailing time part is ignored.

# With Spark 3's default EXCEPTION policy, a string that the new parser rejects but the
# legacy parser would accept raises SparkUpgradeException; CORRECTED yields null instead.
# NOTE(review): with ANSI mode enabled, an invalid date such as month 13 still raises.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "CORRECTED")

fecha = regexp_replace(trim(col('order_date').cast('string')), '/', '-')
fecha_ymd = regexp_extract(fecha, r'^(\d{4}-\d{1,2}-\d{1,2})', 1)  # '' when no match
fecha_mdy = regexp_extract(fecha, r'^(\d{1,2}-\d{1,2}-\d{4})', 1)  # '' when no match

df = df.withColumn(
    'order_date',
    when(fecha_ymd != '', to_date(fecha_ymd, 'yyyy-M-d'))
    .when(fecha_mdy != '', to_date(fecha_mdy, 'M-d-yyyy'))
)


In [None]:
# Add a pre-tax price column: total_price with VAT (IVA) removed, rounded to 2 decimals.
# Assumes total_price already includes VAT at this rate (the original divided by 1.21).
IVA = 0.21

df = df.withColumn(
    "precio_sin_iva",
    round(col("total_price") / (1 + IVA), 2)  # Spark's round (imported), not the builtin
)

# Rich HTML preview of the first rows (df.head(5) only shows a list of Row reprs).
df.limit(5).toPandas()

In [None]:
# The ten orders with the highest total_price (descending order puts nulls last).
orden_por_importe = col("total_price").desc()
df_top10 = df.sort(orden_por_importe).limit(10)

print("Top 10 órdenes más caras:\n")
df_top10.show()

In [None]:
# Append one new sale to the dataset.

# Next free order id, derived from the data instead of hard-coded: a fixed value such as
# 10001 can coincide with an id that already exists or was assigned when back-filling
# missing ids. NaN is excluded because Spark ranks it above every other number.
ultimo_id = df.agg(spark_max(when(~isnan(col("order_id")), col("order_id")))).collect()[0][0]
nuevo_order_id = 1 if ultimo_id is None else int(ultimo_id) + 1

nueva_venta = spark.createDataFrame([
    Row(
        order_id=nuevo_order_id,
        customer_id=2000,
        customer_name="Roberto Martín",
        product_id=201,
        product_name="iPad Pro",
        category="Tecnología",
        order_date="2025-12-12",
        quantity=1,
        unit_price=1299.99,
        total_price=1299.99,
        payment_method="Tarjeta",
        country="Spain",
        city="Madrid",
        status="Pendiente",
        update_time="2025-12-12 01:26:00"
    )
])

# Give each column of the new row the type it has in df. Otherwise unionByName resolves
# type mismatches by widening (e.g. update_time timestamp vs. string becomes string),
# silently changing the column types of the whole combined dataset.
tipos_df = {campo.name: campo.dataType for campo in df.schema.fields}
nueva_venta = nueva_venta.select([
    col(nombre).cast(tipos_df[nombre]).alias(nombre) if nombre in tipos_df else col(nombre)
    for nombre in nueva_venta.columns
])

# Join with the original dataset; columns df has but the new row lacks are filled with null.
df_updated = df.unionByName(nueva_venta, allowMissingColumns=True)

print("✓ Nueva venta agregada")
print(f"Total de registros ahora: {df_updated.count()}\n")

df_updated.filter(col("order_id") == nuevo_order_id).show(truncate=False)

In [None]:
#SQL

In [None]:
# Split the final dataset (df_updated: the processed orders plus the appended sale) into
# two dimension tables and one fact table. df_spark was never defined in this notebook.

# Customers dimension: one row per distinct (customer_id, customer_name) pair.
# NOTE(review): if one customer_id appears with different spellings it will occur more than
# once here — verify uniqueness before treating customer_id as a primary key.
dim_clientes_spark = df_updated.select("customer_id", "customer_name").dropDuplicates()

# Products dimension: one row per distinct (product_id, product_name, category) triple.
dim_productos_spark = df_updated.select("product_id", "product_name", "category").dropDuplicates()

# Orders fact table: the order measures plus the keys that link to both dimensions.
fact_pedidos_spark = df_updated.select(
    "order_id", "customer_id", "product_id", "order_date", "quantity", "unit_price",
    "total_price", "payment_method", "country", "city", "status", "update_time",
)



In [None]:
# Open the SQLite file that will hold the warehouse tables (sqlite3 creates it if missing).
ruta_db = "warehouse_pyspark.db"
conn = sqlite3.connect(ruta_db)


In [None]:
# Create the SQL tables: collect each Spark table to the driver as a pandas DataFrame...
dim_clientes, dim_productos, fact_pedidos = (
    tabla_spark.toPandas()
    for tabla_spark in (dim_clientes_spark, dim_productos_spark, fact_pedidos_spark)
)

# ...and write it to SQLite, replacing any table of the same name from a previous run.
for nombre_tabla, tabla_pd in (
    ("dim_clientes", dim_clientes),
    ("dim_productos", dim_productos),
    ("fact_pedidos", fact_pedidos),
):
    tabla_pd.to_sql(nombre_tabla, conn, if_exists="replace", index=False)



In [None]:
# Smoke test: list every table that now exists in the SQLite database.
print("Tablas creadas:")
filas_tablas = conn.execute("SELECT name FROM sqlite_master WHERE type='table';").fetchall()
for (nombre_tabla,) in filas_tablas:
    print(nombre_tabla)


In [None]:
# Tear down: stop the Spark session and close the SQLite connection.
spark.stop()
conn.close()
