# Running ALS on MovieLens (PySpark)

Matrix factorization by [ALS](https://spark.apache.org/docs/latest/api/python/_modules/pyspark/ml/recommendation.html#ALS) (Alternating Least Squares) is a well known collaborative filtering algorithm.

This notebook provides an example of how to utilize and evaluate ALS PySpark ML (DataFrame-based API) implementation, meant for large-scale distributed datasets.

The original dataset is 100M user ratings. This notebook works with a subset of the 100K user ratings.

https://towardsdatascience.com/build-recommendation-system-with-pyspark-using-alternating-least-squares-als-matrix-factorisation-ebe1ad2e7679

In [1]:
# build a spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Recommendations').getOrCreate()

In [2]:
# import the data
movies = spark.read.csv('movies.csv', header=True)
ratings = spark.read.csv('ratings.csv', header=True)
ratings.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows



In [3]:
movies.show()


+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

In [4]:
# join the ratings and movie dataframes
movie_ratings = ratings.join(movies, ['movieId'], 'left')
movie_ratings.show()

+-------+------+------+---------+--------------------+--------------------+
|movieId|userId|rating|timestamp|               title|              genres|
+-------+------+------+---------+--------------------+--------------------+
|      1|     1|   4.0|964982703|    Toy Story (1995)|Adventure|Animati...|
|      3|     1|   4.0|964981247|Grumpier Old Men ...|      Comedy|Romance|
|      6|     1|   4.0|964982224|         Heat (1995)|Action|Crime|Thri...|
|     47|     1|   5.0|964983815|Seven (a.k.a. Se7...|    Mystery|Thriller|
|     50|     1|   5.0|964982931|Usual Suspects, T...|Crime|Mystery|Thr...|
|     70|     1|   3.0|964982400|From Dusk Till Da...|Action|Comedy|Hor...|
|    101|     1|   5.0|964980868|Bottle Rocket (1996)|Adventure|Comedy|...|
|    110|     1|   4.0|964982176|   Braveheart (1995)|    Action|Drama|War|
|    151|     1|   5.0|964984041|      Rob Roy (1995)|Action|Drama|Roma...|
|    157|     1|   5.0|964984100|Canadian Bacon (1...|          Comedy|War|
|    163|   

In [5]:
# create a function to view the sparsity of the data
def get_mat_sparsity(ratings):
    # count the total number of ratings in the dataset
    count_nonzero = ratings.select("rating").count()
    
    # count the number of distinct userIds and distinct movieIds
    total_elements = ratings.select("userId").distinct().count() * ratings.select("movieId").distinct().count()
    
    # divide the numerator by the denominator
    sparsity = (1.0 - (count_nonzero * 1.0) / total_elements)*100
    print("The ratings dataframe is ", "%.2f" % sparsity + "% sparse.")
    

In [6]:
# view the sparsity of the movie ratings data
get_mat_sparsity(ratings)

The ratings dataframe is  98.30% sparse.


In [7]:
# Create test and train datasets
(train, test) = ratings.randomSplit([0.8, 0.2], seed=2020)

In [8]:
train.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|    101|   5.0|964980868|
|     1|   1024|   5.0|964982876|
|     1|   1025|   5.0|964982791|
|     1|   1029|   5.0|964982855|
|     1|   1032|   5.0|964982791|
|     1|   1049|   5.0|964982400|
|     1|   1060|   4.0|964980924|
|     1|   1073|   5.0|964981680|
|     1|   1080|   5.0|964981327|
|     1|   1089|   5.0|964982951|
|     1|   1090|   4.0|964984018|
|     1|   1092|   5.0|964983484|
|     1|   1097|   5.0|964981680|
|     1|    110|   4.0|964982176|
|     1|   1136|   5.0|964981327|
|     1|   1196|   5.0|964981827|
|     1|   1206|   5.0|964983737|
|     1|   1208|   4.0|964983250|
|     1|   1214|   4.0|964981855|
+------+-------+------+---------+
only showing top 20 rows



In [9]:
test.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|   1009|   3.0|964981775|
|     1|   1023|   5.0|964982681|
|     1|   1030|   3.0|964982903|
|     1|   1031|   5.0|964982653|
|     1|   1042|   4.0|964981179|
|     1|   1127|   4.0|964982513|
|     1|   1197|   5.0|964981872|
|     1|   1198|   5.0|964981827|
|     1|   1210|   5.0|964980499|
|     1|   1213|   5.0|964982951|
|     1|   1220|   5.0|964981909|
|     1|   1226|   5.0|964983618|
|     1|   1258|   3.0|964983414|
|     1|   1270|   5.0|964983705|
|     1|   1517|   5.0|964981107|
|     1|   1587|   5.0|964982346|
|     1|   1625|   5.0|964983504|
|     1|   1804|   5.0|964983034|
|     1|   1920|   4.0|964981780|
|     1|   1967|   4.0|964981710|
+------+-------+------+---------+
only showing top 20 rows



In [10]:
for col in train.dtypes:
    print(col[0], ':', col[1])

userId : string
movieId : string
rating : string
timestamp : string


In [11]:
# convert string values to integers
from pyspark.sql.types import IntegerType

train = train.withColumn("userId", train["userId"].cast(IntegerType()))
train = train.withColumn("movieId", train["movieId"].cast(IntegerType()))
train = train.withColumn("rating", train["rating"].cast(IntegerType()))

test = test.withColumn("userId", test["userId"].cast(IntegerType()))
test = test.withColumn("movieId", test["movieId"].cast(IntegerType()))
test = test.withColumn("rating", test["rating"].cast(IntegerType()))

### Implicit Data Extraction

The original dataframe has ratings for movies and provides us with a lot of explicit data. To extract implicit data, it can be done by creating a one hot encoding of whether a movie was watched by the user of not. Unfortunately, we only know which movies were watched and rated, not which movies were watched but not rated. This is imperfect implicit info, but still good practice.

In [12]:
from pyspark.sql import functions

In [13]:
# create a function for one hot encoding whether a movie was watched
def get_binary_data(ratings):
    ratings = ratings.withColumn('binary', functions.lit(1))
    userIds = ratings.select("userId").distinct()
    movieIds = ratings.select('movieId').distinct()
    
    user_movie = userIds.crossJoin(movieIds).join(ratings, ['userId', 'movieId'], 'left')
    user_movie = user_movie.select(['userId', 'movieId', 'binary']).fillna(0)
    return user_movie
    

In [14]:
# create a one hot encoding of movies watched
user_movie = get_binary_data(ratings)

In [15]:
user_movie.show()

+------+-------+------+
|userId|movieId|binary|
+------+-------+------+
|   296|    296|     1|
|   296|   1090|     0|
|   296| 115713|     0|
|   296|   3210|     0|
|   296|  88140|     0|
|   296|    829|     0|
|   296|   2088|     0|
|   296|   2294|     0|
|   296|   4821|     0|
|   296|  48738|     0|
|   296|   3959|     0|
|   296|  89864|     0|
|   296|   2136|     0|
|   296|    691|     0|
|   296|   3606|     0|
|   296| 121007|     0|
|   296|   6731|     0|
|   296|  27317|     0|
|   296|  26082|     0|
|   296| 100553|     0|
+------+-------+------+
only showing top 20 rows



### Build the ALS model

In [16]:
# import the required functions
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ALS model
als = ALS(
        userCol='userId',
        itemCol='movieId',
        ratingCol='rating',
        nonnegative=True, # look at ratings greater than 0
        implicitPrefs=False, # use only the explicit data
        coldStartStrategy='drop' # to avoid NaN values for users in test but not in train data
        )

### Hyperparameter tuning and cross validation

In [17]:
# import the requisite packages
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

In [18]:
# add hyperparameters and their respective values to the param grid
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 50, 100, 150]) \
            .addGrid(als.regParam, [0.01, 0.05, 0.1, 0.15]) \
            .build()

In [19]:
# define evaluator as RMSE and print length of evaluator
evaluator = RegressionEvaluator(
            metricName='rmse',
            labelCol='rating',
            predictionCol='prediction')
print("Num models to be tested: ", len(param_grid))

Num models to be tested:  16


In [20]:
# Build cross validation using CrossValidator
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid,
                    evaluator=evaluator, numFolds=5)

In [21]:
# fit the cross validator to the train dataset
model = cv.fit(train)

In [22]:
# extract the best model from the cv model
best_model = model.bestModel

# view the predictions
test_predictions = best_model.transform(test)

RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

0.9159659314001822


### Check the best model parameters


In [23]:
print('**BEST MODEL**')

# print 'Rank'
print(" Rank:", best_model._java_obj.parent().getRank())

# print "MaxIter"
print(" MaxIter:", best_model._java_obj.parent().getMaxIter())

# print "RegParam"
print(" RegParam:", best_model._java_obj.parent().getRegParam())

**BEST MODEL**
 Rank: 100
 MaxIter: 10
 RegParam: 0.15


### Generate Recommendations

In [66]:
# get 5 Recommendations for all users
recommendations = best_model.recommendForAllUsers(5)
recommendations.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   471|[[7096, 4.4201956...|
|   463|[[141718, 4.85213...|
|   496|[[25771, 4.535957...|
|   148|[[33649, 4.573162...|
|   540|[[7842, 5.066717]...|
|   392|[[25771, 4.851111...|
|   243|[[67618, 5.656977...|
|    31|[[33649, 5.081668...|
|   516|[[4429, 4.7163315...|
|   580|[[6300, 4.6681952...|
|   251|[[33649, 5.644783...|
|   451|[[7096, 5.339855]...|
|    85|[[53, 4.8489943],...|
|   137|[[1949, 4.553638]...|
|    65|[[141718, 4.51449...|
|   458|[[67618, 5.526861...|
|   481|[[45503, 4.040567...|
|    53|[[141718, 6.61839...|
|   255|[[67618, 3.952502...|
|   588|[[78836, 4.173175...|
+------+--------------------+
only showing top 20 rows



In [67]:
recommendations.printSchema()

root
 |-- userId: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- movieId: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [70]:
from pyspark.sql.functions import explode

# convert the array of values for the recommendations expected to separate columns
recommendations = recommendations\
    .withColumn("rec_exp", explode("recommendations"))\
    .select('userId', "rec_exp.*")
recommendations.limit(10).show()

+------+-------+---------+
|userId|movieId|   rating|
+------+-------+---------+
|   471|   7096|4.4201956|
|   471|   8477| 4.367511|
|   471|  25771|4.3003664|
|   471|  33649| 4.211338|
|   471|  78836|4.1689057|
|   463| 141718| 4.852137|
|   463|   7842| 4.838543|
|   463|  33649| 4.715434|
|   463|  78836|   4.6408|
|   463|  72171|4.6061525|
+------+-------+---------+



### Do the recommendations make sense

Select the 100th user and view the recommendations

In [71]:
# view the recommendations
recommendations.join(movies, on='movieId').filter('userId=100').show()

+-------+------+---------+--------------------+--------------------+
|movieId|userId|   rating|               title|              genres|
+-------+------+---------+--------------------+--------------------+
|  67618|   100| 4.871826|Strictly Sexual (...|Comedy|Drama|Romance|
| 141718|   100|4.8585134|    Deathgasm (2015)|       Comedy|Horror|
|  33649|   100| 4.728863|  Saving Face (2004)|Comedy|Drama|Romance|
|   1111|   100|4.7050643|Microcosmos (Micr...|         Documentary|
|  72171|   100|4.7050643|Black Dynamite (2...|       Action|Comedy|
+-------+------+---------+--------------------+--------------------+



In [72]:
# view the movies user 100 actually likes
ratings.join(movies, on='movieId').filter('userId=100') \
            .sort('rating', ascending=False).limit(10).show()

+-------+------+------+----------+--------------------+--------------------+
|movieId|userId|rating| timestamp|               title|              genres|
+-------+------+------+----------+--------------------+--------------------+
|   2423|   100|   5.0|1100186118|Christmas Vacatio...|              Comedy|
|   1101|   100|   5.0|1100184137|      Top Gun (1986)|      Action|Romance|
|   4041|   100|   5.0|1100184235|Officer and a Gen...|       Drama|Romance|
|   1958|   100|   5.0|1100186258|Terms of Endearme...|        Comedy|Drama|
|   5620|   100|   5.0|1100186982|Sweet Home Alabam...|      Comedy|Romance|
|    368|   100|   4.5|1100183774|     Maverick (1994)|Adventure|Comedy|...|
|    539|   100|   4.5|1100184295|Sleepless in Seat...|Comedy|Drama|Romance|
|    934|   100|   4.5|1100186407|Father of the Bri...|              Comedy|
|     16|   100|   4.5|1100185959|       Casino (1995)|         Crime|Drama|
|    553|   100|   4.5|1100183810|    Tombstone (1993)|Action|Drama|Western|