## Summary
*   This project builds a recommendation system based on collaborative filtering, specifically alternating least squares (ALS) matrix factorization, implemented with Spark ML.
*   The model was trained on the MovieLens dataset of roughly 27 million movie ratings. The best model achieved an RMSE of 0.80 on the held-out test set and 0.72 on the full dataset, which includes the training data.
*   The trained model can predict a user's rating for any movie seen during training, so for each user the system can recommend the movies with the highest predicted ratings.
*   The item factor matrix can also be used to find movies similar to a given movie. When a user searches for or browses a movie, similar movies can be recommended.
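
To make the idea behind the factorization concrete, here is a minimal sketch of how a factorized model scores a (user, movie) pair: the predicted rating is the dot product of the user's and the movie's latent factor vectors. The factor values and the names `user_factors`, `item_factors`, `predict_rating` and `rank_items` are hypothetical and only serve the illustration; the real vectors come from the trained model.

```python
# Hypothetical 3-dimensional latent factors (not taken from the trained model).
user_factors = {"u1": [1.0, 0.5, 0.0]}
item_factors = {"m1": [2.0, 1.0, 3.0], "m2": [0.5, 2.0, 1.0]}

def predict_rating(user_vec, item_vec):
    """Predicted rating = dot product of the user and item factor vectors."""
    return sum(u * v for u, v in zip(user_vec, item_vec))

def rank_items(user_id):
    """Score every item for one user and sort by predicted rating, best first."""
    scores = {
        item_id: predict_rating(user_factors[user_id], vec)
        for item_id, vec in item_factors.items()
    }
    return sorted(scores.items(), key=lambda kv: kv[1], reverse=True)

ranked = rank_items("u1")
print(ranked)
```

Recommending movies to a user then amounts to taking the top entries of this ranking.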

## Process
1. Explore the data to get an overall understanding of the dataset.
2. Train an ALS model and tune its hyperparameters using cross-validation.
3. Evaluate the model using RMSE.
4. Use the trained model to recommend movies to users, and use the item factor matrix to find movies similar to a specific movie.


In [None]:
# Install dependencies of Spark
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz 
!tar xf spark-3.0.1-bin-hadoop2.7.tgz

Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:6 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:7 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Get:8 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:10 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:12 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
Get:13 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Fetched 252 kB in 2s (118 kB/s)
Reading package lists... Done


In [None]:
# Install Spark
!pip install -q findspark
!pip install py4j

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# SPARK_HOME must match the Spark 3.0.1 archive unpacked in the previous cell
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop2.7"
import findspark
findspark.init(os.environ["SPARK_HOME"])

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()



In [None]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("RecSys_ALS") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [None]:
!pip install 'PyArrow==0.15.1'
!pip install koalas



In [None]:
import os
import math
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import databricks.koalas as ks
%matplotlib inline
os.environ["PYSPARK_PYTHON"] = "python3"

# Part 0: Data ETL and Data Exploration


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
movies_df = spark.read.load("drive/My Drive/movies.csv", format='csv', header = True)
ratings_df = spark.read.load("drive/My Drive/ratings.csv", format='csv', header = True)

In [None]:
movies_df.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [None]:
ratings_df.show(5)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    307|   3.5|1256677221|
|     1|    481|   3.5|1256677456|
|     1|   1091|   1.5|1256677471|
|     1|   1257|   4.5|1256677460|
|     1|   1449|   4.5|1256677264|
+------+-------+------+----------+
only showing top 5 rows



In [None]:
tmp1 = ratings_df.groupBy("userID").count().toPandas()['count'].min()
tmp2 = ratings_df.groupBy("movieId").count().toPandas()['count'].min()
print('For the users that rated movies and the movies that were rated:')
print('Minimum number of ratings per user is {}'.format(tmp1))
print('Minimum number of ratings per movie is {}'.format(tmp2))

For the users that rated movies and the movies that were rated:
Minimum number of ratings per user is 1
Minimum number of ratings per movie is 1


In [None]:
%%time
tmp1 = sum(ratings_df.groupBy("movieId").count().toPandas()['count'] == 1)
tmp2 = ratings_df.select('movieId').distinct().count()
print('{} out of {} movies are rated by only one user'.format(tmp1, tmp2))

10155 out of 53889 movies are rated by only one user
CPU times: user 446 ms, sys: 105 ms, total: 551 ms
Wall time: 1min 2s


# Part 1: Spark SQL and OLAP

## Q1: The number of Users

In [None]:
# registerTempTable is deprecated; createOrReplaceTempView is the current equivalent
movies_df.createOrReplaceTempView("movies")
ratings_df.createOrReplaceTempView("ratings")

In [None]:
number_of_users = spark.sql("""
select count(distinct userid) as Number_of_users 
from ratings
""")
number_of_users.show()

+---------------+
|Number_of_users|
+---------------+
|         283228|
+---------------+



## Q2: The number of Movies

In [None]:
number_of_movies = spark.sql("""
select count(distinct movieid) as Number_of_movies
from movies
""")
number_of_movies.show()

+----------------+
|Number_of_movies|
+----------------+
|           58098|
+----------------+



## Q3: How many movies are rated by users? List the movies that have not been rated

In [None]:
%%time
number_of_movies_rated = spark.sql("""
select count(distinct movieid) as Number_of_movies_rated
from ratings
""")
number_of_movies_rated.show()

+----------------------+
|Number_of_movies_rated|
+----------------------+
|                 53889|
+----------------------+

CPU times: user 6.23 ms, sys: 880 µs, total: 7.11 ms
Wall time: 29.9 s
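
The counts from Q2 and Q3 already imply how many movies have no ratings at all. The sketch below does that subtraction and then shows the same idea as a set difference on a toy example. It assumes every `movieId` in the ratings table also appears in the movies table; the ids in `catalog_ids` and `rated_ids` are hypothetical.

```python
# Counts reported by the Q2 and Q3 queries above.
number_of_movies = 58098
number_of_movies_rated = 53889

# Assumption: every rated movieId also exists in the movies table.
unrated_count = number_of_movies - number_of_movies_rated
print(unrated_count)

# Toy illustration with hypothetical ids: unrated = catalog minus rated.
catalog_ids = {1, 2, 3, 4}
rated_ids = {1, 3}
unrated_ids = sorted(catalog_ids - rated_ids)
print(unrated_ids)
```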


In [None]:
%%time
movies_not_rated = spark.sql("""
select a.movieid, a.title, a.genres
from movies a left join ratings b on a.movieid = b.movieid
where b.movieid is Null
""")
movies_not_rated.show()

+-------+--------------------+--------------------+
|movieid|               title|              genres|
+-------+--------------------+--------------------+
| 135352|Heroes of the Wes...|  (no genres listed)|
| 135901|Count the Hours (...|               Crime|
| 138076|Diary of a Tired ...|Comedy|Drama|Romance|
| 145275|   Live Wires (1946)|     Action|Thriller|
| 151859|Hart to Hart Retu...|Adventure|Crime|M...|
| 156821|       Yakeen (1969)|Action|Crime|Myst...|
| 159526|Black Cobra Woman...|        Drama|Horror|
| 163288|Those Three Frenc...|  (no genres listed)|
| 165977|Behind the Make-U...|               Drama|
| 174251|     The Cure (1924)|    Animation|Comedy|
| 116060|Code Name: Emeral...|    Action|Drama|War|
| 117742|     The Fake (1953)|         Crime|Drama|
| 123069|Midnight Heat (1996)|Action|Drama|Thri...|
| 125469|Till the End of T...|       Drama|Romance|
| 132802|Nest of Vipers (1...|       Drama|Romance|
| 133189|To Kill A Clown (...|      Drama|Thriller|
| 136782|The

In [None]:
%%time
# List the 20 movies with the most ratings, with their number_of_ratings and avg_rating
highest_number = spark.sql("""
select a.movieid, a.title, a.genres, b.number_of_ratings, b.avg_rating
from movies a
join
(select movieid, count(*) as number_of_ratings, avg(rating) as avg_rating
from ratings
group by movieid
order by number_of_ratings desc
limit 20) b on a.movieid = b.movieid
order by number_of_ratings desc
""")
highest_number.show()

+-------+--------------------+--------------------+-----------------+------------------+
|movieid|               title|              genres|number_of_ratings|        avg_rating|
+-------+--------------------+--------------------+-----------------+------------------+
|    318|Shawshank Redempt...|         Crime|Drama|            97999| 4.424188001918387|
|    356| Forrest Gump (1994)|Comedy|Drama|Roma...|            97040| 4.056584913437757|
|    296| Pulp Fiction (1994)|Comedy|Crime|Dram...|            92406| 4.173971387139363|
|    593|Silence of the La...|Crime|Horror|Thri...|            87899|  4.15141241652351|
|   2571|  Matrix, The (1999)|Action|Sci-Fi|Thr...|            84545| 4.149695428470046|
|    260|Star Wars: Episod...|Action|Adventure|...|            81815| 4.120454684348836|
|    480|Jurassic Park (1993)|Action|Adventure|...|            76451|3.6650338125073576|
|    527|Schindler's List ...|           Drama|War|            71516| 4.257501817775044|
|    110|   Bravehear

## Q4. List movie genres

In [None]:
genres = spark.sql("""
select distinct genres
from movies
""")
genres.show()

+--------------------+
|              genres|
+--------------------+
|Comedy|Horror|Thr...|
|Adventure|Sci-Fi|...|
|Action|Adventure|...|
| Action|Drama|Horror|
|Comedy|Drama|Horr...|
|Action|Animation|...|
|Fantasy|Musical|M...|
|Adventure|Mystery...|
|Children|Comedy|D...|
|Action|Adventure|...|
|Animation|Childre...|
|Action|Adventure|...|
| Adventure|Animation|
|    Adventure|Sci-Fi|
|Documentary|Music...|
|  Documentary|Sci-Fi|
|Adventure|Childre...|
|Action|Adventure|...|
|Comedy|Drama|Fant...|
| Musical|Romance|War|
+--------------------+
only showing top 20 rows



In [None]:
%%time
genres_splitted =  spark.sql("""
select distinct explode(split(genres, '[|]')) as Category
from movies
order by 1
""")
genres_splitted.show()

+------------------+
|          Category|
+------------------+
|(no genres listed)|
|            Action|
|         Adventure|
|         Animation|
|          Children|
|            Comedy|
|             Crime|
|       Documentary|
|             Drama|
|           Fantasy|
|         Film-Noir|
|            Horror|
|              IMAX|
|           Musical|
|           Mystery|
|           Romance|
|            Sci-Fi|
|          Thriller|
|               War|
|           Western|
+------------------+

CPU times: user 1.33 ms, sys: 18 µs, total: 1.35 ms
Wall time: 1.55 s
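
As a rough illustration of what `explode(split(genres, '[|]'))` does, the following plain-Python sketch applies the same two steps to three rows from the movies table shown earlier. The separator is written as `[|]` because the split pattern is a regular expression, where a bare `|` means alternation rather than a literal pipe. The helper name `explode_genres` is hypothetical.

```python
import re
from collections import Counter

# (movieId, genres) rows copied from the movies_df.show(5) output.
movies = [
    (3, "Comedy|Romance"),
    (4, "Comedy|Drama|Romance"),
    (5, "Comedy"),
]

def explode_genres(rows):
    """Split each genres string on a literal '|' and emit one row per genre."""
    out = []
    for movie_id, genres in rows:
        for category in re.split("[|]", genres):
            out.append((movie_id, category))
    return out

pairs = explode_genres(movies)
counts = Counter(category for _, category in pairs)
print(pairs)
print(counts)
```

Counting the exploded rows per category is the same aggregation that the per-category queries in Q5 run in SQL.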


## Q5: Movies for Each Category

In [None]:
%%time
movie_category = spark.sql("""
select movieid, explode(split(genres, '[|]')) as Category
from movies
""")
movie_category.show()

+-------+---------+
|movieid| Category|
+-------+---------+
|      1|Adventure|
|      1|Animation|
|      1| Children|
|      1|   Comedy|
|      1|  Fantasy|
|      2|Adventure|
|      2| Children|
|      2|  Fantasy|
|      3|   Comedy|
|      3|  Romance|
|      4|   Comedy|
|      4|    Drama|
|      4|  Romance|
|      5|   Comedy|
|      6|   Action|
|      6|    Crime|
|      6| Thriller|
|      7|   Comedy|
|      7|  Romance|
|      8|Adventure|
+-------+---------+
only showing top 20 rows

CPU times: user 2.04 ms, sys: 0 ns, total: 2.04 ms
Wall time: 155 ms


In [None]:
number_per_category =  spark.sql("""
select Category, count(*) as number
from (select movieid, explode(split(genres, '[|]')) as Category from movies)
group by 1
order by 1
""")
number_per_category.show()

+------------------+------+
|          Category|number|
+------------------+------+
|(no genres listed)|  4266|
|            Action|  7130|
|         Adventure|  4067|
|         Animation|  2663|
|          Children|  2749|
|            Comedy| 15956|
|             Crime|  5105|
|       Documentary|  5118|
|             Drama| 24144|
|           Fantasy|  2637|
|         Film-Noir|   364|
|            Horror|  5555|
|              IMAX|   197|
|           Musical|  1113|
|           Mystery|  2773|
|           Romance|  7412|
|            Sci-Fi|  3444|
|          Thriller|  8216|
|               War|  1820|
|           Western|  1378|
+------------------+------+



In [None]:
list_of_movies =  spark.sql("""
select Category, concat_ws(',', collect_set(title)) as list_of_movies
from (select title, explode(split(genres, '[|]')) as Category from movies)
group by 1
""")
list_of_movies.show()

+------------------+--------------------+
|          Category|      list_of_movies|
+------------------+--------------------+
|             Crime|Hot Rock, The (19...|
|           Romance|Brain Drain (2009...|
|          Thriller|The Sleeper (2012...|
|         Adventure|Masters of the Un...|
|             Drama|My Life (1993),Al...|
|               War|PT 109 (1963),Sol...|
|       Documentary|U2: Rattle and Hu...|
|           Fantasy|Masters of the Un...|
|           Mystery|Manila in the Cla...|
|           Musical|U2: Rattle and Hu...|
|         Animation|Bugs in Love (193...|
|         Film-Noir|Thieves' Highway ...|
|(no genres listed)|The Youth of Maxi...|
|              IMAX|Harry Potter and ...|
|            Horror|The Sleeper (2012...|
|           Western|The Maverick Quee...|
|            Comedy|The Sleeper (2012...|
|          Children|Home on the Range...|
|            Action|Masters of the Un...|
|            Sci-Fi|Mosquito (1995),S...|
+------------------+--------------

# Part 2: Train an ALS model using Spark ML

In [None]:
ratings_df.show()

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    307|   3.5|1256677221|
|     1|    481|   3.5|1256677456|
|     1|   1091|   1.5|1256677471|
|     1|   1257|   4.5|1256677460|
|     1|   1449|   4.5|1256677264|
|     1|   1590|   2.5|1256677236|
|     1|   1591|   1.5|1256677475|
|     1|   2134|   4.5|1256677464|
|     1|   2478|   4.0|1256677239|
|     1|   2840|   3.0|1256677500|
|     1|   2986|   2.5|1256677496|
|     1|   3020|   4.0|1256677260|
|     1|   3424|   4.5|1256677444|
|     1|   3698|   3.5|1256677243|
|     1|   3826|   2.0|1256677210|
|     1|   3893|   3.5|1256677486|
|     2|    170|   3.5|1192913581|
|     2|    849|   3.5|1192913537|
|     2|   1186|   3.5|1192913611|
|     2|   1235|   3.0|1192913585|
+------+-------+------+----------+
only showing top 20 rows



In [None]:
movie_ratings=ratings_df.drop('timestamp')

In [None]:
# Data type convert
from pyspark.sql.types import IntegerType, FloatType
movie_ratings = movie_ratings.withColumn("userId", movie_ratings["userId"].cast(IntegerType()))
movie_ratings = movie_ratings.withColumn("movieId", movie_ratings["movieId"].cast(IntegerType()))
movie_ratings = movie_ratings.withColumn("rating", movie_ratings["rating"].cast(FloatType()))

In [None]:
%%time
movie_ratings.count()

CPU times: user 9.73 ms, sys: 1.02 ms, total: 10.8 ms
Wall time: 10.1 s


27753444

In [None]:
movie_ratings.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|    307|   3.5|
|     1|    481|   3.5|
|     1|   1091|   1.5|
|     1|   1257|   4.5|
|     1|   1449|   4.5|
|     1|   1590|   2.5|
|     1|   1591|   1.5|
|     1|   2134|   4.5|
|     1|   2478|   4.0|
|     1|   2840|   3.0|
|     1|   2986|   2.5|
|     1|   3020|   4.0|
|     1|   3424|   4.5|
|     1|   3698|   3.5|
|     1|   3826|   2.0|
|     1|   3893|   3.5|
|     2|    170|   3.5|
|     2|    849|   3.5|
|     2|   1186|   3.5|
|     2|   1235|   3.0|
+------+-------+------+
only showing top 20 rows



## ALS Model Selection and Cross Validation

In [None]:
# import package
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder

In [None]:
#Create test and train set
(training,test)=movie_ratings.randomSplit([0.8,0.2])

In [None]:
#Create ALS model
als = ALS(userCol = 'userId', itemCol = 'movieId', ratingCol = 'rating', coldStartStrategy='drop', seed = 42)
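
For intuition about what the estimator does when it is fitted, here is a minimal single-machine sketch of alternating least squares with rank 1 on a toy dataset. It is not Spark's implementation, which is distributed, supports higher ranks and differs in details such as how regularization is applied. The ratings, `reg` value and function names are hypothetical.

```python
# Observed ratings as (user, item, rating); a hypothetical toy dataset.
ratings = [(0, 0, 5.0), (0, 1, 3.0), (1, 0, 4.0), (1, 2, 1.0), (2, 1, 2.0), (2, 2, 1.0)]
n_users, n_items, reg = 3, 3, 0.1

def objective(u, v):
    """Regularized squared error over the observed ratings."""
    err = sum((r - u[i] * v[j]) ** 2 for i, j, r in ratings)
    return err + reg * (sum(x * x for x in u) + sum(x * x for x in v))

def solve_side(fixed, side):
    """Closed-form ridge solution for one side while the other side is held fixed."""
    n = n_users if side == "user" else n_items
    num, den = [0.0] * n, [reg] * n
    for i, j, r in ratings:
        own, other = (i, j) if side == "user" else (j, i)
        num[own] += r * fixed[other]
        den[own] += fixed[other] ** 2
    return [a / b for a, b in zip(num, den)]

u, v = [1.0] * n_users, [1.0] * n_items
history = [objective(u, v)]
for _ in range(10):
    u = solve_side(v, "user")  # fix item factors, solve for user factors
    v = solve_side(u, "item")  # fix user factors, solve for item factors
    history.append(objective(u, v))

print(history[0], history[-1])
```

Each half-step solves a small least-squares problem exactly, which is why the objective never increases from one iteration to the next.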

In [None]:
#Tune model using ParamGridBuilder
paramGrid = (ParamGridBuilder()
            .addGrid(als.rank, [10, 20])
            .addGrid(als.maxIter, [5, 10])
            .addGrid(als.regParam, [0.05, 0.1])
            .build())

In [None]:
# Define the evaluator: RMSE between the 'rating' label and the model's 'prediction'
evaluator = RegressionEvaluator(metricName='rmse', predictionCol='prediction', labelCol='rating')


In [None]:
# Build Cross validation 
cv = CrossValidator(estimator=als, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5, seed = 42)

In [None]:
%%time
#Fit ALS model to training data
cvModel = cv.fit(training)

CPU times: user 8.43 s, sys: 1.22 s, total: 9.66 s
Wall time: 2h 2min 48s
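
The long wall time follows from the number of models that cross-validation has to fit. The sketch below counts them from the grid and fold settings used above: one fit per parameter combination per fold, plus one final refit of the best combination on the whole training set. The variable names are illustrative only.

```python
from itertools import product

# Values copied from the ParamGridBuilder and CrossValidator cells.
grid = {"rank": [10, 20], "maxIter": [5, 10], "regParam": [0.05, 0.1]}
num_folds = 5

# Every combination of the three hyperparameters.
combinations = [dict(zip(grid, values)) for values in product(*grid.values())]
cv_fits = len(combinations) * num_folds  # one fit per combination per fold
total_fits = cv_fits + 1                 # plus the final refit of the best combination

print(len(combinations), cv_fits, total_fits)
```

Adding one more value to any of the three lists would add a further 20 fits, so the grid is worth keeping small on a dataset of this size.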


In [None]:
#Extract best model from the tuning exercise using ParamGridBuilder
best_model = cvModel.bestModel

In [None]:
# Save the best model; reload it later with ALSModel.load on the same path
best_model.save('drive/My Drive/best_model')
# best_model = ALSModel.load('drive/My Drive/best_model')


# Part 3: Model Evaluation

In [None]:
#Generate predictions and evaluate using RMSE
predictions=best_model.transform(test)
rmse = evaluator.evaluate(predictions)
rmse

0.8017025752963514

In [None]:
#Print evaluation metrics and model parameters
print ("RMSE = "+str(rmse))
print ("**Best Model**")
print (" Rank:", best_model._java_obj.parent().getRank())
print (" MaxIter:", best_model._java_obj.parent().getMaxIter()) 
print (" RegParam:", best_model._java_obj.parent().getRegParam())

RMSE = 0.8017025752963514
**Best Model**
 Rank: 20
 MaxIter: 10
 RegParam: 0.05


In [None]:
predictions.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|107339|    148|   4.0| 3.2779598|
| 93112|    148|   3.0| 2.7392113|
|207939|    148|   3.0| 2.5178525|
| 60382|    148|   4.0| 3.1443737|
|211963|    148|   3.0| 2.1486604|
| 52772|    148|   3.0| 3.4739225|
|167692|    148|   3.0| 3.6178567|
|149383|    148|   3.0| 2.5317852|
| 73492|    148|   0.5|  2.672189|
|  8264|    148|   1.0| 2.2764797|
| 46427|    148|   3.0| 2.6688888|
|263797|    148|   3.0| 2.9111385|
|274916|    148|   4.0| 3.3579824|
|233502|    148|   4.0| 2.8918853|
|188347|    148|   3.0| 2.7162554|
| 87619|    148|   3.0| 2.4214826|
|160910|    148|   3.0| 3.4012394|
|235384|    148|   2.0| 2.4509559|
|115125|    148|   2.0| 2.6773906|
|263125|    148|   1.0|  2.753273|
+------+-------+------+----------+
only showing top 20 rows
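
For reference, RMSE is the square root of the mean squared difference between actual and predicted ratings. The sketch below computes it by hand for the first three rows of the predictions table above; the function name `rmse` is just a local helper, not the Spark evaluator.

```python
import math

def rmse(pairs):
    """Root mean squared error over (actual, predicted) pairs."""
    pairs = list(pairs)
    return math.sqrt(sum((actual - predicted) ** 2 for actual, predicted in pairs) / len(pairs))

# (rating, prediction) for the first three rows of the predictions table.
sample = [(4.0, 3.2779598), (3.0, 2.7392113), (3.0, 2.5178525)]
sample_rmse = rmse(sample)
print(sample_rmse)
```

Three rows are far too few to say anything about model quality; the point is only to show what the reported number measures.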



In [None]:
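# Evaluate the model on all data (training + test rows); this RMSE is optimistic
# because most of these rows were seen during training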
alldata=best_model.transform(movie_ratings)
rmse = evaluator.evaluate(alldata)
print ("RMSE = "+str(rmse))

RMSE = 0.7151870553776468
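
The gap between the two reported RMSE values can be turned into a rough estimate of the error on the training rows alone. This is a back-of-the-envelope sketch, assuming the split is exactly 80/20 and ignoring any rows dropped by `coldStartStrategy='drop'`; the result is an estimate, not a measured value.

```python
import math

rmse_test = 0.8017025752963514  # reported on the held-out test split
rmse_all = 0.7151870553776468   # reported on the full ratings table
train_fraction = 0.8            # assumption: the split is exactly 80/20

# The MSE over all rows is the row-weighted average of the train and test MSEs,
# so the training MSE can be solved for from the other two.
mse_train = (rmse_all ** 2 - (1 - train_fraction) * rmse_test ** 2) / train_fraction
rmse_train_estimate = math.sqrt(mse_train)
print(rmse_train_estimate)
```

The estimate comes out below both reported values, which is why the test-set RMSE is the better guide to how the model performs on unseen ratings.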


In [None]:
alldata.createOrReplaceTempView("alldata")

In [None]:
all_data = spark.sql('select * from alldata')
all_data.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|107339|    148|   4.0| 3.2779598|
| 93112|    148|   3.0| 2.7392113|
|106148|    148|   2.5|  2.862426|
|234926|    148|   4.0| 3.0685127|
|253535|    148|   4.0| 2.9279442|
| 50155|    148|   3.0| 3.0381732|
| 65991|    148|   4.0|  3.168431|
|146376|    148|   5.0|  4.170856|
|207939|    148|   3.0| 2.5178525|
| 41788|    148|   3.0| 3.0288837|
|220572|    148|   2.0| 2.7837794|
|244192|    148|   3.0| 3.0964794|
|273242|    148|   4.0| 3.3510816|
| 52620|    148|   1.0| 2.4853115|
| 98426|    148|   3.0| 2.1869304|
|102642|    148|   4.0| 3.5956018|
|108082|    148|   3.0| 2.8170261|
|264081|    148|   3.0| 2.9015353|
| 60382|    148|   4.0| 3.1443737|
|275860|    148|   3.0| 2.4927943|
+------+-------+------+----------+
only showing top 20 rows



In [None]:
%%time
all_movie = spark.sql('select * from movies join alldata on movies.movieId=alldata.movieId') 
all_movie.show()

+-------+--------------------+------+------+-------+------+----------+
|movieId|               title|genres|userId|movieId|rating|prediction|
+-------+--------------------+------+------+-------+------+----------+
|    148|Awfully Big Adven...| Drama|107339|    148|   4.0| 3.2779598|
|    148|Awfully Big Adven...| Drama| 93112|    148|   3.0| 2.7392113|
|    148|Awfully Big Adven...| Drama|106148|    148|   2.5|  2.862426|
|    148|Awfully Big Adven...| Drama|234926|    148|   4.0| 3.0685127|
|    148|Awfully Big Adven...| Drama|253535|    148|   4.0| 2.9279442|
|    148|Awfully Big Adven...| Drama| 50155|    148|   3.0| 3.0381732|
|    148|Awfully Big Adven...| Drama| 65991|    148|   4.0|  3.168431|
|    148|Awfully Big Adven...| Drama|146376|    148|   5.0|  4.170856|
|    148|Awfully Big Adven...| Drama|207939|    148|   3.0| 2.5178525|
|    148|Awfully Big Adven...| Drama| 41788|    148|   3.0| 3.0288837|
|    148|Awfully Big Adven...| Drama|220572|    148|   2.0| 2.7837794|
|    1

# Part 4: Apply the ALS model

## Recommend movies to the users with IDs 575 and 232

In [None]:
%%time
userSubset = spark.sql("""
select *
from alldata
where userid in (575, 232)
""")
userSubset.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   575|   2025|   1.0| 1.6193321|
|   232|   3028|   3.0|  3.091841|
|   232|     34|   5.0| 4.4874616|
|   232| 120474|   5.0| 2.9531176|
|   232|     26|   4.5|  4.016177|
|   232|     27|   5.0| 4.2145863|
|   575|   2857|   3.5|  2.974967|
|   575|   2409|   0.5| 1.0256069|
|   232|     22|   3.0| 3.0903065|
|   575|   7160|   4.5| 3.4142458|
|   575|   5388|   3.5| 2.2770367|
|   575|  48394|   3.0| 3.2603304|
|   575|  31410|   4.0| 3.3853567|
|   232|  78499|   4.0| 4.0309086|
|   232|      1|   5.0| 4.0945096|
|   232|     13|   4.0| 3.8664474|
|   575|  44555|   3.0|  4.089219|
|   232|   3723|   4.0| 3.2453837|
|   575|   4857|   3.0| 3.0880516|
|   232| 106022|   3.0| 2.7590058|
+------+-------+------+----------+
only showing top 20 rows

CPU times: user 34.9 ms, sys: 7.77 ms, total: 42.7 ms
Wall time: 41.2 s


In [None]:
%%time
movie_recommendation1 = best_model.recommendForUserSubset(userSubset, 10)
movie_recommendation1.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   232|[[150268, 6.25494...|
|   575|[[58409, 6.533530...|
+------+--------------------+

CPU times: user 66.6 ms, sys: 10 ms, total: 76.6 ms
Wall time: 1min 13s


In [None]:
%%time
movie_recommendation1.createOrReplaceTempView("movie_recom1")

CPU times: user 1.34 ms, sys: 0 ns, total: 1.34 ms
Wall time: 39.1 ms


In [None]:
%%time
movie_recommendation2 = spark.sql("""
select userid, explode(movie_recom1.recommendations) as recommendations
from movie_recom1
""")
movie_recommendation2.show()

+------+-------------------+
|userid|    recommendations|
+------+-------------------+
|   232|[150268, 6.2549453]|
|   232|  [79045, 6.094378]|
|   232|  [73826, 6.088421]|
|   232| [179115, 5.896111]|
|   232|[154921, 5.8046684]|
|   232|    [3228, 5.76131]|
|   232| [125938, 5.741655]|
|   232| [120574, 5.657745]|
|   232| [142891, 5.623824]|
|   232|[113280, 5.6165447]|
|   575| [58409, 6.5335307]|
|   575|  [90533, 6.458926]|
|   575|   [66811, 6.36649]|
|   575| [135031, 6.309785]|
|   575| [125273, 6.294984]|
|   575| [63647, 6.2045636]|
|   575| [51559, 6.1433053]|
|   575| [185905, 5.971272]|
|   575|[185035, 5.9317827]|
|   575|  [84730, 5.916791]|
+------+-------------------+

CPU times: user 60.4 ms, sys: 11.4 ms, total: 71.8 ms
Wall time: 1min 7s


In [None]:
%%time
movie_recommendation2.createOrReplaceTempView("movie_recom2")

CPU times: user 617 µs, sys: 0 ns, total: 617 µs
Wall time: 33.6 ms


In [None]:
%%time
movie_recommendation3 = spark.sql("""
select userid, recommendations.* 
from movie_recom2
""")
movie_recommendation3.show()

+------+-------+---------+
|userid|movieId|   rating|
+------+-------+---------+
|   232| 150268|6.2549453|
|   232|  79045| 6.094378|
|   232|  73826| 6.088421|
|   232| 179115| 5.896111|
|   232| 154921|5.8046684|
|   232|   3228|  5.76131|
|   232| 125938| 5.741655|
|   232| 120574| 5.657745|
|   232| 142891| 5.623824|
|   232| 113280|5.6165447|
|   575|  58409|6.5335307|
|   575|  90533| 6.458926|
|   575|  66811|  6.36649|
|   575| 135031| 6.309785|
|   575| 125273| 6.294984|
|   575|  63647|6.2045636|
|   575|  51559|6.1433053|
|   575| 185905| 5.971272|
|   575| 185035|5.9317827|
|   575|  84730| 5.916791|
+------+-------+---------+

CPU times: user 61.9 ms, sys: 6.15 ms, total: 68 ms
Wall time: 1min 5s
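
The two SQL steps above first explode the array of recommendations into one row per recommendation and then expand each struct into separate columns. A plain-Python sketch of the same reshaping, using the first two recommendations per user from the output above, might look like this; the helper names `explode` and `expand_struct` are hypothetical.

```python
# (userId, recommendations) rows, with two recommendations per user copied from the output.
nested = [
    (232, [(150268, 6.2549453), (79045, 6.094378)]),
    (575, [(58409, 6.5335307), (90533, 6.458926)]),
]

def explode(rows):
    """One output row per element of the recommendations array."""
    return [(user_id, rec) for user_id, recs in rows for rec in recs]

def expand_struct(rows):
    """Turn each (movieId, rating) struct into two separate columns."""
    return [(user_id, movie_id, rating) for user_id, (movie_id, rating) in rows]

flat = expand_struct(explode(nested))
for row in flat:
    print(row)
```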


In [None]:
%%time
movie_recommendation3.createOrReplaceTempView("movie_recom3")

CPU times: user 57 µs, sys: 1.01 ms, total: 1.07 ms
Wall time: 40.3 ms


In [None]:
%%time
movie_recommendation4 = spark.sql("""
select a.userid, a.movieid, a.rating, b.title
from movie_recom3 a join movies b on a.movieid = b.movieid
""")
movie_recommendation4.show()

+------+-------+---------+--------------------+
|userid|movieid|   rating|               title|
+------+-------+---------+--------------------+
|   232| 150268|6.2549453|      Dilwale (2015)|
|   232|  79045| 6.094378|We Stand Alone To...|
|   232|  73826| 6.088421|Violence at Noon ...|
|   232| 179115| 5.896111| The Workshop (2017)|
|   232| 154921|5.8046684|Future My Love (2...|
|   232|   3228|  5.76131|Wirey Spindell (2...|
|   232| 125938| 5.741655|Reggie Watts: Why...|
|   232| 120574| 5.657745|Gimme Shelter (2013)|
|   232| 142891| 5.623824|The Legend of Pau...|
|   232| 113280|5.6165447|   White Frog (2012)|
|   575|  58409|6.5335307|District, The (Ny...|
|   575|  90533| 6.458926|Marseillaise, La ...|
|   575|  66811|  6.36649|Glenn Killing på ...|
|   575| 135031| 6.309785|Toto, Peppino, an...|
|   575| 125273| 6.294984|      Tenchu! (1969)|
|   575|  63647|6.2045636|Hanzo the Razor: ...|
|   575|  51559|6.1433053|Eight Deadly Shot...|
|   575| 185905| 5.971272|The Third Murd

## Find similar movies for the movies with IDs 463, 471, and 2
Similar movies are found by comparing rows of the item factor matrix.
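
The comparison used here is cosine similarity, which measures the angle between two factor vectors and ignores their length. A minimal sketch with hypothetical 2-dimensional vectors:

```python
import math

def cosine_similarity(a, b):
    """Dot product of a and b divided by the product of their Euclidean norms."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

# Hypothetical factor vectors, not taken from the trained model.
same_direction = cosine_similarity([1.0, 2.0], [2.0, 4.0])  # parallel vectors
orthogonal = cosine_similarity([1.0, 0.0], [0.0, 3.0])      # perpendicular vectors
print(same_direction, orthogonal)
```

Values close to 1 mean two movies point in nearly the same direction in the latent space, and values near 0 mean they are unrelated in that space.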

In [None]:
items_kdf = best_model.itemFactors.to_koalas()
items_kdf.head(10)

Unnamed: 0,id,features
0,10,"[0.48953258991241455, -0.1679736077785492, -0...."
1,20,"[-0.03313341736793518, -0.20652873814105988, -..."
2,30,"[0.07184356451034546, -0.03379387408494949, -0..."
3,40,"[0.4006499648094177, -0.47610244154930115, 0.1..."
4,50,"[0.39639580249786377, -0.12326012551784515, 0...."
5,60,"[0.26513993740081787, -0.1936086267232895, 0.3..."
6,70,"[0.26196807622909546, -0.10213450342416763, -0..."
7,80,"[-0.09198872745037079, -0.28897616267204285, 0..."
8,90,"[0.2360554039478302, -0.05575760453939438, -0...."
9,100,"[0.09779367595911026, -0.28060540556907654, -0..."


In [None]:
def top10_similar(targetId):
  # Pull the item factors to the driver: ids and an (n_items, rank) feature matrix
  items_pdf = items_kdf.to_pandas()
  ids = items_pdf['id'].to_numpy()
  features = np.array(items_pdf['features'].tolist())
  matches = np.where(ids == int(targetId))[0]
  if len(matches) == 0:
    return 'movie ' + str(targetId) + ' is not found'
  target_feature = features[matches[0]]

  # Cosine similarity between the target vector and every item vector, computed at once
  norms = np.linalg.norm(features, axis=1) * np.linalg.norm(target_feature)
  cosine = features.dot(target_feature) / norms
  similarities = pd.DataFrame({'movieId': ids.astype(str), 'cosine_similarity': cosine})
  # Drop the target movie itself, then keep the 10 most similar movies
  similarities = similarities[similarities['movieId'] != str(targetId)]
  similarities = similarities.sort_values(by='cosine_similarity', ascending=False).head(10)
  # movies_df was read without a schema, so its movieId column is a string
  return similarities.merge(movies_df.toPandas(), on='movieId', how='inner')


In [None]:
# find top 10 similar movies with movie ID 463
top10_similar(463)

Unnamed: 0,movieId,cosine_similarity,title,genres
0,6240,0.905931,One Good Cop (1991),Action|Crime|Drama
1,6810,0.899345,Sleeping with the Enemy (1991),Drama|Thriller
2,7626,0.896538,Switch (1991),Comedy|Crime|Fantasy
3,1661,0.892277,Switchback (1997),Crime|Mystery|Thriller
4,1003,0.891103,Extreme Measures (1996),Drama|Thriller
5,59840,0.885669,Cold Sweat (De la part des copains) (1970),Action|Drama|Thriller
6,1432,0.880651,Metro (1997),Action|Comedy|Crime|Drama|Thriller
7,137036,0.874421,The Surrogate (2013),Drama|Mystery|Thriller
8,5282,0.868342,High Crimes (2002),Thriller
9,5655,0.867672,"Fan, The (1981)",Drama|Thriller


In [None]:
# find top 10 similar movies with movie ID 471
top10_similar(471)

Unnamed: 0,movieId,cosine_similarity,title,genres
0,4467,0.905163,"Adventures of Baron Munchausen, The (1988)",Adventure|Comedy|Fantasy
1,5072,0.903572,Metropolis (2001),Animation|Sci-Fi
2,46976,0.90201,Stranger than Fiction (2006),Comedy|Drama|Fantasy|Romance
3,125854,0.897983,The Diary of Anne Frank (1980),Drama
4,138028,0.897921,Tsumugi (2004),Drama|Romance
5,140531,0.897281,Coo of The Far Seas (1993),(no genres listed)
6,143661,0.897281,Rage (2010),Drama|Horror|Sci-Fi
7,183897,0.894299,Isle of Dogs (2018),Animation|Comedy
8,87234,0.893941,Submarine (2010),Comedy|Drama|Romance
9,97906,0.893175,Splinterheads (2009),Comedy


In [None]:
# find top 10 similar movies with movie ID 2
top10_similar(2)

Unnamed: 0,movieId,cosine_similarity,title,genres
0,141662,0.903451,Hawks (1988),Comedy|Drama
1,171003,0.891981,Rufus (2016),Children|Comedy
2,138448,0.891981,Midnight Sun (2014),Adventure
3,99448,0.888521,Trek Nation (2010),Documentary
4,4545,0.887341,Short Circuit (1986),Comedy|Sci-Fi
5,94537,0.88488,Angels Crest (2011),Drama
6,131504,0.884467,Jellyfish Eyes (2013),Comedy|Fantasy
7,141191,0.880831,OPA! (2005),Comedy|Romance
8,192749,0.880043,Along With the Gods: The Last 49 Days (2018),Action|Adventure|Drama|Fantasy
9,177081,0.873256,Colour from the Dark (2008),Horror
