# Movies analysis dashboard using OMDB API

## Imports, configuration and DB connection
I chose DuckDB because it is lightweight and has excellent CSV support.
*IMPORTANT NOTE*
The `omdb.duckdb` database file is stored locally; `duckdb.connect` creates it on first run if it does not exist yet.

In [27]:
import os
import time
from datetime import datetime

import duckdb
import matplotlib.pyplot as plt
import pandas as pd
import requests
import seaborn as sns
from IPython.display import display, HTML

In [28]:
# Configuration
CSV_FILE_PATH = 'revenues_per_day.csv'  # Path to the input CSV file
DUCKDB_FILE_PATH = 'omdb.duckdb'  # Path to store the DuckDB database file
# Prefer the OMDB_API_KEY environment variable so the key does not have to be
# committed with the notebook; the original hard-coded key is kept only as a
# backward-compatible fallback.
OMDB_API_KEY = os.environ.get('OMDB_API_KEY', 'f95509d3')

In [29]:
# Open a read-write DuckDB connection on the configured database file;
# every function below uses this module-level `conn`.
conn = duckdb.connect(DUCKDB_FILE_PATH, read_only=False)

## Data imports from CSV 

In [None]:
# Raw data load from CSV
def load_csv_data(file_path):
    """Read the CSV at ``file_path`` into a DataFrame and return it.

    The number of loaded rows is printed as a progress message.
    """
    frame = pd.read_csv(file_path)
    row_count = len(frame)
    print(f"CSV data loaded: {row_count} rows")
    return frame

# Create the raw table for the CSV data
def create_raw_table(df):
    """(Re)create ``raw_movie_revenue`` in DuckDB from the given DataFrame.

    The query selects from ``df`` by name, so the parameter must keep
    that name for the statement to find the frame.
    """
    raw_table_sql = '''
    CREATE OR REPLACE TABLE raw_movie_revenue AS 
    SELECT * FROM df
    '''
    conn.execute(raw_table_sql)
    print("Raw table created")

## Creating DIM and Fact tables
- dim_date
- dim_distributor
- dim_movie_temp (this one will be enriched in the next step with data from API)
- fact_daily_revenue (*limited to 250 movies because the OMDb API allows only 1,000 calls per day*)

In [37]:
# Create dimension and fact tables
def create_data_model():
    """Build the dimensional model from ``raw_movie_revenue``.

    Uses the module-level ``conn`` and (re)creates:

    - ``dim_date``: one row per distinct raw date, with year/month/day and season.
    - ``dim_distributor``: one row per distinct non-NULL distributor name.
    - ``dim_movie_temp``: one row per distinct normalised (upper-cased,
      trimmed) title; enriched with API data in a later step.
    - ``fact_daily_revenue``: daily revenue rows for the 250 titles with the
      highest total revenue (the API allows only 1000 calls per day).
    """
    # Date dimension
    conn.execute('''
    CREATE OR REPLACE TABLE dim_date AS
    SELECT DISTINCT
      replace(date,'-','')::int as date_id,
      trim(date)::date as date,
      year(date::date)::int as year,
      month(date::date)::int as month,
      day(date::date)::int as day,
      case when month(date::date)::int in (1,2,12) then 'Winter'
           when month(date::date)::int in (3,4,5) then 'Spring'
           when month(date::date)::int in (6,7,8) then 'Summer'
           else 'Fall'
      end as season
    FROM raw_movie_revenue
    ''')
    print("Date dimension created")

    # Distributor dimension.
    # De-duplicate the names BEFORE numbering them: a window function in the
    # same SELECT as DISTINCT is evaluated first, which would give every raw
    # row its own id and make the fact-table join fan out.
    conn.execute('''
    CREATE OR REPLACE TABLE dim_distributor AS
    SELECT
      ROW_NUMBER() OVER (ORDER BY ud.distributor_name) AS distributor_id,
      ud.distributor_name
    FROM (
      SELECT DISTINCT distributor AS distributor_name
      FROM raw_movie_revenue
      WHERE distributor IS NOT NULL
    ) AS ud
    ''')
    print("Distributor dimension created")

    # Temporary movie dimension (will be enriched with API data later)
    conn.execute('''
    CREATE OR REPLACE TABLE dim_movie_temp AS
    SELECT
      ROW_NUMBER() OVER (ORDER BY ut.movie_title) AS movie_id,
      ut.movie_title
    FROM (
      SELECT DISTINCT UPPER(TRIM(title)) AS movie_title
      FROM raw_movie_revenue
    ) AS ut
    ''')
    print("Temporary movie dimension created")

    # Fact table, limited to 250 titles because of the API limitation
    # (1000 calls per day).
    conn.execute('''
    DROP TABLE IF EXISTS tmp_fact_daily_revenue;
    DROP TABLE IF EXISTS tpm_fact_daily_revenue_top_250;
    ''')

    # Group on the normalised title and order by total revenue so the LIMIT
    # really keeps the top 250 titles instead of an arbitrary 250.
    conn.execute('''
    CREATE TEMP TABLE tpm_fact_daily_revenue_top_250 AS
    SELECT
      UPPER(TRIM(r.title)) AS title,
      SUM(r.revenue) AS revenue,
      SUM(r.theaters) AS theaters
    FROM raw_movie_revenue r
    GROUP BY UPPER(TRIM(r.title))
    ORDER BY SUM(r.revenue) DESC NULLS LAST
    LIMIT 250
    ''')

    # Raw daily rows belonging to the selected titles
    conn.execute('''
    CREATE TEMP TABLE tmp_fact_daily_revenue AS
    SELECT *
    FROM raw_movie_revenue r
    WHERE EXISTS (
      SELECT 1
      FROM tpm_fact_daily_revenue_top_250 t
      WHERE t.title = UPPER(TRIM(r.title))
    )
    ''')

    # Resolve the surrogate keys of each dimension
    conn.execute('''
    CREATE OR REPLACE TABLE fact_daily_revenue AS
    SELECT
      dm.movie_id AS movie_id,
      dm.movie_title,
      dd.date_id,
      r.revenue,
      r.theaters,
      d.distributor_id
    FROM tmp_fact_daily_revenue r
    LEFT JOIN dim_movie_temp dm ON dm.movie_title = UPPER(TRIM(r.title))
    LEFT JOIN dim_date dd on dd.date = r.date
    LEFT JOIN dim_distributor d ON r.distributor = d.distributor_name
    ''')
    print("Fact table created")
    

## Enriching the dim_movie table with data from OMDB API

In [1]:
# Function to fetch movie details from OMDb API and store in a dimension table
_OMDB_URL = 'https://www.omdbapi.com/'
_OMDB_TIMEOUT_SECONDS = 10
_OMDB_DELAY_SECONDS = 0.2

# Column order of the dim_movie table
_DIM_MOVIE_COLUMNS = [
    'movie_id', 'title', 'year', 'rated', 'runtime', 'genre',
    'director', 'imdb_rating', 'imdb_votes', 'box_office',
]

# dim_movie column -> key in the OMDb JSON response
_OMDB_FIELDS = {
    'year': 'Year',
    'rated': 'Rated',
    'runtime': 'Runtime',
    'genre': 'Genre',
    'director': 'Director',
    'imdb_rating': 'imdbRating',
    'box_office': 'BoxOffice',
}


def _clean_omdb_value(value):
    """Return ``None`` for a missing, empty or ``'N/A'`` OMDb value."""
    if value is None or value == '' or value == 'N/A':
        return None
    return value


def _build_movie_row(movie_id, title, movie_data):
    """Build one dim_movie row; only id and title are set if the lookup failed."""
    row = dict.fromkeys(_DIM_MOVIE_COLUMNS)
    row['movie_id'] = movie_id
    row['title'] = title

    if not isinstance(movie_data, dict) or movie_data.get('Response') != 'True':
        return row

    row['title'] = _clean_omdb_value(movie_data.get('Title')) or title
    for column, omdb_key in _OMDB_FIELDS.items():
        row[column] = _clean_omdb_value(movie_data.get(omdb_key))

    # Vote counts are formatted with thousands separators; strip the commas
    votes = _clean_omdb_value(movie_data.get('imdbVotes'))
    row['imdb_votes'] = votes.replace(',', '') if votes else None
    return row


def enrich_movie_data_with_api(api_key):
    """Look up every movie of the fact table in OMDb and (re)create ``dim_movie``.

    Each title from ``dim_movie_temp`` that appears in ``fact_daily_revenue``
    is requested from the OMDb API. Movies that are not found, or whose
    request fails, are still written with only ``movie_id`` and ``title``
    filled in. Missing values and OMDb's ``'N/A'`` placeholders are stored
    as NULL.

    Args:
        api_key: OMDb API key sent with every request.
    """
    # Get all unique movie titles present in the fact table
    movie_titles = conn.execute('''
    SELECT d.movie_id, d.movie_title
    FROM dim_movie_temp d
    WHERE EXISTS (
        SELECT 1
        FROM fact_daily_revenue f
        WHERE f.movie_title = d.movie_title and d.movie_id = f.movie_id
    )
    ''').fetchall()

    print(f"Fetching details for {len(movie_titles)} movies from OMDb API...")

    enriched_movies = []
    for movie_id, title in movie_titles:
        try:
            # `params` URL-encodes the title (titles may contain '&', '#', '+');
            # the timeout keeps a stalled request from blocking the pipeline.
            response = requests.get(
                _OMDB_URL,
                params={'t': title, 'apikey': api_key},
                timeout=_OMDB_TIMEOUT_SECONDS,
            )
            movie_data = response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a response body that is not valid JSON
            print(f"Error fetching data for {title}: {e}")
            movie_data = {}

        enriched_movies.append(_build_movie_row(movie_id, title, movie_data))

        # Be nice to the API with a small delay
        time.sleep(_OMDB_DELAY_SECONDS)

    # Explicit columns keep the table schema stable even when no movie was processed
    df_movies = pd.DataFrame(enriched_movies, columns=_DIM_MOVIE_COLUMNS)

    # Create final movie dimension table (the query reads `df_movies` by name)
    conn.execute("DROP TABLE IF EXISTS dim_movie")
    conn.execute('''
    CREATE TABLE dim_movie AS
    SELECT * FROM df_movies
    ''')

    print(f"Movie dimension enriched with API data: {len(enriched_movies)} movies processed")

## Creating the dim_genre dimension and bridge tables from the genre data returned by the API

In [41]:
# Function to create genre dimension from movie genre data
def create_genre_dimension():
    """Create ``dim_genre`` and ``bridge_movie_genre`` from ``dim_movie.genre``.

    ``genre`` holds a comma-separated list. Each entry is split out and
    trimmed *before* de-duplication, so that ``'Drama'`` and ``' Drama'``
    map to a single genre id. Empty entries and the ``'N/A'`` placeholder
    are ignored.
    """
    conn.execute('''
    -- Explode the comma-separated genres into one trimmed row per movie/genre
    CREATE OR REPLACE TABLE temp_genres AS
    SELECT DISTINCT
        movie_id,
        TRIM(genre_name) AS genre_name
    FROM (
        SELECT
            movie_id,
            UNNEST(string_split(genre, ',')) AS genre_name
        FROM dim_movie
        WHERE genre IS NOT NULL
    ) AS exploded
    WHERE TRIM(genre_name) NOT IN ('', 'N/A');

    -- Create the genre dimension: one id per distinct trimmed name
    CREATE OR REPLACE TABLE dim_genre AS
    SELECT
        ROW_NUMBER() OVER (ORDER BY ug.genre_name) AS genre_id,
        ug.genre_name
    FROM (SELECT DISTINCT genre_name FROM temp_genres) AS ug;

    -- Create the movie-genre bridge table
    CREATE OR REPLACE TABLE bridge_movie_genre AS
    SELECT
        t.movie_id,
        g.genre_id
    FROM temp_genres t
    JOIN dim_genre g ON t.genre_name = g.genre_name;
    ''')

    print("Genre dimension and bridge table created")

## Creating analytical views
- vw_top_movies_by_revenue
- vw_revenue_by_distributor
- vw_revenue_by_genre
- vw_revenue_by_season
- vw_revenue_trend

In [48]:
# Function to create dashboard views
def create_dashboard_views():
    """Create the analytical views that feed the dashboard.

    Views (all built on ``fact_daily_revenue`` and the dimension tables):

    - ``vw_top_movies_by_revenue``: top 20 movie/distributor pairs by revenue.
    - ``vw_revenue_by_distributor``: movie count, total and average revenue.
    - ``vw_revenue_by_genre``: movie count, total revenue and average IMDb rating.
    - ``vw_revenue_by_season``: movie count and total revenue per season.
    - ``vw_revenue_trend``: monthly revenue and movie count.
    """
    # Top movies by total revenue
    conn.execute('''
    CREATE OR REPLACE VIEW vw_top_movies_by_revenue AS
    SELECT
        m.title,
        m.genre,
        m.imdb_rating,
        SUM(f.revenue) AS total_revenue,
        MAX(f.theaters) AS max_theaters,
        d.distributor_name
    FROM fact_daily_revenue f
    JOIN dim_movie m ON f.movie_id = m.movie_id
    JOIN dim_distributor d ON f.distributor_id = d.distributor_id
    GROUP BY m.title, m.genre, m.imdb_rating, d.distributor_name
    ORDER BY total_revenue DESC
    LIMIT 20
    ''')

    # Revenue by distributor
    conn.execute('''
    CREATE OR REPLACE VIEW vw_revenue_by_distributor AS
    SELECT
        d.distributor_name,
        COUNT(DISTINCT f.movie_id) AS movie_count,
        SUM(f.revenue) AS total_revenue,
        SUM(f.revenue)/NULLIF(COUNT(DISTINCT f.movie_id), 0) AS avg_revenue_per_movie
    FROM fact_daily_revenue f
    JOIN dim_distributor d ON f.distributor_id = d.distributor_id
    GROUP BY d.distributor_name
    ORDER BY total_revenue DESC
    ''')

    # Revenue by genre.
    # Revenue is aggregated per movie first so that the average rating counts
    # each movie once instead of once per daily fact row. TRY_CAST yields NULL
    # for non-numeric ratings (e.g. 'N/A') instead of failing the whole query.
    conn.execute('''
    CREATE OR REPLACE VIEW vw_revenue_by_genre AS
    WITH movie_revenue AS (
        SELECT
            movie_id,
            SUM(revenue) AS total_revenue
        FROM fact_daily_revenue
        GROUP BY movie_id
    )
    SELECT
        g.genre_name AS genre,
        COUNT(DISTINCT bmg.movie_id) AS movie_count,
        SUM(mr.total_revenue) AS total_revenue,
        AVG(TRY_CAST(m.imdb_rating AS FLOAT)) AS avg_imdb_rating
    FROM bridge_movie_genre bmg
    JOIN dim_genre g ON bmg.genre_id = g.genre_id
    JOIN movie_revenue mr ON bmg.movie_id = mr.movie_id
    JOIN dim_movie m ON bmg.movie_id = m.movie_id
    GROUP BY g.genre_name
    ORDER BY total_revenue DESC
    ''')

    # Revenue by season
    conn.execute('''
    CREATE OR REPLACE VIEW vw_revenue_by_season AS
    SELECT
        dt.season,
        COUNT(DISTINCT f.movie_id) AS movie_count,
        SUM(f.revenue) AS total_revenue
    FROM fact_daily_revenue f
    JOIN dim_date dt ON f.date_id = dt.date_id
    GROUP BY dt.season
    ORDER BY total_revenue DESC
    ''')

    # Revenue trend over time
    conn.execute('''
    CREATE OR REPLACE VIEW vw_revenue_trend AS
    SELECT
        dt.year,
        dt.month,
        SUM(f.revenue) AS monthly_revenue,
        COUNT(DISTINCT f.movie_id) AS movie_count
    FROM fact_daily_revenue f
    JOIN dim_date dt ON f.date_id = dt.date_id
    GROUP BY dt.year, dt.month
    ORDER BY dt.year, dt.month
    ''')

    print("Dashboard views created")

## Creating dashboards using data from analytical views

In [50]:
# Function to display the results and visualizations
def create_dashboard():
    """Render the dashboard: a 2x2 grid of bar charts followed by three tables.

    Data comes from the analytical views. Every query that feeds a
    "top N" selection orders its rows explicitly rather than relying on the
    ordering declared inside the view.
    """
    # Get the data for visualization
    top_movies = conn.execute(
        "SELECT * FROM vw_top_movies_by_revenue ORDER BY total_revenue DESC"
    ).fetchdf()
    revenue_by_distributor = conn.execute(
        "SELECT * FROM vw_revenue_by_distributor ORDER BY total_revenue DESC"
    ).fetchdf()
    revenue_by_genre = conn.execute(
        "SELECT * FROM vw_revenue_by_genre ORDER BY total_revenue DESC LIMIT 10"
    ).fetchdf()
    revenue_by_season = conn.execute(
        "SELECT * FROM vw_revenue_by_season ORDER BY total_revenue DESC"
    ).fetchdf()

    top10_movies = top_movies.head(10)
    top10_distributors = revenue_by_distributor.head(10)

    # Create visualizations on an explicit 2x2 grid of axes
    fig, axes = plt.subplots(2, 2, figsize=(15, 15))

    # Top 10 movies by revenue
    ax = axes[0, 0]
    sns.barplot(x='total_revenue', y='title', data=top10_movies, ax=ax)
    ax.set_title('Top 10 Movies by Revenue')
    ax.set_xlabel('Total Revenue ($)')
    ax.set_ylabel('Movie Title')

    # Revenue by distributor
    ax = axes[0, 1]
    sns.barplot(x='total_revenue', y='distributor_name', data=top10_distributors, ax=ax)
    ax.set_title('Top 10 Distributors by Revenue')
    ax.set_xlabel('Total Revenue ($)')
    ax.set_ylabel('Distributor')

    # Revenue by genre
    ax = axes[1, 0]
    sns.barplot(x='total_revenue', y='genre', data=revenue_by_genre, ax=ax)
    ax.set_title('Top 10 Genres by Revenue')
    ax.set_xlabel('Total Revenue ($)')
    ax.set_ylabel('Genre')

    # Revenue by season
    ax = axes[1, 1]
    sns.barplot(x='season', y='total_revenue', data=revenue_by_season, ax=ax)
    ax.set_title('Revenue by Season')
    ax.set_xlabel('Season')
    ax.set_ylabel('Total Revenue ($)')

    fig.tight_layout()
    plt.show()

    # Display top movies table
    display(HTML('<h3>Top Movies by Revenue</h3>'))
    display(top10_movies)

    # Display revenue by distributor
    display(HTML('<h3>Revenue by Distributor</h3>'))
    display(top10_distributors)

    # Display revenue by genre
    display(HTML('<h3>Revenue by Genre</h3>'))
    display(revenue_by_genre)

## Main pipeline definition

In [53]:
# Main function to run the pipeline
def run_pipeline(csv_file, api_key):
    """Run the whole analysis end to end.

    Args:
        csv_file: path of the revenue CSV handed to ``load_csv_data``.
        api_key: OMDb API key handed to ``enrich_movie_data_with_api``.
    """
    print("Starting movie data analysis pipeline...")

    # Steps 1-2: read the CSV and persist it as the raw table
    raw_frame = load_csv_data(csv_file)
    create_raw_table(raw_frame)

    # Step 3: dimensions and fact table
    create_data_model()

    # Step 4: OMDb enrichment of the movie dimension
    enrich_movie_data_with_api(api_key)

    # Step 5: genre dimension and movie-genre bridge
    create_genre_dimension()

    # Steps 6-7: analytical views, then the dashboard itself
    create_dashboard_views()
    create_dashboard()

    print("Pipeline execution completed!")

## Main pipeline execution

In [None]:
# Run the full pipeline with the values from the configuration cell
run_pipeline(csv_file=CSV_FILE_PATH, api_key=OMDB_API_KEY)