# 📊 RA1 - Video Game Analysis with PySpark

## Phase 2: Processing with PySpark

Objective: process the video game dataset with PySpark, applying distributed transformations and displaying the results.

### 1. Create the SparkSession
We configure PySpark and create the Spark session.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, count, avg, sum, when, round as spark_round,
    regexp_extract, trim
)
from pyspark.sql.types import DoubleType
import os
import warnings

warnings.filterwarnings("ignore")

# Create the SparkSession
spark = (
    SparkSession.builder
    .appName("VideoGames Analysis")
    .getOrCreate()
)

print("Spark version:", spark.version)
print("SparkSession created successfully ✅")


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/10 16:21:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark version: 3.5.0
SparkSession created successfully ✅


### 2. Load the dataset
We load the original or cleaned dataset, checking several candidate paths so the notebook runs in different environments.

In [2]:
# Candidate locations for the CSV
possible_paths = [
    "videogames.csv",
    "../videogames.csv",
    "../data/videogames.csv",
    "/mnt/data/videogames.csv"  # typical path in lab environments
]

csv_path = None
for p in possible_paths:
    if os.path.exists(p):
        csv_path = p
        break

if csv_path is None:
    raise FileNotFoundError(
        f"Could not find 'videogames.csv' in any of these paths:\n{possible_paths}"
    )

# Load the dataset with schema inference
df_spark = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(csv_path)
)

print("Loaded path:", csv_path)
print("Number of rows:", df_spark.count())
print("Number of columns:", len(df_spark.columns))

df_spark.printSchema()
df_spark.show(5, truncate=False)


Loaded path: ../data/videogames.csv


25/12/10 16:21:53 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


Number of rows: 10000
Number of columns: 21
root
 |-- name: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- cost: string (nullable = true)
 |-- platform: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- pegi: string (nullable = true)
 |-- year: string (nullable = true)
 |-- developer: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- region: string (nullable = true)
 |-- mode: string (nullable = true)
 |-- engine: string (nullable = true)
 |-- award: string (nullable = true)
 |-- dlc_support: string (nullable = true)
 |-- language: string (nullable = true)
 |-- metascore: string (nullable = true)
 |-- user_score: string (nullable = true)
 |-- reviews: string (nullable = true)
 |-- rating_source: string (nullable = true)
 |-- copies_sold_millions: string (nullable = true)
 |-- revenue_millions_usd: string (nullable = true)

+-------------------+-------+-----+--------+-----------------+----+----+---------+------------+---
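
The path lookup in the loading cell can also be written as a small reusable helper. The following is a minimal sketch using only the standard library; `first_existing` is a hypothetical name, not part of the notebook.

```python
from pathlib import Path
from typing import Iterable, Optional


def first_existing(paths: Iterable[str]) -> Optional[Path]:
    """Return the first candidate path that exists on disk, or None."""
    return next((Path(p) for p in paths if Path(p).exists()), None)


# Usage with two of the candidate paths from the notebook
found = first_existing(["videogames.csv", "../data/videogames.csv"])
if found is None:
    print("videogames.csv not found in the candidate paths")
else:
    print("Loaded path:", found)
```

Returning `None` instead of raising keeps the decision about how to react to a missing file with the caller, which in the notebook raises `FileNotFoundError`.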

### 3. Transformation 1: Column selection and filtering
We select the relevant columns and keep only the rows with valid numeric data.

In [3]:
# Columns of interest for the analysis
cols_basic = [
    "name",              # note: the CSV column is called 'name', not 'title'
    "platform",
    "genre",
    "metascore",
    "user_score",
    "copies_sold_millions",
    "revenue_millions_usd"
]

df_selected = df_spark.select(*cols_basic)

# Create numeric columns from text:
# - metascore_num: extracts the leading number (e.g. '80/100' -> 80, '47.7' -> 47.7)
# - copies_sold_millions_num: extracts the first number (e.g. '1.5M' -> 1.5, '30M' -> 30)
df_numeric = (
    df_selected
    .withColumn(
        "metascore_num",
        regexp_extract(
            trim(col("metascore").cast("string")),
            r'^\d+(\.\d+)?',
            0
        ).cast(DoubleType())
    )
    .withColumn(
        "copies_sold_millions_num",
        regexp_extract(
            trim(col("copies_sold_millions").cast("string")),
            r'([0-9]*\.?[0-9]+)',
            1
        ).cast(DoubleType())
    )
)

# Keep only the rows where both numeric values could be parsed
df_filtered = (
    df_numeric
    .filter(col("metascore_num").isNotNull())
    .filter(col("copies_sold_millions_num").isNotNull())
)

print("Rows after selection and filtering:", df_filtered.count())
df_filtered.show(10, truncate=False)


Rows after selection and filtering: 2983
+-------------------------+---------------+----------+---------+----------+--------------------+--------------------+-------------+------------------------+
|name                     |platform       |genre     |metascore|user_score|copies_sold_millions|revenue_millions_usd|metascore_num|copies_sold_millions_num|
+-------------------------+---------------+----------+---------+----------+--------------------+--------------------+-------------+------------------------+
|God of War               |Mobile         |RPG       |98.1     |8.4       |1.5M                |N/A                 |98.1         |1.5                     |
|Persona 5 Royal          |PS             |Shooter   |31.7     |2.6       |25.08               |889.0               |31.7         |25.08                   |
|Untitled Project         |Nintendo Switch|Unknown   |80/100   |?         |30M                 |?                   |80.0         |30.0                    |
|Valorant   
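
To see what the two regular expressions in the cell above extract, the same patterns can be tried in plain Python. This is a minimal sketch, assuming Python's `re` module and Spark's Java regex engine treat these simple patterns identically; `parse_metascore` and `parse_copies` are hypothetical helper names. In Spark, `regexp_extract` returns an empty string when nothing matches and the cast to double turns it into null, which the sketch models as `None`.

```python
import re
from typing import Optional

# Same patterns as in the PySpark cell
METASCORE_RE = re.compile(r'^\d+(\.\d+)?')
COPIES_RE = re.compile(r'([0-9]*\.?[0-9]+)')


def parse_metascore(raw: Optional[str]) -> Optional[float]:
    """Return the number at the start of the string, or None if there is none."""
    if raw is None:
        return None
    match = METASCORE_RE.match(raw.strip())
    return float(match.group(0)) if match else None


def parse_copies(raw: Optional[str]) -> Optional[float]:
    """Return the first number found anywhere in the string, or None."""
    if raw is None:
        return None
    match = COPIES_RE.search(raw.strip())
    return float(match.group(1)) if match else None


for value in ["98.1", "80/100", "N/A", "?"]:
    print(repr(value), "->", parse_metascore(value))

for value in ["1.5M", "30M", "25.08", "N/A"]:
    print(repr(value), "->", parse_copies(value))
```

Values such as `'N/A'` and `'?'` contain no digits, so they map to `None` here and to null in Spark, which is why the `isNotNull()` filters drop those rows.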

### 4. Transformation 2: Aggregation by genre
We compute grouped statistics per video game genre.

In [4]:
# Aggregation by genre: number of games, mean metascore and total sales
df_by_genre = (
    df_filtered
    .groupBy("genre")
    .agg(
        count("*").alias("num_juegos"),
        spark_round(avg("metascore_num"), 2).alias("metascore_medio"),
        spark_round(sum("copies_sold_millions_num"), 2).alias("ventas_totales_millones")
    )
    .orderBy(col("ventas_totales_millones").desc())
)

print("Aggregated statistics by genre:")
df_by_genre.show(20, truncate=False)


Aggregated statistics by genre:
+----------+----------+---------------+-----------------------+
|genre     |num_juegos|metascore_medio|ventas_totales_millones|
+----------+----------+---------------+-----------------------+
|Adventure |231       |71.62          |4684.0                 |
|Unknown   |231       |70.53          |4379.97                |
|RPG       |214       |70.7           |4310.9                 |
|?         |219       |72.68          |4255.91                |
|RPG       |219       |70.59          |4221.08                |
|Puzzle    |213       |69.8           |4094.84                |
|Indie     |211       |70.88          |3971.67                |
|Sports    |236       |68.34          |3970.57                |
|Simulation|205       |72.19          |3917.29                |
|action    |200       |69.66          |3898.19                |
|Racing    |194       |70.12          |3852.02                |
|Shooter   |210       |69.22          |3838.77                |
|Ac
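
The table above lists `RPG` twice and also contains a lowercase `action` row, which suggests that some labels differ only in case or surrounding whitespace. The notebook does not normalize them. The following is a minimal sketch of one possible normalization in plain Python, with a hypothetical sample list and a hypothetical helper name. In PySpark the same idea could be expressed with `lower(trim(col("genre")))` before grouping.

```python
from collections import Counter


def normalize_label(raw: str) -> str:
    """Collapse case and surrounding-whitespace variants into a single key."""
    return raw.strip().casefold()


# Hypothetical sample of raw genre labels
genres = ["RPG", "RPG ", "Action", "action", "Adventure"]

counts = Counter(normalize_label(g) for g in genres)
for label, n in sorted(counts.items()):
    print(label, n)
```

Grouping on the normalized key would merge the duplicated rows, at the cost of changing the counts reported in the rest of the notebook.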

### 5. Transformation 3: Creating new columns
We add computed columns for further analysis.

In [5]:
df_with_new_cols = (
    df_filtered
    .withColumn(
        "categoria_ventas",
        when(col("copies_sold_millions_num") >= 5, "Alto")
        .when(col("copies_sold_millions_num") >= 1, "Medio")
        .otherwise("Bajo")
    )
    .withColumn(
        "categoria_calidad",
        when(col("metascore_num") >= 85, "Excelente")
        .when(col("metascore_num") >= 70, "Buena")
        .when(col("metascore_num") >= 50, "Regular")
        .otherwise("Mala")
    )
)

print("Example of the new columns:")
df_with_new_cols.select(
    "name", "platform", "genre",
    "metascore_num", "copies_sold_millions_num",
    "categoria_ventas", "categoria_calidad"
).show(20, truncate=False)


Example of the new columns:
+---------------------------------------+---------------+----------+-------------+------------------------+----------------+-----------------+
|name                                   |platform       |genre     |metascore_num|copies_sold_millions_num|categoria_ventas|categoria_calidad|
+---------------------------------------+---------------+----------+-------------+------------------------+----------------+-----------------+
|God of War                             |Mobile         |RPG       |98.1         |1.5                     |Medio           |Excelente        |
|Persona 5 Royal                        |PS             |Shooter   |31.7         |25.08                   |Alto            |Mala             |
|Untitled Project                       |Nintendo Switch|Unknown   |80.0         |30.0                    |Alto            |Buena            |
|Valorant                               |Switch         |Simulation|84.8         |1.5                     
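
The chained `when(...).otherwise(...)` expressions in the cell above are evaluated top to bottom, so the first matching threshold wins. A minimal plain-Python sketch of the same rules follows, with hypothetical function names and the notebook's thresholds and labels.

```python
def sales_category(copies_millions: float) -> str:
    """Mirror of categoria_ventas: thresholds are checked from highest to lowest."""
    if copies_millions >= 5:
        return "Alto"
    if copies_millions >= 1:
        return "Medio"
    return "Bajo"


def quality_category(metascore: float) -> str:
    """Mirror of categoria_calidad."""
    if metascore >= 85:
        return "Excelente"
    if metascore >= 70:
        return "Buena"
    if metascore >= 50:
        return "Regular"
    return "Mala"


# First rows of the output table above: (metascore_num, copies_sold_millions_num)
for metascore, copies in [(98.1, 1.5), (31.7, 25.08), (80.0, 30.0)]:
    print(metascore, copies, sales_category(copies), quality_category(metascore))
```

Because the conditions are ordered, a value of 30.0 million copies is labelled `Alto` even though it also satisfies `>= 1`.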

### 6. Transformation 4: Analysis by platform and genre (simulated join)
We combine platform and genre information for a more detailed analysis.

In [6]:
# Aggregation by platform
df_platform_stats = (
    df_with_new_cols
    .groupBy("platform")
    .agg(
        count("*").alias("num_juegos_plataforma"),
        spark_round(avg("metascore_num"), 2).alias("metascore_medio_plataforma"),
        spark_round(sum("copies_sold_millions_num"), 2).alias("ventas_totales_plataforma")
    )
)

print("Statistics by platform:")
df_platform_stats.show(truncate=False)

# JOIN: attach the platform statistics to each game
df_joined = (
    df_with_new_cols
    .join(df_platform_stats, on="platform", how="left")
)

print("Example of the combined analysis (game + statistics for its platform):")
df_joined.select(
    "name", "platform", "genre",
    "metascore_num", "copies_sold_millions_num",
    "num_juegos_plataforma",
    "metascore_medio_plataforma",
    "ventas_totales_plataforma"
).show(20, truncate=False)

# Optional: cache for reuse in later phases
df_joined.cache()


Statistics by platform:
+---------------+---------------------+--------------------------+-------------------------+
|platform       |num_juegos_plataforma|metascore_medio_plataforma|ventas_totales_plataforma|
+---------------+---------------------+--------------------------+-------------------------+
|PC             |325                  |71.43                     |6067.13                  |
|PS             |349                  |70.55                     |7103.83                  |
|XBOX           |323                  |71.62                     |6334.43                  |
|PlayStation    |327                  |70.28                     |5770.74                  |
|Nintendo Switch|375                  |69.93                     |7033.16                  |
|Switch         |312                  |70.74                     |5935.08                  |
|Xbox           |295                  |70.1                      |5559.84                  |
|Mobile         |342                  |6

DataFrame[platform: string, name: string, genre: string, metascore: string, user_score: string, copies_sold_millions: string, revenue_millions_usd: string, metascore_num: double, copies_sold_millions_num: double, categoria_ventas: string, categoria_calidad: string, num_juegos_plataforma: bigint, metascore_medio_plataforma: double, ventas_totales_plataforma: double]
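
The cell above follows an "aggregate, then join back" pattern: statistics are computed once per platform and then attached to every game of that platform. A minimal plain-Python sketch of the same two steps is shown below. The three rows and the `copies` key are hypothetical, while the statistic names match the notebook's aliases.

```python
from collections import defaultdict

# Hypothetical rows standing in for df_with_new_cols
games = [
    {"name": "Game A", "platform": "PC", "metascore_num": 80.0, "copies": 2.0},
    {"name": "Game B", "platform": "PC", "metascore_num": 60.0, "copies": 4.0},
    {"name": "Game C", "platform": "Mobile", "metascore_num": 90.0, "copies": 1.0},
]

# Step 1: aggregate per platform (the groupBy/agg step)
rows_by_platform = defaultdict(list)
for game in games:
    rows_by_platform[game["platform"]].append(game)

platform_stats = {
    platform: {
        "num_juegos_plataforma": len(rows),
        "metascore_medio_plataforma": round(
            sum(r["metascore_num"] for r in rows) / len(rows), 2
        ),
        "ventas_totales_plataforma": round(sum(r["copies"] for r in rows), 2),
    }
    for platform, rows in rows_by_platform.items()
}

# Step 2: left join the statistics back onto each game
joined = [{**game, **platform_stats.get(game["platform"], {})} for game in games]

for row in joined:
    print(row)
```

The join keeps one output row per game, so the row count does not change. Only the per-platform columns are repeated across games of the same platform.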

---

## ETL Process with PySpark

Objective: Extract, Transform and Load (ETL) the processed data into a SQLite database with a dimensional model (1 fact table + 2 dimension tables).

### 7. EXTRACTION (E)
We reuse the transformed DataFrame from the previous phases.

In [7]:
# df_joined already holds all the transformed data.
# It was cached in the previous cell, so this cache() call is redundant and
# only produces the CacheManager warning shown in the output.
df_etl = df_joined.cache()

print("="*70)
print("EXTRACTION - Transformed dataset loaded")
print("="*70)
print(f"Records: {df_etl.count()}")
print(f"Columns: {len(df_etl.columns)}")
print("\nDataFrame schema:")
df_etl.printSchema()

25/12/10 18:11:11 WARN CacheManager: Asked to cache already cached data.


EXTRACTION - Transformed dataset loaded
Records: 2983
Columns: 14

DataFrame schema:
root
 |-- platform: string (nullable = true)
 |-- name: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- metascore: string (nullable = true)
 |-- user_score: string (nullable = true)
 |-- copies_sold_millions: string (nullable = true)
 |-- revenue_millions_usd: string (nullable = true)
 |-- metascore_num: double (nullable = true)
 |-- copies_sold_millions_num: double (nullable = true)
 |-- categoria_ventas: string (nullable = false)
 |-- categoria_calidad: string (nullable = false)
 |-- num_juegos_plataforma: long (nullable = true)
 |-- metascore_medio_plataforma: double (nullable = true)
 |-- ventas_totales_plataforma: double (nullable = true)



### 8. TRANSFORMATION (T) - Dimensional model
We build a star schema with 1 fact table and 2 dimension tables.

In [8]:
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

print("="*70)
print("TRANSFORMATION - Building the dimensional model")
print("="*70)

# Windows used to generate sequential IDs.
# They define no partitionBy, so Spark moves all rows to a single partition
# and logs a WindowExec warning; acceptable for a dataset this small.
w_genre = Window.orderBy("genre")
w_platform = Window.orderBy("platform")
w_fact = Window.orderBy("name", "genre", "platform")

# ==============================================================
# DIMENSION 1: dim_genre (genre dimension)
# ==============================================================
dim_genre = (
    df_etl
    .select("genre")
    .distinct()
    .withColumn("genre_id", row_number().over(w_genre))
    .select("genre_id", "genre")
)

print("\n📊 Dimension 1: dim_genre")
print(f"Records: {dim_genre.count()}")
dim_genre.show(10, truncate=False)

# ==============================================================
# DIMENSION 2: dim_platform (platform dimension)
# ==============================================================
dim_platform = (
    df_etl
    .select("platform")
    .distinct()
    .withColumn("platform_id", row_number().over(w_platform))
    .select("platform_id", "platform")
)

print("\n📊 Dimension 2: dim_platform")
print(f"Records: {dim_platform.count()}")
dim_platform.show(10, truncate=False)

# ==============================================================
# FACT TABLE: fact_videogames
# ==============================================================
fact_videogames = (
    df_etl
    .join(dim_genre, on="genre", how="left")
    .join(dim_platform, on="platform", how="left")
    .withColumn("fact_id", row_number().over(w_fact))
    .select(
        "fact_id",
        "name",
        "genre_id",
        "platform_id",
        "metascore_num",
        "copies_sold_millions_num",
        "categoria_ventas",
        "categoria_calidad",
        "num_juegos_plataforma",
        "metascore_medio_plataforma",
        "ventas_totales_plataforma"
    )
)

print("\n📊 Fact table: fact_videogames")
print(f"Records: {fact_videogames.count()}")
fact_videogames.show(10, truncate=False)

print("\n" + "="*70)
print("Dimensional model created successfully:")
print(f"  - dim_genre: {dim_genre.count()} records")
print(f"  - dim_platform: {dim_platform.count()} records")
print(f"  - fact_videogames: {fact_videogames.count()} records")
print("="*70)


TRANSFORMATION - Building the dimensional model

📊 Dimension 1: dim_genre
Records: 14


25/12/10 18:11:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

+--------+----------+
|genre_id|genre     |
+--------+----------+
|1       |?         |
|2       |Action    |
|3       |Adventure |
|4       |Indie     |
|5       |Puzzle    |
|6       |RPG       |
|7       |RPG       |
|8       |Racing    |
|9       |Shooter   |
|10      |Simulation|
+--------+----------+
only showing top 10 rows


📊 Dimension 2: dim_platform
Records: 9
+-----------+---------------+
|platform_id|platform       |
+-----------+---------------+
|1          |Mobile         |
|2          |Nintendo Switch|
|3          |PC             |
|4          |PS             |
|5          |PlayStation    |
|6          |Switch         |
|7          |XBOX           |
|8          |Xbox           |
|9          |pc             |
+-----------+---------------+


📊 Fact table: fact_videogames
Records: 2983

+-------+----+--------+-----------+-------------+------------------------+----------------+-----------------+---------------------+--------------------------+-------------------------+
|fact_id|name|genre_id|platform_id|metascore_num|copies_sold_millions_num|categoria_ventas|categoria_calidad|num_juegos_plataforma|metascore_medio_plataforma|ventas_totales_plataforma|
+-------+----+--------+-----------+-------------+------------------------+----------------+-----------------+---------------------+--------------------------+-------------------------+
|1      |    |1       |1          |43.2         |23.52                   |Alto            |Mala             |342                  |69.44                     |6370.95                  |
|2      |    |1       |2          |80.0         |30.0                    |Alto            |Buena            |375                  |69.93                     |7033.16                  |
|3      |    |1       |2          |48.5         |1.5                     |M
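
The surrogate keys in the cell above come from `row_number()` over a window ordered by the dimension value, applied to the distinct values. A minimal plain-Python sketch of that idea follows, using a hypothetical sample and a hypothetical helper name `build_dimension`. It assumes plain code-point ordering, which should match Spark's default string ordering for ASCII labels like these.

```python
from typing import Dict, Iterable


def build_dimension(values: Iterable[str]) -> Dict[str, int]:
    """Assign 1-based IDs to the sorted distinct values."""
    return {value: idx for idx, value in enumerate(sorted(set(values)), start=1)}


# Hypothetical sample; note the trailing space in the third label
genres = ["RPG", "Action", "RPG ", "?", "Action"]
genre_ids = build_dimension(genres)
print(genre_ids)

# Replace the label with its key, as the joins do for the fact table
fact_genre_ids = [genre_ids[g] for g in genres]
print(fact_genre_ids)
```

Since the IDs depend on the sort order of the distinct values, labels that differ only by whitespace or case receive separate keys, which is consistent with `RPG` appearing under both `genre_id` 6 and 7 in the output above.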

### 9. LOAD (L)
We save the dimensional model to SQLite (warehouse_pyspark.db) in the warehouse directory.

In [9]:
import sqlite3
from pathlib import Path

# Candidate locations for the warehouse database
warehouse_paths = [
    '../warehouse/warehouse_pyspark.db',    # local run
    '/app/warehouse/warehouse_pyspark.db'   # container run
]

# Pick the first usable location, creating its directory if it does not exist
for path in warehouse_paths:
    warehouse_dir = Path(path).parent
    if warehouse_dir.exists() or str(warehouse_dir).startswith('..'):
        db_path = path
        warehouse_dir.mkdir(parents=True, exist_ok=True)
        break
else:
    db_path = warehouse_paths[0]
    Path(db_path).parent.mkdir(parents=True, exist_ok=True)

print("="*70)
print("LOAD - Saving the dimensional model to SQLite")
print("="*70)
print(f"Database: {db_path}\n")

# Convert the PySpark DataFrames to pandas so they can be loaded into SQLite
dim_genre_pd = dim_genre.toPandas()
dim_platform_pd = dim_platform.toPandas()
fact_videogames_pd = fact_videogames.toPandas()

# Open the SQLite connection
conn = sqlite3.connect(db_path)

# Load the dimensions
dim_genre_pd.to_sql('dim_genre', conn, if_exists='replace', index=False)
print(f"✓ Dimension 'dim_genre' loaded: {len(dim_genre_pd)} records")

dim_platform_pd.to_sql('dim_platform', conn, if_exists='replace', index=False)
print(f"✓ Dimension 'dim_platform' loaded: {len(dim_platform_pd)} records")

# Load the fact table
fact_videogames_pd.to_sql('fact_videogames', conn, if_exists='replace', index=False)
print(f"✓ Fact table 'fact_videogames' loaded: {len(fact_videogames_pd)} records")

# Close the connection
conn.close()

print("\n" + "="*70)
print("PYSPARK ETL PROCESS COMPLETED SUCCESSFULLY")
print("="*70)
print(f"Database created: {db_path}")
print("Dimensional model:")
print("  - dim_genre (dimension)")
print("  - dim_platform (dimension)")
print("  - fact_videogames (facts)")
print("="*70)

LOAD - Saving the dimensional model to SQLite
Database: ../warehouse/warehouse_pyspark.db

✓ Dimension 'dim_genre' loaded: 14 records
✓ Dimension 'dim_platform' loaded: 9 records
✓ Fact table 'fact_videogames' loaded: 2983 records

PYSPARK ETL PROCESS COMPLETED SUCCESSFULLY
Database created: ../warehouse/warehouse_pyspark.db
Dimensional model:
  - dim_genre (dimension)
  - dim_platform (dimension)
  - fact_videogames (facts)
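
The load step relies on pandas `to_sql`, which creates the tables from the DataFrame columns and does not declare primary or foreign keys. As an alternative, the following minimal sketch uses only the standard `sqlite3` module to create a small star schema with explicit keys in an in-memory database. The rows and the reduced column set are hypothetical, and this is not the notebook's own method.

```python
import sqlite3
from contextlib import closing

# Hypothetical rows standing in for the three DataFrames
dim_genre_rows = [(1, "Action"), (2, "RPG")]
dim_platform_rows = [(1, "PC"), (2, "Mobile")]
fact_rows = [
    (1, "Game A", 1, 1, 80.0, 2.5),
    (2, "Game B", 2, 2, 65.0, 0.4),
]

with closing(sqlite3.connect(":memory:")) as conn:
    # SQLite only enforces foreign keys when this pragma is enabled
    conn.execute("PRAGMA foreign_keys = ON")
    conn.executescript("""
        CREATE TABLE dim_genre (
            genre_id INTEGER PRIMARY KEY,
            genre    TEXT NOT NULL
        );
        CREATE TABLE dim_platform (
            platform_id INTEGER PRIMARY KEY,
            platform    TEXT NOT NULL
        );
        CREATE TABLE fact_videogames (
            fact_id     INTEGER PRIMARY KEY,
            name        TEXT,
            genre_id    INTEGER REFERENCES dim_genre(genre_id),
            platform_id INTEGER REFERENCES dim_platform(platform_id),
            metascore_num            REAL,
            copies_sold_millions_num REAL
        );
    """)
    # "with conn" commits on success and rolls back if an insert fails
    with conn:
        conn.executemany("INSERT INTO dim_genre VALUES (?, ?)", dim_genre_rows)
        conn.executemany("INSERT INTO dim_platform VALUES (?, ?)", dim_platform_rows)
        conn.executemany(
            "INSERT INTO fact_videogames VALUES (?, ?, ?, ?, ?, ?)", fact_rows
        )
    fact_count = conn.execute("SELECT COUNT(*) FROM fact_videogames").fetchone()[0]

print("fact_videogames rows:", fact_count)
```

Loading the dimensions before the fact table matters here: with foreign keys enabled, a fact row that points at a missing dimension key is rejected at insert time.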


### 10. Verifying the dimensional model
We query the database to verify the loaded star schema.

In [10]:
import pandas as pd

# Connect to the database
conn = sqlite3.connect(db_path)

# List all tables
query_tables = "SELECT name FROM sqlite_master WHERE type='table';"
tables = pd.read_sql(query_tables, conn)
print("Tables in the database:")
print(tables)

# Check the number of records in each table
print("\n" + "="*70)
print("RECORD COUNT PER TABLE")
print("="*70)

for table_name in tables['name']:
    query = f"SELECT COUNT(*) as count FROM {table_name};"
    result = pd.read_sql(query, conn)
    print(f"{table_name}: {result['count'].iloc[0]} records")

# Show a sample of each table
print("\n" + "="*70)
print("SAMPLE OF dim_genre")
print("="*70)
query_genre = "SELECT * FROM dim_genre LIMIT 10;"
sample_genre = pd.read_sql(query_genre, conn)
print(sample_genre)

print("\n" + "="*70)
print("SAMPLE OF dim_platform")
print("="*70)
query_platform = "SELECT * FROM dim_platform LIMIT 10;"
sample_platform = pd.read_sql(query_platform, conn)
print(sample_platform)

print("\n" + "="*70)
print("SAMPLE OF fact_videogames")
print("="*70)
query_fact = "SELECT * FROM fact_videogames LIMIT 10;"
sample_fact = pd.read_sql(query_fact, conn)
print(sample_fact)

# Example JOIN query to verify the star schema
print("\n" + "="*70)
print("JOIN QUERY - Analysis by genre and platform")
print("="*70)
query_join = """
SELECT
    g.genre,
    p.platform,
    COUNT(*) as num_juegos,
    ROUND(AVG(f.metascore_num), 2) as metascore_promedio,
    ROUND(SUM(f.copies_sold_millions_num), 2) as ventas_totales
FROM fact_videogames f
JOIN dim_genre g ON f.genre_id = g.genre_id
JOIN dim_platform p ON f.platform_id = p.platform_id
GROUP BY g.genre, p.platform
ORDER BY ventas_totales DESC
LIMIT 15;
"""
result_join = pd.read_sql(query_join, conn)
print(result_join)

# Close the connection
conn.close()

print("\n✅ Verification complete. The dimensional model works correctly.")
print("✅ PySpark ETL finished successfully.")

Tables in the database:
              name
0        dim_genre
1     dim_platform
2  fact_videogames

RECORD COUNT PER TABLE
dim_genre: 14 records
dim_platform: 9 records
fact_videogames: 2983 records

SAMPLE OF dim_genre
   genre_id       genre
0         1           ?
1         2      Action
2         3   Adventure
3         4       Indie
4         5      Puzzle
5         6         RPG
6         7        RPG 
7         8      Racing
8         9     Shooter
9        10  Simulation

SAMPLE OF dim_platform
   platform_id         platform
0            1           Mobile
1            2  Nintendo Switch
2            3               PC
3            4               PS
4            5      PlayStation
5            6          Switch 
6            7             XBOX
7            8             Xbox
8            9               pc

SAMPLE OF fact_videogames
   fact_id name  genre_id  platform_id  metascore_num  \
0        1              1            1           43.2   
1        2
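
The verification query uses inner joins, which silently drop any fact row whose key has no match in a dimension. A complementary check is to look for such orphan rows with a `LEFT JOIN`. The following is a minimal sketch with the standard `sqlite3` module and a hypothetical in-memory database in which one fact row deliberately points at a missing genre.

```python
import sqlite3
from contextlib import closing

with closing(sqlite3.connect(":memory:")) as conn:
    # Hypothetical miniature of the warehouse tables
    conn.executescript("""
        CREATE TABLE dim_genre (genre_id INTEGER, genre TEXT);
        CREATE TABLE fact_videogames (fact_id INTEGER, name TEXT, genre_id INTEGER);
        INSERT INTO dim_genre VALUES (1, 'Action'), (2, 'RPG');
        INSERT INTO fact_videogames VALUES
            (1, 'Game A', 1),
            (2, 'Game B', 2),
            (3, 'Game C', 99);
    """)
    # Fact rows whose genre_id has no matching dimension row
    orphans = conn.execute("""
        SELECT f.fact_id
        FROM fact_videogames f
        LEFT JOIN dim_genre g ON f.genre_id = g.genre_id
        WHERE g.genre_id IS NULL
    """).fetchall()

print("Orphan fact rows:", orphans)
```

An empty result for each dimension would indicate that every fact row resolves to a dimension row. The same query pattern applies to `dim_platform` with `platform_id`.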