# Big Data Project 
**Mohamed Mbarek** \
**Yafaa Ben Tili** \
3e Info C

ALS Recommender System PySpark Lab

**Introduction**

In this project, we implement a recommendation system using ALS (alternating least squares) in the Spark programming environment. \
Spark's machine learning library, `ml`, ships with an efficient implementation of the ALS algorithm covered in the previous lesson. The lab puts into practice the Spark programming skills needed to create and manipulate PySpark DataFrames. We go step by step through building a movie recommendation system with ALS and PySpark on the MovieLens dataset used in a previous lab.
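
To make the idea behind ALS concrete, here is a minimal pure-Python sketch. It is not Spark's implementation: it factorizes a tiny rating matrix with a single latent factor per user and per item, alternately fixing one side and solving a regularized least-squares problem for the other. The function names `als_rank1` and `mse`, the toy ratings, and the values `reg=0.1` and `n_iters=20` are illustrative assumptions.

```python
# Toy rank-1 ALS on a tiny explicit-rating matrix (illustrative only;
# Spark's ALS uses higher ranks and a distributed solver).

def als_rank1(ratings, n_users, n_items, reg=0.1, n_iters=20):
    """ratings: dict mapping (user, item) -> rating for observed entries."""
    u = [1.0] * n_users  # one latent factor per user
    v = [1.0] * n_items  # one latent factor per item
    for _ in range(n_iters):
        # Fix the item factors and solve a 1-D ridge regression per user.
        for i in range(n_users):
            num = sum(r * v[j] for (a, j), r in ratings.items() if a == i)
            den = reg + sum(v[j] ** 2 for (a, j) in ratings if a == i)
            u[i] = num / den
        # Fix the user factors and solve a 1-D ridge regression per item.
        for j in range(n_items):
            num = sum(r * u[i] for (i, b), r in ratings.items() if b == j)
            den = reg + sum(u[i] ** 2 for (i, b) in ratings if b == j)
            v[j] = num / den
    return u, v


def mse(ratings, u, v):
    """Mean squared error over the observed ratings only."""
    return sum((r - u[i] * v[j]) ** 2 for (i, j), r in ratings.items()) / len(ratings)


# Hypothetical ratings: 3 users x 3 items, 6 of the 9 entries observed.
toy = {(0, 0): 5.0, (0, 1): 4.0, (1, 0): 4.0, (1, 2): 2.0, (2, 1): 3.0, (2, 2): 1.0}
u, v = als_rank1(toy, n_users=3, n_items=3)
print("training MSE:", mse(toy, u, v))
print("prediction for unseen (user 1, item 1):", u[1] * v[1])
```

Each half-step has a closed-form solution because the problem is linear once one side is fixed. The same idea carries over to higher ranks, where every user or item update becomes a small linear system.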

**Part 1 : Install all the modules**

In [None]:
# install Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# extract the Spark archive into the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# point JAVA_HOME and SPARK_HOME at the installed Java and Spark folders
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"


# install findspark using pip
!pip install -q findspark


In [None]:
!pip install pyspark



In [None]:
import pyspark
sc = pyspark.SparkContext(appName="yourAppName")

In [None]:
print(sc)

<SparkContext master=local[*] appName=yourAppName>


In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Smoke-test the Spark session with a small DataFrame
df = spark.createDataFrame([{"hello": "world"} for x in range(1000)])
df.show(3, False)



+-----+
|hello|
+-----+
|world|
|world|
|world|
+-----+
only showing top 3 rows



In [None]:
df.show(10, False)

+-----+
|hello|
+-----+
|world|
|world|
|world|
|world|
|world|
|world|
|world|
|world|
|world|
|world|
+-----+
only showing top 10 rows



**Part 2 : Exploratory Data Analysis (EDA) using PySpark**

In [None]:
# Load the ratings; without inferSchema every column is read as a string
ratings = spark.read.option("header", "true").csv("ratings.csv")
ratings.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [None]:
ratings.printSchema()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [None]:
movies = spark.read.option("header", "true").csv("movies.csv")
movies.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [None]:
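# Import only the functions used below (note: min and max shadow the Python built-ins)
from pyspark.sql.functions import avg, col, count, desc, max, mean, min

# Number of ratings per movie, most-rated first
most_popular = (
    ratings
    .groupBy("movieId")
    .agg(count("userId").alias("num_ratings"))
    .sort(desc("num_ratings"))
)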

In [None]:
most_popular.show(15)

+-------+-----------+
|movieId|num_ratings|
+-------+-----------+
|    356|        329|
|    318|        317|
|    296|        307|
|    593|        279|
|   2571|        278|
|    260|        251|
|    480|        238|
|    110|        237|
|    589|        224|
|    527|        220|
|   2959|        218|
|      1|        215|
|   1196|        211|
|     50|        204|
|   2858|        204|
+-------+-----------+
only showing top 15 rows



In [None]:
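# Attach titles and genres. The join does not preserve the earlier sort order,
# and joining on an expression keeps both movieId columns (see the output).
most_popular_movies = most_popular.join(movies, most_popular.movieId == movies.movieId)
most_popular_movies.show(20, truncate=False)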

+-------+-----------+-------+-----------------------------------------+-------------------------------------------+
|movieId|num_ratings|movieId|title                                    |genres                                     |
+-------+-----------+-------+-----------------------------------------+-------------------------------------------+
|296    |307        |296    |Pulp Fiction (1994)                      |Comedy|Crime|Drama|Thriller                |
|1090   |63         |1090   |Platoon (1986)                           |Drama|War                                  |
|115713 |28         |115713 |Ex Machina (2015)                        |Drama|Sci-Fi|Thriller                      |
|3210   |42         |3210   |Fast Times at Ridgemont High (1982)      |Comedy|Drama|Romance                       |
|88140  |32         |88140  |Captain America: The First Avenger (2011)|Action|Adventure|Sci-Fi|Thriller|War       |
|829    |9          |829    |Joe's Apartment (1996)                   |C

In [None]:
# Average rating per movie, highest first
top_rated = (
    ratings
    .groupBy("movieId")
    .agg(avg(col("rating")).alias("avg_rating"))
    .sort(desc("avg_rating"))
)

In [None]:
top_rated_movies = top_rated.join(movies, top_rated.movieId == movies.movieId)
top_rated_movies.show(15)

+-------+------------------+-------+--------------------+--------------------+
|movieId|        avg_rating|movieId|               title|              genres|
+-------+------------------+-------+--------------------+--------------------+
|    296| 4.197068403908795|    296| Pulp Fiction (1994)|Comedy|Crime|Dram...|
|   1090| 3.984126984126984|   1090|      Platoon (1986)|           Drama|War|
| 115713|3.9107142857142856| 115713|   Ex Machina (2015)|Drama|Sci-Fi|Thri...|
|   3210|3.4761904761904763|   3210|Fast Times at Rid...|Comedy|Drama|Romance|
|  88140|          3.546875|  88140|Captain America: ...|Action|Adventure|...|
|    829|2.6666666666666665|    829|Joe's Apartment (...|Comedy|Fantasy|Mu...|
|   2088|               2.5|   2088|       Popeye (1980)|Adventure|Comedy|...|
|   2294|3.2444444444444445|   2294|         Antz (1998)|Adventure|Animati...|
|   4821|               3.1|   4821|     Joy Ride (2001)|  Adventure|Thriller|
|  48738|             3.975|  48738|Last King of Sco

In [None]:
# Recompute with both the rating count and the average rating per movie
top_rated = (
    ratings
    .groupBy("movieId")
    .agg(count("userId").alias("num_ratings"), avg(col("rating")).alias("avg_rating"))
)

In [None]:
# Sort by average rating, breaking ties by the number of ratings
top_rated_movies = top_rated.join(movies, top_rated.movieId == movies.movieId)\
.sort(desc("avg_rating"), desc("num_ratings"))
top_rated_movies.show(15)

+-------+-----------+----------+-------+--------------------+--------------------+
|movieId|num_ratings|avg_rating|movieId|               title|              genres|
+-------+-----------+----------+-------+--------------------+--------------------+
|   3473|          2|       5.0|   3473|Jonah Who Will Be...|              Comedy|
|   6818|          2|       5.0|   6818|Come and See (Idi...|           Drama|War|
|     99|          2|       5.0|     99|Heidi Fleiss: Hol...|         Documentary|
|  78836|          2|       5.0|  78836|Enter the Void (2...|               Drama|
|     53|          2|       5.0|     53|     Lamerica (1994)|     Adventure|Drama|
|   6442|          2|       5.0|   6442| Belle époque (1992)|      Comedy|Romance|
|   1151|          2|       5.0|   1151| Lesson Faust (1994)|Animation|Comedy|...|
|   6402|          1|       5.0|   6402|  Siam Sunset (1999)|              Comedy|
| 149508|          1|       5.0| 149508|   Spellbound (2011)|      Comedy|Romance|
|   
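
The ranking above is dominated by movies with only one or two ratings. One common remedy, shown here as a sketch and not as part of the original lab, is to shrink each average toward a global mean in proportion to how few ratings the movie has. The function name `weighted_rating`, the global mean of 3.5 and the threshold `min_votes=10` are assumptions chosen for illustration.

```python
def weighted_rating(avg_rating, num_ratings, global_mean, min_votes):
    """Shrink a movie's mean rating toward the global mean.

    Movies with few ratings stay close to global_mean; movies with many
    ratings keep a score close to their own average.
    """
    weight = num_ratings / (num_ratings + min_votes)
    return weight * avg_rating + (1 - weight) * global_mean


# Pulp Fiction's average (4.197) and count (307) come from the tables above;
# global_mean=3.5 and min_votes=10 are illustrative assumptions.
pulp_fiction = weighted_rating(4.197, 307, global_mean=3.5, min_votes=10)
# A movie rated 5.0 by only two users, like the top rows of the ranking.
two_votes = weighted_rating(5.0, 2, global_mean=3.5, min_votes=10)
print(pulp_fiction, two_votes)
```

With these assumed values the heavily rated movie ranks above the one with two perfect ratings. A simpler alternative is to filter out movies below a minimum `num_ratings` before sorting.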

In [None]:
# Calculate average, minimum, and maximum of num_ratings
top_rated_movies.select([mean('num_ratings'), min('num_ratings'), max('num_ratings')]).show(1)

+------------------+----------------+----------------+
|  avg(num_ratings)|min(num_ratings)|max(num_ratings)|
+------------------+----------------+----------------+
|10.369806663924312|               1|             329|
+------------------+----------------+----------------+



**Part 3 : Machine learning and Recommender System**

In [None]:
# Reload the ratings with inferred numeric types, which ALS requires
ratings = spark.read.option("inferSchema", True).option("header", True).csv("ratings.csv")


In [None]:
ratings.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [None]:
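from pyspark.ml.recommendation import ALS
# coldStartStrategy="drop" removes rows whose prediction is NaN
# (users or movies unseen during training) before evaluation
als = ALS(maxIter=10, regParam=0.5, userCol="userId", itemCol="movieId",
          ratingCol="rating", coldStartStrategy="drop")
# 80/20 train/test split (no seed is set, so the split differs between runs)
train, test = ratings.randomSplit([0.8, 0.2])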

In [None]:
# Train the model
alsModel = als.fit(train)
# Generate predictions on the held-out test set
prediction = alsModel.transform(test)
prediction.show(10)

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
|   218|    471|   4.0|1111624874| 2.7378786|
|   176|    471|   5.0| 840109075|  3.370955|
|   448|    471|   4.0|1178980875| 2.6990294|
|    32|    471|   3.0| 856737165| 3.1321526|
|   260|    471|   4.5|1109409455| 3.0542073|
|   609|    833|   3.0| 847221080| 1.6091218|
|   563|   1088|   4.0|1440793700| 2.9440413|
|   555|   1088|   4.0| 978822670| 3.1608918|
|    51|   1088|   4.0|1230929736| 3.3741312|
|   391|   1088|   1.0|1030824424|  3.071959|
+------+-------+------+----------+----------+
only showing top 10 rows



In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(metricName="mse", labelCol="rating", predictionCol="prediction")
mse = evaluator.evaluate(prediction)
print(mse)


1.0036721546582492
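
MSE is expressed in squared rating units, so its square root (RMSE) is often easier to read as a typical error in stars. The sketch below recomputes the MSE on the ten prediction rows displayed earlier and converts the reported MSE to an RMSE; the helper name `mean_squared_error` is an assumption. `RegressionEvaluator` also accepts `metricName="rmse"` directly.

```python
import math

# (rating, prediction) pairs copied from the ten prediction rows shown earlier.
rows = [
    (4.0, 2.7378786), (5.0, 3.370955), (4.0, 2.6990294), (3.0, 3.1321526),
    (4.5, 3.0542073), (3.0, 1.6091218), (4.0, 2.9440413), (4.0, 3.1608918),
    (4.0, 3.3741312), (1.0, 3.071959),
]


def mean_squared_error(pairs):
    """Average of the squared differences between actual and predicted ratings."""
    return sum((actual - predicted) ** 2 for actual, predicted in pairs) / len(pairs)


sample_mse = mean_squared_error(rows)          # MSE on these ten rows only
reported_rmse = math.sqrt(1.0036721546582492)  # RMSE from the MSE printed above
print(sample_mse, reported_rmse)
```

The ten-row figure differs from the full test-set MSE because it covers only a small sample of the predictions.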


In [None]:
# Top 3 recommended movies for every user
recommended_movie_df = alsModel.recommendForAllUsers(3)
recommended_movie_df.show(10, False)

+------+----------------------------------------------------------+
|userId|recommendations                                           |
+------+----------------------------------------------------------+
|471   |[[40491, 5.46509], [3235, 4.982009], [2314, 4.950876]]    |
|463   |[[40491, 5.757683], [3235, 5.2526083], [2314, 5.2190056]] |
|496   |[[40491, 5.158339], [3235, 4.7060146], [2314, 4.6764665]] |
|148   |[[40491, 5.35207], [3235, 4.8833094], [2314, 4.852128]]   |
|540   |[[40491, 6.1194143], [3235, 5.5812225], [2314, 5.5458636]]|
|392   |[[40491, 5.1196513], [3235, 4.661579], [2314, 4.6337943]] |
|243   |[[40491, 6.63878], [3235, 6.0600753], [2314, 6.020464]]   |
|31    |[[40491, 5.724956], [3235, 5.2239046], [2314, 5.190165]]  |
|516   |[[40491, 5.4686184], [3235, 4.985235], [2314, 4.9541235]] |
|580   |[[40491, 5.259993], [3235, 4.7992754], [2314, 4.7684455]] |
+------+----------------------------------------------------------+
only showing top 10 rows



In [None]:
# Top 5 recommended movies for every user
recommended_movie_df = alsModel.recommendForAllUsers(5)
recommended_movie_df.show(10, False)

+------+--------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                   |
+------+--------------------------------------------------------------------------------------------------+
|471   |[[40491, 5.46509], [3235, 4.982009], [2314, 4.950876], [156605, 4.918581], [25947, 4.8082123]]    |
|463   |[[40491, 5.757683], [3235, 5.2526083], [2314, 5.2190056], [156605, 5.1819143], [25947, 5.0690284]]|
|496   |[[40491, 5.158339], [3235, 4.7060146], [2314, 4.6764665], [156605, 4.6425056], [25947, 4.5415425]]|
|148   |[[40491, 5.35207], [3235, 4.8833094], [2314, 4.852128], [156605, 4.816863], [25947, 4.7128716]]   |
|540   |[[40491, 6.1194143], [3235, 5.5812225], [2314, 5.5458636], [156605, 5.507473], [25947, 5.386301]] |
|392   |[[40491, 5.1196513], [3235, 4.661579], [2314, 4.6337943], [156605, 4.6076865], [25947, 4.4993505]]|
|243   |[[40491, 6.63878], [