In [37]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType

In [38]:
def computeCosineSimilarity(spark, data):
    # Compute xx, xy and yy columns
    pairScores = (
        data.withColumn("xx", func.col("rating1") * func.col("rating1"))
        .withColumn("yy", func.col("rating2") * func.col("rating2"))
        .withColumn("xy", func.col("rating1") * func.col("rating2"))
    )

    # Compute numerator, denominator and numPairs columns
    calculateSimilarity = pairScores.groupBy("movie1", "movie2").agg(
        func.sum(func.col("xy")).alias("numerator"),
        (
            func.sqrt(func.sum(func.col("xx"))) * func.sqrt(func.sum(func.col("yy")))
        ).alias("denominator"),
        func.count(func.col("xy")).alias("numPairs"),
    )

    # Calculate score and select only needed columns (movie1, movie2, score, numPairs)
    result = calculateSimilarity.withColumn(
        "score",
        func.when(
            func.col("denominator") != 0,
            func.col("numerator") / func.col("denominator"),
        ).otherwise(0),
    ).select("movie1", "movie2", "score", "numPairs")

    return result

In [39]:
# Get movie name by given movie id
def getMovieName(movieNames, movieId):
    result = (
        movieNames.filter(func.col("movieID") == movieId)
        .select("movieTitle")
        .collect()[0]
    )

    return result[0]

In [40]:
spark = (
    SparkSession.builder.appName("MovieSimilarities").master("local[*]").getOrCreate()
)

movieNamesSchema = StructType(
    [
        StructField("movieID", IntegerType(), True),
        StructField("movieTitle", StringType(), True),
    ]
)

moviesSchema = StructType(
    [
        StructField("userID", IntegerType(), True),
        StructField("movieID", IntegerType(), True),
        StructField("rating", IntegerType(), True),
        StructField("timestamp", LongType(), True),
    ]
)

In [41]:
# Create a broadcast dataset of movieID and movieTitle.
# Apply ISO-885901 charset
movieNames = (
    spark.read.option("sep", "|")
    .option("charset", "ISO-8859-1")
    .schema(movieNamesSchema)
    .csv("./ml-100k/u.item")
)
movieNames.show(5)

+-------+-----------------+
|movieID|       movieTitle|
+-------+-----------------+
|      1| Toy Story (1995)|
|      2| GoldenEye (1995)|
|      3|Four Rooms (1995)|
|      4|Get Shorty (1995)|
|      5|   Copycat (1995)|
+-------+-----------------+
only showing top 5 rows



In [42]:
# Load up movie data as dataset
movies = spark.read.option("sep", "\t").schema(moviesSchema).csv("./ml-100k/u.data")
movies.show(5)

+------+-------+------+---------+
|userID|movieID|rating|timestamp|
+------+-------+------+---------+
|   196|    242|     3|881250949|
|   186|    302|     3|891717742|
|    22|    377|     1|878887116|
|   244|     51|     2|880606923|
|   166|    346|     1|886397596|
+------+-------+------+---------+
only showing top 5 rows



In [43]:
ratings = movies.select("userId", "movieId", "rating")
moviePairs = (
    ratings.alias("ratings1")
    .join(
        ratings.alias("ratings2"),
        (func.col("ratings1.userId") == func.col("ratings2.userId"))
        & (func.col("ratings1.movieId") < func.col("ratings2.movieId")),
    )
    .select(
        func.col("ratings1.movieId").alias("movie1"),
        func.col("ratings2.movieId").alias("movie2"),
        func.col("ratings1.rating").alias("rating1"),
        func.col("ratings2.rating").alias("rating2"),
    )
)
moviePairs.show(5)

+------+------+-------+-------+
|movie1|movie2|rating1|rating2|
+------+------+-------+-------+
|   242|   269|      3|      3|
|   242|   845|      3|      4|
|   242|  1022|      3|      4|
|   242|   762|      3|      3|
|   242|   411|      3|      4|
+------+------+-------+-------+
only showing top 5 rows



In [44]:
moviePairSimilarities = computeCosineSimilarity(spark, moviePairs).cache()

moviePairSimilarities.show(5)

+------+------+------------------+--------+
|movie1|movie2|             score|numPairs|
+------+------+------------------+--------+
|    51|   924|0.9465030160396292|      15|
|   451|   529|0.8700048504395461|      30|
|    86|   318|0.9562989269248869|      95|
|    40|   167|0.9488483124502475|      23|
|   274|  1211|0.9799118698777318|       7|
+------+------+------------------+--------+
only showing top 5 rows



24/11/06 13:57:26 WARN CacheManager: Asked to cache already cached data.


In [45]:
scoreThreshold = 0.56
coOccurrenceThreshold = 50.0

movieID = 1

# Filter for movies with this sim that are "good" as defined by
# our quality thresholds above
filteredResults = moviePairSimilarities.filter(
    ((func.col("movie1") == movieID) | (func.col("movie2") == movieID))
    & (func.col("score") > scoreThreshold)
    & (func.col("numPairs") > coOccurrenceThreshold)
)

# Sort by quality score.
results = filteredResults.sort(func.col("score").desc()).take(10)

print("Top 10 similar movies for " + getMovieName(movieNames, movieID))

for result in results:
    # Display the similarity result that isn't the movie we're looking at
    similarMovieID = result.movie1
    if similarMovieID == movieID:
        similarMovieID = result.movie2

    print(
        getMovieName(movieNames, similarMovieID)
        + "\tscore: "
        + str(result.score)
        + "\tstrength: "
    )

Top 10 similar movies for Toy Story (1995)
Hamlet (1996)	score: 0.9745438715121281	strength: 
Raiders of the Lost Ark (1981)	score: 0.9740842172192801	strength: 
Cinderella (1950)	score: 0.9740029877471444	strength: 
Winnie the Pooh and the Blustery Day (1968)	score: 0.9734154958854764	strength: 
Cool Hand Luke (1967)	score: 0.9733423477201257	strength: 
Great Escape, The (1963)	score: 0.9732705816130491	strength: 
African Queen, The (1951)	score: 0.9731512715078089	strength: 
Apollo 13 (1995)	score: 0.9723951205383821	strength: 
12 Angry Men (1957)	score: 0.9719872951015222	strength: 
Wrong Trousers, The (1993)	score: 0.9718143066672611	strength: 
