# ALS Recommender System

This project was developed as an undergraduate thesis for the Bachelor of Informatics program at Universitas AMIKOM Yogyakarta.

- Dataset : MovieLens 100k
- Method : Alternating Least Squares (ALS)

### Define the libraries

In [62]:
import numpy as np
import os
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import udf, col, when
from IPython.display import Image
from IPython.display import display
from pyspark.ml.tuning import TrainValidationSplit
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.tuning import CrossValidator

In [2]:
# Move into the dataset directory so the CSV files can be read by bare name.
# The guard keeps this cell idempotent: once the working directory has already
# been changed, the relative path no longer resolves and a second chdir would fail.
data_dir = os.path.normpath('Dataset/ALS1')
if not os.getcwd().endswith(data_dir):
    os.chdir(data_dir)
os.getcwd()

'/Users/creative/Documents/THESIS/Dataset/ALS1'

### Define the dataset 

In [3]:
# Create (or reuse) the Spark session that backs every DataFrame in this notebook.
spark = SparkSession.builder.appName('Mrecommend_demo').getOrCreate()
sc = spark.sparkContext
# Instantiate the SQLContext; the original bound the class itself, so any
# attribute access on `sqlContext` would have hit an unbound method.
sqlContext = SQLContext(sc, spark)

# Ratings and Movies data (header row supplies column names, types are inferred)
ratings_df = spark.read.csv('ratings.csv', inferSchema=True, header=True)
movies_df = spark.read.csv('movies.csv', inferSchema=True, header=True)

In [4]:
# Ratings Data: preview the first 5 rows to check the loaded columns.

ratings_df.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



### ALS Process

In [5]:
%%time
# Define training and testinf set
(training, test) = ratings_df.randomSplit([0.8, 0.2])

# ALS Model
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating",coldStartStrategy="drop", nonnegative = True)

# Train Data
model = als.fit(training)

# Predict Data
predictions = model.transform(test)

CPU times: user 22.6 ms, sys: 7.21 ms, total: 29.8 ms
Wall time: 11.6 s


### Error Measurement

In [7]:
def measurement(error):
    """Print one regression metric for the notebook-level ``predictions`` DataFrame.

    Parameters
    ----------
    error : str
        Metric name handed to ``RegressionEvaluator`` (this notebook uses
        ``'mae'`` and ``'rmse'``).

    Returns
    -------
    None
        The metric is printed as ``<METRIC>  = <value>``.
    """
    metric_name = str(error)
    # Keep only rows whose prediction compares unequal to NaN.
    scored_rows = predictions.filter(col('prediction') != np.nan)
    score = RegressionEvaluator(
        metricName=metric_name,
        labelCol="rating",
        predictionCol="prediction",
    ).evaluate(scored_rows)
    print(error.upper(), " = " + str(score))

In [8]:
%%time
# Report both error metrics for the current `predictions` DataFrame.
measurement('mae')
measurement('rmse')

MAE  = 0.6809120654486693
RMSE  = 0.8838447267574263
CPU times: user 17.4 ms, sys: 6.28 ms, total: 23.6 ms
Wall time: 23.6 s


### Recommended Movies

In [10]:
%%time
# Top-5 movie recommendations for every user; print 10 rows without truncating the column.
userRecs = model.recommendForAllUsers(5)
userRecs.show(10,False)

+------+---------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                    |
+------+---------------------------------------------------------------------------------------------------+
|471   |[[177593, 4.881076], [306, 4.7430716], [3379, 4.7194037], [94959, 4.7078366], [89904, 4.6835995]]  |
|463   |[[7842, 5.0982447], [3379, 4.9183955], [60943, 4.840947], [171495, 4.8090496], [117531, 4.6842003]]|
|496   |[[3379, 4.9233146], [96004, 4.8863063], [3030, 4.8130994], [5747, 4.719459], [6669, 4.6813555]]    |
|148   |[[5034, 4.429165], [7121, 4.3770857], [93988, 4.3632903], [1357, 4.3478956], [47423, 4.338019]]    |
|540   |[[3379, 5.404235], [5075, 5.3878703], [171495, 5.208924], [7842, 5.1628175], [26171, 5.110066]]    |
|392   |[[86345, 5.2528], [3379, 5.1781983], [92535, 5.064887], [84847, 4.9740486], [96004, 4.964236]]     |
|243   |[[33090, 6.

## Experiments

In [11]:
def experiments(train, test, seed=42):
    """Train ALS on a random split of ``ratings_df`` and print MAE and RMSE.

    Parameters
    ----------
    train : float
        Weight of the training split passed to ``randomSplit`` (e.g. ``0.8``).
    test : float
        Weight of the test split passed to ``randomSplit`` (e.g. ``0.2``).
    seed : int or None, optional
        Seed used for both the split and the ALS initialisation so that a run
        can be reproduced. Pass ``None`` to get a different random split on
        every call.

    Returns
    -------
    None
        Results are printed, one line per metric.
    """
    print('Train data for ', train*100,'% Train Data')
    # Use distinct names for the DataFrames so the `test` weight is not shadowed.
    training_df, test_df = ratings_df.randomSplit([train, test], seed=seed)
    als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating",
              coldStartStrategy="drop", nonnegative=True, seed=seed)
    model = als.fit(training_df)
    predictions = model.transform(test_df)

    # The NaN filter does not depend on the metric, so build it once.
    new_predictions = predictions.filter(col('prediction') != np.nan)
    for metric in ('mae', 'rmse'):
        evaluator = RegressionEvaluator(metricName=metric, labelCol="rating", predictionCol="prediction")
        score = evaluator.evaluate(new_predictions)
        print(metric.upper()," = "+ str(score))

### EX001

- Train Data : 70%
- Test Data : 30% 
- Dataset : MovieLens 100k

In [12]:
%%time
experiments(0.7,0.3)

Train data for  70.0 % Train Data
MAE  = 0.6889196582855841
RMSE  = 0.8936742533549177
CPU times: user 38.9 ms, sys: 10.5 ms, total: 49.5 ms
Wall time: 27.2 s


### EX002

- Train Data : 80%
- Test Data : 20%
- Dataset : MovieLens 100k

In [13]:
%%time
experiments(0.8,0.2)

Train data for  80.0 % Train Data
MAE  = 0.6709044670295701
RMSE  = 0.8662591670454897
CPU times: user 38.8 ms, sys: 9.69 ms, total: 48.5 ms
Wall time: 24.7 s


### EX003
- Train Data : 90%
- Test Data : 10%
- Dataset : MovieLens 100k

In [14]:
%%time
experiments(0.9,0.1)

Train data for  90.0 % Train Data
MAE  = 0.6690494628737167
RMSE  = 0.8671406988644633
CPU times: user 44.8 ms, sys: 12.3 ms, total: 57 ms
Wall time: 24.1 s


In [72]:
def als_predict(n, error, seed=42):
    """Train ALS with rank ``n`` on an 80/20 split and return one test-set error.

    Parameters
    ----------
    n : int
        Value passed to ALS as ``rank``.
    error : str
        Metric name for ``RegressionEvaluator`` (``'mae'`` or ``'rmse'`` in this notebook).
    seed : int or None, optional
        Seed for the split and the ALS initialisation. Keeping it fixed means
        every rank is scored on the same train/test split, so differences in the
        score come from the rank rather than from the split. Pass ``None`` for
        a fresh random split on each call.

    Returns
    -------
    float
        The metric value computed on the test predictions.
    """
    (training, test) = ratings_df.randomSplit([0.8, 0.2], seed=seed)
    als = ALS(rank=n, userCol="userId", itemCol="movieId", ratingCol="rating",
              coldStartStrategy="drop", nonnegative=True, seed=seed)
    model = als.fit(training)
    predictions = model.transform(test)

    # Keep only rows whose prediction compares unequal to NaN before scoring.
    new_predictions = predictions.filter(col('prediction') != np.nan)
    evaluator = RegressionEvaluator(metricName=str(error), labelCol="rating", predictionCol="prediction")
    score = evaluator.evaluate(new_predictions)
    return score

def als_predict_itter(n, error, seed=42):
    """Train ALS with ``maxIter=n`` on an 80/20 split and return one test-set error.

    The original body was a copy of ``als_predict`` and varied ``rank``; this
    version varies the number of ALS iterations, which is what the
    "Impact of MaxIter" experiment calls for.

    Parameters
    ----------
    n : int
        Value passed to ALS as ``maxIter``.
    error : str
        Metric name for ``RegressionEvaluator`` (``'mae'`` or ``'rmse'`` in this notebook).
    seed : int or None, optional
        Seed for the split and the ALS initialisation, so every iteration count
        is scored on the same train/test split. Pass ``None`` for a fresh random
        split on each call.

    Returns
    -------
    float
        The metric value computed on the test predictions.
    """
    (training, test) = ratings_df.randomSplit([0.8, 0.2], seed=seed)
    # NOTE(review): large maxIter values may need a Spark checkpoint directory
    # to keep the lineage short — confirm on the target cluster.
    als = ALS(maxIter=n, userCol="userId", itemCol="movieId", ratingCol="rating",
              coldStartStrategy="drop", nonnegative=True, seed=seed)
    model = als.fit(training)
    predictions = model.transform(test)

    # Keep only rows whose prediction compares unequal to NaN before scoring.
    new_predictions = predictions.filter(col('prediction') != np.nan)
    evaluator = RegressionEvaluator(metricName=str(error), labelCol="rating", predictionCol="prediction")
    score = evaluator.evaluate(new_predictions)
    return score

### EX004

Impact of Rank

- Rank : 1 to 100
- Error Measurement : MAE and RMSE

In [73]:
# MAE for every rank from 1 to 100 (inclusive)
mae_score = []

# range() excludes its stop value, so 101 is needed to reach rank 100.
for n in range(1, 101):
    score = als_predict(n,'mae')
    mae_score.append('%.4f' % score)

mae_score

['0.6844']

In [None]:
# RMSE for every rank from 1 to 100 (inclusive)
rmse_score = []

# range() excludes its stop value, so 101 is needed to reach rank 100.
for n in range(1, 101):
    score = als_predict(n,'rmse')
    # Collect into rmse_score; the original appended to mae_score, which left
    # this list empty and mixed RMSE values into the MAE results.
    rmse_score.append('%.4f' % score)

rmse_score

### EX005

Impact of MaxIter

- MaxIter : 1 to 100
- Error Measurement : MAE and RMSE

In [None]:
# MAE for every value from 1 to 100 (inclusive) passed to als_predict_itter.
# Note: this rebinds mae_score, replacing any results stored under that name.
mae_score = []

# range() excludes its stop value, so 101 is needed to reach 100.
for n in range(1, 101):
    score = als_predict_itter(n,'mae')
    mae_score.append('%.4f' % score)

mae_score

In [None]:
# RMSE for every value from 1 to 100 (inclusive) passed to als_predict_itter.
rmse_score = []

# range() excludes its stop value, so 101 is needed to reach 100.
for n in range(1, 101):
    score = als_predict_itter(n,'rmse')
    # Collect into rmse_score; the original appended to mae_score, which left
    # this list empty and mixed RMSE values into the MAE results.
    rmse_score.append('%.4f' % score)

rmse_score

### Hyperparameters Tuning

In [59]:
# Hyperparameter search space built with ParamGridBuilder:
# three candidate values each for rank, maxIter and regParam.

param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [12, 13, 14])
    .addGrid(als.maxIter, [18, 19, 20])
    .addGrid(als.regParam, [0.17, 0.18, 0.19])
    .build()
)
# RMSE on the "rating" label is the score used to rank the candidates.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)

In [60]:
# 5-fold cross-validation over the parameter grid.
# A fixed seed makes the fold assignment, and therefore the selected
# parameters, reproducible between runs.
cv = CrossValidator(estimator = als,
                    estimatorParamMaps = param_grid,
                    evaluator = evaluator,
                    numFolds = 5,
                    seed = 42)

In [61]:
# Run the cross-validated grid search on the training split.
# NOTE: this rebinds `model`, which previously held the fitted ALS model,
# to the result returned by the cross-validator.
model = cv.fit(training)

In [71]:
# Score the best model found by cross-validation on the held-out test split.
best_model = model.bestModel
predictions = best_model.transform(test)
rmse = evaluator.evaluate(predictions)

print("RMSE = " + str(rmse))
print("=======Best Model========")
print('')
print('Dataset : MovieLens 100k')
print("Rank : ", best_model.rank)

# maxIter and regParam are read from the parent estimator, reached through the
# model's underlying JVM object (a private attribute).
best_estimator = best_model._java_obj.parent()
print("MaxIter: ", best_estimator.getMaxIter())
print("RegParam: ", best_estimator.getRegParam())

RMSE = 0.8765226258505581

Dataset : MovieLens 100k
Rank :  14
MaxIter:  20
RegParam:  0.17


### Recommended Movies

In [77]:
# Refit ALS with explicit hyperparameters (maxIter=20, regParam=0.17, rank=14)
# and score it on the test split.
als = ALS(
    maxIter=20,
    regParam=0.17,
    rank=14,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    nonnegative=True,
)
model = als.fit(training)
predictions = model.transform(test)

# The filter is the same for both metrics, so it is built once up front.
new_predictions = predictions.filter(col('prediction') != np.nan)
for metric in ('mae', 'rmse'):
    evaluator = RegressionEvaluator(
        metricName=str(metric),
        labelCol="rating",
        predictionCol="prediction",
    )
    score = evaluator.evaluate(new_predictions)
    print(metric.upper(), " = " + str(score))

MAE  = 0.6836160106470243
RMSE  = 0.8765226258505581


In [75]:
%%time
# Top-5 movie recommendations for every user from the current `model`;
# print 10 rows without truncating the column.
userRecs = model.recommendForAllUsers(5)
userRecs.show(10,False)

+------+---------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                    |
+------+---------------------------------------------------------------------------------------------------+
|471   |[[6818, 4.8711944], [96004, 4.7894807], [89904, 4.679346], [8477, 4.655531], [7767, 4.5522065]]    |
|463   |[[96004, 5.078626], [60943, 4.8791623], [7481, 4.8791623], [59018, 4.8791623], [171495, 4.7820797]]|
|496   |[[6818, 4.9922967], [96004, 4.7327614], [8477, 4.696682], [89759, 4.5038633], [6666, 4.4259048]]   |
|148   |[[77846, 4.596333], [25906, 4.596333], [93008, 4.596333], [67618, 4.3877873], [96004, 4.3683186]]  |
|540   |[[96004, 5.5427637], [6818, 5.2318788], [132333, 5.142784], [171495, 5.135109], [102217, 5.103898]]|
|392   |[[96004, 4.5766044], [60943, 4.433956], [7481, 4.433956], [59018, 4.433956], [170705, 4.4278316]]  |
|243   |[[67618, 6.