<center>Business Sales Performance Dashboard </center>
  
<center>B3501 MSDS1</center>
<center>CSC8101 — Object Oriented Programming </center>

In [None]:
""" The utimate goal of this project is to analyze daily sales data of a feeds manufacturing company from different depots, the projects will also automate the entire pipiline such that any time the state holders 
look at the dashboard on the browser they will see the latest data."""

#Importing necessary libraries and setting up the coding environment
import os
import logging
from datetime import datetime
from dataclasses import dataclass, asdict
from typing import Optional, Dict

import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from IPython.display import display, HTML

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("TNDailySalesNotebook")

# Output folders
os.makedirs("outputs", exist_ok=True)
os.makedirs("data", exist_ok=True)

# Display a message indicating the environment is set up
print("Environment ready. Outputs will be saved to ./outputs/ and cleaned data to ./data/")

Environment ready. Outputs will be saved to ./outputs/ and cleaned data to ./data/


This being real company information drafted from their online Sharepoint list, the live cloud connection to this source was prohibited by the IT admin for security purposes. However, with required authentications, Python could directly connect to the datasource and refresh automatically. 

I thus opted to use a local file as a datasource extracted from the real datasource and automated the extraction process (Using Task Schedular) to always have live information locally for the reports to consume

In [None]:
#Dataset path & settings
CONFIG = {
    "DATA_PATH": "TNDailySales.csv",     
    "CLEANED_PATH": os.path.join("data", "TNDailySales_cleaned.csv"),
    "EXPORT_HTML_DIR": "outputs",
    "REFRESH_SCRIPT": "refresh_and_export.py",  # script to schedule with Task Scheduler
    "DATE_COL": "Date",
    "CACHE_TTL_SECONDS": 60
}

print("Data path:", CONFIG["DATA_PATH"])


Data path: TNDailySales.csv


Application of OOP Principles to implement data extraction, data cleaning and data visualization

In [None]:
# OOP Classes: SalesRecord, SalesDataset, PlotlyDashboard, Exporter
@dataclass
class SalesRecord:
    """Lightweight domain object for a sale row in the dataset."""
    ChannelName: str
    ProductCategory: str
    Date: pd.Timestamp
    ProductName: str
    NetWeightKGs: float
    NetSalesUGX: float
    SalesCategory: str
    PaymentType: str
    CustomerName: str

    def to_dict(self):
        d = asdict(self)
        # ensure Date serializable
        if isinstance(d["Date"], pd.Timestamp):
            d["Date"] = d["Date"].to_pydatetime()
        return d

class SalesDataset:
    """Encapsulates the dataset, cleaning, and aggregation methods."""
    def __init__(self, df: pd.DataFrame):
        self.raw = df.copy()
        self.df = df.copy()
        self.cleaned = False

    @staticmethod
    def standardize_columns(df: pd.DataFrame) -> pd.DataFrame:
        # Map provided column names to canonical names (case-insensitive)
        mapping = {
            'channelname': 'ChannelName',
            'productcategory': 'ProductCategory',
            'date': 'Date',
            'productname': 'ProductName',
            'netweightkgs': 'NetWeightKGs',
            'netweightkgs.0': 'NetWeightKGs',
            'netsalesugx': 'NetSalesUGX',
            'salescategory': 'SalesCategory',
            'paymenttype': 'PaymentType',
            'customername': 'CustomerName'

        }
        cols = {}
        for c in df.columns:
            key = c.lower().replace(" ", "").replace("-", "").replace("_", "")
            if key in mapping:
                cols[c] = mapping[key]
            else:
                
                cols[c] = c.strip()
        df = df.rename(columns=cols)
        return df

    def clean(self):
        df = self.raw.copy()
        df = self.standardize_columns(df)

        # Ensuring required columns exist
        required = ['ChannelName', 'ProductCategory', 'Date', 'ProductName', 'NetWeightKGs', 'NetSalesUGX']
        for col in required:
            if col not in df.columns:
                df[col] = np.nan

        # Parse Date
        df['Date'] = pd.to_datetime(df['Date'], errors='coerce')

        # Numeric conversions
        df['NetWeightKGs'] = pd.to_numeric(df['NetWeightKGs'], errors='coerce').fillna(0.0)
        df['NetSalesUGX'] = pd.to_numeric(df['NetSalesUGX'], errors='coerce').fillna(0.0)

        # Text fields fill
        for c in ['SalesCategory','PaymentType','CustomerName','ChannelName','ProductCategory','ProductName']:
            if c not in df.columns:
                df[c] = ""
            else:
                df[c] = df[c].astype(str).fillna("").str.strip()

        # Drop rows missing essential fields
        df = df.dropna(subset=['Date', 'ChannelName', 'ProductName'])

        # Remove obvious duplicates
        df = df.drop_duplicates(subset=['Date', 'ChannelName', 'ProductName', 'NetWeightKGs', 'NetSalesUGX', 'CustomerName'])

        # Derived columns
        df['Year'] = df['Date'].dt.year
        df['Month'] = df['Date'].dt.to_period('M').astype(str)
        df['Day'] = df['Date'].dt.date
        df['Week'] = df['Date'].dt.isocalendar().week
        df['UGX_per_KG'] = df.apply(lambda r: (r['NetSalesUGX'] / r['NetWeightKGs']) if r['NetWeightKGs'] > 0 else 0.0, axis=1)

        self.df = df
        self.cleaned = True
        return df

    # KPI & aggregation methods
    def compute_kpis(self) -> Dict:
        if not self.cleaned:
            self.clean()
        df = self.df
        total_sales = float(df['NetSalesUGX'].sum())
        total_qty = float(df['NetWeightKGs'].sum())
        avg_price = float(total_sales / total_qty) if total_qty > 0 else 0.0
        transactions = int(len(df))
        cash_amount = float(df.loc[df['PaymentType'].str.lower()=='cash','NetSalesUGX'].sum()) if 'PaymentType' in df.columns else 0.0
        credit_amount = float(df.loc[df['PaymentType'].str.lower()=='credit','NetSalesUGX'].sum()) if 'PaymentType' in df.columns else 0.0
        unique_channels = int(df['ChannelName'].nunique())
        top_channels = df.groupby('ChannelName')['NetSalesUGX'].sum().sort_values(ascending=False).head(10)
        top_products = df.groupby('ProductName')['NetSalesUGX'].sum().sort_values(ascending=False).head(10)
        top_categories = df.groupby('ProductCategory')['NetSalesUGX'].sum().sort_values(ascending=False)
        return {
            'total_sales': total_sales,
            'total_qty': total_qty,
            'avg_price': avg_price,
            'transactions': transactions,
            'cash_amount': cash_amount,
            'credit_amount': credit_amount,
            'unique_channels': unique_channels,
            'top_channels': top_channels,
            'top_products': top_products,
            'top_categories': top_categories
        }

    def daily_agg(self):
        if not self.cleaned:
            self.clean()
        return self.df.groupby('Day').agg({'NetSalesUGX':'sum','NetWeightKGs':'sum'}).reset_index().sort_values('Day')

    def channels_agg(self):
        if not self.cleaned:
            self.clean()
        return self.df.groupby('ChannelName').agg({'NetSalesUGX':'sum','NetWeightKGs':'sum'}).reset_index().sort_values('NetSalesUGX', ascending=False)

    def category_agg(self):
        if not self.cleaned:
            self.clean()
        return self.df.groupby('ProductCategory').agg({'NetSalesUGX':'sum','NetWeightKGs':'sum'}).reset_index().sort_values('NetSalesUGX', ascending=False)

    def reps_agg(self):  # if Sales rep exists in CustomerName or different column adapt accordingly
        if not self.cleaned:
            self.clean()
        # There is no SalesRep column in your spec; skip or aggregate by CustomerName if desired
        return pd.DataFrame()  # placeholder

class PlotlyDashboard:
    """Contains methods to produce Plotly figures and display in notebook."""
    def __init__(self, dataset: SalesDataset):
        self.dataset = dataset

    def kpi_html(self, kpis: Dict):
        # simple HTML block of KPIs
        html = f"""
        <div style="display:flex; gap:20px; margin-bottom:10px">
          <div style="background:#f8f9fa;padding:12px;border-radius:6px;width:240px">
            <div style="font-size:12px;color:#6c757d">Total Sales (UGX)</div>
            <div style="font-size:20px;font-weight:700">{kpis['total_sales']:,.0f}</div>
          </div>
          <div style="background:#f8f9fa;padding:12px;border-radius:6px;width:240px">
            <div style="font-size:12px;color:#6c757d">Total Quantity (KG)</div>
            <div style="font-size:20px;font-weight:700">{kpis['total_qty']:,.2f}</div>
          </div>
          <div style="background:#f8f9fa;padding:12px;border-radius:6px;width:240px">
            <div style="font-size:12px;color:#6c757d">Average Price (UGX/kg)</div>
            <div style="font-size:20px;font-weight:700">{kpis['avg_price']:,.2f}</div>
          </div>
          <div style="background:#f8f9fa;padding:12px;border-radius:6px;width:180px">
            <div style="font-size:12px;color:#6c757d">Transactions</div>
            <div style="font-size:20px;font-weight:700">{kpis['transactions']:,}</div>
          </div>
        </div>
        """
        return HTML(html)

    def plot_daily_trend(self):
        daily = self.dataset.daily_agg()
        fig = px.line(daily, x='Day', y='NetSalesUGX', title='Daily Sales Amount (UGX)', markers=True)
        fig.update_layout(yaxis_title="Amount (UGX)")
        return fig

    def plot_channel_leaderboard(self, top_n=15):
        df = self.dataset.channels_agg().head(top_n)
        fig = px.bar(df, x='ChannelName', y='NetSalesUGX', title=f'Top {top_n} Depots by Sales (UGX)')
        return fig

    def plot_category_share(self):
        df = self.dataset.category_agg()
        fig = px.pie(df, values='NetSalesUGX', names='ProductCategory', title='Sales by Product Category')
        return fig

    def plot_heatmap_channel_category(self):
        df = self.dataset.df
        pivot = df.pivot_table(index='ChannelName', columns='ProductCategory', values='NetSalesUGX', aggfunc='sum', fill_value=0)
        if pivot.empty:
            return go.Figure()
        fig = go.Figure(data=go.Heatmap(z=pivot.values, x=list(pivot.columns), y=list(pivot.index), colorscale='Viridis'))
        fig.update_layout(title='Depot × Category Sales Heatmap', xaxis_title='Product Category', yaxis_title='Depot (Channel)')
        return fig

    def show_all(self):
        kpis = self.dataset.compute_kpis()
        display(HTML("<h3>Sales Performance Dashboard (Local)</h3>"))
        display(self.kpi_html(kpis))
        display(self.plot_daily_trend())
        display(self.plot_channel_leaderboard())
        display(self.plot_category_share())
        display(self.plot_heatmap_channel_category())

In [None]:
#Loading and inspecting the raw dataset
if not os.path.exists(CONFIG['DATA_PATH']):
    print(f"ERROR: {CONFIG['DATA_PATH']} not found.")
else:
    raw_df = pd.read_csv(CONFIG['DATA_PATH'])
    print("Loaded raw dataset rows:", len(raw_df))
    display(raw_df.head(5))

Loaded raw dataset rows: 23523


Unnamed: 0,SalesChannel,ChannelName,ProductCategory,Date,ProductName,Batch,NetWeightKGs,NetSalesUGX,SalesCategory,PaymentType,CustomerName,Payment Method
0,Depot,Mukono,LOCAL CONCENTRATES,1/2/2025,Layer CCT 10% 50KG,,50,220000,Retail Price,Cash Sale,Bukenya,
1,Depot,Lira,COMPLETE FEED,1/2/2025,Broiler Grower 25KG,,25,67250,Retail Price,Cash Sale,DENNIS,
2,Depot,Lira,COMPLETE FEED,1/2/2025,Broiler Finisher 25KG,,25,63500,Retail Price,Cash Sale,DENNIS,
3,Depot,Mukono,LOCAL CONCENTRATES,1/2/2025,Layer CCT 20% 50KG,,50,210000,Retail Price,Cash Sale,Mrs. Kasolo,
4,Depot,Mukono,COMPLETE FEED,1/2/2025,Broiler Grower 25KG,,50,132500,Retail Price,Cash Sale,Karuhanga,


In [14]:
#Instantiate SalesDataset, clean data and save cleaned CSV
ds = SalesDataset(raw_df)
cleaned_df = ds.clean()
print("Cleaned rows:", len(cleaned_df))
# Save cleaned data
cleaned_df.to_csv(CONFIG['CLEANED_PATH'], index=False)
print("Saved cleaned CSV to", CONFIG['CLEANED_PATH'])

cleaned_df.drop(columns=['Batch','SalesChannel','NetSalesUGX','PaymentType','Payment Method', 'UGX_per_KG'], inplace=True, errors='ignore')
display(cleaned_df.head(5))

Cleaned rows: 22817
Saved cleaned CSV to data\TNDailySales_cleaned.csv


Unnamed: 0,ChannelName,ProductCategory,Date,ProductName,NetWeightKGs,SalesCategory,CustomerName,Year,Month,Day,Week
0,Mukono,LOCAL CONCENTRATES,2025-01-02,Layer CCT 10% 50KG,50.0,Retail Price,Bukenya,2025,2025-01,2025-01-02,1
1,Lira,COMPLETE FEED,2025-01-02,Broiler Grower 25KG,25.0,Retail Price,DENNIS,2025,2025-01,2025-01-02,1
2,Lira,COMPLETE FEED,2025-01-02,Broiler Finisher 25KG,25.0,Retail Price,DENNIS,2025,2025-01,2025-01-02,1
3,Mukono,LOCAL CONCENTRATES,2025-01-02,Layer CCT 20% 50KG,50.0,Retail Price,Mrs. Kasolo,2025,2025-01,2025-01-02,1
4,Mukono,COMPLETE FEED,2025-01-02,Broiler Grower 25KG,50.0,Retail Price,Karuhanga,2025,2025-01,2025-01-02,1


In [None]:
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import pandas as pd

df = ds.df  

# Ensure Month is string (safe)
df["Month"] = df["Month"].astype(str)

# KPI CALCULATIONS
total_sales = df["NetWeightKGs"].sum()
total_transactions = len(df)


# KPI Cards
from IPython.display import HTML

kpi_html = f"""
<div style="display:flex; gap:20px; margin-bottom:20px">

  <div style="background:#f8f9fa;padding:15px;border-radius:8px;width:220px;
              box-shadow:0 1px 3px rgba(0,0,0,0.1);">
    <div style="font-size:12px;color:#6c757d">Total KGs Sold</div>
    <div style="font-size:22px;font-weight:bold">{total_sales:,.2f}</div>
  </div>

  <div style="background:#f8f9fa;padding:15px;border-radius:8px;width:220px;
              box-shadow:0 1px 3px rgba(0,0,0,0.1);">
    <div style="font-size:12px;color:#6c757d">Total Transactions</div>
    <div style="font-size:22px;font-weight:bold">{total_transactions:,}</div>
  </div>

</div>
"""

display(HTML(kpi_html))

# BAR CHART: Channel vs NetWeightKGs
fig_channel = px.bar(
    df.groupby("ChannelName")["NetWeightKGs"].sum().reset_index(),
    x="ChannelName",
    y="NetWeightKGs",
    title="Sales Volume by Channel (KGs)",
    labels={"NetWeightKGs": "Total KGs"},
)
fig_channel.show()


# LINE CHART: Month vs NetWeightKGs
month_order = sorted(df["Month"].unique())

fig_month = px.line(
    df.groupby("Month")["NetWeightKGs"].sum().reindex(month_order).reset_index(),
    x="Month",
    y="NetWeightKGs",
    title="Monthly Sales Trend (KGs)",
    markers=True,
    labels={"NetWeightKGs": "Total KGs"},
)
fig_month.show()


# BAR CHART: SalesCategory vs NetWeightKGs
fig_salescat = px.bar(
    df.groupby("SalesCategory")["NetWeightKGs"].sum().reset_index(),
    x="SalesCategory",
    y="NetWeightKGs",
    title="Sales Category Breakdown (KGs)",
    labels={"NetWeightKGs": "Total KGs"},
)
fig_salescat.show()

# BAR CHART: ProductName vs NetWeightKGs
fig_product = px.bar(
    df.groupby("ProductName")["NetWeightKGs"].sum().reset_index(),
    x="ProductName",
    y="NetWeightKGs",
    title="Product Sales by Weight (KGs)",
    labels={"NetWeightKGs": "Total KGs"},
)
fig_product.show()

# INTERACTIVE DASHBOARD WITH MONTH FILTER

# Build monthly filtered datasets
unique_months = sorted(df["Month"].unique())

fig_filtered = make_subplots(rows=2, cols=2,
                             subplot_titles=(
                                 "Channel vs KGs",
                                 "Sales Category vs KGs",
                                 "Product vs KGs",
                                 "Monthly Trend"
                             ))

def get_filtered(m):
    return df[df["Month"] == m]

# initial data for the first month
initial = get_filtered(unique_months[0])

# Chart 1: Channel
fig_filtered.add_trace(
    go.Bar(
        x=initial.groupby("ChannelName")["NetWeightKGs"].sum().index,
        y=initial.groupby("ChannelName")["NetWeightKGs"].sum().values,
        name="Channel"
    ),
    row=1, col=1
)

# Chart 2: Sales Category
fig_filtered.add_trace(
    go.Bar(
        x=initial.groupby("SalesCategory")["NetWeightKGs"].sum().index,
        y=initial.groupby("SalesCategory")["NetWeightKGs"].sum().values,
        name="SalesCategory"
    ),
    row=1, col=2
)

# Chart 3: Product
fig_filtered.add_trace(
    go.Bar(
        x=initial.groupby("ProductName")["NetWeightKGs"].sum().index,
        y=initial.groupby("ProductName")["NetWeightKGs"].sum().values,
        name="ProductName"
    ),
    row=2, col=1
)

# Chart 4: Month trend (full df)
fig_filtered.add_trace(
    go.Scatter(
        x=df.groupby("Month")["NetWeightKGs"].sum().index,
        y=df.groupby("Month")["NetWeightKGs"].sum().values,
        mode="lines+markers",
        name="Trend"
    ),
    row=2, col=2
)


# Dropdown menu creation

# For each month, create update menu entry
buttons = []

for m in unique_months:
    dff = get_filtered(m)

    buttons.append(
        dict(
            label=m,
            method="update",
            args=[
                {
                    "x": [
                        dff.groupby("ChannelName")["NetWeightKGs"].sum().index,
                        dff.groupby("SalesCategory")["NetWeightKGs"].sum().index,
                        dff.groupby("ProductName")["NetWeightKGs"].sum().index,
                        df.groupby("Month")["NetWeightKGs"].sum().index
                    ],
                    "y": [
                        dff.groupby("ChannelName")["NetWeightKGs"].sum().values,
                        dff.groupby("SalesCategory")["NetWeightKGs"].sum().values,
                        dff.groupby("ProductName")["NetWeightKGs"].sum().values,
                        df.groupby("Month")["NetWeightKGs"].sum().values
                    ]
                }
            ]
        )
    )

fig_filtered.update_layout(
    title="Interactive Sales Dashboard (Filtered by Month)",
    updatemenus=[dict(
        buttons=buttons,
        direction="down",
        showactive=True,
        x=1.05,
        y=1.15
    )],
    height=900
)

fig_filtered.show()


<center> Exporting output dashboards to HTML to be visible online </center>

In [18]:
fig_channel.write_html("outputs/channel_sales.html")
fig_month.write_html("outputs/month_trend.html")
fig_salescat.write_html("outputs/sales_category.html")
fig_product.write_html("outputs/product_sales.html")
fig_filtered.write_html("outputs/interactive_dashboard.html")

print("HTML dashboards exported to 'outputs/' folder.")


HTML dashboards exported to 'outputs/' folder.


<center> Triggering Auto Refresh of Reports using Task Schedular  </center>