In [4]:
import pandas as pd 
import numpy as np
import pyspark
from pyspark.sql import SparkSession

In [69]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.mllib.evaluation import RegressionMetrics
import math

In [None]:
../../data/ml-latest-small/movies.csv"

In [22]:
movies = pd.read_csv('../../data/ml-latest-small/movies.csv')
ratings = pd.read_csv('../../data/ml-latest-small/ratings.csv')
links = pd.read_csv('../../data/ml-latest-small/links.csv')
tags = pd.read_csv('../../data/ml-latest-small/tags.csv')

In [5]:
pwd

'/home/jovyan/work/Desktop/dsi_lax3/assignments/recommender-big-four/src/pyspark'

In [23]:
movies.head(3)

Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance


In [24]:
ratings.head(3)

Unnamed: 0,userId,movieId,rating,timestamp
0,1,1,4.0,964982703
1,1,3,4.0,964981247
2,1,6,4.0,964982224


In [25]:
links.head(3)

Unnamed: 0,movieId,imdbId,tmdbId
0,1,114709,862.0
1,2,113497,8844.0
2,3,113228,15602.0


In [26]:
tags.head(3)

Unnamed: 0,userId,movieId,tag,timestamp
0,2,60756,funny,1445714994
1,2,60756,Highly quotable,1445714996
2,2,60756,will ferrell,1445714992


In [29]:
len(tags.tag.value_counts())

1589

In [31]:
spark = SparkSession.builder.getOrCreate()

In [32]:
spark_ratings = spark.createDataFrame(ratings)
spark_ratings.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [34]:
ratings.shape

(100836, 4)

In [36]:
#spliting into test and train

In [187]:
train, test = spark_ratings.randomSplit([0.8, 0.2], seed=427471138)

In [188]:
train.count(), test.count()

(80620, 20216)

In [189]:
only_in_test_movies_id = set(test.toPandas().movieId) - set(train.toPandas().movieId)
movie_only_in_test = test.filter(test.movieId.isin(only_in_test_movies_id))
train = train.union(movie_only_in_test)
test = test.filter(~test.movieId.isin(only_in_test_movies_id))
train.count(), test.count()

(81434, 19402)

In [170]:
als_model = ALS(userCol='userId',
                itemCol='movieId',
                ratingCol='rating',
                nonnegative=True,
                regParam=0.1,
               )

In [65]:
recommender = als_model.fit(train)
predictions_train = recommender.transform(train)
rmse_train = evaluator.evaluate(predictions_train)
print("Root-mean-square error for train = " + str(rmse_train))

Root-mean-square error for train = 0.5797296747595336


In [56]:
predictions = recommender.transform(test)

In [58]:
predictions.show()

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
|   602|    471|   4.0| 840876085| 2.8775423|
|    91|    471|   1.0|1112713817| 2.8220263|
|   372|    471|   3.0| 874415126| 3.0375798|
|   474|    471|   3.0| 974668858| 3.5075052|
|   176|    471|   5.0| 840109075| 3.4207172|
|   312|    471|   4.0|1043175564|  3.535123|
|    47|   1088|   4.0|1496205519| 2.7797272|
|   474|   1088|   3.5|1100292226| 3.0699775|
|    64|   1088|   4.0|1161559902| 3.1865246|
|   381|   1088|   3.5|1168664508| 3.7043102|
|   307|   1088|   3.0|1186162146|  2.253449|
|    10|   1088|   3.0|1455619275| 3.8501909|
|   221|   1088|   3.0|1111178147| 2.9575229|
|   116|   1088|   4.5|1337195649| 3.8618064|
|   600|   1088|   3.5|1237851304| 2.7213738|
|   104|   1088|   3.0|1048590956| 3.6653836|
|   268|   1238|   5.0| 940183153| 3.2435944|
|   223|   1342|   1.0|1226209388| 2.2344763|
|   503|   1342|   1.0|1335214611|

In [57]:
predictions.describe().show()

+-------+------------------+------------------+------------------+--------------------+------------------+
|summary|            userId|           movieId|            rating|           timestamp|        prediction|
+-------+------------------+------------------+------------------+--------------------+------------------+
|  count|             19402|             19402|             19402|               19402|             19402|
|   mean|  325.335274713947|17484.352953303784|3.5160550458715596|  1.19924517525719E9|3.3636595835958594|
| stddev|181.91498056371412| 32869.38172920491|1.0400403235971467|2.1556679412318325E8|0.7229868364028628|
|    min|                 1|                 1|               0.5|           828124615|        0.12791717|
|    max|               610|            187595|               5.0|          1537649775|         5.7049847|
+-------+------------------+------------------+------------------+--------------------+------------------+



In [126]:
rmse = evaluator.evaluate(predictions)
rmse

0.87988147234865

### __Model evaluation converting spark dataframe into RDD__

In [82]:
type(predictions.rating.rdd)

pyspark.sql.column.Column

In [106]:
pred_observ=predictions.rdd.map(lambda p: (p.prediction, p.rating))

In [125]:
metrics=RegressionMetrics(pred_observ)
print("MSE = %s" % metrics.meanSquaredError)
print("RMSE = %s" % metrics.rootMeanSquaredError)
# R-squared
print("R-squared = %s" % metrics.r2)
# Mean absolute error
print("MAE = %s" % metrics.meanAbsoluteError)
# Explained variance
print("Explained variance = %s" % metrics.explainedVariance)

MSE = 0.7741914053824281
RMSE = 0.87988147234865
R-squared = 0.2842351372688069
MAE = 0.6774633210092866
Explained variance = 0.5459074014987982


### __Using Cross Validation__

In [219]:
paramGrid = ParamGridBuilder()\
    .addGrid(als_model.rank, [1, 2,3,4,5,6,7 ,8, 9, 10, 11, 12]) \
    .addGrid(als_model.regParam, [0.1]) \
    .addGrid(als_model.nonnegative, [True, False]) \
    .build()

In [223]:
crossval = CrossValidator(estimator=als_model,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(labelCol='rating'),
                          numFolds=5)

In [224]:
cvModel = crossval.fit(train)

In [213]:
pred_cv=cvModel.transform(test)

In [225]:
pred_cv.show()

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
|   602|    471|   4.0| 840876085|  3.341786|
|    91|    471|   1.0|1112713817| 3.3725078|
|   372|    471|   3.0| 874415126|  3.303238|
|   474|    471|   3.0| 974668858| 3.3305218|
|   176|    471|   5.0| 840109075|  3.960265|
|   312|    471|   4.0|1043175564|  3.659879|
|    47|   1088|   4.0|1496205519| 2.6790051|
|   474|   1088|   3.5|1100292226| 3.0908682|
|    64|   1088|   4.0|1161559902| 3.4035919|
|   381|   1088|   3.5|1168664508| 3.2720296|
|   307|   1088|   3.0|1186162146|  2.720826|
|    10|   1088|   3.0|1455619275| 2.8571756|
|   221|   1088|   3.0|1111178147|  3.501938|
|   116|   1088|   4.5|1337195649| 3.0631793|
|   600|   1088|   3.5|1237851304|  2.801779|
|   104|   1088|   3.0|1048590956| 3.2802951|
|   268|   1238|   5.0| 940183153| 3.5292728|
|   223|   1342|   1.0|1226209388| 2.8213387|
|   503|   1342|   1.0|1335214611|

In [226]:
evaluator.evaluate(pred_cv)

0.8866663930831657

In [227]:
cvModel.getEstimatorParamMaps()[ np.argmax(cvModel.avgMetrics) ]

{Param(parent='ALS_9a0c49f769e3', name='rank', doc='rank of the factorization'): 1,
 Param(parent='ALS_9a0c49f769e3', name='regParam', doc='regularization parameter (>= 0).'): 0.1,
 Param(parent='ALS_9a0c49f769e3', name='nonnegative', doc='whether to use nonnegative constraint for least squares'): True}

In [228]:
#cvModel.getEstimatorParamMaps()

In [198]:
cvModel.getNumFolds()

5

In [243]:
als_model_final = ALS(userCol='userId',
                itemCol='movieId',
                ratingCol='rating',
                nonnegative=True,
                regParam=0.1,
                rank=30
               )

In [244]:
recommender = als_model_final.fit(train)
predictions_train = recommender.transform(train)
rmse_train = evaluator.evaluate(predictions_train)
print("Root-mean-square error for train = " + str(rmse_train))

Root-mean-square error for train = 0.5071579972115434


In [246]:
predictions = recommender.transform(test)
rmse_test = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse_test))

Root-mean-square error = 0.8741118780241278


In [254]:
data = [(1, 100)]
columns = ('userId', 'movieId')
one_row_spark_df = spark.createDataFrame(data, columns)

In [255]:
test = recommender.transform(one_row_spark_df).show()

+------+-------+----------+
|userId|movieId|prediction|
+------+-------+----------+
|     1|    100| 2.8237803|
+------+-------+----------+



In [273]:
users=spark_ratings.select(recommender.getUserCol()).distinct().limit(3)
userSubsetRecs = recommender.recommendForUserSubset(users, 10)

In [269]:
userSubsetRecs.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    26|[[78836, 4.239582...|
|   474|[[2295, 4.892765]...|
|    29|[[78836, 4.876668...|
+------+--------------------+



In [320]:
def movie_recomendation(user=1, movie=500, number=10):
    data = [(user, movie)]
    columns = ('userId', 'movieId')
    one_row_spark_df = spark.createDataFrame(data, columns)
    userSubsetRecs = recommender.recommendForUserSubset(one_row_spark_df, number)
    pd_format=userSubsetRecs.toPandas()
    list_recommend=[]
    for n in pd_format.recommendations[0]:
        list_recommend.append(n[0])
    return list_recommend

In [321]:
a=movie_recomendation(user=1, movie=500, number=15)#.show()

In [322]:
a

[27523,
 171495,
 3379,
 3653,
 78836,
 177593,
 27156,
 7748,
 148626,
 3508,
 3814,
 74226,
 184245,
 84273,
 179135]

In [300]:
a=a.toPandas()


In [306]:
type(a)

pandas.core.frame.DataFrame

In [314]:
list_recommend=[]
for n in pd_format.recommendations[0]:
    list_recommend.append(n[0])

In [315]:
acc

[27523, 171495, 3379, 3653, 78836, 177593, 27156, 7748, 148626, 3508]