
# Connect to GCP and load data

In [1]:
from google.colab import auth
import os
import pandas as pd

# Authenticate your Google account
auth.authenticate_user()

# --- CONFIGURATION ---

# Google Cloud project ID
PROJECT_ID = "team-dn5-finalproject"
REGION = "us-central1"  # The bucket's region; not used in this cell (pipeline_inventory.py sets its own REGION)

# GCS bucket holding the input CSV and the Dataflow temp files
GCS_BUCKET = "team-dn5-finalproject-bucket"

# --- Resource Paths ---
# Full paths for the GCS and BigQuery resources

# Input CSV in the bucket
input_file = f"gs://{GCS_BUCKET}/retail_store_inventory.csv"

# Temporary location for Dataflow jobs, under the bucket's 'temp/' folder
temp_location = f"gs://{GCS_BUCKET}/temp/"

# BigQuery destination
BIGQUERY_DATASET = "retail_forecast_dataset"
BIGQUERY_TABLE = "retail_forecast_table"
table_spec = f"{PROJECT_ID}:{BIGQUERY_DATASET}.{BIGQUERY_TABLE}"

# BigQuery table schema, matching the columns of 'retail_store_inventory.csv'
table_schema = "Date:DATE,StoreID:STRING,ProductID:STRING,Category:STRING,Region:STRING,Inventory_Level:INTEGER,Units_Sold:INTEGER,Units_Ordered:INTEGER,Demand_Forecast:FLOAT,Price:FLOAT,Discount:FLOAT,Weather_Condition:STRING,Holiday_Promotion:INTEGER,Competitor_Pricing:FLOAT,Seasonality:STRING"

# --- Expose the project ID as an environment variable (GCP_PROJECT_ID) for later shell commands ---
os.environ["GCP_PROJECT_ID"] = PROJECT_ID

print("✅ Configuration updated for 'team-dn5-finalproject'.")
print(f"Project ID: {PROJECT_ID}")
print(f"GCS Input File: {input_file}")
print(f"BigQuery Table Spec: {table_spec}")
print(f"BQ Schema updated from CSV: {table_schema}")


# --- Load and Preview Data ---
# The following section loads the data from GCS into a pandas DataFrame
# to allow for a quick preview and verification.

print("\nLoading data into pandas DataFrame for preview...")

# Read the CSV file from the GCS path into a pandas DataFrame
df_inventory = pd.read_csv(input_file)

# Display the first 5 rows of the DataFrame to verify it loaded correctly
print("Data preview:")
display(df_inventory.head())


✅ Configuration updated for 'team-dn5-finalproject'.
Project ID: team-dn5-finalproject
GCS Input File: gs://team-dn5-finalproject-bucket/retail_store_inventory.csv
BigQuery Table Spec: team-dn5-finalproject:retail_forecast_dataset.retail_forecast_table
BQ Schema updated from CSV: Date:DATE,StoreID:STRING,ProductID:STRING,Category:STRING,Region:STRING,Inventory_Level:INTEGER,Units_Sold:INTEGER,Units_Ordered:INTEGER,Demand_Forecast:FLOAT,Price:FLOAT,Discount:FLOAT,Weather_Condition:STRING,Holiday_Promotion:INTEGER,Competitor_Pricing:FLOAT,Seasonality:STRING

Loading data into pandas DataFrame for preview...
Data preview:


,Date,StoreID,ProductID,Category,Region,Inventory_Level,Units_Sold,Units_Ordered,Demand_Forecast,Price,Discount,Weather_Condition,Holiday_Promotion,Competitor_Pricing,Seasonality
0,1/1/2022,S001,P0001,Groceries,North,231,127,55,135.47,33.5,20,Rainy,0,29.69,Autumn
1,1/1/2022,S001,P0002,Toys,South,204,150,66,144.04,63.01,20,Sunny,0,66.16,Autumn
2,1/1/2022,S001,P0003,Toys,West,102,65,51,74.02,27.99,10,Sunny,1,31.32,Summer
3,1/1/2022,S001,P0004,Toys,North,469,61,164,62.18,32.72,10,Cloudy,1,34.74,Autumn
4,1/1/2022,S001,P0005,Electronics,East,166,14,135,9.26,73.64,0,Sunny,0,68.95,Summer
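The Dataflow script later in this notebook parses each CSV line by position, so `table_schema` has to list the columns in the same order as the file. A minimal sketch of a pre-flight check, assuming the CSV header uses the same column names as the schema (as the preview above suggests); `parse_schema` and `check_schema_against_header` are hypothetical helpers, not part of any library.

```python
from typing import List, Tuple


def parse_schema(schema: str) -> List[Tuple[str, str]]:
    """Split a 'Name:TYPE,Name:TYPE' schema string into (name, type) pairs."""
    pairs = []
    for field in schema.split(","):
        name, _, bq_type = field.strip().partition(":")
        pairs.append((name, bq_type))
    return pairs


def check_schema_against_header(schema: str, header_line: str) -> List[str]:
    """Return a list of problems; an empty list means names and order match."""
    schema_names = [name for name, _ in parse_schema(schema)]
    header_names = [col.strip() for col in header_line.split(",")]
    problems = []
    if len(schema_names) != len(header_names):
        problems.append(
            f"schema has {len(schema_names)} fields, header has {len(header_names)}"
        )
    # Compare position by position, since the pipeline maps fields by index
    for i, (schema_name, header_name) in enumerate(zip(schema_names, header_names)):
        if schema_name != header_name:
            problems.append(f"position {i}: schema '{schema_name}' vs header '{header_name}'")
    return problems
```

In the notebook it could be called as `check_schema_against_header(table_schema, ",".join(df_inventory.columns))`; an empty list means the schema and the file line up.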


# Explore the data

In [2]:
display(df_inventory.head())
df_inventory.info()

,Date,StoreID,ProductID,Category,Region,Inventory_Level,Units_Sold,Units_Ordered,Demand_Forecast,Price,Discount,Weather_Condition,Holiday_Promotion,Competitor_Pricing,Seasonality
0,1/1/2022,S001,P0001,Groceries,North,231,127,55,135.47,33.5,20,Rainy,0,29.69,Autumn
1,1/1/2022,S001,P0002,Toys,South,204,150,66,144.04,63.01,20,Sunny,0,66.16,Autumn
2,1/1/2022,S001,P0003,Toys,West,102,65,51,74.02,27.99,10,Sunny,1,31.32,Summer
3,1/1/2022,S001,P0004,Toys,North,469,61,164,62.18,32.72,10,Cloudy,1,34.74,Autumn
4,1/1/2022,S001,P0005,Electronics,East,166,14,135,9.26,73.64,0,Sunny,0,68.95,Summer


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 73100 entries, 0 to 73099
Data columns (total 15 columns):
 #   Column              Non-Null Count  Dtype  
---  ------              --------------  -----  
 0   Date                73100 non-null  object 
 1   StoreID             73100 non-null  object 
 2   ProductID           73100 non-null  object 
 3   Category            73100 non-null  object 
 4   Region              73100 non-null  object 
 5   Inventory_Level     73100 non-null  int64  
 6   Units_Sold          73100 non-null  int64  
 7   Units_Ordered       73100 non-null  int64  
 8   Demand_Forecast     73100 non-null  float64
 9   Price               73100 non-null  float64
 10  Discount            73100 non-null  int64  
 11  Weather_Condition   73100 non-null  object 
 12  Holiday_Promotion   73100 non-null  int64  
 13  Competitor_Pricing  73100 non-null  float64
 14  Seasonality         73100 non-null  object 
dtypes: float64(3), int64(5), object(7)
memory usage: 8.4+
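The `info()` output shows pandas reading `Date` as `object` and `Discount` as `int64`, while `table_schema` declares them as `DATE` and `FLOAT`. A minimal sketch of drafting a schema string from the dtype names, with manual overrides for columns like these; the dtype-to-BigQuery mapping below is an assumption to review by hand, not an official conversion table, and `draft_schema` is a hypothetical helper.

```python
from typing import Dict, Optional

# Assumed mapping from pandas dtype names to the BigQuery type names used in table_schema
DTYPE_TO_BQ = {
    "int64": "INTEGER",
    "float64": "FLOAT",
    "bool": "BOOLEAN",
    "object": "STRING",
}


def draft_schema(dtypes: Dict[str, str], overrides: Optional[Dict[str, str]] = None) -> str:
    """Build a 'Name:TYPE,...' schema string from column -> dtype-name pairs.

    Columns listed in `overrides` use that type; unknown dtypes fall back to STRING.
    """
    overrides = overrides or {}
    fields = []
    for column, dtype_name in dtypes.items():
        bq_type = overrides.get(column, DTYPE_TO_BQ.get(dtype_name, "STRING"))
        fields.append(f"{column}:{bq_type}")
    return ",".join(fields)
```

For this file, `draft_schema({c: str(t) for c, t in df_inventory.dtypes.items()}, overrides={"Date": "DATE", "Discount": "FLOAT"})` should reproduce the hand-written schema, which makes later edits to the CSV easier to keep in sync.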

Purpose: Before we can build our pipeline, we need to install the necessary Python libraries. We will install apache-beam with the [gcp] extra, which includes all the components needed to run our pipeline on the Google Cloud Dataflow service.

Important Note on Dependencies: Google Colab comes with many pre-installed packages, and these can conflict with the versions required by newly installed libraries. In this case the conflict involves a package called dill, so the cell upgrades dill (together with pip and google-cloud-bigquery) before installing apache-beam[gcp]. pip may still print a dependency-resolver error for other pre-installed packages that pin an older dill (in the output below, datasets 4.0.0 requires dill<0.3.9); that message appears after the installation has completed.

Action Required:

Run this cell to install the libraries.

After the installation completes, you must restart the Colab runtime. This ensures that the newly installed package versions are loaded correctly. Go to the menu and select Runtime > Restart runtime.

In [3]:
# --- Install Apache Beam and its dependencies ---
# Upgrade pip and dill first, then the BigQuery client library, then the
# Apache Beam SDK with Google Cloud Platform support.
# The [gcp] extra is quoted so the shell does not treat the brackets as a glob pattern.

!pip install --upgrade pip
!pip install --upgrade dill
!pip install --upgrade google-cloud-bigquery
!pip install --quiet "apache-beam[gcp]"

Collecting pip
  Downloading pip-25.2-py3-none-any.whl.metadata (4.7 kB)
Downloading pip-25.2-py3-none-any.whl (1.8 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 1.8/1.8 MB 5.0 MB/s eta 0:00:00
Installing collected packages: pip
  Attempting uninstall: pip
    Found existing installation: pip 24.1.2
    Uninstalling pip-24.1.2:
      Successfully uninstalled pip-24.1.2
Successfully installed pip-25.2
Collecting dill
  Downloading dill-0.4.0-py3-none-any.whl.metadata (10 kB)
Downloading dill-0.4.0-py3-none-any.whl (119 kB)
Installing collected packages: dill
  Attempting uninstall: dill
    Found existing installation: dill 0.3.8
    Uninstalling dill-0.3.8:
      Successfully uninstalled dill-0.3.8
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
datasets 4.0.0 requires dill<0.3.9,>=0.3.0, but you have dill 0.
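After the runtime restart, a quick way to confirm which versions the new session actually sees is the standard-library `importlib.metadata` module. A minimal sketch; the package list simply mirrors the `pip install` commands above, and `installed_versions` is a hypothetical helper name.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_versions(packages):
    """Map each distribution name to its installed version, or None if it is absent."""
    result = {}
    for name in packages:
        try:
            result[name] = version(name)
        except PackageNotFoundError:
            result[name] = None
    return result


if __name__ == "__main__":
    for pkg, ver in installed_versions(["apache-beam", "dill", "google-cloud-bigquery"]).items():
        print(f"{pkg}: {ver or 'not installed'}")
```

If `dill` still reports the pre-upgrade version here, the runtime most likely has not been restarted yet.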

# Create the Dataflow pipeline

In [4]:
%%writefile pipeline_inventory.py
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import logging
import datetime

# --- CONFIGURATION ---
# Values for the 'team-dn5-finalproject' GCP environment.
PROJECT_ID = "team-dn5-finalproject"
GCS_BUCKET = "team-dn5-finalproject-bucket"
BIGQUERY_DATASET = "retail_forecast_dataset"
BIGQUERY_TABLE = "retail_forecast_table"
REGION = "us-south1"  # Dataflow region for this job (the setup cell's REGION is us-central1)

# --- Resource Paths ---
INPUT_FILE = f"gs://{GCS_BUCKET}/retail_store_inventory.csv"
TABLE_SPEC = f"{PROJECT_ID}:{BIGQUERY_DATASET}.{BIGQUERY_TABLE}"
TEMP_LOCATION = f"gs://{GCS_BUCKET}/temp/"
STAGING_LOCATION = f"gs://{GCS_BUCKET}/staging/"

# --- DATA TRANSFORMATION FUNCTION ---
def parse_inventory_csv(line):
    """
    Parses a single line from retail_store_inventory.csv and returns a dictionary.
    Handles type conversions, cleans anomalous data, and formats the date string.
    """
    fields = line.split(',')
    # Ensure the correct number of fields (15) before attempting to parse
    if len(fields) != 15:
        logging.warning(f"Skipping malformed row (incorrect number of fields): {line}")
        return []

    try:
        # Data Cleaning: Check if demand forecast is negative and convert to 0 if it is.
        demand_forecast = float(fields[8])
        if demand_forecast < 0:
            demand_forecast = 0.0

        # Data Formatting: Convert date from 'M/D/YYYY' to 'YYYY-MM-DD' for BigQuery
        date_obj = datetime.datetime.strptime(fields[0], '%m/%d/%Y')
        formatted_date = date_obj.strftime('%Y-%m-%d')

        # Create a dictionary for each row, converting types as needed
        return [{
            "Date": formatted_date, # Use the correctly formatted date
            "StoreID": fields[1],
            "ProductID": fields[2],
            "Category": fields[3],
            "Region": fields[4],
            "Inventory_Level": int(fields[5]),
            "Units_Sold": int(fields[6]),
            "Units_Ordered": int(fields[7]),
            "Demand_Forecast": demand_forecast, # Use the cleaned value
            "Price": float(fields[9]),
            "Discount": float(fields[10]),
            "Weather_Condition": fields[11],
            "Holiday_Promotion": int(fields[12]),
            "Competitor_Pricing": float(fields[13]),
            "Seasonality": fields[14].strip() # Remove potential trailing whitespace
        }]
    except (ValueError, IndexError) as e:
        # Log rows that don't match the expected format or type conversions
        logging.warning(f"Skipping malformed row (type conversion error): {line} | Error: {e}")
        return []

def run():
    """Defines and runs the Beam pipeline."""
    # --- PIPELINE OPTIONS ---
    options = PipelineOptions(
        runner='DataflowRunner',
        project=PROJECT_ID,
        job_name='gcs-to-bq-retail-inventory',
        staging_location=STAGING_LOCATION,
        temp_location=TEMP_LOCATION,
        region=REGION,
        # Worker zone; must be a zone within REGION
        zone='us-south1-a',
        save_main_session=True
    )

    # BigQuery table schema, matching the target table
    table_schema = "Date:DATE,StoreID:STRING,ProductID:STRING,Category:STRING,Region:STRING,Inventory_Level:INTEGER,Units_Sold:INTEGER,Units_Ordered:INTEGER,Demand_Forecast:FLOAT,Price:FLOAT,Discount:FLOAT,Weather_Condition:STRING,Holiday_Promotion:INTEGER,Competitor_Pricing:FLOAT,Seasonality:STRING"

    logging.info(f"Starting Dataflow job: {options.get_all_options()['job_name']}")
    logging.info(f"Reading from: {INPUT_FILE}")
    logging.info(f"Writing to BigQuery table: {TABLE_SPEC}")
    logging.info(f"BigQuery schema: {table_schema}")


    # Define the pipeline
    with beam.Pipeline(options=options) as p:
        (
            p
            | 'ReadInventoryData' >> beam.io.ReadFromText(INPUT_FILE, skip_header_lines=1)
            | 'ParseInventoryCSV' >> beam.FlatMap(parse_inventory_csv)
            | 'WriteInventoryToBigQuery' >> beam.io.WriteToBigQuery(
                table=TABLE_SPEC,
                schema=table_schema,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
            )
        )
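    # Exiting the `with` block runs the pipeline and waits for the Dataflow job to finish,
    # so by this point the job has completed rather than merely been submitted.
    logging.info(f"Job '{options.get_all_options()['job_name']}' finished.")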

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()


Writing pipeline_inventory.py
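`parse_inventory_csv` splits each line on bare commas, which works for this file but would misread a quoted field such as `"Toys, Games"`. A sketch of the same per-row logic built on Python's standard `csv` module instead; this is an alternative to the approach above, not what the pipeline runs, and `COLUMNS` and `parse_row_with_csv` are hypothetical names. Like the original, it returns a list so it can be passed to `beam.FlatMap`, and it clamps negative demand forecasts to 0.

```python
import csv
import datetime
import logging

# (column name, converter) pairs in the order the CSV columns appear.
COLUMNS = [
    ("Date", lambda s: datetime.datetime.strptime(s, "%m/%d/%Y").strftime("%Y-%m-%d")),
    ("StoreID", str),
    ("ProductID", str),
    ("Category", str),
    ("Region", str),
    ("Inventory_Level", int),
    ("Units_Sold", int),
    ("Units_Ordered", int),
    ("Demand_Forecast", lambda s: max(float(s), 0.0)),  # negative forecasts become 0
    ("Price", float),
    ("Discount", float),
    ("Weather_Condition", str),
    ("Holiday_Promotion", int),
    ("Competitor_Pricing", float),
    ("Seasonality", str.strip),  # drop trailing whitespace
]


def parse_row_with_csv(line):
    """Parse one CSV line with quote-aware splitting; return [] for bad rows."""
    fields = next(csv.reader([line]))
    if len(fields) != len(COLUMNS):
        logging.warning("Skipping row with %d fields: %s", len(fields), line)
        return []
    try:
        return [{name: convert(value) for (name, convert), value in zip(COLUMNS, fields)}]
    except ValueError as e:
        logging.warning("Skipping row (conversion error): %s | %s", line, e)
        return []
```

Keeping the column order and converters in one list also means a schema change touches a single place instead of a hard-coded index per field.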


In [5]:
# --- Execute the Dataflow pipeline script ---
# This command runs the Python script that defines and launches the Dataflow job.
!python pipeline_inventory.py

INFO:root:Starting Dataflow job: gcs-to-bq-retail-inventory
INFO:root:Reading from: gs://team-dn5-finalproject-bucket/retail_store_inventory.csv
INFO:root:Writing to BigQuery table: team-dn5-finalproject:retail_forecast_dataset.retail_forecast_table
INFO:root:BigQuery schema: Date:DATE,StoreID:STRING,ProductID:STRING,Category:STRING,Region:STRING,Inventory_Level:INTEGER,Units_Sold:INTEGER,Units_Ordered:INTEGER,Demand_Forecast:FLOAT,Price:FLOAT,Discount:FLOAT,Weather_Condition:STRING,Holiday_Promotion:INTEGER,Competitor_Pricing:FLOAT,Seasonality:STRING
INFO:root:Runner defaulting to pickling library: cloudpickle.
INFO:apache_beam.runners.dataflow.dataflow_runner:Pipeline has additional dependencies to be installed in SDK worker container, consider using the SDK container image pre-building workflow to avoid repetitive installations. Learn more on https://cloud.google.com/dataflow/docs/guides/using-custom-containers#prebuild
INFO:apache_beam.runners.dataflow.internal.apiclient:Starting G

In [8]:
# Import the libraries used in this cell
import pandas as pd
import plotly.express as px
import numpy as np

# --- Step 1: Create the 'Revenue' Column ---
# The dataset has no 'Revenue' column, so we derive it.
# We assume Revenue is the Price multiplied by the Units Sold (Discount is not applied).
print("Calculating 'Revenue' column...")
df_inventory['Revenue'] = df_inventory['Price'] * df_inventory['Units_Sold']


# --- Chart 1: Price vs. Burden Bubble Chart ---
# The dataset has no 'Gross_Margin' column, so 'Price' is used on the
# x-axis instead to show which high-inventory items are also low-priced.
print("Generating Chart 1: Price vs. Burden...")
try:
    fig1 = px.scatter(
        df_inventory,
        x='Price',  # Stands in for the unavailable 'Gross_Margin'
        y='Revenue',
        size='Inventory_Level',
        color='Category',
        hover_data=['ProductID'],
        title="Price vs. Revenue vs. Inventory Burden",
        labels={'Price': 'Unit Price', 'Revenue': 'Total Revenue'}
    )
    fig1.show()
except Exception as e:
    print(f"An error occurred with Chart 1: {e}")


# --- Chart 2: Revenue Breakdown Sunburst Chart ---
# Uses the 'Revenue' column created in Step 1.
print("Generating Chart 2: Revenue Breakdown...")
try:
    fig2 = px.sunburst(
        df_inventory,
        path=['Region', 'Category'],
        values='Revenue',
        title="Interactive Revenue Breakdown by Region and Category"
    )
    fig2.show()
except Exception as e:
    print(f"An error occurred with Chart 2: {e}")


# --- Chart 3: Inventory Crisis Bar Chart ---
print("Generating Chart 3: Inventory Crisis...")
try:
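    # Days on hand = Inventory_Level / Units_Sold, scaled by 365.
    # The * 365 factor assumes Units_Sold is an annual figure; if each row holds a
    # single day's sales, Inventory_Level / Units_Sold is already a number of days.
    df_charting = df_inventory.copy()
    df_charting['Days_on_Hand'] = (df_charting['Inventory_Level'] / df_charting['Units_Sold']) * 365
    # Rows with zero sales divide to infinity and are capped at 9999, which raises the category means
    df_charting.replace([np.inf, -np.inf], 9999, inplace=True)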
    df_charting.dropna(subset=['Days_on_Hand'], inplace=True)

    avg_days_by_category = df_charting.groupby('Category')['Days_on_Hand'].mean().sort_values(ascending=False).reset_index()

    fig3 = px.bar(
        avg_days_by_category,
        x='Days_on_Hand',
        y='Category',
        orientation='h',
        title="The 700-Day Problem: Average Inventory Holding Time by Category"
    )
    fig3.show()
except Exception as e:
    print(f"An error occurred with Chart 3: {e}")

Calculating 'Revenue' column...
Generating Chart 1: Price vs. Burden...


Generating Chart 2: Revenue Breakdown...


Generating Chart 3: Inventory Crisis...
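Chart 3 multiplies `Inventory_Level / Units_Sold` by 365, which only yields days if `Units_Sold` is an annual total. The preview shows several products recorded on the same date, which suggests one row per store, product, and day; under that assumption, inventory divided by that day's sales is already a number of days. A minimal stdlib sketch under the daily-row assumption, which also skips zero-sales rows rather than assigning them the 9999 cap, since the cap pulls category averages upward; `days_of_supply_by_category` is a hypothetical helper.

```python
from collections import defaultdict


def days_of_supply_by_category(rows):
    """Average Inventory_Level / Units_Sold per category, assuming daily Units_Sold.

    Rows with zero units sold are skipped instead of being mapped to a sentinel.
    Returns (category, average_days) pairs sorted from longest to shortest.
    """
    totals = defaultdict(float)
    counts = defaultdict(int)
    for row in rows:
        if row["Units_Sold"] <= 0:
            continue  # days of supply is undefined when nothing sold
        totals[row["Category"]] += row["Inventory_Level"] / row["Units_Sold"]
        counts[row["Category"]] += 1
    averages = [(cat, totals[cat] / counts[cat]) for cat in counts]
    return sorted(averages, key=lambda pair: pair[1], reverse=True)
```

With the notebook's DataFrame it could be called as `days_of_supply_by_category(df_inventory.to_dict("records"))`. Categories whose rows all have zero sales drop out of the result.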


In [11]:
# Import the libraries used in this cell
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# --- Step 1: Create Derived Columns ---
# The dataset has no 'Revenue' or 'Gross_Margin' column, so we create them.
print("Calculating 'Revenue' and 'Gross_Margin' columns...")
# We assume Revenue is Price * Units Sold
df_inventory['Revenue'] = df_inventory['Price'] * df_inventory['Units_Sold']
# We assume Cost of Goods Sold (COGS) is 60% of Price. Under that assumption the margin
# (Price - COGS) / Price equals 0.40 on every row, so its monthly average is constant.
df_inventory['Gross_Margin'] = (df_inventory['Price'] - (df_inventory['Price'] * 0.60)) / df_inventory['Price']


# --- Chart 4: Performance Over Time Line Chart ---
print("Generating corrected Chart 4: Performance Over Time with Dual Y-Axes...")
try:
    # --- Data Preparation ---
    df_time = df_inventory.copy()
    df_time['Date'] = pd.to_datetime(df_time['Date'], format='%m/%d/%Y')
    # Aggregate data by month for a cleaner trend view.
    # 'ME' is the month-end alias in pandas >= 2.2 (older versions use 'M').
    monthly_performance = df_time.set_index('Date').resample('ME').agg({
        'Revenue': 'sum',
        'Gross_Margin': 'mean'
    }).reset_index()

    # --- Create the Chart with a Secondary Y-Axis ---
    fig4_corrected = make_subplots(specs=[[{"secondary_y": True}]])

    # Add 'Revenue' trace
    fig4_corrected.add_trace(
        go.Scatter(x=monthly_performance['Date'], y=monthly_performance['Revenue'], name="Monthly Revenue", mode='lines+markers'),
        secondary_y=False,
    )
    # Add 'Gross_Margin' trace
    fig4_corrected.add_trace(
        go.Scatter(x=monthly_performance['Date'], y=monthly_performance['Gross_Margin'], name="Average Gross Margin", mode='lines+markers'),
        secondary_y=True,
    )

    # --- Add Titles and Labels ---
    fig4_corrected.update_layout(title_text='Monthly Revenue & Gross Margin Trend', xaxis_title='Month')
    fig4_corrected.update_yaxes(title_text="<b>Total Revenue ($)</b>", secondary_y=False)
    fig4_corrected.update_yaxes(title_text="<b>Average Gross Margin</b>", secondary_y=True)
    fig4_corrected.show()

except Exception as e:
    print(f"An error occurred while creating the corrected chart: {e}")


# --- Revenue Contribution Treemap by Store ---
print("Generating Treemap: Revenue Contribution by Store...")
try:
    fig_store_treemap = px.treemap(
        df_inventory,
        path=[px.Constant("All Stores"), 'StoreID', 'Category'],
        values='Revenue',
        color='Revenue',
        color_continuous_scale='RdYlGn',
        title='Revenue Contribution by Store and Category'
    )
    fig_store_treemap.update_layout(margin=dict(t=50, l=25, r=25, b=25))
    fig_store_treemap.show()
except Exception as e:
    print(f"An error occurred with the Store Treemap chart: {e}")


# --- Store Performance by Category (Stacked Bar) ---
print("Generating Stacked Bar: Store Performance...")
try:
    store_category_performance = df_inventory.groupby(['StoreID', 'Category'])['Revenue'].sum().reset_index()
    fig_store_stacked_bar = px.bar(
        store_category_performance,
        x='StoreID',
        y='Revenue',
        color='Category',
        title='Revenue Performance by Category within Each Store',
        labels={'Revenue': 'Total Revenue', 'StoreID': 'Store ID'}
    )
    fig_store_stacked_bar.show()
except Exception as e:
    print(f"An error occurred with the Store Stacked Bar chart: {e}")

Calculating 'Revenue' and 'Gross_Margin' columns...
Generating corrected Chart 4: Performance Over Time with Dual Y-Axes...


Generating Treemap: Revenue Contribution by Store...


Generating Stacked Bar: Store Performance...
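Because COGS is fixed at 60% of `Price`, every row's `Gross_Margin` is 0.40 and the margin line in Chart 4 is flat by construction; a margin that moves needs a cost that varies per item. A minimal sketch assuming a hypothetical per-unit `Unit_Cost` field (this dataset has none) and, optionally, treating `Discount` as a percentage off `Price`. It reports a revenue-weighted monthly margin, (revenue - cost) / revenue, rather than a plain mean of per-row margins, so high-volume items count for more. `monthly_margin` is a hypothetical helper name.

```python
import datetime


def monthly_margin(rows, apply_discount=False):
    """Monthly revenue and revenue-weighted gross margin, keyed by 'YYYY-MM'.

    Assumes each row has Date ('M/D/YYYY'), Price, Units_Sold, Discount (percent)
    and a hypothetical per-unit Unit_Cost field that this dataset does not contain.
    """
    revenue = {}
    cost = {}
    for row in rows:
        month = datetime.datetime.strptime(row["Date"], "%m/%d/%Y").strftime("%Y-%m")
        price = row["Price"]
        if apply_discount:
            price *= 1 - row["Discount"] / 100  # assumes Discount is a percentage off Price
        revenue[month] = revenue.get(month, 0.0) + price * row["Units_Sold"]
        cost[month] = cost.get(month, 0.0) + row["Unit_Cost"] * row["Units_Sold"]
    result = {}
    for month in sorted(revenue):
        rev = revenue[month]
        # Margin is undefined for a month with no revenue
        margin = (rev - cost[month]) / rev if rev else None
        result[month] = (rev, margin)
    return result
```

Once a real cost column exists, passing `df_inventory.to_dict("records")` to this function would give a margin series that can actually change from month to month.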
