In [1]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import lit
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import explode, col

In [2]:
path = 'E:\\181255_RNMP_HW2\\ml-100k\\'

In [3]:
k = 5

In [14]:
max_parellelism = 6

# Create Spark session and read datasets

In [5]:
spark = SparkSession.builder.master("local[*]") \
        .appName('MovieLens100K-ALS-CF').getOrCreate()

## Ratings 

In [6]:
df = spark.read.csv(path+'data.csv', inferSchema=True, encoding='utf-8', header=True)
df.withColumn('prediction', lit(None))
df.cache()
df.show()

+-------+--------+------+---------+
|user_id|movie_id|rating|timestamp|
+-------+--------+------+---------+
|    196|     242|     3|881250949|
|    186|     302|     3|891717742|
|     22|     377|     1|878887116|
|    244|      51|     2|880606923|
|    166|     346|     1|886397596|
|    298|     474|     4|884182806|
|    115|     265|     2|881171488|
|    253|     465|     5|891628467|
|    305|     451|     3|886324817|
|      6|      86|     3|883603013|
|     62|     257|     2|879372434|
|    286|    1014|     5|879781125|
|    200|     222|     5|876042340|
|    210|      40|     3|891035994|
|    224|      29|     3|888104457|
|    303|     785|     3|879485318|
|    122|     387|     5|879270459|
|    194|     274|     2|879539794|
|    291|    1042|     4|874834944|
|    234|    1184|     2|892079237|
+-------+--------+------+---------+
only showing top 20 rows



In [7]:
df.describe().show()

+-------+------------------+------------------+------------------+-----------------+
|summary|           user_id|          movie_id|            rating|        timestamp|
+-------+------------------+------------------+------------------+-----------------+
|  count|            100000|            100000|            100000|           100000|
|   mean|         462.48475|         425.53013|           3.52986|8.8352885148862E8|
| stddev|266.61442012750905|330.79835632558473|1.1256735991443214|5343856.189502848|
|    min|                 1|                 1|                 1|        874724710|
|    max|               943|              1682|                 5|        893286638|
+-------+------------------+------------------+------------------+-----------------+



##  Movies

In [21]:
df_movies = spark.read.csv(path+'movies.csv', inferSchema=True, encoding='utf-8', header=True)
df_movies.cache()
df_movies.show(20, 30)

+--------+------------------------------+------------+------------------------------+------------------------------+
|movie_id|                         title|release_date|                      IMDb URL|                         genre|
+--------+------------------------------+------------+------------------------------+------------------------------+
|       1|              Toy Story (1995)| 01-Jan-1995|http://us.imdb.com/M/title-...| "['Animation', ""Children's""|
|       2|              GoldenEye (1995)| 01-Jan-1995|http://us.imdb.com/M/title-...|['Action', 'Adventure', 'Th...|
|       3|             Four Rooms (1995)| 01-Jan-1995|http://us.imdb.com/M/title-...|                  ['Thriller']|
|       4|             Get Shorty (1995)| 01-Jan-1995|http://us.imdb.com/M/title-...| ['Action', 'Comedy', 'Drama']|
|       5|                Copycat (1995)| 01-Jan-1995|http://us.imdb.com/M/title-...|['Crime', 'Drama', 'Thriller']|
|       6|Shanghai Triad (Yao a yao y...| 01-Jan-1995|http://us.

# Users

In [22]:
df_users = spark.read.csv(path+'users.csv', inferSchema=True, encoding='utf-8', header=True)
df_users.cache()
df_users.show()

+-------+---+------+-------------+--------+
|user_id|age|gender|     interest|zip_code|
+-------+---+------+-------------+--------+
|      1| 24|     M|   technician|   85711|
|      2| 53|     F|        other|   94043|
|      3| 23|     M|       writer|   32067|
|      4| 24|     M|   technician|   43537|
|      5| 33|     F|        other|   15213|
|      6| 42|     M|    executive|   98101|
|      7| 57|     M|administrator|   91344|
|      8| 36|     M|administrator|   05201|
|      9| 29|     M|      student|   01002|
|     10| 53|     M|       lawyer|   90703|
|     11| 39|     F|        other|   30329|
|     12| 28|     F|        other|   06405|
|     13| 47|     M|     educator|   29206|
|     14| 45|     M|    scientist|   55106|
|     15| 49|     F|     educator|   97301|
|     16| 21|     M|entertainment|   10309|
|     17| 30|     M|   programmer|   06355|
|     18| 35|     F|        other|   37212|
|     19| 40|     M|    librarian|   02138|
|     20| 42|     F|    homemake

# Train/Test split

In [8]:
(train, test) = df.randomSplit([0.8, 0.2], seed=11111)
train.cache()
test.cache()

DataFrame[user_id: int, movie_id: int, rating: int, timestamp: int]

In [9]:
als = ALS(implicitPrefs=False,
               coldStartStrategy="drop",
               nonnegative=True,
               userCol="user_id",
               itemCol="movie_id",
               ratingCol="rating")

In [10]:
evaluator_r2 = RegressionEvaluator(metricName="r2", labelCol="rating", predictionCol="prediction")
evaluator_rmse = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

# Hyperparameter tuning

## Define parameter search space

In [11]:
ranks = [20, 50, 100]
max_iterations = [5, 10, 20]
reg_param = [0.01, 0.05, 0.1]

In [12]:
param_grid = ParamGridBuilder() \
        .addGrid(als.rank, ranks) \
        .addGrid(als.maxIter, max_iterations) \
        .addGrid(als.regParam, reg_param) \
        .build()

## Estimate best parameters using K-fold cross validation

In [16]:
print("Number of models to be tested: ", len(param_grid))

cv = CrossValidator(estimator=als,
                    estimatorParamMaps=param_grid,
                    evaluator=evaluator_rmse,
                    numFolds=k,
                    parallelism=max_parellelism)

Number of models to be tested:  27


In [17]:
best_model = cv.fit(train).bestModel

## Estimated parameter values of the best model

In [18]:
predictions = best_model.transform(test)
print(f"RMSE: {evaluator_rmse.evaluate(predictions)}")
model_dict = {
        "rank": best_model._java_obj.parent().getRank(),
        "maxIter": best_model._java_obj.parent().getMaxIter(),
        "regParam": best_model._java_obj.parent().getRegParam(),
    }
model_dict

RMSE: 0.9094067669345076


{'rank': 100, 'maxIter': 10, 'regParam': 0.1}

## Compute CV error on u_i train/test datasets

In [23]:
model = als \
.setRank(100)\
.setMaxIter(10)\
.setRegParam(0.1).fit(train)

In [24]:
for fold in range(1,k+1):
    train = spark.read.csv(path+f'u{fold}_train.csv', inferSchema=True, encoding='utf-8', header=True)
    test = spark.read.csv(path+f'u{fold}_test.csv', inferSchema=True, encoding='utf-8', header=True)
    train_predictions = model.transform(train)
    test_predictions = model.transform(test)
    train_rmse = evaluator_rmse.evaluate(train_predictions)
    test_rmse = evaluator_rmse.evaluate(test_predictions)
    print(f'Fold {fold} train set RMSE: {train_rmse}')
    print(f'Fold {fold} test set RMSE: {test_rmse}')

Fold 1 train set RMSE: 0.721693867102374
Fold 1 test set RMSE: 0.6600830035173448
Fold 2 train set RMSE: 0.7228529327648138
Fold 2 test set RMSE: 0.6549925854104728
Fold 3 train set RMSE: 0.7253451780188627
Fold 3 test set RMSE: 0.6438825390902951
Fold 4 train set RMSE: 0.7276501180876778
Fold 4 test set RMSE: 0.6333991507374506
Fold 5 train set RMSE: 0.6481712865391784
Fold 5 test set RMSE: 0.9160494551219932


# Predictions

In [33]:
predictions = model.transform(test)
predictions = predictions.join(df_movies, on='movie_id')
predictions.orderBy(col('prediction').desc()).show(50)

+--------+-------+------+---------+----------+--------------------+------------+--------------------+--------------------+
|movie_id|user_id|rating|timestamp|prediction|               title|release_date|            IMDb URL|               genre|
+--------+-------+------+---------+----------+--------------------+------------+--------------------+--------------------+
|     313|    907|     5|885860093| 5.4522047|      Titanic (1997)| 01-Jan-1997|http://us.imdb.co...|['Action', 'Drama...|
|     313|    801|     5|890332694| 5.4420843|      Titanic (1997)| 01-Jan-1997|http://us.imdb.co...|['Action', 'Drama...|
|      64|    330|     5|876546409| 5.3415003|Shawshank Redempt...| 01-Jan-1994|http://us.imdb.co...|           ['Drama']|
|      50|    850|     5|883195143|  5.269635|    Star Wars (1977)| 01-Jan-1977|http://us.imdb.co...|['Action', 'Adven...|
|     114|    928|     5|880936742|  5.253911|Wallace & Gromit:...| 05-Apr-1996|http://us.imdb.co...|       ['Animation']|
|     496|    85

# Recommendations

## K item recommendations for all users

In [38]:
recommendations_for_users = model.recommendForAllUsers(5)
recommendations_for_users.show()
recommendations_for_users.printSchema()

+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|      1|[{1449, 4.966744}...|
|      3|[{320, 4.29379}, ...|
|      5|[{408, 4.608048},...|
|      6|[{1463, 4.6168427...|
|     12|[{318, 5.2062187}...|
|     13|[{814, 4.900056},...|
|     16|[{64, 5.2119784},...|
|     19|[{512, 4.6331506}...|
|     20|[{394, 4.2026772}...|
|     22|[{50, 4.8384953},...|
|     26|[{313, 4.015071},...|
|     27|[{64, 4.0003533},...|
|     28|[{1449, 4.507613}...|
|     31|[{641, 4.681726},...|
|     34|[{1449, 5.4819927...|
|     40|[{663, 3.8354282}...|
|     44|[{50, 4.619193}, ...|
|     47|[{1449, 4.4709473...|
|     48|[{187, 4.6162915}...|
|     52|[{408, 5.125088},...|
+-------+--------------------+
only showing top 20 rows

root
 |-- user_id: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- movie_id: integer (nullable = true)
 |    |    |-- rating: float (nullable 

### SQL operations such that the result has more readable format

In [39]:
recommendations_for_users = recommendations_for_users\
.withColumn("struct_col", explode("recommendations"))\
.select('user_id', col('struct_col.movie_id'), col('struct_col.rating'))\
.join(df_movies, on='movie_id')\
.join(df_users, on='user_id')\
.drop('zip_code')\
.orderBy(col('user_id').asc())

In [40]:
recommendations_for_users.show()

+-------+--------+---------+--------------------+------------+--------------------+--------------------+---+------+----------+
|user_id|movie_id|   rating|               title|release_date|            IMDb URL|               genre|age|gender|  interest|
+-------+--------+---------+--------------------+------------+--------------------+--------------------+---+------+----------+
|      1|     119|4.8466916|Maya Lin: A Stron...| 01-Jan-1994|http://us.imdb.co...|     ['Documentary']| 24|     M|technician|
|      1|    1449| 4.966744|Pather Panchali (...| 22-Mar-1996|http://us.imdb.co...|           ['Drama']| 24|     M|technician|
|      1|     169| 4.827329|Wrong Trousers, T...| 01-Jan-1993|http://us.imdb.co...|['Animation', 'Co...| 24|     M|technician|
|      1|     408| 4.945852|Close Shave, A (1...| 28-Apr-1996|http://us.imdb.co...|['Animation', 'Co...| 24|     M|technician|
|      1|     114| 4.785778|Wallace & Gromit:...| 05-Apr-1996|http://us.imdb.co...|       ['Animation']| 24|   

## K user recommendations for all items

In [53]:
recommendations_for_items = model.recommendForAllItems(5)
recommendations_for_items.show()
recommendations_for_items.printSchema()

+--------+--------------------+
|movie_id|     recommendations|
+--------+--------------------+
|       1|[{810, 4.912862},...|
|      12|[{118, 5.2276564}...|
|      22|[{688, 5.4716034}...|
|      26|[{770, 4.2078023}...|
|      27|[{849, 4.4430504}...|
|      28|[{688, 5.276476},...|
|      31|[{507, 4.6713715}...|
|      34|[{286, 4.357294},...|
|      44|[{34, 4.1883907},...|
|      47|[{686, 4.5573583}...|
|      52|[{565, 4.724062},...|
|      53|[{366, 4.362792},...|
|      65|[{174, 4.632567},...|
|      76|[{332, 4.532265},...|
|      78|[{38, 4.418796}, ...|
|      81|[{928, 4.559454},...|
|      85|[{546, 4.788795},...|
|      91|[{697, 4.5298524}...|
|      93|[{34, 4.7150803},...|
|     101|[{472, 4.538263},...|
+--------+--------------------+
only showing top 20 rows

root
 |-- movie_id: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- user_id: integer (nullable = true)
 |    |    |-- 

### SQL operations such that the result has more readable format

In [54]:
recommendations_for_items = recommendations_for_items\
.withColumn("struct_col", explode("recommendations"))\
.select('movie_id', col('struct_col.user_id'), col('struct_col.rating'))\
.join(df_movies, on='movie_id')\
.join(df_users, on='user_id')\
.drop('zip_code')\
.orderBy(col('movie_id').asc())

In [55]:
recommendations_for_items.show(20)

+-------+--------+---------+-----------------+------------+--------------------+--------------------+---+------+-------------+
|user_id|movie_id|   rating|            title|release_date|            IMDb URL|               genre|age|gender|     interest|
+-------+--------+---------+-----------------+------------+--------------------+--------------------+---+------+-------------+
|    173|       1|  4.82556| Toy Story (1995)| 01-Jan-1995|http://us.imdb.co...|"['Animation', ""...| 56|     M|        other|
|    810|       1| 4.912862| Toy Story (1995)| 01-Jan-1995|http://us.imdb.co...|"['Animation', ""...| 55|     F|        other|
|    330|       1|4.7818465| Toy Story (1995)| 01-Jan-1995|http://us.imdb.co...|"['Animation', ""...| 35|     F|     educator|
|    688|       1|4.8960257| Toy Story (1995)| 01-Jan-1995|http://us.imdb.co...|"['Animation', ""...| 37|     F|administrator|
|    357|       1| 4.761861| Toy Story (1995)| 01-Jan-1995|http://us.imdb.co...|"['Animation', ""...| 26|     M

# Saving recommendations to .csv files

In [64]:
df_recommended_users = recommendations_for_items.toPandas()

In [65]:
df_recommended_items = recommendations_for_users.toPandas()

In [66]:
df_recommended_users.to_csv(path+'recommendForAllItems.csv')

In [67]:
df_recommended_items.to_csv(path+'recommendForAllUsers.csv')