<style>
@media (prefers-color-scheme: dark) {
    .autosense-logo {
        filter: brightness(0) invert(1);
    }
}
</style>

<div style="text-align: center;">
    <img src="./images/logo.svg" alt="Drawing" style="width: 5%;"/> <br>
    <img src="./images/autosense.svg" alt="Drawing" class="autosense-logo" style="width: 10%;"/>
</div>

<h1 style="text-align: center;">AutoSense - EV Charging Operations Dashboard</h1>


## Table of Contents

<ul>
<li><a href="#setup">Setup & Configuration</a></li>
<li><a href="#connection">Snowflake Connection</a></li>
<li>
    <a href="#dashboard">Dashboard Visualizations</a>
    <ul>
        <li><a href="#kpis">KPI Cards (Month-over-Month)</a></li>
        <li><a href="#revenue-trend">Revenue Trend (Last 90 Days)</a></li>
        <li><a href="#charger-matrix">Charger Performance Matrix</a></li>
        <li><a href="#geographic">Geographic Performance</a></li>
        <li><a href="#data-quality">Data Quality Monitor</a></li>
        <li><a href="#data-quality-trend">Data Quality Trend Over Time</a></li>
    </ul>
</li>
<li><a href="#conclusions">Conclusions</a></li>
</ul>


<a id='setup'></a>

## Setup & Configuration

Import required libraries and configure visualization settings.


In [None]:
# Core libraries
import pandas as pd
import numpy as np
from pathlib import Path
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta

# Snowflake connection
import snowflake.connector
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization

# Visualization
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Interactive widgets
import ipywidgets as widgets
from IPython.display import display, HTML

# Configuration
pd.options.display.max_columns = None
pd.options.display.float_format = "{:,.2f}".format

# =============================================================================
# DATE CONFIGURATION
# The data ends on 2025-05-31, so we treat this as the "current" date for analysis
# =============================================================================
DATA_END_DATE = datetime(2025, 5, 31)

# Current month boundaries
CURRENT_MONTH_START = DATA_END_DATE.replace(day=1)
CURRENT_MONTH_END = DATA_END_DATE

# Previous month boundaries
PREVIOUS_MONTH_END = CURRENT_MONTH_START - timedelta(days=1)
PREVIOUS_MONTH_START = PREVIOUS_MONTH_END.replace(day=1)

# Last 90 days for trend analysis
TREND_START_DATE = DATA_END_DATE - timedelta(days=89)

# Format dates for SQL queries
DATE_FORMAT = "%Y-%m-%d"
CURRENT_MONTH_START_STR = CURRENT_MONTH_START.strftime(DATE_FORMAT)
CURRENT_MONTH_END_STR = CURRENT_MONTH_END.strftime(DATE_FORMAT)
PREVIOUS_MONTH_START_STR = PREVIOUS_MONTH_START.strftime(DATE_FORMAT)
PREVIOUS_MONTH_END_STR = PREVIOUS_MONTH_END.strftime(DATE_FORMAT)
TREND_START_DATE_STR = TREND_START_DATE.strftime(DATE_FORMAT)
DATA_END_DATE_STR = DATA_END_DATE.strftime(DATE_FORMAT)

# Color palette (matching AutoSense brand)
COLOR_PRIMARY = "#4CD9A4"  # Light green
COLOR_SECONDARY = "#2E1052"  # Purple
COLOR_SUCCESS = "#00b159"  # Green
COLOR_DANGER = "#e60000"  # Red
COLOR_WARNING = "#fecb00"  # Yellow
COLOR_GRAY = "#4a4d4e"  # Gray

print("‚úÖ Libraries imported successfully")
print(f"\nüìÖ Date Configuration:")
print(f"   Data End Date: {DATA_END_DATE_STR}")
print(f"   Current Month: {CURRENT_MONTH_START_STR} to {CURRENT_MONTH_END_STR}")
print(f"   Previous Month: {PREVIOUS_MONTH_START_STR} to {PREVIOUS_MONTH_END_STR}")
print(f"   Trend Period (90 days): {TREND_START_DATE_STR} to {DATA_END_DATE_STR}")


‚úÖ Libraries imported successfully

üìÖ Date Configuration:
   Data End Date: 2025-05-31
   Current Month: 2025-05-01 to 2025-05-31
   Previous Month: 2025-04-01 to 2025-04-30
   Trend Period (90 days): 2025-03-03 to 2025-05-31


<a id='connection'></a>

## Snowflake Connection

Connect to Snowflake using key-pair authentication with the private key file.


In [None]:
# Snowflake connection configuration
SNOWFLAKE_CONFIG = {
    "account": "NXUGVRC-QN48006",
    "user": "AGHARIB",
    "warehouse": "COMPUTE_WH",
    "database": "AUTOSENSE_DEV",
    "schema": "SEMANTIC",
    "role": "DBT_ROLE",
    "private_key_path": Path.home() / ".ssh" / "autosense_key.p8",
}


def get_snowflake_connection():
    """
    Create a Snowflake connection using key-pair authentication.
    Returns a connection object that can execute queries.
    """
    # Load the private key
    with open(SNOWFLAKE_CONFIG["private_key_path"], "rb") as key_file:
        private_key = serialization.load_pem_private_key(
            key_file.read(), password=None, backend=default_backend()
        )

    # Get the private key bytes
    private_key_bytes = private_key.private_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    )

    # Create connection
    conn = snowflake.connector.connect(
        account=SNOWFLAKE_CONFIG["account"],
        user=SNOWFLAKE_CONFIG["user"],
        private_key=private_key_bytes,
        warehouse=SNOWFLAKE_CONFIG["warehouse"],
        database=SNOWFLAKE_CONFIG["database"],
        schema=SNOWFLAKE_CONFIG["schema"],
        role=SNOWFLAKE_CONFIG["role"],
    )

    return conn


def run_query(sql: str) -> pd.DataFrame:
    """
    Execute a SQL query and return results as a DataFrame.
    Opens a new connection for each query and closes it after.
    """
    conn = get_snowflake_connection()
    try:
        df = pd.read_sql(sql, conn)
        # Lowercase column names for consistency
        df.columns = df.columns.str.lower()
        return df
    finally:
        conn.close()


# Test the connection
try:
    test_df = run_query(
        "SELECT CURRENT_TIMESTAMP() AS current_time, CURRENT_DATABASE() AS db, CURRENT_SCHEMA() AS schema"
    )
    print(f"‚úÖ Connected to Snowflake successfully!")
    print(f"   Database: {test_df['db'].iloc[0]}")
    print(f"   Schema: {test_df['schema'].iloc[0]}")
    print(f"   Time: {test_df['current_time'].iloc[0]}")
except Exception as e:
    print(f"‚ùå Connection failed: {e}")


  df = pd.read_sql(sql, conn)


‚úÖ Connected to Snowflake successfully!
   Database: AUTOSENSE_DEV
   Schema: SEMANTIC
   Time: 2026-01-26 08:47:26.213000+00:00


<a id='dashboard'></a>

## Dashboard Visualizations

<a id='kpis'></a>

### 1. KPI Cards (Month-over-Month)

Compare current month performance against previous month for key business metrics.


In [None]:
# Query KPI data using fact_daily_metrics with dynamic date configuration
kpi_query = f"""
SELECT
    CASE
        WHEN d.date_actual BETWEEN '{CURRENT_MONTH_START_STR}' AND '{CURRENT_MONTH_END_STR}' THEN 'Current Month'
        WHEN d.date_actual BETWEEN '{PREVIOUS_MONTH_START_STR}' AND '{PREVIOUS_MONTH_END_STR}' THEN 'Previous Month'
    END AS period,
    SUM(f.valid_revenue) AS total_revenue,
    SUM(f.valid_energy_kwh) AS total_energy_kwh,
    MAX(f.active_chargers) AS active_chargers,
    MAX(f.active_users) AS active_users,
    SUM(f.valid_transactions) AS total_transactions,
    CASE
        WHEN SUM(f.valid_transactions) > 0
        THEN SUM(f.valid_revenue) / SUM(f.valid_transactions)
        ELSE 0
    END AS avg_revenue_per_transaction
FROM fact_daily_metrics f
JOIN dim_dates d ON f.transaction_date_key = d.date_key
WHERE f.city_name = '_ALL_CITIES_'
  AND d.date_actual BETWEEN '{PREVIOUS_MONTH_START_STR}' AND '{CURRENT_MONTH_END_STR}'
GROUP BY
    CASE
        WHEN d.date_actual BETWEEN '{CURRENT_MONTH_START_STR}' AND '{CURRENT_MONTH_END_STR}' THEN 'Current Month'
        WHEN d.date_actual BETWEEN '{PREVIOUS_MONTH_START_STR}' AND '{PREVIOUS_MONTH_END_STR}' THEN 'Previous Month'
    END
ORDER BY period DESC
"""

df_kpis = run_query(kpi_query)
df_kpis


  df = pd.read_sql(sql, conn)


Unnamed: 0,period,total_revenue,total_energy_kwh,active_chargers,active_users,total_transactions,avg_revenue_per_transaction
0,Previous Month,169895.03,339786.23,672,633,18220,9.32
1,Current Month,173667.65,347330.52,655,616,18922,9.18


In [None]:
# Calculate KPI values and % change
def calculate_kpi_metrics(df, metric):
    """Calculate current value, previous value, and % change."""
    current_row = df[df["period"] == "Current Month"]
    previous_row = df[df["period"] == "Previous Month"]

    current_val = float(current_row[metric].values[0]) if len(current_row) > 0 else 0.0
    previous_val = (
        float(previous_row[metric].values[0]) if len(previous_row) > 0 else 0.0
    )

    if previous_val > 0:
        pct_change = ((current_val - previous_val) / previous_val) * 100
    else:
        pct_change = 0.0

    return current_val, previous_val, pct_change


# Calculate all KPIs with current, previous, and change
kpis = {
    "Total Revenue": calculate_kpi_metrics(df_kpis, "total_revenue"),
    "Energy Delivered (kWh)": calculate_kpi_metrics(df_kpis, "total_energy_kwh"),
    "Active Chargers": calculate_kpi_metrics(df_kpis, "active_chargers"),
    "Active Users": calculate_kpi_metrics(df_kpis, "active_users"),
    "Transactions": calculate_kpi_metrics(df_kpis, "total_transactions"),
    "Avg Revenue/Txn": calculate_kpi_metrics(df_kpis, "avg_revenue_per_transaction"),
}

# Debug: Print calculated values
print("KPI Calculations:")
for name, (current, previous, change) in kpis.items():
    print(
        f"  {name}: Current={current:,.2f}, Previous={previous:,.2f}, Change={change:+.1f}%"
    )

# Create KPI indicator cards using Plotly
fig = make_subplots(
    rows=2,
    cols=3,
    specs=[[{"type": "indicator"}] * 3] * 2,
    subplot_titles=list(kpis.keys()),
)

positions = [(1, 1), (1, 2), (1, 3), (2, 1), (2, 2), (2, 3)]

for idx, (name, (current_val, previous_val, pct_change)) in enumerate(kpis.items()):
    row, col = positions[idx]

    fig.add_trace(
        go.Indicator(
            mode="number+delta",
            value=current_val,
            number={
                "prefix": "CHF " if "Revenue" in name else "",
                "suffix": " kWh" if "kWh" in name else "",
                "valueformat": ",.0f" if current_val > 100 else ",.2f",
            },
            delta={
                "position": "bottom",
                "reference": previous_val,  # Use actual previous value
                "relative": True,  # Show as percentage
                "valueformat": "+.1%",  # Format as percentage with sign
            },
            domain={"row": row - 1, "column": col - 1},
        ),
        row=row,
        col=col,
    )

# Dynamic title using month names
current_month_name = CURRENT_MONTH_START.strftime("%B %Y")
previous_month_name = PREVIOUS_MONTH_START.strftime("%B %Y")

fig.update_layout(
    height=400,
    title_text=f"<b>Monthly KPIs: {current_month_name} vs {previous_month_name}</b>",
    title_x=0.5,
    grid={"rows": 2, "columns": 3, "pattern": "independent"},
    margin=dict(t=80, b=20, l=20, r=20),
)

fig.show()


KPI Calculations:
  Total Revenue: Current=173,667.65, Previous=169,895.03, Change=+2.2%
  Energy Delivered (kWh): Current=347,330.52, Previous=339,786.23, Change=+2.2%
  Active Chargers: Current=655.00, Previous=672.00, Change=-2.5%
  Active Users: Current=616.00, Previous=633.00, Change=-2.7%
  Transactions: Current=18,922.00, Previous=18,220.00, Change=+3.9%
  Avg Revenue/Txn: Current=9.18, Previous=9.32, Change=-1.6%


<a id='revenue-trend'></a>

### 2. Revenue Trend (Last 90 Days)

Daily revenue trend with city filter and average reference line.


In [None]:
# Query revenue trend for last 90 days using dynamic date configuration
revenue_trend_query = f"""
SELECT
    f.transaction_date,
    f.city_name,
    f.valid_revenue AS total_revenue,
    f.valid_transactions AS total_transactions,
    f.active_users,
    f.active_chargers
FROM fact_daily_metrics f
WHERE f.transaction_date BETWEEN '{TREND_START_DATE_STR}' AND '{DATA_END_DATE_STR}'
  AND f.city_name != '_ALL_CITIES_'
ORDER BY f.transaction_date, f.city_name
"""

df_revenue_trend = run_query(revenue_trend_query)

# Also get the ALL_CITIES aggregate for overall trend
revenue_all_query = f"""
SELECT
    f.transaction_date,
    '_ALL_CITIES_' AS city_name,
    f.valid_revenue AS total_revenue
FROM fact_daily_metrics f
WHERE f.transaction_date BETWEEN '{TREND_START_DATE_STR}' AND '{DATA_END_DATE_STR}'
  AND f.city_name = '_ALL_CITIES_'
ORDER BY f.transaction_date
"""

df_revenue_all = run_query(revenue_all_query)

print(
    f"Loaded {len(df_revenue_trend)} daily records across {df_revenue_trend['city_name'].nunique()} cities"
)
print(
    f"Date range: {df_revenue_trend['transaction_date'].min()} to {df_revenue_trend['transaction_date'].max()}"
)
df_revenue_trend.head()



pandas only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 objects are not tested. Please consider using SQLAlchemy.


pandas only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 objects are not tested. Please consider using SQLAlchemy.



Loaded 720 daily records across 8 cities
Date range: 2025-03-03 to 2025-05-31


Unnamed: 0,transaction_date,city_name,total_revenue,total_transactions,active_users,active_chargers
0,2025-03-03,Basel,557.58,63,66,62
1,2025-03-03,Bern,669.49,70,81,74
2,2025-03-03,Geneva,658.96,70,74,68
3,2025-03-03,Lausanne,652.76,70,78,66
4,2025-03-03,Lucerne,639.88,67,75,65


In [None]:
# Create revenue trend line chart with city dropdown filter
cities = ["All Cities"] + sorted(df_revenue_trend["city_name"].unique().tolist())

# Create figure with traces for each city
fig = go.Figure()

# Add trace for "All Cities" (network-wide)
fig.add_trace(
    go.Scatter(
        x=df_revenue_all["transaction_date"],
        y=df_revenue_all["total_revenue"],
        mode="lines+markers",
        name="All Cities",
        line=dict(color=COLOR_PRIMARY, width=2),
        marker=dict(size=4),
        visible=True,
    )
)

# Add traces for individual cities (initially hidden)
for city in cities[1:]:  # Skip 'All Cities'
    city_data = df_revenue_trend[df_revenue_trend["city_name"] == city]
    fig.add_trace(
        go.Scatter(
            x=city_data["transaction_date"],
            y=city_data["total_revenue"],
            mode="lines+markers",
            name=city,
            line=dict(width=2),
            marker=dict(size=4),
            visible=False,
        )
    )

# Calculate average revenue for reference line (using All Cities data)
avg_revenue = df_revenue_all["total_revenue"].mean()

# Add horizontal line for average
fig.add_hline(
    y=avg_revenue,
    line_dash="dash",
    line_color=COLOR_GRAY,
    annotation_text=f"Avg: CHF {avg_revenue:,.0f}",
    annotation_position="top right",
)

# Create dropdown buttons
buttons = []
for i, city in enumerate(cities):
    visibility = [False] * len(cities)
    visibility[i] = True
    buttons.append(dict(label=city, method="update", args=[{"visible": visibility}]))

# Dynamic title using configured dates
trend_title = f"<b>Daily Revenue Trend ({TREND_START_DATE.strftime('%b %d')} - {DATA_END_DATE.strftime('%b %d, %Y')})</b>"

# Update layout with dropdown
fig.update_layout(
    title=trend_title,
    title_x=0.5,
    xaxis_title="Date",
    yaxis_title="Revenue (CHF)",
    height=500,
    hovermode="x unified",
    updatemenus=[
        dict(
            active=0,
            buttons=buttons,
            direction="down",
            showactive=True,
            x=0.02,
            xanchor="left",
            y=1.15,
            yanchor="top",
            bgcolor="white",
            bordercolor=COLOR_GRAY,
        )
    ],
    annotations=[
        dict(
            text="<b>Filter by City:</b>",
            x=0.02,
            xref="paper",
            y=1.22,
            yref="paper",
            showarrow=False,
            font=dict(size=12),
        )
    ],
)

fig.show()


<a id='charger-matrix'></a>

### 3. Charger Performance Matrix

Scatter plot showing charger volume (transactions) vs efficiency (revenue per transaction) to identify outliers.


In [None]:
# Query charger performance data (fact_charger_performance already has denormalized city)
charger_perf_query = """
SELECT
    charger_key,
    charger_id,
    charger_city,
    total_transactions,
    valid_transactions,
    avg_valid_revenue_per_transaction AS avg_revenue_per_transaction,
    total_revenue,
    valid_revenue,
    unique_users,
    valid_transaction_pct,
    performance_classification,
    transaction_volume_quartile,
    revenue_per_txn_quartile
FROM fact_charger_performance
WHERE total_transactions > 0
ORDER BY total_transactions DESC
"""

df_charger_perf = run_query(charger_perf_query)
print(f"Loaded {len(df_charger_perf)} chargers with performance data")
print(
    f"Performance classifications: {df_charger_perf['performance_classification'].value_counts().to_dict()}"
)
df_charger_perf.head()



pandas only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 objects are not tested. Please consider using SQLAlchemy.



Loaded 3000 chargers with performance data
Performance classifications: {'Underperformer': 764, 'Low Volume / High Value': 736, 'High Volume / Low Value': 736, 'High Performer': 581, 'Star Performer': 183}


Unnamed: 0,charger_key,charger_id,charger_city,total_transactions,valid_transactions,avg_revenue_per_transaction,total_revenue,valid_revenue,unique_users,valid_transaction_pct,performance_classification,transaction_volume_quartile,revenue_per_txn_quartile
0,charger_66,charger_66,St. Gallen,57,55,9.79,538.72,538.72,55,96.49,Star Performer,4,4
1,charger_2742,charger_2742,St. Gallen,56,53,10.45,1321.96,554.03,56,94.64,Star Performer,4,4
2,charger_509,charger_509,Lausanne,54,51,10.15,517.42,517.42,54,94.44,Star Performer,4,4
3,charger_2774,charger_2774,Zurich,54,51,9.19,468.56,468.56,54,94.44,High Performer,4,3
4,charger_572,charger_572,St. Gallen,53,43,9.08,2760.28,390.36,53,81.13,Star Performer,4,4


In [None]:
# Create scatter plot for charger performance matrix
fig = px.scatter(
    df_charger_perf,
    x="total_transactions",
    y="avg_revenue_per_transaction",
    color="charger_city",
    hover_name="charger_id",
    hover_data={
        "performance_classification": True,
        "total_revenue": ":,.0f",
        "unique_users": True,
        "valid_transaction_pct": ":.1f",
        "charger_city": False,  # Already in legend
    },
    title="<b>Charger Performance Matrix</b><br><sup>Volume (Transactions) vs Efficiency (Revenue/Transaction)</sup>",
    labels={
        "total_transactions": "Total Transactions (Volume)",
        "avg_revenue_per_transaction": "Avg Revenue per Transaction (CHF)",
        "charger_city": "City",
        "performance_classification": "Classification",
        "total_revenue": "Total Revenue",
        "unique_users": "Unique Users",
        "valid_transaction_pct": "Valid Txn %",
    },
)

# Calculate medians for quadrant reference lines
median_transactions = df_charger_perf["total_transactions"].median()
median_revenue = df_charger_perf["avg_revenue_per_transaction"].median()

# Add quadrant lines
fig.add_hline(y=median_revenue, line_dash="dash", line_color=COLOR_GRAY, opacity=0.5)
fig.add_vline(
    x=median_transactions, line_dash="dash", line_color=COLOR_GRAY, opacity=0.5
)

# Add quadrant annotations
fig.add_annotation(
    x=0.95,
    xref="paper",
    y=0.95,
    yref="paper",
    text="‚≠ê High Volume, High Efficiency",
    showarrow=False,
    font=dict(size=10, color=COLOR_SUCCESS),
)
fig.add_annotation(
    x=0.05,
    xref="paper",
    y=0.95,
    yref="paper",
    text="üíé Low Volume, High Efficiency",
    showarrow=False,
    font=dict(size=10, color=COLOR_WARNING),
)
fig.add_annotation(
    x=0.95,
    xref="paper",
    y=0.05,
    yref="paper",
    text="‚ö†Ô∏è High Volume, Low Efficiency",
    showarrow=False,
    font=dict(size=10, color=COLOR_DANGER),
)
fig.add_annotation(
    x=0.05,
    xref="paper",
    y=0.05,
    yref="paper",
    text="üîª Underperformers",
    showarrow=False,
    font=dict(size=10, color=COLOR_GRAY),
)

fig.update_layout(
    height=600,
    title_x=0.5,
    legend_title_text="City",
    xaxis=dict(gridcolor="lightgray"),
    yaxis=dict(gridcolor="lightgray"),
)

fig.show()


<a id='geographic'></a>

### 4. Geographic Performance

Total revenue by city displayed on a map using coordinates from the semantic layer.


In [None]:
# Query geographic performance data
geo_query = """
SELECT
    c.city_name,
    c.city_center_latitude,
    c.city_center_longitude,
    c.total_chargers,
    c.market_size_tier,
    SUM(f.valid_revenue) AS total_revenue,
    SUM(f.valid_energy_kwh) AS total_energy_kwh,
    SUM(f.valid_transactions) AS total_transactions,
    AVG(f.valid_transaction_pct) AS avg_valid_transaction_pct
FROM fact_daily_metrics f
JOIN dim_cities c ON f.city_name = c.city_name
WHERE f.city_name != '_ALL_CITIES_'
  AND c.is_unmapped_city = FALSE
GROUP BY
    c.city_name,
    c.city_center_latitude,
    c.city_center_longitude,
    c.total_chargers,
    c.market_size_tier
ORDER BY total_revenue DESC
"""

df_geo = run_query(geo_query)
print(f"Loaded geographic data for {len(df_geo)} cities")
df_geo



pandas only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 objects are not tested. Please consider using SQLAlchemy.



Loaded geographic data for 8 cities


Unnamed: 0,city_name,city_center_latitude,city_center_longitude,total_chargers,market_size_tier,total_revenue,total_energy_kwh,total_transactions,avg_valid_transaction_pct
0,Bern,49.31,9.91,403,Large Market,114113.37,228224.92,12381,91.46
1,Zurich,47.16,8.12,377,Large Market,107594.1,215186.2,11610,91.39
2,Basel,50.07,10.2,380,Large Market,106810.86,213619.64,11471,91.93
3,Geneva,46.2,6.14,370,Large Market,105488.73,210973.56,11340,91.37
4,Lausanne,46.18,7.3,376,Large Market,105281.23,210559.15,11434,90.85
5,Lugano,48.62,11.88,381,Large Market,105166.92,210331.88,11290,91.16
6,St. Gallen,46.85,8.41,363,Large Market,101672.05,203342.99,11024,91.17
7,Lucerne,49.41,11.46,350,Large Market,98899.72,197796.36,10761,91.16


In [None]:
# Create geographic bubble map
fig = px.scatter_mapbox(
    df_geo,
    lat="city_center_latitude",
    lon="city_center_longitude",
    size="total_revenue",
    color="market_size_tier",
    hover_name="city_name",
    hover_data={
        "total_revenue": ":,.0f",
        "total_transactions": ":,.0f",
        "total_chargers": True,
        "avg_valid_transaction_pct": ":.1f",
        "city_center_latitude": False,
        "city_center_longitude": False,
    },
    color_discrete_map={
        "Large Market": COLOR_PRIMARY,
        "Medium Market": COLOR_WARNING,
        "Small Market": COLOR_SECONDARY,
        "Emerging Market": COLOR_GRAY,
    },
    title="<b>Revenue by City</b><br><sup>Bubble size represents total revenue</sup>",
    labels={
        "total_revenue": "Total Revenue (CHF)",
        "total_transactions": "Transactions",
        "total_chargers": "Chargers",
        "avg_valid_transaction_pct": "Valid Txn %",
        "market_size_tier": "Market Tier",
    },
    zoom=6,
    mapbox_style="carto-positron",
)

fig.update_layout(height=600, title_x=0.5, margin=dict(l=0, r=0, t=60, b=0))

fig.show()

# Also create a bar chart as alternative visualization
fig_bar = px.bar(
    df_geo.sort_values("total_revenue", ascending=True),
    x="total_revenue",
    y="city_name",
    color="market_size_tier",
    orientation="h",
    title="<b>Total Revenue by City</b>",
    labels={
        "total_revenue": "Total Revenue (CHF)",
        "city_name": "City",
        "market_size_tier": "Market Tier",
    },
    color_discrete_map={
        "Large Market": COLOR_PRIMARY,
        "Medium Market": COLOR_WARNING,
        "Small Market": COLOR_SECONDARY,
        "Emerging Market": COLOR_GRAY,
    },
)

fig_bar.update_layout(height=400, title_x=0.5, xaxis_tickformat=",", showlegend=True)

fig_bar.show()



*scatter_mapbox* is deprecated! Use *scatter_map* instead. Learn more at: https://plotly.com/python/mapbox-to-maplibre/



<a id='data-quality'></a>

### 5. Data Quality Monitor

Visualize the distribution of valid vs flagged transactions, emphasizing the **"Flag, don't filter"** philosophy.


In [None]:
# Query data quality metrics from fact_transactions
dq_query = """
SELECT
    COUNT(*) AS total_transactions,
    SUM(CASE WHEN is_valid_transaction THEN 1 ELSE 0 END) AS valid_transactions,
    SUM(CASE WHEN is_missing_payment THEN 1 ELSE 0 END) AS missing_payments,
    SUM(CASE WHEN is_missing_payment_amount THEN 1 ELSE 0 END) AS missing_payment_amount,
    SUM(CASE WHEN is_invalid_payment_amount THEN 1 ELSE 0 END) AS invalid_payment_amount,
    SUM(CASE WHEN is_invalid_time_range THEN 1 ELSE 0 END) AS invalid_time_range,
    SUM(CASE WHEN is_missing_kwh THEN 1 ELSE 0 END) AS missing_kwh,
    SUM(CASE WHEN is_negative_kwh THEN 1 ELSE 0 END) AS negative_kwh,
    SUM(CASE WHEN is_outlier_kwh THEN 1 ELSE 0 END) AS outlier_kwh,
    SUM(CASE WHEN is_outlier_duration THEN 1 ELSE 0 END) AS outlier_duration,
    SUM(CASE WHEN is_outlier_payment_amount THEN 1 ELSE 0 END) AS outlier_payment_amount,
    SUM(CASE WHEN has_any_quality_issue AND NOT is_valid_transaction THEN 1 ELSE 0 END) AS transactions_with_issues
FROM fact_transactions
"""

df_dq = run_query(dq_query)

# Calculate percentages
total = df_dq["total_transactions"].iloc[0]
valid = df_dq["valid_transactions"].iloc[0]
issues = df_dq["transactions_with_issues"].iloc[0]

print(f"Total Transactions: {total:,}")
print(f"Valid Transactions: {valid:,} ({valid/total*100:.1f}%)")
print(f"Transactions with Issues: {issues:,} ({issues/total*100:.1f}%)")
df_dq



pandas only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 objects are not tested. Please consider using SQLAlchemy.



Total Transactions: 100,000
Valid Transactions: 91,311 (91.3%)
Transactions with Issues: 8,689 (8.7%)


Unnamed: 0,total_transactions,valid_transactions,missing_payments,missing_payment_amount,invalid_payment_amount,invalid_time_range,missing_kwh,negative_kwh,outlier_kwh,outlier_duration,outlier_payment_amount,transactions_with_issues
0,100000,91311,0,0,262,137,0,214,265,143,303,8689


In [None]:
# Create Data Quality visualizations
from plotly.subplots import make_subplots

# Prepare data for pie chart
total = df_dq["total_transactions"].iloc[0]
valid = df_dq["valid_transactions"].iloc[0]
issues = total - valid

pie_data = pd.DataFrame(
    {
        "Category": ["‚úÖ Valid Transactions", "‚ö†Ô∏è Transactions with Issues"],
        "Count": [valid, issues],
        "Percentage": [valid / total * 100, issues / total * 100],
    }
)

# Create subplots: pie chart and breakdown bar chart
fig = make_subplots(
    rows=1,
    cols=2,
    specs=[[{"type": "pie"}, {"type": "bar"}]],
    subplot_titles=("Transaction Quality Overview", "Issue Type Breakdown"),
)

# Pie chart
fig.add_trace(
    go.Pie(
        labels=pie_data["Category"],
        values=pie_data["Count"],
        hole=0.4,
        marker_colors=[COLOR_SUCCESS, COLOR_DANGER],
        textinfo="percent+label",
        textposition="outside",
        hovertemplate="%{label}<br>Count: %{value:,.0f}<br>Percentage: %{percent}<extra></extra>",
    ),
    row=1,
    col=1,
)

# Prepare issue breakdown data
issue_types = {
    "Missing Amount": df_dq["missing_payment_amount"].iloc[0],
    "Invalid Amount": df_dq["invalid_payment_amount"].iloc[0],
    "Invalid Time": df_dq["invalid_time_range"].iloc[0],
    "Missing kWh": df_dq["missing_kwh"].iloc[0],
    "Negative kWh": df_dq["negative_kwh"].iloc[0],
    "Outlier kWh": df_dq["outlier_kwh"].iloc[0],
    "Outlier Duration": df_dq["outlier_duration"].iloc[0],
    "Outlier Amount": df_dq["outlier_payment_amount"].iloc[0],
}

# Sort by count descending and filter out zeros
issue_df = pd.DataFrame(
    {"Issue Type": list(issue_types.keys()), "Count": list(issue_types.values())}
)
issue_df = issue_df[issue_df["Count"] > 0].sort_values("Count", ascending=True)

# Bar chart for issue breakdown
fig.add_trace(
    go.Bar(
        x=issue_df["Count"],
        y=issue_df["Issue Type"],
        orientation="h",
        marker_color=COLOR_WARNING,
        text=issue_df["Count"].apply(lambda x: f"{x:,}"),
        textposition="outside",
        hovertemplate="%{y}<br>Count: %{x:,.0f}<extra></extra>",
    ),
    row=1,
    col=2,
)

fig.update_layout(
    height=500,
    title_text='<b>Data Quality Monitor</b><br><sup>Philosophy: "Flag, don\'t filter" - All data is preserved with quality flags</sup>',
    title_x=0.5,
    showlegend=False,
)

fig.update_xaxes(title_text="Count", row=1, col=2)

fig.show()

# Display summary message
display(
    HTML(
        f"""
<div style="background-color: #f0f8ff; padding: 15px; border-radius: 10px; border-left: 5px solid {COLOR_PRIMARY}; margin-top: 20px;">
    <h4 style="margin-top: 0;">üìä Data Quality Summary</h4>
    <p><strong>Total Transactions:</strong> {total:,}</p>
    <p><strong>Valid for Reporting:</strong> {valid:,} ({valid/total*100:.1f}%)</p>
    <p><strong>Flagged with Issues:</strong> {issues:,} ({issues/total*100:.1f}%)</p>
    <hr>
    <p style="font-style: italic; color: #666;">
        <strong>Note:</strong> All transactions are preserved in the data model with quality flags.
        Financial metrics (KPIs, Revenue Trend) use only <code>is_valid_transaction = TRUE</code> records.
        This "Flag, don't filter" approach ensures data transparency and auditability.
    </p>
</div>
"""
    )
)


<a id='data-quality-trend'></a>

#### 5.1 Data Quality Trend Over Time

Monitor if data quality is **degrading over time** - a critical requirement for alerting the engineering team of potential system problems.


In [None]:
# Query ALL daily data quality metrics (full date range available in the data)
dq_trend_query = """
SELECT
    d.date_actual AS transaction_date,
    d.week_start_date,
    COUNT(*) AS total_transactions,
    SUM(CASE WHEN f.is_valid_transaction THEN 1 ELSE 0 END) AS valid_transactions,
    SUM(CASE WHEN f.has_any_quality_issue THEN 1 ELSE 0 END) AS transactions_with_issues,
    -- Individual issue types for filtering
    SUM(CASE WHEN f.is_missing_payment_amount THEN 1 ELSE 0 END) AS missing_amount,
    SUM(CASE WHEN f.is_invalid_payment_amount THEN 1 ELSE 0 END) AS invalid_amount,
    SUM(CASE WHEN f.is_invalid_time_range THEN 1 ELSE 0 END) AS invalid_time,
    SUM(CASE WHEN f.is_missing_kwh THEN 1 ELSE 0 END) AS missing_kwh,
    SUM(CASE WHEN f.is_negative_kwh THEN 1 ELSE 0 END) AS negative_kwh,
    SUM(CASE WHEN f.is_outlier_kwh THEN 1 ELSE 0 END) AS outlier_kwh,
    SUM(CASE WHEN f.is_outlier_duration THEN 1 ELSE 0 END) AS outlier_duration,
    SUM(CASE WHEN f.is_outlier_payment_amount THEN 1 ELSE 0 END) AS outlier_amount
FROM fact_transactions f
JOIN dim_dates d ON f.transaction_date_key = d.date_key
GROUP BY d.date_actual, d.week_start_date
ORDER BY d.date_actual
"""

df_dq_trend_all = run_query(dq_trend_query)

# Calculate percentages for all metrics
for col in [
    "valid_transactions",
    "transactions_with_issues",
    "missing_amount",
    "invalid_amount",
    "invalid_time",
    "missing_kwh",
    "negative_kwh",
    "outlier_kwh",
    "outlier_duration",
    "outlier_amount",
]:
    df_dq_trend_all[f"{col}_pct"] = (
        df_dq_trend_all[col] / df_dq_trend_all["total_transactions"] * 100
    ).round(2)

# Also aggregate by week for weekly view
agg_cols = [
    "total_transactions",
    "valid_transactions",
    "transactions_with_issues",
    "missing_amount",
    "invalid_amount",
    "invalid_time",
    "missing_kwh",
    "negative_kwh",
    "outlier_kwh",
    "outlier_duration",
    "outlier_amount",
]
df_dq_weekly_all = (
    df_dq_trend_all.groupby("week_start_date")
    .agg({col: "sum" for col in agg_cols})
    .reset_index()
)

# Recalculate percentages for weekly aggregates
for col in agg_cols[1:]:  # Skip total_transactions
    df_dq_weekly_all[f"{col}_pct"] = (
        df_dq_weekly_all[col] / df_dq_weekly_all["total_transactions"] * 100
    ).round(2)

# Define issue types for dropdown
ISSUE_TYPES = {
    "All Issues": "transactions_with_issues_pct",
    "Missing Amount": "missing_amount_pct",
    "Invalid Amount": "invalid_amount_pct",
    "Invalid Time Range": "invalid_time_pct",
    "Missing kWh": "missing_kwh_pct",
    "Negative kWh": "negative_kwh_pct",
    "Outlier kWh": "outlier_kwh_pct",
    "Outlier Duration": "outlier_duration_pct",
    "Outlier Amount": "outlier_amount_pct",
}

# Store date range info
DATA_MIN_DATE = df_dq_trend_all["transaction_date"].min()
DATA_MAX_DATE = df_dq_trend_all["transaction_date"].max()

print(
    f"Loaded {len(df_dq_trend_all)} daily records and {len(df_dq_weekly_all)} weekly aggregates"
)
print(f"Full date range available: {DATA_MIN_DATE} to {DATA_MAX_DATE}")
print(f"Available issue filters: {list(ISSUE_TYPES.keys())}")
df_dq_trend_all.head()



pandas only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 objects are not tested. Please consider using SQLAlchemy.



Loaded 151 daily records and 22 weekly aggregates
Full date range available: 2025-01-01 to 2025-05-31
Available issue filters: ['All Issues', 'Missing Amount', 'Invalid Amount', 'Invalid Time Range', 'Missing kWh', 'Negative kWh', 'Outlier kWh', 'Outlier Duration', 'Outlier Amount']


Unnamed: 0,transaction_date,week_start_date,total_transactions,valid_transactions,transactions_with_issues,missing_amount,invalid_amount,invalid_time,missing_kwh,negative_kwh,outlier_kwh,outlier_duration,outlier_amount,valid_transactions_pct,transactions_with_issues_pct,missing_amount_pct,invalid_amount_pct,invalid_time_pct,missing_kwh_pct,negative_kwh_pct,outlier_kwh_pct,outlier_duration_pct,outlier_amount_pct
0,2025-01-01,2024-12-30,678,627,51,0,1,1,0,1,3,3,3,92.48,7.52,0.0,0.15,0.15,0.0,0.15,0.44,0.44,0.44
1,2025-01-02,2024-12-30,640,588,52,0,2,3,0,2,6,3,7,91.88,8.12,0.0,0.31,0.47,0.0,0.31,0.94,0.47,1.09
2,2025-01-03,2024-12-30,673,613,60,0,2,3,0,2,2,1,2,91.08,8.92,0.0,0.3,0.45,0.0,0.3,0.3,0.15,0.3
3,2025-01-04,2024-12-30,670,614,56,0,1,0,0,0,3,1,3,91.64,8.36,0.0,0.15,0.0,0.0,0.0,0.45,0.15,0.45
4,2025-01-05,2024-12-30,677,609,68,0,2,1,0,2,4,2,4,89.96,10.04,0.0,0.3,0.15,0.0,0.3,0.59,0.3,0.59


In [None]:
# Create interactive Data Quality Trend visualization with Issue Type dropdown, Date Range Selector
fig_dq_trend = go.Figure()

# Calculate dynamic thresholds and y-axis ranges for each issue type
# Threshold = mean + 1.5 * std (statistical alert level)
issue_stats = {}
for issue_name, issue_col in ISSUE_TYPES.items():
    mean_val = df_dq_trend_all[issue_col].mean()
    std_val = df_dq_trend_all[issue_col].std()
    max_val = df_dq_trend_all[issue_col].max()
    # Threshold: mean + 1.5*std, but at least 2% and capped reasonably
    threshold = max(mean_val + 1.5 * std_val, 2.0)
    threshold = round(threshold, 1)
    # Y-axis max: 20% above max value, minimum of 5%
    y_max = max(max_val * 1.2, threshold * 1.3, 5.0)
    issue_stats[issue_name] = {
        "threshold": threshold,
        "y_max": y_max,
        "mean": mean_val,
        "col": issue_col,
    }

# Build traces for each issue type (Daily + Weekly + Threshold_Daily + Threshold_Weekly = 4 traces per issue type)
trace_index = 0
visibility_map = (
    {}
)  # Maps issue_type -> [daily, weekly, threshold_daily, threshold_weekly]

for issue_name, issue_col in ISSUE_TYPES.items():
    stats = issue_stats[issue_name]

    # Daily - Issue Type %
    fig_dq_trend.add_trace(
        go.Scatter(
            x=df_dq_trend_all["transaction_date"],
            y=df_dq_trend_all[issue_col],
            mode="lines+markers",
            name=f"{issue_name} % (Daily)",
            line=dict(color=COLOR_DANGER, width=2),
            marker=dict(size=4),
            visible=(issue_name == "All Issues"),
            hovertemplate=f"Date: %{{x}}<br>{issue_name}: %{{y:.1f}}%<extra></extra>",
        )
    )

    # Weekly - Issue Type %
    fig_dq_trend.add_trace(
        go.Scatter(
            x=df_dq_weekly_all["week_start_date"],
            y=df_dq_weekly_all[issue_col],
            mode="lines+markers",
            name=f"{issue_name} % (Weekly)",
            line=dict(color=COLOR_DANGER, width=3),
            marker=dict(size=8),
            visible=False,
            hovertemplate=f"Week of: %{{x}}<br>{issue_name}: %{{y:.1f}}%<extra></extra>",
        )
    )

    # Threshold line for Daily view
    fig_dq_trend.add_trace(
        go.Scatter(
            x=[
                df_dq_trend_all["transaction_date"].min(),
                df_dq_trend_all["transaction_date"].max(),
            ],
            y=[stats["threshold"], stats["threshold"]],
            mode="lines",
            name=f"Alert Threshold: {stats['threshold']:.1f}%",
            line=dict(color=COLOR_WARNING, width=2, dash="dash"),
            visible=(issue_name == "All Issues"),
            hovertemplate=f"Alert Threshold: {stats['threshold']:.1f}%<extra></extra>",
            showlegend=True,
        )
    )

    # Threshold line for Weekly view (same threshold, but different x range)
    fig_dq_trend.add_trace(
        go.Scatter(
            x=[
                df_dq_weekly_all["week_start_date"].min(),
                df_dq_weekly_all["week_start_date"].max(),
            ],
            y=[stats["threshold"], stats["threshold"]],
            mode="lines",
            name=f"Alert Threshold: {stats['threshold']:.1f}%",
            line=dict(color=COLOR_WARNING, width=2, dash="dash"),
            visible=False,
            hovertemplate=f"Alert Threshold: {stats['threshold']:.1f}%<extra></extra>",
            showlegend=True,
        )
    )

    # Store indices: [daily, weekly, threshold_daily, threshold_weekly]
    visibility_map[issue_name] = [
        trace_index,
        trace_index + 1,
        trace_index + 2,
        trace_index + 3,
    ]
    trace_index += 4

# Total number of traces
total_traces = trace_index

# Track current view state for toggle (will be managed by custom buttons)
# We need separate buttons for issue type AND for daily/weekly toggle

# Create dropdown buttons for issue type selection (Daily view by default)
issue_buttons_daily = []
issue_buttons_weekly = []

for issue_name in ISSUE_TYPES.keys():
    indices = visibility_map[issue_name]
    stats = issue_stats[issue_name]

    # Daily view: show daily trace + daily threshold
    vis_daily = [False] * total_traces
    vis_daily[indices[0]] = True  # Daily trace
    vis_daily[indices[2]] = True  # Daily threshold

    issue_buttons_daily.append(
        dict(
            label=issue_name,
            method="update",
            args=[
                {"visible": vis_daily},
                {
                    "yaxis.range": [0, stats["y_max"]],
                    "title.text": f"<b>Data Quality Trend - {issue_name}</b><br><sup>Use slider below to select date range</sup>",
                },
            ],
        )
    )

    # Weekly view: show weekly trace + weekly threshold
    vis_weekly = [False] * total_traces
    vis_weekly[indices[1]] = True  # Weekly trace
    vis_weekly[indices[3]] = True  # Weekly threshold

    issue_buttons_weekly.append(
        dict(
            label=issue_name,
            method="update",
            args=[
                {"visible": vis_weekly},
                {
                    "yaxis.range": [0, stats["y_max"]],
                    "title.text": f"<b>Data Quality Trend - {issue_name}</b><br><sup>Use slider below to select date range</sup>",
                },
            ],
        )
    )

# Get initial stats for "All Issues"
initial_stats = issue_stats["All Issues"]

# Create layout with dropdown, Daily/Weekly toggle, and date range slider
fig_dq_trend.update_layout(
    updatemenus=[
        # Issue Type Dropdown (Daily view)
        dict(
            type="dropdown",
            direction="down",
            active=0,
            x=0.12,
            xanchor="left",
            y=1.12,
            yanchor="top",
            buttons=issue_buttons_daily,
            bgcolor="white",
            bordercolor=COLOR_GRAY,
            name="issue_dropdown",
        ),
        # Daily/Weekly Toggle Buttons - actually switches trace visibility
        dict(
            type="buttons",
            direction="right",
            active=0,
            x=0.52,
            xanchor="left",
            y=1.12,
            yanchor="top",
            buttons=[
                dict(
                    label="üìÖ Daily",
                    method="update",
                    args=[
                        # Show All Issues daily + threshold (default)
                        {
                            "visible": [
                                i
                                in [
                                    visibility_map["All Issues"][0],
                                    visibility_map["All Issues"][2],
                                ]
                                for i in range(total_traces)
                            ]
                        },
                        {},
                    ],
                ),
                dict(
                    label="üìä Weekly",
                    method="update",
                    args=[
                        # Show All Issues weekly + threshold
                        {
                            "visible": [
                                i
                                in [
                                    visibility_map["All Issues"][1],
                                    visibility_map["All Issues"][3],
                                ]
                                for i in range(total_traces)
                            ]
                        },
                        {},
                    ],
                ),
            ],
            bgcolor="white",
            bordercolor=COLOR_GRAY,
        ),
    ],
    title=dict(
        text=f"<b>Data Quality Trend - All Issues</b><br><sup>Use slider below to select date range</sup>",
        x=0.5,
        y=0.97,
        xanchor="center",
        yanchor="bottom",
    ),
    xaxis=dict(
        title="Date",
        rangeslider=dict(
            visible=True,
            thickness=0.08,
            bgcolor="#f0f0f0",
        ),
        rangeselector=dict(
            buttons=[
                dict(count=7, label="1W", step="day", stepmode="backward"),
                dict(count=1, label="1M", step="month", stepmode="backward"),
                dict(count=3, label="3M", step="month", stepmode="backward"),
                dict(count=6, label="6M", step="month", stepmode="backward"),
                dict(step="all", label="All"),
            ],
            bgcolor="white",
            activecolor=COLOR_PRIMARY,
            x=0.0,
            y=1.0,
        ),
        type="date",
    ),
    yaxis=dict(
        title="Issue Percentage (%)",
        range=[0, initial_stats["y_max"]],
    ),
    height=650,
    margin=dict(t=100, b=100, l=60, r=40),
    hovermode="x unified",
    legend=dict(
        orientation="h",
        yanchor="bottom",
        y=-0.25,
        xanchor="center",
        x=0.5,
    ),
    annotations=[
        dict(
            text="<b>Issue Filter:</b>",
            x=0.02,
            xref="paper",
            y=1.12,
            yref="paper",
            showarrow=False,
            font=dict(size=11),
        ),
        dict(
            text="<b>View:</b>",
            x=0.45,
            xref="paper",
            y=1.12,
            yref="paper",
            showarrow=False,
            font=dict(size=11),
        ),
    ],
)

fig_dq_trend.show()

# Calculate trend direction using last 90 days of data
df_recent = df_dq_trend_all.tail(90) if len(df_dq_trend_all) >= 90 else df_dq_trend_all
recent_issues_pct = df_recent.tail(7)["transactions_with_issues_pct"].mean()
earlier_issues_pct = df_recent.head(7)["transactions_with_issues_pct"].mean()
trend_direction = (
    "üìâ Improving"
    if recent_issues_pct < earlier_issues_pct
    else "üìà Degrading" if recent_issues_pct > earlier_issues_pct else "‚û°Ô∏è Stable"
)
trend_color = (
    COLOR_SUCCESS
    if recent_issues_pct < earlier_issues_pct
    else COLOR_DANGER if recent_issues_pct > earlier_issues_pct else COLOR_GRAY
)

# Summary stats for all issue types with dynamic thresholds
issue_summary = []
for issue_name, stats in issue_stats.items():
    if issue_name != "All Issues":
        avg_pct = stats["mean"]
        threshold = stats["threshold"]
        status = "‚ö†Ô∏è" if avg_pct > threshold * 0.8 else "‚úÖ"
        if avg_pct > 0.5:  # Only show issues with > 0.5% occurrence
            issue_summary.append(
                f"<li>{status} <strong>{issue_name}:</strong> {avg_pct:.1f}% avg (threshold: {threshold:.1f}%)</li>"
            )

# Display trend analysis summary
display(
    HTML(
        f"""
<div style="background-color: #fff3cd; padding: 15px; border-radius: 10px; border-left: 5px solid {trend_color}; margin-top: 20px;">
    <h4 style="margin-top: 0;">üìà Data Quality Trend Analysis</h4>
    <p><strong>Full Data Range:</strong> {DATA_MIN_DATE.strftime('%b %d, %Y') if hasattr(DATA_MIN_DATE, 'strftime') else DATA_MIN_DATE} to {DATA_MAX_DATE.strftime('%b %d, %Y') if hasattr(DATA_MAX_DATE, 'strftime') else DATA_MAX_DATE}</p>
    <p><strong>Trend Direction (Recent 90 days):</strong> <span style="color: {trend_color}; font-weight: bold;">{trend_direction}</span></p>
    <p><strong>Recent 7-Day Avg Issues %:</strong> {recent_issues_pct:.1f}%</p>
    <p><strong>Earlier 7-Day Avg Issues %:</strong> {earlier_issues_pct:.1f}%</p>
    <p><strong>All Issues Alert Threshold:</strong> {issue_stats['All Issues']['threshold']:.1f}% (mean + 1.5œÉ)</p>
    <hr>
    <p><strong>üìä Issue Breakdown (full period avg vs dynamic threshold):</strong></p>
    <ul style="margin: 5px 0;">
        {''.join(issue_summary) if issue_summary else '<li>No significant individual issues detected</li>'}
    </ul>
    <p style="font-style: italic; color: #666; margin-top: 10px;">
        <strong>Tip:</strong> Use the date range selector buttons (1W, 1M, 3M, 6M, All) or drag the slider below the chart to zoom into specific periods.
    </p>
</div>
"""
    )
)
