In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import col, explode

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import pandas as pd

### Initialize Spark session

In [2]:
# Create (or reuse) the Spark session that every later cell reads data through.
spark = SparkSession.builder.appName('Recommendation').getOrCreate()
# Bind the live SparkContext owned by the session. Assigning the bare
# `SparkContext` class here would leave `sc` unusable for instance calls
# such as `sc.parallelize(...)`.
sc = spark.sparkContext

### Load dataset

In [4]:
# Read both CSV files with their first row treated as the header.
# No schema is supplied, so every column is loaded as a string.
csv_reader = spark.read.option('header', True)
movies = csv_reader.csv('../data/movies.csv')
ratings = csv_reader.csv('../data/ratings.csv')

# Preview the first rows of the raw ratings.
ratings.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows



In [5]:
# Inspect the column types as loaded from the CSV.
ratings.printSchema()

# Discard the timestamp and give the remaining columns numeric types:
# integer ids for user and movie, float for the rating.
ratings = (
    ratings
    .drop('timestamp')
    .withColumn('userId', col('userId').cast('int'))
    .withColumn('movieId', col('movieId').cast('int'))
    .withColumn('rating', col('rating').cast('float'))
)

ratings.show()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
|     1|     70|   3.0|
|     1|    101|   5.0|
|     1|    110|   4.0|
|     1|    151|   5.0|
|     1|    157|   5.0|
|     1|    163|   5.0|
|     1|    216|   5.0|
|     1|    223|   3.0|
|     1|    231|   5.0|
|     1|    235|   4.0|
|     1|    260|   5.0|
|     1|    296|   3.0|
|     1|    316|   3.0|
|     1|    333|   5.0|
|     1|    349|   4.0|
+------+-------+------+
only showing top 20 rows



In [6]:
# Preview the first rows of the movies table (movieId, title, genres).
movies.show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

### Calculate sparsity

In [7]:
# Sparsity = percentage of (user, movie) pairs that have no rating.

# Size of the full user x movie grid.
num_users = ratings.select('userId').distinct().count()
num_movies = ratings.select('movieId').distinct().count()
denominator = num_users * num_movies

# Number of pairs that were actually rated (one row per rating).
numerator = ratings.count()

sparsity = 100.0 * (1.0 - numerator / denominator)

print("The rating dataframe is ", "%.2f" % sparsity + "% empty.")

The rating dataframe is  98.30% empty.


### Data exploration

In [8]:
# Number of ratings submitted by each user, most active users first.
userId_ratings = (
    ratings
    .groupBy('userId')
    .count()
    .orderBy(col('count').desc())
)
userId_ratings.show()

+------+-----+
|userId|count|
+------+-----+
|   414| 2698|
|   599| 2478|
|   474| 2108|
|   448| 1864|
|   274| 1346|
|   610| 1302|
|    68| 1260|
|   380| 1218|
|   606| 1115|
|   288| 1055|
|   249| 1046|
|   387| 1027|
|   182|  977|
|   307|  975|
|   603|  943|
|   298|  939|
|   177|  904|
|   318|  879|
|   232|  862|
|   480|  836|
+------+-----+
only showing top 20 rows



In [9]:
# Number of ratings received by each movie, most rated movies first.
movieId_ratings = (
    ratings
    .groupBy('movieId')
    .count()
    .orderBy(col('count').desc())
)
movieId_ratings.show()

+-------+-----+
|movieId|count|
+-------+-----+
|    356|  329|
|    318|  317|
|    296|  307|
|    593|  279|
|   2571|  278|
|    260|  251|
|    480|  238|
|    110|  237|
|    589|  224|
|    527|  220|
|   2959|  218|
|      1|  215|
|   1196|  211|
|   2858|  204|
|     50|  204|
|     47|  203|
|    780|  202|
|    150|  201|
|   1198|  200|
|   4993|  198|
+-------+-----+
only showing top 20 rows



### ALS Model build

In [11]:
# Hold out 20% of the ratings for the final evaluation; the split is seeded
# so the same rows land in train/test on every run.
train, test = ratings.randomSplit([0.8, 0.2], seed=1)

# Explicit-feedback ALS estimator on the (userId, movieId, rating) columns.
# - nonnegative=True constrains the learned factors to be non-negative.
# - coldStartStrategy='drop' removes rows whose prediction would be NaN
#   (users/movies unseen during fitting) so the RMSE stays well defined.
# - seed fixes the random factor initialisation; without it, repeated runs
#   of the notebook produce different models and metrics.
als = ALS(userCol='userId',
          itemCol='movieId',
          ratingCol='rating',
          nonnegative=True, implicitPrefs=False, coldStartStrategy='drop',
          seed=1)

# type(als)

pyspark.ml.recommendation.ALS

#### Hyperparameter tuning

In [13]:
# Candidate hyperparameter values; the grid is their Cartesian product.
rank_values = [10, 50, 100, 150]
reg_param_values = [.01, .05, .1, .15]

param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, rank_values)
    .addGrid(als.regParam, reg_param_values)
    .build()
)

# Score candidates by RMSE between the true rating and the model prediction.
evaluator = RegressionEvaluator(labelCol='rating',
                                predictionCol='prediction',
                                metricName='rmse')

print("Number of models to test: ", len(param_grid))

Number of models to test:  16


#### Cross-validation pipeline

In [14]:
# 5-fold cross-validation over every parameter combination in `param_grid`,
# scored with `evaluator`. The seed fixes how rows are assigned to folds so
# the tuning result is reproducible across notebook runs.
cv = CrossValidator(estimator=als,
                    estimatorParamMaps=param_grid,
                    evaluator=evaluator,
                    numFolds=5,
                    seed=1)

print(cv)

CrossValidator_c6cfab9f9a4f


#### Model training

In [15]:
# Run the cross-validated grid search on the training split. This fits one
# model per (parameter combination, fold), so it is the slow cell.
model = cv.fit(train)

# Model fitted with the best-scoring parameter combination
# (the attribute is `bestModel`, not `best_model`).
best_model = model.bestModel

AttributeError: 'CrossValidatorModel' object has no attribute 'best_model'

### Select best model

In [None]:
print(type(best_model))

# Fetch the parent estimator of the underlying JVM model once; it carries
# the hyperparameters the winning model was trained with.
best_estimator = best_model._java_obj.parent()

print("Best model ->")

print("  Rank: ", best_estimator.getRank())

print("  MaxIter: ", best_estimator.getMaxIter())

print("  RegParam: ", best_estimator.getRegParam())

### Test model

In [None]:
# Score the held-out test split with the best model.
test_predictions = best_model.transform(test)

# RMSE between the 'rating' and 'prediction' columns (as configured on `evaluator`).
RMSE = evaluator.evaluate(test_predictions)

print(RMSE)

In [None]:
# Preview the test rows alongside their predicted ratings.
test_predictions.show()

### Model evaluation and recommendation

In [None]:
# Top-10 recommendations per user; each row holds a userId and a
# 'recommendations' array column.
n_recommendations = best_model.recommendForAllUsers(10)
# Show the first 10 users only.
n_recommendations.limit(10).show()

In [None]:
# Flatten the per-user 'recommendations' array into one row per
# (userId, movieId, predicted rating).
# Note: this rebinds `n_recommendations`, so the cell must be run exactly
# once after `recommendForAllUsers`; a second run fails because the
# 'recommendations' column no longer exists.
n_recommendations = n_recommendations.withColumn('rec_exp', explode('recommendations')) \
                                     .select('userId', col('rec_exp.movieId'), col("rec_exp.rating"))

# DataFrame exposes `limit`, not `limits`; the misspelling raised AttributeError.
n_recommendations.limit(10).show()

In [None]:
# Recommended movies for user 100, with title and genres attached.
user_100_recommendations = (
    n_recommendations
    .join(movies, on='movieId')
    .where(col('userId') == 100)
)
user_100_recommendations.show()

In [None]:
# The 10 movies user 100 actually rated highest, for comparison with the
# recommendations.
user_100_ratings = (
    ratings
    .join(movies, on='movieId')
    .where(col('userId') == 100)
    .orderBy(col('rating').desc())
    .limit(10)
)
user_100_ratings.show()