In [1]:
%%configure -f 
{"driverMemory": "9000M"}

In [22]:
# Importar las bibliotecas necesarias
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType
from pyspark.sql.functions import avg
from pyspark.sql.functions import sum as _sum

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
# Crear una sesión de Spark
spark = SparkSession.builder \
    .appName("BookRecommendationSystem") \
    .getOrCreate()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# Cargar el archivo .csv en un DataFrame de PySpark
#file_path = "muestra_rating.parquet"  # Cambia esto por la ruta de tu archivo
#s3://mybigdatapablo/Books_rating.parquet
file_path = "s3://mybigdatapablo/Books_rating.parquet"  # Cambia esto por la ruta de tu archivo
df = spark.read.parquet(file_path, header=True, inferSchema=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
data = df.select(col("User_id"), col("Id"), col("review/score").cast("float"),col("Title"), col("profileName"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
#Preprocesamiento de los datos
# Eliminar cualquier valor nulo
data = data.na.drop()

# Convertir las columnas user ID y ID en un valor numerico
user_indexer = StringIndexer(inputCol="User_id", outputCol="user_index")
book_indexer = StringIndexer(inputCol="Id", outputCol="book_index")

data = user_indexer.fit(data).transform(data)
data = book_indexer.fit(data).transform(data)

data.show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+----------+------------+--------------------+--------------------+----------+----------+
|       User_id|        Id|review/score|               Title|         profileName|user_index|book_index|
+--------------+----------+------------+--------------------+--------------------+----------+----------+
| AVCGYZL8FQQTD|1882931173|         4.0|Its Only Art If I...|Jim of Oz "jim-of...|  168473.0|  180789.0|
|A30TK6U7DNS82R|0826414346|         5.0|Dr. Seuss: Americ...|       Kevin Killian|      64.0|   40394.0|
|A3UH4UZ4RSVO82|0826414346|         5.0|Dr. Seuss: Americ...|        John Granger|  106409.0|   40394.0|
|A2MVUWT453QH61|0826414346|         4.0|Dr. Seuss: Americ...|Roy E. Perry "ama...|    4536.0|   40394.0|
|A22X4XUPKF66MR|0826414346|         4.0|Dr. Seuss: Americ...|D. H. Richards "n...|   31956.0|   40394.0|
|A2F6NONFUDB6UK|0826414346|         4.0|Dr. Seuss: Americ...|              Malvin|    3636.0|   40394.0|
|A14OJS0VWMOSWO|0826414346|         5.0|Dr. Seuss: Amer

In [7]:
# DataFrame de usuarios únicos
unique_users_df = data.select("user_index","profileName").distinct()

# DataFrame de ítems únicos
unique_items_df = data.select("book_index","Title").distinct()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
# Split del set de datos para que en Train el modelo vea todos los usuarios y reservar para test usuarios vs peliculas que el modelo no ha visto pero que si ha visto al usuario

# Añadir una columna de conteo de interacciones por usuario
user_counts = data.groupBy("user_id").count().withColumnRenamed("count", "user_count")

# Añadir una columna de conteo de interacciones por item
item_counts = data.groupBy("Id").count().withColumnRenamed("count", "item_count")

# Unir con el DataFrame original para tener el conteo por usuario en cada fila
data = data.join(user_counts, on="user_id", how="left")

# Unir con el DataFrame original para tener el conteo por item en cada fila
data = data.join(item_counts, on="Id", how="left")

# Crear una columna de ranking aleatorio para cada interacción por usuario
window = Window.partitionBy("Id").orderBy(F.rand())
data = data.withColumn("rank", F.row_number().over(window))

# Dividir en train y test usando condiciones basadas en el número de interacciones por usuario
train_df = data.filter((F.col("rank") <= F.col("item_count") * 0.8) | (F.col("item_count") == 1) | (F.col("user_count") == 1))
test_df = data.filter((F.col("rank") > F.col("item_count") * 0.8) & (F.col("user_count") > 1) & (F.col("item_count") > 1))

# Remover columnas temporales para limpiar los DataFrames
train_df = train_df.drop("rank", "user_count")
test_df = test_df.drop("rank", "user_count")

# Mostrar los conjuntos de entrenamiento y prueba
print("Train set:")
train_df.show()

print("Test set:")
test_df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Train set:
+----------+--------------+------------+--------------------+--------------------+----------+----------+----------+
|        Id|       User_id|review/score|               Title|         profileName|user_index|book_index|item_count|
+----------+--------------+------------+--------------------+--------------------+----------+----------+----------+
|0001720104|A3INVHNCS10T1Y|         2.0|Mr. Plod and the ...|          "hurburgh"|    3324.0|  141434.0|         1|
|000217068X|A3U3PA6J2XOVFK|         5.0|         Marsh Arabs| Donald E. Gilliland|    1540.0|   32739.0|        11|
|000217068X| AIGNOD62RNX6E|         5.0|         Marsh Arabs|Sanni M. Slabbert...|   80556.0|   32739.0|        11|
|000217068X|A2YUFVQ5UZOOVL|         5.0|         Marsh Arabs|bassoonmuse "godb...|  143501.0|   32739.0|        11|
|000217068X|A1L0GDQSWK2WOR|         5.0|         Marsh Arabs|         bettie king|  122989.0|   32739.0|        11|
|000217068X|A36VGUJZL78YAI|         5.0|         Marsh Arabs|

## MODELO NAIVE

In [9]:
item_mean_rating = data.groupBy("book_index").mean("review/score").withColumnRenamed("avg(review/score)", "mean_rating")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
item_mean_rating_ordered=item_mean_rating.orderBy(F.desc("mean_rating"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
top_10_items=item_mean_rating_ordered.limit(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
top_10_books=top_10_items.join(unique_items_df,on="book_index", how="left")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
# Las 10 peliculas con mejor rating en promedio
top_10_books.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+-----------+--------------------+
|book_index|mean_rating|               Title|
+----------+-----------+--------------------+
|  160008.0|        5.0|New Perspectives ...|
|  212653.0|        5.0|   Working in Groups|
|   77035.0|        5.0|Herman: The Fourt...|
|   91275.0|        5.0|Ezekiel 1-20 (Con...|
|  103838.0|        5.0|Grapes of Canaan:...|
|  172637.0|        5.0|          homerville|
|   80908.0|        5.0|OKB Ilyushin: A H...|
|   89001.0|        5.0|Monet Paintings: ...|
|  108455.0|        5.0|Superconducting M...|
|  120129.0|        5.0|Guidelines for My...|
+----------+-----------+--------------------+

In [14]:
# DataFrame de usuarios únicos en test
test_unique_users_df = test_df.select("user_index","profileName").distinct()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
test_df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+--------------+------------+--------------------+--------------------+----------+----------+----------+
|        Id|       User_id|review/score|               Title|         profileName|user_index|book_index|item_count|
+----------+--------------+------------+--------------------+--------------------+----------+----------+----------+
|000217068X|A36VGUJZL78YAI|         5.0|         Marsh Arabs| Dennis J. Boccippio|   60262.0|   32739.0|        11|
|000217068X|A2YUFVQ5UZOOVL|         5.0|         Marsh Arabs|bassoonmuse "godb...|  143501.0|   32739.0|        11|
|000217068X|A2YXRT2XIJIO57|         4.0|         Marsh Arabs|   John P. Jones III|      95.0|   32739.0|        11|
|0006167942|A30KEXFT9SILL6|         4.0|The Legend of the...|frumiousb "frumio...|      20.0|   47795.0|         7|
|0006167942| A3XGIQ3U908JT|         5.0|The Legend of the...|              Daniel|    4771.0|   47795.0|         7|
|0006383475|A1KS9ZUF6PO36R|         5.0|History of Jerusalem|           

In [41]:
def get_topK_precision(test_df,recommendations,k=10):
    f_test_unique_users_df = test_df.select("user_index","profileName").distinct()
    # con columnas: userId, itemId y rating. Filtramos los ratings relevantes para los usuarios del set de datos de test
    relevant_items = test_df.filter(test_df["review/score"] >= 0).select("user_index", "book_index")
    # Juntar las recomendaciones y los ítems relevantes por usuario
    recommendations_selected = recommendations.select("user_index", "book_index")
    # cuales recomendaciones por cada usuario coinciden con ratings relevantes de los usuarios
    joined_df = recommendations_selected .join(relevant_items, on=["user_index", "book_index"], how="inner")
    # Contar el número de ítems relevantes encontrados en el top-K
    user_precision = joined_df.groupBy("user_index").agg(
        (F.count("book_index") / k).alias("Precision@K")
        )
    total_usuarios=f_test_unique_users_df.count()
    #suma de la columna precision@k
    user_precision_sum=user_precision.agg(_sum("Precision@K")).collect()[0][0]
    #promedio de precision@k teniendo en cuenta el total de usuarios del set de test
    try:
      user_precision_mean=user_precision_sum/total_usuarios
    except:
      user_precision_mean=0
    return user_precision_mean

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
# K: número de recomendaciones a evaluar
k = 10

# Generar recomendaciones para todos los usuarios en el conjunto de prueba
naive_recommendation = test_unique_users_df.crossJoin(top_10_books)

precision=get_topK_precision(test_df,naive_recommendation,k=k)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
print(f"Precisión a Top 10 del modelo Naive {precision}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Precisi?n a Top 10 del modelo Naive 1.7052915195852733e-06

### MODELO ALS

In [17]:
# Configurar el modelo ALS
als = ALS(
    userCol="user_index",
    itemCol="book_index",
    ratingCol="review/score",
    coldStartStrategy="drop"  # Evita NaN en predicciones
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
### Nuevo
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

param_grid = ParamGridBuilder() \
    .addGrid(als.rank, [5, 10, 15]) \
    .addGrid(als.maxIter, [10, 20]) \
    .addGrid(als.regParam, [0.01, 0.1, 0.5]) \
    .build()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
# Para obtener el error cuadrático medio raíz (RMSE)
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="review/score", predictionCol="prediction")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
### Nuevo
# Configura el CrossValidator
cross_validator = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=3,  # Número de pliegues
    parallelism=4  # Número de tareas paralelas
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [29]:
# Entrena el modelo NUEVO
cv_model = cross_validator.fit(train_df)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [30]:
### NUevo
# Extrae los mejores hiperparámetros
best_model = cv_model.bestModel
print("Mejor número de iteraciones:", best_model._java_obj.parent().getMaxIter())
print("Mejor número de factores latentes (rank):", best_model._java_obj.parent().getRank())
print("Mejor valor de regParam:", best_model._java_obj.parent().getRegParam())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Mejor n?mero de iteraciones: 20
Mejor n?mero de factores latentes (rank): 15
Mejor valor de regParam: 0.1

In [31]:
best_model.save("s3://mybigdatapablo/jupyter/jovyan/mejor_modelo_als")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [23]:
best_model = ALSModel.load("s3://mybigdatapablo/jupyter/jovyan/mejor_modelo_als")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
# Evaluar el modelo usando el conjunto de prueba
predictions = best_model.transform(test_df)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Root-mean-square error = 0.875201402782071

In [26]:
# Generar recomendaciones de libros para cada usuario
user_recommendations = best_model.recommendForAllUsers(10)  # 10 recomendaciones por usuario

# Unir con el DataFrame original para tener el conteo por usuario en cada fila
user_recommendations = user_recommendations.join(unique_users_df, on="user_index", how="left")
# Mostrar algunas recomendaciones
user_recommendations.show(10, truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------+
|user_index|recommendations                                                                                                                                                                                              |profileName                                      |
+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------+
|329       |[{161794, 8.085935}, {88540, 7.485327}, {134703, 7.2558765}, {151589, 7.1715884}, {192689, 6.996677}, {163573, 6.9194417}, {182853, 6.893911}, {100960, 6.8717837}, {111192, 6.847019

In [28]:
user_recommendations = user_recommendations.withColumn("recomendations_unique", F.explode("recommendations"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [29]:
user_recommendations.show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+--------------------+-----------+---------------------+
|user_index|     recommendations|profileName|recomendations_unique|
+----------+--------------------+-----------+---------------------+
|     11757|[{193091, 1.48484...|   HopesMom|  {193091, 1.4848477}|
|     11757|[{193091, 1.48484...|   HopesMom|  {201245, 1.4136168}|
|     11757|[{193091, 1.48484...|   HopesMom|  {111192, 1.3696492}|
|     11757|[{193091, 1.48484...|   HopesMom|  {100960, 1.3562518}|
|     11757|[{193091, 1.48484...|   HopesMom|  {179092, 1.3394797}|
|     11757|[{193091, 1.48484...|   HopesMom|   {88540, 1.3316348}|
|     11757|[{193091, 1.48484...|   HopesMom|  {161794, 1.3064508}|
|     11757|[{193091, 1.48484...|   HopesMom|  {208156, 1.2668185}|
|     11757|[{193091, 1.48484...|   HopesMom|   {134703, 1.237793}|
|     11757|[{193091, 1.48484...|   HopesMom|  {194194, 1.2208441}|
+----------+--------------------+-----------+---------------------+
only showing top 10 rows

In [30]:
user_recommendations = user_recommendations.withColumn("book_index", F.col("recomendations_unique").getField("book_index"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [32]:
user_recommendations = user_recommendations.select("user_index","profileName", "book_index")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [33]:
user_recommendations.show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Interrupted by user


In [42]:
precision= get_topK_precision(test_df=test_df, recommendations=user_recommendations, k=10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [43]:
print(f"Precisión a Top 10 del modelo ALS {precision}")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Precisi?n a Top 10 del modelo ALS 0.011737094557211194

In [44]:
unique_users_df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+--------------------+
|user_index|         profileName|
+----------+--------------------+
|   14065.0|              Womble|
|  597823.0|        Liz Arimento|
|  749897.0|           L. Hunter|
|   72031.0|            kdkeller|
|  132942.0|         C. J. Bondi|
|    2667.0|Royce E. Buehler ...|
|  108195.0|      Renee D. Smith|
|    8981.0|       Bookish Susie|
|    4362.0|A reader "moneysa...|
|    8611.0|     Bill Corporandy|
|  813260.0|          Texas Girl|
|  515566.0|       S. Barry "SB"|
|   52669.0|indianajonesgirl ...|
|   83266.0|              Giulia|
|  272490.0|           Dan Keren|
|  256974.0|       Anthony Knoll|
|   63560.0|      bozo the clown|
|  814432.0|Kitty Shortledge,...|
|  393708.0|      Clodia Metelli|
|    6893.0|     Simone Oltolina|
+----------+--------------------+
only showing top 20 rows

In [49]:
user_recommendations.write.parquet("s3://mybigdatapablo/jupyter/jovyan/recommendations_10/")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [48]:
data.show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+--------------+------------+--------------------+--------------------+----------+----------+----------+----------+----+
|        Id|       User_id|review/score|               Title|         profileName|user_index|book_index|user_count|item_count|rank|
+----------+--------------+------------+--------------------+--------------------+----------+----------+----------+----------+----+
|0001720104|A3INVHNCS10T1Y|         2.0|Mr. Plod and the ...|          "hurburgh"|    3324.0|  141434.0|        43|         1|   1|
|000217068X|A30LHK7PL4QGFI|         4.0|         Marsh Arabs|    Robert L. France|   11759.0|   32739.0|        18|        11|   1|
|000217068X|A2YUFVQ5UZOOVL|         5.0|         Marsh Arabs|bassoonmuse "godb...|  143501.0|   32739.0|         3|        11|   2|
|000217068X| AIGNOD62RNX6E|         5.0|         Marsh Arabs|Sanni M. Slabbert...|   80556.0|   32739.0|         5|        11|   3|
|000217068X|A148R759F9JW0A|         5.0|         Marsh Arabs|         othoni