# **TMDB MOVIE DATA ANALYSIS USING PYSPARK**

This project extracts movie data from the TMDB API, cleans and preprocesses the dataset, and performs in-depth exploratory data analysis. The goal is to uncover meaningful insights, identify trends, and support data-driven decision-making based on movie performance, audience behavior, and industry patterns.

## **IMPORTS LIBRARIES**

In [1]:
#Import os and sys
import os
import sys 
from pathlib import Path
import ast
import pandas as pd
import numpy as np

#Spark imports
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

sys.path

#Imports loadEnv from config module
from config.config import loadEnv, getURL, create_retry

#Imports extractDataFromAPI from the extract data module
from Data_Extraction.extractData import extractDataFromAPI
from Data_Cleaning.removeColumns import dropColumns
from Data_Cleaning.convertArray import separateArrayColumn

## **PYSPARK CONFIGUARTION**

In [2]:
"""
    Adaptive Query Execution (AQE): Dynamically optimizes query plans at runtime
    Coalesce Partitions: Reduces number of partitions for small datasets automatically

"""
spark = (
    SparkSession.builder
    .appName("TMDB Movie Data Analysis")
    .master("local[*]")
    .config("spark.sql.adaptive.enabled", "true") 
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .getOrCreate()
)

# Set display options for better debugging
#Eager Eval: Shows DataFrames nicely in notebooks (like Pandas)
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark.conf.set("spark.sql.repl.eagerEval.maxNumRows", 10)

print("Spark Session Configured Sucessfully")


Spark Session Configured Sucessfully


## **DATA EXTRACTION**

In [3]:
"""

movie_ids = [0, 299534,19995,140607,299536,597,135397, 420818, 24428, 168259, 99861,
                    284054, 12445, 181808, 330457, 351286, 109445, 321612, 260513] 

API_KEY = loadEnv(fileName="API_KEY")

url = getURL()

session = create_retry()

extractDataFromAPI(session = session, url = url, API_KEY = API_KEY, movie_ids=movie_ids)

"""


'\n\nmovie_ids = [0, 299534,19995,140607,299536,597,135397, 420818, 24428, 168259, 99861,\n                    284054, 12445, 181808, 330457, 351286, 109445, 321612, 260513] \n\nAPI_KEY = loadEnv(fileName="API_KEY")\n\nurl = getURL()\n\nsession = create_retry()\n\nextractDataFromAPI(session = session, url = url, API_KEY = API_KEY, movie_ids=movie_ids)\n\n'

## **EXPLORATORY DATA ANALYSIS**

### **READS THE CSV FILE**

In [4]:
#Reads the csv file
movie_data = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .json("../data/movieData.json")
)


### **INSPECTS DATA**

In [5]:
# Verifies the data is loaded
print("Initial Data Overview \n")
print(f"Total Records: {movie_data.count()}")
print(f"Total Columns: {len(movie_data.columns)}")

# Show sample data (first 3 rows, non-truncated for key columns)
movie_data.select("id", "title", "release_date", "genres").show(3, truncate=60)

Initial Data Overview 

Total Records: 18
Total Columns: 27
+------+----------------------------+------------+------------------------------------------------------------+
|    id|                       title|release_date|                                                      genres|
+------+----------------------------+------------+------------------------------------------------------------+
|299534|           Avengers: Endgame|  2019-04-24|     [{12, Adventure}, {878, Science Fiction}, {28, Action}]|
| 19995|                      Avatar|  2009-12-16|[{28, Action}, {12, Adventure}, {14, Fantasy}, {878, Scie...|
|140607|Star Wars: The Force Awakens|  2015-12-15|     [{12, Adventure}, {28, Action}, {878, Science Fiction}]|
+------+----------------------------+------------+------------------------------------------------------------+
only showing top 3 rows



### **OUTPUTS THE COLUMNS IN THE DATAFRAME**

In [6]:
if movie_data is None or movie_data.rdd.isEmpty():
    print("No data extracted from the API")
else:
    movie_data.printSchema()


root
 |-- adult: boolean (nullable = true)
 |-- backdrop_path: string (nullable = true)
 |-- belongs_to_collection: struct (nullable = true)
 |    |-- backdrop_path: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- poster_path: string (nullable = true)
 |-- budget: long (nullable = true)
 |-- credits: struct (nullable = true)
 |    |-- cast: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- adult: boolean (nullable = true)
 |    |    |    |-- cast_id: long (nullable = true)
 |    |    |    |-- character: string (nullable = true)
 |    |    |    |-- credit_id: string (nullable = true)
 |    |    |    |-- gender: long (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- known_for_department: string (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- order: long (nullable = true)
 |    |    |    |-- original_nam

In [7]:
#Outputs the first five rows in the data
movie_data.show(5)

+-----+--------------------+---------------------+---------+--------------------+--------------------+--------------------+------+---------+--------------+-----------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+------------+----------+-------+--------------------+--------+--------------------+--------------------+-----+------------+----------+
|adult|       backdrop_path|belongs_to_collection|   budget|             credits|              genres|            homepage|    id|  imdb_id|origin_country|original_language|      original_title|            overview|popularity|         poster_path|production_companies|production_countries|release_date|   revenue|runtime|    spoken_languages|  status|             tagline|               title|video|vote_average|vote_count|
+-----+--------------------+---------------------+---------+--------------------+--------------------+--------------------+------+---------+------------

## **DATA CLEANINGAND PREPARATION**

### **DROPPING COLUMNS**

The below columns will be dropped based on the reasons attached:

adult — The adult column is not needed for KPI analysis.

backdrop_path — Rarely needed; large strings/URLs; drop to save space (poster_path is enough).

homepage — optional metadata; drop (keeps size down).

imdb_id — external id not used in spec; drop unless you plan cross-referencing.

origin_country — ambiguous / redundant with production_countries.

original_title — redundant with title for your analysis (drop unless you need original-language title).

video — not useful for KPIs.

belongs_to_collection.id, belongs_to_collection.poster_path, belongs_to_collection.backdrop_path — drop (keeps only collection name).

belongs_to_collection (raw JSON) — if you extract the .name into a single column, drop the raw JSON.

any duplicate columns (e.g., both belongs_to_collection and belongs_to_collection.name keep only the .name value).

In [8]:
#Outputs all the columns in the dataFrame
movie_data.columns

['adult',
 'backdrop_path',
 'belongs_to_collection',
 'budget',
 'credits',
 'genres',
 'homepage',
 'id',
 'imdb_id',
 'origin_country',
 'original_language',
 'original_title',
 'overview',
 'popularity',
 'poster_path',
 'production_companies',
 'production_countries',
 'release_date',
 'revenue',
 'runtime',
 'spoken_languages',
 'status',
 'tagline',
 'title',
 'video',
 'vote_average',
 'vote_count']

In [9]:
# Define columns to drop as per project requirements
columns_to_drop = (
    'adult',           
    'backdrop_path',
    'origin_country',
    'original_title',
    'imdb_id',         
    'original_title',  
    'video',           
    'homepage'
)

#Calls the remove column function to remove all the stated columns from the dataframe
movie_data = dropColumns(movie_data, columns=columns_to_drop)

In [10]:
print("Columns Dropped Successfully! \n")
print(f"Current column count: {len(movie_data.columns)}")
print(f"Columns removed: {len(columns_to_drop)}")

# Verify the columns are gone
print("Remaining columns: \n")
for idx, col in enumerate(movie_data.columns, 1):
    print(f"  {idx:2d}. {col}")


Columns Dropped Successfully! 

Current column count: 20
Columns removed: 8
Remaining columns: 

   1. belongs_to_collection
   2. budget
   3. credits
   4. genres
   5. id
   6. original_language
   7. overview
   8. popularity
   9. poster_path
  10. production_companies
  11. production_countries
  12. release_date
  13. revenue
  14. runtime
  15. spoken_languages
  16. status
  17. tagline
  18. title
  19. vote_average
  20. vote_count


### **EVALUATING JSON LIKE COLUMNS**

In [11]:
"""
 WHY: Before parsing, we need to understand the JSON structure
 HOW: Inspect sample values to design our parsing strategy
 BEST PRACTICE: Always examine your data before writing transformation logic

"""

print("INSPECTING JSON-LIKE COLUMNS")
print("="*80 + "\n")

# List of JSON-like columns to parse
json_columns = ['belongs_to_collection','genres', 'production_companies', 'production_countries', 'spoken_languages', 'credits.crew', 'credits.cast']

# Show sample values for each JSON column
for col_name in json_columns:
    print(f"Column: {col_name}")
    sample_value = movie_data.select(col_name).filter(F.col(col_name).isNotNull()).first()
    if sample_value:
        print(f"   Sample: {sample_value[0][:300]}... \n")  # First 200 chars

print("\nInspection Complete!")

INSPECTING JSON-LIKE COLUMNS

Column: belongs_to_collection
   Sample: ('/zuW6fOiusv4X9nnW3paHGfXcSll.jpg', 86311, 'The Avengers Collection', '/yFSIUVTCvgYrpalUktulvk3Gi5Y.jpg')... 

Column: genres
   Sample: [Row(id=12, name='Adventure'), Row(id=878, name='Science Fiction'), Row(id=28, name='Action')]... 

Column: production_companies
   Sample: [Row(id=420, logo_path='/hUzeosd33nzE5MCNsZxCGEKTXaQ.png', name='Marvel Studios', origin_country='US')]... 

Column: production_countries
   Sample: [Row(iso_3166_1='US', name='United States of America')]... 

Column: spoken_languages
   Sample: [Row(english_name='English', iso_639_1='en', name='English'), Row(english_name='Japanese', iso_639_1='ja', name='日本語'), Row(english_name='Xhosa', iso_639_1='xh', name='')]... 

Column: credits.crew
   Sample: [Row(adult=False, credit_id='6057fbdf8c44b90054683688', department='Directing', gender=0, id=3019687, job='Second Assistant Director', known_for_department='Directing', name='Paul Schneider', orig

### **DATA EXTRACTION**

#### **EXTRACTS NAME FROM BELONGS TO COLLECTION**

In [12]:
#Extracts the name of the belongs to collection and replace it with the belongs to collection column
movie_data = movie_data.withColumn(
    "belongs_to_collection",
    F.col("belongs_to_collection.name")
)

In [13]:
#Outputs the belongs to collection column
movie_data.select("belongs_to_collection").show(18, truncate=False)

+-----------------------------------+
|belongs_to_collection              |
+-----------------------------------+
|The Avengers Collection            |
|Avatar Collection                  |
|Star Wars Collection               |
|The Avengers Collection            |
|NULL                               |
|Jurassic Park Collection           |
|The Lion King (Reboot) Collection  |
|The Avengers Collection            |
|The Fast and the Furious Collection|
|The Avengers Collection            |
|Black Panther Collection           |
|Harry Potter Collection            |
|Star Wars Collection               |
|Frozen Collection                  |
|Jurassic Park Collection           |
|Frozen Collection                  |
|NULL                               |
|The Incredibles Collection         |
+-----------------------------------+



### **JSON PARSING**

This section extracts the 

Genre names (genres → separate multiple genres with "|").

Spoken languages (spoken_languages → separate with "|").

Production countries (production_countries → separate with "|").

Production companies (production_companies → separate with "|").

In [22]:
#Extracts the names from the json columns
movie_data = (
    movie_data
    .withColumn("genres", separateArrayColumn(F.col("genres"), "name"))
    .withColumn("spoken_languages", separateArrayColumn(F.col("spoken_languages"), "english_name"))
    .withColumn("production_countries", separateArrayColumn(F.col("production_countries"), "name"))
    .withColumn("production_companies", separateArrayColumn(F.col("production_companies"), "name"))
)


### **EXTRACTS DIRECTORS**

In [27]:
#Extracts the names from the json columns
movie_data = (
    movie_data
    .withColumn(
        "director",
        F.when(
            F.col("credits.crew").isNull(),
            F.lit(None)
        ).otherwise(
            F.array_join(
                F.transform(
                    F.filter(
                        F.col("credits.crew"),
                        lambda c: c.getField("job") == F.lit("Director")
                    ),
                    lambda c: c.getField("name")
                ),
                ", "
            )
        )
    )
)


### **EXTRACTS CREW SIZE**

In [None]:
#Extracts crew size from the crew column
movie_data = (
    movie_data
    .withColumn(
        "crew_size",
        F.when(
            F.col("credits.crew").isNull(),
            F.lit(None)
        ).otherwise(
            F.size("credits.crew")
        ) 
    )
)

#### **EXTRACTS CAST SIZE**

In [54]:
#Extracts crew size from the crew column
movie_data = (
    movie_data
    .withColumn(
        "cast_size",
        F.when(
            F.col("credits.cast").isNull(),
            F.lit(None)
        ).otherwise(
            F.size("credits.cast")
        ) 
    )
)

In [57]:
#Outputs the data for a clear view of the extraction
movie_data.select("title", "genres", "spoken_languages", "production_countries", "production_companies", "director", "crew_size", "cast_size").show(18, truncate=False)

+--------------------------------------------+-----------------------------------------+---------------------------------------------+---------------------------------------+------------------------------------------------------------------------------------+------------------------+---------+---------+
|title                                       |genres                                   |spoken_languages                             |production_countries                   |production_companies                                                                |director                |crew_size|cast_size|
+--------------------------------------------+-----------------------------------------+---------------------------------------------+---------------------------------------+------------------------------------------------------------------------------------+------------------------+---------+---------+
|Titanic                                     |Drama|Romance                          

### **INSPECTS EXTRACTED COLUMNS**

In [25]:
movie_data.groupBy("genres").count().orderBy(F.desc("count")).show(18, truncate=False)

movie_data.groupBy("belongs_to_collection").count().orderBy(F.desc("count")).show(18, truncate=False)

movie_data.groupBy("spoken_languages").count().orderBy(F.desc("count")).show(18, truncate=False)


+-----------------------------------------+-----+
|genres                                   |count|
+-----------------------------------------+-----+
|Adventure|Action|Science Fiction         |3    |
|Action|Adventure|Science Fiction|Thriller|2    |
|Action|Adventure|Science Fiction         |2    |
|Adventure|Fantasy                        |1    |
|Action|Crime|Thriller                    |1    |
|Family|Fantasy|Romance                   |1    |
|Adventure|Science Fiction|Action         |1    |
|Adventure|Drama|Family|Animation         |1    |
|Action|Adventure|Fantasy|Science Fiction |1    |
|Drama|Romance                            |1    |
|Family|Animation|Adventure|Comedy|Fantasy|1    |
|Animation|Family|Adventure|Fantasy       |1    |
|Action|Adventure|Animation|Family        |1    |
|Science Fiction|Action|Adventure         |1    |
+-----------------------------------------+-----+

+-----------------------------------+-----+
|belongs_to_collection              |count|
+----------

### **HANDLING MISSING AND INCORRECT DATA**

#### **CONVERT COULMN DATA TYPES**

In [36]:
# Convert numeric columns safely
movie_data = movie_data.withColumn(
    "budget",
    F.when(F.col("budget").cast(LongType()) > 0, F.col("budget").cast(LongType()))
     .otherwise(None)
).withColumn(
    "revenue",
    F.when(F.col("revenue").cast(LongType()) > 0, F.col("revenue").cast(LongType()))
     .otherwise(None)
).withColumn(
    "runtime",
    F.when(F.col("runtime").cast(IntegerType()) > 0, F.col("runtime").cast(IntegerType()))
     .otherwise(None)
).withColumn(
    "popularity",
    F.col("popularity").cast(DoubleType())
).withColumn(
    "vote_count",
    F.col("vote_count").cast(LongType())
).withColumn(
    "vote_average",
    F.col("vote_average").cast(DoubleType())
).withColumn(
    "id",
    F.col("id").cast(LongType())
)

# Convert release_date to proper date format
movie_data = movie_data.withColumn(
    "release_date",
    F.to_date("release_date", "yyyy-MM-dd")
)


In [38]:
#Outputs the schema
movie_data.printSchema()

root
 |-- belongs_to_collection: string (nullable = true)
 |-- budget: long (nullable = true)
 |-- credits: struct (nullable = true)
 |    |-- cast: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- adult: boolean (nullable = true)
 |    |    |    |-- cast_id: long (nullable = true)
 |    |    |    |-- character: string (nullable = true)
 |    |    |    |-- credit_id: string (nullable = true)
 |    |    |    |-- gender: long (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- known_for_department: string (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- order: long (nullable = true)
 |    |    |    |-- original_name: string (nullable = true)
 |    |    |    |-- popularity: double (nullable = true)
 |    |    |    |-- profile_path: string (nullable = true)
 |    |-- crew: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- ad

#### **REPLACES ZEROS IN BUDGET, REVENUE AND RUNTIME COLUMNS**

In [None]:
# Replace zeros in budget, revenue, runtime with null
movie_data = movie_data.withColumn(
    "budget", F.when(F.col("budget") > 0, F.col("budget")).otherwise(None)
).withColumn(
    "revenue", F.when(F.col("revenue") > 0, F.col("revenue")).otherwise(None)
).withColumn(
    "runtime", F.when(F.col("runtime") > 0, F.col("runtime")).otherwise(None)
)

#### **CONVERT BUDGET AND REVENUE TO MILLION USD**

In [40]:
# Convert budget and revenue to million USD
movie_data = movie_data.withColumn(
    "budget_musd", (F.col("budget") / 1_000_000).cast("double")
).withColumn(
    "revenue_musd", (F.col("revenue") / 1_000_000).cast("double")
)

#### **HANDLE VOTE COUNT**

In [41]:
# Handle vote_count = 0 → set vote_average to null (cannot compute meaningful rating)
movie_data = movie_data.withColumn(
    "vote_average",
    F.when(F.col("vote_count") > 0, F.col("vote_average")).otherwise(None)
)

#### **CLEANS OVERVIEW AND TAGLINE COLUMNS**

In [42]:
# Clean placeholders in text fields
placeholder_values = ["No Data", "Unknown", "N/A", ""]
movie_data = movie_data.withColumn(
    "overview",
    F.when(~F.col("overview").isin(placeholder_values), F.col("overview"))
     .otherwise(None)
).withColumn(
    "tagline",
    F.when(~F.col("tagline").isin(placeholder_values), F.col("tagline"))
     .otherwise(None)
)

#### **DROPS DUPLICATES IN TITLE AND ID COLUMNS**

In [43]:
#Drops duplicates in the dataFrame
initial_count = movie_data.count()
movie_data = movie_data.dropDuplicates(["id"])
final_count = movie_data.count()
print(f"Dropped {initial_count - final_count} duplicate rows.")

Dropped 0 duplicate rows.


#### **FILTER ROWS BY NON-NULL COLUMNS**

In [45]:
# Count rows before filtering
initial_count = movie_data.count()

# Minimum number of non-null columns required
min_non_null = 10

# Count non-null columns for each row
non_null_count_expr = sum(F.col(c).isNotNull().cast("int") for c in movie_data.columns)

# Filter rows where non-null columns >= min_non_null
movie_data = movie_data.withColumn("non_null_count", non_null_count_expr) \
                       .filter(F.col("non_null_count") >= min_non_null) \
                       .drop("non_null_count")


# Count rows after filtering
final_count = movie_data.count()

# Number of rows dropped
rows_dropped = initial_count - final_count

print(f"Rows before filtering: {initial_count}")
print(f"Rows after filtering: {final_count}")
print(f"Rows dropped: {rows_dropped}")




Rows before filtering: 18
Rows after filtering: 18
Rows dropped: 0


#### **FILTER ROWS BASED ON RELEASED DATE**

In [46]:
# Count rows before filtering (for auditing)
before_count = movie_data.count()

# Keep only movies with status = 'Released'
movie_data = movie_data.filter(F.col("status") == "Released")

# Drop the status column as it's no longer needed
movie_data = movie_data.drop("status")

# Count rows after filtering
after_count = movie_data.count()
rows_dropped = before_count - after_count

print(f"Rows before filtering 'Released': {before_count}")
print(f"Rows after filtering 'Released': {after_count}")
print(f"Rows dropped: {rows_dropped}")


Rows before filtering 'Released': 18
Rows after filtering 'Released': 18
Rows dropped: 0


## **REORDER COLUMNS AND FINALIZE THE DATAFRAME**

In [None]:
# Desired column order
final_columns = [
    'id', 'title', 'tagline', 'release_date', 'genres', 'belongs_to_collection',
    'original_language', 'budget_musd', 'revenue_musd', 'production_companies',
    'production_countries', 'vote_count', 'vote_average', 'popularity',
    'runtime', 'overview', 'spoken_languages', 'poster_path',
    'cast', 'cast_size', 'director', 'crew_size'
]

# Keep only the columns in final_columns and reorder them
movie_data = movie_data.select([c for c in final_columns if c in movie_data.columns])

# Show the dataFrame
movie_data.show(18, truncate=False)


+------+--------------------------------------------+--------------------------------------------------------------------------+------------+-----------------------------------------+-----------------------------------+-----------------+-----------+------------+------------------------------------------------------------------------------------+---------------------------------------+----------+------------+----------+-------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------

## **KPI IMPLEMENTATION AND ANALYSIS**