In [1]:
import sys

from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql.functions import lit

In [2]:
""" Helper functions. """

def load_movie_names():
    """ Loads a dictionary of movie_ids as keys and the corresponding movie_name
        as the value.
    """
    movie_names = dict()
    with open("ml-100k/u.item", encoding="ISO-8859-1") as f:
        for line in f:
            fields = line.split("|")
            movie_names[int(fields[0])] = fields[1]
    return movie_names

def parse_input(line):
    """ Transforms a line of data into a Row object.
    """
    fields = line.value.split()
    return Row(
        user_id   = int(fields[0]),
        movie_id  = int(fields[1]),
        rating    = int(fields[2])
    )

In [3]:
""" User who you want to predict movies for. """

user = 1

In [4]:
""" Spark setup. """

spark = SparkSession.builder.appName("MovieRecommendations").getOrCreate()

# Set log level
spark.sparkContext.setLogLevel("FATAL")

In [5]:
""" Load data into a format we can use. """

movie_names = load_movie_names()

lines = spark.read.text("ml-100k/u.data").rdd

# Converts the input data into an RDD
ratings_rdd = lines.map(parse_input)

# Converts previous RDD into a dataframe
# This will be often reused, so it's a good idea to make sure it's cached
ratings = spark.createDataFrame(ratings_rdd).cache()

In [6]:
""" Initialize and fit model on the data. """

# ALS initialization
als = ALS(
maxIter=5, regParam=0.01, userCol="user_id", itemCol="movie_id", ratingCol="rating"
)

# Fit the model to the ratings data
print("Starting to fit model...")
model = als.fit(ratings)
print("Finished fitting model.")

Starting to fit model...
Finished fitting model.


In [7]:
""" Print all ratings for the user. """

print("Ratings for user ID %s:" % user)
user_ratings = ratings.filter("user_id = %s" % user)
for rating in user_ratings.collect():
    print(
        "%s - %s" % (
            rating["rating"],
            movie_names[rating["movie_id"]]
        )
    )

Ratings for user ID 1:
4 - Three Colors: White (1994)
3 - Grand Day Out, A (1992)
4 - Desperado (1995)
4 - Glengarry Glen Ross (1992)
4 - Angels and Insects (1995)
5 - Groundhog Day (1993)
5 - Delicatessen (1991)
4 - Hunt for Red October, The (1990)
2 - Dirty Dancing (1987)
3 - Rock, The (1996)
4 - Ed Wood (1994)
4 - Star Trek: First Contact (1996)
5 - Pillow Book, The (1995)
5 - Horseman on the Roof, The (Hussard sur le toit, Le) (1995)
4 - Star Trek VI: The Undiscovered Country (1991)
3 - From Dusk Till Dawn (1996)
4 - So I Married an Axe Murderer (1993)
5 - Shawshank Redemption, The (1994)
3 - True Romance (1993)
5 - Star Trek: The Wrath of Khan (1982)
1 - Kull the Conqueror (1997)
4 - Independence Day (ID4) (1996)
5 - Wallace & Gromit: The Best of Aardman Animation (1996)
4 - Wizard of Oz, The (1939)
1 - Faster Pussycat! Kill! Kill! (1965)
4 - Citizen Kane (1941)
4 - Silence of the Lambs, The (1991)
4 - Blues Brothers, The (1980)
5 - Breaking the Waves (1996)
4 - Robert A. Heinlein

In [8]:
""" Get the recommendations using the fitted model. """

# Only pick movies that have been rated more than 100 times
rating_counts = ratings.groupBy("movie_id").count().filter("count > 100")

# Run model on a list containing all movies rated more than 100 times
print("Running recommender...")
recommendations = model.transform(
  rating_counts.select("movie_id").withColumn("user_id", lit(user))
)
print("Recommender finished.")

Running recommender...
Recommender finished.


In [9]:
""" Get the top 20 recommendations. """

top_recommendations = recommendations.sort(
recommendations.prediction.desc()
).take(20)

# Print these recommendations
print("Top 20 recommendations for user %s:" % user)
for recommendation in top_recommendations:
    print(
        "%s - %s" % (
            recommendation["prediction"], 
            movie_names[recommendation["movie_id"]]
        )
    )

Top 20 recommendations for user 1:
5.103930473327637 - Close Shave, A (1995)
5.083609104156494 - Wrong Trousers, The (1993)
5.041621685028076 - Godfather, The (1972)
4.954265594482422 - Blade Runner (1982)
4.946904182434082 - Secrets & Lies (1996)
4.926731586456299 - Sling Blade (1996)
4.922780990600586 - Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1963)
4.8647050857543945 - Usual Suspects, The (1995)
4.861267566680908 - Casablanca (1942)
4.835524082183838 - Citizen Kane (1941)
4.825414657592773 - Swingers (1996)
4.802913188934326 - Star Wars (1977)
4.796285152435303 - Lawrence of Arabia (1962)
4.712485313415527 - Manchurian Candidate, The (1962)
4.697003364562988 - Hudsucker Proxy, The (1994)
4.674718856811523 - Rear Window (1954)
4.674571514129639 - Alien (1979)
4.673681735992432 - Being There (1979)
4.671104431152344 - Fargo (1996)
4.6703691482543945 - 2001: A Space Odyssey (1968)


In [10]:
""" Kill Spark session. """

spark.stop()