# LAB: Sistema de Recomendación de libros con PySpark

Algunos recursos para consulta:
- https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.recommendation.ALS.html
- https://www.geeksforgeeks.org/recommender-system-using-pyspark-python/
- https://spark.apache.org/docs/3.0.0/ml-collaborative-filtering.html

In [1]:
# !pip install pyspark

In [2]:
#importar los paquetes necesarios de pyspark
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

#Prepara la sesión Spark Session
spark = SparkSession.builder.appName('Recommender').getOrCreate()
spark

PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.

In [3]:
data = spark.read.csv('book_ratings.csv',
                      inferSchema=True,header=True)

#cargar el archivo csv que tiene la data en formato item, user, rating
#data = spark.read.csv('drive/MyDrive/Colab Notebooks/book_ratings.csv',
#                      inferSchema=True,header=True)

data.show(5)

+-------+-------+------+
|book_id|user_id|rating|
+-------+-------+------+
|      1|    314|     5|
|      1|    439|     3|
|      1|    588|     5|
|      1|   1169|     4|
|      1|   1185|     4|
+-------+-------+------+
only showing top 5 rows



In [4]:
data.describe().show()

+-------+-----------------+------------------+------------------+
|summary|          book_id|           user_id|            rating|
+-------+-----------------+------------------+------------------+
|  count|           981756|            981756|            981756|
|   mean|4943.275635697668|25616.759933221696|3.8565335989797873|
| stddev|2873.207414896143|15228.338825882149|0.9839408559619973|
|    min|                1|                 1|                 1|
|    max|            10000|             53424|                 5|
+-------+-----------------+------------------+------------------+



In [5]:
# Con los ratings observados podemos entrenar y evaluar el modelo
train_data, test_data = data.randomSplit([0.8, 0.2], seed = 1)

In [6]:
als = ALS(maxIter=5,
          regParam=0.01,
          userCol="user_id",
          itemCol="book_id",
          ratingCol="rating",
          coldStartStrategy="drop")

#Ajuste del modelo
model = als.fit(train_data)

In [7]:
# Calcula las predicciones o ratings para la data destinada al testing
predictions = model.transform(test_data)

#Mira las predicciones hechas
predictions.show()

+-------+-------+------+----------+
|book_id|user_id|rating|prediction|
+-------+-------+------+----------+
|      3|  32592|     5| 3.4869146|
|      4|  32592|     5| 4.5206113|
|      7|  19984|     5|  4.280432|
|     17|  19984|     5|  4.958942|
|     20|  32592|     4|  4.188769|
|     25|  32592|     4|  4.537844|
|     29|  19984|     5| 5.1295385|
|     38|  19984|     5| 4.9099345|
|     54|  35982|     3| 4.5072126|
|     56|  32592|     4| 3.8695033|
|     62|  35982|     5|  4.370805|
|     65|  35982|     5| 3.9962554|
|     72|  19984|     4|  4.916008|
|     74|  35982|     2| 3.1854703|
|     78|   1088|     2| 3.2281656|
|     89|   1088|     4| 4.5375175|
|     89|  19984|     5|   5.69235|
|     91|  35982|     2| 2.2963731|
|     96|   6397|     3|   3.47771|
|     99|   6658|     3| 2.5265248|
+-------+-------+------+----------+
only showing top 20 rows



In [8]:
# Evalúa el modelo calculando el error de la predicción con el método MAE
evaluator = RegressionEvaluator(metricName="mae", labelCol="rating", predictionCol="prediction")
mae = evaluator.evaluate(predictions)
print("Mean Absolute error = " + str(mae))

Mean Absolute error = 1.2839760669496554


In [9]:
# Evalúa el modelo calculando el error de la predicción con el método RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 1.9146746934151253


### Analizaremos los ratings de un usuario en específico, para observar el comportamiento predictivo del modelo 

In [10]:
#Trabajaremos con el usuario con user id "5461" y extraeremos toda su data
user1 = test_data.filter(test_data['user_id']==5461).select(['book_id','user_id', 'rating'])

#Desplegar user1 data en orden descendente de rating
user1.orderBy('rating',ascending=False).show(40)

+-------+-------+------+
|book_id|user_id|rating|
+-------+-------+------+
|   6702|   5461|     5|
|     19|   5461|     5|
|     60|   5461|     5|
|     65|   5461|     5|
|     93|   5461|     5|
|    118|   5461|     5|
|    401|   5461|     5|
|    757|   5461|     5|
|   3613|   5461|     4|
|     31|   5461|     4|
|   4250|   5461|     4|
|     33|   5461|     4|
|   4759|   5461|     4|
|     47|   5461|     4|
|     82|   5461|     4|
|    180|   5461|     4|
|    231|   5461|     4|
|    233|   5461|     4|
|    304|   5461|     4|
|    306|   5461|     4|
|    375|   5461|     4|
|    561|   5461|     4|
|    669|   5461|     4|
|    876|   5461|     4|
|   1644|   5461|     4|
|   2128|   5461|     4|
|   3579|   5461|     3|
|    454|   5461|     3|
|   6404|   5461|     3|
|    885|   5461|     3|
|    935|   5461|     3|
|   1567|   5461|     3|
+-------+-------+------+



In [11]:
#Comparemos las predicciones hechas con el modelo respecto a la data que se usó para testing
recommendations = model.transform(user1)

#Desplegar user1 predictions en orden descendente de predicción de rating
recommendations.orderBy('prediction',ascending=False).show()

+-------+-------+------+----------+
|book_id|user_id|rating|prediction|
+-------+-------+------+----------+
|    669|   5461|     4| 4.9032736|
|     65|   5461|     5| 4.8290267|
|    561|   5461|     4| 4.6410017|
|   3613|   5461|     4|  4.602353|
|     19|   5461|     5| 4.5782886|
|     60|   5461|     5|  4.540062|
|     31|   5461|     4| 4.5013084|
|    118|   5461|     5| 4.4586267|
|     47|   5461|     4|  4.438166|
|     93|   5461|     5| 4.4379635|
|    231|   5461|     4| 4.4179544|
|     82|   5461|     4| 4.3998127|
|    757|   5461|     5| 4.3322215|
|    233|   5461|     4|  4.192278|
|    375|   5461|     4| 4.1898785|
|    401|   5461|     5| 4.1572857|
|   4250|   5461|     4|  4.141159|
|    304|   5461|     4|  4.129478|
|   1644|   5461|     4|  4.042882|
|    885|   5461|     3|  4.034915|
+-------+-------+------+----------+
only showing top 20 rows



### Cálculo de las RECOMENDACIONES para el usuario

In [12]:
type(recommendations)

pyspark.sql.dataframe.DataFrame

In [13]:
#Vamos los ids de usuarios (valores únicos)
users = data.select(als.getUserCol()).distinct()
users.show()

+-------+
|user_id|
+-------+
|  32592|
|  19984|
|  35982|
|   1088|
|   3918|
|   6397|
|   6658|
|   4900|
|  11317|
|  15727|
|  39285|
|   1645|
|  22373|
|  51123|
|  46521|
|  40383|
|  47217|
|  18498|
|  35912|
|  44822|
+-------+
only showing top 20 rows



In [14]:
#almacenaremos al user en la variable propia de spark
user = users[users['user_id'] == 5461]
user.show()

+-------+
|user_id|
+-------+
|   5461|
+-------+



In [15]:
# Con el usuario dado, podemos aplicar la función .recommendForUserSubset() que toma como parámetro
# la lista de usuarios para quienes calcular las recomendaciones
userTopRecommedations = model.recommendForUserSubset(user, 10)
userTopRecommedations.show()

+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|   5461|[{7039, 7.3091564...|
+-------+--------------------+



In [16]:
#estructura de la data en recommendations
userTopRecommedations.select("recommendations")

DataFrame[recommendations: array<struct<book_id:int,rating:float>>]

In [17]:
userTopRecommedations.select("recommendations.book_id", "recommendations.rating").first()

Row(book_id=[7039, 5606, 9248, 9423, 9790, 9127, 8903, 5068, 8891, 7593], rating=[7.30915641784668, 6.786949157714844, 6.676485538482666, 6.316593647003174, 6.233674049377441, 6.055983543395996, 5.686757564544678, 5.6472601890563965, 5.623122692108154, 5.4908623695373535])

### Podemos generar el top-k recomendaciones para todos los usuarios o para todos los libros

In [18]:
# Generate top 10 book recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each book
bookRecs = model.recommendForAllItems(10)


In [19]:
userRecs[userRecs['user_id']==5461].show()

+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|   5461|[{7039, 7.3091564...|
+-------+--------------------+



### Podemos generar el top-k recomendaciones para un subset de los usuarios o para un subset de los libros

In [20]:
# Generate top 10 book recommendations for a specified set of users
usersSet = data.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(usersSet, 10)

# Generate top 10 user recommendations for a specified set of books
booksSet = data.select(als.getItemCol()).distinct().limit(3)
bookSubSetRecs = model.recommendForItemSubset(booksSet, 10)

# Para resolver
1. Qué libro se recomienda como primera opción para estos 3 usuarios?

In [21]:
usersSet.show() 

+-------+
|user_id|
+-------+
|  32592|
|  19984|
|  35982|
+-------+



2. A quiénes se puede recomendar estos libros para su compra?

In [22]:
booksSet.show() 

+-------+
|book_id|
+-------+
|    148|
|    463|
|    471|
+-------+



3. Cómo se puede intentar mejorar el rendimiento predictivo del sistema?