# Vista rapida de datasets con Spark SQL

In [0]:
# El SparkSession ya existe en el notebook de Databricks por defecto
csv_files = [
    "genome-scores.csv",
    "genome-tags.csv",
    "links.csv",
    "movies.csv",
    "ratings.csv",
    "tags.csv"
]

base_path = "/Volumes/workspace/movie/movielens/"

# Carga Optimizada y Definici√≥n de Schema

In [0]:
# --- Definici√≥n de Schemas ---
# Evita errores como por ejemplo suma de strings y agiliza la lectura
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, TimestampType

# Schema para movies.csv
movies_schema = StructType([
    StructField("movieId", IntegerType(), True),
    StructField("title", StringType(), True),
    StructField("genres", StringType(), True)
])

# Schema para ratings.csv
ratings_schema = StructType([
    StructField("userId", IntegerType(), True),
    StructField("movieId", IntegerType(), True),
    StructField("rating", FloatType(), True),
    StructField("timestamp", IntegerType(), True) # Lo leeremos como int y luego transformaremos
])

# --- Carga de Datos con Schemas ---
base_path = "/Volumes/workspace/movie/movielens/"

# Cargar los datos principales usando los schemas definidos
movies_df = spark.read.format("csv").option("header", "true").schema(movies_schema).load(base_path + "movies.csv")
ratings_df = spark.read.format("csv").option("header", "true").schema(ratings_schema).load(base_path + "ratings.csv")

print("Datos de pel√≠culas cargados con schema:")
movies_df.printSchema()
movies_df.show(5, truncate=False)

print("\nDatos de ratings cargados con schema:")
ratings_df.printSchema()
ratings_df.show(5, truncate=False)

In [0]:
# Crear vistas temporales para poder consultarlas con SQL
movies_df.createOrReplaceTempView("movies_view")
ratings_df.createOrReplaceTempView("ratings_view")

# --- Consultas B√°sicas con Spark SQL ---

# 1. Ver las primeras 10 pel√≠culas
print("Primeras 10 pel√≠culas:")
spark.sql("SELECT title, genres FROM movies_view LIMIT 10").show(truncate=False)

# 2. Encontrar todas las pel√≠culas de 'Toy Story'
print("\nPel√≠culas de 'Toy Story':")
spark.sql("SELECT * FROM movies_view WHERE title LIKE 'Toy Story%'").show(truncate=False)

# 3. Ver los ratings m√°s altos (por encima de 4.5)
print("\nRatings m√°s altos:")
spark.sql("SELECT userId, movieId, rating FROM ratings_view WHERE rating > 4.5 ORDER BY rating DESC").show(10)

In [0]:
display(movies_df.select("genres").distinct())

# Transformaci√≥n y Limpieza

## Eliminar duplicados

In [0]:
from pyspark.sql import DataFrame

def remove_duplicates(df: DataFrame, subset_cols: list, df_name: str) -> DataFrame:
    """
    Identifica y elimina filas duplicadas de un DataFrame bas√°ndose en un subconjunto de columnas. La funci√≥n agrega una capa de auditaci√≥n y reporte que dropDuplicates no logra.

    Args:
        df (DataFrame): El DataFrame de Spark a limpiar.
        subset_cols (list): Lista de nombres de columna para considerar la unicidad.
        df_name (str): Nombre descriptivo del DataFrame, usado para el logging.

    Returns:
        DataFrame: Un nuevo DataFrame con los duplicados eliminados.
    """
    initial_count = df.count()
    unique_count = df.select(subset_cols).distinct().count()
    
    duplicates_found = initial_count - unique_count
    
    print(f"--- Limpieza de Duplicados para: {df_name} ---")
    print(f"Filas iniciales: {initial_count}")
    print(f"Filas √∫nicas basadas en {subset_cols}: {unique_count}")
    print(f"Duplicados a eliminar: {duplicates_found}\n")
    
    if duplicates_found > 0:
        df_cleaned = df.dropDuplicates(subset_cols)
        print(f"Duplicados eliminados. Nuevo total de filas: {df_cleaned.count()}\n")
        return df_cleaned
    else:
        print("No se encontraron duplicados. El DataFrame no fue modificado.\n")
        return df

In [0]:
# --- Limpieza de Duplicados ---
# Eliminamos posibles pel√≠culas duplicadas antes de cualquier transformaci√≥n.
movies_df_clean = remove_duplicates(df=movies_df, subset_cols=['movieId'], df_name='Movies DataFrame Original')

# # Eliminamos posibles pel√≠culas duplicadas antes de cualquier transformaci√≥n.
# movies_df_clean = remove_duplicates(df=movies_df, subset_cols=['movieId'], df_name='Movies DataFrame Original')


## Separaci√≥n de a√±o y generos

In [0]:
from pyspark.sql.functions import col, split, explode, regexp_extract, from_unixtime, to_timestamp, lit, when

# --- Transformaci√≥n del DataFrame de Pel√≠culas (Versi√≥n Profundamente Limpia) ---
# 1. Extraer el a√±o de forma segura (usando try_cast).
# 2. Separar los g√©neros.
# 3. Convertir el marcador "(no genres listed)" a NULL para consistencia.

movies_transformed_df = movies_df_clean.withColumn(
    "year",
    regexp_extract(col("title"), r"\((\d{4})\)", 1).try_cast("int")
).withColumn(
    "genre",
    explode(split(col("genres"), "\\|"))
).withColumn(
    # Reemplazamos el texto de marcador por un verdadero NULL
    "genre",
    when(col("genre") == "(no genres listed)", lit(None)).otherwise(col("genre"))
).select(
    "movieId",
    "title",
    "year",
    "genre"
)

print("Pel√≠culas transformadas (g√©neros explotados y valores nulos estandarizados):")
movies_transformed_df.show(10, truncate=False)

In [0]:
from pyspark.sql import DataFrame

def show_unique_values(df: DataFrame, df_name: str, limit: int = 20, ignore_cols: list = []):
    """
    Muestra los valores √∫nicos de cada columna de un DataFrame para su validaci√≥n.
    
    Args:
        df (DataFrame): El DataFrame de Spark a analizar.
        df_name (str): El nombre del DataFrame para imprimir en el encabezado.
        limit (int, optional): N√∫mero m√°ximo de valores √∫nicos a mostrar por columna.
                               Si es None, muestra todos. Por defecto es None.
        ignore_cols (list): Lista de nombres de columna que se omitir√°n en el an√°lisis.
    
    Evitar:
    distinct().collect()

        ‚û° intenta traer todo al driver
        ‚û° provoca un shuffle enorme
        ‚û° se queda ejecutando por mucho tiempo o hasta agotar memoria
    """
    print(f"--- Validaci√≥n de Valores √önicos para: {df_name} ---\n")

    for column_name in df.columns:
        if column_name in ignore_cols:
            continue

        print(f"Columna: '{column_name}'")

        distinct_df = df.select(column_name).distinct()

        # Mostrar n valores
        distinct_df.show(limit, truncate=False)

        n = limit if limit is not None else 20

        # Contar total
        total = distinct_df.count()
        print(f"Total valores √∫nicos: {total}")

        print("-" * 40)

In [0]:
# Mostrar valores √∫nicos para el DataFrame de ratings
ignore = ["timestamp", "movieId", "userId"]
show_unique_values(ratings_df, "Ratings Transformado", ignore_cols=ignore)

In [0]:
# --- Transformaci√≥n del DataFrame de Ratings (Versi√≥n Limpia) ---
# 1. Convertir el timestamp de Unix Epoch a un formato de fecha legible.
# 2. Convertir ratings inv√°lidos (nulos o 0.0) a NULL.

ratings_transformed_df = ratings_df.withColumn(
    "rating_timestamp",
    to_timestamp(from_unixtime(col("timestamp")))
).withColumn(
    # Si el rating es nulo o 0.0, se convierte a NULL. De lo contrario, se mantiene el valor.
    "rating",
    when((col("rating").isNull()) | (col("rating") <= 0.0), lit(None)).otherwise(col("rating"))
).select(
    "userId",
    "movieId",
    "rating",
    "timestamp"
)

print("\nRatings transformados y limpios:")
ratings_transformed_df.show(10, truncate=False)

## Buscar valores √∫nicos para entender y limpiar mejor las columnas

In [0]:
# Mostrar valores √∫nicos para el DataFrame de pel√≠culas
ignore = ["movieId", "title"]
show_unique_values(movies_transformed_df, "Movies Transformado", limit=100, ignore_cols=ignore)

In [0]:
# Filtrar filas donde 'genre' es exactamente "We're Comin' To Get Ya!\" (2014)"
target_genre = " We're Comin' To Get Ya!\"\" (2014)\""
rows_to_remove_df = movies_transformed_df.filter(col("genre") == target_genre)
num_rows_removed = rows_to_remove_df.count()

# Eliminar esas filas del DataFrame
movies_filtered_df = movies_transformed_df.filter(col("genre") != target_genre)

print(f"Filas eliminadas con g√©nero '{target_genre}': {num_rows_removed}")

In [0]:
# Mostrar valores √∫nicos para el DataFrame de pel√≠culas
ignore = ["movieId", "title"]
show_unique_values(movies_filtered_df, "Movies Transformado", limit=100, ignore_cols=ignore)

In [0]:
# Mostrar valores √∫nicos para el DataFrame de ratings
ignore = ["timestamp", "movieId", "userId"]
show_unique_values(ratings_transformed_df, "Ratings Transformado", ignore_cols=ignore)

In [0]:
from pyspark.sql.functions import col
# --- Paso de Limpieza Avanzada: Eliminar T√≠tulos Colados en G√©neros ---

# 1. Primero, inspeccionemos qu√© filas coinciden con nuestro patr√≥n sospechoso.
#    Usamos .rlike() para aplicar la expresi√≥n regular.
#    La regex busca cualquier string que contenga "(4 d√≠gitos)" y luego una comilla doble.
print("üîç Filas identificadas con un posible t√≠tulo en la columna 'genre':")
suspicious_rows_df = movies_transformed_df.filter(col("genre").rlike('.*\(\d{4}\).*"'))
suspicious_rows_df.show(truncate=False)

# 2. Ahora, eliminamos estas filas del DataFrame.
#    El s√≠mbolo '~' es el operador "NOT", por lo que filtramos para mantener todo lo que NO coincide con el patr√≥n.
movies_cleaned_df = movies_transformed_df.filter(~col("genre").rlike('.*\(\d{4}\).*"'))

print(f"\n‚úÖ Se han eliminado {suspicious_rows_df.count()} filas incorrectas.")

## Buscar valores nulos y borrarlos

In [0]:
from pyspark.sql.functions import count, when, col

from pyspark.sql.functions import count, col, sum as spark_sum

total_count_movies = movies_transformed_df.count()

null_props_movies = movies_transformed_df.agg(
    *[(spark_sum(col(c).isNull().cast("int")) / total_count_movies).alias(c)
      for c in movies_transformed_df.columns]
)

null_props_movies.show()


total_count_ratings = ratings_transformed_df.count()

null_props_ratings = ratings_transformed_df.agg(
    *[(spark_sum(col(c).isNull().cast("int")) / total_count_ratings).alias(c)
      for c in ratings_transformed_df.columns]
)

null_props_ratings.show()


# Antes
# df.count() ya hace un full scan del DataFrame.
# .select([...]).show(), que vuelve a disparar otro full scan + agregaciones sobre todas las columnas.
# genera un plano l√≥gico con miles de operaciones separadas,
# usa count(when()), que es m√°s lento,
# divide por un count de otro DataFrame,
# y Spark debe recalcular muchas cosas.

# La versi√≥n optimizada:
# usa agg() ‚Üí Spark lo optimiza como un single pass aggregation,
# isNull().cast("int") es vectorizado,
# evita m√∫ltiples scans y shuffles,
# usa el total correcto por DataFrame.

In [0]:
from pyspark.sql import DataFrame

def drop_nulls_and_report(df: DataFrame, df_name: str) -> DataFrame:
    """
    Elimina todas las filas que contienen al menos un valor nulo y reporta
    la cantidad de filas eliminadas.

    Args:
        df (DataFrame): El DataFrame de Spark a limpiar.
        df_name (str): Nombre descriptivo del DataFrame para el logging.

    Returns:
        DataFrame: Un nuevo DataFrame sin filas nulas.
    """
    initial_count = df.count()
    
    print(f"--- Eliminaci√≥n de Nulos para: {df_name} ---")
    print(f"Filas iniciales: {initial_count}")
    
    # na.drop() elimina cualquier fila que contenga un valor nulo en CUALQUIER columna
    df_cleaned = df.na.drop()
    
    final_count = df_cleaned.count()
    rows_dropped = initial_count - final_count
    
    print(f"Filas eliminadas con nulos: {rows_dropped}")
    print(f"Filas finales: {final_count}\n")
    
    return df_cleaned

In [0]:
# Limpiar el DataFrame de pel√≠culas
movies_clean_df = drop_nulls_and_report(movies_transformed_df, "Movies Transformado")

# Limpiar el DataFrame de ratings
ratings_clean_df = drop_nulls_and_report(ratings_transformed_df, "Ratings Transformado")

## Agregaci√≥n y Joins simples

In [0]:
# --- Agregaciones y Joins Simples ---

# Spark SQL: Contar el n√∫mero total de ratings
total_ratings_sql = spark.sql("SELECT COUNT(*) as total_ratings FROM ratings_view").collect()[0][0]
print(f"Total de ratings en la tabla (SQL): {total_ratings_sql}")

# DataFrame API: Hacer lo mismo
total_ratings_df = ratings_df.count()
print(f"Total de ratings en la tabla (DataFrame API): {total_ratings_df}")

# Spark SQL: Encontrar las 5 pel√≠culas con m√°s ratings
print("\nTop 5 pel√≠culas con m√°s ratings (SQL):")
spark.sql("""
    SELECT
        movieId,
        COUNT(*) as num_ratings
    FROM ratings_view
    GROUP BY movieId
    ORDER BY num_ratings DESC
    LIMIT 5
""").show()

# Join simple con Spark SQL: Unir una pel√≠cula con sus ratings
print("\nJoin simple: Ver los ratings para la pel√≠cula 'Jumanji':")
spark.sql("""
    SELECT
        m.title,
        r.userId,
        r.rating
    FROM movies_view m
    JOIN ratings_view r ON m.movieId = r.movieId
    WHERE m.title = 'Jumanji (1995)'
""").show()

# Guardar en formato Delta Lake

## Uni√≥n de Datos

In [0]:
# --- Paso 1: Unir los dos DataFrames en una tabla "Golden" ---
# Usamos un 'inner join' para quedarnos solo con los ratings que tienen una pel√≠cula asociada.

# Es una buena pr√°ctica renombrar las columnas antes del join para evitar ambig√ºedades,
# aunque en este caso 'movieId' es la √∫nica en com√∫n y es la clave del join.
# El join se realizar√° autom√°ticamente en esta columna.

golden_df = ratings_clean_df.join(
    movies_clean_df,
    on="movieId",
    how="inner"
)

print("Vista previa de la tabla 'Golden' unida y desnormalizada:")
golden_df.printSchema()
golden_df.show(5, truncate=False)

## Guardar versi√≥n final

In [0]:
# --- Paso 2: Guardar la Tabla Golden en formato Delta Lake (Corregido) ---

delta_path = "/Volumes/workspace/movie/movielens/delta_tables/movielens_analyzed_final"
# Particionamos por 'year' porque es una columna de baja cardinalidad
# y muy com√∫n para filtrar en an√°lisis de series temporales.
(golden_df.write
 .format("delta")
 .mode("overwrite")
 .partitionBy("year")
 .option("overwriteSchema", "true")
 .save(delta_path))

print(f"‚úÖ Tabla 'Golden' guardada exitosamente en Delta Lake en: {delta_path}")

# --- Validaci√≥n Final ---
print("\nüîç Validaci√≥n: Leyendo la tabla final desde Delta Lake:")
final_df = spark.read.format("delta").load(delta_path)
final_df.show(5)
final_df.printSchema()

# Visualizaci√≥n

## Popularidad por g√©nero
Dice qu√© generos reciben mas ratings. Los g√©neros mas populares a su vez reciben muchas cr√≠ticas de un mismo tipo acumulandose y sesgando el promedio final.

In [0]:
from pyspark.sql.functions import count, avg, desc

# Calcular el n√∫mero de ratings por g√©nero
genre_popularity_df = golden_df.groupBy("genre").agg(count("rating").alias("num_ratings")).orderBy(desc("num_ratings"))

print("Popularidad de los g√©neros por n√∫mero de ratings:")
genre_popularity_df.show()

# --- Visualizaci√≥n ---
display(genre_popularity_df)

Databricks visualization. Run in Databricks to view.

Qu√© porcentaje representan los ratings de Film-Noir respecto a los ratings del g√©nero m√°s popular (Drama).

In [0]:
from pyspark.sql.functions import col

# Cantidad de ratings de Film-Noir
film_noir_count = golden_df.filter(col("genre") == "Film-Noir").count()

# Cantidad de ratings de Drama (el m√°s popular)
drama_count = golden_df.filter(col("genre") == "Drama").count()

# Porcentaje
percentage = (film_noir_count / drama_count) * 100

print(f"Film-Noir representa el {percentage:.2f}% de los ratings de Drama.")


## Promedio de rating por g√©nero (sin corregir sesgo a√∫n)
Muestra la media aritm√©tica cl√°sica. Generos con pocos ratings pueden tener promedios inflados. Generos muy populares con miles de pel√≠culas quedan penalizados.

In [0]:
genre_avg_rating_df = (
    golden_df
    .groupBy("genre")
    .agg(avg("rating").alias("avg_rating"))
    .orderBy(desc("avg_rating"))
)

print("Promedio simple de rating por g√©nero:")
genre_avg_rating_df.show()

display(genre_avg_rating_df)


Databricks visualization. Run in Databricks to view.

## Bayesian Adjust Average
No favorece a g√©neros con pocos ratings ni castiga a generos con muchos ratings. Es la forma mas justa porque es un promedio ponderado entre:
- Promedio del g√©nero (R)
- Promedio global (C)
- Ajustado por cuantos ratings tiene (v)
- Ajustado por umbral de m√≠nimos ratings confiables (m = 1000)

In [0]:
# Los g√©neros con muchas pel√≠culas suelen tener m√°s ratings y por ende el promedio puede ‚Äúempujarse‚Äù hacia valores m√°s bajos o m√°s altos.
# La forma estad√≠sticamente correcta de obtener una puntuaci√≥n promedio no sesgada es aplicar Bayesian Average o Regularized Mean.
from pyspark.sql.functions import col, avg, count, lit

# 1) promedio global
global_avg = golden_df.select(avg("rating")).first()[0]

# 2) n√∫mero y promedio por g√©nero
genre_stats_df = (
    golden_df
    .groupBy("genre")
    .agg(
        count("rating").alias("v"),
        avg("rating").alias("R")
    )
)

# 3) par√°metro m
m = 1000

# 4) calculamos el promedio ajustado
genre_adjusted_df = (
    genre_stats_df
    .withColumn("C", lit(global_avg))
    .withColumn(
        "adjusted_rating",
        (col("v") / (col("v") + m)) * col("R") +
        (lit(m) / (col("v") + m)) * col("C")
    )
    .orderBy(col("adjusted_rating").desc())
)

print("Rating promedio por g√©nero ajustado (sin sesgo por popularidad):")
genre_adjusted_df.show()

display(genre_adjusted_df)


Databricks visualization. Run in Databricks to view.

In [0]:
ratings_long_df = golden_df.select("genre", "rating")

display(ratings_long_df)


In [0]:
# Calcular el rating promedio por a√±o
avg_rating_by_year_df = golden_df.filter(col("year").isNotNull()).groupBy("year").agg(avg("rating").alias("avg_rating")).orderBy("year")

print("Rating promedio de las pel√≠culas por a√±o de lanzamiento:")
avg_rating_by_year_df.show(20)

# --- Visualizaci√≥n ---
display(avg_rating_by_year_df)

Databricks visualization. Run in Databricks to view.

# Concusiones

A partir del an√°lisis del dataset de MovieLens, se pueden extraer las siguientes conclusiones simples:

1.  **Dominancia de G√©neros:** Los g√©neros de **Drama** y **Comedia** son los que concentran la mayor cantidad de ratings, lo que sugiere una fuerte preferencia del p√∫blico por este tipo de contenido.

2.  **Calidad vs. Antig√ºedad:** El an√°lisis del rating promedio por a√±o muestra que entre las decadas del 20 y el 80 las pel√≠culas tuvieron un rating algo sostenido a diferencia de epocas anteriores y posteriores.