
# PySpark Movie Data Analysis


# Install and Import Packages

In [17]:
# Install dotenv if not already installed
# %pip install python-dotenv

import os

import pandas as pd
import requests
from dotenv import load_dotenv
from pyspark.sql import SparkSession
from pyspark.sql.functions import array_join, col



# Load Environment Variables

In [3]:
# Read the TMDB credential: load_dotenv() copies entries from a local .env
# file into the process environment, then the key is looked up by name.
# API_KEY is None when TMDB_API_KEY is not set anywhere.
load_dotenv()
API_KEY = os.environ.get("TMDB_API_KEY")



# Configuration Parameters

In [19]:
# Configuration parameters
API_URL = "https://api.themoviedb.org/3/movie/popular"
# Destination of the CSV output. Set the TMDB_OUTPUT_PATH environment variable
# (a .env entry also works, since load_dotenv() runs in an earlier cell) instead
# of editing this machine-specific default.
OUTPUT_FILE_PATH = os.environ.get(
    "TMDB_OUTPUT_PATH", "C:\\Datasets\\Labs_Data\\TMDB-Data.csv"
)
APP_NAME = "PySparkFormalPipelineTMDB"
MASTER = "local[*]"




# Functions (Spark Session Creation)

In [20]:
def create_spark_session(app_name=APP_NAME, master=MASTER):
    """Build, or reuse, a SparkSession for the pipeline.

    Args:
        app_name: Spark application name (defaults to APP_NAME).
        master: Spark master URL (defaults to MASTER).

    Returns:
        The SparkSession returned by ``builder.getOrCreate()``.

    Raises:
        Re-raises any builder error after printing a failure message.
    """
    try:
        session = (
            SparkSession.builder
            .appName(app_name)
            .master(master)
            .getOrCreate()
        )
    except Exception as err:
        print(f"Failed to create SparkSession: {err}")
        raise
    print("SparkSession started successfully.")
    return session



# Functions (Fetch TMDB Data)

In [21]:
def fetch_tmdb_data(api_key, api_url, timeout=10):
    """Issue a single GET request to a TMDB endpoint and return its results.

    Args:
        api_key: TMDB API key, sent as the ``api_key`` query parameter.
        api_url: Endpoint URL to query (e.g. ``API_URL``).
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, ``requests`` can block indefinitely on a stalled
            connection.

    Returns:
        The list stored under the JSON response's ``"results"`` key, or
        ``[]`` when that key is absent.

    Raises:
        ValueError: if ``api_key`` is empty or None.
        Any request/HTTP/JSON error, re-raised after a (key-masked) message
        is printed.
    """
    try:
        if not api_key:
            raise ValueError("TMDB API key is missing. Please check your .env file.")
        response = requests.get(api_url, params={"api_key": api_key}, timeout=timeout)
        response.raise_for_status()
        movies = response.json().get("results", [])
        print(f"Fetched {len(movies)} movies from TMDB.")
        return movies
    except Exception as e:
        # HTTP errors from requests embed the full request URL, which here
        # carries the API key as a query parameter; mask it so the secret is
        # not written into notebook output.
        message = str(e)
        if api_key:
            message = message.replace(str(api_key), "***")
        print(f"Failed to fetch TMDB data: {message}")
        raise



# Functions (Load, Clean, Transform and Save)

In [22]:
def load_data_into_spark(spark, data):
    """Convert raw records into a Spark DataFrame via an intermediate pandas frame.

    Args:
        spark: Active SparkSession.
        data: Anything ``pd.DataFrame`` accepts — here, the list of movie
            dicts returned by ``fetch_tmdb_data``.

    Returns:
        The Spark DataFrame produced by ``spark.createDataFrame``.

    Raises:
        ValueError: if ``data`` yields no rows/columns. Spark cannot infer a
            schema from an empty frame and would otherwise fail with a less
            helpful error.
        Any conversion error, re-raised after printing a failure message.
    """
    try:
        pdf = pd.DataFrame(data)
        if pdf.empty:
            raise ValueError("No records to load: the input data is empty.")
        df = spark.createDataFrame(pdf)
        print("Data loaded into Spark DataFrame.")
        return df
    except Exception as e:
        print(f"Failed to load data into Spark: {e}")
        raise

def clean_data(df, subset=None):
    """Drop rows that contain null values.

    Args:
        df: Spark DataFrame to clean.
        subset: Optional list of column names to check for nulls. ``None``
            (the default) keeps the original behaviour: a row is dropped if
            *any* column is null. Pass e.g. ``["id", "title"]`` to drop a
            row only when one of those columns is null.

    Returns:
        A new DataFrame without the offending rows.

    Raises:
        Re-raises any Spark error after printing a failure message.
    """
    try:
        cleaned_df = df.dropna(subset=subset)
        print("Data cleaned (null values dropped).")
        return cleaned_df
    except Exception as e:
        print(f"Failed during data cleaning: {e}")
        raise

from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

# Helper behind the Spark UDF below: joins an array's elements into a single
# comma-separated string, e.g. [28, 12] -> "28,12"; a None array stays None.
def array_to_string(arr):
    if arr is None:
        return None
    return ",".join(str(element) for element in arr)

# Wrap array_to_string as a Spark UDF whose output column is a string.
array_to_string_udf = udf(array_to_string, returnType=StringType())

def transform_data(df):
    """Cast ``vote_count`` to int and flatten ``genre_ids`` into a string.

    Spark's CSV writer cannot store array columns, so ``genre_ids`` is joined
    into one comma-separated string (e.g. ``[28, 12]`` -> ``"28,12"``).

    Args:
        df: Spark DataFrame with ``vote_count`` and ``genre_ids`` columns
            (``genre_ids`` presumably an array of ints — verify upstream).

    Returns:
        A new DataFrame with ``vote_count`` as int and ``genre_ids`` as string.

    Raises:
        Re-raises any Spark error after printing a failure message.
    """
    try:
        transformed_df = (
            df.withColumn("vote_count", col("vote_count").cast("int"))
            # Built-in array_join runs inside the JVM and avoids the Python
            # worker round-trip a Python UDF needs. Elements are cast to
            # string explicitly; null elements are skipped by array_join.
            .withColumn(
                "genre_ids",
                array_join(col("genre_ids").cast("array<string>"), ","),
            )
        )
        print("Data transformed (columns cast and arrays stringified).")
        return transformed_df
    except Exception as e:
        print(f"Failed during data transformation: {e}")
        raise


def save_data(df, output_path, num_partitions=None):
    """Write a Spark DataFrame as CSV with a header row, overwriting existing output.

    Note that Spark's CSV writer creates a *directory* at ``output_path``
    holding one ``part-*`` file per partition, not a single ``.csv`` file.

    Args:
        df: Spark DataFrame to write.
        output_path: Destination path for the CSV output directory.
        num_partitions: If given, ``df`` is coalesced to this many partitions
            before writing; ``1`` yields a single part file (all rows go
            through one task, so reserve it for small results). ``None``
            (default) keeps the DataFrame's existing partitioning.

    Raises:
        Re-raises any write error after printing a failure message.
    """
    try:
        to_write = df if num_partitions is None else df.coalesce(num_partitions)
        to_write.write.mode("overwrite").option("header", True).csv(output_path)
        print(f"Data saved to {output_path}.")
    except Exception as e:
        print(f"Failed to save data: {e}")
        raise



# Main Pipeline Function

In [23]:
def main():
    """Run the TMDB pipeline: fetch -> load -> clean -> transform -> save.

    Errors raised by the pipeline stages are reported (with the API key
    masked) and turned into a ``False`` return value instead of propagating;
    the SparkSession is always stopped. A failure while creating the
    SparkSession itself is not caught and propagates to the caller.

    Returns:
        True if every stage completed, False if any stage raised.
    """
    spark = create_spark_session()

    try:
        tmdb_data = fetch_tmdb_data(API_KEY, API_URL)
        df = load_data_into_spark(spark, tmdb_data)
        df_clean = clean_data(df)
        df_transformed = transform_data(df_clean)
        save_data(df_transformed, OUTPUT_FILE_PATH)
    except Exception as pipeline_error:
        # Request errors can embed the request URL, which carries the API key
        # as a query parameter; never echo the key into notebook output.
        message = str(pipeline_error)
        if API_KEY:
            message = message.replace(API_KEY, "***")
        print(f"Pipeline execution failed: {message}")
        return False
    finally:
        spark.stop()
        print("SparkSession stopped.")
    return True



# Script Entry Point

In [None]:
if __name__ == "__main__":
    # A notebook kernel also runs code as "__main__", so executing this cell
    # runs the whole pipeline; importing the code as a module does not.
    main()
