# Movie Data Analysis with PySpark

## Setup and Imports

In [1]:
# Add the parent directory to sys.path to import from src
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), '..')))

# Import necessary libraries
import json
from dotenv import load_dotenv
from pyspark.sql import functions as F

# Import project modules
from src.data_extraction import initialize_spark, fetch_all_movies, create_spark_dataframe

# Load environment variables
load_dotenv()

# Access API token from environment variable
API_ACCESS_TOKEN = os.getenv('API_ACCESS_TOKEN')
BASE_URL = "https://api.themoviedb.org/3/movie"

print("Setup complete")


Setup complete


## Initialize Spark Session

In [2]:
# Initialize Spark session
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

spark = initialize_spark(app_name="Movie Data Analysis")
print(f"PySpark version: {spark.version}")



2025-04-25 05:13:13,060 - src.data_extraction - INFO - PySpark session initialized: version 3.5.5


PySpark version: 3.5.5


## Step 1: Fetch Movie Data from API

In [3]:
# List of movie IDs to fetch (as specified in the project requirements)
movie_ids = [0, 299534, 19995, 140607, 299536, 597, 135397, 420818, 24428,
             168259, 99861, 284054, 12445, 181808, 330457, 351286, 109445,
             321612, 260513]

# Fetch movie data from API
all_movies_data = fetch_all_movies(movie_ids, BASE_URL, API_ACCESS_TOKEN)

# Save raw data to JSON file for backup
with open("../data/movies_raw.json", "w") as json_file:
    json.dump(all_movies_data, json_file, indent=4)
print(f"Raw data saved to file, fetched {len(all_movies_data)} movies")

# Convert movie data to PySpark DataFrame
movies_df = spark.read.option("multiline", "true").json("../data/movies_raw.json")


# Display schema to understand the data structure
movies_df.printSchema()

# Display a sample of the data
movies_df.select("belongs_to_collection", "genres", "production_companies", "credits").show(5, truncate=False)



2025-04-25 05:13:13,075 - src.data_extraction - INFO - Fetching data for 19 movies
2025-04-25 05:13:13,473 - src.data_extraction - ERROR - Error 404 for movie_id=0
2025-04-25 05:13:13,759 - src.data_extraction - INFO - Successfully fetched data for movie_id=299534
2025-04-25 05:13:14,059 - src.data_extraction - INFO - Successfully fetched data for movie_id=19995
2025-04-25 05:13:14,334 - src.data_extraction - INFO - Successfully fetched data for movie_id=140607
2025-04-25 05:13:14,641 - src.data_extraction - INFO - Successfully fetched data for movie_id=299536
2025-04-25 05:13:14,913 - src.data_extraction - INFO - Successfully fetched data for movie_id=597
2025-04-25 05:13:15,189 - src.data_extraction - INFO - Successfully fetched data for movie_id=135397
2025-04-25 05:13:15,458 - src.data_extraction - INFO - Successfully fetched data for movie_id=420818
2025-04-25 05:13:15,725 - src.data_extraction - INFO - Successfully fetched data for movie_id=24428
2025-04-25 05:13:16,008 - src.dat

Raw data saved to file, fetched 18 movies
root
 |-- adult: boolean (nullable = true)
 |-- backdrop_path: string (nullable = true)
 |-- belongs_to_collection: struct (nullable = true)
 |    |-- backdrop_path: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- poster_path: string (nullable = true)
 |-- budget: long (nullable = true)
 |-- credits: struct (nullable = true)
 |    |-- cast: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- adult: boolean (nullable = true)
 |    |    |    |-- cast_id: long (nullable = true)
 |    |    |    |-- character: string (nullable = true)
 |    |    |    |-- credit_id: string (nullable = true)
 |    |    |    |-- gender: long (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- known_for_department: string (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- order: long (nullabl

In [4]:
# Cache the DataFrame for better performance in subsequent operations
movies_df.cache()

# Save DataFrame in Parquet format (efficient for later loading)
# movies_df.write.mode("overwrite").parquet("../data/movies_raw.parquet")
# print("DataFrame saved to Parquet file")


DataFrame[adult: boolean, backdrop_path: string, belongs_to_collection: struct<backdrop_path:string,id:bigint,name:string,poster_path:string>, budget: bigint, credits: struct<cast:array<struct<adult:boolean,cast_id:bigint,character:string,credit_id:string,gender:bigint,id:bigint,known_for_department:string,name:string,order:bigint,original_name:string,popularity:double,profile_path:string>>,crew:array<struct<adult:boolean,credit_id:string,department:string,gender:bigint,id:bigint,job:string,known_for_department:string,name:string,original_name:string,popularity:double,profile_path:string>>>, genres: array<struct<id:bigint,name:string>>, homepage: string, id: bigint, imdb_id: string, origin_country: array<string>, original_language: string, original_title: string, overview: string, popularity: double, poster_path: string, production_companies: array<struct<id:bigint,logo_path:string,name:string,origin_country:string>>, production_countries: array<struct<iso_3166_1:string,name:string>>, 

In [5]:
# Define constants
COLUMNS_TO_DROP = ['adult', 'imdb_id', 'original_title', 'video', 'homepage']
FINAL_COLUMN_ORDER = [
    'id', 'title', 'tagline', 'release_date', 'genre_names', 'collection_name',
    'original_language', 'budget_musd', 'revenue_musd', 'production_companies_names',
    'production_countries_names', 'vote_count', 'vote_average', 'popularity', 'runtime',
    'overview', 'spoken_languages_names', 'poster_path', 'cast', 'cast_size', 'director', 'crew_size'
]

## Step 2: Data Cleaning and Preprocessing

### 2.1 Process and Clean the Data

In [6]:
from src.data_preprocessing import preprocess_movie_data, extract_data, process_credits

In [7]:
# Process the raw movie data through the complete preprocessing pipeline
cleaned_df = preprocess_movie_data(movies_df, COLUMNS_TO_DROP, FINAL_COLUMN_ORDER)

2025-04-25 05:13:25,998 - src.data_preprocessing - INFO - Starting preprocessing pipeline
2025-04-25 05:13:26,034 - src.data_preprocessing - INFO - Dropped columns: ['adult', 'imdb_id', 'original_title', 'video', 'homepage']


Extracting name from belongs_to_collection to collection_name
Extracting name from genres to genre_names
Extracting name from production_countries to production_countries_names
Extracting name from production_companies to production_companies_names
Extracting english_name from spoken_languages to spoken_languages_names


2025-04-25 05:13:26,442 - src.data_preprocessing - INFO - Converting column data types


Processing credits: extract_names from credits to cast
Processing credits: count_members from credits to cast_size
Processing credits: extract_by_job from credits to director
Processing credits: count_members from credits to crew_size


2025-04-25 05:13:26,656 - src.data_preprocessing - INFO - Successfully converted column data types
2025-04-25 05:13:26,656 - src.data_preprocessing - INFO - Handling missing and incorrect data
2025-04-25 05:13:27,023 - src.data_preprocessing - INFO - Successfully handled missing and incorrect data
2025-04-25 05:13:27,024 - src.data_preprocessing - INFO - Organizing final DataFrame
2025-04-25 05:13:30,728 - src.data_preprocessing - INFO - Final DataFrame has 18 rows and 22 columns
2025-04-25 05:13:30,728 - src.data_preprocessing - INFO - Preprocessing pipeline completed successfully


### 2.2 Examine the Preprocessed Data

In [8]:
# Display the schema of the preprocessed DataFrame
cleaned_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- release_date: date (nullable = true)
 |-- genre_names: string (nullable = false)
 |-- collection_name: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- budget_musd: double (nullable = true)
 |-- revenue_musd: double (nullable = true)
 |-- production_companies_names: string (nullable = false)
 |-- production_countries_names: string (nullable = false)
 |-- vote_count: integer (nullable = true)
 |-- vote_average: double (nullable = true)
 |-- popularity: double (nullable = true)
 |-- runtime: integer (nullable = true)
 |-- overview: string (nullable = true)
 |-- spoken_languages_names: string (nullable = false)
 |-- poster_path: string (nullable = true)
 |-- cast: string (nullable = true)
 |-- cast_size: integer (nullable = true)
 |-- director: string (nullable = true)
 |-- crew_size: integer (nullable = true)



In [10]:
# Show a sample of the cleaned data
cleaned_df.show(5)

# cleaned_df.select("cast_size", "director", "crew_size", "cast").show(truncate=False)


# cleaned_df.select("title", "collection_name", "genre_names", "cast").show(5, truncate=False)

+------+--------------------+--------------------+------------+--------------------+--------------------+-----------------+-----------+------------+--------------------------+--------------------------+----------+------------+----------+-------+--------------------+----------------------+--------------------+--------------------+---------+--------------------+---------+
|    id|               title|             tagline|release_date|         genre_names|     collection_name|original_language|budget_musd|revenue_musd|production_companies_names|production_countries_names|vote_count|vote_average|popularity|runtime|            overview|spoken_languages_names|         poster_path|                cast|cast_size|            director|crew_size|
+------+--------------------+--------------------+------------+--------------------+--------------------+-----------------+-----------+------------+--------------------------+--------------------------+----------+------------+----------+-------+---------