# <center> <img src="./img/ITESOLogo.png" alt="ITESO" width="480" height="130"> </center>
# <center> **Departamento de Electrónica, Sistemas e Informática** </center>
---
## <center> **Procesamiento de Datos Masivos** </center>
---
### <center> **Primavera 2025** </center>
---
### <center> **Modelo de Recomendacion** </center>

---
**Alumnos**: David Abraham Naranjo Salgado, Benjamin Zarate y Angel Cortes

In [10]:
import findspark
findspark.init()

#### Creacion de la conexión con el cluster de spark


In [11]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MLSpark-Recommender-Systems") \
    .master("spark://spark-master:7077") \
    .config("spark.ui.port","4040") \
    .getOrCreate()
sc = spark.sparkContext
spark.conf.set("spark.sql.shuffle.partitions", "5")

## Librerias

In [12]:
from team_name.spark_utils import SparkUtils
from pyspark.sql.functions import from_json, explode, col
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType, ArrayType

# Preparación de Datos

In [13]:
parquet_path = "/home/jovyan/data"
df = spark.read.parquet(parquet_path)
df.show()

+----+--------------------+--------------------+---------+------+--------------------+-------------+--------------------+
| key|               value|               topic|partition|offset|           timestamp|timestampType|           value_str|
+----+--------------------+--------------------+---------+------+--------------------+-------------+--------------------+
|NULL|[7B 22 75 73 65 7...|kafka-spark-produ...|        0| 11900|2025-05-10 04:28:...|            0|{"userId": 232, "...|
|NULL|[7B 22 75 73 65 7...|kafka-spark-produ...|        0| 11901|2025-05-10 04:28:...|            0|{"userId": 232, "...|
|NULL|[7B 22 75 73 65 7...|kafka-spark-produ...|        0| 11902|2025-05-10 04:28:...|            0|{"userId": 958, "...|
|NULL|[7B 22 75 73 65 7...|kafka-spark-produ...|        0| 11903|2025-05-10 04:28:...|            0|{"userId": 958, "...|
|NULL|[7B 22 75 73 65 7...|kafka-spark-produ...|        0| 11904|2025-05-10 04:28:...|            0|{"userId": 232, "...|
|NULL|[7B 22 75 73 65 7.

In [14]:
df.select("value_str").show(truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------------+
|value_str                                                                                                                                         |
+--------------------------------------------------------------------------------------------------------------------------------------------------+
|{"userId": 232, "movieId": 379, "movieTitle": "Raven'S Peak: Citadel Rising", "genre": "Documentary", "rating": 1, "timestamp": 1746851280182}    |
|{"userId": 232, "movieId": 1043, "movieTitle": "The Alien Horizon", "genre": "Comedy", "rating": 9, "timestamp": 1746851280186}                   |
|{"userId": 958, "movieId": 538, "movieTitle": "Spaceship: Mythic Protocol", "genre": "Thriller", "rating": 3, "timestamp": 1746851280189}         |
|{"userId": 958, "movieId": 1133, "movieTitle": "Beneath The Dark Empire", "genre": "Drama", "rating": 7, 

## Seleccionar columnas clave y limpiar 

In [15]:
schema = SparkUtils.generate_schema(
    [
        ("userId", "integer"),
        ("movieId", "integer"),
        ("movieTitle", "string"),
        ("rating", "float"),
    ]
)

# Parsear y expandir el array de objetos JSON
df_parsed = df.withColumn("json_array", from_json(col("value_str"), ArrayType(schema)))
ratings_df = df_parsed.withColumn("entry", explode("json_array")).select("entry.*")
ratings_df.show(5)

+------+-------+--------------------+------+
|userId|movieId|          movieTitle|rating|
+------+-------+--------------------+------+
|   232|    379|Raven'S Peak: Cit...|   1.0|
|   232|   1043|   The Alien Horizon|   9.0|
|   958|    538|Spaceship: Mythic...|   3.0|
|   958|   1133|Beneath The Dark ...|   7.0|
|   232|   1108|Beneath The Steel...|   2.0|
+------+-------+--------------------+------+
only showing top 5 rows



## Dividir en entrenamiento y prueba

In [16]:
training, test = ratings_df.randomSplit([0.8, 0.2])

# Configure ALS model

In [17]:
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    nonnegative=True,
)

# TRAINNING

In [18]:
model = als.fit(training)

                                                                                

# PREDICTIONS

In [19]:
# Generate recommendations for each user
recommendations = model.recommendForAllUsers(5)

# Show recommendations
recommendations.show(5, truncate=False)



+------+-----------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                |
+------+-----------------------------------------------------------------------------------------------+
|17    |[{22, 12.109548}, {1753, 11.7491045}, {1116, 11.649764}, {1470, 11.466444}, {1141, 11.341025}] |
|18    |[{1343, 10.9145355}, {1322, 10.913461}, {1423, 10.748011}, {290, 10.679306}, {1595, 10.227373}]|
|19    |[{154, 13.8740015}, {1730, 12.453855}, {1646, 12.116268}, {31, 11.840824}, {1478, 11.635918}]  |
|20    |[{1124, 10.311135}, {1578, 9.682019}, {1572, 9.503909}, {228, 9.211454}, {930, 8.967636}]      |
|21    |[{1823, 8.859558}, {1655, 8.832734}, {626, 8.612838}, {480, 8.597667}, {575, 8.485322}]        |
+------+-----------------------------------------------------------------------------------------------+
only showing top 5 rows



                                                                                

## Recomendaciones Plus

In [20]:
recommendations_exp = recommendations.withColumn(
    "rec", explode("recommendations")
).select(
    "userId",
    col("rec.movieId").alias("movieId"),
    col("rec.rating").alias("predicted_rating"),
)

unique_titles = ratings_df.select("movieId", "movieTitle").dropDuplicates(["movieId"])
recs_final = recommendations_exp.join(unique_titles, on="movieId", how="left")
recs_final.show(10, truncate=False)

                                                                                

+-------+------+----------------+----------------------------+
|movieId|userId|predicted_rating|movieTitle                  |
+-------+------+----------------+----------------------------+
|22     |17    |12.109548       |Desert Of The Shadow Phoenix|
|1753   |17    |11.7491045      |Echoes Of Justice           |
|1116   |17    |11.649764       |The Deadly Case Of The Lies |
|1470   |17    |11.466444       |The Last Ghost Of Titan     |
|1141   |17    |11.341025       |The Crimson Revenant        |
|1343   |18    |10.9145355      |Beyond Dragon'S Tooth       |
|1322   |18    |10.913461       |The Last Ocean Of Gotham    |
|1423   |18    |10.748011       |When The Sanctuary Awaken   |
|290    |18    |10.679306       |Curse Of The Savage Fortress|
|1595   |18    |10.227373       |Guest: A Deceptive Game     |
+-------+------+----------------+----------------------------+
only showing top 10 rows



## Predictions for all data

In [21]:
predictions = model.transform(test)
predictions.show(truncate=False)

                                                                                

+------+-------+--------------------------------+------+----------+
|userId|movieId|movieTitle                      |rating|prediction|
+------+-------+--------------------------------+------+----------+
|48    |1018   |Order Falls                     |5.0   |2.1299624 |
|48    |1847   |Echoes Of Madness               |5.0   |7.1993055 |
|92    |1833   |A Whisper In The Savage Talisman|10.0  |5.210541  |
|139   |621    |Beyond Serpent'S Coil           |8.0   |2.3395476 |
|139   |716    |Portal From Gotham              |6.0   |4.9586782 |
|200   |1739   |Curse Of The Brave Sanctuary    |6.0   |2.9644158 |
|223   |401    |Project: Asteroid               |7.0   |3.2233055 |
|223   |944    |Legend Of The Dragon            |10.0  |6.415805  |
|223   |1050   |The Fatal Fear                  |5.0   |6.5421677 |
|223   |1846   |Secrets Of Xylos                |3.0   |4.494069  |
|267   |473    |Universe: Mythic Protocol       |9.0   |5.1872067 |
|267   |1614   |Witness: A Hidden Game          

# EVALUATE MODEL

In [22]:
# Set up evaluator to compute RMSE
evaluator = RegressionEvaluator(
    metricName="rmse", 
    labelCol="rating", 
    predictionCol="prediction"
)
# Calculate RMSE
rmse = evaluator.evaluate(predictions)
print("\n[INFO] RMSE del modelo:", round(rmse, 4))

                                                                                


[INFO] RMSE del modelo: 3.6799


## Exportar a CSV para Power BI

In [23]:
output_path = "./recommendations/final_recommendations.csv"
recs_final.write.option("header", "true").mode("overwrite").csv(output_path)

print("\n[INFO] Recomendaciones exportadas a:", output_path)

                                                                                


[INFO] Recomendaciones exportadas a: ./recommendations/final_recommendations.csv


                                                                                

In [24]:
sc.stop()