In [1]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

from pyspark.ml.evaluation import RegressionEvaluator # For evaluating the recommendation
from pyspark.ml.recommendation import ALS # Pyspark implementation of Collaborative filtering

import random
import pandas as pd

import warnings
warnings.filterwarnings('ignore') # To ignore warnings in Jupyter notebook

In [2]:
spark = SparkSession.builder.appName('recommender').getOrCreate()

24/04/17 11:38:08 WARN Utils: Your hostname, ubuntuonm1 resolves to a loopback address: 127.0.1.1; using 192.168.64.2 instead (on interface enp0s1)
24/04/17 11:38:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/17 11:38:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## The Data

We will use famous MovieLens dataset, which is one of the most common datasets used when implementing and testing recommender engines. It contains 100k movie ratings from 943 users and a selection of 1682 movies.

We can then read in the **u.data** file, which contains the full dataset. You can read a brief description of the dataset [here](http://files.grouplens.org/datasets/movielens/ml-100k-README.txt).

You can download the dataset [here](http://files.grouplens.org/datasets/movielens/ml-100k.zip) or just use the u.data file that is already included in this folder.


In [3]:
column_names = ['user_id', 'item_id', 'rating', 'timestamp']
df = pd.read_csv('u.data', sep='\t', names=column_names)
df.head()

Unnamed: 0,user_id,item_id,rating,timestamp
0,0,50,5,881250949
1,0,172,5,881250949
2,0,133,1,881250949
3,196,242,3,881250949
4,186,302,3,891717742


In [4]:
movie_titles = pd.read_csv("Movie_Id_Titles") # Reading movies titile data for merding them with the rating data
movie_titles.head()

Unnamed: 0,item_id,title
0,1,Toy Story (1995)
1,2,GoldenEye (1995)
2,3,Four Rooms (1995)
3,4,Get Shorty (1995)
4,5,Copycat (1995)


Merging movie titles from 'Movie_Id_Titles.csv' onto the main dataframe.

In [5]:
df = pd.merge(df,movie_titles,on='item_id')
df.head()

Unnamed: 0,user_id,item_id,rating,timestamp,title
0,0,50,5,881250949,Star Wars (1977)
1,290,50,5,880473582,Star Wars (1977)
2,79,50,4,891271545,Star Wars (1977)
3,2,50,5,888552084,Star Wars (1977)
4,8,50,5,879362124,Star Wars (1977)


We can check user ids and the number of users in the data for getting an understanding of the data.

In [6]:
print('Number of users:', df['user_id'].nunique())
df['user_id'].unique()  # Unique users in dataset

Number of users: 944


array([  0, 290,  79,   2,   8, 274, 227,  99, 305, 108,  63, 234,  97,
       117,  70, 318, 145, 124, 253, 271, 254, 326, 213,  80, 322,  28,
       267, 130, 262, 230, 316, 327, 299, 144, 162,  48, 141, 251, 297,
       137, 154, 257, 269, 123,  94, 330, 368,  77, 321, 381,  43, 286,
       247,  68, 150, 214, 210, 158, 279, 288, 280,  25, 102, 209, 103,
       398, 379, 409, 386, 407, 101, 201, 203, 283, 128, 200,  58,  54,
       311,  96,   4, 401,  71,  59, 176, 393,  14,  12, 432, 378, 245,
       115, 406, 444, 160,  23, 433, 125, 320, 438, 461, 250, 437, 119,
        27, 275, 389, 445, 270, 422, 465, 467, 157, 276,  65,  10, 435,
       458, 478,   6, 192, 387, 405, 287,  64, 369, 272, 394, 334, 346,
       352, 177, 153,  62, 148, 325,  15, 456, 371, 506, 360, 509, 533,
       454, 340, 239, 221, 215, 194,  45,   5, 479, 303, 494, 504, 347,
       161, 313, 113, 453, 540, 402,  66, 523, 536, 527, 277, 549, 551,
       530, 339,  49, 539, 581, 484, 294, 592, 463, 217,  87,  9

In [7]:
test_users_id = random.sample(sorted(df['user_id'].unique()), 10) # Selected random 10 users for testing
print('User ids selected for testing/evaluation of the model:',test_users_id)

User ids selected for testing/evaluation of the model: [359, 162, 62, 243, 767, 421, 841, 546, 358, 847]


### Creating a spark data frame and splitting data into tran and test data frames.

In [8]:
spark_df = spark.createDataFrame(df.iloc[:,:]) # Spark dataframe containing all user data
spark_df.show(20)

                                                                                

+-------+-------+------+---------+----------------+
|user_id|item_id|rating|timestamp|           title|
+-------+-------+------+---------+----------------+
|      0|     50|     5|881250949|Star Wars (1977)|
|    290|     50|     5|880473582|Star Wars (1977)|
|     79|     50|     4|891271545|Star Wars (1977)|
|      2|     50|     5|888552084|Star Wars (1977)|
|      8|     50|     5|879362124|Star Wars (1977)|
|    274|     50|     5|878944679|Star Wars (1977)|
|    227|     50|     4|879035347|Star Wars (1977)|
|     99|     50|     5|885679998|Star Wars (1977)|
|    305|     50|     5|886321799|Star Wars (1977)|
|    108|     50|     4|879879739|Star Wars (1977)|
|     63|     50|     4|875747292|Star Wars (1977)|
|    234|     50|     4|892079237|Star Wars (1977)|
|     97|     50|     5|884239471|Star Wars (1977)|
|    117|     50|     5|880126022|Star Wars (1977)|
|     70|     50|     4|884064188|Star Wars (1977)|
|    318|     50|     2|884495696|Star Wars (1977)|
|    145|   

In [9]:
train_spark_df = spark_df.filter(~spark_df.user_id.isin(test_users_id)) #Filtering the other users for creating training data
train_spark_df.show()

+-------+-------+------+---------+----------------+
|user_id|item_id|rating|timestamp|           title|
+-------+-------+------+---------+----------------+
|      0|     50|     5|881250949|Star Wars (1977)|
|    290|     50|     5|880473582|Star Wars (1977)|
|     79|     50|     4|891271545|Star Wars (1977)|
|      2|     50|     5|888552084|Star Wars (1977)|
|      8|     50|     5|879362124|Star Wars (1977)|
|    274|     50|     5|878944679|Star Wars (1977)|
|    227|     50|     4|879035347|Star Wars (1977)|
|     99|     50|     5|885679998|Star Wars (1977)|
|    305|     50|     5|886321799|Star Wars (1977)|
|    108|     50|     4|879879739|Star Wars (1977)|
|     63|     50|     4|875747292|Star Wars (1977)|
|    234|     50|     4|892079237|Star Wars (1977)|
|     97|     50|     5|884239471|Star Wars (1977)|
|    117|     50|     5|880126022|Star Wars (1977)|
|     70|     50|     4|884064188|Star Wars (1977)|
|    318|     50|     2|884495696|Star Wars (1977)|
|    145|   

In [10]:
test_spark_df = spark_df.filter(spark_df.user_id.isin(test_users_id)) #Filtering the test users for creating test data
test_spark_df.show(20)

+-------+-------+------+---------+--------------------+
|user_id|item_id|rating|timestamp|               title|
+-------+-------+------+---------+--------------------+
|    162|     50|     5|877635662|    Star Wars (1977)|
|     62|     50|     5|879372216|    Star Wars (1977)|
|    421|     50|     5|892241294|    Star Wars (1977)|
|    546|     50|     5|885140368|    Star Wars (1977)|
|    359|     50|     5|886453271|    Star Wars (1977)|
|    847|     50|     4|878774969|    Star Wars (1977)|
|    767|    172|     5|891462614|Empire Strikes Ba...|
|    421|    172|     5|892241707|Empire Strikes Ba...|
|     62|    172|     5|879373794|Empire Strikes Ba...|
|    847|    172|     4|878939803|Empire Strikes Ba...|
|    847|    133|     3|878941027|Gone with the Win...|
|    767|    242|     4|891462614|        Kolya (1996)|
|     62|    302|     3|879371909|L.A. Confidential...|
|    421|    302|     4|892241236|L.A. Confidential...|
|    841|    302|     5|889066959|L.A. Confident

In [11]:
test_spark_df.groupBy('user_id').count().show() # Count of movies each user rated

+-------+-----+
|user_id|count|
+-------+-----+
|    243|   81|
|    847|  146|
|    421|   62|
|    546|   59|
|    841|   31|
|     62|  232|
|    162|   42|
|    359|   27|
|    767|   37|
|    358|   43|
+-------+-----+



### Train model

We can first implement training and evaluation code to set and understand the training pipeline.

In [12]:
# Build the recommendation model using ALS on the training data
als = ALS(maxIter=5, regParam=0.01, userCol="user_id", itemCol="item_id", ratingCol="rating", coldStartStrategy='drop')
model = als.fit(train_spark_df)

24/04/17 11:38:18 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/04/17 11:38:18 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


### Get predictions on train data

In [13]:
predictions = model.transform(train_spark_df, )
predictions.show(20)

+-------+-------+------+---------+--------------------+----------+
|user_id|item_id|rating|timestamp|               title|prediction|
+-------+-------+------+---------+--------------------+----------+
|    148|     50|     5|877016805|    Star Wars (1977)|  5.386224|
|    463|     50|     4|890530818|    Star Wars (1977)|   3.61834|
|    496|     50|     5|876072633|    Star Wars (1977)|  4.020207|
|    471|     50|     3|889827757|    Star Wars (1977)| 3.1449816|
|    833|     50|     2|875035718|    Star Wars (1977)| 3.4285824|
|    148|    172|     5|877016513|Empire Strikes Ba...|  5.437351|
|    496|    172|     5|876065558|Empire Strikes Ba...| 3.7307258|
|    833|    172|     2|875224482|Empire Strikes Ba...| 3.2120566|
|    471|    172|     4|889827822|Empire Strikes Ba...| 3.2251077|
|    496|    133|     5|876066567|Gone with the Win...|   4.04644|
|    148|    133|     5|877019251|Gone with the Win...| 4.4705496|
|    463|    242|     2|889935629|        Kolya (1996)|  4.152

### Evaluation on the training data

In [14]:
def evaluate_recommendation(recommendation_df):
    """Function for evaluating the recommendation predictions form the model. 
        Evaluation metrics calculated-
            1.Root mean squared error of actual user ratings and predicted user ratings.
            
        Parameters:
            recommendation_df(spark dataframe): Dataframe consisting of user rating and predicted ratings for each item/movie.
        Return: None
    """
    evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",predictionCol="prediction")
    rmse = evaluator.evaluate(recommendation_df)
    print("Root-mean-square error = " + str(rmse))

evaluate_recommendation(predictions)

Root-mean-square error = 0.7154468805153779


**Note:** The above predictions is done for training data. The evaluation should never be done on training data and should be done on test data. For this recommendation system, the user needs to give ratings for some movies(items) for collaborative filtering to work. This issue is common in recommendation systems especially in collaborative filtering and is also known as '*cold start problem*'.

**Cold start issue:** Occurs when a new user or a new item enters the system, and there is not enough data or feedback to generate accurate and personalized recommendations. This constitutes a problem mainly for collaborative filtering algorithms due to the fact that they rely on the item's interactions to make recommendations.

To solve this issue we will take some samples for each user in the test data and add it to the training data and fit the model again for getting recommendations of other movies for that users.

In [15]:
def split_test_to_input_true(test_spark_df, user_id, split_ratio=(0.2,0.8)):
    '''Here we take the random samples for each user and split them in input samples and test samples, 
        -input samples is user for solving cold start problem for the model.
        -test samples is used to evaluate the model.
    '''
    user_ratings_input_samples, user_ratings_true_samples = test_spark_df.where(test_spark_df.user_id==user_id).randomSplit(split_ratio)
    return user_ratings_input_samples, user_ratings_true_samples

user_ratings_input_samples, user_ratings_true_samples = split_test_to_input_true(test_spark_df, test_users_id[0])

user_ratings_input_samples.show()

+-------+-------+------+---------+--------------------+
|user_id|item_id|rating|timestamp|               title|
+-------+-------+------+---------+--------------------+
|    359|      1|     4|886453214|    Toy Story (1995)|
|    359|    273|     4|886453325|         Heat (1995)|
|    359|    405|     3|886453354|Mission: Impossib...|
|    359|      7|     5|886453325|Twelve Monkeys (1...|
|    359|    298|     5|886453354|     Face/Off (1997)|
|    359|    748|     3|886453271|   Saint, The (1997)|
|    359|    121|     4|886453373|Independence Day ...|
|    359|    295|     3|886453325|    Breakdown (1997)|
+-------+-------+------+---------+--------------------+



In [16]:
user_ratings_true_samples.show()

+-------+-------+------+---------+--------------------+
|user_id|item_id|rating|timestamp|               title|
+-------+-------+------+---------+--------------------+
|    359|     50|     5|886453271|    Star Wars (1977)|
|    359|    117|     4|886453305|    Rock, The (1996)|
|    359|    118|     3|886453402|      Twister (1996)|
|    359|    181|     5|886453305|Return of the Jed...|
|    359|    246|     3|886453214|  Chasing Amy (1997)|
|    359|    268|     4|886453490|  Chasing Amy (1997)|
|    359|    323|     3|886453431| Dante's Peak (1997)|
|    359|    546|     3|886453373| Broken Arrow (1996)|
|    359|    250|     4|886453354|Fifth Element, Th...|
|    359|    286|     5|886453161|English Patient, ...|
|    359|    408|     5|886453239|Close Shave, A (1...|
|    359|    455|     4|886453305|Jackie Chan's Fir...|
|    359|    751|     4|886453467|Tomorrow Never Di...|
|    359|    930|     4|886453402|Chain Reaction (1...|
|    359|     24|     3|886453354|Rumble in the 

In [17]:
test_users_ratings_input_samples = None # Create a test data samples for solving cold start problem. This can also be done by asking new users to rate the movies/items which they have already seen/used.
test_users_ratings_true_samples = None # True ratings from users for evaluation of recommendation model

for user_id in test_users_id:
    user_ratings_input_samples, user_ratings_true_samples = split_test_to_input_true(test_spark_df, user_id)

    if test_users_ratings_input_samples==None: # Initialising the test data input samples
        test_users_ratings_input_samples, test_users_ratings_true_samples = user_ratings_input_samples, user_ratings_true_samples
    else:
        test_users_ratings_input_samples = test_users_ratings_input_samples.union(user_ratings_input_samples)
        test_users_ratings_true_samples = test_users_ratings_true_samples.union(user_ratings_true_samples)

print("Shape of test_spark_df",(test_spark_df.count(), len(test_spark_df.columns)))
print("Shape of test_users_ratings_input_samples",(test_users_ratings_input_samples.count(), len(test_users_ratings_input_samples.columns)))
print("Shape of test_users_ratings_true_samples",(test_users_ratings_true_samples.count(), len(test_users_ratings_true_samples.columns)))

Shape of test_spark_df (760, 5)


                                                                                

Shape of test_users_ratings_input_samples (151, 5)




Shape of test_users_ratings_true_samples (609, 5)


                                                                                

### Prediction of ratings for new users

In [18]:

def new_user_recs(new_user_rating_samples, user_ratings_true_samples, rating_df):
    '''Function to recommending new user with movies/items utilizing ALS alhorithm for implementing colaborative filtering

        Prameters:
            new_user_rating_samples (spark dataframe): Dataframe consisting of new user rating data to avoid cold start issue.
            user_ratings_true_samples (spark dataframe): Dataframe consisting of item ids for making predictions of ratings for model.
            rating_df (spark dataframe): Dataframe for consisting of all the users rating data for training the model.
        Returns:
            recommendations (spark dataframe): Dataframe consisting of all the predictions of ratings for the test data
    '''
    
    user_id = new_user_rating_samples.user_id
    
    # combine the new ratings df with the rating_df
    movie_ratings_combined = rating_df.union(new_user_rating_samples)
    
    # create an ALS model and fit it
    als = ALS(maxIter=10, rank=50, regParam=0.15, userCol='user_id', itemCol='item_id', ratingCol='rating', coldStartStrategy='drop')
    model = als.fit(movie_ratings_combined)
    
    # make recommendations for the evaluation test samples of the users
    recommendations = model.transform(user_ratings_true_samples)

    return recommendations

In [19]:
user_predicted_recoms = new_user_recs(test_users_ratings_input_samples, test_users_ratings_true_samples ,spark_df)
print("Shape of test_users_ratings_true_samples",(user_predicted_recoms.count(), len(user_predicted_recoms.columns)))
user_predicted_recoms.show()

                                                                                

Shape of test_users_ratings_true_samples (609, 6)


[Stage 301:>(32 + 4) / 40][Stage 325:> (0 + 0) / 10][Stage 326:> (0 + 0) / 10]0]

+-------+-------+------+---------+--------------------+----------+
|user_id|item_id|rating|timestamp|               title|prediction|
+-------+-------+------+---------+--------------------+----------+
|    359|      1|     4|886453214|    Toy Story (1995)| 4.1119237|
|    359|     50|     5|886453271|    Star Wars (1977)| 4.6748705|
|    359|    117|     4|886453305|    Rock, The (1996)|   3.94903|
|    359|    118|     3|886453402|      Twister (1996)| 3.4114106|
|    359|    181|     5|886453305|Return of the Jed...| 4.3436007|
|    359|    246|     3|886453214|  Chasing Amy (1997)| 4.0255575|
|    359|    323|     3|886453431| Dante's Peak (1997)| 3.1419852|
|    359|    405|     3|886453354|Mission: Impossib...|  3.534722|
|    359|    250|     4|886453354|Fifth Element, Th...| 3.7331085|
|    359|    286|     5|886453161|English Patient, ...| 3.7667239|
|    359|    298|     5|886453354|     Face/Off (1997)| 3.9854944|
|    359|    408|     5|886453239|Close Shave, A (1...| 4.7247

                                                                                

### Evaluation of the recommendations using test data

In [20]:
evaluate_recommendation(user_predicted_recoms)

[Stage 356:>(33 + 4) / 40][Stage 380:> (0 + 0) / 10][Stage 381:> (0 + 0) / 10]0]

Root-mean-square error = 0.8293336492994183


                                                                                

## Movie Recommendations of test users:

In [21]:
for user_id in test_users_id:
    single_user_predicted_recoms = user_predicted_recoms.where(user_predicted_recoms.user_id==user_id)
    singel_user_recomm_movies  = single_user_predicted_recoms.sort('prediction',ascending=False).select('title').rdd.flatMap(lambda x: x).take(5)
    print(f'Recommendation for User {user_id}')
    for title in singel_user_recomm_movies:
        print('\t', title)
    print()

                                                                                

Recommendation for User 359
	 Close Shave, A (1995)
	 Star Wars (1977)
	 Titanic (1997)
	 Return of the Jedi (1983)
	 Toy Story (1995)



                                                                                

Recommendation for User 162
	 Star Wars (1977)
	 Fugitive, The (1993)
	 Die xue shuang xiong (Killer, The) (1989)
	 Return of the Jedi (1983)
	 Apollo 13 (1995)



                                                                                

Recommendation for User 62
	 Wallace & Gromit: The Best of Aardman Animation (1996)
	 Casablanca (1942)
	 Usual Suspects, The (1995)
	 Star Wars (1977)
	 Shawshank Redemption, The (1994)



                                                                                

Recommendation for User 243
	 Schindler's List (1993)
	 Secrets & Lies (1996)
	 Lawrence of Arabia (1962)
	 Persuasion (1995)
	 Godfather, The (1972)



                                                                                

Recommendation for User 767
	 Casablanca (1942)
	 Manchurian Candidate, The (1962)
	 Fargo (1996)
	 Godfather: Part II, The (1974)
	 Cinema Paradiso (1988)



                                                                                

Recommendation for User 421
	 Usual Suspects, The (1995)
	 Raiders of the Lost Ark (1981)
	 Godfather, The (1972)
	 Silence of the Lambs, The (1991)
	 Empire Strikes Back, The (1980)



                                                                                

Recommendation for User 841
	 Titanic (1997)
	 Good Will Hunting (1997)
	 As Good As It Gets (1997)
	 Ayn Rand: A Sense of Life (1997)
	 Apt Pupil (1998)



                                                                                

Recommendation for User 546
	 Return of the Jedi (1983)
	 Wag the Dog (1997)
	 Star Trek: First Contact (1996)
	 Twelve Monkeys (1995)
	 Titanic (1997)



                                                                                

Recommendation for User 358
	 Kaspar Hauser (1993)
	 Stalker (1979)
	 Three Colors: Red (1994)
	 8 1/2 (1963)
	 Double vie de Véronique, La (Double Life of Veronique, The) (1991)



                                                                                

Recommendation for User 847
	 Sum of Us, The (1994)
	 Star Wars (1977)
	 Picture Bride (1995)
	 Empire Strikes Back, The (1980)
	 Contact (1997)



# Object Oriented Implementation:
The Object Oriented Programming implementation of the above code is as follows:

There are 2 Classes that are implementer:

* **ColaborativeFilteringALS** class - Implementation of Colaborative filtering recommendation system using ALS algorithm in pyspark.
* **User** class - Dummy class for replicating users in any system utilising recommendation feature.

In [39]:
class user():
    """User class to reresent users in a sustem that utilises recommendation system"""
    def __init__(self, user_id, user_input_samples=None):
        self.user_id = user_id
        self.user_input_samples = user_input_samples
        self.predicted_ratings = None

    def set_predicted_ratings(self,predicted_ratings_df):
        """Method to set the predcited rating given by the recommender class
            Parameters:
                predicted_ratings_df (spark dataframe): Dataframe consisting of user sample item/movie rating data for solving cold start issue.
        """
        self.predicted_ratings = predicted_ratings_df

    def get_recommendations(self, n=5, title_col='title'):
        """Method for getting the n number of top recommended items given by the recommendation system.
            This methods needs to be called after predictions of ratings is done my the recommender system.
            Returns:
                recommendations (list(str)): list of top n number of recommended items with first item with highest ratings.
        """
        reccommendations = None
        if self.predicted_ratings:
            reccommendations = self.predicted_ratings.select(title_col).rdd.flatMap(lambda x:x).take(n)
        return reccommendations

class ColaborativeFilteringALS():
    def __init__(self, spark_session, pandas_dataframe,  user_col='user_id', item_col='item_id', rating_col='rating', title_col='title'):
        """
        Class implementation of colaborative filtering using ALS algorithm with pyspark.

        Parameters:
            spark_session (spark session): created spark session to use.
            pandas_dataframe (pandas dataframe): Pandas dataframe containing the user ids, item ids and their ratings.
            userCol (str): default='user_id', Specify column in dataframe which contains user ids,
            itemCol (str): default='item_id', Specify column in dataframe which contains item ids,
            ratingCol (str): default='rating', Specify column in dataframe which contains user ratings,
        """
        self.spark_session = spark_session
        self.pandas_dataframe = pandas_dataframe
        self._userCol = user_col
        self._itemCol = item_col
        self._ratingCol = rating_col
        self._titleCol = title_col
        self.spark_dataframe = self.spark_session.createDataFrame(self.pandas_dataframe.iloc[:,:])

    def show_dataframe(self, spark_df=None):
        """Shows the spark dataframe of the user rating data provided.
            Parameters:
                spark_df (spark dataframe): Dataframe that needs to be displayed.
        """
        if spark_df==None:
            spark_df = self.spark_dataframe
        self._print_df_diamension(spark_df)
        spark_df.show()

    def _print_df_diamension(self, spark_df):
        """Prints the diamentions of the spark dataframe."""
        print(f"Shape: ",(spark_df.count(), len(spark_df.columns)))

    def sample_test_users(self, n_users=5):
        """Create a list of n number or sample test users for testing/evaluation
        Parameters:
            n_users (int): Default=5, Number of users for tesing/evaluating the recommender model

        Returns:
            test_users_id (list(str)): List of test user ids that are randomly selected.
        """
        test_users_id = random.sample(sorted(self.pandas_dataframe['user_id'].unique()), n_users) # Selected random 10 users for testing
        return test_users_id

    def _split_test_to_input_true(self, test_spark_df, user_id, split_ratio=(0.2,0.8)):
        '''Here we take the random samples for each user and split them in input samples and test samples, 
            -input samples is user for solving cold start problem for the model.
            -test samples is used to evaluate the model.
        '''
        user_ratings_input_samples, user_ratings_true_samples = test_spark_df.where(test_spark_df.user_id==user_id).randomSplit(split_ratio)
        # user_ratings_input_samples = test_spark_df.where(test_spark_df.user_id==user_id).limit(5)
        # user_ratings_true_samples = test_spark_df.where(~test_spark_df.item_id.isin(user_ratings_input_samples.select('item_id').rdd.flatMap(lambda x: x).collect()))
        return user_ratings_input_samples, user_ratings_true_samples
    
    def train_test_split(self, test_users_id):
        """Method to create a train test spplit by users
        Parameters:
            test_users_id (list(str)): List of test user ids that are selected for testing/evaluation.

        Returns:
            modified_train_spark_df (spark dataframe): All data with test user input sample data for solving cold start problem
            test_spark_df (spark dataframe): Test user data selected from all data
            
        """
        train_spark_df = self.spark_dataframe.filter(~self.spark_dataframe.user_id.isin(test_users_id)) #Filtering the other users for creating training data
        test_spark_df = self.spark_dataframe.filter(self.spark_dataframe.user_id.isin(test_users_id)) #Filtering the test users for creating test data

        
        print(f"Full Dataset {(self.spark_dataframe.count(), len(self.spark_dataframe.columns))}")
        print("Shape of initial train test split by test user ids:", test_users_id)
        print(f"\t Train Dataset {(train_spark_df.count(), len(train_spark_df.columns))}")
        print(f"\t Test Dataset {(test_spark_df.count(), len(test_spark_df.columns))}")
        print('\n Modifying train and test dataset to solve cold start problem... \n')
        
        modified_train_spark_df = train_spark_df # Training spark dataframe with input samples for solving cold start issue
        modified_test_spark_df = None # Test spark_dataframe without input sample taken for training
        for user_id in test_users_id:
            # Create a test data samples for solving cold start problem. This can also be done by asking new users to rate the movies/items which they have already seen/used.
            # True ratings from users for evaluation of recommendation model
            user_ratings_input_samples, user_ratings_true_samples = self._split_test_to_input_true(test_spark_df, user_id)

            if modified_test_spark_df==None: # Initialising the test data input samples
                modified_test_spark_df = user_ratings_true_samples
                
            modified_train_spark_df = modified_train_spark_df.union(user_ratings_input_samples)
            modified_test_spark_df = modified_test_spark_df.union(user_ratings_true_samples)

        print("Shape of modified train and test datasets:")
        print(f"\t Modified Train Dataset {(modified_train_spark_df.count(), len(modified_train_spark_df.columns))}")
        print(f"\t Modified Test Dataset {(modified_test_spark_df.count(), len(modified_test_spark_df.columns))}")
        
        return modified_train_spark_df, modified_test_spark_df

    def model_fit(self, train_spark_df):
        # create an ALS model and fit it
        als = ALS(maxIter=10, rank=50, regParam=0.15, userCol=self._userCol, itemCol=self._itemCol, ratingCol=self._ratingCol, coldStartStrategy='drop')
        model = als.fit(train_spark_df)
        return model

    def model_transform(self, model, test_spark_df):
        # make recommendations for the evaluation test samples of the users
        recommendations = model.transform(test_spark_df)
        return recommendations

    def model_fit_transform(self, modified_train_spark_df, modified_test_spark_df):
        '''Function to recommending new user with movies/items utilizing ALS alhorithm for implementing colaborative filtering
    
            Prameters:
                modified_train_spark_df (spark dataframe): Dataframe consisting of all the user rating data and new user rating data to avoid cold start issue.
                modified_test_spark_df (spark dataframe): Dataframe consisting of testing data with item ids for making predictions of ratings for model.
            Returns:
                recommendations (spark dataframe): Dataframe consisting of all the predictions of ratings for the test data
        '''
        
        # train model
        model = self.model_fit(modified_train_spark_df)
        
        # make predictions
        recommendations = self.model_transform(model, modified_test_spark_df)
    
        return recommendations

    def create_all_item_df(self):
        """Creates a spark dataframe containing all the item ids."""
        return spark_df.dropDuplicates([self._itemCol]).select([self._itemCol, self._titleCol])
        
    def new_user_recommendations(self, new_user):
        """Predicts ratings for item for the new user with input samples. Returns the recommendations in ascending order-Top rated items are at the top.

        Parameters:
            new_user (obj): New user object containing user_id and sample of rating from user for items for solving cold start issue

        Returns:
            recommendation (spark dataframe): Dataframe containing predicted ratings for each items sorted in order of highest ratings first.
        """
        
        # user_id = user_data_samples.select(self._userCol).rdd.flatMap(lambda x:x).take(1)[0] #Get user id
        
        # combine the new ratings df with the rating_df
        # user_input_samples = new_user.user_input_samples.withColumn(self._userCol, lit(new_user.user_id))
        movie_ratings_combined = self.spark_dataframe.union(new_user.user_input_samples)
        # train model
        model = self.model_fit(movie_ratings_combined)

        # Get all the item ids for making predictions
        all_item_ids = self.create_all_item_df()
        all_item_ids = all_item_ids.withColumn(self._userCol, lit(new_user.user_id))

        # make predictions
        recommendations = self.model_transform(model, all_item_ids)
        recommendations = recommendations.sort('prediction', ascending=False)
        
        return recommendations
        
    def evaluate_recommendation(self, recommendation_df):
        """Function for evaluating the recommendation predictions form the model. 
            Evaluation metrics calculated-
                1.Root mean squared error of actual user ratings and predicted user ratings.
                
            Parameters:
                recommendation_df(spark dataframe): Dataframe consisting of user rating and predicted ratings for each item/movie.
            Return: None
        """
        evaluator = RegressionEvaluator(metricName="rmse", labelCol=self._ratingCol,predictionCol="prediction")
        rmse = evaluator.evaluate(recommendation_df)
        print("Root-mean-square error = " + str(rmse))
    
    @property
    def number_of_users(self): 
        print('Number of users:', self.pandas_dataframe[self._userCol].nunique())

    def users_rating_counts(self, spark_df=None, n=10):
        if spark_df==None:
            spark_df = self.spark_dataframe
        spark_df.groupBy(self._userCol).count().show(n) # Count of movies each user rated
            
    @property
    def unique_users(self):
        print('Unique users in dataframe:')
        print(self.pandas_dataframe[self._userCol].unique()) # Unique users in dataset

### Initialising the classes and understanding the implemented methods

In [40]:
cf = ColaborativeFilteringALS(spark, df) # Initialise an object of ColaborativeFilteringALS class by passing in spark session object and pandas dataframe consisting of the rating data by all users

In [41]:
cf.show_dataframe() # Viewing all the users rating data for understanding

Shape:  (100003, 5)
+-------+-------+------+---------+----------------+
|user_id|item_id|rating|timestamp|           title|
+-------+-------+------+---------+----------------+
|      0|     50|     5|881250949|Star Wars (1977)|
|    290|     50|     5|880473582|Star Wars (1977)|
|     79|     50|     4|891271545|Star Wars (1977)|
|      2|     50|     5|888552084|Star Wars (1977)|
|      8|     50|     5|879362124|Star Wars (1977)|
|    274|     50|     5|878944679|Star Wars (1977)|
|    227|     50|     4|879035347|Star Wars (1977)|
|     99|     50|     5|885679998|Star Wars (1977)|
|    305|     50|     5|886321799|Star Wars (1977)|
|    108|     50|     4|879879739|Star Wars (1977)|
|     63|     50|     4|875747292|Star Wars (1977)|
|    234|     50|     4|892079237|Star Wars (1977)|
|     97|     50|     5|884239471|Star Wars (1977)|
|    117|     50|     5|880126022|Star Wars (1977)|
|     70|     50|     4|884064188|Star Wars (1977)|
|    318|     50|     2|884495696|Star Wars 

In [42]:
cf.number_of_users # Checking the number of unique users in the data

Number of users: 944


In [43]:
cf.unique_users # Checking all the user ids in the data

Unique users in dataframe:
[  0 290  79   2   8 274 227  99 305 108  63 234  97 117  70 318 145 124
 253 271 254 326 213  80 322  28 267 130 262 230 316 327 299 144 162  48
 141 251 297 137 154 257 269 123  94 330 368  77 321 381  43 286 247  68
 150 214 210 158 279 288 280  25 102 209 103 398 379 409 386 407 101 201
 203 283 128 200  58  54 311  96   4 401  71  59 176 393  14  12 432 378
 245 115 406 444 160  23 433 125 320 438 461 250 437 119  27 275 389 445
 270 422 465 467 157 276  65  10 435 458 478   6 192 387 405 287  64 369
 272 394 334 346 352 177 153  62 148 325  15 456 371 506 360 509 533 454
 340 239 221 215 194  45   5 479 303 494 504 347 161 313 113 453 540 402
  66 523 536 527 277 549 551 530 339  49 539 581 484 294 592 463 217  87
  91 417 569 498 293  46 495 385  69 248 399 528 382 151 416  37 345 470
 411 244 350 623 516 430 493 624 517 370 268 324 638 563 178 618  26 642
 653 354  60 644 496 184 650  30 457 343 552 600 263 662 471 602 363 541
 481 421 597 503 550 521

### Testing and Evaluation

Testing the recommender system by sampling 'n' number of users randomply and evaluating it with the evaluation meathod

In [44]:
test_users_id = cf.sample_test_users(10) # Making and list of 10 user ids selected randomly for testing
print('User ids selected for testing/evaluation of the model:',test_users_id)

User ids selected for testing/evaluation of the model: [888, 846, 543, 392, 212, 852, 668, 402, 674, 192]


Splitting the data into train and test data for cross validation.

*Note*: Here the train dataset contains some sample input data from each of the test users for solving cold start issue.

In [45]:
train_df, test_df = cf.train_test_split(test_users_id)

Full Dataset (100003, 5)
Shape of initial train test split by test user ids: [888, 846, 543, 392, 212, 852, 668, 402, 674, 192]
	 Train Dataset (99006, 5)
	 Test Dataset (997, 5)

 Modifying train and test dataset to solve cold start problem... 

Shape of modified train and test datasets:


                                                                                

	 Modified Train Dataset (99202, 5)




	 Modified Test Dataset (817, 5)


                                                                                

In [46]:
cf.show_dataframe(train_df) # Checking the train dataframe



Shape:  (99202, 5)
+-------+-------+------+---------+----------------+
|user_id|item_id|rating|timestamp|           title|
+-------+-------+------+---------+----------------+
|      0|     50|     5|881250949|Star Wars (1977)|
|    290|     50|     5|880473582|Star Wars (1977)|
|     79|     50|     4|891271545|Star Wars (1977)|
|      2|     50|     5|888552084|Star Wars (1977)|
|      8|     50|     5|879362124|Star Wars (1977)|
|    274|     50|     5|878944679|Star Wars (1977)|
|    227|     50|     4|879035347|Star Wars (1977)|
|     99|     50|     5|885679998|Star Wars (1977)|
|    305|     50|     5|886321799|Star Wars (1977)|
|    108|     50|     4|879879739|Star Wars (1977)|
|     63|     50|     4|875747292|Star Wars (1977)|
|    234|     50|     4|892079237|Star Wars (1977)|
|     97|     50|     5|884239471|Star Wars (1977)|
|    117|     50|     5|880126022|Star Wars (1977)|
|     70|     50|     4|884064188|Star Wars (1977)|
|    318|     50|     2|884495696|Star Wars (

                                                                                

In [47]:
cf.show_dataframe(test_df) # Checking the test dataframe

                                                                                

Shape:  (817, 5)
+-------+-------+------+---------+--------------------+
|user_id|item_id|rating|timestamp|               title|
+-------+-------+------+---------+--------------------+
|    888|    100|     4|879365004|        Fargo (1996)|
|    888|    111|     4|879365072|Truth About Cats ...|
|    888|    237|     5|879365449|Jerry Maguire (1996)|
|    888|    274|     4|879365497|      Sabrina (1995)|
|    888|     69|     4|879365104| Forrest Gump (1994)|
|    888|    137|     4|879365104|    Big Night (1996)|
|    888|    153|     4|879365154|Fish Called Wanda...|
|    888|    191|     5|879365004|      Amadeus (1984)|
|    888|    202|     4|879365072|Groundhog Day (1993)|
|    888|    280|     3|879365475|Up Close and Pers...|
|    888|    286|     5|879364981|English Patient, ...|
|    888|    631|     4|879365224|Crying Game, The ...|
|    888|    269|     5|879364981|Full Monty, The (...|
|    888|    792|     5|879365054|Bullets Over Broa...|
|    888|    869|     4|8793650

In [48]:
cf.users_rating_counts(spark_df=test_df) # Checking the count of each items user rated in the test data



+-------+-----+
|user_id|count|
+-------+-----+
|    888|   32|
|    846|  331|
|    543|  155|
|    392|   88|
|    212|   21|
|    852|   34|
|    668|   42|
|    402|   56|
|    674|   31|
|    192|   27|
+-------+-----+



                                                                                

In [49]:
predicted_ratings_df = cf.model_fit_transform(train_df, test_df) # Fitting the model and getting the predicted ratings 
predicted_ratings_df.show()

[Stage 1516:(42 + 2) / 44][Stage 1540:>(2 + 2) / 10][Stage 1541:>(0 + 0) / 10]  

+-------+-------+------+---------+--------------------+----------+
|user_id|item_id|rating|timestamp|               title|prediction|
+-------+-------+------+---------+--------------------+----------+
|    392|    463|     3|891038946|Secret of Roan In...| 3.7258215|
|    402|    471|     4|876267041|Courage Under Fir...|  3.372477|
|    543|    471|     3|875657863|Courage Under Fir...| 3.4017196|
|    543|    463|     3|874864034|Secret of Roan In...| 3.6758442|
|    846|    463|     5|883948222|Secret of Roan In...| 3.7333486|
|    846|    623|     1|883950889|Angels in the Out...|  2.611689|
|    846|    540|     2|883950711|  Money Train (1995)| 2.4158823|
|    392|   1143|     4|891038158|   Hard Eight (1996)| 4.1406717|
|    543|    516|     4|876896210|   Local Hero (1983)|  3.861484|
|    846|    516|     4|883948457|   Local Hero (1983)|   3.87139|
|    846|     31|     4|883948571| Crimson Tide (1995)| 3.6149774|
|    192|   1265|     3|881366585|    Star Maps (1997)| 2.2213

                                                                                

In [50]:
cf.evaluate_recommendation(predicted_ratings_df) # Evaluating the ratings

[Stage 1597:(42 + 2) / 44][Stage 1621:>(0 + 2) / 10][Stage 1622:>(0 + 0) / 10]

Root-mean-square error = 0.8617882602915308


                                                                                

### Recommendions for new user

Here the new user is also sampled from the test data. For the new user we only require some sample rating for items. This can also be done by asking a new user to rate some items/movies that they have already used/seen. 

In [51]:
new_user_id = test_df.select('user_id').rdd.flatMap(lambda x:x).take(1)[0] # Sampling a new user id from test data
new_user_id

888

In [52]:
new_user_samples = test_df.where(test_df.user_id==new_user_id) # Get all the data for that particular user from test data
new_user_samples.show()

+-------+-------+------+---------+--------------------+
|user_id|item_id|rating|timestamp|               title|
+-------+-------+------+---------+--------------------+
|    888|    100|     4|879365004|        Fargo (1996)|
|    888|    111|     4|879365072|Truth About Cats ...|
|    888|    237|     5|879365449|Jerry Maguire (1996)|
|    888|    274|     4|879365497|      Sabrina (1995)|
|    888|     69|     4|879365104| Forrest Gump (1994)|
|    888|    137|     4|879365104|    Big Night (1996)|
|    888|    153|     4|879365154|Fish Called Wanda...|
|    888|    191|     5|879365004|      Amadeus (1984)|
|    888|    202|     4|879365072|Groundhog Day (1993)|
|    888|    280|     3|879365475|Up Close and Pers...|
|    888|    286|     5|879364981|English Patient, ...|
|    888|    631|     4|879365224|Crying Game, The ...|
|    888|    269|     5|879364981|Full Monty, The (...|
|    888|    792|     5|879365054|Bullets Over Broa...|
|    888|    869|     4|879365086|Fools Rush In 

In [53]:
new_user = user(new_user_id, new_user_samples) # Initialising a new user object using user class
new_user.user_id

888

*Note*: For new user rating predictions, the recommendation system is fitted with the entire data and the sample input ratings collected is passed to the colaborative filtering algorithm to find the ratings for the other items in the data.

In [54]:
predicted_ratings = cf.new_user_recommendations(new_user) # Predicting ratings for new user
new_user.set_predicted_ratings(predicted_ratings) # Setting the predictions in user object
new_user.predicted_ratings.show() # Checking the predictions

                                                                                

+-------+--------------------+-------+----------+
|item_id|               title|user_id|prediction|
+-------+--------------------+-------+----------+
|   1449|Pather Panchali (...|    888|  5.181967|
|   1467|Saint of Fort Was...|    888| 5.1142607|
|   1642|Some Mother's Son...|    888| 5.0453606|
|   1398|         Anna (1996)|    888| 5.0294557|
|    318|Schindler's List ...|    888|  5.009569|
|     64|Shawshank Redempt...|    888| 5.0080843|
|   1122|They Made Me a Cr...|    888|  4.966351|
|    272|Good Will Hunting...|    888| 4.9331784|
|    408|Close Shave, A (1...|    888| 4.8848257|
|    483|   Casablanca (1942)|    888| 4.8650794|
|    113|Horseman on the R...|    888|  4.839217|
|   1064|    Crossfire (1947)|    888| 4.8284464|
|    313|      Titanic (1997)|    888| 4.8197722|
|     12|Usual Suspects, T...|    888| 4.8181663|
|    169|Wrong Trousers, T...|    888|  4.804872|
|    603|  Rear Window (1954)|    888| 4.7949753|
|    357|One Flew Over the...|    888|  4.794626|


In [55]:
new_user.get_recommendations(n=8) # Fetching the top 'n' recommended items/movies for that user

['Pather Panchali (1955)',
 'Saint of Fort Washington, The (1993)',
 "Some Mother's Son (1996)",
 'Anna (1996)',
 "Schindler's List (1993)",
 'Shawshank Redemption, The (1994)',
 'They Made Me a Criminal (1939)',
 'Good Will Hunting (1997)']