In [28]:
from pathlib import Path
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, explode, substring
import pyspark.sql.functions as sf
from pyspark.sql.types import StringType, IntegerType, BooleanType, FloatType, TimestampType

DATA_DIR = Path("data/ml-latest-small")

In [2]:
spark = SparkSession.builder\
    .master("local")\
    .appName("Word Count")\
    .getOrCreate()

In [3]:
# TODO: Spark schemas?
def read_df(file: str, types: dict):
    df = spark.read\
        .format("csv")\
        .option("header", "true")\
        .load(str(DATA_DIR / file))
    return set_dtypes(df, types)

def set_dtypes(df, types: dict):
    for column, dtype in types.items():
        dtype = {
            str: StringType(),
            int: IntegerType(),
            float: FloatType(),
            bool: BooleanType(),
            "time": TimestampType() 
        }[dtype]
        df = df.withColumn(column, col(column).cast(dtype))
    return df

df_links = read_df("links.csv", {
    "movieId": int,
    "imdbId": int,
    "tmdbId": int
})
df_movies = read_df("movies.csv", {
    "movieId": int,
    "title": str,
    "genres": str
})
df_ratings = read_df("ratings.csv", {
    "userId": int,
    "movieId": int,
    "rating": float,
    "timestamp": "time"
})
df_tags = read_df("tags.csv", {
    "userId": int,
    "movieId": int,
    "tag": str,
    "timestamp": "time"
})

In [4]:
# Given a user, get the number of movies watched per genre
def search_user(user_id: int):
    rated_movies = df_ratings.filter(df_ratings.userId == user_id).select("movieId").distinct()
    tagged_movies = df_tags.filter(df_tags.userId == user_id).select("movieId").distinct()
    movies = rated_movies.union(tagged_movies).distinct().join(df_movies, on=["movieId"], how="inner")
    movies = movies.select(movies.movieId, explode(split(movies.genres, "\|")).alias("genre"))
    movies = movies.groupBy("genre").count()
    return movies.collect()
search_user(1)

[Row(genre='Crime', count=45),
 Row(genre='Romance', count=26),
 Row(genre='Thriller', count=55),
 Row(genre='Adventure', count=85),
 Row(genre='Drama', count=68),
 Row(genre='War', count=22),
 Row(genre='Fantasy', count=47),
 Row(genre='Mystery', count=18),
 Row(genre='Musical', count=22),
 Row(genre='Animation', count=29),
 Row(genre='Film-Noir', count=1),
 Row(genre='Horror', count=17),
 Row(genre='Western', count=7),
 Row(genre='Comedy', count=83),
 Row(genre='Children', count=42),
 Row(genre='Action', count=90),
 Row(genre='Sci-Fi', count=40)]

In [5]:
df_ratings.filter("userId = 21").filter("movieId=1").collect()

[Row(userId=21, movieId=1, rating=3.5, timestamp=None)]

In [6]:
# Given a list of users, search all movies watched by each user
def search_movies_by_users(user_ids: [int]):
    rated_movies = df_ratings.filter(df_ratings.userId.isin(user_ids))
    tagged_movies = df_tags.filter(df_tags.userId.isin(user_ids))
    movies = rated_movies.join(tagged_movies, on=["userId", "movieId"], how="outer")
    movies = movies.select("userId", "movieId").distinct()
    movies = movies.groupBy("movieId").count().join(df_movies, on=["movieId"]).select("title", "count")
    return movies.collect()
search_movies_by_users([1, 21])

Row(title='Pride & Prejudice (2005)', count=1),
 Row(title='Who Framed Roger Rabbit? (1988)', count=1),
 Row(title='Pink Floyd: The Wall (1982)', count=1),
 Row(title='Logan (2017)', count=1),
 Row(title='Teenage Mutant Ninja Turtles III (1993)', count=1),
 Row(title='Ratatouille (2007)', count=1),
 Row(title='Transformers: The Movie (1986)', count=1),
 Row(title='Kingsman: The Secret Service (2015)', count=1),
 Row(title='The Interview (2014)', count=1),
 Row(title='Moonraker (1979)', count=1),
 Row(title='Monty Python and the Holy Grail (1975)', count=1),
 Row(title='Jurassic Park III (2001)', count=1),
 Row(title='Predator (1987)', count=1),
 Row(title='Dennis the Menace (1993)', count=1),
 Row(title='Collateral (2004)', count=1),
 Row(title='Welcome to Woop-Woop (1997)', count=1),
 Row(title='Big Lebowski, The (1998)', count=1),
 Row(title='Frozen (2013)', count=1),
 Row(title='Die Hard: With a Vengeance (1995)', count=1),
 Row(title='Terminator, The (1984)', count=2),
 Row(title='

In [7]:
# Search movie by id/title, show the average rating, the number of users that have watched the movie
# TODO: Need to translate between ID & Title
def search_movies_watched_by_id(movie_id: int):
    rated_movies = df_ratings.where(f"movieId = {movie_id}")
    tagged_movies = df_tags.where(f"movieId = {movie_id}")
    movies = rated_movies.join(tagged_movies, on=["userId"], how="outer")
    movies = movies.select("userId").distinct()
    return movies.count()
search_movies_watched_by_id(1) # Toy Story

215

In [8]:
# Search movie by id/title, show the average rating, the number of users that have watched the movie
def search_movies_avg_rating_by_id(movie_id: int):
    rated_movies = df_ratings.where(f"movieId = {movie_id}").agg({"rating": "avg"})
    return rated_movies.first()[0]
search_movies_avg_rating_by_id(1) # Toy Story

3.9209302325581397

In [31]:
# Given a list of genres, search all movies belonging each genre
def search_movies_by_genre(genres: [str]):
    movies = df_movies.select("movieId", "title", explode(split(df_movies.genres, "\|")).alias("genre"))
    movies = movies.filter(movies.genre.isin(genres)).dropDuplicates(subset=["movieId"])
    return [row.title for row in movies.select("title").collect()]
#search_movies_by_genre(["Action", "Crime"])

In [14]:
# Search movies by year
def search_movies_by_year(year: int):
    movies = df_movies.select("movieId", "title", substring(df_movies.title, -5, 4).cast(IntegerType()).alias("year"))
    movies = movies.filter(movies.year == year)
    return movies.collect()
search_movies_by_year(1979)

[Row(movieId=1080, title="Monty Python's Life of Brian (1979)", year=1979),
 Row(movieId=1161, title='Tin Drum, The (Blechtrommel, Die) (1979)', year=1979),
 Row(movieId=1208, title='Apocalypse Now (1979)', year=1979),
 Row(movieId=1214, title='Alien (1979)', year=1979),
 Row(movieId=1232, title='Stalker (1979)', year=1979),
 Row(movieId=1244, title='Manhattan (1979)', year=1979),
 Row(movieId=1292, title='Being There (1979)', year=1979),
 Row(movieId=1327, title='Amityville Horror, The (1979)', year=1979),
 Row(movieId=1371, title='Star Trek: The Motion Picture (1979)', year=1979),
 Row(movieId=1955, title='Kramer vs. Kramer (1979)', year=1979),
 Row(movieId=2016, title='Apple Dumpling Gang Rides Again, The (1979)', year=1979),
 Row(movieId=2034, title='Black Hole, The (1979)', year=1979),
 Row(movieId=2109, title='Jerk, The (1979)', year=1979),
 Row(movieId=2409, title='Rocky II (1979)', year=1979),
 Row(movieId=2526, title='Meteor (1979)', year=1979),
 Row(movieId=2537, title='Beyon

In [40]:
# List the top n movies with highest rating, ordered by the rating
def top_n_movies_by_rating(n: int):
    top_n = df_ratings.groupBy("movieId")\
        .agg(sf.avg("rating").alias("rating"), sf.count("movieId").alias("count"))\
        .join(df_movies, on=["movieId"], how="inner")\
        .sort(col("rating").desc(), col("count").desc())\
        .limit(n)
    return top_n.collect()
top_n_movies_by_rating(10)

[Row(movieId=6818, rating=5.0, count=2, title='Come and See (Idi i smotri) (1985)', genres='Drama|War'),
 Row(movieId=3473, rating=5.0, count=2, title="Jonah Who Will Be 25 in the Year 2000 (Jonas qui aura 25 ans en l'an 2000) (1976)", genres='Comedy'),
 Row(movieId=1151, rating=5.0, count=2, title='Lesson Faust (1994)', genres='Animation|Comedy|Drama|Fantasy'),
 Row(movieId=78836, rating=5.0, count=2, title='Enter the Void (2009)', genres='Drama'),
 Row(movieId=53, rating=5.0, count=2, title='Lamerica (1994)', genres='Adventure|Drama'),
 Row(movieId=6442, rating=5.0, count=2, title='Belle époque (1992)', genres='Comedy|Romance'),
 Row(movieId=99, rating=5.0, count=2, title='Heidi Fleiss: Hollywood Madam (1995)', genres='Documentary'),
 Row(movieId=113829, rating=5.0, count=1, title='One I Love, The (2014)', genres='Comedy|Drama|Romance'),
 Row(movieId=148, rating=5.0, count=1, title='Awfully Big Adventure, An (1995)', genres='Drama'),
 Row(movieId=67618, rating=5.0, count=1, title='St

In [39]:
#### OLD VERSION:
# List the top n movies with highest rating, ordered by the rating
def top_n_movies_by_rating(n: int):
    top_n = df_ratings.groupBy("movieId")\
        .agg(sf.avg("rating").alias("rating"), sf.count("movieId").alias("count"))\
        .sort(col("rating").desc(), col("count").desc())\
        .limit(n)\
        .join(df_movies, on=["movieId"], how="inner")\
        .sort(col("rating").desc(), col("count").desc())
    return top_n.collect()
top_n_movies_by_rating(10)

[Row(movieId=53, rating=5.0, count=2, title='Lamerica (1994)', genres='Adventure|Drama'),
 Row(movieId=99, rating=5.0, count=2, title='Heidi Fleiss: Hollywood Madam (1995)', genres='Documentary'),
 Row(movieId=1151, rating=5.0, count=2, title='Lesson Faust (1994)', genres='Animation|Comedy|Drama|Fantasy'),
 Row(movieId=3473, rating=5.0, count=2, title="Jonah Who Will Be 25 in the Year 2000 (Jonas qui aura 25 ans en l'an 2000) (1976)", genres='Comedy'),
 Row(movieId=6442, rating=5.0, count=2, title='Belle époque (1992)', genres='Comedy|Romance'),
 Row(movieId=6818, rating=5.0, count=2, title='Come and See (Idi i smotri) (1985)', genres='Drama|War'),
 Row(movieId=78836, rating=5.0, count=2, title='Enter the Void (2009)', genres='Drama'),
 Row(movieId=84273, rating=5.0, count=1, title='Zeitgeist: Moving Forward (2011)', genres='Documentary'),
 Row(movieId=152711, rating=5.0, count=1, title='Who Killed Chea Vichea? (2010)', genres='Documentary'),
 Row(movieId=173963, rating=5.0, count=1, t