# Objective
----------------

* We will build a simple **Recommender System** using the `Collaborative Filtering` approach.
* Dataset used here: the MovieLens small dataset: http://files.grouplens.org/datasets/movielens/ml-latest-small.zip
* The data contains explicit ratings.

#### Import Required Packages

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, sum, split
from pyspark.sql.types import IntegerType
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

#### Read and Create needed dataframes

In [2]:
spark = SparkSession.builder.appName('Collaborative Filtering with spark-ml').getOrCreate()
spark

In [3]:
data_loc = '../Course Materials/spark-2-building-machine-learning-models/02/demos/datasets/movielens/'

In [4]:
ratings = spark.read.csv(data_loc+'ratings.csv', header=True, inferSchema=True)
ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [5]:
movies = spark.read.csv(data_loc+'movies.csv', header=True, inferSchema=True)
movies.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



#### Explore and prepare data

In [6]:
# Drop 'timestamp' column as this is not needed for our recommendation system

ratings = ratings.drop('timestamp')
ratings.show(5)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|     31|   2.5|
|     1|   1029|   3.0|
|     1|   1061|   3.0|
|     1|   1129|   2.0|
|     1|   1172|   4.0|
+------+-------+------+
only showing top 5 rows



In [7]:
# Descriptive statistics of 'rating' column

ratings.select('rating').describe().show()

+-------+------------------+
|summary|            rating|
+-------+------------------+
|  count|            100004|
|   mean| 3.543608255669773|
| stddev|1.0580641091070326|
|    min|               0.5|
|    max|               5.0|
+-------+------------------+



In [8]:
# check for missing values

ratings.select(sum(ratings.rating.isNull().cast("int")).alias('num_null')).show()

+--------+
|num_null|
+--------+
|       0|
+--------+



##### Note:
-----------
Here:

* movieId <=> product
* userId <=> customer

The recommender suggests movies to users based on the explicit ratings in the `rating` column.

In [9]:
# Compare the number of distinct movies in the ratings and movies dataframes

ratings_num_movies = ratings.select('movieId').distinct().count()

movies_num_movies = movies.select('movieId').distinct().count()

print(f'Number of Movies in Rating Dataset : {ratings_num_movies}')
print(f'Number of Movies in Movie Dataset : {movies_num_movies}')

Number of Movies in Rating Dataset : 9066
Number of Movies in Movie Dataset : 9125


### Collaborative Filtering Model using ALS Method

In [10]:
als = ALS(userCol='userId'
         ,itemCol='movieId'
         ,ratingCol='rating'
         ,coldStartStrategy='drop'
         )

**`Notes`**

* The default values maxIter = 10 and regParam = 0.1 are used here.
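
For intuition about what the model learns: ALS fits a short vector of latent factors for every user and every movie, and a predicted rating is the dot product of the two vectors. The sketch below uses made-up factor values (they are not taken from the fitted model, and real vectors have `rank` entries, 10 by default) purely to show the arithmetic.

```python
import math

# Hypothetical latent factors; the real ones are learned by ALS during fit().
user_factors = [1.2, 0.8, 0.5]
movie_factors = [2.0, 1.0, 1.0]

def predict_rating(user_vec, movie_vec):
    """Predicted rating = dot product of the user and movie factor vectors."""
    return math.fsum(u * m for u, m in zip(user_vec, movie_vec))

predicted = predict_rating(user_factors, movie_factors)
print(f'Predicted rating: {predicted:.2f}')
```

`math.fsum` is used instead of the built-in `sum` because this notebook imports `sum` from `pyspark.sql.functions`, which shadows the built-in.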

In [11]:
# Split data into training and test sets in an 80:20 ratio

(training, test) = ratings.randomSplit([0.8, 0.2])
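
Note that `randomSplit` is called without a seed here, so each run produces a different split and slightly different metrics; it also accepts a `seed` argument. The pure-Python sketch below is only an analogy, under the assumption that each row is assigned independently by a random draw. It shows why a fixed seed makes the split repeatable and why the resulting sizes are approximately, not exactly, 80:20. The helper `random_split` is hypothetical and not part of Spark.

```python
import random

def random_split(rows, train_fraction, seed):
    """Assign each row to train or test with an independent random draw."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

rows = list(range(1000))  # stand-in for rating rows
train_a, test_a = random_split(rows, 0.8, seed=42)
train_b, test_b = random_split(rows, 0.8, seed=42)

print(len(train_a), len(test_a))  # sizes are close to 800 / 200, not exact
print(train_a == train_b)         # same seed, same split
```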

In [12]:
model = als.fit(training)

In [13]:
predictions = model.transform(test)

In [14]:
predictions.show(5)

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   534|    463|   4.0|  3.800033|
|    85|    471|   3.0| 3.3809352|
|   350|    471|   3.0|   3.65091|
|   602|    471|   3.0|  4.243025|
|   306|    471|   3.0| 3.7654939|
+------+-------+------+----------+
only showing top 5 rows



In [15]:
# Compute the RMSE on the test predictions

regevaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')

rmse = regevaluator.evaluate(predictions)

print(f'RMSE Error: {rmse}')

RMSE Error: 0.9118128926056069


**`Note`**

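* An RMSE of about 0.91 means the predicted ratings typically deviate from the actual ratings by roughly 0.9 on the 0.5 to 5.0 rating scale. Because errors are squared before averaging, RMSE weights large misses more heavily than a plain average of the absolute errors would.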
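
To make the difference between RMSE and a plain average error concrete, the sketch below recomputes both metrics by hand on the five prediction rows printed earlier. It is a five-row illustration only, so the values differ from the RMSE reported for the full test set.

```python
import math

# (actual rating, predicted rating) pairs copied from the five rows shown earlier
pairs = [
    (4.0, 3.800033),
    (3.0, 3.3809352),
    (3.0, 3.65091),
    (3.0, 4.243025),
    (3.0, 3.7654939),
]

errors = [predicted - actual for actual, predicted in pairs]

# Mean absolute error: the plain average size of the miss
sample_mae = math.fsum(abs(e) for e in errors) / len(errors)

# Root mean squared error: squaring weights large misses more heavily
sample_rmse = math.sqrt(math.fsum(e * e for e in errors) / len(errors))

print(f'MAE : {sample_mae:.4f}')
print(f'RMSE: {sample_rmse:.4f}')
```

On any set of errors the RMSE is at least as large as the MAE; the gap grows when a few predictions miss badly, such as the 4.24 predicted for an actual rating of 3.0.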

#### Generate All Recommendations

In [16]:
# Top 5 movie recommendations for every user

all_movie_recommendations = model.recommendForAllUsers(5)
all_movie_recommendations.printSchema()

root
 |-- userId: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- movieId: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



#### Verify Recommendations for a Sample User

In [17]:
# Inspect the recommendations for a single user

all_movie_recommendations.filter(all_movie_recommendations['userId'] == 1).show(truncate=False)

+------+-------------------------------------------------------------------------------------------+
|userId|recommendations                                                                            |
+------+-------------------------------------------------------------------------------------------+
|1     |[[390, 4.0157294], [5114, 4.010362], [2563, 3.951115], [994, 3.7134087], [26840, 3.691651]]|
+------+-------------------------------------------------------------------------------------------+



In [18]:
# Get all movie IDs rated by userId = 1

ratings.filter(ratings['userId'] == 1).select('movieId').distinct().show(20)

+-------+
|movieId|
+-------+
|     31|
|   1339|
|   2294|
|   1061|
|   1172|
|   2455|
|   2193|
|   1029|
|   2105|
|   2150|
|   1263|
|   2968|
|   3671|
|   1293|
|   1287|
|   1343|
|   1405|
|   1953|
|   1129|
|   1371|
+-------+



* As observed here, none of the top 5 recommended movies is among those already rated by userId 1.
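
The comparison can also be made programmatically rather than by eye. The sketch below hard-codes the IDs from the two outputs above and intersects them as plain Python sets; on the full dataframes, the same check could be expressed as a join between the exploded recommendations and `ratings`.

```python
# Movie IDs recommended to user 1 (from the recommendations row above)
recommended_ids = {390, 5114, 2563, 994, 26840}

# Movie IDs user 1 has already rated (from the listing above)
rated_ids = {
    31, 1339, 2294, 1061, 1172, 2455, 2193, 1029, 2105, 2150,
    1263, 2968, 3671, 1293, 1287, 1343, 1405, 1953, 1129, 1371,
}

# An empty intersection means no recommended movie was already rated
already_rated = recommended_ids & rated_ids
print(f'Recommended movies already rated: {sorted(already_rated)}')
```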

In [19]:
user1_movies = ratings.filter(ratings.userId == 1).select('movieId').distinct()

user1_movies = user1_movies.join(movies, ['movieId'], 'inner').select(user1_movies.movieId, 'title', 'genres')

In [20]:
distinct_genres_user1 = user1_movies.select(split(user1_movies.genres, '\\|').alias('genre'))\
                                    .select(explode('genre')).distinct()

distinct_genres_user1.count()

15

In [21]:
distinct_genres_user1.show(15)

+---------+
|      col|
+---------+
|    Crime|
|  Romance|
| Thriller|
|Adventure|
|    Drama|
|      War|
|  Fantasy|
|  Musical|
|Animation|
|   Horror|
|  Western|
|   Comedy|
| Children|
|   Action|
|   Sci-Fi|
+---------+



In [22]:
# Check for recommended Genres

recommended_movie_genre_user1 = all_movie_recommendations.filter(all_movie_recommendations.userId == 1)\
                                .select(explode('recommendations').alias('element'))\
                                .select(col('element.movieId').alias('movieId'))

recommended_movie_genre_user1 = recommended_movie_genre_user1.join(movies, ['movieId'], 'inner')\
                                .select(split(movies.genres, '\\|').alias('genre'))\
                                .select(explode('genre').alias('genre')).distinct()

recommended_movie_genre_user1.show(10)

+------+
| genre|
+------+
| Crime|
| Drama|
|Comedy|
|Action|
+------+



**`Note`**

* Observe that the recommended genres (Crime, Drama, Comedy, Action) all appear among the genres the user has already rated.

#### Get `n` recommendations for any given user

In [23]:
def getRecommendationForUser(userId, numRec):
    '''
    Get `numRec` recommendations for the given `userId`
    :return: DataFrame with title, genres and predicted rating, ordered by rating (highest first)
    '''
    
    userDF = spark.createDataFrame([userId], IntegerType()).toDF('userId')
    recommendations = model.recommendForUserSubset(userDF, numItems=numRec)
    recommendations = recommendations\
                        .select(explode(recommendations.recommendations).alias('recommendations'))\
                        .select(col('recommendations.movieId').alias('movieId')
                               ,col('recommendations.rating').alias('rating')
                               )
    recommendedMovies = recommendations.join(movies, ['movieId'], 'inner')\
                                       .select('title', 'genres', 'rating')\
                                       .orderBy('rating', ascending=False)
    
    return recommendedMovies

In [24]:
getRecommendationForUser(1, 5).show(5, truncate=False)

+-----------------------------------+-------------------------+---------+
|title                              |genres                   |rating   |
+-----------------------------------+-------------------------+---------+
|Faster Pussycat! Kill! Kill! (1965)|Action|Crime|Drama       |4.0157294|
|Bad and the Beautiful, The (1952)  |Drama                    |4.010362 |
|Dangerous Beauty (1998)            |Drama                    |3.951115 |
|Big Night (1996)                   |Comedy|Drama             |3.7134087|
|Sonatine (Sonachine) (1993)        |Action|Comedy|Crime|Drama|3.691651 |
+-----------------------------------+-------------------------+---------+



In [25]:
getRecommendationForUser(219, 3).show(3, False)

+----------------------------------------------------+-------------------------------+---------+
|title                                               |genres                         |rating   |
+----------------------------------------------------+-------------------------------+---------+
|Love Me If You Dare (Jeux d'enfants) (2003)         |Drama|Romance                  |5.10848  |
|Vampire Hunter D: Bloodlust (Banpaia hantâ D) (2000)|Animation|Fantasy|Horror|Sci-Fi|5.0026155|
|Looney, Looney, Looney Bugs Bunny Movie, The (1981) |Animation|Children|Comedy      |4.971336 |
+----------------------------------------------------+-------------------------------+---------+

