# 02 - Processing and ETL with PySpark

This notebook reproduces the analysis and ETL flow with PySpark and loads the results into `warehouse/warehouse_pyspark.db`. The paths used below are relative (`../data/`, `../warehouse/`), so run the notebook with the `notebooks/` directory as the working directory.

## 1. Initial Setup

PySpark is imported and a SparkSession is created with `SparkSession.builder` so the data can be processed with Spark's distributed DataFrame engine. Relative paths are defined for the CSV dataset and for the SQLite database that will hold the data warehouse.

In [None]:
import os

import pandas as pd
import sqlalchemy
from pyspark.sql import SparkSession, functions as F, Window
from pyspark.sql.types import DoubleType

# Relative paths from notebooks/ to data/ and warehouse/
DATA_PATH = "../data/videogames.csv"
DB_PATH = "../warehouse/warehouse_pyspark.db"
DB_URL = f"sqlite:///{DB_PATH}"

spark = SparkSession.builder.appName("videogames_pyspark_etl").getOrCreate()

print(f"CSV: {DATA_PATH}")
print(f"SQLite DB (PySpark): {DB_PATH}")


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/17 02:11:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


CSV: ../data/videogames.csv
SQLite DB (PySpark): ../warehouse/warehouse_pyspark.db


## 2. Loading the Dataset

The video game dataset is loaded into a Spark DataFrame with `spark.read.csv()`, using the `header` option to take the first row as column names and the `inferSchema` option so that Spark tries to detect each column's type. The schema (`printSchema()`), the dimensions, and the first rows (`show()`) are printed to verify the load. In the recorded run every column, including `year` and `metascore`, is inferred as `string`, which suggests that those columns contain non-numeric tokens.

In [2]:
# Load the dataset with Spark

raw_df = (
    spark.read
    .option("header", True)  # The CSV file has a header row
    .option("inferSchema", True)  # Ask Spark to infer each column's data type
    .csv(DATA_PATH)
)

print("===== Dataset cargado =====")
raw_df.printSchema()
print(f"Dimensiones del dataset: {raw_df.count()} filas x {len(raw_df.columns)} columnas")
print("\nPrimeras 5 filas:")
raw_df.show(5)


===== Dataset cargado =====
root
 |-- name: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- cost: string (nullable = true)
 |-- platform: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- pegi: string (nullable = true)
 |-- year: string (nullable = true)
 |-- developer: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- region: string (nullable = true)
 |-- mode: string (nullable = true)
 |-- engine: string (nullable = true)
 |-- award: string (nullable = true)
 |-- dlc_support: string (nullable = true)
 |-- language: string (nullable = true)
 |-- metascore: string (nullable = true)
 |-- user_score: string (nullable = true)
 |-- reviews: string (nullable = true)
 |-- rating_source: string (nullable = true)
 |-- copies_sold_millions: string (nullable = true)
 |-- revenue_millions_usd: string (nullable = true)

Dimensiones del dataset: 10000 filas x 21 columnas

Primeras 5 filas:
+-------------------+-------+-----+--------+----
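
One way to see why `inferSchema` may leave a numeric-looking column as `string` is to count the values that do not parse as numbers. The following is a minimal sketch using only the standard library; it works on an inline sample rather than the real `videogames.csv`, and both `non_numeric_tokens` and the sample rows are hypothetical.

```python
import csv
import io
from collections import Counter


def non_numeric_tokens(rows, column):
    """Count the values in `column` that float() cannot parse."""
    counts = Counter()
    for row in rows:
        value = (row.get(column) or "").strip()
        try:
            float(value)
        except ValueError:
            counts[value] += 1
    return counts


# Hypothetical sample; the real file would be read with open(DATA_PATH, newline="")
sample = io.StringIO(
    "name,year,copies_sold_millions\n"
    "Game A,2015,34.55\n"
    "Game B,Unknown,1.5M\n"
    "Game C,?,12.79\n"
)
rows = list(csv.DictReader(sample))

year_issues = non_numeric_tokens(rows, "year")
sales_issues = non_numeric_tokens(rows, "copies_sold_millions")
print(year_issues)
print(sales_issues)
```

A column with even a few such tokens cannot be read as a numeric type without cleaning, which is what the next section does for the two metric columns.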

## 3. Data Cleaning and Normalization in Spark

Duplicate rows are removed with `dropDuplicates()`. The numeric columns `copies_sold_millions` and `revenue_millions_usd` are cleaned and converted to `double`: commas and `$` are stripped, values with an `M` or `B` suffix are multiplied by 1e6 or 1e9, `UNKNOWN` and `?` become null, and nulls are filled with the column mean. Finally, all remaining columns are treated as categorical: they are trimmed, and missing or inconsistent values (empty strings, `?`, `N/A`, `UNKNOWN`) are replaced with `"Unknown"`.

In [None]:
print("===== 1. Eliminación de duplicados =====")
initial_count = raw_df.count()
clean_df = raw_df.dropDuplicates()
final_count = clean_df.count()
print(f"Filas antes: {initial_count}, después: {final_count}")

print("\n===== 2. Limpieza de columnas de ventas/ingresos =====")

numeric_target_cols = ["copies_sold_millions", "revenue_millions_usd"]

for col in numeric_target_cols:
    if col in clean_df.columns:

        # Normalize the text: trim, drop commas and '$', then uppercase
        clean_df = clean_df.withColumn(
            col,
            F.upper(
                F.regexp_replace(
                    F.regexp_replace(F.trim(F.col(col)), ",", ""),
                    "\\$", ""
                )
            )
        )

        # Convert to numeric; 'M' and 'B' suffixes are expanded to 1e6 and 1e9
        clean_df = clean_df.withColumn(
            col,
            F.when(
                (F.col(col).isNull()) | (F.col(col).isin("UNKNOWN", "?")),
                None
            )
            .when(
                F.col(col).endswith("M"),
                F.regexp_replace(F.col(col), "M", "").cast(DoubleType()) * 1e6
            )
            .when(
                F.col(col).endswith("B"),
                F.regexp_replace(F.col(col), "B", "").cast(DoubleType()) * 1e9
            )
            .otherwise(F.col(col).cast(DoubleType()))
        )

        # Fill nulls with the column mean
        mean_val = clean_df.select(F.mean(col)).first()[0]
        if mean_val is not None:
            clean_df = clean_df.na.fill({col: mean_val})
            print(f"Columna '{col}' rellenada con media: {mean_val}")
        else:
            clean_df = clean_df.na.fill({col: 0.0})
            print(f"Columna '{col}' sin valores válidos → rellenada con 0.0")

print("\n===== 3. Limpieza de columnas categóricas =====")

categorical_cols = [
    c for c in clean_df.columns
    if c not in numeric_target_cols
]

for col in categorical_cols:

    # Basic cleanup
    clean_df = clean_df.withColumn(col, F.trim(F.col(col)))

    # Normalize problematic values
    clean_df = clean_df.withColumn(
        col,
        F.when(
            (F.col(col).isNull()) |
            (F.col(col) == "") |
            (F.upper(F.col(col)).isin("?", "N/A", "UNKNOWN")),
            "Unknown"
        ).otherwise(F.col(col))
    )

print("\n===== Dataset después de limpieza =====")
clean_df.show(5)


===== 2. Limpieza de columnas de ventas/ingresos =====


                                                                                

Columna 'copies_sold_millions' limpiada y rellenada con media: 10308561.989154695
Columna 'revenue_millions_usd' limpiada y rellenada con media: 507393576.9315999

===== 3. Limpieza de columnas categóricas =====
Columnas categóricas limpiadas correctamente
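
The conversion rules above can be mirrored in plain Python to unit-test expected values without a Spark session. The sketch below is not the notebook's code: `parse_amount` and its `unit` parameter are hypothetical. With `unit=1.0` it follows the notebook's behaviour (`M` becomes 1e6, unsuffixed values pass through unchanged). With `unit=1e6` suffixed values are expressed in millions, which may be preferable if the unsuffixed values are already in millions, as the `_millions` column names suggest (an assumption about the data).

```python
from typing import Optional

SUFFIX_FACTORS = {"M": 1e6, "B": 1e9}
MISSING_TOKENS = {"", "UNKNOWN", "?"}


def parse_amount(raw: Optional[str], unit: float = 1.0) -> Optional[float]:
    """Parse strings such as '$1,500', '1.5M' or '2B' into a float.

    Suffixed values are scaled to absolute units and divided by `unit`;
    unsuffixed values are returned unchanged, as in the Spark code above.
    """
    if raw is None:
        return None
    text = raw.strip().replace(",", "").replace("$", "").upper()
    if text in MISSING_TOKENS:
        return None
    factor = SUFFIX_FACTORS.get(text[-1])
    try:
        if factor is not None:
            return float(text[:-1]) * factor / unit
        return float(text)
    except ValueError:
        return None  # unparseable text becomes missing, like a failed cast in Spark


print(parse_amount("1.5M"))            # notebook behaviour: absolute units
print(parse_amount("1.5M", unit=1e6))  # same value expressed in millions
print(parse_amount("34.55"))           # unsuffixed values are left as they are
```

With the notebook's rule, `1.5M` and `34.55` end up on very different scales in the same column, which is also visible in the `fact_sales` sample further down.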


## 4. Column Name Normalization

Column names are normalized to snake_case (lowercase, with underscores in place of spaces and hyphens) and applied with `toDF()`. This keeps the names consistent before the dimensions and the fact table are built.

In [5]:
# Normalize column names

normalized_cols = [
    c.strip().lower().replace(" ", "_").replace("-", "_") for c in clean_df.columns
]
clean_df = clean_df.toDF(*normalized_cols)

clean_df.printSchema()


root
 |-- name: string (nullable = false)
 |-- genre: string (nullable = false)
 |-- cost: string (nullable = false)
 |-- platform: string (nullable = false)
 |-- popularity: string (nullable = false)
 |-- pegi: string (nullable = false)
 |-- year: string (nullable = false)
 |-- developer: string (nullable = false)
 |-- publisher: string (nullable = false)
 |-- region: string (nullable = false)
 |-- mode: string (nullable = false)
 |-- engine: string (nullable = false)
 |-- award: string (nullable = false)
 |-- dlc_support: string (nullable = false)
 |-- language: string (nullable = false)
 |-- metascore: string (nullable = false)
 |-- user_score: string (nullable = false)
 |-- reviews: string (nullable = false)
 |-- rating_source: string (nullable = false)
 |-- copies_sold_millions: double (nullable = false)
 |-- revenue_millions_usd: double (nullable = false)
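
The same renaming rule can be wrapped in a small helper that also guards against two source columns collapsing to one name, a case the list comprehension above does not detect. This is a sketch with hypothetical names (`normalize_columns`) and sample headers that are not taken from the dataset.

```python
def normalize_columns(names):
    """Return snake_case names, raising if two columns collapse to the same name."""
    normalized = [n.strip().lower().replace(" ", "_").replace("-", "_") for n in names]
    seen = {}
    for original, new in zip(names, normalized):
        if new in seen:
            raise ValueError(
                f"{original!r} and {seen[new]!r} both normalize to {new!r}"
            )
        seen[new] = original
    return normalized


# Hypothetical headers, only to illustrate the rule
print(normalize_columns(["Name", " User Score", "dlc-support"]))
```

The returned list could be passed to `toDF(*names)` in the same way as `normalized_cols`.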



## 5. Creating the Dimensions

The dimensions are built with `distinct()` and `orderBy()` in Spark to obtain sorted unique values. Each one is collected temporarily into Pandas with `toPandas()` to assign sequential IDs starting at 1, and is then recreated as a Spark DataFrame with `spark.createDataFrame()` for the later joins. The dimensions are `dim_game`, `dim_platform`, `dim_developer`, `dim_publisher`, and `dim_year`.

In [6]:
cols = clean_df.columns
required_cols = ["name", "genre", "platform", "developer", "publisher", "year"]
for c in required_cols:
    if c not in cols:
        raise ValueError(f"Columna requerida no encontrada en el CSV: {c}")

print("Columnas disponibles después de normalización:")
print(cols)

print("\n===== Creación de dimensiones =====")

# Dimension: dim_game
dim_game_pd = (
    clean_df.select("name", "genre")
    .distinct()
    .orderBy("name", "genre")
    .toPandas()
)
dim_game_pd["id_game"] = range(1, len(dim_game_pd) + 1)
dim_game = spark.createDataFrame(dim_game_pd)
print(f"Dimensión 'dim_game' creada: {len(dim_game_pd)} filas")

# Dimension: dim_platform
dim_platform_pd = clean_df.select("platform").distinct().orderBy("platform").toPandas()
dim_platform_pd["id_platform"] = range(1, len(dim_platform_pd) + 1)
dim_platform = spark.createDataFrame(dim_platform_pd)
print(f"Dimensión 'dim_platform' creada: {len(dim_platform_pd)} filas")

# Dimension: dim_developer
dim_developer_pd = clean_df.select("developer").distinct().orderBy("developer").toPandas()
dim_developer_pd["id_developer"] = range(1, len(dim_developer_pd) + 1)
dim_developer = spark.createDataFrame(dim_developer_pd)
print(f"Dimensión 'dim_developer' creada: {len(dim_developer_pd)} filas")

# Dimension: dim_publisher
dim_publisher_pd = clean_df.select("publisher").distinct().orderBy("publisher").toPandas()
dim_publisher_pd["id_publisher"] = range(1, len(dim_publisher_pd) + 1)
dim_publisher = spark.createDataFrame(dim_publisher_pd)
print(f"Dimensión 'dim_publisher' creada: {len(dim_publisher_pd)} filas")

# Dimension: dim_year
dim_year_pd = clean_df.select("year").distinct().orderBy("year").toPandas()
dim_year_pd["id_year"] = range(1, len(dim_year_pd) + 1)
dim_year = spark.createDataFrame(dim_year_pd)
print(f"Dimensión 'dim_year' creada: {len(dim_year_pd)} filas")

Columnas disponibles después de normalización:
['name', 'genre', 'cost', 'platform', 'popularity', 'pegi', 'year', 'developer', 'publisher', 'region', 'mode', 'engine', 'award', 'dlc_support', 'language', 'metascore', 'user_score', 'reviews', 'rating_source', 'copies_sold_millions', 'revenue_millions_usd']

===== Creación de dimensiones =====
Dimensión 'dim_game' creada: 408 filas
Dimensión 'dim_platform' creada: 9 filas
Dimensión 'dim_developer' creada: 13 filas
Dimensión 'dim_publisher' creada: 12 filas
Dimensión 'dim_year' creada: 42 filas
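
The ID assignment amounts to numbering the sorted distinct values from 1. Below is a plain-Python sketch of that rule; `assign_surrogate_keys` and the sample values are hypothetical.

```python
def assign_surrogate_keys(values, start=1):
    """Map each distinct value to a sequential ID following sorted order."""
    return {
        value: idx
        for idx, value in enumerate(sorted(set(values)), start=start)
    }


# Hypothetical platform values with one duplicate
platforms = ["Switch", "PC", "PS5", "PC", "Unknown"]
platform_ids = assign_surrogate_keys(platforms)
print(platform_ids)
```

Because the IDs depend on sort position, a new value in the source data can shift the IDs of existing values on the next run. That is acceptable here since every table is rebuilt and overwritten together, but it would matter if other tables kept references to old IDs.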


## 6. Building the Fact Table

The fact table `fact_sales` is built by left-joining (`how='left'`) the cleaned dataset with each dimension via `join()`. The final selection keeps the dimension IDs and the metrics (`copies_sold_millions` and `revenue_millions_usd`), which completes the star schema.

In [7]:
fact = (
    clean_df
    .join(dim_game, on=["name", "genre"], how="left")
    .join(dim_platform, on=["platform"], how="left")
    .join(dim_developer, on=["developer"], how="left")
    .join(dim_publisher, on=["publisher"], how="left")
    .join(dim_year, on=["year"], how="left")
)

# Metric columns
value_cols = [c for c in ["copies_sold_millions", "revenue_millions_usd"] if c in fact.columns]
if not value_cols:
    raise ValueError("No se encontraron columnas métricas para construir la tabla de hechos.")

# Final selection for the fact table
fact_sales = fact.select(
    "id_game", "id_platform", "id_developer", "id_publisher", "id_year", *value_cols
)

print(f"\nTabla de hechos 'fact_sales' creada: {fact_sales.count()} filas")
fact_sales.show(5)


                                                                                


Tabla de hechos 'fact_sales' creada: 10000 filas
+-------+-----------+------------+------------+-------+--------------------+--------------------+
|id_game|id_platform|id_developer|id_publisher|id_year|copies_sold_millions|revenue_millions_usd|
+-------+-----------+------------+------------+-------+--------------------+--------------------+
|    200|          3|           5|          10|     23|           1500000.0|               282.3|
|    339|          4|          12|          10|     39|               34.55|               5.0E8|
|    373|          1|           8|           9|     33|1.0308561989154695E7|               1.0E9|
|     38|          7|           1|          10|     41|           1500000.0| 5.073935769315999E8|
|    371|          6|           9|           6|     34|               12.79| 5.073935769315999E8|
+-------+-----------+------------+------------+-------+--------------------+--------------------+
only showing top 5 rows
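
A left join keeps every row of the cleaned dataset and leaves the ID null when a key has no match in a dimension. The plain-Python sketch below imitates that lookup on hypothetical sample data (`build_fact_rows`, `key_maps`, and the records are illustrative names, not part of the notebook) and counts the rows with a missing ID, which is a cheap integrity check before loading.

```python
def build_fact_rows(records, key_maps, metrics):
    """Replace natural keys with surrogate IDs; unmatched keys become None."""
    fact_rows = []
    for record in records:
        row = {}
        for id_col, (source_cols, mapping) in key_maps.items():
            natural_key = tuple(record[c] for c in source_cols)
            row[id_col] = mapping.get(natural_key)  # None mimics a left-join miss
        for metric in metrics:
            row[metric] = record[metric]
        fact_rows.append(row)
    return fact_rows


# Hypothetical dimensions: natural key tuple -> surrogate ID
key_maps = {
    "id_game": (("name", "genre"), {("Game A", "RPG"): 1}),
    "id_platform": (("platform",), {("PC",): 1}),
}
records = [
    {"name": "Game A", "genre": "RPG", "platform": "PC", "copies_sold_millions": 1.2},
    {"name": "Game B", "genre": "RPG", "platform": "PC", "copies_sold_millions": 0.4},
]

fact_rows = build_fact_rows(records, key_maps, ["copies_sold_millions"])
unmatched = sum(1 for row in fact_rows if any(row[c] is None for c in key_maps))
print(fact_rows)
print(f"rows with a missing dimension ID: {unmatched}")
```

In the notebook the dimensions are derived from the same cleaned dataset, so every key should find a match; the check becomes useful when dimensions and facts come from different sources.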


## 7. Loading into SQLite

The data warehouse is written to SQLite from Spark through JDBC: each DataFrame is written with `DataFrame.write.jdbc()`, with no conversion to Pandas. This requires the SQLite JDBC driver (`org.sqlite.JDBC`) on Spark's classpath; in the run recorded below the driver was missing and the write failed with a `ClassNotFoundException`. The `warehouse` folder is created if it does not exist, and `overwrite` mode replaces any existing tables.

In [8]:
print("===== 1. Creación de carpeta y configuración de la base de datos =====")
# Create the warehouse folder if it does not exist
os.makedirs("../warehouse", exist_ok=True)
print(f"Base de datos SQLite se guardará en: {DB_PATH}")

# JDBC configuration for SQLite
# Note: PySpark needs the SQLite JDBC driver
# It can be downloaded from: https://github.com/xerial/sqlite-jdbc/releases
# Or use the one bundled with some Spark distributions
jdbc_url = f"jdbc:sqlite:{DB_PATH}"
jdbc_properties = {
    "driver": "org.sqlite.JDBC"
}

# -------------------------------
# Save the dimension tables and the fact table with Spark JDBC
# -------------------------------
print("\n===== 2. Guardado de tablas en SQLite usando JDBC =====")
print("Escribiendo DataFrames de Spark directamente a SQLite...")

# Helper that writes a table in overwrite mode
def write_to_sqlite(df, table_name):
    """Write a Spark DataFrame to SQLite through JDBC."""
    df.write \
        .mode("overwrite") \
        .option("driver", "org.sqlite.JDBC") \
        .jdbc(jdbc_url, table_name, properties=jdbc_properties)
    print(f" - {table_name} cargada")

# Load all tables
try:
    # Dimension: dim_game
    write_to_sqlite(dim_game, "dim_game")

    # Dimension: dim_platform
    write_to_sqlite(dim_platform, "dim_platform")

    # Dimension: dim_developer
    write_to_sqlite(dim_developer, "dim_developer")

    # Dimension: dim_publisher
    write_to_sqlite(dim_publisher, "dim_publisher")

    # Dimension: dim_year
    write_to_sqlite(dim_year, "dim_year")

    # Fact table: fact_sales
    write_to_sqlite(fact_sales, "fact_sales")
    
    print("\n✅ Todas las tablas fueron cargadas correctamente en SQLite usando PySpark JDBC.")
    print(f"Base de datos disponible en: {DB_PATH}")
    
except Exception as e:
    print(f"\n⚠️ Error al usar JDBC directo: {e}")
    print("Nota: Si el driver JDBC de SQLite no está disponible, puedes:")
    print("  1. Descargar sqlite-jdbc desde: https://github.com/xerial/sqlite-jdbc/releases")
    print("  2. Agregarlo al classpath de Spark")
    print("  3. O usar la alternativa con toPandas() (celda siguiente)")
    raise


===== 1. Creación de carpeta y configuración de la base de datos =====
Base de datos SQLite se guardará en: ../warehouse/warehouse_pyspark.db

===== 2. Guardado de tablas en SQLite usando JDBC =====
Escribiendo DataFrames de Spark directamente a SQLite...

⚠️ Error al usar JDBC directo: An error occurred while calling o1681.jdbc.
: java.lang.ClassNotFoundException: org.sqlite.JDBC
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:593)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:526)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:47)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:112)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:112)
	at scala.Option.foreach(Option.scala:437)
	at org.apache.spark.sql.execution

Py4JJavaError: An error occurred while calling o1681.jdbc.
: java.lang.ClassNotFoundException: org.sqlite.JDBC
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:593)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:526)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:47)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:112)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:112)
	at scala.Option.foreach(Option.scala:437)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:112)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:272)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:276)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:48)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:55)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:79)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:77)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:88)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$2(QueryExecution.scala:155)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:272)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:125)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
	at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:106)
	at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:295)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:124)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:237)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:155)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654)
	at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$eagerlyExecute$1(QueryExecution.scala:154)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$3.applyOrElse(QueryExecution.scala:169)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$3.applyOrElse(QueryExecution.scala:164)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:470)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:86)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:470)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:360)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:356)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:446)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:164)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyCommandExecuted$1(QueryExecution.scala:126)
	at scala.util.Try$.apply(Try.scala:217)
	at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)
	at org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1439)
	at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:131)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:192)
	at org.apache.spark.sql.classic.DataFrameWriter.runCommand(DataFrameWriter.scala:622)
	at org.apache.spark.sql.classic.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
	at org.apache.spark.sql.classic.DataFrameWriter.saveInternal(DataFrameWriter.scala:241)
	at org.apache.spark.sql.classic.DataFrameWriter.save(DataFrameWriter.scala:126)
	at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:334)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:1583)


**Note:** If the SQLite JDBC driver is not available in your environment, you can use the `toPandas()` alternative in the next cell.


In [None]:
# -------------------------------
# ALTERNATIVE: load with toPandas() (if JDBC is not available)
# -------------------------------

# Run this cell if the JDBC method above fails

print("===== Alternativa: Carga usando toPandas() =====")

# Create the SQLAlchemy engine for SQLite
engine = sqlalchemy.create_engine(DB_URL)

# Convert each table to Pandas and load it into SQLite
with engine.begin() as conn:
    dim_game.toPandas().to_sql("dim_game", conn, if_exists="replace", index=False)
    print(" - dim_game cargada")
    dim_platform.toPandas().to_sql("dim_platform", conn, if_exists="replace", index=False)
    print(" - dim_platform cargada")
    dim_developer.toPandas().to_sql("dim_developer", conn, if_exists="replace", index=False)
    print(" - dim_developer cargada")
    dim_publisher.toPandas().to_sql("dim_publisher", conn, if_exists="replace", index=False)
    print(" - dim_publisher cargada")
    dim_year.toPandas().to_sql("dim_year", conn, if_exists="replace", index=False)
    print(" - dim_year cargada")
    fact_sales.toPandas().to_sql("fact_sales", conn, if_exists="replace", index=False)
    print(" - fact_sales cargada")
 
print("\n✅ Tablas cargadas usando método alternativo (toPandas).")
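
The `ClassNotFoundException` in the JDBC cell means the JVM could not find `org.sqlite.JDBC`. One common way to make the driver available is to let Spark fetch it from Maven through the `spark.jars.packages` setting when the session is first created. The helper below only assembles the settings and runs without Spark; `sqlite_jdbc_settings` is a hypothetical name, and the driver version is a placeholder to pin yourself, not something the notebook specifies.

```python
from pathlib import Path

# Assumption: a published sqlite-jdbc release; replace with the version you want to pin
SQLITE_JDBC_VERSION = "3.45.1.0"


def sqlite_jdbc_settings(db_path, version=SQLITE_JDBC_VERSION):
    """Return the Spark config entry, JDBC URL and properties for a SQLite file."""
    return {
        "conf": {"spark.jars.packages": f"org.xerial:sqlite-jdbc:{version}"},
        "url": f"jdbc:sqlite:{Path(db_path).resolve()}",
        "properties": {"driver": "org.sqlite.JDBC"},
    }


settings = sqlite_jdbc_settings("../warehouse/warehouse_pyspark.db")
print(settings["conf"])
print(settings["url"])

# Applying it (before the first SparkSession of the process is created):
# builder = SparkSession.builder.appName("videogames_pyspark_etl")
# for key, value in settings["conf"].items():
#     builder = builder.config(key, value)
# spark = builder.getOrCreate()
```

The setting has to be in place before the JVM starts, so a kernel restart is likely needed if a session already exists. Resolving the database path to an absolute one also avoids depending on the JVM's working directory.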


## 8. Queries and Validation

A sample query validates the data warehouse: it computes the top 10 genres by copies sold by joining `fact_sales` with `dim_game`, and shows total copies sold and total revenue per genre. The cell first reads the tables back through Spark JDBC and falls back to SQLAlchemy if that fails.

In [None]:
# Sample query against the data warehouse generated by PySpark

print("===== Consulta: Top 10 géneros por ventas =====")

# Option 1: read from SQLite with Spark (if JDBC is available)
try:
    # Read the tables back from SQLite through Spark JDBC
    dim_game_sql = spark.read.jdbc(jdbc_url, "dim_game", properties=jdbc_properties)
    fact_sales_sql = spark.read.jdbc(jdbc_url, "fact_sales", properties=jdbc_properties)

    # Run the query with the Spark DataFrame API
    top_genres_spark = fact_sales_sql \
        .join(dim_game_sql, fact_sales_sql.id_game == dim_game_sql.id_game, "inner") \
        .groupBy("genre") \
        .agg(
            F.sum("copies_sold_millions").alias("total_copies_sold"),
            F.sum("revenue_millions_usd").alias("total_revenue")
        ) \
        .orderBy(F.desc("total_copies_sold")) \
        .limit(10)
    
    print("\n===== Resultado: Top 10 géneros por ventas (usando Spark JDBC) =====")
    top_genres_spark.show(truncate=False)
    
except Exception as e:
    print(f"Error al leer con Spark JDBC: {e}")
    print("Usando método alternativo con SQLAlchemy...")
    
    # Option 2: use SQLAlchemy (alternative)
    engine = sqlalchemy.create_engine(DB_URL)
    query = """
    SELECT 
        g.genre, 
        SUM(f.copies_sold_millions) AS total_copies_sold,
        SUM(f.revenue_millions_usd) AS total_revenue
    FROM fact_sales f
    JOIN dim_game g ON f.id_game = g.id_game
    GROUP BY g.genre
    ORDER BY total_copies_sold DESC
    LIMIT 10;
    """
    
    print("Consulta SQL ejecutada:\n", query)
    
    with engine.connect() as conn:
        top_genres_spark = pd.read_sql(query, conn)
    
    print("\n===== Resultado: Top 10 géneros por ventas =====")
    print(top_genres_spark)

print("\n===== Resumen del Data Warehouse =====")
print(f"Base de datos creada en: {DB_PATH}")
print("\nTablas creadas:")
print("  - dim_game (dimensiones de juegos)")
print("  - dim_platform (dimensiones de plataformas)")
print("  - dim_developer (dimensiones de desarrolladores)")
print("  - dim_publisher (dimensiones de publishers)")
print("  - dim_year (dimensiones de años)")
print("  - fact_sales (tabla de hechos con ventas)")
print("\n✅ Proceso ETL con PySpark completado exitosamente")
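
The validation query can be tried in isolation with the standard-library `sqlite3` module. The sketch below uses an in-memory database and a few hypothetical rows instead of the real warehouse; the table and column names follow the notebook, and the extra orphan check is an illustrative addition.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dim_game (name TEXT, genre TEXT, id_game INTEGER);
    CREATE TABLE fact_sales (
        id_game INTEGER,
        copies_sold_millions REAL,
        revenue_millions_usd REAL
    );
    INSERT INTO dim_game VALUES
        ('Game A', 'RPG', 1), ('Game B', 'Racing', 2), ('Game C', 'RPG', 3);
    INSERT INTO fact_sales VALUES
        (1, 2.0, 80.0), (2, 5.0, 120.0), (3, 4.0, 100.0), (1, 1.0, 30.0);
    """
)

# Same query shape as the SQLAlchemy fallback above
query = """
SELECT g.genre,
       SUM(f.copies_sold_millions) AS total_copies_sold,
       SUM(f.revenue_millions_usd) AS total_revenue
FROM fact_sales f
JOIN dim_game g ON f.id_game = g.id_game
GROUP BY g.genre
ORDER BY total_copies_sold DESC
LIMIT 10
"""
top_genres = conn.execute(query).fetchall()

# Referential check: fact rows whose id_game has no match in dim_game
orphans = conn.execute(
    """
    SELECT COUNT(*)
    FROM fact_sales f
    LEFT JOIN dim_game g ON f.id_game = g.id_game
    WHERE g.id_game IS NULL
    """
).fetchone()[0]

print(top_genres)
print(f"orphan fact rows: {orphans}")
conn.close()
```

Pointing `sqlite3.connect()` at the warehouse file instead of `:memory:` would run the same checks against the tables loaded in section 7.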