In [99]:
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
import warnings
warnings.filterwarnings("ignore")

### Load data

In [100]:
# Start (or reuse) the Spark session used by the rest of the notebook.
spark = SparkSession.builder.appName('MovieRec').getOrCreate()
# Ratings are '::'-delimited and read without a header option, so the columns
# come back as _c0.._c3 and are renamed in a later cell.
movieRatings = spark.read.csv('ratings.dat', sep='::', inferSchema=True)
# pandas only supports multi-character separators in its Python parser; ask for it
# explicitly instead of relying on the implicit fallback and its ParserWarning.
df_movies = pd.read_csv('movies.dat', sep='::', engine='python',
                        names=['movieId', 'title', 'genre'])

                                                                                

In [101]:
# Give the positional CSV columns (_c0.._c3) meaningful names.
movieRatings = movieRatings.select(*[
    movieRatings[raw_name].alias(new_name)
    for raw_name, new_name in (
        ('_c0', 'userId'),
        ('_c1', 'movieId'),
        ('_c2', 'rating'),
        ('_c3', 'timestamp'),
    )
])

In [102]:
# The timestamp column is not passed to the ALS model below, so drop it.
movieRatings = movieRatings.drop('timestamp')

In [103]:
# 80/20 train/test split. The fixed seed makes the split repeatable, so the RMSE
# and recommendations below do not change on every Restart & Run All.
train, test = movieRatings.randomSplit([0.8, 0.2], seed=42)

### Create ALS Model

In [104]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [105]:
# ALS estimator over (userId, movieId, rating).
# coldStartStrategy='drop' and nonnegative=True are kept as originally configured;
# the fixed seed makes the fitted model repeatable across runs.
als = ALS(userCol='userId', itemCol='movieId', ratingCol='rating',
          coldStartStrategy='drop', nonnegative=True, seed=42)

In [106]:
# RMSE between the 'prediction' column and the observed 'rating' column.
evaluator = RegressionEvaluator(
    predictionCol='prediction',
    labelCol='rating',
    metricName='rmse',
)

In [107]:
# Fit the ALS estimator on the training split.
model = als.fit(train)

                                                                                

In [108]:
# Score the held-out test split; adds a 'prediction' column next to 'rating'.
predictions = model.transform(test)

In [109]:
# RMSE of predicted vs. observed ratings on the test split (lower is better).
rmse = evaluator.evaluate(predictions)
rmse

                                                                                

0.8236631704100738

In [110]:
# Preview predictions next to the true ratings, ordered by user and then rating.
predictions.orderBy(['userId', 'rating']).show()



+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|     1|    356|   5.0|  5.347277|
|     1|    594|   5.0|  4.664404|
|     1|    292|   5.0|  4.947699|
|     2|    802|   2.0| 3.2250655|
|     2|   1544|   3.0| 2.6198363|
|     2|   1391|   3.0| 1.7677895|
|     2|    376|   3.0|   2.86801|
|     2|   1210|   4.0| 3.2033632|
|     2|    260|   5.0| 3.3304393|
|     3|   1288|   3.0| 3.5263023|
|     3|   5299|   3.0| 3.9305263|
|     3|   7155|   3.5|  4.045279|
|     3|   4677|   4.0| 3.0889623|
|     3|   8529|   4.0| 3.7624245|
|     3|   4995|   4.5|  4.154476|
|     3|   1564|   4.5| 3.9158115|
|     4|    344|   2.0|  2.579411|
|     4|     21|   3.0| 3.8406863|
|     4|    349|   3.0|  4.399834|
|     4|    380|   3.0| 4.1321015|
+------+-------+------+----------+
only showing top 20 rows



                                                                                

### Generate Recommendations for All Users
Returns the top 5 recommendations for each user.

In [111]:
# Top 5 recommended movies for every user known to the model.
user_recomendations = model.recommendForAllUsers(5)

In [112]:
# truncate=False prints each recommendations array in full instead of clipping it.
user_recomendations.show(truncate=False)



+------+----------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                     |
+------+----------------------------------------------------------------------------------------------------+
|148   |[{61742, 5.8244677}, {4454, 5.496298}, {42783, 5.388434}, {64280, 5.388434}, {33264, 5.304997}]     |
|463   |[{61742, 4.8351474}, {32657, 4.7572403}, {32090, 4.5386877}, {4454, 4.4550314}, {64197, 4.419184}]  |
|471   |[{5950, 5.759638}, {59655, 5.737817}, {32444, 5.3829975}, {7370, 5.342672}, {7140, 5.177517}]       |
|496   |[{61742, 5.656532}, {32444, 5.527679}, {64197, 5.0124598}, {53883, 4.9652433}, {8794, 4.915106}]    |
|833   |[{61742, 5.5502944}, {32657, 5.19999}, {64280, 5.102018}, {42783, 5.102018}, {64197, 5.015974}]     |
|1088  |[{61742, 4.755621}, {64280, 4.5944195}, {42783, 4.5944195}, {33264, 4.3024173}, {32657, 4.29546}]   |
|1238  |[{

                                                                                

Input user ID

In [113]:
# userId whose recommendations are looked up below.
ID = 1088

In [114]:
def recomendations_user(id, recommendations=None, movies=None):
    """Return the recommended movies for one user, joined with movie metadata.

    Parameters
    ----------
    id : int
        ``userId`` to look up.
    recommendations : optional
        Spark DataFrame with a ``userId`` column and a ``recommendations`` array
        of ``(movieId, rating)`` structs. Defaults to the notebook-level
        ``user_recomendations``.
    movies : pandas.DataFrame, optional
        Movie metadata keyed by ``movieId``. Defaults to the notebook-level
        ``df_movies``.

    Returns
    -------
    pandas.DataFrame
        One row per recommended movie that has metadata: ``movieId``, ``rating``
        and the remaining columns of ``movies``, in recommendation order.
        Empty (same columns) when the user has no recommendations.
    """
    recs_sdf = user_recomendations if recommendations is None else recommendations
    movies_df = df_movies if movies is None else movies

    # Filter with a Column expression instead of an interpolated SQL string, and
    # fetch the user's row once rather than running one collect() per field.
    matches = (
        recs_sdf.filter(recs_sdf['userId'] == id)
        .select('recommendations')
        .take(1)
    )
    user_recs = matches[0]['recommendations'] if matches else []
    if not user_recs:
        # Unknown user: return an empty frame instead of raising IndexError.
        extra_cols = [c for c in movies_df.columns if c != 'movieId']
        return pd.DataFrame(columns=['movieId', 'rating'] + extra_cols)

    top = pd.DataFrame({
        'movieId': [rec['movieId'] for rec in user_recs],
        'rating': [rec['rating'] for rec in user_recs],
    })
    # Inner merge keeps the order of the left (recommendation) keys.
    return pd.merge(top, movies_df, on='movieId')

In [115]:
# Show the recommendations for the user chosen above, with titles and genres.
recomendations_user(ID)

                                                                                

Unnamed: 0,movieId,rating,title,genre
0,61742,4.755621,Maradona by Kusturica (2008),Documentary
1,64280,4.594419,Hospital (1970),Documentary
2,42783,4.594419,Shadows of Forgotten Ancestors (1964),Drama|Romance
3,33264,4.302417,Satan's Tango (Sátántangó) (1994),Drama
4,32657,4.29546,"Man Who Planted Trees, The (Homme qui plantait...",Animation|Drama
