Import libraries

In [1]:
from pyspark.sql import SparkSession

In [2]:
import os

import pandas as pd
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
#import findspark
#findspark.init()

Import SparkSession from pyspark.sql

Create an instance of SparkSession

In [3]:
# Reuse the active Spark session if one exists; otherwise start a new one
# under the application name "recommender".
spark = (
    SparkSession.builder
    .appName("recommender")
    .getOrCreate()
)

Print the tables in the catalog

In [4]:
# Show which tables/views are currently registered in the session catalog.
catalog_tables = spark.catalog.listTables()
print(catalog_tables)

[]


Load the file

In [5]:
# Location of the MovieLens ratings CSV. The containing directory can be
# overridden with the ML20M_DIR environment variable so the notebook is not
# tied to a single machine's folder layout; the default keeps the original
# location.
file_path = os.path.join(
    os.environ.get("ML20M_DIR", "C:\\DATA612\\DATA612ASSINGMENTS-master\\ml-20m"),
    "ratings.csv",
)

In [6]:
# Location of the MovieLens movies CSV. The containing directory can be
# overridden with the ML20M_DIR environment variable so the notebook is not
# tied to a single machine's folder layout; the default keeps the original
# location.
file_path_movies = os.path.join(
    os.environ.get("ML20M_DIR", "C:\\DATA612\\DATA612ASSINGMENTS-master\\ml-20m"),
    "movies.csv",
)

Read the rating data

In [7]:
# Treat the first line of the file as column names. No schema is supplied,
# so every column is loaded as a string.
ratings = spark.read.options(header=True).csv(file_path)

In [8]:
# Treat the first line of the file as column names. No schema is supplied,
# so every column is loaded as a string.
movies = spark.read.options(header=True).csv(file_path_movies)

Show the ratings data

In [9]:
# Preview the first 20 rows (the defaults of show(), spelled out).
ratings.show(n=20, truncate=True)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|      2|   3.5|1112486027|
|     1|     29|   3.5|1112484676|
|     1|     32|   3.5|1112484819|
|     1|     47|   3.5|1112484727|
|     1|     50|   3.5|1112484580|
|     1|    112|   3.5|1094785740|
|     1|    151|   4.0|1094785734|
|     1|    223|   4.0|1112485573|
|     1|    253|   4.0|1112484940|
|     1|    260|   4.0|1112484826|
|     1|    293|   4.0|1112484703|
|     1|    296|   4.0|1112484767|
|     1|    318|   4.0|1112484798|
|     1|    337|   3.5|1094785709|
|     1|    367|   3.5|1112485980|
|     1|    541|   4.0|1112484603|
|     1|    589|   3.5|1112485557|
|     1|    593|   3.5|1112484661|
|     1|    653|   3.0|1094785691|
|     1|    919|   3.5|1094785621|
+------+-------+------+----------+
only showing top 20 rows



Add ratings to catalog

In [10]:
# Register the DataFrame as a temporary view named "ratings". This runs
# before the casts further down, so the view is bound to the string-typed
# columns as read from the CSV.
ratings.createOrReplaceTempView(name="ratings")

Look at the type of each column

In [11]:
# Inspect the column types assigned when the CSV was read without a schema.
ratings.printSchema()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)



Cast userId and movieId to integers and rating to float

In [12]:
# Convert the string columns to numeric types, one column at a time:
# the two id columns become integers and the rating becomes a float.
for column_name, spark_type in (
    ("userId", "integer"),
    ("movieId", "integer"),
    ("rating", "float"),
):
    ratings = ratings.withColumn(column_name, ratings[column_name].cast(spark_type))

In [13]:
# Give movieId the same integer type it has in the ratings table.
movies = movies.withColumn("movieId", movies["movieId"].cast("integer"))

In [14]:
# Check the ratings column types after the casts.
ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: float (nullable = true)
 |-- timestamp: string (nullable = true)



In [15]:
# Check the movies column types after the movieId cast.
movies.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



Eliminate the timestamp column in ratings dataframe

In [16]:
# Keep only the three columns the model needs; timestamp is left out.
ratings = ratings.select("userId", "movieId", "rating")

Summary statistics for the ratings data

In [17]:
# Count, mean, standard deviation, min and max for each column.
ratings_summary = ratings.describe()
ratings_summary.show()

+-------+-----------------+------------------+------------------+
|summary|           userId|           movieId|            rating|
+-------+-----------------+------------------+------------------+
|  count|         20000263|          20000263|          20000263|
|   mean|69045.87258292554| 9041.567330339605|3.5255285642993797|
| stddev|40038.62665316201|19789.477445413086| 1.051988919294227|
|    min|                1|                 1|               0.5|
|    max|           138493|            131262|               5.0|
+-------+-----------------+------------------+------------------+



Splitting the data into train and test sets

In [18]:
# Hold out roughly 20% of the ratings for evaluation. The fixed seed makes
# the split repeatable for the same input data, so metrics computed on
# `test` can be compared between runs.
training, test = ratings.randomSplit([0.8, 0.2], seed=42)

Create ALS model

In [19]:
# Configure the ALS recommender for the explicit ratings. With
# coldStartStrategy="drop", rows that would get a NaN prediction (users or
# movies the model has not seen) are removed from the prediction output so
# they do not turn the evaluation metric into NaN.
# The estimator is only configured here; fitting is left to the
# "Fit ALS to the training data" cell so the model is trained once, not twice.
als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
)

RMSE evaluator

In [20]:
# Root-mean-square error between the observed `rating` column and the
# model's `prediction` column.
evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="rating",
    metricName="rmse",
)

Build the train/validation split

In [21]:
# TrainValidationSplit needs a grid of candidate parameter settings to
# compare; without estimatorParamMaps it cannot be fitted. The grid below
# includes the regParam already configured on `als` plus alternatives.
# NOTE(review): the candidate values are starting points, not tuned choices.
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [10, 20])
    .addGrid(als.regParam, [0.01, 0.1])
    .build()
)
tvs = TrainValidationSplit(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
)

Fit ALS to the training data

In [22]:
# Train the ALS estimator directly on the training split
# (the `tvs` object is not used for this fit).
model = als.fit(dataset=training)

Summary statistic of predictions


Generate predictions

In [23]:
# Score the held-out ratings; transform() appends a `prediction` column.
predictions = model.transform(test)
# Preview the first 20 scored rows (the defaults of show(), spelled out).
predictions.show(n=20, truncate=True)

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
| 53338|    148|   1.0|  2.415784|
| 19067|    148|   2.0| 1.5705694|
| 87301|    148|   2.0| 2.8955607|
| 88527|    148|   2.0| 1.6705151|
| 92852|    148|   3.0| 2.5834496|
| 81218|    148|   1.0| 3.7903538|
| 91782|    148|   3.0| 3.6888664|
| 60081|    148|   2.0| 2.6538968|
| 94994|    148|   4.0| 3.2066813|
| 46380|    148|   4.0| 2.8373895|
|109121|    148|   4.0|  3.891463|
| 28361|    148|   4.0|  4.963693|
| 35498|    148|   3.0| 2.9753017|
| 61815|    148|   3.0|  4.476904|
|  4914|    148|   2.0| 2.8983908|
| 10434|    148|   3.0| 2.9129286|
| 44926|    148|   2.0| 2.6564178|
|  3439|    148|   1.0|  4.118425|
| 18797|    148|   2.0| 2.3620577|
| 55876|    148|   4.0| 4.3065305|
+------+-------+------+----------+
only showing top 20 rows



Evaluate with RMSE

In [24]:
# RMSE of the predictions on the held-out split.
rmse = evaluator.evaluate(dataset=predictions)

Evaluation metric and model parameters

In [25]:
# `evaluator` (RMSE on the `rating` label, default `prediction` column) and
# `rmse` were both produced by the cells above. Rebuilding an identical
# evaluator and evaluating again would only repeat a full pass over the
# predictions, so this cell just displays the value.
rmse

0.8122801775607266

In [26]:
# Top 10 recommended movies for every user known to the model.
user_recs = model.recommendForAllUsers(numItems=10)

In [27]:
# Preview the first 20 users (the defaults of show(), spelled out).
user_recs.show(n=20, truncate=True)

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   148|[[84829, 12.42627...|
|   463|[[66579, 9.799184...|
|   471|[[98126, 9.141088...|
|   496|[[73529, 9.036393...|
|   833|[[66289, 14.06172...|
|  1088|[[116951, 12.0657...|
|  1238|[[116951, 7.81076...|
|  1342|[[59680, 11.36778...|
|  1580|[[88674, 17.24096...|
|  1591|[[34466, 13.62567...|
|  1645|[[84829, 9.214763...|
|  1829|[[123571, 8.62417...|
|  1959|[[56779, 9.660917...|
|  2122|[[116489, 11.5092...|
|  2142|[[115921, 10.8225...|
|  2366|[[116951, 14.4172...|
|  2659|[[128366, 10.2077...|
|  2866|[[70495, 8.592925...|
|  3175|[[108768, 11.5466...|
|  3749|[[70806, 13.35640...|
+------+--------------------+
only showing top 20 rows



In [28]:
def get_recs_for_user(recs, user_id=None):
    """Flatten one user's recommendation list into a two-column Spark DataFrame.

    Parameters
    ----------
    recs : pyspark.sql.DataFrame
        Must have a ``recommendations`` column whose elements carry
        ``movieId`` and ``rating`` fields, and a ``userId`` column when
        ``user_id`` is supplied.
    user_id : int, optional
        When given, only rows whose ``userId`` equals this value are
        considered. When omitted, the first row of ``recs`` is used.

    Returns
    -------
    pyspark.sql.DataFrame
        One row per recommended movie, with columns ``movieId`` and
        ``ratings``.

    Raises
    ------
    IndexError
        If ``recs`` contains no (matching) row.
    """
    if user_id is not None:
        recs = recs.filter(recs["userId"] == user_id)

    # first() brings a single row to the driver; converting the whole frame
    # to pandas just to read row 0 would collect every user's recommendations.
    first_row = recs.select(
        "recommendations.movieId", "recommendations.rating"
    ).first()
    if first_row is None:
        raise IndexError("no recommendation row found for the requested user")

    ratings_matrix = pd.DataFrame(
        {"movieId": first_row["movieId"], "ratings": first_row["rating"]}
    )
    return spark.createDataFrame(ratings_matrix)

Recommendation for user 148

In [29]:
# Select user 148 explicitly: get_recs_for_user reads the first row of the
# frame it receives, and the row order of user_recs is not guaranteed.
user148_recs = get_recs_for_user(user_recs.filter(user_recs["userId"] == 148))

In [30]:
# Only the id and the title are needed to label the recommendations.
movies = movies.select(["movieId", "title"])

In [31]:
# Bring both tables to pandas, attach titles to the recommended movie ids,
# and list the highest predicted ratings first.
ru = user148_recs.toPandas()
mr = movies.toPandas()
mr.merge(ru, on="movieId").sort_values("ratings", ascending=False)


Unnamed: 0,movieId,title,ratings
1,84829,Three Crowns of the Sailor (Les trois couronne...,12.426271
6,98126,Capital (Le capital) (2012),11.787801
9,109769,"Keeper of Lost Causes, The (Kvinden i buret) (...",11.616945
2,86451,Love Torn in Dream (Combat d'amour en songe) (...,11.569497
7,100902,911 in Plane Site (2004),10.726349
8,103271,Sign 'o' the Times (1987),10.650475
4,92154,Faust (2011),10.552417
0,31606,Siberia (1998),10.522499
5,95521,Reminiscences of a Journey to Lithuania (1972),10.482668
3,88674,Edison Kinetoscopic Record of a Sneeze (1894),10.246676


## Conclusion

It is better to consider AWS or Microsoft Azure to gain skills in using clusters with large data.