# TMDB Movie Data Analysis Using PySpark

In [None]:
# from utils import *
from pyspark.sql.types import StructField, StructType, BooleanType, StringType, IntegerType, DoubleType, ArrayType, LongType, FloatType
import os
from utils import *

In [2]:
# Constants shared by the TMDB requests made in this notebook.
BASE_URL = "https://api.themoviedb.org/3/movie/"

# The bearer token is read from the environment rather than hard-coded;
# it is None when the API_ACCESS_TOKEN variable is not set.
API_ACCESS_TOKEN = os.environ.get('API_ACCESS_TOKEN')

# HTTP headers: JSON responses, bearer-token authorization.
HEADERS = dict(
    accept="application/json",
    Authorization=f"Bearer {API_ACCESS_TOKEN}",
)

# TMDB ids of the movies to analyse.
# NOTE(review): the leading 0 is presumably not a real movie id — confirm it is
# intentional (e.g. to exercise the failed-lookup path).
movie_ids = [
    0,
    299534, 19995, 140607, 299536, 597, 135397,
    420818, 24428, 168259, 99861, 284054, 12445,
    181808, 330457, 351286, 109445, 321612, 260513,
]

In [3]:
# Create the Spark session used by the cells below, via the project helper.
# NOTE(review): the app name says "imdb" while the data comes from TMDB —
# confirm whether that is intentional.
spark = create_spark_session("imdb_movie_data_analysis")

In [None]:

# Explicit schema for one TMDB movie-details record. Every field is declared
# nullable. release_date is kept as a plain string at this stage.
movie_schema = (
    StructType()
    .add("adult", BooleanType(), True)
    .add("backdrop_path", StringType(), True)
    # Collection the movie belongs to (a single nested record).
    .add(
        "belongs_to_collection",
        StructType()
        .add("backdrop_path", StringType(), True)
        .add("name", StringType(), True)
        .add("id", IntegerType(), True)
        .add("poster_path", StringType(), True),
        True,
    )
    .add("budget", LongType(), True)
    # Genres: list of {id, name} records.
    .add(
        "genres",
        ArrayType(
            StructType()
            .add("id", IntegerType(), True)
            .add("name", StringType(), True)
        ),
        True,
    )
    .add("homepage", StringType(), True)
    .add("id", IntegerType(), True)
    .add("imdb_id", StringType(), True)
    .add("origin_country", ArrayType(StringType()), True)
    .add("original_language", StringType(), True)
    .add("original_title", StringType(), True)
    .add("overview", StringType(), True)
    .add("popularity", FloatType(), True)
    .add("poster_path", StringType(), True)
    # Production companies: list of company records.
    .add(
        "production_companies",
        ArrayType(
            StructType()
            .add("id", IntegerType(), True)
            .add("logo_path", StringType(), True)
            .add("name", StringType(), True)
            .add("origin_country", StringType(), True)
        ),
        True,
    )
    # Production countries: list of {iso_3166_1, name} records.
    .add(
        "production_countries",
        ArrayType(
            StructType()
            .add("iso_3166_1", StringType(), True)
            .add("name", StringType(), True)
        ),
        True,
    )
    .add("release_date", StringType(), True)
    .add("revenue", LongType(), True)
    .add("runtime", IntegerType(), True)
    # Spoken languages: list of language records.
    .add(
        "spoken_languages",
        ArrayType(
            StructType()
            .add("english_name", StringType(), True)
            .add("iso_639_1", StringType(), True)
            .add("name", StringType(), True)
        ),
        True,
    )
    .add("status", StringType(), True)
    .add("tagline", StringType(), True)
    .add("title", StringType(), True)
    .add("video", BooleanType(), True)
    .add("vote_average", FloatType(), True)
    .add("vote_count", IntegerType(), True)
)



# Explicit schema for one TMDB credits record: the movie id plus its cast and
# crew lists. Every field is declared nullable.
credit_schema = (
    StructType()
    .add("id", IntegerType(), True)
    # One entry per cast member.
    .add(
        "cast",
        ArrayType(
            StructType()
            .add("adult", BooleanType(), True)
            .add("gender", IntegerType(), True)
            .add("id", IntegerType(), True)
            .add("known_for_department", StringType(), True)
            .add("name", StringType(), True)
            .add("original_name", StringType(), True)
            .add("popularity", DoubleType(), True)
            .add("profile_path", StringType(), True)
            .add("cast_id", IntegerType(), True)
            .add("character", StringType(), True)
            .add("credit_id", StringType(), True)
            .add("order", IntegerType(), True)
        ),
        True,
    )
    # One entry per crew member; shares the person fields with cast and adds
    # department / job instead of the cast-specific fields.
    .add(
        "crew",
        ArrayType(
            StructType()
            .add("adult", BooleanType(), True)
            .add("gender", IntegerType(), True)
            .add("id", IntegerType(), True)
            .add("known_for_department", StringType(), True)
            .add("name", StringType(), True)
            .add("original_name", StringType(), True)
            .add("popularity", DoubleType(), True)
            .add("profile_path", StringType(), True)
            .add("credit_id", StringType(), True)
            .add("department", StringType(), True)
            .add("job", StringType(), True)
        ),
        True,
    )
)

In [5]:
# Offline switch: False reuses the saved snapshot, anything truthy re-fetches
# the data from the API in the fetch cell.
internet_connection = False

# Empty placeholders that already carry the expected schemas.
movies_df = spark.createDataFrame(data=[], schema=movie_schema)
credits_df = spark.createDataFrame(data=[], schema=credit_schema)

### Fetching movie data from the TMDB Movie Database API

In [6]:
# Online: fetch movies and credits through the project helper and overwrite
# the local raw snapshot. Offline: read the snapshot back using the explicit
# schemas so column types match the online path.
if internet_connection:
    movies_df, credits_df = get_all_data(
        spark,
        BASE_URL,
        movie_ids,
        HEADERS,
        movie_schema=movie_schema,
        credit_schema=credit_schema,
    )
    movies_df.write.mode("overwrite").json('./data/movies_raw_data.json')
    credits_df.write.mode("overwrite").json('./data/credits_raw_data.json')
else:
    movies_df = spark.read.json('./data/movies_raw_data.json', schema=movie_schema)
    credits_df = spark.read.json('./data/credits_raw_data.json', schema=credit_schema)

In [7]:
# Print a preview of the raw movie records.
movies_df.show()

+-----+--------------------+---------------------+---------+--------------------+--------------------+------+---------+--------------+-----------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+------------+----------+-------+--------------------+--------+--------------------+--------------------+-----+------------+----------+
|adult|       backdrop_path|belongs_to_collection|   budget|              genres|            homepage|    id|  imdb_id|origin_country|original_language|      original_title|            overview|popularity|         poster_path|production_companies|production_countries|release_date|   revenue|runtime|    spoken_languages|  status|             tagline|               title|video|vote_average|vote_count|
+-----+--------------------+---------------------+---------+--------------------+--------------------+------+---------+--------------+-----------------+--------------------+--------------------+

# Data Cleaning and Preprocessing

In [8]:
# Remove the movies_df columns that are not needed for this analysis.
columns_to_drop = [
    'adult',
    'original_title',
    'imdb_id',
    'video',
    'homepage',
]
movies_df = drop_cols(movies_df, columns_to_drop)

### Evaluating json-like columns, extracting and cleaning key data points

In [9]:
# Evaluate the JSON-like (nested) columns of movies_df and credits_df using the
# project helpers.
# NOTE(review): judging by the credits output shown below, the credits helper
# appears to flatten `cast` to a "|"-separated string and to add cast_size,
# director and crew_size — confirm against utils.
movies_df = eval_movies_json_col(movies_df)
credits_df = eval_credits_json_col(credits_df)



In [10]:
# Drop irrelevant columns from credits_df:
# only the raw `crew` column is removed here.
credits_df = drop_cols(credits_df, ['crew'])


In [11]:
# Print a preview of the processed credits table.
credits_df.show()

+------+--------------------+---------+---------------+---------+
|    id|                cast|cast_size|       director|crew_size|
+------+--------------------+---------+---------------+---------+
|299534|Robert Downey Jr....|      105|  Anthony Russo|      593|
| 19995|Sam Worthington|Z...|       65|  James Cameron|      986|
|140607|Harrison Ford|Mar...|      182|    J.J. Abrams|      257|
|299536|Robert Downey Jr....|       69|  Anthony Russo|      724|
|168259|Vin Diesel|Paul W...|       48|      James Wan|      222|
| 99861|Robert Downey Jr....|       72|    Joss Whedon|      636|
|284054|Chadwick Boseman|...|       66|   Ryan Coogler|      557|
| 12445|Daniel Radcliffe|...|      104|    David Yates|      154|
|181808|Mark Hamill|Carri...|      110|   Rian Johnson|      213|
|330457|Kristen Bell|Idin...|       64|   Jennifer Lee|       39|
|351286|Chris Pratt|Bryce...|       39|    J.A. Bayona|      381|
|109445|Kristen Bell|Idin...|       60|     Chris Buck|      284|
|321612|Em

In [12]:
# Combine movies_df and credits_df with an inner join on the shared `id` column.
combined_df = join_dfs(movies_df, credits_df, on='id', how='inner')

In [13]:
# Replace invalid data with NaN, then rescale the money columns.

# Numeric columns handed to replace_with_nan; presumably a value of 0 in these
# means "unknown" — confirm against utils.
cols_with_zero_val = ['budget', 'revenue', 'runtime']
combined_df = replace_with_nan(combined_df, cols_with_zero_val)

# Free-text columns that may contain known placeholder strings.
cols_with_placeholders = ['overview', 'tagline']
combined_df = replace_known_placeholders(combined_df, cols_with_placeholders)

# Money columns to convert; the final table lists them as budget_musd and
# revenue_musd.
cols_to_musd = ['revenue', 'budget']
combined_df = convert_to_milions(combined_df, cols_to_musd)


In [14]:

# Row-level clean-up, applied in this order:
#   1. remove exact duplicate rows;
#   2. remove rows with a missing id or title;
#   3. keep only rows that have at least 10 non-null columns;
#   4. keep only movies whose status is "Released".
combined_df = (
    combined_df
    .dropDuplicates()
    .na.drop(subset=['id', 'title'])
    .na.drop(thresh=10)
    .where(F.col("status") == "Released")
)

# status is constant after the filter above, so the column is dropped.
combined_df = drop_cols(combined_df, ['status'])

In [15]:
# Replace vote_count = 0 with the average count per genre.
# NOTE(review): the replacement rule lives in replace_zero_count_vote (not
# shown here) — confirm which column(s) it actually rewrites.
combined_df = replace_zero_count_vote(combined_df)

In [None]:
# Reorder the columns (and reindex, per the helper's name) for the final table,
# then print a preview of the result.
reordered_df = reorder_col_and_reindex(combined_df)
reordered_df.show()

+------+--------------------+--------------------+------------+--------------------+---------------------+-----------------+-----------+------------+--------------------+--------------------+----------+------------------+----------+-------+--------------------+--------------------+--------------------+--------------------+---------+---------------+---------+
|    id|               title|             tagline|release_date|              genres|belongs_to_collection|original_language|budget_musd|revenue_musd|production_companies|production_countries|vote_count|      vote_average|popularity|runtime|            overview|    spoken_languages|         poster_path|                cast|cast_size|       director|crew_size|
+------+--------------------+--------------------+------------+--------------------+---------------------+-----------------+-----------+------------+--------------------+--------------------+----------+------------------+----------+-------+--------------------+-----------------