## Configuración del entorno

Consulta de spark y SQL

In [0]:
spark.range(5)

In [0]:
%sql
SELECT current_date()

## Carga y exploración de datos

Ingreso a `Data Ingestion`, creo un volumen y subo el archivo por medio de `Upload files to a Volume` 

In [0]:
display(dbutils.fs.ls("/Volumes/workspace/default/tmp_steam/"))

In [0]:
df = spark.read.option("header", True).csv("/Volumes/workspace/default/tmp_steam/vgsales.csv")
df.show(5)


In [0]:
df.printSchema()


Registramos la tabla temporal para consultas con Spark SQL

In [0]:
df.createOrReplaceTempView("videojuegos")

In [0]:
%sql
SELECT Genre, COUNT(*) AS cantidad
FROM videojuegos
GROUP BY Genre
ORDER BY cantidad DESC

## Transformaciones de datos

In [0]:
from pyspark.sql.functions import col

# Eliminar filas duplicadas
df_clean = df.dropDuplicates()

# Quitar filas donde falten valores críticos (Nombre, Género o Ventas globales)
df_clean = df_clean.na.drop(subset=["Name", "Genre", "Global_Sales"])

print("Filas antes:", df.count())
print("Filas después:", df_clean.count())


In [0]:
from pyspark.sql.functions import round

# Convertir columnas de ventas a tipo float
df_typed = df_clean.withColumn("Global_Sales", col("Global_Sales").cast("float")) \
                   .withColumn("NA_Sales", col("NA_Sales").cast("float")) \
                   .withColumn("EU_Sales", col("EU_Sales").cast("float")) \
                   .withColumn("JP_Sales", col("JP_Sales").cast("float")) \
                   .withColumn("Other_Sales", col("Other_Sales").cast("float"))

# Crear una columna de porcentaje de ventas en Norteamérica
df_typed = df_typed.withColumn(
    "Pct_NA",
    round((col("NA_Sales") / col("Global_Sales")) * 100, 2)
)


In [0]:
# Filtramos por juegos exitosos
df_success = df_typed.filter(col("Global_Sales") >= 1.0)
display(df_success.select("Name", "Platform", "Global_Sales", "Genre"))


In [0]:
from pyspark.sql.functions import avg

# Agregaciones

## Promedio de ventas por género
df_avg_genre = df_typed.groupBy("Genre") \
    .agg(round(avg("Global_Sales"), 2).alias("PromedioVentas")) \
    .orderBy(col("PromedioVentas").desc())

display(df_avg_genre)

## Top plataformas opr ventas globales
df_top_platforms = df_typed.groupBy("Platform") \
    .agg(round(avg("Global_Sales"), 2).alias("PromedioVentas")) \
    .orderBy(col("PromedioVentas").desc())

display(df_top_platforms)


In [0]:
# Creamos un dataset auxiliar con un JOIN
from pyspark.sql import Row

plataformas_info = [
    Row(Platform="PS2", Fabricante="Sony", Generacion="6ª"),
    Row(Platform="X360", Fabricante="Microsoft", Generacion="7ª"),
    Row(Platform="Wii", Fabricante="Nintendo", Generacion="7ª"),
    Row(Platform="PS3", Fabricante="Sony", Generacion="7ª"),
    Row(Platform="DS", Fabricante="Nintendo", Generacion="7ª"),
    Row(Platform="PC", Fabricante="Varios", Generacion="N/A"),
]

df_plataformas = spark.createDataFrame(plataformas_info)

# Realizar el join con el dataset principal
df_joined = df_typed.join(df_plataformas, on="Platform", how="left")

display(df_joined.select("Name", "Platform", "Fabricante", "Global_Sales").limit(10))

In [0]:
# Guardamos el dataframe limpio
df_joined.write.format("delta").mode("overwrite").save("/Volumes/workspace/default/tmp_steam/vgsales_clean_delta")

## Almacenamiento con Delta Lake

In [0]:
# Guardar como Delta Lake en el volumen de trabajo
output_path = "/Volumes/workspace/default/tmp_steam/vgsales_clean_delta"

df_joined.write.format("delta").mode("overwrite").save(output_path)

print("✅ Datos guardados correctamente en formato Delta.")

In [0]:
df_delta = spark.read.format("delta").load(output_path)

# Mostrar algunas filas
df_delta.show(5)

In [0]:
# Validamos integridad
print("Filas originales:", df_joined.count())
print("Filas leídas desde Delta:", df_delta.count())

df_delta.printSchema()

In [0]:
output_path = "/Volumes/workspace/default/tmp_steam/vgsales_clean_delta"

# Guardamos los datos transformados en formato Delta
df_joined.write.format("delta").mode("overwrite").save(output_path)

df_delta = spark.read.format("delta").load(output_path)
assert df_joined.count() == df_delta.count(), "Los conteos no coinciden"
df_delta.show(5)

In [0]:
# Versiones del Delta Lake
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, output_path)
display(delta_table.history())

In [0]:
# Lectura por verisón
old_df = spark.read.format("delta").option("versionAsOf", 0).load(output_path)
old_df.show(5)

## Visualización y análisis

In [0]:
# Creamos una vista temporal para consultas SQL
df_delta.createOrReplaceTempView("vgsales_delta")

In [0]:
spark.sql("""
SELECT Platform, SUM(Global_Sales) AS TotalSales
FROM vgsales_delta
GROUP BY Platform
ORDER BY TotalSales DESC
""").show(10)


In [0]:
# Visualizaciones con Matplotlib

## Ventas globales por plataforma
import matplotlib.pyplot as plt

df_plot = (
    spark.sql("""
        SELECT Platform, SUM(Global_Sales) AS TotalSales
        FROM vgsales_delta
        GROUP BY Platform
        ORDER BY TotalSales DESC
        LIMIT 10
    """)
    .toPandas()
)

plt.figure(figsize=(10,5))
plt.bar(df_plot["Platform"], df_plot["TotalSales"], color="skyblue")
plt.title("Top 10 Plataformas por Ventas Globales")
plt.xlabel("Plataforma")
plt.ylabel("Ventas Globales (millones)")
plt.xticks(rotation=45)
plt.show()


In [0]:
## Ventas por año
df_plot = (
    spark.sql("""
        SELECT Year, SUM(Global_Sales) AS TotalSales
        FROM vgsales_delta
        WHERE Year IS NOT NULL
        GROUP BY Year
        ORDER BY Year
    """)
    .toPandas()
)

plt.figure(figsize=(12,6))
plt.plot(df_plot["Year"], df_plot["TotalSales"], marker="o")
plt.title("Ventas Globales por Año")
plt.xlabel("Año")
plt.ylabel("Ventas Globales (millones)")
plt.grid(True)
plt.show()
