In [89]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, round
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import explode
# Inicializamos la sesion de Spark
spark = SparkSession.builder.appName("SistemaRecomendacion_ALS").config("spark.executor.memory", "8g").config("spark.executor.cores", "6").config("spark.driver.memory","8g").getOrCreate()

# Cargamos el dataset
df = spark.read.option("delimiter", ",").option("quote", '"').option("escape", '"').csv("data/anime.csv", header=True, inferSchema=True)


# Limpieza de los datos

In [90]:
# Visualizamos todo el dataset de animes
df.show()

+---+--------------------+-----+--------------------+--------------------+------------------------------+-----+--------+--------------------+-----------+--------------------+--------------------+----------------+-----------+---------------+--------------------+------+----------+-------+---------+--------+---------+-------+-------+-------------+--------+--------+--------+--------+--------+-------+-------+-------+-------+-------+
| ID|                Name|Score|              Genres|        English name|                 Japanese name| Type|Episodes|               Aired|  Premiered|           Producers|           Licensors|         Studios|     Source|       Duration|              Rating|Ranked|Popularity|Members|Favorites|Watching|Completed|On-Hold|Dropped|Plan to Watch|Score-10| Score-9| Score-8| Score-7| Score-6|Score-5|Score-4|Score-3|Score-2|Score-1|
+---+--------------------+-----+--------------------+--------------------+------------------------------+-----+--------+----------------

In [91]:
# Visualizamos las filas con valores nulos
df.where(df["Type"] == "Unknown").count()

37

In [92]:
# Visualizamos los tipos de animes que existen (Movie, TV, OVA, etc)
df.select("Type").distinct().show(20, False)

+-------+
|Type   |
+-------+
|TV     |
|Special|
|Unknown|
|OVA    |
|Music  |
|Movie  |
|ONA    |
+-------+



In [93]:
df = df.drop(df["Premiered"])

In [94]:
#df = df.drop(*[
#    when(col("Genres") == "Uknown", column).alias(column)
#    for column in df.columns
#])

In [95]:
# Se filtra el dataset para que solo contenga los animes de tipo Movie y TV
df_filtered = df.filter((df["Type"] == "Movie") | (df["Type"] == "TV"))

In [96]:

# Convertimos los valores de las columnas de Score-1 a Score-10 a valores numericos cambiando los Unknown por 0
df_filtered = df_filtered.withColumn("Score-1", when(df_filtered["Score-1"] == "Unknown", 0).otherwise(df_filtered["Score-1"])) \
    .withColumn("Score-2", when(df_filtered["Score-2"] == "Unknown", 0).otherwise(df_filtered["Score-2"])) \
    .withColumn("Score-3", when(df_filtered["Score-3"] == "Unknown", 0).otherwise(df_filtered["Score-3"])) \
    .withColumn("Score-4", when(df_filtered["Score-4"] == "Unknown", 0).otherwise(df_filtered["Score-4"])) \
    .withColumn("Score-5", when(df_filtered["Score-5"] == "Unknown", 0).otherwise(df_filtered["Score-5"])) \
    .withColumn("Score-6", when(df_filtered["Score-6"] == "Unknown", 0).otherwise(df_filtered["Score-6"])) \
    .withColumn("Score-7", when(df_filtered["Score-7"] == "Unknown", 0).otherwise(df_filtered["Score-7"])) \
    .withColumn("Score-8", when(df_filtered["Score-8"] == "Unknown", 0).otherwise(df_filtered["Score-8"])) \
    .withColumn("Score-9", when(df_filtered["Score-9"] == "Unknown", 0).otherwise(df_filtered["Score-9"])) \
    .withColumn("Score-10", when(df_filtered["Score-10"] == "Unknown", 0).otherwise(df_filtered["Score-10"]))
    

In [97]:
# Visualizamos el dataset con Scores en Unknown y verificamos que los valores de Scores-1 al Scores-10 sean numericos
df_filtered.where(df_filtered["Score"] == "Unknown").show()


+----+--------------------+-------+--------------------+--------------------+-------------------------------------+-----+--------+--------------------+--------------------+---------+--------------------+---------+---------------+--------------------+-------+----------+-------+---------+--------+---------+-------+-------+-------------+--------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|  ID|                Name|  Score|              Genres|        English name|                        Japanese name| Type|Episodes|               Aired|           Producers|Licensors|             Studios|   Source|       Duration|              Rating| Ranked|Popularity|Members|Favorites|Watching|Completed|On-Hold|Dropped|Plan to Watch|Score-10|Score-9|Score-8|Score-7|Score-6|Score-5|Score-4|Score-3|Score-2|Score-1|
+----+--------------------+-------+--------------------+--------------------+-------------------------------------+-----+--------+--------------------+-------------

In [98]:
## Se muestra uno de los animes para verificar un cambio en la siguinete ejecución
df_filtered.where(df_filtered["ID"] == "1547").show()

+----+----------------+-------+--------------------+------------+--------------+----+--------+--------------------+---------+---------+-----------------+------+---------------+------------+-------+----------+-------+---------+--------+---------+-------+-------+-------------+--------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|  ID|            Name|  Score|              Genres|English name| Japanese name|Type|Episodes|               Aired|Producers|Licensors|          Studios|Source|       Duration|      Rating| Ranked|Popularity|Members|Favorites|Watching|Completed|On-Hold|Dropped|Plan to Watch|Score-10|Score-9|Score-8|Score-7|Score-6|Score-5|Score-4|Score-3|Score-2|Score-1|
+----+----------------+-------+--------------------+------------+--------------+----+--------+--------------------+---------+---------+-----------------+------+---------------+------------+-------+----------+-------+---------+--------+---------+-------+-------+-------------+--------+--

In [99]:
# Se calcula el Score promedio de cada anime si no tiene, y se guarda en la columna Score en otro caso mantiene el Score que tiene
df_filtered = df_filtered.withColumn("Score", when(df_filtered["Score"] == "Unknown", (df_filtered["Score-1"] *1 + df_filtered["Score-2"] *2 + df_filtered["Score-3"] *3 + df_filtered["Score-4"] *4 + df_filtered["Score-5"] *5 + df_filtered["Score-6"] *6 + df_filtered["Score-7"] *7 + df_filtered["Score-8"] *8 + df_filtered["Score-9"] *9 + df_filtered["Score-10"] *10) / (df_filtered["Score-1"] + df_filtered["Score-2"] + df_filtered["Score-3"] + df_filtered["Score-4"] + df_filtered["Score-5"] + df_filtered["Score-6"] + df_filtered["Score-7"] + df_filtered["Score-8"] + df_filtered["Score-9"] + df_filtered["Score-10"])).otherwise(df_filtered["Score"]))

In [100]:
# Redondeamos el Score a 2 decimales
df_filtered = df_filtered.withColumn("Score", round(df_filtered.Score, 2))

In [101]:
# Se muestra uno de los animes para verificar que el Score se haya calculado correctamente
df_filtered.where(df_filtered["ID"] == "1547").show()

+----+----------------+-----+--------------------+------------+--------------+----+--------+--------------------+---------+---------+-----------------+------+---------------+------------+-------+----------+-------+---------+--------+---------+-------+-------+-------------+--------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|  ID|            Name|Score|              Genres|English name| Japanese name|Type|Episodes|               Aired|Producers|Licensors|          Studios|Source|       Duration|      Rating| Ranked|Popularity|Members|Favorites|Watching|Completed|On-Hold|Dropped|Plan to Watch|Score-10|Score-9|Score-8|Score-7|Score-6|Score-5|Score-4|Score-3|Score-2|Score-1|
+----+----------------+-----+--------------------+------------+--------------+----+--------+--------------------+---------+---------+-----------------+------+---------------+------------+-------+----------+-------+---------+--------+---------+-------+-------+-------------+--------+-------+

In [102]:
# Comprobación de la media de los scores
# num = (8*10+5*9+6*8+20*7+28*6+31*5+6*4+6*3+2*2+10*1)/(8+5+6+20.0+28+31+6+6+2+10.0)
# num

In [103]:
# Se visualiza el dataset filtrado nuevamente
df_filtered.show()

+---+--------------------+-----+--------------------+--------------------+------------------------------+-----+--------+--------------------+--------------------+--------------------+----------------+-----------+---------------+--------------------+------+----------+-------+---------+--------+---------+-------+-------+-------------+--------+--------+--------+--------+--------+-------+-------+-------+-------+-------+
| ID|                Name|Score|              Genres|        English name|                 Japanese name| Type|Episodes|               Aired|           Producers|           Licensors|         Studios|     Source|       Duration|              Rating|Ranked|Popularity|Members|Favorites|Watching|Completed|On-Hold|Dropped|Plan to Watch|Score-10| Score-9| Score-8| Score-7| Score-6|Score-5|Score-4|Score-3|Score-2|Score-1|
+---+--------------------+-----+--------------------+--------------------+------------------------------+-----+--------+--------------------+-------------------

In [104]:
# se visualiza el numero de filas del dataset filtrado
df_filtered.count()

8037

In [105]:
# Visualizamos el numero de filas sin valoraciones
df_filtered.where((df_filtered["Score-1"] == 0) & (df_filtered["Score-2"] == 0) & (df_filtered["Score-3"] == 0) & (df_filtered["Score-4"] == 0) & (df_filtered["Score-5"] == 0) & (df_filtered["Score-6"] == 0) & (df_filtered["Score-7"] == 0) & (df_filtered["Score-8"] == 0) & (df_filtered["Score-9"] == 0) & (df_filtered["Score-10"] == 0)).count()

188

In [106]:
# Visualizamos las filas sin valoraciones
df_filtered.where((df_filtered["Score-1"] == 0) & (df_filtered["Score-2"] == 0) & (df_filtered["Score-3"] == 0) & (df_filtered["Score-4"] == 0) & (df_filtered["Score-5"] == 0) & (df_filtered["Score-6"] == 0) & (df_filtered["Score-7"] == 0) & (df_filtered["Score-8"] == 0) & (df_filtered["Score-9"] == 0) & (df_filtered["Score-10"] == 0)).show()

+-----+--------------------+-----+--------------------+--------------------+----------------------------------+-----+--------+--------------+--------------------+----------+------------+------------+--------+--------------------+-------+----------+-------+---------+--------+---------+-------+-------+-------------+--------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|   ID|                Name|Score|              Genres|        English name|                     Japanese name| Type|Episodes|         Aired|           Producers| Licensors|     Studios|      Source|Duration|              Rating| Ranked|Popularity|Members|Favorites|Watching|Completed|On-Hold|Dropped|Plan to Watch|Score-10|Score-9|Score-8|Score-7|Score-6|Score-5|Score-4|Score-3|Score-2|Score-1|
+-----+--------------------+-----+--------------------+--------------------+----------------------------------+-----+--------+--------------+--------------------+----------+------------+------------+-------

In [107]:
# Visualizamos las filas sin valoraciones de otra manera
df_filtered.filter(df_filtered.Score.isNull()).show()

+-----+--------------------+-----+--------------------+--------------------+----------------------------------+-----+--------+--------------+--------------------+----------+------------+------------+--------+--------------------+-------+----------+-------+---------+--------+---------+-------+-------+-------------+--------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|   ID|                Name|Score|              Genres|        English name|                     Japanese name| Type|Episodes|         Aired|           Producers| Licensors|     Studios|      Source|Duration|              Rating| Ranked|Popularity|Members|Favorites|Watching|Completed|On-Hold|Dropped|Plan to Watch|Score-10|Score-9|Score-8|Score-7|Score-6|Score-5|Score-4|Score-3|Score-2|Score-1|
+-----+--------------------+-----+--------------------+--------------------+----------------------------------+-----+--------+--------------+--------------------+----------+------------+------------+-------

In [108]:
# Se filtra por los animes que tienen valoraciones
df_filtered = df_filtered.filter(df_filtered.Score.isNotNull())

## Entrenamiento del Algoritmo ALS

In [109]:
# Se cargan los valores del csv de ratings
df_ratings = spark.read.csv("data/rating_complete.csv", header=True, sep=",", encoding="UTF-8", inferSchema=True)

# Se cargan los valores de ratings del usuario 66666
df_ratings_ep = spark.read.csv("data/valoraciones_EP.csv", header=False, sep=",", encoding="UTF-8", inferSchema=True)
# Como el csv de datos no tiene header, se reemplaza el nombre por defecto por los que se quieren
df_ratings_ep = df_ratings_ep.withColumnRenamed("_c0", "user_id")
df_ratings_ep = df_ratings_ep.withColumnRenamed("_c1", "anime_id")
df_ratings_ep = df_ratings_ep.withColumnRenamed("_c2", "rating")

In [110]:
print("El numero de valoraciones: " + str(df_ratings.count()))
print("El numero de valoraciones del usuario EP: " + str(df_ratings_ep.count()))

El numero de valoraciones: 57633278
El numero de valoraciones del usuario EP: 65


In [111]:
df_filtered = df_filtered.withColumnRenamed("Rating","Category_rating")

In [112]:
# Se realiza un union de los dos dataframes de valoraciones
df_ratings_completo = df_ratings.union(df_ratings_ep)

In [113]:
df_ratings_completo.show()

+-------+--------+------+
|user_id|anime_id|rating|
+-------+--------+------+
|      0|     430|   9.0|
|      0|    1004|   5.0|
|      0|    3010|   7.0|
|      0|     570|   7.0|
|      0|    2762|   9.0|
|      0|     431|   8.0|
|      0|     578|  10.0|
|      0|     433|   6.0|
|      0|    1571|  10.0|
|      0|     121|   9.0|
|      0|     356|   9.0|
|      0|    1250|   7.0|
|      0|    2913|   6.0|
|      0|    1689|   6.0|
|      0|      68|   6.0|
|      0|    1829|   7.0|
|      0|     600|   6.0|
|      0|    3418|   9.0|
|      0|     164|   8.0|
|      0|    1894|   7.0|
+-------+--------+------+
only showing top 20 rows



In [114]:
# Se realiza un join con el dataframe de animes, para conocer cuales son peliculas y cuales son series
df_ratings_completo_movies = df_ratings_completo.join(df_filtered, df_ratings_completo["anime_id"] == df_filtered["ID"], "inner")

In [115]:
# Se obtienen las peliculas valoradas por los usuarios
df_ratings_movies = df_ratings_completo_movies.filter(df_ratings_completo_movies["Type"] == "Movie").select("user_id", "anime_id", "rating")

In [116]:
# Se obtienen las series valoradas por los usuarios
df_ratings_series = df_ratings_completo_movies.filter(df_ratings_completo_movies["Type"] == "TV").select("user_id", "anime_id", "rating")

In [117]:
df_ratings_series.show()

+-------+--------+------+
|user_id|anime_id|rating|
+-------+--------+------+
|      0|    3010|   7.0|
|      0|    2762|   9.0|
|      0|    1571|  10.0|
|      0|     121|   9.0|
|      0|     356|   9.0|
|      0|    1250|   7.0|
|      0|    2913|   6.0|
|      0|      68|   6.0|
|      0|     600|   6.0|
|      0|    3418|   9.0|
|      0|    2034|   8.0|
|      0|    2547|   7.0|
|      0|     169|   7.0|
|      0|     174|   4.0|
|      0|    2543|   7.0|
|      0|    4086|   6.0|
|      0|     419|   8.0|
|      1|   22535|   9.0|
|      1|   38000|   9.0|
|      1|   18679|   6.0|
+-------+--------+------+
only showing top 20 rows



## Dividir los sets de datos

In [118]:
(training_movies, test_movies) = df_ratings_movies.randomSplit([0.8, 0.2])
(training_series, test_series) = df_ratings_series.randomSplit([0.8, 0.2])

In [142]:
als_movie = ALS(maxIter=5, regParam=0.1, userCol="user_id", itemCol="anime_id", ratingCol="rating", coldStartStrategy="drop")
model_movie = als_movie.fit(training_movies)

In [143]:
als_series = ALS(maxIter=5, regParam=0.1, userCol="user_id", itemCol="anime_id", ratingCol="rating", coldStartStrategy="drop")
model_series = als_series.fit(training_series)

In [144]:
predictions_movies = model_movie.transform(test_movies)
predictions_series = model_series.transform(test_series)

In [145]:
predictions_movies.show()

+-------+--------+------+----------+
|user_id|anime_id|rating|prediction|
+-------+--------+------+----------+
|     27|     199|   7.0|  7.467622|
|     27|     433|   5.0|  5.477476|
|     27|     468|   9.0| 5.9065013|
|     27|   28851|  10.0|  7.915716|
|     28|     431|  10.0|   8.14528|
|     28|     459|   8.0|  7.685598|
|     28|    1119|   8.0| 7.3078337|
|     28|    1120|   8.0|  7.199186|
|     28|    1526|  10.0| 7.6236405|
|     28|    2006|   8.0|  7.375023|
|     28|    2201|   8.0|  7.438695|
|     34|     371|   9.0|  8.593226|
|     34|     449|  10.0|  8.187546|
|     34|     450|  10.0|  8.418671|
|     34|    2144|   6.0|  6.718346|
|     34|   16528|  10.0|  8.204545|
|     44|     512|   7.0|  6.846034|
|     44|     597|   6.0| 5.8049736|
|     44|     974|   7.0|  6.132569|
|     44|    2236|   7.0| 6.6550636|
+-------+--------+------+----------+
only showing top 20 rows



In [146]:
predictions_series.show()

+-------+--------+------+----------+
|user_id|anime_id|rating|prediction|
+-------+--------+------+----------+
|     27|      20|  10.0| 7.5609174|
|     27|     121|   8.0|  7.710748|
|     27|     276|   9.0| 6.0174265|
|     27|     934|   7.0|  7.494112|
|     27|    2476|   7.0| 4.1868806|
|     27|    2966|   9.0| 7.8213067|
|     27|   11757|   9.0| 7.8251987|
|     27|   13601|   8.0|  8.378403|
|     27|   18153|  10.0|  7.951451|
|     27|   22319|  10.0|  8.284065|
|     27|   25777|   9.0|  8.295728|
|     27|   31637|   7.0| 7.8112297|
|     27|   31798|   8.0|  7.685741|
|     27|   32901|   8.0| 6.6412864|
|     27|   33487|   7.0| 6.8316526|
|     27|   34382|   6.0| 6.5436797|
|     27|   34392|   6.0|  4.913827|
|     27|   35076|   8.0|  6.530383|
|     28|     226|  10.0|   8.97889|
|     28|     813|  10.0|  9.728958|
+-------+--------+------+----------+
only showing top 20 rows



In [147]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse_movies = evaluator.evaluate(predictions_movies)
rmse_series = evaluator.evaluate(predictions_series)
print("Root-mean-square error movies = " + str(rmse_movies))
print("Root-mean-square error series = " + str(rmse_series))

Root-mean-square error movies = 1.2982498330260732
Root-mean-square error series = 1.1786542177458779


In [148]:
user_ep = df_ratings_movies.select(als_movie.getUserCol()).where(df_ratings_movies["user_id"] == 66666).distinct()

In [149]:
recomendation_userEp_movies = model_movie.recommendForUserSubset(user_ep, 5)
recomendation_userEp_series = model_series.recommendForUserSubset(user_ep, 5)

In [150]:
# Se obtienen las recomendaciones de peliculas y series para el usuario ep
recomendation_userEp_movies.show(20, False)
recomendation_userEp_series.show(20, False)

+-------+-----------------------------------------------------------------------------------------------+
|user_id|recommendations                                                                                |
+-------+-----------------------------------------------------------------------------------------------+
|66666  |[{33894, 9.740658}, {39486, 9.616261}, {31463, 9.583441}, {28023, 9.529836}, {33132, 9.305414}]|
+-------+-----------------------------------------------------------------------------------------------+

+-------+---------------------------------------------------------------------------------------------+
|user_id|recommendations                                                                              |
+-------+---------------------------------------------------------------------------------------------+
|66666  |[{40748, 9.23185}, {13455, 9.219774}, {42923, 9.134537}, {5114, 9.089539}, {11061, 9.018715}]|
+-------+--------------------------------------------

In [151]:
recomendation_userEp_movies.printSchema()
recomendation_userEp_series.printSchema()

root
 |-- user_id: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- anime_id: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)

root
 |-- user_id: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- anime_id: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [152]:
# Separamos las recomendaciones de peliculas en columnas
df_recomendations_movies_userEp = recomendation_userEp_movies.select("user_id", explode("recommendations").alias("recommendations"))
# Separamos las recomendaciones de series en columnas
df_recomendations_series_userEp = recomendation_userEp_series.select("user_id", explode("recommendations").alias("recommendations"))

In [153]:
df_recomendations_movies_userEp = df_recomendations_movies_userEp.select("user_id", "recommendations.anime_id", "recommendations.rating")
df_recomendations_series_userEp = df_recomendations_series_userEp.select("user_id", "recommendations.anime_id", "recommendations.rating")

In [154]:
df_recomendations_movies_userEp.show(truncate=False)
df_recomendations_series_userEp.show(truncate=False)

+-------+--------+--------+
|user_id|anime_id|rating  |
+-------+--------+--------+
|66666  |33894   |9.740658|
|66666  |39486   |9.616261|
|66666  |31463   |9.583441|
|66666  |28023   |9.529836|
|66666  |33132   |9.305414|
+-------+--------+--------+

+-------+--------+--------+
|user_id|anime_id|rating  |
+-------+--------+--------+
|66666  |40748   |9.23185 |
|66666  |13455   |9.219774|
|66666  |42923   |9.134537|
|66666  |5114    |9.089539|
|66666  |11061   |9.018715|
+-------+--------+--------+



In [164]:
df_movies_recomended = df_recomendations_movies_userEp.join(df_filtered, df_recomendations_movies_userEp["anime_id"] == df_filtered["ID"], "inner")

In [165]:
df_movies_recomended = df_movies_recomended.orderBy("Score", ascending=False).select("anime_id", "Name", "English name", "Score")

In [166]:
df_movies_recomended.show(truncate=False)

+--------+------------------------------------------+--------------------+-----+
|anime_id|Name                                      |English name        |Score|
+--------+------------------------------------------+--------------------+-----+
|39486   |Gintama: The Final                        |Unknown             |8.88 |
|33894   |Sensou: Kodomo-tachi no Yuigon            |Unknown             |8.06 |
|33132   |Shin Kachikachi Yama                      |Unknown             |6.94 |
|28023   |Tsuru no Sugomori                         |Unknown             |6.57 |
|31463   |Maemilggot, Unsu Joeun Nal, Geurigo Bombom|The Road Called Life|5.62 |
+--------+------------------------------------------+--------------------+-----+



In [167]:
df_series_recomended = df_recomendations_series_userEp.join(df_filtered, df_recomendations_series_userEp["anime_id"] == df_filtered["ID"], "inner")

In [168]:
df_series_recomended = df_series_recomended.orderBy("Score", ascending=False).select("anime_id", "Name", "English name", "Score")

In [169]:
df_series_recomended.show(truncate=False)

+--------+--------------------------------+-------------------------------+-----+
|anime_id|Name                            |English name                   |Score|
+--------+--------------------------------+-------------------------------+-----+
|5114    |Fullmetal Alchemist: Brotherhood|Fullmetal Alchemist:Brotherhood|9.19 |
|11061   |Hunter x Hunter (2011)          |Hunter x Hunter                |9.1  |
|40748   |Jujutsu Kaisen (TV)             |Unknown                        |8.54 |
|42923   |SK∞                             |SK8 the Infinity               |7.96 |
|13455   |Zumomo to Nupepe                |Unknown                        |5.95 |
+--------+--------------------------------+-------------------------------+-----+



In [None]:
df_movies_recomended.write.text("results/movies_recomended.txt")
df_series_recomended.write.text("results/series_recomended.txt")