In [1]:
# findspark is initialised here, ahead of the pyspark imports, so that pyspark
# is importable even when it is not already on sys.path.
import findspark
findspark.init()

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, LongType

In [16]:
def compute_cosine_similarity(spark, data):
    """Score each (movie1, movie2) pair by the cosine similarity of its ratings.

    Args:
        spark: Not used by this function; kept so existing callers keep working.
        data: DataFrame with the columns ``movie1``, ``movie2``, ``rating1`` and
            ``rating2``; each row is one co-rating of the two movies.

    Returns:
        DataFrame with the columns ``movie1``, ``movie2``, ``score`` and
        ``num_of_pairs`` (the number of co-rating rows behind the score).
    """
    rating_x = func.col('rating1')
    rating_y = func.col('rating2')

    # Per-row terms of the cosine formula: x^2, y^2 and x*y.
    with_products = (
        data
        .withColumn('xx', func.pow(rating_x, 2))
        .withColumn('yy', func.pow(rating_y, 2))
        .withColumn('xy', rating_x * rating_y)
    )

    dot_product = func.sum(func.col('xy')).alias('numerator')
    norm_product = func.sqrt(
        func.sum(func.col('xx')) * func.sum(func.col('yy'))
    ).alias('denominator')
    pair_count = func.count(func.col('xy')).alias('num_of_pairs')

    per_pair = with_products.groupBy('movie1', 'movie2').agg(
        dot_product,
        norm_product,
        pair_count,
    )

    # A zero denominator would make the ratio undefined; score such pairs as 0.
    score = func.when(
        func.col('denominator') != 0,
        func.col('numerator') / func.col('denominator'),
    ).otherwise(0)

    return per_pair.withColumn('score', score) \
                   .select('movie1', 'movie2', 'score', 'num_of_pairs')


def get_movie_name(movie_names, movie_id):
    """Look up the title of a movie by its id.

    Args:
        movie_names: DataFrame with the columns ``movie_id`` and ``movie_title``.
        movie_id: Id of the movie whose title is wanted.

    Returns:
        The ``movie_title`` value of the first row whose ``movie_id`` matches.

    Raises:
        IndexError: If no row in ``movie_names`` has the given ``movie_id``.
    """
    # first() fetches at most one row, instead of collecting every match to
    # the driver, and returns None when nothing matches.
    match = movie_names.filter(func.col('movie_id') == movie_id) \
                       .select('movie_title') \
                       .first()

    if match is None:
        # Same exception type the previous `.collect()[0]` raised, but with a
        # message that says which id was missing.
        raise IndexError(f'no movie with movie_id={movie_id!r} in movie_names')

    return match[0]

In [4]:
# getOrCreate() hands back an already-running session when there is one.
spark_session = (
    SparkSession.builder
    .appName('Calculate most similar movies')
    .getOrCreate()
)

In [11]:
# Columns read from the movie-titles file (u.item).
movieNamesSchema = StructType([
    StructField("movie_id", IntegerType(), True),
    StructField("movie_title", StringType(), True),
])

# Columns read from the ratings file (u.data).
moviesSchema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("movie_id", IntegerType(), True),
    StructField("rating", IntegerType(), True),
    StructField("timestamp", LongType(), True),
])

In [13]:
# Movie id -> title lookup, read from the pipe-delimited u.item file.
movieNames = (
    spark_session.read
    .option('sep', '|')
    .option('charset', 'ISO-8859-1')
    .schema(movieNamesSchema)
    .csv('../../data/ml-100k/u.item')
)

# Raw rating records, read from the tab-delimited u.data file.
movies = (
    spark_session.read
    .option('sep', '\t')
    .schema(moviesSchema)
    .csv('../../data/ml-100k/u.data')
)

# Keep only the three columns used downstream; the timestamp is dropped.
ratings = movies.select('user_id', 'movie_id', 'rating')
ratings.show(10)

+-------+--------+------+
|user_id|movie_id|rating|
+-------+--------+------+
|    196|     242|     3|
|    186|     302|     3|
|     22|     377|     1|
|    244|      51|     2|
|    166|     346|     1|
|    298|     474|     4|
|    115|     265|     2|
|    253|     465|     5|
|    305|     451|     3|
|      6|      86|     3|
+-------+--------+------+
only showing top 10 rows



In [18]:
# Self-join the ratings on user_id to get every pair of movies rated by the
# same user. The strict `<` on movie_id keeps each unordered pair once and
# drops a movie paired with itself.
movie_pairs = (
    ratings.alias('ratings1')
    .join(
        ratings.alias('ratings2'),
        (func.col('ratings1.user_id') == func.col('ratings2.user_id'))
        & (func.col('ratings1.movie_id') < func.col('ratings2.movie_id')),
    )
    .select(
        func.col('ratings1.movie_id').alias('movie1'),
        func.col('ratings2.movie_id').alias('movie2'),
        func.col('ratings1.rating').alias('rating1'),
        func.col('ratings2.rating').alias('rating2'),
    )
)

# Cached because the similarity table is queried again in later cells.
movie_pair_similarities = compute_cosine_similarity(spark_session, movie_pairs).cache()

In [20]:
# Preview ten rows of the similarity table.
movie_pair_similarities.show(n=10)

+------+------+------------------+------------+
|movie1|movie2|             score|num_of_pairs|
+------+------+------------------+------------+
|    51|   924|0.9465030160396292|          15|
|   451|   529|0.8700048504395461|          30|
|    86|   318|0.9562989269248869|          95|
|    40|   167|0.9488483124502475|          23|
|   274|  1211|0.9799118698777318|           7|
|  1042|  1067|               1.0|           2|
|   118|   946|0.9180475196475987|          40|
|   234|   461|0.9520240320539911|          54|
|    88|   523|0.9577170094921833|          74|
|   796|  1036|0.9587449708822046|           8|
+------+------+------------------+------------+
only showing top 10 rows



In [28]:
# Movie to find neighbours for, and the minimum quality of a match.
start_movie_id = 71

score_threshold = 0.97
co_occurance_threshold = 50

# Keep pairs that involve the start movie on either side and clear both
# thresholds, then take the ten highest-scoring ones.
filtered_results = (
    movie_pair_similarities
    .filter(
        ((func.col("movie1") == start_movie_id) | (func.col("movie2") == start_movie_id))
        & (func.col("score") > score_threshold)
        & (func.col("num_of_pairs") > co_occurance_threshold)
    )
    .sort(func.col('score'), ascending=False)
    .take(10)
)

print("Top 10 similar movies for " + get_movie_name(movieNames, start_movie_id))

for result in filtered_results:
    # Display the similarity result that isn't the movie we're looking at
    similarMovieID = result.movie2 if result.movie1 == start_movie_id else result.movie1

    print(f'{get_movie_name(movieNames, similarMovieID)}\tscore: {result.score}\tstrength: {result.num_of_pairs}')

Top 10 similar movies for Lion King, The (1994)
Manchurian Candidate, The (1962)	score: 0.9791417549036365	strength: 51
Great Escape, The (1963)	score: 0.9731943155031116	strength: 63
Aladdin (1992)	score: 0.9729017053707703	strength: 167
Winnie the Pooh and the Blustery Day (1968)	score: 0.9722198659285328	strength: 53
Toy Story (1995)	score: 0.9711875801700128	strength: 173
It's a Wonderful Life (1946)	score: 0.9707942254118238	strength: 128
African Queen, The (1951)	score: 0.9707738128601991	strength: 80


In [None]:
# Shut the Spark session down now that the analysis is finished.
spark_session.stop()