# TMDB Data Engineering Project: Modular Pipeline (PySpark)

## Project Structure
- **extraction.api**: Handles API requests.
- **transformation.cleaning**: Handles data cleaning (Spark).
- **analysis.analysis**: Contains logic for KPIs and Ranking (Spark).
- **visualization.plots**: Contains plotting functions (Pandas interface).
- **config.settings**: Configuration (API keys and the list of movie IDs to fetch).

In [None]:
import sys
import os
import json
# Add the project root (the parent of the current working directory) to the
# import path so the project packages imported below can be resolved.
# The membership check keeps sys.path free of duplicate entries when this
# cell is executed more than once in the same kernel.
project_root = os.path.abspath('..')
if project_root not in sys.path:
    sys.path.append(project_root)

from pyspark.sql import SparkSession
import pandas as pd

from config.settings import MOVIE_IDS
from extraction.api import fetch_movie_data
from transformation.cleaning import clean_movie_data
from analysis.analysis import (
    calculate_kpis, rank_movies, get_franchise_performance, 
    get_director_performance, filter_bruce_willis_scifi, 
    filter_uma_tarantino, compare_franchise_vs_standalone
)
from visualization.plots import (
    set_style, plot_revenue_vs_budget, plot_roi_by_genre, 
    plot_popularity_vs_rating, plot_franchise_comparison, plot_yearly_trends, 
    plot_franchise_vs_standalone
)

# Create (or reuse) the Spark session; the PySpark-based project functions
# used in the cells below need an active session to work.
spark = (
    SparkSession.builder
    .appName("TMDB_Notebook_Analysis")
    .master("local[*]")
    .getOrCreate()
)

# Set Spark's log level to WARN for this session.
spark.sparkContext.setLogLevel("WARN")

# Apply the plotting style defined in visualization.plots.
set_style()

# Step 1: Fetch & Clean Data

In [None]:
# Fetch the raw records for every configured movie ID.
print(f"Fetching data for {len(MOVIE_IDS)} movies...")
raw_data_list = fetch_movie_data(MOVIE_IDS)

# Fail fast with a clear message: with no records Spark has nothing to infer a
# schema from, and the column-based steps below would fail with a far less
# helpful error.
if not raw_data_list:
    raise ValueError(
        "No movie data was fetched; check MOVIE_IDS and the API configuration."
    )

# Convert the raw list of dicts to a Spark DataFrame: serialize each record to
# a JSON string and let Spark's JSON reader infer the schema from the RDD.
json_rdd = spark.sparkContext.parallelize([json.dumps(r) for r in raw_data_list])
raw_df = spark.read.json(json_rdd)

# Run the project's cleaning step, then the KPI calculation, on the Spark DataFrame.
df_spark = clean_movie_data(raw_df)
df_spark = calculate_kpis(df_spark)

# Quick preview in the notebook: first 5 rows of the key columns.
df_spark.select('title', 'release_date', 'revenue_musd', 'budget_musd', 'profit_musd', 'roi').show(5)

# Step 2: KPI Analysis

In Spark, we operate on the distributed DataFrame. We call `.show()` to print results directly, or `.toPandas()` when a small result is easier to read as a Pandas table.

In [None]:
def _show_ranking(heading, metric, columns, **filters):
    """Print *heading*, then display the ranking of ``df_spark`` by *metric*.

    ``rank_movies`` is always called with ``top_n=5`` and ``ascending=False``;
    any extra keyword arguments (e.g. ``min_budget``, ``min_votes``) are
    forwarded to it unchanged. Only *columns* are shown, without truncation.
    """
    print(heading)
    ranked = rank_movies(df_spark, metric, top_n=5, ascending=False, **filters)
    ranked.select(*columns).show(truncate=False)


_show_ranking("TOP 5 Highest Revenue", 'revenue_musd', ('title', 'revenue_musd'))
_show_ranking("\nTOP 5 Highest Budget", 'budget_musd', ('title', 'budget_musd'))
_show_ranking("\nTOP 5 Highest Profit", 'profit_musd', ('title', 'profit_musd'))
_show_ranking("\nTOP 5 ROI (Budget >= 10M)", 'roi', ('title', 'roi'), min_budget=10)
_show_ranking(
    "\nHighest Rated Movies (>= 10 votes)",
    'vote_average',
    ('title', 'vote_average', 'vote_count'),
    min_votes=10,
)
_show_ranking("\nMost Popular Movies", 'popularity', ('title', 'popularity'))

## 2.2 Advanced Filtering

In [None]:
# NOTE(review): the filter helpers are assumed to return a Spark DataFrame, or
# None when there is nothing to show -- confirm against analysis.analysis.
# The None check is explicit because the truthiness of a Spark DataFrame says
# nothing about whether it contains rows.
print("Search 1: Sci-Fi Action starring Bruce Willis")
res1 = filter_bruce_willis_scifi(df_spark)
if res1 is not None:
    res1.show(truncate=False)

print("\nSearch 2: Uma Thurman directed by Quentin Tarantino")
res2 = filter_uma_tarantino(df_spark)
if res2 is not None:
    res2.show(truncate=False)

## 2.3 Franchise vs Standalone Analysis

In [None]:
print("Franchise vs Standalone Performance Comparison")
comp = compare_franchise_vs_standalone(df_spark)

# NOTE(review): assumes the helper returns a Spark DataFrame, or None when no
# comparison is available -- confirm against analysis.analysis.
# The None check is explicit because the truthiness of a Spark DataFrame says
# nothing about whether it contains rows.
if comp is not None:
    # Convert to Pandas for cleaner notebook display; the same Pandas frame
    # is then passed to the plotting helper.
    comp_pd = comp.toPandas()
    display(comp_pd)
    plot_franchise_vs_standalone(comp_pd)


## 2.4 Most Successful Franchises & Directors

In [None]:
# Franchise summary: kept in `franchises_spark` because the final plotting
# cell converts it to Pandas. Preview the first 5 rows untruncated.
print("Top Franchises")
franchises_spark = get_franchise_performance(df_spark)
franchises_spark.show(n=5, truncate=False)

# Director summary, previewed the same way.
print("\nTop Directors")
directors_spark = get_director_performance(df_spark)
directors_spark.show(n=5, truncate=False)

# Step 3: Visualization
Note: Spark DataFrames must be collected to the driver (converted to Pandas) for plotting with Seaborn/Matplotlib.

In [None]:
# Collect data for plotting
df_pd = df_spark.toPandas()

plot_revenue_vs_budget(df_pd)

In [None]:
# Call the ROI-by-genre plotting helper on the collected Pandas data (df_pd).
plot_roi_by_genre(df_pd)

In [None]:
# Call the popularity-vs-rating plotting helper on the collected Pandas data (df_pd).
plot_popularity_vs_rating(df_pd)

In [None]:
# Call the yearly-trends plotting helper on the collected Pandas data (df_pd).
plot_yearly_trends(df_pd)

In [None]:
# Collect the franchise summary to Pandas, index it by the
# 'belongs_to_collection' column, and hand it to the comparison plot.
franchises_pd = franchises_spark.toPandas().set_index('belongs_to_collection')
plot_franchise_comparison(franchises_pd)