### Set Up Notebook

#### Import Required Modules

In [0]:
import os
import time
import json
import requests
from typing import List, Dict

from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.window import Window

#### Get or Create SparkSession

In [0]:
# Obtain the notebook's SparkSession via the builder's getOrCreate().
spark = SparkSession.builder.getOrCreate()
# Set the Spark log level to "error" to keep lower-severity log lines out of cell output.
spark.sparkContext.setLogLevel("error")
# Bare expression: render the session object as the cell's result.
spark

#### Set Workflow Constants

In [0]:
# Directory the raw CSV inputs are read from (ratings.csv and links.csv are joined onto this path below).
RAW_PATH = "dbfs:/FileStore/data/raw/ml-latest-small"
# Directory the cleaned parquet outputs ("ratings" and "movies") are written under.
CLEAN_PATH = "dbfs:/FileStore/data/clean"

In [0]:
# Minimum number of ratings a user must have for their ratings to be kept when cleaning.
MIN_USER_RATINGS = 10
# Minimum number of ratings a movie must have for its ratings to be kept when cleaning.
MIN_MOVIE_RATINGS = 10
# Maximum number of cast members (lowest "order" first) kept per movie when fetching credits.
MAX_CAST_MEMBERS = 5

### Import and Clean Ratings Data

#### Import Ratings Data

In [0]:
# Read the raw ratings CSV, rename the id columns to snake_case string columns,
# cast the rating to double, and render the unix timestamp as a datetime string.
ratings = (
    spark.read
    .csv(os.path.join(RAW_PATH, "ratings.csv"), header=True)
    .select(
        f.col("userId").cast("string").alias("user_id"),
        f.col("movieId").cast("string").alias("movie_id"),
        f.col("rating").cast("double").alias("rating"),
        f.from_unixtime("timestamp").alias("timestamp"),
    )
)

# Inspect the schema and the first few rows.
ratings.printSchema()
ratings.show(5, truncate=False)

# One-row summary: distinct users and movies, total ratings, and the covered time span.
ratings.agg(
    f.countDistinct("user_id").alias("users"),
    f.countDistinct("movie_id").alias("movies"),
    f.count("*").alias("ratings"),
    f.min("timestamp").alias("min_time"),
    f.max("timestamp").alias("max_time"),
).show()

root
 |-- user_id: string (nullable = true)
 |-- movie_id: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: string (nullable = true)

+-------+--------+------+-------------------+
|user_id|movie_id|rating|timestamp          |
+-------+--------+------+-------------------+
|1      |1       |4.0   |2000-07-30 18:45:03|
|1      |3       |4.0   |2000-07-30 18:20:47|
|1      |6       |4.0   |2000-07-30 18:37:04|
|1      |47      |5.0   |2000-07-30 19:03:35|
|1      |50      |5.0   |2000-07-30 18:48:51|
+-------+--------+------+-------------------+
only showing top 5 rows

+-----+------+-------+-------------------+-------------------+
|users|movies|ratings|           min_time|           max_time|
+-----+------+-------+-------------------+-------------------+
|  610|  9724| 100836|1996-03-29 18:36:55|2018-09-24 14:27:30|
+-----+------+-------+-------------------+-------------------+



#### Import the Links Data

In [0]:
# Read the links CSV, which maps each MovieLens movie id to its IMDB and TMDB ids,
# keeping all three identifiers as strings.
links = (
    spark.read
    .csv(os.path.join(RAW_PATH, "links.csv"), header=True)
    .select(
        f.col("movieId").cast("string").alias("movie_id"),
        f.col("imdbId").cast("string").alias("imdb_id"),
        f.col("tmdbId").cast("string").alias("tmdb_id"),
    )
)

# Inspect the schema and the first few rows.
links.printSchema()
links.show(5, truncate=False)

# Compare distinct id counts against the record count.
links.agg(
    f.countDistinct("tmdb_id").alias("tmdb_id"),
    f.countDistinct("movie_id").alias("movie_id"),
    f.count("*").alias("records"),
).show()

root
 |-- movie_id: string (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- tmdb_id: string (nullable = true)

+--------+-------+-------+
|movie_id|imdb_id|tmdb_id|
+--------+-------+-------+
|1       |0114709|862    |
|2       |0113497|8844   |
|3       |0113228|15602  |
|4       |0114885|31357  |
|5       |0113041|11862  |
+--------+-------+-------+
only showing top 5 rows

+-------+--------+-------+
|tmdb_id|movie_id|records|
+-------+--------+-------+
|   9733|    9742|   9742|
+-------+--------+-------+



#### Clean the Ratings Data and Save as Parquet

In [0]:
# Window specifications used by the cleaning pipeline below.
latest_rating_first = Window.partitionBy("user_id", "tmdb_id").orderBy(f.col("timestamp").desc())
per_user = Window.partitionBy("user_id")
per_movie = Window.partitionBy("tmdb_id")

# Cleaning steps, in order:
#   1. attach TMDB ids by joining on the MovieLens movie id
#   2. drop rows missing any of the key fields
#   3. keep only the most recent rating for each (user, TMDB movie) pair
#   4. keep rows whose user and movie each meet the minimum rating counts
# Both counts are computed before either threshold filter is applied.
clean_ratings = (
    ratings
    .join(links, on="movie_id", how="inner")
    .dropna(how="any", subset=["user_id", "tmdb_id", "rating", "timestamp"])
    .withColumn("rank", f.row_number().over(latest_rating_first))
    .where(f.col("rank") == 1)
    .withColumn("cnt_user_ratings", f.count("*").over(per_user))
    .withColumn("cnt_movie_ratings", f.count("*").over(per_movie))
    .where(f.col("cnt_user_ratings") >= MIN_USER_RATINGS)
    .where(f.col("cnt_movie_ratings") >= MIN_MOVIE_RATINGS)
    .select("user_id", "tmdb_id", "rating", "timestamp")
)

# Headline numbers for the cleaned data, including the sparsity of the user x movie matrix.
counts = clean_ratings.agg(
    f.countDistinct("user_id").alias("users"),
    f.countDistinct("tmdb_id").alias("movies"),
    f.count("*").alias("ratings"),
).first()
density = counts["ratings"] / (counts["users"] * counts["movies"])
sparsity = round(1 - density, 4)
print(f"users={counts['users']} movies={counts['movies']} ratings={counts['ratings']} sparsity={sparsity}")

# Sanity check: distinct (user, movie) pairs should equal the record count after de-duplication.
clean_ratings.agg(
    f.countDistinct("user_id", "tmdb_id").alias("ratings"),
    f.count("*").alias("records"),
).show()
clean_ratings.printSchema()
clean_ratings.show(5)

users=610 movies=2269 ratings=81116 sparsity=0.9414
+-------+-------+
|ratings|records|
+-------+-------+
|  81116|  81116|
+-------+-------+

root
 |-- user_id: string (nullable = true)
 |-- tmdb_id: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: string (nullable = true)

+-------+-------+------+-------------------+
|user_id|tmdb_id|rating|          timestamp|
+-------+-------+------+-------------------+
|    140|    100|   4.0|2003-06-08 17:14:57|
|    166|    100|   3.5|2007-09-06 00:24:53|
|     18|    100|   4.5|2016-02-27 14:48:19|
|    187|    100|   1.0|2006-10-26 07:52:18|
|    199|    100|   3.0|2001-07-15 02:47:22|
+-------+-------+------+-------------------+
only showing top 5 rows



In [0]:
# Persist the cleaned ratings as parquet in 10 partitions, replacing any previous output.
clean_ratings.repartition(10).write.mode("overwrite").parquet(os.path.join(CLEAN_PATH, "ratings"))

### Gather Movie Information for All Movies in Clean Ratings Data

#### Determine the Set of Movies to Fetch

In [0]:
# Collect the distinct TMDB ids present in the cleaned ratings as a sorted list of ints.
ratings_movies = sorted(
    int(row["tmdb_id"])
    for row in clean_ratings.select("tmdb_id").distinct().collect()
)
len(ratings_movies)

Out[9]: 2269

#### Define a Function to Get All Movie Details

In [0]:
def get_movies(tmdb_ids: List[int], max_rps=20, timeout: float = 10.0) -> List[Dict]:
    """Get movie details, credits and keywords from the TMDB API.

    For each id, three endpoints are queried (details, credits, keywords) and
    the selected fields are merged into a single flat dictionary. A movie for
    which any request fails, or whose payload lacks an expected field, is
    logged with ``print`` and skipped; the remaining movies are still fetched.

    Args:
        tmdb_ids: TMDB movie ids to fetch.
        max_rps: throttle interval, counted in movies. After processing the
            movie at every index that is a positive multiple of this value, a
            progress line is printed and the loop sleeps for 0.5 seconds.
            NOTE: each movie issues up to three requests, so despite its name
            this is not a true requests-per-second cap.
        timeout: seconds to wait on each individual HTTP request before giving
            up on that movie.

    Returns:
        One dictionary per successfully fetched movie, in the order of
        ``tmdb_ids``. Failed movies are omitted.

    Raises:
        KeyError: if ``tmdb_ids`` is non-empty and the ``TMDB_ACCESS_TOKEN``
            environment variable is not set.
    """

    # nothing to fetch: return before requiring the access token or opening a session
    if len(tmdb_ids) == 0:
        return []

    # define needed TMDB endpoints
    details_endpoint = "https://api.themoviedb.org/3/movie/{tmdb_id}"
    credits_endpoint = "https://api.themoviedb.org/3/movie/{tmdb_id}/credits"
    keywords_endpoint = "https://api.themoviedb.org/3/movie/{tmdb_id}/keywords"

    # define common request headers
    headers = {
        "accept": "application/json",
        "Authorization": f"Bearer {os.environ['TMDB_ACCESS_TOKEN']}"
    }

    def fetch_json(session, url: str) -> Dict:
        """GET ``url``, raise on an HTTP error status, and return the decoded JSON body."""
        # an explicit timeout keeps one stalled connection from hanging the whole crawl
        response = session.get(url, timeout=timeout)
        response.raise_for_status()
        return response.json()

    movies = [None] * len(tmdb_ids)

    # the Session sets common headers and pools connections; the context manager
    # closes it even if the loop is interrupted
    with requests.Session() as session:
        session.headers.update(headers)

        for i, tmdb_id in enumerate(tmdb_ids):

            try:

                # get movie details
                details = fetch_json(session, details_endpoint.format(tmdb_id=tmdb_id))
                movie_details = {
                    "tmdb_id": str(tmdb_id),
                    "tmdb_homepage": f"https://www.themoviedb.org/movie/{tmdb_id}",
                    "title": details["title"],
                    "status": details["status"],
                    "language": details["original_language"],
                    "release_date": details["release_date"],
                    "runtime": details["runtime"],
                    "overview": details["overview"],
                    "genres": [genre["name"] for genre in details["genres"]],
                    "budget": details["budget"],
                    "revenue": details["revenue"],
                    "popularity": details["popularity"],
                    "vote_average": details["vote_average"],
                    "vote_count": details["vote_count"]
                }

                # get movie credits: first crew member with the "Director" job and the
                # top-billed cast (lowest "order" values)
                credits = fetch_json(session, credits_endpoint.format(tmdb_id=tmdb_id))
                director = [item for item in credits["crew"] if item["job"] == "Director"]
                top_cast = sorted(credits["cast"], key=lambda x: x["order"])[:MAX_CAST_MEMBERS]
                movie_credits = {
                    "director": director[0]["name"] if len(director) >= 1 else "",
                    "cast": [actor["name"] for actor in top_cast]
                }

                # get movie keywords
                keywords = fetch_json(session, keywords_endpoint.format(tmdb_id=tmdb_id))
                movie_keywords = {
                    "keywords": [item["name"] for item in keywords["keywords"]]
                }

                # combine all movie information together and add to running list
                movies[i] = {**movie_details, **movie_credits, **movie_keywords}

            # log any movie-specific failure and continue processing: HTTP error statuses,
            # connection errors and timeouts (RequestException), or a payload that is not
            # valid JSON / lacks an expected field (ValueError, KeyError, TypeError)
            except (requests.RequestException, KeyError, TypeError, ValueError) as err:
                print(f"error for tmdb_id={tmdb_id}")
                print(err)

            # add a delay to avoid rate limiting
            if (i > 0) and i % max_rps == 0:
                print(f"current iteration={i} current tmdb_id={tmdb_id}")
                time.sleep(0.5)

    # remove the None entries for movies with API errors and return
    return [movie for movie in movies if movie is not None]

#### Get All Movie Details

In [0]:
# Fetch TMDB information for every movie id found in the cleaned ratings.
movies = get_movies(ratings_movies)
len(movies)

current iteration=20 current tmdb_id=63
current iteration=40 current tmdb_id=88
current iteration=60 current tmdb_id=114
current iteration=80 current tmdb_id=150
current iteration=100 current tmdb_id=177
current iteration=120 current tmdb_id=215
current iteration=140 current tmdb_id=252
current iteration=160 current tmdb_id=293
current iteration=180 current tmdb_id=334
current iteration=200 current tmdb_id=395
current iteration=220 current tmdb_id=429
current iteration=240 current tmdb_id=497
current iteration=260 current tmdb_id=548
current iteration=280 current tmdb_id=581
current iteration=300 current tmdb_id=603
current iteration=320 current tmdb_id=628
current iteration=340 current tmdb_id=657
current iteration=360 current tmdb_id=688
current iteration=380 current tmdb_id=714
current iteration=400 current tmdb_id=773
current iteration=420 current tmdb_id=814
current iteration=440 current tmdb_id=850
current iteration=460 current tmdb_id=879
current iteration=480 current tmdb_id=92

#### Convert Movie Details to a Spark DataFrame

In [0]:
# Build a DataFrame from the list of movie dictionaries, letting Spark infer the schema.
clean_movies = spark.createDataFrame(data=movies)

# Inspect the inferred schema, one full record, and the row count.
clean_movies.printSchema()
clean_movies.show(n=1, truncate=False, vertical=True)
clean_movies.count()

root
 |-- budget: long (nullable = true)
 |-- cast: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- director: string (nullable = true)
 |-- genres: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- keywords: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- language: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: double (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: long (nullable = true)
 |-- runtime: long (nullable = true)
 |-- status: string (nullable = true)
 |-- title: string (nullable = true)
 |-- tmdb_homepage: string (nullable = true)
 |-- tmdb_id: string (nullable = true)
 |-- vote_average: double (nullable = true)
 |-- vote_count: long (nullable = true)

-RECORD 0---------------------------------------------------------------------------------------------------------------------------------------------------------------------------

#### Basic EDA on the Clean Movies Dataset

In [0]:
# Categorical breakdowns: release status, most common languages, and movies per release year.
clean_movies.groupBy("status").count().orderBy(f.col("count").desc()).show()
clean_movies.groupBy("language").count().orderBy(f.col("count").desc()).show(10)
release_year = f.year("release_date")
clean_movies.groupBy(release_year).count().orderBy(release_year).show(100)

# How many movies carry a zero in each numeric field.
for numeric_col in ("budget", "revenue", "runtime"):
    clean_movies.groupBy(f.col(numeric_col) == 0).count().show()

# How many movies have an empty director or null vote statistics.
clean_movies.groupBy(f.col("director") == "").count().show()
for vote_col in ("vote_average", "vote_count"):
    clean_movies.groupBy(f.col(vote_col).isNull()).count().show()

# Average overview length, then the distribution of list sizes for each array column.
clean_movies.agg(f.mean(f.length("overview")).alias("avg_overview_length")).show()
for array_col in ("cast", "genres", "keywords"):
    clean_movies.groupBy(f.size(array_col)).count().orderBy(f.col("count").desc()).show()

+--------+-----+
|  status|count|
+--------+-----+
|Released| 2264|
+--------+-----+

+--------+-----+
|language|count|
+--------+-----+
|      en| 2156|
|      ja|   25|
|      fr|   24|
|      es|   12|
|      de|   10|
|      cn|    9|
|      it|    9|
|      zh|    4|
|      ko|    3|
|      sv|    3|
+--------+-----+
only showing top 10 rows

+------------------+-----+
|year(release_date)|count|
+------------------+-----+
|              1922|    1|
|              1925|    1|
|              1927|    1|
|              1930|    1|
|              1931|    4|
|              1933|    2|
|              1934|    1|
|              1935|    1|
|              1936|    1|
|              1937|    1|
|              1938|    1|
|              1939|    3|
|              1940|    6|
|              1941|    3|
|              1942|    1|
|              1943|    1|
|              1944|    3|
|              1946|    3|
|              1947|    1|
|              1948|    4|
|              1949|    1|
| 

In [0]:
# Preview up to 200 movies without the status, release date, overview and homepage columns.
clean_movies.drop("status", "release_date", "overview", "tmdb_homepage").show(n=200)

+---------+--------------------+--------------------+--------------------+----------------------+--------+----------+----------+-------+--------------------+-------+------------+----------+
|   budget|                cast|            director|              genres|              keywords|language|popularity|   revenue|runtime|               title|tmdb_id|vote_average|vote_count|
+---------+--------------------+--------------------+--------------------+----------------------+--------+----------+----------+-------+--------------------+-------+------------+----------+
|  4000000|[Tim Roth, Jennif...|      Allison Anders|            [Comedy]|  [hotel, new year'...|      en|    21.138|   4257354|     98|          Four Rooms|      5|       5.789|      2443|
| 11000000|[Mark Hamill, Har...|        George Lucas|[Adventure, Actio...|  [android, galaxy,...|      en|    88.613| 775398007|    121|           Star Wars|     11|       8.204|     19236|
| 94000000|[Albert Brooks, E...|      Andrew Stant

#### Save the Cleaned Movies Dataset

In [0]:
# Persist the movies dataset as parquet from a single partition, replacing any previous output.
clean_movies.coalesce(1).write.mode("overwrite").parquet(os.path.join(CLEAN_PATH, "movies"))

# START SANDBOX CODE