###Importing the SPARK SESSION

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Movie_Recomendation").getOrCreate()

###Importing the Dataset

In [0]:
df = spark.read.csv(
    "/FileStore/tables/12_movielens_ratings.csv",
    inferSchema=True,
    header=True,
)

In [0]:
df.show()

+-------+------+------+
|movieId|rating|userId|
+-------+------+------+
|      2|   3.0|     0|
|      3|   1.0|     0|
|      5|   2.0|     0|
|      9|   4.0|     0|
|     11|   1.0|     0|
|     12|   2.0|     0|
|     15|   1.0|     0|
|     17|   1.0|     0|
|     19|   1.0|     0|
|     21|   1.0|     0|
|     23|   1.0|     0|
|     26|   3.0|     0|
|     27|   1.0|     0|
|     28|   1.0|     0|
|     29|   1.0|     0|
|     30|   1.0|     0|
|     31|   1.0|     0|
|     34|   1.0|     0|
|     37|   1.0|     0|
|     41|   2.0|     0|
+-------+------+------+
only showing top 20 rows



###Description of Dataset

In [0]:
df.describe().show()

+-------+------------------+------------------+------------------+
|summary|           movieId|            rating|            userId|
+-------+------------------+------------------+------------------+
|  count|              1501|              1501|              1501|
|   mean| 49.40572951365756|1.7741505662891406|14.383744170552964|
| stddev|28.937034065088994| 1.187276166124803| 8.591040424293272|
|    min|                 0|               1.0|                 0|
|    max|                99|               5.0|                29|
+-------+------------------+------------------+------------------+



###Splitting the Dataset

In [0]:
training_set, test_set = df.randomSplit([0.8, 0.2])

###Let's Create a Model

In [0]:
from pyspark.ml.recommendation import ALS
recommender = ALS(userCol= 'userId' , itemCol='movieId' , ratingCol='rating' )
recommender = recommender.fit(training_set)

###Predicting Using The Test Set

In [0]:
prediction = recommender.transform(test_set)

In [0]:
prediction.show()

+-------+------+------+----------+
|movieId|rating|userId|prediction|
+-------+------+------+----------+
|      2|   4.0|    28| 1.9127501|
|      3|   1.0|    26| 1.4242408|
|      2|   1.0|    12| 1.2702665|
|      0|   1.0|    22| 0.8881091|
|      5|   2.0|    22| 1.7864375|
|      0|   1.0|     6|0.87236214|
|      2|   3.0|     6| 1.8105493|
|      5|   1.0|     6|    1.3277|
|      1|   1.0|    20| 1.0923432|
|      0|   1.0|     5| 1.3210535|
|      1|   1.0|    19| 1.6709225|
|      2|   1.0|    15| 0.7689161|
|      5|   2.0|    15|0.67889017|
|      3|   1.0|    17| 1.1852053|
|      5|   1.0|     8| 1.8474325|
|      4|   1.0|    23| 1.0941532|
|      2|   4.0|    10| 1.9065617|
|      2|   1.0|    25| 1.1367258|
|      2|   4.0|    21| 2.8393636|
|      3|   1.0|     0| 0.6998131|
+-------+------+------+----------+
only showing top 20 rows



###Evaluating the Model

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(labelCol='rating')
evaluator.evaluate(prediction)

Out[30]: 1.0883201354147838

###Making a Recommendation

In [0]:
test_set.show() 

+-------+------+------+
|movieId|rating|userId|
+-------+------+------+
|      0|   1.0|     5|
|      0|   1.0|     6|
|      0|   1.0|    22|
|      1|   1.0|    19|
|      1|   1.0|    20|
|      2|   1.0|    12|
|      2|   1.0|    15|
|      2|   1.0|    25|
|      2|   3.0|     6|
|      2|   4.0|    10|
|      2|   4.0|    21|
|      2|   4.0|    28|
|      3|   1.0|     0|
|      3|   1.0|    17|
|      3|   1.0|    26|
|      4|   1.0|    23|
|      4|   3.0|    18|
|      5|   1.0|     6|
|      5|   1.0|     8|
|      5|   2.0|    15|
+-------+------+------+
only showing top 20 rows



In [0]:
test_set.filter(test_set['userId']==23).show()

+-------+------+------+
|movieId|rating|userId|
+-------+------+------+
|      4|   1.0|    23|
|     18|   4.0|    23|
|     50|   4.0|    23|
|     67|   1.0|    23|
|     72|   1.0|    23|
|     73|   3.0|    23|
|     82|   2.0|    23|
+-------+------+------+



In [0]:
single_user = test_set.filter(test_set['userId']==23).select(['userId', 'movieId'])

In [0]:
single_user.show()


+------+-------+
|userId|movieId|
+------+-------+
|    23|      4|
|    23|     18|
|    23|     50|
|    23|     67|
|    23|     72|
|    23|     73|
|    23|     82|
+------+-------+



In [0]:
recommendations = recommender.transform(single_user)

In [0]:
recommendations.orderBy('prediction', ascending=False).show()

+------+-------+----------+
|userId|movieId|prediction|
+------+-------+----------+
|    23|     50| 3.5972977|
|    23|     18| 2.9967515|
|    23|     73| 2.0316827|
|    23|     82| 1.5852365|
|    23|     72| 1.2031747|
|    23|      4| 1.0941532|
|    23|     67| 0.6178534|
+------+-------+----------+

