# TMDB Movie Pipeline

A simple ETL pipeline that extracts movie data from the TMDB API, transforms it, and performs analysis.

## Setup

In [None]:
# Put the current working directory first on the module search path so the
# project-local ``src`` package used by the cells below can be imported.
import sys
sys.path.insert(0, '.')

from pyspark.sql import SparkSession

# Build (or reuse) the Spark session for this notebook: local master and the
# LEGACY time parser policy.
spark = (
    SparkSession.builder
    .appName("TMDB_Movie_Pipeline")
    .master("local[*]")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)

print("Spark session created")

## Step 1: Extract - Fetch Movies from TMDB API

In [None]:
from src.extract.fetch_movies import fetch_movies

# Extract step: fetch_movies receives the active Spark session and returns the
# raw DataFrame used by the rest of the pipeline.
raw_df = fetch_movies(spark)

print(f"Fetched {raw_df.count()} movies")

# Preview five rows of the identifying columns, without truncating values.
raw_preview_cols = ['id', 'title', 'release_date']
raw_df.select(*raw_preview_cols).show(n=5, truncate=False)

## Step 2: Transform - Clean Data

In [None]:
from src.transform.clean_movies import clean_movies

# Transform step 1: pass the raw DataFrame through clean_movies.
cleaned_df = clean_movies(raw_df)

print(f"Cleaned data: {cleaned_df.count()} movies")

# Preview five rows of selected columns, without truncating values.
cleaned_preview_cols = ['title', 'genres', 'budget_musd', 'revenue_musd']
cleaned_df.select(*cleaned_preview_cols).show(n=5, truncate=False)

## Step 3: Transform - Enrich Data

In [None]:
from src.transform.enrich_movies import enrich_movies

# Transform step 2: pass the cleaned DataFrame through enrich_movies.
enriched_df = enrich_movies(cleaned_df)

print("Enriched data with profit and ROI")

# Preview five rows of the columns displayed for this step, untruncated.
enriched_preview_cols = ['title', 'profit_musd', 'roi', 'release_year']
enriched_df.select(*enriched_preview_cols).show(n=5, truncate=False)

## Step 4: Analysis - Best and Worst Performers

In [None]:
from src.analysis.kpi_rankings import get_highest_revenue_movies, get_highest_roi_movies

# Revenue ranking: request five rows and show only title and revenue.
print("Top 5 Movies by Revenue:")
top_by_revenue = get_highest_revenue_movies(enriched_df, n=5)
top_by_revenue.select('title', 'revenue_musd').show(truncate=False)

# ROI ranking: request five rows and show only title and ROI.
print("Top 5 Movies by ROI:")
top_by_roi = get_highest_roi_movies(enriched_df, n=5)
top_by_roi.select('title', 'roi').show(truncate=False)

## Step 5: Analysis - Franchise vs Standalone

In [None]:
from src.analysis.franchise_analysis import compare_franchise_vs_standalone

# Build the franchise-vs-standalone comparison from the enriched DataFrame.
# The result is kept in `comparison` because the visualization step below
# passes it on as `comparison_df`.
comparison = compare_franchise_vs_standalone(enriched_df)

print("Franchise vs Standalone Comparison:")
# Print the comparison table without truncating long cell values.
comparison.show(truncate=False)

## Step 6: Create and Display Visualizations

In [None]:
from src.visualization.plots import create_all_visualizations
from IPython.display import Image, display

# Directory handed to the plotting helper as its output location.
plots_output_dir = 'data/analytics/plots/'

# Generate every visualization; the helper's return value is treated below as
# a sized collection of image file paths.
plots = create_all_visualizations(
    enriched_df,
    plots_output_dir,
    comparison_df=comparison,
)

print(f"Created {len(plots)} visualizations")

# Render each generated image inline, preceded by its path.
for plot_path in plots:
    print(f"\n{plot_path}:")
    inline_image = Image(filename=plot_path)
    display(inline_image)

## Done

In [None]:
# Shut down the Spark session created in the setup cell now that all
# pipeline steps have run.
spark.stop()
print("Pipeline complete")