### Seminario de Bases de Datos- UdeA 2020-1
#### Sistema de recomendación de peliculas con Spark- Trabajo Final

Departamento de Ingeniería de Sistemas   
Universidad de Antioquia, Medellín, Colombia  
Carlos Alberto Benavidez, carlos.benavidez@udea.edu.co    
Juan Camilo Rojas, juan.rojas9@udea.edu.co 

<img src=Imagenes/movies_r.png>

**Contextualización:** En el presente Notebook contiene la implemetación de un sistema de recomendación  de peliculas basado en Spark. Para ello se trabajo con dos bases de datos que fueron de recuperados del sitio [Groupleans](https://grouplens.org/datasets/movielens/).   
La primera base de dato consiste en un archivo con 100.000 calificaciones que evaluaban 9.000 peliculas.  
Las calificacaciones  de las peliculas fueron realizadas por un total de 600 usuarios. La primera base de datos fue utilizada para determinar los mejores parametros del modelo que ayudara a  predecir cuales seran las peliculas que un usuario podria visualizar con mayor probabilidad. Esta base de datos se encuentra contenida de la carpeta ml-latest-small.  
Una vez se hallan determinado los mejores parametros, se procedera a hacer el entrenamiento con una base de datos que contiene mas registros, con el preposito de que el sistema de recomendación desarrollo una mayor capacidad de predicción.  
La segunda base de datos (**ratings.csv**) contiene 27.000.000 de calificaciones de  58.000 peliculas realizadas por 280.000 usuarios.

Las herramientas para la implementacion de este sistema de recomendación de peliculas que se emplearon fueron:
* Python ( Librerias: "Pandas","pyspark")
* Spark version 3.0.1
* Anaconda
* Gestión de Versiones basadao en Git (Github)

### ALS Alternatving Least Squere [1]

Alternating Least Square (ALS) is also a matrix factorization algorithm and it runs itself in a parallel fashion. ALS is implemented in Apache Spark ML and built for a larges-scale collaborative filtering problems. ALS is doing a pretty good job at solving scalability and sparseness of the Ratings data, and it’s simple and scales well to very large datasets.

<img src=Imagenes/als-illustration.png>

#### Inicializacion de Spark

In [1]:
import pandas as pd

In [2]:
import findspark

In [3]:
findspark.init('C:\BigData\Spark')

In [4]:
findspark.init()
findspark.find()

'C:\\BigData\\Spark'

In [5]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark import Row
import re 

In [7]:
environment_to_connect = 'local'
SparkContext.setSystemProperty('spark.executor.memory', '2g') #We have to extend the memory capacity
conf = SparkConf().setAppName("SparkRecommendationfull").setMaster(environment_to_connect)
sc = SparkContext.getOrCreate(conf)
spark = SparkSession(sc)


### Working with RDD, (rating_rdd)

In [8]:
ratings_rdd=sc.textFile('ml-latest/ratings.csv') #Obtener los datos del archivo de ratings del directorio

In [10]:
ratings_rdd.take(10) #Muestra

['userId,movieId,rating,timestamp',
 '1,307,3.5,1256677221',
 '1,481,3.5,1256677456',
 '1,1091,1.5,1256677471',
 '1,1257,4.5,1256677460',
 '1,1449,4.5,1256677264',
 '1,1590,2.5,1256677236',
 '1,1591,1.5,1256677475',
 '1,2134,4.5,1256677464',
 '1,2478,4.0,1256677239']

In [11]:
data_header_ratings = ratings_rdd.take(1)[0] #Obtener el encabezado

### Getting the rating_data

In [12]:
# Eliminar el encabezado y el timestamp y tokenizar el user_id movie_id y timestamp
ratings_data = ratings_rdd.filter(lambda line: line!=data_header_ratings)\
.map(lambda line: line.split(","))\
.map(lambda tokens: (tokens[0],tokens[1],tokens[2]))


In [13]:
ratings_data.take(10) #Muestra 

[('1', '307', '3.5'),
 ('1', '481', '3.5'),
 ('1', '1091', '1.5'),
 ('1', '1257', '4.5'),
 ('1', '1449', '4.5'),
 ('1', '1590', '2.5'),
 ('1', '1591', '1.5'),
 ('1', '2134', '4.5'),
 ('1', '2478', '4.0'),
 ('1', '2840', '3.0')]

### Getting the movies

In [14]:
movies_rdd=sc.textFile('ml-latest/movies.csv') #Obtener los datos del archivo de las peliculas del directorio
data_header_movies = movies_rdd.take(1)[0]

In [15]:
# Tokenizar las columnasmovie_id, titulo y genero
movies_data=movies_rdd.filter(lambda line: line!=data_header_movies)\
.map(lambda line: line.split(","))\
.map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()
movies_data.take(5)

[('1', 'Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy'),
 ('2', 'Jumanji (1995)', 'Adventure|Children|Fantasy'),
 ('3', 'Grumpier Old Men (1995)', 'Comedy|Romance'),
 ('4', 'Waiting to Exhale (1995)', 'Comedy|Drama|Romance'),
 ('5', 'Father of the Bride Part II (1995)', 'Comedy')]

### Working with ALS (collaborative filtering algorithm)

In [16]:
# Cargar las librerias AlS 
from pyspark.mllib.recommendation import ALS
import math

### The best parameters

Los mejores parametros fueron tomados empleando un algoritmo para hallar el error mini empleando en metodo de minimos cuadradoscon un menor dataset de peliculas y raings sacado de: 
[Movie_recommendation_system](https://github.com/carlosbenavidez9507/Movie_recommendation_system/blob/main/Recommendation_Spark.ipynb)


In [19]:
rank = 4
iterations = 10

### Splitting the data into training and test

In [23]:
training, test= ratings_data.randomSplit([7,3]) # se usa un porcentaje de 70% y de 30%

In [24]:
test.count()

8319934

In [25]:
training.count()

19433510

### Parameters [4]

* **NumFactors:** The number of latent factors to use for the underlying model. It is equivalent to the dimension of the calculated user and item vectors. (Default value: 10) 
* **Lambda:** Regularization factor. Tune this value in order to avoid overfitting or poor performance due to strong generalization. (Default value: 1)
* **Iterations:** The maximum number of iterations. (Default value: 10)
* **Blocks**: The number of blocks into which the user and item matrix are grouped. The fewer blocks one uses, the less data is sent redundantly. However, bigger blocks entail bigger update messages which have to be stored on the heap. If the algorithm fails because of an OutOfMemoryException, then try to increase the number of blocks. (Default value: None)  
* **Seed**: Random seed used to generate the initial item matrix for the algorithm. (Default value: 0)
* **TemporaryPath:** Path to a temporary directory into which intermediate results are stored. If this value is set, then the algorithm is split into two preprocessing steps, the ALS iteration and a post-processing step which calculates a last ALS half-step. The preprocessing steps calculate the OutBlockInformation and InBlockInformation for the given rating matrix. The results of the individual steps are stored in the specified directory. By splitting the algorithm into multiple smaller steps, Flink does not have to split the available memory amongst too many operators. This allows the system to process bigger individual messages and improves the overall performance. (Default value: None)

### Training the model

In [26]:
model = ALS.train(training, rank, iterations) 

In [27]:
test_for_predict_RDD = test.map(lambda x: (x[0], x[1])) #se asigna una clave conid_user, id_movie

### Making prediction

In [29]:
# con base en la id_user, id_movie y rating model.predictAll() predecira raitings de peliculas para todos los usuarios
predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
# Unir las predicciones con los datos actuales de la matriz de ratings mapeados por su clave y valor
rates_and_preds = test.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
# hallar la media del error cuadrado
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())    
print("Mean Squared Error = " + str(error))

Mean Squared Error = 0.8396068189587629


### About the movies

In [32]:
# Eliminar los generos de las peliclas para crear una lista de solo peliculas con su clave
movies_titles = movies_data.map(lambda x: (int(x[0]),x[1])) 
movies_titles.take(5)

[(1, 'Toy Story (1995)'),
 (2, 'Jumanji (1995)'),
 (3, 'Grumpier Old Men (1995)'),
 (4, 'Waiting to Exhale (1995)'),
 (5, 'Father of the Bride Part II (1995)')]

In [35]:
print ("There are", str(movies_titles.count())," movies in the complete dataset") #muestra

There are 58098  movies in the complete dataset


In [41]:
# se hace un mapeo de las peliculas por su calificacion y su id
group_of_rating_movie = ratings_data.map(lambda x: (x[1], x[2]))
group_of_rating_movie.take(5)

[('307', '3.5'),
 ('481', '3.5'),
 ('1091', '1.5'),
 ('1257', '4.5'),
 ('1449', '4.5')]

In [43]:
# se mapea las peliculas por el usuario que la califico
tmp_group=group_of_rating_movie.map(lambda x: x[0])

In [51]:
# se agrupa el numero de calificaciones por peliculas
timo=(tmp_group.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y))

In [53]:
timito=movies_titles

In [54]:
descriptio_movies=timito.map(lambda p: Row(id_movie=p[0], description=p[1]))

### Working with SQL Queries

In [62]:
dfmaping5 = spark.createDataFrame(descriptio_movies)
dfmaping5.createOrReplaceTempView("descriptio_movies")
tabla5 = spark.sql("SELECT mapingMovies.id_movie, mapingMovies.frecuency,descriptio_movies.description  FROM mapingMovies INNER JOIN descriptio_movies on mapingMovies.id_movie=descriptio_movies.id_movie   order by frecuency desc")


In [63]:
tabla5.show()

+--------+---------+--------------------+
|id_movie|frecuency|         description|
+--------+---------+--------------------+
|     318|    97999|"Shawshank Redemp...|
|     356|    97040| Forrest Gump (1994)|
|     296|    92406| Pulp Fiction (1994)|
|     593|    87899|"Silence of the L...|
|    2571|    84545|             "Matrix|
|     260|    81815|Star Wars: Episod...|
|     480|    76451|Jurassic Park (1993)|
|     527|    71516|Schindler's List ...|
|     110|    68803|   Braveheart (1995)|
|       1|    68469|    Toy Story (1995)|
|    1210|    66023|Star Wars: Episod...|
|    1196|    65822|Star Wars: Episod...|
|    2959|    65678|   Fight Club (1999)|
|     589|    64258|Terminator 2: Jud...|
|    1198|    63505|Raiders of the Lo...|
|      50|    62180|     "Usual Suspects|
|    4993|    61883|"Lord of the Ring...|
|     858|    60904|          "Godfather|
|    2858|    60820|American Beauty (...|
|     780|    58949|Independence Day ...|
+--------+---------+--------------

### How many ratings per movie 

In [52]:
mapingMovies= timo.map(lambda p : Row(id_movie=p[0], frecuency=p[1]))
dfmaping4 = spark.createDataFrame(mapingMovies)
dfmaping4.createOrReplaceTempView("mapingMovies")
tabla4 = spark.sql("SELECT id_movie , frecuency FROM mapingMovies order by frecuency desc")
tabla4.show()

+--------+---------+
|id_movie|frecuency|
+--------+---------+
|     318|    97999|
|     356|    97040|
|     296|    92406|
|     593|    87899|
|    2571|    84545|
|     260|    81815|
|     480|    76451|
|     527|    71516|
|     110|    68803|
|       1|    68469|
|    1210|    66023|
|    1196|    65822|
|    2959|    65678|
|     589|    64258|
|    1198|    63505|
|      50|    62180|
|    4993|    61883|
|     858|    60904|
|    2858|    60820|
|     780|    58949|
+--------+---------+
only showing top 20 rows



### Adding new user ratings

In [69]:
# Se añade un nuevo usuario con algunas calificaciones de peliculas

new_user_ID = 0
#  (userID, movieID, rating)
dummy_user_ratings = [
     (0,260,9), # Star Wars (1977)
     (0,1,8), # Toy Story (1995)
     (0,16,7), # Casino (1995)
     (0,25,8), # Leaving Las Vegas (1995)
     (0,32,9), # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
     (0,335,4), # Flintstones, The (1994)
     (0,379,3), # Timecop (1994)
     (0,296,7), # Pulp Fiction (1994)
     (0,858,10) , # Godfather, The (1972)
     (0,50,8) # Usual Suspects, The (1995)
    ]
new_user_ratings_RDD = sc.parallelize(dummy_user_ratings)

In [71]:
# La informacion se convierte en un RDD
data_with_new_ratings_RDD = ratings_data.union(new_user_ratings_RDD)

### Training the model with the new input 

In [72]:
nw_model = ALS.train(data_with_new_ratings_RDD, rank, iterations)

In [77]:
# no vuelva aparecer en el id de las peliculas del usuario
new_user_ratings_ids = map(lambda x: x[1], dummy_user_ratings) # get just movie IDs

In [82]:
# rescata las peliculas q no han sido seleccionadas 
unrated_movies_user = (movies_titles.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

### Making the new prediction

In [83]:
# realizar una nueva prediccion para las peliculas que no ha visto el usuario
new_recommendations_RDD = nw_model.predictAll(unrated_movies_user)

In [173]:
new_recommendations_RDD.take(10)

[Rating(user=0, product=116688, rating=1.6033434923158865),
 Rating(user=0, product=32196, rating=4.657505672617084),
 Rating(user=0, product=138744, rating=5.1833602040377915),
 Rating(user=0, product=81132, rating=3.117377536245579),
 Rating(user=0, product=81324, rating=5.2379959457766105),
 Rating(user=0, product=7020, rating=6.6483911271058105),
 Rating(user=0, product=136596, rating=8.673700498887683),
 Rating(user=0, product=134328, rating=4.780101462674151),
 Rating(user=0, product=138348, rating=5.182443584613017),
 Rating(user=0, product=60408, rating=5.159327474067133)]

In [91]:
new_recommendations_RDD.count()

53888

In [86]:
new_user_recommendations_rating_RDD = new_recommendations_RDD.map(lambda x: (x.product, x.rating))

In [94]:
new_user_recommendations_rating_RDD .take(5)

[(116688, 1.6033434923158865),
 (32196, 4.657505672617084),
 (138744, 5.1833602040377915),
 (81132, 3.117377536245579),
 (81324, 5.2379959457766105)]

In [87]:
# Concatenar el las peliculas calificadas del usuario con las calificaciones de las peliculas predictas
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(movies_titles)

### Predicted Training

In [90]:
new_user_recommendations_rating_title_and_count_RDD.take(20)

[(116688, (1.6033434923158865, 'Prime Evil (1988)')),
 (7020, (6.6483911271058105, 'Proof (1991)')),
 (5928, (4.094505136857857, '"Border')),
 (53352, (1.5919378376473539, 'Sheitan (2006)')),
 (4992, (3.6223134947038815, 'Kate & Leopold (2001)')),
 (162396, (0.5027394875178217, 'Skiptrace (2016)')),
 (1716, (2.6884069931046866, '"Other Voices')),
 (72228, (5.414885002673417, 'Creation (2009)')),
 (83616, (8.579623580633182, '"Real McCoy')),
 (77688,
  (6.5649538047104805,
   'Three Men and a Cradle (3 hommes et un couffin) (1985)')),
 (117312, (1.8187179947083276, 'The ABCs of Death 2 (2014)')),
 (106080, (3.8634028517977654, 'Fragment of Fear (1970)')),
 (107172, (-4.519814433447564, 'Ways to Live Forever (2010)')),
 (95628, (6.440686160494206, "Red's Dream (1987)")),
 (5148, (4.490166697412283, 'Black Like Me (1964)')),
 (153192, (8.211688369727113, 'How to Become Myself (2007)')),
 (153348, (5.329586310322478, 'The Matrimony (2007)')),
 (6552, (6.95290110711904, 'Dirty Pretty Things

In [112]:
def get_counts_and_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)


In [175]:
# Calificaciones asociadas a todas la pelicula
movie_ID_with_ratings_RDD = ratings_data.map(lambda x: (x[1], x[2])).groupByKey()
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))

In [176]:
movie_ID_with_ratings_RDD.take(5)

[('3826', <pyspark.resultiterable.ResultIterable at 0x12cbed97e80>),
 ('104', <pyspark.resultiterable.ResultIterable at 0x12cbee58278>),
 ('153', <pyspark.resultiterable.ResultIterable at 0x12cbf029320>),
 ('165', <pyspark.resultiterable.ResultIterable at 0x12cbed97630>),
 ('181', <pyspark.resultiterable.ResultIterable at 0x12cbf755d68>)]

### Average rating 

In [177]:
# promedio de calificacion de cada pelicula
ri=movie_ID_with_ratings_RDD.map(lambda x: (int(x[0]),(sum(float(i) for i in list(x[1])))/len(list(x[1]))))

In [178]:
ri.take(10)

[(3826, 2.5461339626882444),
 (104, 3.3942778801636178),
 (153, 2.9020622558025204),
 (165, 3.5077587042963825),
 (181, 1.9860046651116294),
 (253, 3.5052233084147977),
 (423, 3.008343508343508),
 (494, 3.3739313292884723),
 (762, 2.414987080103359),
 (1396, 3.6338256484149856)]

In [181]:
# numero de evaluaciones por pelicula
ra=movie_ID_with_ratings_RDD.map(lambda x: (int(x[0]),len(list(x[1]))))

In [182]:
ra.take(1)

[(3826, 8898)]

In [179]:
predit=new_user_recommendations_rating_title_and_count_RDD.join(ri)

In [180]:
predit.count()

53888

In [183]:
ra.take(10)

[(3826, 8898),
 (104, 22247),
 (153, 38647),
 (165, 41244),
 (181, 3001),
 (253, 31302),
 (423, 2457),
 (494, 14504),
 (762, 11610),
 (1396, 13880)]

### Prediction plus Average Raiting

In [185]:
# id pelicula, calificiacion, descripcion y promedio
predit.take(10)

[(7020, ((6.6483911271058105, 'Proof (1991)'), 3.6684350132625996)),
 (53352, ((1.5919378376473539, 'Sheitan (2006)'), 2.73728813559322)),
 (162396, ((0.5027394875178217, 'Skiptrace (2016)'), 2.992957746478873)),
 (77688,
  ((6.5649538047104805,
    'Three Men and a Cradle (3 hommes et un couffin) (1985)'),
   3.26)),
 (107172,
  ((-4.519814433447564, 'Ways to Live Forever (2010)'), 2.9166666666666665)),
 (5148, ((4.490166697412283, 'Black Like Me (1964)'), 3.13265306122449)),
 (6552,
  ((6.95290110711904, 'Dirty Pretty Things (2002)'), 3.7538280329799765)),
 (120276, ((10.910280733097615, 'Symphony of the Soil (2012)'), 3.5)),
 (117468, ((6.782766733962799, 'Leontine (1968)'), 3.75)),
 (7488,
  ((5.499456165879572, "Berkeley in the '60s (1990)"), 3.5754716981132075))]

In [186]:
# se concatena la informacion obtenida 
cbe=predit.join(ra)

In [188]:
#idMovie, Rating,Title,Avarage_rating,number of calification

In [187]:
cbe.take(10)

[(82836, (((8.173770480240421, '"Life of Reilly'), 4.05), 10)),
 (72216, (((6.941365717690999, 'Fun is Beautiful (1980)'), 3.5), 5)),
 (99828, (((4.454620762760526, '"Unloved'), 3.0), 10)),
 (89208,
  (((-0.24894194710554274, 'Walled In (2009)'), 2.4545454545454546), 22)),
 (144432,
  (((12.110979729288715, 'Hindsight (2011)'), 3.5454545454545454), 11)),
 (106200, (((1.362721242371073, 'Hours (2013)'), 3.0083333333333333), 60)),
 (133812, (((4.482705562268945, 'Malarek (1988)'), 2.0), 1)),
 (123192,
  (((3.506352450057882,
     'The Land Before Time XI: Invasion of the Tinysauruses (2005)'),
    2.74),
   25)),
 (167796, (((6.2200321490352515, 'Lace Crater (2015)'), 3.0), 1)),
 (140184, (((0.5322806392729444, 'Kingdom Come (2014)'), 1.75), 2))]

In [208]:
tope_raiting=cbe.map(lambda x: (x[0],x[1][0][0][1],x[1][0][0][0],x[1][0][1],x[1][1]))

In [209]:
# id pelicula, calificiacion, predicicion y la calificacion promedio y numero de evaluaciones
tope_raiting.take(5)

[(82836, '"Life of Reilly', 8.173770480240421, 4.05, 10),
 (72216, 'Fun is Beautiful (1980)', 6.941365717690999, 3.5, 5),
 (99828, '"Unloved', 4.454620762760526, 3.0, 10),
 (89208, 'Walled In (2009)', -0.24894194710554274, 2.4545454545454546, 22),
 (144432, 'Hindsight (2011)', 12.110979729288715, 3.5454545454545454, 11)]

In [210]:
description_top_full=tope_raiting.map(lambda p: Row(id_movie=p[0], description=p[1],rat_pred=p[2],rat_people=p[3],num_class=p[4]))

In [228]:
# se toma en cuenta la mayoria de las calificaciones de la peliculas 
dfmaping7 = spark.createDataFrame(description_top_full)
dfmaping7.createOrReplaceTempView("description_top_full")
tabla7 = spark.sql("SELECT id_movie, description,rat_pred,rat_people,num_class FROM description_top_full where num_class>100  order by rat_pred desc limit 25")


In [229]:
# prediccion de las peliculas que se ajustan a la calificacion del usuario
tabla7.show()

+--------+--------------------+-----------------+------------------+---------+
|id_movie|         description|         rat_pred|        rat_people|num_class|
+--------+--------------------+-----------------+------------------+---------+
|  134252|That Munchhausen ...|9.946149883380144|3.9951923076923075|      104|
|    4993|"Lord of the Ring...| 9.59727298599667|4.0979428922321155|    61883|
|    7153|"Lord of the Ring...|9.508771821041329| 4.102853009864408|    57378|
|    5952|"Lord of the Ring...|9.495517197718872| 4.074705446592352|    56696|
|  159819|         Life (2009)|9.478605335986913| 4.183734939759036|      166|
|     858|          "Godfather|9.392935532028801| 4.332892749244713|    60904|
|  100553|Frozen Planet (2011)|9.278043527242602| 4.108208955223881|      402|
|   86504|Voices from the L...|9.199760046588125| 4.124444444444444|     1800|
|   93040|          "Civil War|9.192630580533987|4.0742459396751745|      431|
|     527|Schindler's List ...|9.188816069830942| 4.

In [189]:
#Pruebas

top_new_user=new_user_recommendations_rating_title_and_count_RDD.map(lambda x: (x[0],x[1][0],x[1][1]))

In [190]:
top_new_user.take(5)

[(116688, 1.6033434923158865, 'Prime Evil (1988)'),
 (7020, 6.6483911271058105, 'Proof (1991)'),
 (5928, 4.094505136857857, '"Border'),
 (53352, 1.5919378376473539, 'Sheitan (2006)'),
 (4992, 3.6223134947038815, 'Kate & Leopold (2001)')]

In [191]:
description_top=top_new_user.map(lambda p: Row(id_movie=p[0], description=p[2],rat=p[1]))

In [226]:
dfmaping6 = spark.createDataFrame(description_top)
dfmaping6.createOrReplaceTempView("description_top")
tabla6 = spark.sql("SELECT id_movie, description as Titulo_de_la_pelicula,rat FROM description_top where  order by rat desc limit 25")


In [227]:
tabla6.show()

+--------+---------------------+------------------+
|id_movie|Titulo_de_la_pelicula|               rat|
+--------+---------------------+------------------+
|    2958| Naturally Native ...| 29.66444448241168|
|   39191|    Blue Vinyl (2002)|29.549312948653437|
|  164809| Two-Legged Horse ...|27.840926480522626|
|  183053| An Insignificant ...|27.821871047637387|
|  190239| The World of Stai...|26.043097457542643|
|  108214|      "Prize of Peril|25.658081732326863|
|  154256| The Great Global ...|24.897666282399257|
|  125930| Björk at the Roya...|24.540265961029405|
|  185211|        Heroes (2008)|24.133265195369663|
|  158832|     The Store (1983)|24.020912300069703|
|  185259| It Happened in Pe...|23.845375683509587|
|  182521|       Thakara (1979)|23.605410195185357|
|  123044|    Stalingrad (1989)|23.602145493795994|
|  133339|          1915 (2015)|23.335834644766813|
|  168314| Be My Cat: A Film...| 23.30457692736836|
|  177619|    Cold Tango (2017)|22.777794696015178|
|  187987| "

**Referencias**:  
[1] https://databricks.com/blog/2014/07/23/scalable-collaborative-filtering-with-spark-mllib.html  
[2] https://spark.apache.org/docs/latest/api/python/index.html  
[3] https://github.com/jadianes/spark-movie-lens  
[4] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/libs/ml/als.html#:~:text=Examples-,Description,R%E2%89%88UTV.&text=Since%20matrix%20factorization%20can%20be,user%20and%20item%20matrix%2C%20respectively.