# Data exploration

The purpose of this notebook is to carry out a preliminary exploration of the data, covering the cleaning, transformation, joining, and file export stages. The procedures developed here will serve as a prototype for the scripts that will later make up the project's processing pipeline.

## Configuration

In [None]:
from pyspark.sql import SparkSession
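# Note: importing sum and round here shadows Python's built-in sum() and round() for the rest of the notebook
from pyspark.sql.functions import col, when, sum, regexp_replace, regexp_extract, explode, split, avg, round, count, collect_list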
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, StringType
import pandas as pd

Create the basic Spark session

In [None]:
spark = SparkSession\
.builder\
.appName("Movie Lens data")\
.getOrCreate()



## Ratings data

#### RATINGS FILE DESCRIPTION

All ratings are contained in the file "ratings.dat" and are in the
following format:

UserID::MovieID::Rating::Timestamp

- UserIDs range between 1 and 6040
- MovieIDs range between 1 and 3952
- Ratings are made on a 5-star scale (whole-star ratings only)
- Timestamp is represented in seconds since the epoch as returned by time(2)
- Each user has at least 20 ratings
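The format and ranges above can be checked line by line. The following is a minimal plain-Python sketch (no Spark) that parses one `UserID::MovieID::Rating::Timestamp` line, enforces the documented ranges, and converts the epoch timestamp to a UTC date. The names `Rating`, `parse_rating`, and `to_utc` are illustrative and not part of the project; the sample line reuses a row shown in the ratings output further down.

```python
from datetime import datetime, timezone
from typing import NamedTuple


class Rating(NamedTuple):
    user_id: int
    movie_id: int
    rating: int
    timestamp: int


def parse_rating(line: str) -> Rating:
    """Parse one 'UserID::MovieID::Rating::Timestamp' line and enforce the documented ranges."""
    user_id, movie_id, rating, timestamp = (int(field) for field in line.strip().split("::"))
    if not 1 <= user_id <= 6040:
        raise ValueError(f"user_id out of range: {user_id}")
    if not 1 <= movie_id <= 3952:
        raise ValueError(f"movie_id out of range: {movie_id}")
    if rating not in {1, 2, 3, 4, 5}:  # 5-star scale, whole stars only
        raise ValueError(f"rating out of range: {rating}")
    return Rating(user_id, movie_id, rating, timestamp)


def to_utc(timestamp: int) -> datetime:
    """Convert seconds since the epoch (as returned by time(2)) to an aware UTC datetime."""
    return datetime.fromtimestamp(timestamp, tz=timezone.utc)


# Row taken from the ratings output shown later in this notebook
row = parse_rating("3190::1::3::968650558")
print(row)
print(to_utc(row.timestamp).isoformat())
```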

### Load ratings data

In [None]:
# Define the schema
ratings_schema = StructType(
    [
        StructField("user_id", IntegerType(), True),
        StructField("movie_id", IntegerType(), True),
        StructField("rating", IntegerType(), True),
        StructField("timestamp",LongType(), True )
    ]
)

In [None]:
# Read the file
ratings_df = spark.read.option("delimiter", "::").schema(ratings_schema).csv("data/raw/ratings.dat")

In [None]:
ratings_df.printSchema()

root
 |-- userID: integer (nullable = true)
 |-- movieID: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- timestamp: long (nullable = true)



In [None]:
ratings_df.orderBy("movie_id").show(5)

                                                                                

+------+-------+------+----------+
|userID|movieID|rating| timestamp|
+------+-------+------+----------+
|  3190|      1|     3| 968650558|
|  1124|      1|     5| 974909280|
|     9|      1|     5| 978225952|
|  1141|      1|     5|1005053594|
|  2030|      1|     4| 974734434|
+------+-------+------+----------+
only showing top 5 rows



In [None]:
# Count the nulls in each column of the DataFrame
ratings_df.select([sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in ratings_df.columns]).show()

+------+-------+------+---------+
|userID|movieID|rating|timestamp|
+------+-------+------+---------+
|     0|      0|     0|        0|
+------+-------+------+---------+



Check the data: every rating must lie between 1 and 5.

In [None]:
ratings_df.filter(col("rating")>5).show()

+------+-------+------+---------+
|userID|movieID|rating|timestamp|
+------+-------+------+---------+
+------+-------+------+---------+



In [None]:
ratings_df.filter(col("rating")<1).show()

+------+-------+------+---------+
|userID|movieID|rating|timestamp|
+------+-------+------+---------+
+------+-------+------+---------+



## Movies data

#### MOVIES FILE DESCRIPTION

Movie information is in the file "movies.dat" and is in the following
format:

MovieID::Title::Genres

- Titles are identical to titles provided by the IMDB (including
year of release)
- Genres are pipe-separated and are selected from the following genres:

	* Action
	* Adventure
	* Animation
	* Children's
	* Comedy
	* Crime
	* Documentary
	* Drama
	* Fantasy
	* Film-Noir
	* Horror
	* Musical
	* Mystery
	* Romance
	* Sci-Fi
	* Thriller
	* War
	* Western

- Some MovieIDs do not correspond to a movie due to accidental duplicate
entries and/or test entries
- Movies are mostly entered by hand, so errors and inconsistencies may exist
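Because movies were entered by hand, it can be worth checking each row against the documented genre list. A minimal plain-Python sketch, assuming well-formed three-field lines: it parses one `MovieID::Title::Genres` line, splits the pipe-separated genres, and rejects any genre outside the 18 listed above. `VALID_GENRES` and `parse_movie` are illustrative names; the sample line uses the title and genres of Toy Story as they appear in this notebook's outputs.

```python
from typing import List, Tuple

# The 18 genres listed in the file description above
VALID_GENRES = {
    "Action", "Adventure", "Animation", "Children's", "Comedy", "Crime",
    "Documentary", "Drama", "Fantasy", "Film-Noir", "Horror", "Musical",
    "Mystery", "Romance", "Sci-Fi", "Thriller", "War", "Western",
}


def parse_movie(line: str) -> Tuple[int, str, List[str]]:
    """Parse one 'MovieID::Title::Genres' line and reject genres outside the documented list."""
    movie_id, title, genres = line.rstrip("\n").split("::")
    genre_list = genres.split("|")  # genres are pipe-separated
    unknown = [g for g in genre_list if g not in VALID_GENRES]
    if unknown:
        raise ValueError(f"movie {movie_id} has unknown genres: {unknown}")
    return int(movie_id), title, genre_list


print(parse_movie("1::Toy Story (1995)::Animation|Children's|Comedy"))
```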

### Load movie data

In [None]:
# Define the schema
movie_schema = StructType(
    [
        StructField("movie_id", IntegerType(), True),
        StructField("title", StringType(), True),
        StructField("genres", StringType(), True)
    ]
)

In [None]:
# Load the data
movie_df = spark.read.option("delimiter", "::").schema(movie_schema).csv("data/raw/movies.dat")

In [None]:
movie_df.printSchema()

root
 |-- movieID: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = false)
 |-- year: string (nullable = true)



In [None]:
movie_df.show(5)

+-------+--------------------+--------------------+
|movieID|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Animation|Childre...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|        Comedy|Drama|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



Transformations

In [None]:
# Extract the release year from the movie title into its own column
movie_df = movie_df.withColumn("year", regexp_extract("title",  r"\((\d{4})\)",1))
movie_df = movie_df.withColumn("title", regexp_replace("title", r"\s*\(\d{4}\)",""))

In [None]:
movie_df.show(5)

+-------+--------------------+--------------------+----+
|movieID|               title|              genres|year|
+-------+--------------------+--------------------+----+
|      1|           Toy Story|Animation|Childre...|1995|
|      2|             Jumanji|Adventure|Childre...|1995|
|      3|    Grumpier Old Men|      Comedy|Romance|1995|
|      4|   Waiting to Exhale|        Comedy|Drama|1995|
|      5|Father of the Bri...|              Comedy|1995|
+-------+--------------------+--------------------+----+
only showing top 5 rows
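The two expressions in the previous cell can be tried out locally. Spark uses Java regular expressions, but for these simple patterns Python's `re` module behaves the same way, so the sketch below reproduces the split between title and year in plain Python. `split_title_year` is an illustrative helper, not part of the project; like `regexp_extract`, it returns an empty string when no year is found.

```python
import re

# Same patterns as the regexp_extract / regexp_replace calls above
YEAR_IN_PARENS = re.compile(r"\((\d{4})\)")
YEAR_SUFFIX = re.compile(r"\s*\(\d{4}\)")


def split_title_year(raw_title: str) -> tuple:
    """Return (title without year, year as a string); the year is '' when there is no match."""
    match = YEAR_IN_PARENS.search(raw_title)
    year = match.group(1) if match else ""  # regexp_extract also yields '' on no match
    title = YEAR_SUFFIX.sub("", raw_title)  # like regexp_replace, removes every occurrence
    return title, year


for raw in ["Toy Story (1995)", "Grumpier Old Men (1995)"]:
    print(split_title_year(raw))
```

Note that the year stays a string in both versions; casting it to an integer is a separate step if numeric comparisons are needed.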



In [None]:
# Normalize the genres: one row per (movie, genre) pair
movie_df = movie_df.withColumn("genres", explode(split("genres",r"\|")))
movie_df.show(5)

+-------+---------+----------+----+
|movieID|    title|    genres|year|
+-------+---------+----------+----+
|      1|Toy Story| Animation|1995|
|      1|Toy Story|Children's|1995|
|      1|Toy Story|    Comedy|1995|
|      2|  Jumanji| Adventure|1995|
|      2|  Jumanji|Children's|1995|
+-------+---------+----------+----+
only showing top 5 rows
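`explode(split(...))` turns a movie with k genres into k rows that share the same `movieID`, title, and year. This matters for every later join and count, since each movie now appears once per genre. A plain-Python sketch of the same reshaping, using two rows from the outputs above; `explode_genres` is an illustrative name.

```python
from typing import Iterable, Iterator, Tuple

MovieRow = Tuple[int, str, str, str]  # (movie_id, title, genres or single genre, year)


def explode_genres(rows: Iterable[MovieRow]) -> Iterator[MovieRow]:
    """Yield one row per genre, mirroring explode(split(...)) in the cell above."""
    for movie_id, title, genres, year in rows:
        for genre in genres.split("|"):
            yield movie_id, title, genre, year


movies = [
    (1, "Toy Story", "Animation|Children's|Comedy", "1995"),
    (3, "Grumpier Old Men", "Comedy|Romance", "1995"),
]
for row in explode_genres(movies):
    print(row)
```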



## Users data

#### USERS FILE DESCRIPTION

User information is in the file "users.dat" and is in the following
format:

UserID::Gender::Age::Occupation::Zip-code

All demographic information is provided voluntarily by the users and is
not checked for accuracy.  Only users who have provided some demographic
information are included in this data set.

- Gender is denoted by a "M" for male and "F" for female
- Age is chosen from the following ranges:

	*  1:  "Under 18"
	* 18:  "18-24"
	* 25:  "25-34"
	* 35:  "35-44"
	* 45:  "45-49"
	* 50:  "50-55"
	* 56:  "56+"

- Occupation is chosen from the following choices:

	*  0:  "other" or not specified
	*  1:  "academic/educator"
	*  2:  "artist"
	*  3:  "clerical/admin"
	*  4:  "college/grad student"
	*  5:  "customer service"
	*  6:  "doctor/health care"
	*  7:  "executive/managerial"
	*  8:  "farmer"
	*  9:  "homemaker"
	* 10:  "K-12 student"
	* 11:  "lawyer"
	* 12:  "programmer"
	* 13:  "retired"
	* 14:  "sales/marketing"
	* 15:  "scientist"
	* 16:  "self-employed"
	* 17:  "technician/engineer"
	* 18:  "tradesman/craftsman"
	* 19:  "unemployed"
	* 20:  "writer"
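The age and occupation columns hold codes, not labels. A minimal plain-Python sketch of decoding them with lookup tables copied from the description above; `AGE_RANGES`, `OCCUPATIONS`, and `decode_user` are illustrative names, and the sample line is hypothetical. The zip code is kept as text so that leading zeros or hyphens are not lost.

```python
# Code tables copied from the file description above
AGE_RANGES = {1: "Under 18", 18: "18-24", 25: "25-34", 35: "35-44",
              45: "45-49", 50: "50-55", 56: "56+"}
OCCUPATIONS = [
    "other or not specified", "academic/educator", "artist", "clerical/admin",
    "college/grad student", "customer service", "doctor/health care",
    "executive/managerial", "farmer", "homemaker", "K-12 student", "lawyer",
    "programmer", "retired", "sales/marketing", "scientist", "self-employed",
    "technician/engineer", "tradesman/craftsman", "unemployed", "writer",
]  # the list index is the occupation code (0 to 20)


def decode_user(line: str) -> dict:
    """Turn a 'UserID::Gender::Age::Occupation::Zip-code' line into readable fields."""
    user_id, gender, age, occupation, zip_code = line.strip().split("::")
    return {
        "user_id": int(user_id),
        "gender": {"M": "male", "F": "female"}[gender],
        "age_range": AGE_RANGES[int(age)],
        "occupation": OCCUPATIONS[int(occupation)],
        "zip_code": zip_code,  # kept as text so leading zeros or hyphens survive
    }


print(decode_user("1::F::1::10::48067"))  # hypothetical example line
```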

### Load user data

In [None]:
# Define the schema
user_schema = StructType(
    [
        StructField("user_id", IntegerType(), True),
        StructField("gender", StringType(), True),
        StructField("age", IntegerType(), True),
        StructField("occupation", IntegerType(), True),
        StructField("zipCode", StringType(), True)  # zip codes are text: leading zeros and non-numeric values must survive
    ]
)

In [None]:
# Load the data
user_df = spark.read.option("delimiter", "::").schema(user_schema).csv("data/raw/users.dat")

In [None]:
user_df.printSchema()

root
 |-- userID: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- ocuppation: integer (nullable = true)
 |-- zipCode: integer (nullable = true)



In [None]:
# Count the nulls in each column
user_df.select([sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in user_df.columns]).show()

+------+------+---+----------+-------+
|userID|gender|age|ocuppation|zipCode|
+------+------+---+----------+-------+
|     0|     0|  0|         0|     66|
+------+------+---+----------+-------+
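The nulls appear only in `zipCode`. A plausible explanation is that some zip codes in `users.dat` are not plain integers (for example ZIP+4 codes with a hyphen): when a value cannot be converted to the declared `IntegerType`, Spark's default `PERMISSIVE` CSV mode stores null for that field. Integer parsing also silently drops leading zeros. The sketch below imitates that behaviour in plain Python with hypothetical values; `parse_zip_as_int` is an illustrative name. Reading `zipCode` as a string keeps every value intact.

```python
from typing import Optional


def parse_zip_as_int(zip_code: str) -> Optional[int]:
    """Mimic reading a zip code into an integer column: unparseable text becomes None."""
    try:
        return int(zip_code)
    except ValueError:
        return None


# Hypothetical zip code values, not taken from users.dat
for z in ["48067", "02460", "98107-2117"]:
    print(repr(z), "->", parse_zip_as_int(z))
```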



In [None]:
# Check for duplicate rows
user_df.groupBy(user_df.columns).count().filter("count > 1").show()

+------+------+---+----------+-------+-----+
|userID|gender|age|ocuppation|zipCode|count|
+------+------+---+----------+-------+-----+
+------+------+---+----------+-------+-----+



### Join the data

The ratings and movies data are joined for later queries.

In [None]:
complete_df = movie_df.join(
    ratings_df,
    on="movie_id",
    how="left"
)

In [None]:
complete_df.count()

2102031
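This row count reflects two properties of the join. First, `movie_df` holds one row per genre, so each rating is repeated once for every genre of its movie. Second, `how="left"` keeps movies that have no ratings, as rows whose `user_id`, `rating`, and `timestamp` are null. A plain-Python sketch of those left-join semantics on hypothetical rows; `left_join_on_movie` is an illustrative name.

```python
from collections import defaultdict
from typing import Dict, List


def left_join_on_movie(movies: List[dict], ratings: List[dict]) -> List[dict]:
    """Keep every movie row; attach each matching rating, or nulls when there is none."""
    ratings_by_movie: Dict[int, List[dict]] = defaultdict(list)
    for r in ratings:
        ratings_by_movie[r["movie_id"]].append(r)

    joined = []
    for m in movies:
        matches = ratings_by_movie.get(m["movie_id"], [])  # .get does not create new keys
        if matches:
            joined.extend({**m, **r} for r in matches)
        else:
            joined.append({**m, "user_id": None, "rating": None, "timestamp": None})
    return joined


# Hypothetical rows: movie 1 already exploded into two genres, movie 999 has no ratings
movies = [
    {"movie_id": 1, "genres": "Animation"},
    {"movie_id": 1, "genres": "Comedy"},
    {"movie_id": 999, "genres": "Drama"},
]
ratings = [{"movie_id": 1, "user_id": 3190, "rating": 3, "timestamp": 968650558}]
for row in left_join_on_movie(movies, ratings):
    print(row)
```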

In [None]:
complete_df.show(5)

+-------+---------+---------+----+------+------+---------+
|movieID|    title|   genres|year|userID|rating|timestamp|
+-------+---------+---------+----+------+------+---------+
|      1|Toy Story|Animation|1995|     1|     5|978824268|
|      1|Toy Story|Animation|1995|     6|     4|978237008|
|      1|Toy Story|Animation|1995|     8|     4|978233496|
|      1|Toy Story|Animation|1995|     9|     5|978225952|
|      1|Toy Story|Animation|1995|    10|     5|978226474|
+-------+---------+---------+----+------+------+---------+
only showing top 5 rows



### Average rating per movie

The average rating of each movie is computed.

In [None]:
# Average rating per movie; round() without a scale rounds to whole numbers
rating_avg_df = complete_df.groupBy("movie_id", "title").agg(
    round(avg("rating")).alias("rating_Average")
)
rating_avg_df.show(5)

+-------+--------------------+--------------+
|movieID|               title|rating Average|
+-------+--------------------+--------------+
|     12|Dracula: Dead and...|           2.0|
|     22|             Copycat|           3.0|
|     26|             Othello|           4.0|
|     27|        Now and Then|           3.0|
|     28|          Persuasion|           4.0|
+-------+--------------------+--------------+
only showing top 5 rows
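Since `complete_df` repeats every rating once per genre of its movie, one might worry that the per-movie average is skewed. It is not: all ratings of the same movie are repeated the same number of times, so their mean is unchanged. Also, `round(avg("rating"))` is called without a scale, which rounds to 0 decimal places; that is why the averages above are whole numbers. A plain-Python sketch of the per-movie mean, ignoring null ratings as `avg()` does; `average_by_movie` is an illustrative name and the data is made up.

```python
import math
from collections import defaultdict
from typing import Dict, Iterable, Optional, Tuple


def average_by_movie(rows: Iterable[Tuple[int, Optional[int]]]) -> Dict[int, Optional[float]]:
    """Mean rating per movie; None ratings (unrated movies after a left join) are ignored."""
    ratings = defaultdict(list)
    for movie_id, rating in rows:
        bucket = ratings[movie_id]  # registers the movie even if it has no ratings
        if rating is not None:
            bucket.append(rating)
    return {m: (math.fsum(v) / len(v) if v else None) for m, v in ratings.items()}


once = [(1, 5), (1, 4)]
per_genre = once * 3  # the same ratings repeated for a movie with three genres
print(average_by_movie(once), average_by_movie(per_genre))
```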



### Popularity of each genre

The goal is to find the most popular genre, measured by how many ratings its movies received, together with the genre's average rating.

In [None]:
genre_popularity_df = complete_df.groupBy("genres").agg(
    count("rating").alias("total_ratings"),  # one row per (movie, genre, rating): this counts ratings, not movies
    round(avg("rating"), 2).alias("average_for_genre")
)
genre_popularity_df.show(5)

+---------+------------+-----------------+
|   genres|total movies|average for genre|
+---------+------------+-----------------+
|    Crime|       79551|             3.71|
|  Romance|      147535|             3.61|
| Thriller|      189687|             3.57|
|Adventure|      133955|             3.48|
|    Drama|      354639|             3.77|
+---------+------------+-----------------+
only showing top 5 rows
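Each row of `complete_df` is one (movie, genre, rating) combination, so a count per genre measures how many ratings the genre's movies received rather than how many movies belong to the genre. If the number of distinct movies is also of interest, it has to be computed separately (in PySpark, for example, with `countDistinct`). A plain-Python sketch that reports both figures side by side on made-up rows; `genre_stats` is an illustrative name.

```python
import math
from collections import defaultdict
from typing import Dict, Iterable, Optional, Tuple


def genre_stats(rows: Iterable[Tuple[int, str, Optional[int]]]) -> Dict[str, dict]:
    """rows are (movie_id, genre, rating) after the explode and the left join."""
    movies = defaultdict(set)
    ratings = defaultdict(list)
    for movie_id, genre, rating in rows:
        movies[genre].add(movie_id)
        if rating is not None:  # an unrated movie counts as a movie but adds no rating
            ratings[genre].append(rating)
    stats = {}
    for genre, movie_ids in movies.items():
        values = ratings[genre]
        stats[genre] = {
            "distinct_movies": len(movie_ids),
            "total_ratings": len(values),
            "average": math.fsum(values) / len(values) if values else None,
        }
    return stats


rows = [(1, "Comedy", 5), (1, "Comedy", 4), (3, "Comedy", 3), (3, "Romance", 3)]
for genre, s in genre_stats(rows).items():
    print(genre, s)
```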



## Basic machine learning model





Process flow:

Data preparation:
The ratings given by users to movies (ratings_df) are split into two subsets: training (80%) and validation (20%).

Model definition and training:
An ALS model is configured with the user, item (movie), and rating columns. The setting coldStartStrategy="drop" removes validation rows whose prediction would be NaN because the user or the movie did not appear in the training split, and implicitPrefs=False states that the ratings are explicit feedback.

Hyperparameter search:
Several combinations of the hyperparameters rank and regParam are evaluated to find the best model. For each combination:

- The ALS model is trained.
- Predictions are made on the validation set.
- The root mean squared error (RMSE) is computed to measure the quality of the model.
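RMSE is the square root of the mean squared difference between the observed and the predicted ratings, taken over the N validation pairs that still have a prediction after coldStartStrategy="drop": RMSE = sqrt((1/N) Σ (r − r̂)²). A plain-Python sketch of that formula; `rmse` is an illustrative name, and in the notebook the metric is computed by Spark's `RegressionEvaluator`.

```python
import math
from typing import Iterable, Tuple


def rmse(pairs: Iterable[Tuple[float, float]]) -> float:
    """Root mean squared error over (actual_rating, predicted_rating) pairs."""
    squared_errors = [(actual - predicted) ** 2 for actual, predicted in pairs]
    if not squared_errors:
        raise ValueError("RMSE is undefined for an empty set of predictions")
    return math.sqrt(math.fsum(squared_errors) / len(squared_errors))


print(rmse([(5, 4), (3, 3)]))  # sqrt((1 + 0) / 2)
```

Because errors are squared before averaging, a few large misses raise the RMSE more than many small ones.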

Experiment tracking with MLflow:
Each training run is logged in MLflow together with its parameters and its performance metric (RMSE). The best model found (the one with the lowest RMSE) is also saved to MLflow as an artifact.

Generating recommendations:
Once the best model has been selected, the top 10 movie recommendations are generated for each user. To make them easier to handle and analyze:

- The list of recommendations is expanded (explode).
- The recommendations are grouped by user.
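In an explicit-feedback ALS model, a predicted rating is the dot product of the user's and the movie's learned latent factor vectors (each of length `rank`), and recommending the top 10 movies amounts to scoring every movie for the user and keeping the highest scores. The plain-Python sketch below illustrates that idea, plus the explode and group-by-user steps, with made-up 2-dimensional factors. `predict` and `recommend_for_all_users` are illustrative names; Spark's actual implementation is distributed, so this only shows the logic.

```python
import heapq
import math
from collections import defaultdict
from typing import Dict, List, Sequence, Tuple


def predict(user_factors: Sequence[float], item_factors: Sequence[float]) -> float:
    """Predicted rating = dot product of the user and item latent factor vectors."""
    return math.fsum(u * v for u, v in zip(user_factors, item_factors))


def recommend_for_all_users(user_factors: Dict[int, Sequence[float]],
                            item_factors: Dict[int, Sequence[float]],
                            k: int) -> Dict[int, List[Tuple[int, float]]]:
    """Score every item for every user and keep the k highest (item_id, score) pairs."""
    recs = {}
    for user_id, uf in user_factors.items():
        scored = ((predict(uf, itf), item_id) for item_id, itf in item_factors.items())
        recs[user_id] = [(item_id, score) for score, item_id in heapq.nlargest(k, scored)]
    return recs


# Toy factors, made up for illustration (a real model has `rank` dimensions)
users = {1: [1.0, 0.5], 2: [0.0, 1.0]}
items = {10: [1.0, 0.0], 20: [0.0, 4.0], 30: [0.5, 0.5]}
recs = recommend_for_all_users(users, items, k=2)

# "Explode" into one (user, item, score) row, then group the item ids back per user
exploded = [(u, i, s) for u, pairs in recs.items() for i, s in pairs]
grouped = defaultdict(list)
for u, i, _ in exploded:
    grouped[u].append(i)
print(dict(grouped))
```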

Exporting results:
Finally, the generated recommendations can be saved in Parquet or any other compatible format for use in the later steps of the project.

In [None]:
from pyspark.sql.functions import col, explode, collect_list
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
import mlflow
import mlflow.spark

# 1. Split the ratings into training (80%) and validation (20%) sets
(training_data, validation_data) = ratings_df.randomSplit([0.8, 0.2], seed=42)

# 2. Define the ALS model
als = ALS(
    userCol="user_id",
    itemCol="movie_id",
    ratingCol="rating",
    coldStartStrategy="drop",  # drop predictions for users/movies absent from the training split
    implicitPrefs=False  # use True for implicit feedback (e.g. number of plays)
)

# 3. Hyperparameter combinations to try (each one is logged to MLflow)
param_grid = [
    {"rank": 10, "regParam": 0.01},
    {"rank": 10, "regParam": 0.1},
    {"rank": 15, "regParam": 0.01},
    {"rank": 15, "regParam": 0.1},
]

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
best_model = None
best_rmse = float('inf')

# 4. Train each combination and log it as a nested MLflow run
with mlflow.start_run():
    for params in param_grid:
        with mlflow.start_run(nested=True):
            print(f"Training model with parameters: {params}")
            mlflow.log_params(params)

            # Apply the current hyperparameters to the estimator
            als.setParams(**params)

            # Train the model
            model = als.fit(training_data)

            # Predict on the validation set
            predictions = model.transform(validation_data)

            # Evaluate the model
            rmse = evaluator.evaluate(predictions)
            print(f"RMSE for parameters {params}: {rmse}")
            mlflow.log_metric("rmse", rmse)

            if rmse < best_rmse:
                best_rmse = rmse
                best_model = model

    if best_model is None:
        raise RuntimeError("No trained model was found.")

    # 5. Log the best RMSE and the best model in the parent run (still active here)
    print(f"\nBest RMSE found: {best_rmse}")
    mlflow.log_metric("best_rmse", best_rmse)

    # The input example uses the same column names as userCol/itemCol
    input_example = pd.DataFrame({"user_id": [1], "movie_id": [10]})
    mlflow.spark.log_model(best_model, "best_als_model", input_example=input_example)

# 6. Generate recommendations for every user with PySpark
user_recs = best_model.recommendForAllUsers(10)  # top 10 recommendations per user

# Explode the recommendations: one row per (user, recommended movie).
# The struct fields are named after itemCol ("movie_id") and ratingCol ("rating").
user_recs_exploded = user_recs.withColumn("recommendations", explode("recommendations")) \
    .select("user_id", col("recommendations.movie_id").alias("movie_id_recommended"), col("recommendations.rating").alias("predicted_rating"))

# Collect the recommended movie ids into one list per user
# (collect_list after a groupBy does not guarantee that the ranking order is preserved)
user_recommendations = user_recs_exploded.groupBy("user_id") \
    .agg(collect_list("movie_id_recommended").alias("recommended_movie_ids"))

# Show the users' recommendations
user_recommendations.show(truncate=False)

Training model with parameters: {'rank': 10, 'regParam': 0.01}
RMSE for parameters {'rank': 10, 'regParam': 0.01}: 0.8907285925129635
Training model with parameters: {'rank': 10, 'regParam': 0.1}
RMSE for parameters {'rank': 10, 'regParam': 0.1}: 0.8669024597766961
Training model with parameters: {'rank': 15, 'regParam': 0.01}
RMSE for parameters {'rank': 15, 'regParam': 0.01}: 0.9180240177456261
Training model with parameters: {'rank': 15, 'regParam': 0.1}
RMSE for parameters {'rank': 15, 'regParam': 0.1}: 0.8671424760196154

Best RMSE found: 0.8669024597766961



+------+-----------------------------------------------------------+
|UserID|recommended_movie_ids                                      |
+------+-----------------------------------------------------------+
|1     |[1851, 3233, 527, 318, 953, 3172, 858, 1097, 919, 2493]    |
|2     |[1851, 318, 527, 3233, 2028, 356, 2762, 3245, 953, 1741]   |
|3     |[811, 1851, 3233, 318, 2329, 110, 598, 1741, 3245, 1659]   |
|4     |[1851, 858, 2760, 2503, 912, 1221, 2309, 1207, 923, 2019]  |
|5     |[557, 1423, 2309, 2962, 2342, 1149, 3645, 2512, 668, 854]  |
|6     |[2562, 3913, 1035, 3314, 687, 1741, 2101, 1871, 985, 3916] |
|7     |[811, 1851, 1198, 2562, 598, 3172, 260, 3314, 110, 2028]   |
|8     |[1851, 2309, 3172, 557, 598, 787, 50, 318, 858, 2905]      |
|9     |[1851, 2309, 318, 50, 858, 527, 3172, 260, 2905, 1198]     |
|10    |[1851, 2562, 1741, 3092, 439, 3233, 3625, 1780, 2762, 2776]|
|11    |[2309, 1851, 2999, 2858, 771, 50, 887, 296, 2295, 2959]    |
|12    |[557, 2309, 3172, 1420, 85

## Stop Spark

In [None]:
spark.stop()