# Movie Analytics Project

## Import Libraries

In [2]:
import ast
from datetime import datetime
import io
import json
import os
import zipfile
import pandas as pd
import requests
from sqlalchemy import create_engine
from dotenv import load_dotenv
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

In [3]:
load_dotenv()

zip_url = os.getenv("DATASOURCE")

In [None]:
spark = SparkSession\
    .builder \
    .appName("Movie_Analytics") \
    .config("spark.jars", os.getenv("MYSQL_CONNECTOR_JAR")) \
    .getOrCreate()


jdbc_url = "jdbc:mysql://localhost:3306/finalprojectschema"
properties = {
    "user": os.getenv("USER"),
    "password": os.getenv("PASSWORD"),
    "driver": "com.mysql.cj.jdbc.Driver"
}

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 54993)
Traceback (most recent call last):
  File "c:\Users\JLC04\AppData\Local\Programs\Python\Python311\Lib\socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "c:\Users\JLC04\AppData\Local\Programs\Python\Python311\Lib\socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "c:\Users\JLC04\AppData\Local\Programs\Python\Python311\Lib\socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "c:\Users\JLC04\AppData\Local\Programs\Python\Python311\Lib\socketserver.py", line 755, in __init__
    self.handle()
  File "c:\Users\JLC04\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\accumulators.py", line 295, in handle
    poll(accum_updates)
  File "c:\Users\JLC04\AppData\Local\Programs\Python\Python311\Lib

## Initialize ETL Functions

### Extract

In [5]:
def extract_data(zip_url):
    """
    Extracts CSV and JSON files from a ZIP file located at the provided URL.
    Only files within the 'project_data/' directory are processed.
    
    Returns:
        Tuple of three DataFrames:
        (movies_main_df, movie_extended_df, ratings_df)
    """

    response = requests.get(zip_url)

    if response.status_code == 200:
        buffer = io.BytesIO(response.content)
        dataframes = {}

        with zipfile.ZipFile(buffer, "r") as zip_ref:
            for file_name in zip_ref.namelist():

                if file_name.startswith("project_data/") and not file_name.endswith("/"):
                    print("Processing:", file_name)
                    clean_name = file_name.split("/")[-1].split(".")[0]

                    with zip_ref.open(file_name) as f:
                        if file_name.endswith(".csv"):
                            dataframes[f"{clean_name}_df"] = pd.read_csv(f)

                        elif file_name.endswith(".json"):
                            dataframes[f"{clean_name}_df"] = pd.read_json(f)

        print("\nExtracted the data into a pandas dataframe")
        return (dataframes.get("movies_main_df"), 
                dataframes.get("movie_extended_df"), 
                dataframes.get("ratings_df"))

    else:
        print("Failed to download the ZIP file")
        return None, None, None

### Transform

In [6]:
def transform_dim_movies(df):
    """
    Transforms the movie DataFrame into a dimension table, which returns a DataFrame suitable for the 'dim_movies' table.
    
    Steps:
    - Filters out rows with non-numeric IDs.
    - Converts release dates to a consistent day/month/year format.
    - Drops budget and revenue columns.
    - Renames 'id' to 'movie_id' and ensures it's an integer.
    - Removes rows with missing movie_id or title.
    """
    
    df = df[df["id"].str.isnumeric()] 
    df.loc[:, "release_date"] = pd.to_datetime(df["release_date"], format="mixed").dt.strftime("%m/%d/%Y")
    df = df.drop(columns=["budget", "revenue"]).rename(columns={"id": "movie_id"})
    df["movie_id"] = df["movie_id"].astype(int)
    df.dropna(subset=["movie_id", "title"], inplace=True)

    return df

In [7]:
def transform_dim_genres(df):
    """
    Transforms the genre information into a dimension table, and returns the dim_genres dataframe.

    Steps:
    - Splits the 'genres' string into lists and explodes them into separate rows.
    - Converts 'id' to numeric and renames it to 'movie_id'.
    - Drops unrelated columns.
    - Drops any rows with missing values and ensures 'movie_id' is an integer.
    """
        
    df["genres"] = df["genres"].str.split(",").reset_index(drop=True)
    df = df.explode("genres")
    df["id"] = pd.to_numeric(df["id"], errors="coerce")
    df = df.drop(columns=["production_companies", "production_countries", "spoken_languages"]).rename(columns={"id": "movie_id", "genres": "genre"}).dropna()
    df["movie_id"] = df["movie_id"].astype(int)

    return df

In [8]:
def transform_dim_companies(df):
    """
    Transforms the production company information into a dimension table, and returns the dim_companies dataframe.

    Steps:
    - Splits 'production_companies' into lists and explodes them into separate rows.
    - Converts 'id' to numeric and renames it to 'movie_id'.
    - Drops unrelated columns and rows with missing data.
    - Ensures 'movie_id' is an integer.
    """
    
    df["production_companies"] = df["production_companies"].str.split(",").reset_index(drop=True)
    df = df.explode("production_companies")
    df["id"] = pd.to_numeric(df["id"], errors="coerce")
    df = df.drop(columns=["genres", "production_countries", "spoken_languages"]).rename(columns={"id": "movie_id", "production_companies": "production_company"}).dropna()
    df["movie_id"] = df["movie_id"].astype(int)

    return df

In [9]:
def safe_eval(val):
    """
    Safely evaluates a string representation of a list or dictionary into an object,
    for the purpose of columns with stringified dictionary as a data point (For stringified lists/dictionaries datapoints).
    """

    if isinstance(val, str):
        return ast.literal_eval(val)
    
    return val


In [10]:
def transform_dim_countries(df):
    """
    Transforms country information into a dimension table, and returns the dim_countries dataframe.

    Steps:
    - Parses 'production_countries' JSON strings into Python objects.
    - Explodes the list of countries into separate rows.
    - Normalizes country dictionaries into separate columns.
    - Renames and filters relevant columns.
    - Ensures 'movie_id' is an integer.
    """
    
    df["production_countries"] = df["production_countries"].apply(safe_eval)
    df = df.explode("production_countries")
    df["id"] = pd.to_numeric(df["id"], errors="coerce")
    df = df .dropna(subset=["production_countries"]).reset_index(drop=True)
    df = pd.concat([df["id"], pd.json_normalize(df["production_countries"])], axis=1).rename(columns={"id": "movie_id", "iso_3166_1": "country_code", "name": "country"}).dropna()
    df["movie_id"] = df["movie_id"].astype(int)
    
    return df

In [11]:
def transform_dim_languages(df):
    """
    Transforms the raw movie data to create the dimension table for spoken languages, returns the dim_languages dataframe.

    Steps:
    - Converts stringified lists/dictionaries to Python objects
    - Explodes the list so each language becomes a separate row
    - Normalizes the language dictionaries into individual columns
    - Renames and cleans up the dataframe to match the star schema
    """
    
    df["spoken_languages"] = df["spoken_languages"].apply(safe_eval)
    df = df.explode("spoken_languages")
    df["id"] = pd.to_numeric(df["id"], errors="coerce")
    df = df .dropna(subset=["spoken_languages"]).reset_index(drop=True)
    df = pd.concat([df["id"], pd.json_normalize(df["spoken_languages"])], axis=1).rename(columns={"id": "movie_id", "iso_639_1": "language_code", "name": "language"}).dropna()
    df["movie_id"] = df["movie_id"].astype(int)   
   
    return df

In [12]:
def transform_fact_table(df1, df2):
    """
    Transforms and merges ratings and numerical movie columns into a fact table, returns the fact_table.

    - Gets the ratings and movies_main dataframe
    - Normalizes nested rating summary columns
    - Converts epoch timestamps to readable dates
    - Converts string values to numeric (handling errors)
    - Fills missing rating/financial fields with default values
    - Merges both datasets on 'movie_id' to form a complete fact table

    Args:
        df1 (pd.DataFrame): The ratings dataframe from 'ratings_df'
        df2 (pd.DataFrame): The main movie dataframe from 'movies_main_df'
    """
    
    df_summary = pd.json_normalize(df1["ratings_summary"])
    df1[df_summary.columns] = df_summary

    df1["last_rated"] = pd.to_datetime(df1["last_rated"], unit="s").dt.strftime("%m/%d/%Y")

    df1.drop(columns="ratings_summary", inplace=True)

    df2["id"] = pd.to_numeric(df2["id"], errors="coerce")
    df2["budget"] = pd.to_numeric(df2["budget"], errors="coerce")
    df2["revenue"] = pd.to_numeric(df2["revenue"], errors="coerce")

    fill_df1 = ["last_rated", "avg_rating", "total_ratings", "std_dev"]
    df1[fill_df1] = df1[fill_df1].fillna(0)
    df2[["budget", "revenue"]] = df2[["budget", "revenue"]].fillna(0)

    df2 = df2.dropna(subset=["id"]).drop(columns=["title", "release_date"]).rename(columns={"id": "movie_id"})
    df2["movie_id"] = df2["movie_id"].astype(int)

    df = pd.merge(df1, df2, on="movie_id", how="outer")

    df = df.drop_duplicates(subset=["movie_id"], keep="first")
    
    return df

### Load

In [13]:
def load_data(DIM_movies, DIM_genres, DIM_companies, DIM_countries, DIM_languages, FACT_movies, engine=None):
    """
    - Create MySQL engine if not provided
    - Convert PySpark DataFrames to pandas (for exporting)
    - Store DataFrames to MySQL
    """

    if engine is None:
        dbuser = os.getenv("USER")
        dbpassword = os.getenv("PASSWORD")
        dbname = "trial_projschema"
        engine = create_engine(f"mysql+mysqlconnector://{dbuser}:{dbpassword}@localhost/{dbname}")
    
    DIM_movies_pd = DIM_movies.toPandas()
    DIM_genres_pd = DIM_genres.toPandas()
    DIM_companies_pd = DIM_companies.toPandas()
    DIM_countries_pd = DIM_countries.toPandas()
    DIM_languages_pd = DIM_languages.toPandas()
    FACT_movies_pd = FACT_movies.toPandas()
    
    DIM_movies_pd.to_sql(name="dim_movies", con=engine, index=False, if_exists="replace")
    DIM_genres_pd.to_sql(name="dim_genres", con=engine, index=False, if_exists="replace")
    DIM_companies_pd.to_sql(name="dim_companies", con=engine, index=False, if_exists="replace")
    DIM_countries_pd.to_sql(name="dim_countries", con=engine, index=False, if_exists="replace")
    DIM_languages_pd.to_sql(name="dim_languages", con=engine, index=False, if_exists="replace")
    FACT_movies_pd.to_sql(name="fact_movies", con=engine, index=False, if_exists="replace")
    
    print("Data loading complete")

## Main Code

In [14]:
movies_main_df, movies_extended_df, ratings_df = extract_data(zip_url)

DIM_movies_pd = transform_dim_movies(movies_main_df)
DIM_genres_pd = transform_dim_genres(movies_extended_df)
DIM_companies_pd = transform_dim_companies(movies_extended_df)
DIM_countries_pd = transform_dim_countries(movies_extended_df)
DIM_languages_pd = transform_dim_languages(movies_extended_df)
FACT_movies_pd = transform_fact_table(ratings_df, movies_main_df)
print("Transform done...")


Processing: project_data/movies_main.csv
Processing: project_data/movie_extended.csv
Processing: project_data/ratings.json

Extracted the data into a pandas dataframe
Transform done...


### PySpark Operations

In [15]:
DIM_movies = spark.createDataFrame(DIM_movies_pd)
DIM_genres = spark.createDataFrame(DIM_genres_pd)
DIM_companies = spark.createDataFrame(DIM_companies_pd)
DIM_countries = spark.createDataFrame(DIM_countries_pd)
DIM_languages = spark.createDataFrame(DIM_languages_pd)
FACT_movies = spark.createDataFrame(FACT_movies_pd)

# Average rating per Movie
print("Average Rating per Movie:")
FACT_movies.groupBy("movie_id").agg(
    {"avg_rating": "avg"}
).show()

Average Rating per Movie:
+--------+------------------+
|movie_id|   avg(avg_rating)|
+--------+------------------+
|      26|               4.1|
|      29|             4.025|
|     474|            3.8625|
|     964|               2.5|
|    1677|               NaN|
|    1697|               NaN|
|    1806|               1.5|
|    1950| 3.909090909090909|
|    2040| 3.333333333333333|
|    2214|               3.5|
|    2250|               3.2|
|    2453| 3.285714285714285|
|    2529|3.6315789473684212|
|    2927|               4.5|
|    3091|               4.1|
|      65|             2.025|
|     191|               3.0|
|     418|               3.0|
|     541| 4.037671232876712|
|     558| 2.666666666666666|
+--------+------------------+
only showing top 20 rows



In [26]:
# Categorical Rating by Movies
FACT_movies = FACT_movies.withColumn(
    "rating_category",
    when(col("avg_rating") >= 4.5, "Very High")
    .when((col("avg_rating") >= 3.5) & (col("avg_rating") < 4.5), "High")
    .when((col("avg_rating") >= 2.5) & (col("avg_rating") < 3.5), "Average")
    .when((col("avg_rating") >= 1.5) & (col("avg_rating") < 2.5), "Low")
    .otherwise("Very Low")
)
print("Rating Categories:")
FACT_movies.select("movie_id", "avg_rating", "rating_category").show()

Rating Categories:
+--------+------------------+---------------+
|movie_id|        avg_rating|rating_category|
+--------+------------------+---------------+
|       1|  3.87246963562753|           High|
|       2| 3.401869158878504|        Average|
|       3| 3.161016949152542|        Average|
|       4| 2.384615384615384|            Low|
|       5|3.2678571428571432|        Average|
|       6| 3.884615384615384|           High|
|       7| 3.283018867924528|        Average|
|       8|               3.8|           High|
|       9|              3.15|        Average|
|      10|3.4508196721311473|        Average|
|      11| 3.689024390243902|           High|
|      12| 2.861111111111111|        Average|
|      13|            3.9375|           High|
|      14| 3.451612903225806|        Average|
|      15| 2.318181818181818|            Low|
|      16| 3.948863636363636|           High|
|      17| 3.924418604651162|           High|
|      18| 3.288461538461538|        Average|
|      19| 2.59

In [30]:
FACT_movies.createOrReplaceTempView("fact_movies")
DIM_movies.createOrReplaceTempView("dim_movies")
DIM_genres.createOrReplaceTempView("dim_genres")
DIM_companies.createOrReplaceTempView("dim_companies")
DIM_countries.createOrReplaceTempView("dim_countries")
DIM_languages.createOrReplaceTempView("dim_languages")

In [31]:
# Average rating per Genre (using PySpark SQL)
print("Average Rating per Genre:")
spark.sql("""
    SELECT dg.genre, AVG(fm.avg_rating) AS avg_genre_rating
    FROM fact_movies fm
    JOIN dim_genres dg ON fm.movie_id = dg.movie_id
    JOIN dim_movies dm ON fm.movie_id = dm.movie_id
    GROUP BY dg.genre
""").show()

Average Rating per Genre:
+---------------+----------------+
|          genre|avg_genre_rating|
+---------------+----------------+
|          Crime|             NaN|
|        Romance|             NaN|
|       TV Movie|             NaN|
|       Thriller|             NaN|
|      Adventure|             NaN|
|        Foreign|             NaN|
|          Drama|             NaN|
|            War|             NaN|
|    Documentary|             NaN|
|         Family|             NaN|
|        Fantasy|             NaN|
|        History|             NaN|
|        Mystery|             NaN|
|      Animation|             NaN|
|          Music|             NaN|
|Science Fiction|             NaN|
|         Horror|             NaN|
|        Western|             NaN|
|         Comedy|             NaN|
|         Action|             NaN|
+---------------+----------------+



### Load to DB

In [None]:
dbuser = os.getenv("USER")
dbpassword = os.getenv("PASSWORD")
dbname = "trial_projschema"
engine = create_engine(f"mysql+mysqlconnector://{dbuser}:{dbpassword}@localhost/{dbname}")
load_data(DIM_movies, DIM_genres, DIM_companies, DIM_countries, DIM_languages, FACT_movies, engine)

print("Load and Pipeline complete.")

Data loading complete
Load and Pipeline complete.


: 