# TMDB Movie Data Analysis using Spark and APIs

## Step 1: API Setup and Data Extraction

This section sets up access to the TMDB API and extracts movie data. We use PySpark for data processing and the TMDB API to fetch movie details, including cast, crew, budget, and revenue.

### Import Necessary Modules

Start by importing the required libraries for:
- **PySpark**: Distributed data processing and analysis
- **API calls**: HTTP requests to TMDB API
- **Data manipulation**: NumPy for numerical operations
- **Environment management**: Secure API key handling
- **Visualization**: Matplotlib for data plotting

In [1]:
# Initialize Spark Session for distributed data processing
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Create Spark session with adaptive query execution enabled
spark = SparkSession.builder \
        .appName("TMDB Movie Data Analysis") \
        .config("spark.sql.adaptive.enabled", "true") \
        .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/01 21:28:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Import additional libraries for API calls and data analysis
import os
from dotenv import load_dotenv
import requests
import numpy as np
import matplotlib.pyplot as plt
import time  # For handling API rate limits

### Environment Configuration

Load environment variables from a `.env` file so the TMDB API key can be read at runtime without being hardcoded in the notebook.

In [3]:
# Load environment variables from .env file
load_dotenv()

True

### API Key Configuration

Retrieve the TMDB API key from environment variables. This approach ensures:
- Security: API keys are not exposed in code
- Flexibility: Easy to change keys without code modification
- Early failure: A missing key raises a clear error before any request is sent

In [4]:
# Retrieve TMDB API key from environment variables
API_KEY = os.getenv('TMDB_API_KEY')

# Validate API key is loaded
if not API_KEY:
    raise ValueError("TMDB_API_KEY not found in environment variables. Please check your .env file.")

### Movie Data Extraction with Error Handling

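This section fetches details and credits for each movie ID and records failures instead of stopping at the first error:

**Features:**
- **Timeout handling**: Each request uses a 10-second timeout, so a stalled connection cannot hang the loop
- **HTTP error handling**: `raise_for_status()` turns 4xx/5xx responses into exceptions, which are recorded in `failed_requests`
- **Connection error handling**: Network failures are caught and recorded per movie ID
- **Rate limiting**: A 0.1-second pause follows each successful fetch
- **Data validation**: Fields are read with `dict.get` and default values, so missing keys do not raise
- **Progress tracking**: One status line is printed per movie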

**Data collected for each movie:**
- Basic info: Title, release date, runtime, budget, revenue
- Ratings: Vote average and count
- Cast: Top 5 actors and total cast size
- Crew: Director and total crew size
- Genres and production details
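
The cast and crew fields listed above come from the credits response. As a minimal sketch, the same extraction can be written as a pure function that can be checked without network access. `summarize_credits` and the sample payload are hypothetical names introduced for illustration; the `cast`, `crew`, `name`, and `job` keys and the default strings are the ones the extraction loop uses.

```python
from typing import Any


def summarize_credits(credits: dict[str, Any], top_n: int = 5) -> dict[str, Any]:
    """Reduce a credits payload to the four fields stored per movie."""
    cast = credits.get("cast") or []
    crew = credits.get("crew") or []

    # First `top_n` cast entries, in the order the payload lists them
    top_cast = [member.get("name", "Unknown") for member in cast[:top_n]]

    # Names of crew members whose job is exactly "Director"
    directors = [m["name"] for m in crew if m.get("job") == "Director" and m.get("name")]

    return {
        "cast": "|".join(top_cast) if top_cast else "No cast data",
        "cast_size": len(cast),
        "director": directors[0] if directors else "Unknown Director",
        "crew_size": len(crew),
    }


# Hand-written sample payload for illustration, not real TMDB output
sample = {
    "cast": [{"name": "Actor A"}, {"name": "Actor B"}, {}],
    "crew": [
        {"name": "Editor X", "job": "Editor"},
        {"name": "Director Y", "job": "Director"},
    ],
}
summary = summarize_credits(sample)
print(summary)
```

Keeping this logic in one function also means the fallback values for a failed credits request only need to be defined in one place.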

In [None]:
# List of popular movie IDs from TMDB (ID 0 is deliberately invalid to exercise the error handling)
movie_ids = [
    0, 299534, 19995, 140607, 299536, 597, 135397,
    420818, 24428, 168259, 99861, 284054, 12445,
    181808, 330457, 351286, 109445, 321612, 260513
]

# TMDB API configuration
base_url = "https://api.themoviedb.org/3/movie"
params = {"api_key": API_KEY, "language": "en-US"}

# Initialize lists to store results
movies_data = []
failed_requests = []  # Track failed requests for analysis

print("Starting movie data extraction...")

for i, movie_id in enumerate(movie_ids, 1):
    try:
        print(f"Processing movie {i}/{len(movie_ids)} (ID: {movie_id})...", end=" ")
        
        # Fetch movie details with timeout and error handling
        movie_url = f"{base_url}/{movie_id}"
        movie_response = requests.get(movie_url, params=params, timeout=10)
        movie_response.raise_for_status()  # Raises HTTPError for bad responses
        
        data = movie_response.json()
        
        # Fetch credits with nested error handling
        try:
            credits_url = f"{base_url}/{movie_id}/credits"
            credits_response = requests.get(credits_url, params=params, timeout=10)
            credits_response.raise_for_status()
            
            credits = credits_response.json()
            
            # Extract main cast (top 5 names) with safe access
            cast_list = [member.get("name", "Unknown") for member in credits.get("cast", [])[:5]]
            cast_names = "|".join(cast_list) if cast_list else "No cast data"
            cast_size = len(credits.get("cast", []))
            
            # Extract directors from crew with safe access
            crew = credits.get("crew", [])
            directors = [m.get("name") for m in crew if m.get("job") == "Director" and m.get("name")]
            director_name = directors[0] if directors else "Unknown Director"
            crew_size = len(crew)
            
            # Add credit info to movie data
            data["cast"] = cast_names
            data["cast_size"] = cast_size
            data["director"] = director_name
            data["crew_size"] = crew_size
            
        except requests.exceptions.RequestException as e:
            print(f"Credits fetch failed: {str(e)[:50]}...")
            # Set default values if credits fetch fails
            data["cast"] = "Credits unavailable"
            data["cast_size"] = 0
            data["director"] = "Unknown Director"
            data["crew_size"] = 0
        
        except Exception as e:
            print(f"Credits processing error: {str(e)[:50]}...")
            data["cast"] = "Processing error"
            data["cast_size"] = 0
            data["director"] = "Unknown Director"
            data["crew_size"] = 0
        
        movies_data.append(data)
        print(f"Fetched movie: {data.get('title', 'Unknown Title')} (ID: {movie_id})")
        
        # Add small delay to respect API rate limits
        time.sleep(0.1)
        
    except requests.exceptions.HTTPError as e:
        error_msg = f"HTTP Error {e.response.status_code}: {e.response.reason}"
        print(f"Failed to fetch movie with ID: {movie_id}")
        failed_requests.append({"movie_id": movie_id, "error": error_msg})
        
    except requests.exceptions.Timeout:
        error_msg = "Request timeout (>10s)"
        print(f"Timeout error for movie ID: {movie_id}")
        failed_requests.append({"movie_id": movie_id, "error": error_msg})
        
    except requests.exceptions.ConnectionError:
        error_msg = "Connection error - check internet connection"
        print(f"Connection error for movie ID: {movie_id}")
        failed_requests.append({"movie_id": movie_id, "error": error_msg})
        
    except requests.exceptions.RequestException as e:
        error_msg = f"Request error: {str(e)[:50]}..."
        print(f"Request error for movie ID: {movie_id}")
        failed_requests.append({"movie_id": movie_id, "error": error_msg})
        
    except Exception as e:
        error_msg = f"Unexpected error: {str(e)[:50]}..."
        print(f"Unexpected error for movie ID: {movie_id}")
        failed_requests.append({"movie_id": movie_id, "error": error_msg})

# Summary of data extraction
print("\n=== Data Extraction Summary ===")
print(f"Total movies requested: {len(movie_ids)}")
print(f"Successfully fetched: {len(movies_data)}")
print(f"Failed requests: {len(failed_requests)}")

if failed_requests:
    print("\nFailed requests details:")
    for failure in failed_requests:
        print(f"  - Movie ID {failure['movie_id']}: {failure['error']}")

Starting movie data extraction...
Processing movie 1/19 (ID: 0)... Failed to fetch movie with ID: 0
Processing movie 2/19 (ID: 299534)... Fetched movie: Avengers: Endgame (ID: 299534)
Processing movie 3/19 (ID: 19995)... Fetched movie: Avatar (ID: 19995)
Processing movie 4/19 (ID: 140607)... Fetched movie: Star Wars: The Force Awakens (ID: 140607)
Processing movie 5/19 (ID: 299536)... Fetched movie: Avengers: Infinity War (ID: 299536)
Processing movie 6/19 (ID: 597)... Fetched movie: Titanic (ID: 597)
Processing movie 7/19 (ID: 135397)... Fetched movie: Jurassic World (ID: 135397)
Processing movie 8/19 (ID: 420818)... Fetched movie: The Lion King (ID: 420818)
Processing movie 9/19 (ID: 24428)... Fetched movie: The Avengers (ID: 24428)
Processing movie 10/19 (ID: 168259)... Fetched movie: Furious 7 (ID: 168259)
Processing movie 11/19 (ID: 99861)... Fetched movie: Avengers: Age of Ultron (ID: 99861)
Processing movie 12/19 (ID: 284054)... 
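
The loop above pauses for a fixed 0.1 seconds after each successful fetch and records a timeout as a failure without trying again. One possible extension, which is not part of the original notebook, is to retry transient failures with exponential backoff. The sketch below is only an illustration under that assumption: `call_with_retry`, its parameters, and `flaky_fetch` are hypothetical names, and the demo uses a fake fetch function so it runs without network access.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retry(
    fetch: Callable[[], T],
    max_attempts: int = 3,
    base_delay: float = 0.5,
    retry_on: tuple[type[BaseException], ...] = (Exception,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `fetch` until it succeeds or `max_attempts` is reached."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fetch()
        except retry_on:
            if attempt == max_attempts:
                raise  # Out of attempts: let the caller record the failure
            # Wait base_delay, then 2x, 4x, ... before the next attempt
            sleep(base_delay * 2 ** (attempt - 1))
    raise ValueError("max_attempts must be at least 1")


# Demo with a fake fetch that fails twice, then succeeds (no network involved)
attempts = {"count": 0}
delays: list[float] = []


def flaky_fetch() -> str:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise TimeoutError("simulated timeout")
    return "ok"


# Record the requested delays instead of actually sleeping
result = call_with_retry(flaky_fetch, retry_on=(TimeoutError,), sleep=delays.append)
print(result, delays)
```

In the notebook, `fetch` could be a small function wrapping `requests.get(movie_url, params=params, timeout=10)`, with `retry_on` set to `(requests.exceptions.Timeout, requests.exceptions.ConnectionError)`, so that HTTP errors such as the invalid ID are still recorded immediately rather than retried.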

## Step 2: Data Processing with PySpark

With the movie data extracted from the TMDB API, the next steps are:

1. **Convert to Spark DataFrame**: Transform the collected data into a distributed Spark DataFrame
2. **Data Cleaning**: Handle missing values and standardize data types
3. **Feature Engineering**: Create new columns for analysis
4. **Data Analysis**: Perform statistical analysis and insights extraction

This approach leverages Spark's distributed computing capabilities for scalable data processing.

In [None]:
# Convert the collected movie data to Spark DataFrame
if movies_data:
    try:
        # Create Spark DataFrame from the collected data
        movies_df = spark.createDataFrame(movies_data)
        
        print(f"Created Spark DataFrame with {movies_df.count()} movies")
        print("DataFrame schema:")
        movies_df.printSchema()
        
        # Display first few rows
        print("\nFirst 3 movies:")
        movies_df.select("title", "release_date", "vote_average", "budget", "revenue", "director").show(3, truncate=False)
        
    except Exception as e:
        print(f"Error creating Spark DataFrame: {e}")
        print("Falling back to basic data display...")
        
        # Display basic info about collected data
        for i, movie in enumerate(movies_data[:3], 1):
            print(f"{i}. {movie.get('title', 'Unknown')} ({movie.get('release_date', 'Unknown date')})")
else:
    print("No movie data available to create DataFrame")
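
The cell above asks Spark to infer a schema directly from the raw API dictionaries. Those records contain nested lists and objects, and some fields may be null or switch between integer and float values from one movie to the next, which can make inference fail or produce map-typed columns that are awkward to query. A hedged sketch of one way to reduce that risk is to flatten each record to scalar fields first. `flatten_movie` and `SCALAR_FIELDS` are hypothetical names, and the shape of `genres` (a list of objects with a `name` key) is an assumption about the API response.

```python
from typing import Any

# Scalar fields to keep: the columns selected in the cell above, plus `id`
SCALAR_FIELDS = ["id", "title", "release_date", "vote_average", "budget", "revenue", "director"]


def flatten_movie(movie: dict[str, Any]) -> dict[str, Any]:
    """Keep scalar fields and collapse `genres` into a pipe-separated string."""
    row = {field: movie.get(field) for field in SCALAR_FIELDS}

    # Store the rating as a float so every row has the same type for this column
    if row["vote_average"] is not None:
        row["vote_average"] = float(row["vote_average"])

    # Assumption: `genres` is a list of {"id": ..., "name": ...} objects
    genres = movie.get("genres") or []
    row["genres"] = "|".join(g["name"] for g in genres if g.get("name"))
    return row


# Hand-written sample record for illustration, not real TMDB output
sample = {
    "id": 1,
    "title": "Example Movie",
    "release_date": "2020-01-01",
    "vote_average": 8,
    "budget": 1000,
    "revenue": 2500,
    "director": "Director Y",
    "genres": [{"id": 10, "name": "Action"}, {"id": 11, "name": "Drama"}],
    "belongs_to_collection": None,  # Fields outside SCALAR_FIELDS are dropped
}
flat = flatten_movie(sample)
print(flat)
```

The flattened rows could then be passed to Spark as `spark.createDataFrame([flatten_movie(m) for m in movies_data])`, optionally with an explicit schema if the inferred column types are not the ones wanted.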