## Import libraries

In [1]:
import pyspark
from pyspark.sql.types import *
from pyspark.sql.functions import isnan, when, count, col

from pyspark.sql import SparkSession

In [2]:
import pyspark.sql.functions as sql_func

from pyspark.ml.recommendation import ALS, ALSModel

from pyspark.mllib.evaluation import RegressionMetrics, RankingMetrics
from pyspark.ml.evaluation import RegressionEvaluator

## Spark session

In [3]:
spark = SparkSession.builder.appName("recommender").getOrCreate()

In [8]:
# To get dataframe outputs that are nice looking into jupyter
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

## Read the data

In [4]:
data_schema = StructType([
    StructField('userId', IntegerType(), False),
    StructField('movieId', IntegerType(), False),
    StructField('rating', FloatType(), False),
    StructField('timestamp',IntegerType(), False)
])

In [6]:
df = spark.read.csv('ratings_small.csv', header=True, schema=data_schema).cache()

In [10]:
df = df.drop('timestamp')

In [11]:
df

userId,movieId,rating
1,31,2.5
1,1029,3.0
1,1061,3.0
1,1129,2.0
1,1172,4.0
1,1263,2.0
1,1287,2.0
1,1293,2.0
1,1339,3.5
1,1343,2.0


## Split the dataset in training and test sets

In [12]:
(training, test) = ratings.randomSplit([0.7, 0.3], seed=42)

## Build the recommendation model using ALS on the training data

In [13]:
als = ALS(
          rank=30,
          maxIter=4, 
          regParam=0.1,
          userCol='userId', 
          itemCol='movieId', 
          ratingCol='rating',
          coldStartStrategy='drop',
          implicitPrefs=False
         )

In [14]:
model = als.fit(training)

## Evaluate the model

In [15]:
predictions = model.transform(test)

 ### ...by computing the MAE on the test data

In [17]:
evaluator_mae = RegressionEvaluator(metricName='mae', labelCol='rating',
                                predictionCol='prediction')

evaluator_mae.evaluate(predictions)


0.7209757297941944

 ### ...by computing the RMSE on the test data

In [18]:
evaluator_rmse = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")

evaluator_rmse.evaluate(predictions)

0.9282258446797904

## Recommendations

### ...top 3 movie recommendations for each user

list of [movieId, predicted rating]

In [23]:
model.recommendForAllUsers(3).show(5, truncate=False)

+------+------------------------------------------------------------+
|userId|recommendations                                             |
+------+------------------------------------------------------------+
|471   |[[3414, 4.8970337], [3030, 4.7155657], [599, 4.567352]]     |
|463   |[[67504, 4.840533], [83411, 4.840533], [83318, 4.840533]]   |
|496   |[[3414, 5.0007257], [916, 4.944495], [318, 4.9396644]]      |
|148   |[[67504, 5.3121815], [83411, 5.3121815], [83318, 5.3121815]]|
|540   |[[1150, 4.5969515], [1131, 4.518958], [838, 4.4133058]]     |
+------+------------------------------------------------------------+
only showing top 5 rows



### ...top 3 user recommendations for each movie

list of [userId, predicted rating]

In [27]:
model.recommendForAllItems(3).show(5, truncate=False)

+-------+-----------------------------------------------------+
|movieId|recommendations                                      |
+-------+-----------------------------------------------------+
|1580   |[[113, 5.014726], [543, 4.8468328], [145, 4.70602]]  |
|5300   |[[257, 4.802076], [546, 4.4074507], [577, 4.357668]] |
|6620   |[[357, 4.8438497], [156, 4.677115], [443, 4.6186557]]|
|7340   |[[621, 4.3526173], [112, 4.141518], [546, 4.000797]] |
|54190  |[[473, 4.580847], [113, 4.3279715], [296, 4.236387]] |
+-------+-----------------------------------------------------+
only showing top 5 rows



## Stop Spark session

In [28]:
spark.stop()