# TMDB Movie Analysis Pipeline (PySpark)

This notebook orchestrates the complete ETL and analysis pipeline for the TMDB Movie Dataset.
It uses **PySpark** for scalable data processing and **Pandas**, **Matplotlib**, and **Seaborn** for the final visualizations.

## Pipeline Steps
1.  **Fetch**: Retrieve data from TMDB API (with retries and logging).
2.  **Process**: Clean and transform JSON data into Parquet using PySpark.
3.  **Analyze**: Aggregate and query data using PySpark DSL.
4.  **Visualize**: Plot insights.

In [1]:
import os
import sys
import socketserver
from pathlib import Path

# --- PySpark on Windows Workaround ---
# PySpark 4.x references socketserver.UnixStreamServer in accumulators.py,
# but that class does not exist on Windows, so importing PySpark raises an
# AttributeError. Aliasing it to TCPServer lets the module import.
if os.name == 'nt' and not hasattr(socketserver, 'UnixStreamServer'):
    socketserver.UnixStreamServer = socketserver.TCPServer

# Add project root to path to allow importing src modules
project_root = Path("..").resolve()
if str(project_root) not in sys.path:
    sys.path.append(str(project_root))

from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
import seaborn as sns

# Import our custom modules
from src.logger import setup_logger
from src.fetch_data import TMDBFetcher
from src.process_data import process_data
from src.analysis import MovieAnalyzer

# Configure Plotting Style
sns.set_theme(style="whitegrid")
plt.rcParams["figure.figsize"] = (12, 6)

In [None]:
# Setup Spark Session
import os
import sys
from pathlib import Path
from pyspark.sql import SparkSession

# Add the project root to sys.path so modules in src/ can be imported.
# In Docker, src is at /home/spark/src and notebooks are at /home/spark/work;
# the second entry is a fallback for running outside that layout.
for candidate in ("/home/spark", str(Path.cwd().parent)):
    if candidate not in sys.path:
        sys.path.append(candidate)

# Configuration
# local[*] runs Spark in a single JVM on this machine (or container), with one
# worker thread per logical core. This is simpler and more stable for
# development than connecting to a separate cluster.
MASTER_URL = "local[*]"
DATA_DIR = Path(os.getenv("DATA_DIR", "../data"))

print(f"Using Spark in Local Mode: {MASTER_URL}")
print(f"Data Directory: {DATA_DIR}")

# Create SparkSession in local mode
spark = SparkSession.builder \
    .appName("TMDB_Movie_Analysis") \
    .master(MASTER_URL) \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

print(f"Spark Session Created: {spark.sparkContext.appName}")
print(f"Spark Version: {spark.version}")    

Using Spark in Local Mode: local[*]
Data Directory: /home/spark/data


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/26 11:53:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
26/01/26 11:54:04 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


Spark Session Created: TMDB_Movie_Analysis
Spark Version: 4.0.1


## Step 1: Fetch Data (API)
The `TMDBFetcher` class downloads the movie records, retrying requests that hit the API's rate limit, and saves the raw JSON to disk.

In [None]:
from src.fetch_data import TMDBFetcher

# The fetcher needs a TMDB API key.
# os.environ['TMDB_API_KEY'] = 'your_key_here'  # Uncomment and set if it is not in .env

# IDs to fetch. Note: 0 is not a valid TMDB movie ID; it is presumably included to exercise error handling.
movie_ids = [0, 299534, 19995, 140607, 299536, 597, 135397, 420818, 24428, 168259, 99861, 284054, 12445, 181808, 330457, 351286, 109445, 321612, 260513]

try:
    fetcher = TMDBFetcher()
    movies_data = fetcher.fetch_specific_movies(movie_ids)
    
    raw_output_path = DATA_DIR / "raw" / "movies.json"
    TMDBFetcher.save_raw_data(movies_data, raw_output_path)
    
except Exception as e:
    print(f"Error occurred: {e}")
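The retry behaviour can be illustrated with a minimal standard-library sketch. It assumes that rate-limited responses arrive as HTTP 429 and transient server failures as 5xx, and it treats any other HTTP error (for example, a 404 for an ID that does not exist) as "skip this movie" rather than retrying. `fetch_json_with_retries` and `RETRYABLE_STATUS` are hypothetical names; this is not the actual `TMDBFetcher` implementation.

```python
import json
import time
import urllib.error
import urllib.request

# Hypothetical: status codes treated as transient (rate limit + server errors).
RETRYABLE_STATUS = {429, 500, 502, 503, 504}


def fetch_json_with_retries(url, max_retries=3, base_delay=1.0,
                            opener=urllib.request.urlopen, sleep=time.sleep):
    """Fetch and decode JSON from `url`, retrying transient HTTP errors.

    Waits base_delay * 2**attempt seconds between attempts (exponential backoff).
    Returns None for non-retryable HTTP errors such as 404, and re-raises the
    last error if every attempt was rate-limited or failed server-side.
    `opener` and `sleep` are injectable so the logic can be tested offline.
    """
    for attempt in range(max_retries + 1):
        try:
            with opener(url, timeout=10) as response:
                return json.loads(response.read().decode("utf-8"))
        except urllib.error.HTTPError as err:
            if err.code not in RETRYABLE_STATUS:
                return None  # e.g. unknown movie ID: skip it, don't retry
            if attempt == max_retries:
                raise  # still failing after all retries
            sleep(base_delay * 2 ** attempt)
```

With the defaults, a request that keeps returning 429 is tried four times, waiting 1, 2, and 4 seconds between attempts, before the error propagates.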


## Step 2: Process Data (ETL)
We use PySpark to read the nested JSON, flatten it, clean data types, and compute metrics like ROI.
The result is saved as a columnar **Parquet** file.

In [None]:
from src.process_data import process_data

raw_path = str(DATA_DIR / "raw" / "movies.json")
processed_path = str(DATA_DIR / "processed" / "movies.parquet")

try:
    process_data(raw_path, processed_path, spark)
    print("Data processing complete.")
except Exception as e:
    print(f"Processing failed: {e}")
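To make the cleaning step concrete, here is a plain-Python sketch of what `process_data` might do to each raw record. It assumes the raw JSON follows the TMDB movie-details shape (`budget` and `revenue` in dollars, `genres` as a list of `{id, name}` objects, `belongs_to_collection` as an object or null), converts money to millions to match the `budget_musd`/`revenue_musd` columns used in the plots, and defines ROI as revenue divided by budget. The function name and output fields are hypothetical.

```python
def clean_movie(raw):
    """Flatten one raw TMDB movie record into analysis-ready fields (sketch)."""
    # TMDB typically reports unknown budget/revenue as 0, so treat
    # non-positive values as missing rather than as real zeros.
    budget = raw.get("budget") or 0
    revenue = raw.get("revenue") or 0
    budget_musd = budget / 1_000_000 if budget > 0 else None
    revenue_musd = revenue / 1_000_000 if revenue > 0 else None
    has_both = budget_musd is not None and revenue_musd is not None

    # Nested objects and lists become flat scalar columns.
    collection = raw.get("belongs_to_collection")  # dict or None
    genres = raw.get("genres") or []                 # list of {"id", "name"}

    return {
        "id": raw.get("id"),
        "title": raw.get("title"),
        "genres": "|".join(g["name"] for g in genres),
        "collection": collection["name"] if collection else None,
        "is_franchise": bool(collection),
        "budget_musd": budget_musd,
        "revenue_musd": revenue_musd,
        "profit_musd": revenue_musd - budget_musd if has_both else None,
        "roi": revenue_musd / budget_musd if has_both else None,
    }
```

In Spark the same rules would typically be written as column expressions (for example with `pyspark.sql.functions.when`) so they run inside the JVM instead of row by row in Python.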

## Step 3: Analyze Data
We initialize the `MovieAnalyzer`, which wraps the PySpark queries. It runs the aggregations in Spark (in local mode here, so the "cluster" is this single machine) and returns the small result sets as Pandas DataFrames.

In [None]:
from src.analysis import MovieAnalyzer

analyzer = MovieAnalyzer(spark)
analyzer.load_data(str(DATA_DIR / "processed" / "movies.parquet"))

# 1. Financial Stats
financials = analyzer.get_financial_stats()
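Several of the results below are top-N or bottom-N rankings with a minimum-sample filter (budget of at least 10M, at least 10 votes). The filter-then-sort pattern is sketched here in plain Python; `rank_movies` is a hypothetical helper, and the field names (`budget_musd`, `vote_count`, `roi`) are assumed to match the processed data.

```python
def rank_movies(movies, key, n=5, ascending=False,
                min_budget_musd=None, min_votes=None):
    """Return the n highest (or lowest, if ascending) movies by `key`.

    Movies missing `key` are dropped first, then the optional thresholds
    (budget in millions, vote count) are applied before sorting.
    """
    def eligible(movie):
        if movie.get(key) is None:
            return False
        if min_budget_musd is not None and (movie.get("budget_musd") or 0) < min_budget_musd:
            return False
        if min_votes is not None and (movie.get("vote_count") or 0) < min_votes:
            return False
        return True

    pool = [m for m in movies if eligible(m)]
    return sorted(pool, key=lambda m: m[key], reverse=not ascending)[:n]
```

The thresholds keep a movie with a tiny budget from dominating the ROI ranking, and a movie rated by only a handful of voters from topping the ratings. In the Spark DSL the equivalent is usually `df.filter(...).orderBy(F.desc("roi")).limit(n)`.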

In [None]:
print("--- Highest Revenue ---")
print(financials['top_revenue'])

In [None]:
print("\n--- Highest Budget ---")
print(financials['top_budget'])

In [None]:
print("\n--- Highest Profit ---")
print(financials['top_profit'])

In [None]:
print("\n--- Lowest Profit (Flops) ---")
print(financials['flops'])

In [None]:
print("\n--- Highest ROI (Budget >= 10M) ---")
print(financials['top_roi'])

In [None]:
print("\n--- Lowest ROI (Budget >= 10M) ---")
print(financials['flops_roi'])

In [None]:
print("\n--- Most Voted Movies ---")
print(financials['most_voted'])

In [None]:
print("\n--- Highest Rated Movies (Votes >= 10) ---")
print(financials['highest_rated'])

In [None]:
print("\n--- Lowest Rated Movies (Votes >= 10) ---")
print(financials['lowest_rated'])

In [None]:
print("\n--- Most Popular Movies ---")
print(financials['most_popular'])

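## Step 4: Visualization
Before plotting, a few more `MovieAnalyzer` queries run: targeted searches (Bruce Willis science-fiction films and Uma Thurman and Quentin Tarantino collaborations), franchise versus standalone statistics, and a director ranking. The plots combine these results with the aggregated stats from Step 3.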

In [None]:
# 2. Specific Queries
specifics = analyzer.get_specific_movies()
print("\n--- Bruce Willis Sci-Fi ---")
print(specifics['bruce_willis_scifi'])

In [None]:
print("\n--- Uma Thurman & Quentin Tarantino ---")
print(specifics['uma_qt_collab'])
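Queries like these reduce to "filter on list-valued columns, then sort". A sketch, assuming the processed data stores cast and genres as `|`-separated strings and the director as a single name; `search_movies`, `split_field`, and the column names `cast` and `director` are hypothetical, and the sort keys that `get_specific_movies` actually uses are not shown in this notebook.

```python
def split_field(value):
    """Split a '|'-separated string (e.g. 'Action|Drama') into a clean list."""
    return [part.strip() for part in (value or "").split("|") if part.strip()]


def search_movies(movies, actor=None, director=None, genres=(),
                  sort_key=None, descending=True):
    """Filter movies by cast member, director, and required genres, then sort.

    All given criteria must match; `genres` lists genres the movie must include.
    """
    required = set(genres)
    hits = []
    for movie in movies:
        if actor and actor not in split_field(movie.get("cast")):
            continue
        if director and movie.get("director") != director:
            continue
        if not required <= set(split_field(movie.get("genres"))):
            continue
        hits.append(movie)
    if sort_key:
        hits.sort(key=lambda m: m.get(sort_key) or 0, reverse=descending)
    return hits
```

Splitting on `|` before comparing avoids false matches from plain substring checks (for example, a name that is a prefix of another).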

In [None]:
# 3. Franchise Analysis
stats, top_franchises = analyzer.analyze_franchises()
print("\n--- Franchise Stats ---")
print(stats)

In [None]:
print("\n--- Top Franchises ---")
print(top_franchises)
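The plotting cell further down reads `avg_revenue` and `avg_budget` from `stats` after `reset_index()` on `is_franchise`, which suggests a group-by on the franchise flag. A plain-Python sketch of that aggregation, assuming an `is_franchise` boolean per movie (for example, derived from TMDB's `belongs_to_collection`); `compare_franchise`, `METRICS`, and the `avg_roi` column are hypothetical.

```python
from collections import defaultdict
from statistics import mean

# Hypothetical mapping: output column -> processed-data field.
METRICS = {"avg_revenue": "revenue_musd", "avg_budget": "budget_musd", "avg_roi": "roi"}


def compare_franchise(movies):
    """Group movies by is_franchise and average each metric per group."""
    groups = defaultdict(list)
    for movie in movies:
        groups[bool(movie.get("is_franchise"))].append(movie)

    result = {}
    for is_franchise, rows in groups.items():
        row_stats = {"count": len(rows)}
        for out_name, field in METRICS.items():
            # Skip missing values (e.g. unknown budget) instead of counting them as 0.
            values = [r[field] for r in rows if r.get(field) is not None]
            row_stats[out_name] = mean(values) if values else None
        result[is_franchise] = row_stats
    return result
```

In Spark this would typically be `df.groupBy("is_franchise").agg(F.avg("revenue_musd").alias("avg_revenue"), ...)`; `avg` ignores nulls there too, matching the skip above.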


In [None]:
# 4. Top Directors
print("\n--- Top Directors ---")
print(analyzer.analyze_directors())

In [None]:
# 5. Visualization
import matplotlib.pyplot as plt
import seaborn as sns

# Override the theme and figure size set in the first cell for the plots below
sns.set_theme(style="ticks")
plt.rcParams['figure.figsize'] = (14, 8)

# Fetch Data needed for plots
genre_stats = analyzer.get_genre_stats()
yearly_stats = analyzer.get_yearly_trends()
all_movies = analyzer.get_all_movies_for_plot()


In [None]:
# A. Revenue vs. Budget (Scatter Plot)
plt.figure()
sns.scatterplot(data=all_movies, x="budget_musd", y="revenue_musd", size="vote_count", sizes=(20, 500), hue="vote_average", palette="viridis")
plt.title("Revenue vs. Budget")
plt.xlabel("Budget (M$)")
plt.ylabel("Revenue (M$)")
plt.plot([0, all_movies['budget_musd'].max()], [0, all_movies['budget_musd'].max()], 'r--', label='Break-even line') # Break-even line
plt.legend(bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.)
plt.show()
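The dashed red line is $y = x$, where revenue equals budget. With revenue $R$ and budget $B$ (both in M$), and assuming ROI is defined as revenue divided by budget:

```latex
\begin{aligned}
P &= R - B, \qquad \mathrm{ROI} = \frac{R}{B} \quad (B > 0) \\
P > 0 &\iff R > B \iff \mathrm{ROI} > 1
\end{aligned}
```

Points above the line therefore earned back more than their recorded budget; points below it did not.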

In [None]:
# B. Average ROI by Genre (Bar Plot)
plt.figure()
sns.barplot(data=genre_stats.sort_values(by="avg_roi", ascending=False), x="avg_roi", y="genre", hue="genre", palette="viridis", legend=False)
plt.title("Average ROI by Genre")
plt.xlabel("Average ROI")
plt.ylabel("Genre")
plt.show()
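Because a movie can have several genres, per-genre averages require "exploding" each movie into one row per genre before grouping, so a multi-genre movie counts toward every one of its genres. A plain-Python sketch, assuming `|`-separated genre strings and an `roi` field; `genre_roi` is a hypothetical name and `get_genre_stats` may compute this differently.

```python
from collections import defaultdict


def genre_roi(movies):
    """Average ROI per genre, sorted from highest to lowest.

    Each movie contributes its ROI once to every genre it lists;
    movies without an ROI (e.g. unknown budget) are skipped.
    """
    per_genre = defaultdict(list)
    for movie in movies:
        roi = movie.get("roi")
        if roi is None:
            continue
        # "Explode": one (genre, roi) pair per listed genre.
        for genre in (movie.get("genres") or "").split("|"):
            genre = genre.strip()
            if genre:
                per_genre[genre].append(roi)
    averages = {genre: sum(vals) / len(vals) for genre, vals in per_genre.items()}
    return sorted(averages.items(), key=lambda item: item[1], reverse=True)
```

In Spark the explode step is usually `F.explode(F.split("genres", r"\|"))` followed by `groupBy("genre").agg(F.avg("roi"))`; the `|` must be escaped because `split` takes a regular expression.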

In [None]:
# C. Popularity vs. Rating (Scatter Plot)
plt.figure()
sns.scatterplot(data=all_movies, x="vote_average", y="popularity", s=100)
plt.title("Popularity vs. Rating")
plt.xlabel("Vote Average")
plt.ylabel("Popularity Score")
plt.show()

In [None]:
# D. Yearly Box Office Trends (Line Plot)
plt.figure()
sns.lineplot(data=yearly_stats, x="release_year", y="total_revenue", marker="o", label="Revenue")
sns.lineplot(data=yearly_stats, x="release_year", y="total_budget", marker="o", color="orange", label="Budget")
plt.title("Yearly Box Office Trends")
plt.xlabel("Year")
plt.ylabel("Amount (M$)")
plt.legend()
plt.show()

In [None]:
# E. Franchise vs Standalone (Side-by-Side Bar Plot)
plt.figure()
stats_reset = stats.reset_index()

# Melt the dataframe to have Revenue and Budget in one column
stats_melted = stats_reset.melt(
    id_vars="is_franchise", 
    value_vars=["avg_revenue", "avg_budget"], 
    var_name="Metric", 
    value_name="Amount (M$)"
)

# Rename the metrics for better legend readability
stats_melted["Metric"] = stats_melted["Metric"].replace({
    "avg_revenue": "Revenue", 
    "avg_budget": "Budget"
})

# Grouped bar chart: one group per metric, one bar per franchise status
sns.barplot(data=stats_melted, x="Metric", y="Amount (M$)", hue="is_franchise", palette="muted")

plt.title("Franchise vs. Standalone: Revenue and Budget Comparison")
plt.ylabel("Amount (M$)")
plt.xlabel("Metric")
plt.legend(title="Franchise Type")
plt.show()