In [229]:
import os
import sys
sys.path.append(os.path.abspath('..'))
import json
import builtins

In [230]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, round, size, coalesce, greatest
from pyspark.sql.types import DoubleType

from Functions.data_extraction import movie
from Functions.Schema import get_tmdb_raw_schema
from Functions.data_cleaning import extract_data, extract_name,get_director,separate_data,get_cast_names,get_crew_names
from Functions.KPIs_Analysis import rank_movies

In [231]:
# Create (or reuse) the Spark session that every later cell works through.
spark = (
    SparkSession.builder
    .appName("TMDB Movie Analysis")
    .config("spark.driver.memory", "4g")
    .config("spark.sql.shuffle.partitions", "8")
    .getOrCreate()
)

# Confirm start-up and report which Spark version is running.
print("✓ PySpark initialized successfully!")
print(f"   Spark Version: {spark.version}")

✓ PySpark initialized successfully!
   Spark Version: 3.5.0


In [232]:
# Fetch the preferred movies from the API, one call per ID.
# The API key is read from the environment variable named "key".
key = os.getenv("key")

# IDs to request. The recorded run shows ID 0 is reported as
# "not found (404)" and skipped.
movie_ids = [
    0, 299534, 19995, 140607, 299536, 597, 135397, 420818,
    24428, 168259, 99861, 284054, 12445, 181808, 330457,
    351286, 109445, 321612, 260513,
]

# Results are kept in the same order as movie_ids.
movies = [movie(movie_id, key) for movie_id in movie_ids]

Movie ID 0 not found (404). Skipping.


In [233]:
# Keep only truthy results, i.e. drop entries for IDs the lookup could not resolve.
movies = [fetched for fetched in movies if fetched]

# Persist the fetched records so that the Spark read of 'tmdb_movies.json'
# works on a fresh kernel instead of depending on a file left over from an
# earlier session. Nothing is written when the fetch produced no records, so
# an existing file is never replaced by an empty list.
# NOTE(review): assumes movie() returns JSON-serializable dicts — confirm.
if movies:
    with open('tmdb_movies.json', 'w', encoding='utf-8') as movies_file:
        json.dump(movies, movies_file)

In [234]:
# Load the movie records from tmdb_movies.json and let Spark infer the schema.
# multiLine allows the JSON document to span several lines; PERMISSIVE mode keeps
# malformed records and routes their raw text to the _corrupt_record column.
df = (
    spark.read
    .options(
        multiLine="true",
        mode="PERMISSIVE",
        columnNameOfCorruptRecord="_corrupt_record",
    )
    .json('tmdb_movies.json')
)

In [235]:
# Preview the raw frame: first 20 rows, long cell values truncated.
df.show(n=20, truncate=True)

+-----+--------------------+---------------------+---------+--------------------+--------------------+--------------------+------+---------+--------------+-----------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+------------+----------+-------+--------------------+--------+--------------------+--------------------+-----+------------+----------+
|adult|       backdrop_path|belongs_to_collection|   budget|             credits|              genres|            homepage|    id|  imdb_id|origin_country|original_language|      original_title|            overview|popularity|         poster_path|production_companies|production_countries|release_date|   revenue|runtime|    spoken_languages|  status|             tagline|               title|video|vote_average|vote_count|
+-----+--------------------+---------------------+---------+--------------------+--------------------+--------------------+------+---------+------------

In [236]:
# Replace the nested columns with the values pulled out by the cleaning helpers.
# extract_data is applied to genres, production countries/companies and spoken
# languages; extract_name is applied to the collection column and receives the
# field to read ("name") as a literal column because the UDF takes column arguments.
df = (
    df
    .withColumn("genres", extract_data(col("genres")))
    .withColumn(
        "belongs_to_collection",
        extract_name(col("belongs_to_collection"), lit("name")),
    )
    .withColumn("production_countries", extract_data(col("production_countries")))
    .withColumn("production_companies", extract_data(col("production_companies")))
    .withColumn("spoken_languages", extract_data(col("spoken_languages")))
)

In [237]:
# Add "cast" and "crew" columns, both derived from the nested credits column.
df = (
    df
    .withColumn("cast", get_cast_names(col("credits")))
    .withColumn("crew", get_crew_names(col("credits")))
)

In [238]:
# Add a "director" column computed by get_director from the credits column.
df = df.withColumn(
    "director",
    get_director(col("credits")),
)

In [239]:
# Count the cast and crew entries per movie, using 0 when the list is missing.
# With Spark's default (non-ANSI) settings, size() returns -1 for a NULL array
# instead of NULL, so coalesce(size(...), 0) would let -1 through as the count.
# greatest(..., 0) clamps both the -1 case and a NULL result to 0.
df = (
    df
    .withColumn("cast_size", greatest(size(col("credits.cast")), lit(0)))
    .withColumn("crew_size", greatest(size(col("credits.crew")), lit(0)))
)


In [240]:
# Columns that are removed from the frame before the analysis.
cols_to_drop = [
    "adult",
    "imdb_id",
    "original_title",
    "video",
    "homepage",
]
df = df.drop(*cols_to_drop)


In [241]:
# Run separate_data over each multi-valued column in turn, rebinding df each time.
# NOTE(review): separate_data lives in Functions.data_cleaning; the exact output
# format it produces is not visible in this notebook.
columns_to_separate = [
    'genres',
    'spoken_languages',
    'production_countries',
    'production_companies',
    'cast',
    'crew',
]
for column in columns_to_separate:
    df = separate_data(df, column)


In [242]:
# Express budget and revenue in millions, rounded to two decimals.
# `round` here is pyspark.sql.functions.round (imported above), not the builtin.
df = (
    df
    .withColumn("budget_musd", round(col("budget") / 1_000_000, 2))
    .withColumn("revenue_musd", round(col("revenue") / 1_000_000, 2))
)


In [243]:
# Drop the 'status' column; it is not among the columns selected for the
# final frame in the following cell.
df = df.drop("status")


In [244]:
# Keep only the columns used by the analysis, in their final presentation order.
df = df.select(
    "id",
    "title",
    "tagline",
    "release_date",
    "genres",
    "belongs_to_collection",
    "original_language",
    "budget_musd",
    "revenue_musd",
    "production_companies",
    "production_countries",
    "vote_count",
    "vote_average",
    "popularity",
    "runtime",
    "overview",
    "spoken_languages",
    "poster_path",
    "cast",
    "cast_size",
    "director",
    "crew_size",
)

In [245]:
# Derive the financial KPIs in a single chain:
#   1. make sure budget_musd / revenue_musd are doubles,
#   2. profit = revenue_musd - budget_musd,
#   3. roi    = revenue_musd / budget_musd * 100 (revenue as a percentage of budget).
df = (
    df
    .withColumn("budget_musd", col("budget_musd").cast(DoubleType()))
    .withColumn("revenue_musd", col("revenue_musd").cast(DoubleType()))
    .withColumn("profit", col("revenue_musd") - col("budget_musd"))
    .withColumn("roi", (col("revenue_musd") / col("budget_musd")) * 100)
)

In [246]:
# Five highest-grossing movies, ranked on revenue_musd.
highest_revenue = rank_movies(df, column='revenue_musd', n=5)

# Print a numbered list (starting at 1) of title and revenue.
for position, ranked in enumerate(highest_revenue.collect(), start=1):
    print(f"{position}. {ranked['title']} - ${ranked['revenue_musd']}M")

1. Avatar - $2923.71M
2. Avengers: Endgame - $2799.44M
3. Titanic - $2264.16M
4. Star Wars: The Force Awakens - $2068.22M
5. Avengers: Infinity War - $2052.42M


In [247]:
# Single movie with the largest budget_musd, as ranked by rank_movies.
highest_budget = rank_movies(df, column='budget_musd', n=1)
for ranked in highest_budget.collect():
    print(f"The highest budget: ${ranked['budget_musd']}M - {ranked['title']}")


The highest budget: $356.0M - Avengers: Endgame


In [248]:
# Single movie with the largest profit (revenue_musd - budget_musd).
highest_profit = rank_movies(df, column='profit', n=1)
for ranked in highest_profit.collect():
    print(f'The highest profit: ${ranked["profit"]}M - {ranked["title"]}')

The highest profit: $2686.71M - Avatar


In [222]:
# Single movie with the smallest profit; ascending=True asks rank_movies for the low end.
lowest_profit = rank_movies(df, column='profit', n=1, ascending=True)
for ranked in lowest_profit.collect():
    print(f'The lowest profit: ${ranked["profit"]}M - {ranked["title"]}')

The lowest profit: $1032.7M - Star Wars: The Last Jedi


In [223]:
# Single movie with the highest ROI; rank_movies is called with min_budget=10
# so that only movies with a budget of at least 10M are considered.
highest_roi = rank_movies(df, column='roi', min_budget=10, n=1)
for ranked in highest_roi.collect():
    print(f'The highest Return On Investment: {ranked["roi"]:.2f} - {ranked["title"]}')

The highest Return On Investment: 1233.63 - Avatar


In [224]:
# Single movie with the lowest ROI, again limited via min_budget=10 to movies
# with a budget of at least 10M; ascending=True selects the low end.
lowest_roi = rank_movies(df, column='roi', min_budget=10, n=1, ascending=True)
for ranked in lowest_roi.collect():
    print(f'The lowest Return On Investment: {ranked["roi"]:.2f} - {ranked["title"]}')

The lowest Return On Investment: 444.23 - Star Wars: The Last Jedi


In [225]:
# Single movie with the largest vote_count.
most_voted = rank_movies(df, column='vote_count', n=1)

# Only the title is reported for this KPI.
for ranked in most_voted.collect():
    print(f'The most voted movie: {ranked["title"]}')

The most voted movie: The Avengers


In [226]:
# Highest rated movie by vote_average; rank_movies is called with min_votes=10
# so that only movies with at least 10 votes are considered.
highest_rate = rank_movies(df, column='vote_average', n=1, min_votes=10)
for row in highest_rate.collect():
    # The label reads "highest rated" because the ranking is on the average
    # score, not on how many ratings a movie received; this also matches the
    # wording used by the "lowest rated" cell.
    print(f'The highest rated movie: {row["vote_average"]} - {row["title"]}')

The most rated movie: 8.238 - Avengers: Endgame


In [227]:
# Lowest rated movie by vote_average among movies with at least 10 votes
# (min_votes=10); ascending=True selects the low end of the ranking.
lowest_rate = rank_movies(df, column='vote_average', min_votes=10, n=1, ascending=True)
for ranked in lowest_rate.collect():
    print(f'The lowest rated movie: {ranked["vote_average"]} - {ranked["title"]}')

The lowest rated movie: 6.536 - Jurassic World: Fallen Kingdom


In [228]:
# Single movie with the highest popularity score; only its title is reported.
most_popular = rank_movies(df, column='popularity', n=1)
for ranked in most_popular.collect():
    print(f'The most popular movie: {ranked["title"]}')

The most popular movie: Avatar
