In [1]:
# !pip install pyspark 
# !pip install -U -q PyDrive
# !apt install openjdk-8-jdk-headless -qq

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=76834d271e97543a893a235a21246aee48573d7b2cc2db7d419143424496824d
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0
The following additional packages will be installed:
  libxtst6 openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra
  fon

In [18]:

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [19]:
import pandas as pd
from pyspark.ml.feature import VectorAssembler, HashingTF, IDF, Normalizer, StopWordsRemover
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import lit
from pyspark.sql.functions import col, isnan, when, trim

import pyspark.sql.functions as psf
from pyspark.sql.types import DoubleType

In [20]:
movies = spark.read.csv('movies.csv',inferSchema=True, header =True)
ratings = spark.read.csv('ratings.csv',inferSchema=True, header =True)

In [21]:
ratings.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows



In [22]:
ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [23]:
ratings = ratings.drop("timestamp")
ratings.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
|     1|     70|   3.0|
|     1|    101|   5.0|
|     1|    110|   4.0|
|     1|    151|   5.0|
|     1|    157|   5.0|
|     1|    163|   5.0|
|     1|    216|   5.0|
|     1|    223|   3.0|
|     1|    231|   5.0|
|     1|    235|   4.0|
|     1|    260|   5.0|
|     1|    296|   3.0|
|     1|    316|   3.0|
|     1|    333|   5.0|
|     1|    349|   4.0|
+------+-------+------+
only showing top 20 rows



In [24]:
# Group data by userId, count ratings
userId_ratings = ratings.groupBy("userId").count().orderBy('count', ascending=False)
userId_ratings.show()

+------+-----+
|userId|count|
+------+-----+
|   414| 2698|
|   599| 2478|
|   474| 2108|
|   448| 1864|
|   274| 1346|
|   610| 1302|
|    68| 1260|
|   380| 1218|
|   606| 1115|
|   288| 1055|
|   249| 1046|
|   387| 1027|
|   182|  977|
|   307|  975|
|   603|  943|
|   298|  939|
|   177|  904|
|   318|  879|
|   232|  862|
|   480|  836|
+------+-----+
only showing top 20 rows



In [25]:
# Group data by userId, count ratings
movieId_ratings = ratings.groupBy("movieId").count().orderBy('count', ascending=False)
movieId_ratings.show()

+-------+-----+
|movieId|count|
+-------+-----+
|    356|  329|
|    318|  317|
|    296|  307|
|    593|  279|
|   2571|  278|
|    260|  251|
|    480|  238|
|    110|  237|
|    589|  224|
|    527|  220|
|   2959|  218|
|      1|  215|
|   1196|  211|
|     50|  204|
|   2858|  204|
|     47|  203|
|    780|  202|
|    150|  201|
|   1198|  200|
|   4993|  198|
+-------+-----+
only showing top 20 rows



**Build up the content-based filtering algorithm with pairwise approach in TF-IDF vector space**

The approach is based on the solution here:

https://stackoverflow.com/questions/46758768/calculating-the-cosine-similarity-between-all-the-rows-of-a-dataframe-in-pyspark

In [29]:
df = movies.select("movieId", "genres").withColumn("genres", psf.split( psf.lower(movies.genres), '\|') )
remover = StopWordsRemover(inputCol="genres", outputCol="filtered")
df = remover.transform(df)

In [30]:
df.show()

+-------+--------------------+--------------------+
|movieId|              genres|            filtered|
+-------+--------------------+--------------------+
|      1|[adventure, anima...|[adventure, anima...|
|      2|[adventure, child...|[adventure, child...|
|      3|   [comedy, romance]|   [comedy, romance]|
|      4|[comedy, drama, r...|[comedy, drama, r...|
|      5|            [comedy]|            [comedy]|
|      6|[action, crime, t...|[action, crime, t...|
|      7|   [comedy, romance]|   [comedy, romance]|
|      8|[adventure, child...|[adventure, child...|
|      9|            [action]|            [action]|
|     10|[action, adventur...|[action, adventur...|
|     11|[comedy, drama, r...|[comedy, drama, r...|
|     12|    [comedy, horror]|    [comedy, horror]|
|     13|[adventure, anima...|[adventure, anima...|
|     14|             [drama]|             [drama]|
|     15|[action, adventur...|[action, adventur...|
|     16|      [crime, drama]|      [crime, drama]|
|     17|   

Compute TF-IDF:

In [31]:
hashingTF = HashingTF(inputCol="filtered", outputCol="tf")
tf = hashingTF.transform(df)
idf = IDF(inputCol="tf", outputCol="tfidf").fit(tf)
tfidf = idf.transform(tf)

In [33]:
tfidf.show()

+-------+--------------------+--------------------+--------------------+--------------------+
|movieId|              genres|            filtered|                  tf|               tfidf|
+-------+--------------------+--------------------+--------------------+--------------------+
|      1|[adventure, anima...|[adventure, anima...|(262144,[30905,73...|(262144,[30905,73...|
|      2|[adventure, child...|[adventure, child...|(262144,[30905,19...|(262144,[30905,19...|
|      3|   [comedy, romance]|   [comedy, romance]|(262144,[73288,17...|(262144,[73288,17...|
|      4|[comedy, drama, r...|[comedy, drama, r...|(262144,[6512,732...|(262144,[6512,732...|
|      5|            [comedy]|            [comedy]|(262144,[73288],[...|(262144,[73288],[...|
|      6|[action, crime, t...|[action, crime, t...|(262144,[33803,17...|(262144,[33803,17...|
|      7|   [comedy, romance]|   [comedy, romance]|(262144,[73288,17...|(262144,[73288,17...|
|      8|[adventure, child...|[adventure, child...|(262144,[

Compute L2 norm:

In [34]:
normalizer = Normalizer(inputCol="tfidf", outputCol="norm")
data = normalizer.transform(tfidf)

In [35]:
data.show()

+-------+--------------------+--------------------+--------------------+--------------------+--------------------+
|movieId|              genres|            filtered|                  tf|               tfidf|                norm|
+-------+--------------------+--------------------+--------------------+--------------------+--------------------+
|      1|[adventure, anima...|[adventure, anima...|(262144,[30905,73...|(262144,[30905,73...|(262144,[30905,73...|
|      2|[adventure, child...|[adventure, child...|(262144,[30905,19...|(262144,[30905,19...|(262144,[30905,19...|
|      3|   [comedy, romance]|   [comedy, romance]|(262144,[73288,17...|(262144,[73288,17...|(262144,[73288,17...|
|      4|[comedy, drama, r...|[comedy, drama, r...|(262144,[6512,732...|(262144,[6512,732...|(262144,[6512,732...|
|      5|            [comedy]|            [comedy]|(262144,[73288],[...|(262144,[73288],[...|(262144,[73288],[...|
|      6|[action, crime, t...|[action, crime, t...|(262144,[33803,17...|(262144,

Compute matrix product (cos_similarity):

In [36]:
dot_udf = psf.udf(lambda x,y: float(x.dot(y)), DoubleType())

cos_similarity = data.alias("i").join(data.alias("j"), psf.col("i.movieId") < psf.col("j.movieId"))\
                     .select(
                         psf.col("i.movieId").alias("i"), 
                         psf.col("j.movieId").alias("j"), 
                         dot_udf("i.norm", "j.norm").alias("dot")).sort("i", "j")

**Build up the Alternating Least Square (ALS) matrix factorization model in collaborative filtering algorithm**

In [37]:
#Split training and testing data
train_data,test_data = ratings.randomSplit([0.8,0.2])

als = ALS(userCol='userId',itemCol='movieId',ratingCol='rating',coldStartStrategy="drop")


paramGrid = ParamGridBuilder() \
    .addGrid(als.regParam, [1, 0.1, 0.01]) \
    .addGrid(als.rank, [10, 20]) \
    .build()


In [38]:
crossval = CrossValidator(estimator=als,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="mae"),
                          numFolds=3)

In [39]:
cvModel = crossval.fit(train_data)

In [12]:
best_rank = cvModel.bestModel._java_obj.parent().getRank()
best_regParam = cvModel.bestModel._java_obj.parent().getRegParam()
best_model_params = {'rank': best_rank, 'regParam': best_regParam}

In [13]:
pred = cvModel.transform(test_data)

In [14]:
def get_movieId( movie_name, movies_data ):
    """
    return the movieId which is corresponding to the movie name

    Parameters
    ----------
    movie_name: string, the name of the movie w/ or w/o the year

    movies_data: spark Dataframe, movies data with columns 'movieId','title'

    Return
    ------
    the movieId
    """


    movieIds = []
    for movie in movie_name:
      Ids = movies_data.filter(movies_data.title.like('%{}%'.format(movie)) ).select('movieId').collect()
      movieIds = list(set(movieIds + [ row.movieId for row in Ids ]  ))
    return movieIds

**Make movie recommendation (item-based)**

In [15]:
def make_recommendation_item_based(similarity_matrix, ratings_data, movies_data,
                        fav_movie, n_recommendations, userId=-99 ):
    """
    Make top n movie recommendations. Currently, the movies the old user have watched are not excluded in the recommendation list yet.
    Parameters
    ----------

    similarity_matrix: spark Dataframe, the similarity matrix with columns 'i','j','dot'

    ratings_data: spark Dataframe, ratings data with columns 'userId', 'movieId', 'rating' 

    movies_data: spark Dataframe, movies data with columns 'movieId','title'

    fav_movie: str, name of user input movie

    n_recommendations: int, top n recommendations

    userId: int optional (default=-99), the user Id
            if userId = -99, the new user will be created
            if userId = -1, the latest inserted user is chosen

    """  
  
    movieIds = get_movieId(fav_movie, movies_data )

    if (userId == -99):
      userId = ratings_data.agg({"userId": "max"}).collect()[0][0] + 1
    elif (userId == -1):
      userId = ratings_data.agg({"userId": "max"}).collect()[0][0]

  
    df_similar_movieIds = similarity_matrix.filter( similarity_matrix.i.isin(movieIds) ).select('i','j','dot') 
  
    df_similar_movieIds = df_similar_movieIds.filter( ~similarity_matrix.j.isin(movieIds) ).select('i','j','dot')

    df_similar_movieIds = df_similar_movieIds.groupBy('j').agg( {'dot':'max'} ).select(col('j').alias('movieId'), col('max(dot)').alias('dot_max'))
 
    topn_predictions = df_similar_movieIds.orderBy('dot_max', ascending=False).limit(10)

    Ids = topn_predictions.select('movieId').collect()
    Ids = [ row.movieId for row in Ids ]
    topn_movies = movies_data.filter( movies_data.movieId.isin(Ids) ).select( 'title' )

#    The following line is better, but it will produce error message...
#    movieId#14 are ambiguous. It's probably because you joined several Datasets together, and some of these......
#    topn_movies = movies_data.join( topn_predictions, topn_predictions.movieId == movies_data.movieId ).orderBy( 'dot_max', ascending=False ).select( 'title' )

    return [row.title for row in topn_movies.collect()]

**Make movie recommendation (user-based)**

In [16]:
def make_recommendation_user_based(best_model_params, ratings_data, movies_data,
                        fav_movie, n_recommendations, userId=-99 ):
    """
    make top n movie recommendations
    Parameters
    ----------

    best_model_params: dict, the best parameters of the model from the CrossValidator

    ratings_data: spark Dataframe, ratings data with columns 'userId', 'movieId', 'rating' 

    movies_data: spark Dataframe, movies data with columns 'movieId','title'

    fav_movie: str, name of user input movie

    n_recommendations: int, top n recommendations

    userId: int optional (default=-99), the user Id
            if userId = -99, the new user will be created
            if userId = -1, the latest inserted user is chosen

    """

    movieIds = get_movieId(fav_movie, movies_data )

    if (userId == -99):
      userId = ratings_data.agg({"userId": "max"}).collect()[0][0] + 1
    elif (userId == -1):
      userId = ratings_data.agg({"userId": "max"}).collect()[0][0]

    max_rating = ratings_data.agg({"rating": "max"}).collect()[0][0]

    # build up the train data, which is the original data + the new inserted data.
    # We assume that the inserted favorate movie has the highest rating.
    train_data = ratings_data
    for movieId in movieIds:
      new_rows = spark.createDataFrame([(userId,movieId,max_rating,0)], ['userId', 'movieId', 'rating', 'timestamp'])
      train_data = ratings_data.union(new_rows)

    # train best ALS
    als = ALS(userCol='userId',itemCol='movieId',ratingCol='rating', \
              rank=best_model_params.get('rank'), \
              regParam=best_model_params.get('regParam'))

    model = als.fit( train_data )
    df_newuser = movies_data.filter(~movies_data.movieId.isin(movieIds)).select('movieId').withColumn("userId", lit(userId))

    predictions = model.transform(df_newuser)

    def to_null(c):
      return when(~(col(c).isNull() | isnan(col(c)) | (trim(col(c)) == "")), col(c))
    
    predictions = predictions.select([to_null(c).alias(c) for c in predictions.columns]).na.drop()

    topn_predictions = predictions.orderBy('prediction', ascending=False).limit(n_recommendations)
    topn_ids = topn_predictions.select('userId')
    topn_movies = movies_data.join( topn_predictions, topn_predictions.movieId == movies_data.movieId ).orderBy( 'prediction', ascending=False ).select( 'title' )

    return [row.title for row in topn_movies.collect()]
    

In [17]:
#my_favorite_movies = ['Iron Man']
my_favorite_movies = ['Genius Party']
# get recommends
n_recommendations = 10
recommends_item_based = make_recommendation_item_based(similarity_matrix = cos_similarity, ratings_data = ratings, movies_data = movies,
                        fav_movie = my_favorite_movies, n_recommendations = n_recommendations )

print("--------------Search based on similarity between movies--------------------------------------")
print('The users like' , my_favorite_movies , 'also like:')
for i, title in enumerate(recommends_item_based):
    print(i+1, title)
if( len(recommends_item_based) < n_recommendations ):
  print("Sadly, we couldn't offer so many recommendations :(")

recommends_user_based = make_recommendation_user_based(best_model_params = best_model_params, ratings_data = ratings, movies_data = movies,
                        fav_movie = my_favorite_movies, n_recommendations = n_recommendations )

print("--------------Search based on similarity between user's preference--------------------------------------")
print('The users like' , my_favorite_movies , 'also like:')
for i, title in enumerate(recommends_user_based):
    print(i+1, title)
if( len(recommends_user_based) < n_recommendations ):
  print("Sadly, we couldn't offer so many recommendations :(")

--------------Search based on similarity between movies--------------------------------------
The users like ['Genius Party'] also like:
1 Piper (2016)
2 The Red Turtle (2016)
3 Winnie the Pooh Goes Visiting (1971)
4 A Plasticine Crow (1981)
5 Cheburashka (1971)
6 Travels of an Ant (1983)
7 Wolf and Calf (1984)
8 LEGO DC Super Hero Girls: Brain Drain (2017)
9 Bunny (1998)
10 Love Live! The School Idol Movie (2015)
--------------Search based on similarity between user's preference--------------------------------------
The users like ['Genius Party'] also like:
1 Hamlet (1996)
2 Dune (2000)
3 Black Cat, White Cat (Crna macka, beli macor) (1998)
4 The Artist (2011)
5 Great Beauty, The (Grande Bellezza, La) (2013)
6 Hours, The (2002)
7 Play Time (a.k.a. Playtime) (1967)
8 Visitor, The (2007)
9 Frozen River (2008)
10 Unforgiven, The (1960)
