# PagerDuty Usage Cost Report

This notebook analyzes PagerDuty alert usage and attributes product costs to services, teams, and escalation policies.


## CONFIG


In [None]:
import json
import os
import time
from datetime import datetime, date, timedelta
from pathlib import Path
from typing import Dict, List, Tuple

import requests
import pandas as pd
import matplotlib.pyplot as plt
from dateutil.relativedelta import relativedelta
from IPython.display import display, Markdown, clear_output
import ipywidgets as widgets

# API token is read from the environment rather than hard-coded in the notebook.
PAGERDUTY_API_TOKEN = os.getenv("PAGERDUTY_API_TOKEN")
PAGERDUTY_API_BASE_URL = "https://api.pagerduty.com"
# Flat monthly amount (USD) that the report divides evenly across the alerts it counts.
MONTHLY_PAGERDUTY_COST_USD = 4200
# API responses are stored as JSON files in CACHE_DIR and reused while younger than CACHE_TTL_HOURS.
CACHE_DIR = Path(".cache")
CACHE_TTL_HOURS = 24

# Fail fast: the request headers built later in the notebook embed this token.
if not PAGERDUTY_API_TOKEN:
    raise ValueError("Missing PAGERDUTY_API_TOKEN environment variable.")

# Create the cache directory up front so cache writes never hit a missing folder.
CACHE_DIR.mkdir(parents=True, exist_ok=True)


def get_current_month_range() -> Tuple[date, date]:
    """Return the first and last calendar day of the month containing today.

    Returns:
        A ``(first_day, last_day)`` tuple of ``date`` objects; both ends are
        inclusive.
    """
    first_day = date.today().replace(day=1)
    # Jump to the first day of the following month, then step back one day.
    first_of_next_month = first_day + relativedelta(months=1)
    last_day = first_of_next_month - timedelta(days=1)
    return first_day, last_day


## UI


In [None]:
# The date pickers default to the current calendar month.
start_default, end_default = get_current_month_range()

start_picker = widgets.DatePicker(
    description="Start",
    value=start_default,
    disabled=False,
)
end_picker = widgets.DatePicker(
    description="End",
    value=end_default,
    disabled=False,
)

# The three filter lists start with only the "All" entry; the remaining options
# are filled in from the loaded incident data once a report has been run.
teams_select = widgets.SelectMultiple(
    options=["All"],
    value=("All",),
    description="Teams",
)
services_select = widgets.SelectMultiple(
    options=["All"],
    value=("All",),
    description="Services",
)
escalation_select = widgets.SelectMultiple(
    options=["All"],
    value=("All",),
    description="Escalation Policies",
)


# Limits how many services appear in the "Top Services" chart; "All" disables the limit.
top_n_dropdown = widgets.Dropdown(
    options=[5, 10, 20, "All"],
    value=10,
    description="Top-N",
)

run_button = widgets.Button(description="Run report", button_style="primary")

# Layout: dates and Top-N on one row, then the filters, then the run button.
ui_box = widgets.VBox([
    widgets.HBox([start_picker, end_picker, top_n_dropdown]),
    teams_select,
    services_select,
    escalation_select,
    run_button,
])

# Separate output areas so progress logs, the summary, and the charts can be
# cleared and redrawn independently of each other.
log_output = widgets.Output()
report_output = widgets.Output()
executive_output = widgets.Output()

def _display_ui():
    """Show the controls, then the log, executive summary, and report areas."""
    # Order matters: it is the top-to-bottom order in which the areas appear.
    for element in (ui_box, log_output, executive_output, report_output):
        display(element)

# Render the controls and output areas as soon as this cell is executed.
_display_ui()


## DATA


In [None]:
# Request headers used by pd_get: token authentication plus the version-2
# PagerDuty JSON media type.
HEADERS = {
    "Authorization": f"Token token={PAGERDUTY_API_TOKEN}",
    "Accept": "application/vnd.pagerduty+json;version=2",
}


def pd_get(endpoint: str, params: Dict) -> Dict:
    """Issue a GET request against the PagerDuty API and return the decoded JSON.

    Args:
        endpoint: Path appended to ``PAGERDUTY_API_BASE_URL`` (e.g. ``"/incidents"``).
        params: Query-string parameters for the request.

    Returns:
        The parsed JSON body of the response.

    Raises:
        RuntimeError: If the response status is not successful. The message
            carries the status code and the first 300 characters of the body.
    """
    response = requests.get(
        f"{PAGERDUTY_API_BASE_URL}{endpoint}",
        headers=HEADERS,
        params=params,
        timeout=60,
    )
    if response.ok:
        return response.json()
    # Truncate the body so a large error page does not flood the log.
    raise RuntimeError(f"PagerDuty API error {response.status_code}: {response.text[:300]}")


def fetch_paginated(endpoint: str, params: Dict, root_key: str) -> List[Dict]:
    """Collect every item of an offset-paginated PagerDuty listing.

    Pages of 100 items are requested until a response no longer reports a
    truthy ``more`` field.

    Args:
        endpoint: API path passed through to ``pd_get``.
        params: Base query parameters; ``limit`` and ``offset`` are added per page.
        root_key: Key in each response under which the page's items are listed.

    Returns:
        All items from all pages, in the order they were returned.
    """
    page_size = 100
    collected = []
    position = 0
    has_more = True
    while has_more:
        page = pd_get(endpoint, {**params, "limit": page_size, "offset": position})
        collected.extend(page.get(root_key, []))
        has_more = bool(page.get("more"))
        position += page_size
    return collected


def fetch_incidents(since: date, until: date) -> List[Dict]:
    """Fetch all incidents for the date range ``since`` through ``until`` inclusive.

    Args:
        since: First day of the reporting window.
        until: Last day of the reporting window (inclusive).

    Returns:
        The incident records returned by the ``/incidents`` listing, across
        all pages, for the triggered, acknowledged, and resolved statuses.
    """
    # A date-only `until` is read by the API as the very start of that day, so
    # passing the end date unchanged would silently drop every incident created
    # on the final selected day. Send the following day as the upper bound to
    # make the end date inclusive, which is what the "End" picker implies.
    upper_bound = until + timedelta(days=1)
    params = {
        "since": since.isoformat(),
        "until": upper_bound.isoformat(),
        "statuses[]": ["triggered", "acknowledged", "resolved"],
    }
    return fetch_paginated("/incidents", params, "incidents")


def fetch_metadata() -> Dict[str, List[Dict]]:
    """Fetch the full service, team, and escalation-policy listings.

    Returns:
        A dict with the keys ``"services"``, ``"teams"`` and
        ``"escalation_policies"``, each mapping to the list of records
        returned by the listing endpoint of the same name.
    """
    # Each resource's endpoint path and response root key share its name.
    resources = ("services", "teams", "escalation_policies")
    return {
        resource: fetch_paginated(f"/{resource}", {}, resource)
        for resource in resources
    }


def normalize_incidents_to_df(incidents: List[Dict]) -> pd.DataFrame:
    """Flatten raw incident records into one DataFrame row per incident.

    Args:
        incidents: Incident dicts as returned by the incidents listing. Missing
            or null ``service``, ``escalation_policy`` and ``teams`` entries
            are tolerated and reported as ``"Unknown"``.

    Returns:
        A DataFrame with the columns ``alert_id``, ``incident_id``,
        ``created_at``, ``day``, ``service_name``, ``service_id``,
        ``team_name``, ``team_id``, ``escalation_policy_name`` and
        ``escalation_policy_id``. The columns are present even when
        ``incidents`` is empty, so downstream column lookups never fail.
        Incidents owned by several teams get the team names/ids joined with
        ``", "`` into a single value.
    """
    columns = [
        "alert_id",
        "incident_id",
        "created_at",
        "day",
        "service_name",
        "service_id",
        "team_name",
        "team_id",
        "escalation_policy_name",
        "escalation_policy_id",
    ]

    rows = []
    for incident in incidents:
        incident_id = incident.get("id")
        created_at = pd.to_datetime(incident.get("created_at"))
        service = incident.get("service") or {}
        escalation_policy = incident.get("escalation_policy") or {}
        teams = incident.get("teams") or []
        # `or` (rather than a .get default) also covers keys that are present
        # but null, which would otherwise make str.join raise TypeError.
        team_names = ", ".join(t.get("summary") or "Unknown" for t in teams) or "Unknown"
        team_ids = ", ".join(t.get("id") or "" for t in teams) or "Unknown"

        rows.append({
            # Alerts are not fetched separately; each incident is counted as
            # one alert, so the incident id doubles as the alert id.
            "alert_id": incident_id,
            "incident_id": incident_id,
            "created_at": created_at,
            "day": created_at.date() if pd.notnull(created_at) else None,
            "service_name": service.get("summary") or "Unknown",
            "service_id": service.get("id") or "Unknown",
            "team_name": team_names,
            "team_id": team_ids,
            "escalation_policy_name": escalation_policy.get("summary") or "Unknown",
            "escalation_policy_id": escalation_policy.get("id") or "Unknown",
        })

    # Passing the column list keeps the schema stable for an empty result.
    return pd.DataFrame(rows, columns=columns)


## CACHE


In [None]:
def _cache_path(cache_key: str) -> Path:
    """Return the JSON file inside ``CACHE_DIR`` that stores ``cache_key``.

    Forward slashes in the key are replaced with underscores so the key maps
    to a single file name rather than a nested path.
    """
    file_name = "_".join(cache_key.split("/")) + ".json"
    return CACHE_DIR.joinpath(file_name)


def _is_cache_fresh(path: Path, ttl_hours: int) -> bool:
    """Tell whether ``path`` exists and was modified less than ``ttl_hours`` ago.

    Args:
        path: Cache file to inspect.
        ttl_hours: Maximum accepted age in hours.

    Returns:
        ``True`` if the file exists and its modification time is strictly
        within the TTL, ``False`` otherwise (including when it is missing).
    """
    if path.exists():
        max_age_seconds = ttl_hours * 3600
        return time.time() - path.stat().st_mtime < max_age_seconds
    return False


def load_or_fetch_cache(cache_key: str, fetch_fn):
    """Return the cached value for ``cache_key``, fetching and caching it if needed.

    Args:
        cache_key: Identifier of the cache entry; mapped to a file by ``_cache_path``.
        fetch_fn: Zero-argument callable producing JSON-serializable data. It
            is called when no fresh, readable cache file exists.

    Returns:
        The cached or freshly fetched data.
    """
    path = _cache_path(cache_key)
    if _is_cache_fresh(path, CACHE_TTL_HOURS):
        try:
            cached = json.loads(path.read_text())
        except (OSError, ValueError):
            # A truncated or corrupt file would otherwise keep failing until
            # the TTL expires; treat it like a miss and rebuild it instead.
            # (json.JSONDecodeError and UnicodeDecodeError are ValueErrors.)
            print(f"Cache for {cache_key} is unreadable. Fetching from API.")
        else:
            print(f"Cache hit for {cache_key}. Using cached PagerDuty data.")
            return cached
    else:
        print(f"Cache miss for {cache_key}. Fetching from API.")

    data = fetch_fn()
    # Write to a sibling temp file and rename it into place so an interrupted
    # write can never leave a half-written file that looks fresh.
    temp_path = path.with_name(path.name + ".tmp")
    temp_path.write_text(json.dumps(data))
    temp_path.replace(path)
    return data


def load_data(since: date, until: date) -> Tuple[pd.DataFrame, Dict[str, List[Dict]]]:
    """Load incidents and metadata for a date range, going through the file cache.

    Args:
        since: Start date of the range.
        until: End date of the range.

    Returns:
        A ``(incidents_df, metadata)`` tuple: the normalized incident
        DataFrame and the dict produced by ``fetch_metadata``.
    """
    # Both cache entries are keyed by the same "<since>_<until>" suffix.
    range_suffix = f"{since.isoformat()}_{until.isoformat()}"

    incidents = load_or_fetch_cache(
        f"incidents_{range_suffix}",
        lambda: fetch_incidents(since, until),
    )
    metadata = load_or_fetch_cache(
        f"metadata_services_teams_policies_{range_suffix}",
        fetch_metadata,
    )

    return normalize_incidents_to_df(incidents), metadata


## CALC


In [None]:
def apply_filters(df: pd.DataFrame, teams, services, escalation_policies) -> pd.DataFrame:
    """Return the rows of ``df`` matching the selected teams, services, and policies.

    A selection that is empty/None or that contains ``"All"`` leaves its
    dimension unfiltered. The input frame is never modified.

    Args:
        df: Incident frame with ``team_name``, ``service_name`` and
            ``escalation_policy_name`` columns.
        teams: Selected team names.
        services: Selected service names.
        escalation_policies: Selected escalation policy names.

    Returns:
        A new DataFrame containing only the rows that pass every active filter.
    """
    selections = (
        ("team_name", teams),
        ("service_name", services),
        ("escalation_policy_name", escalation_policies),
    )

    result = df.copy()
    for column, chosen in selections:
        if not chosen or "All" in chosen:
            continue
        result = result[result[column].isin(chosen)]
    return result


def compute_costs_and_aggregations(df: pd.DataFrame, monthly_cost_usd=None):
    """Attribute the monthly PagerDuty cost to services, teams, and policies.

    The cost is split evenly across the distinct alerts in ``df``; each group
    is then charged ``alert_count * cost_per_alert``.

    Args:
        df: Incident frame with ``alert_id``, ``day`` and the name/id columns
            for service, team, and escalation policy.
        monthly_cost_usd: Total cost to distribute. Defaults to
            ``MONTHLY_PAGERDUTY_COST_USD`` when omitted or ``None``.

    Returns:
        A tuple ``(metrics, by_service, by_team, by_escalation, alerts_over_time)``:
        ``metrics`` is a dict with ``total_alerts`` and ``cost_per_alert``; the
        three ``by_*`` frames hold ``alert_count`` and ``cost_usd`` per group;
        ``alerts_over_time`` holds the distinct alert count per day, sorted by day.
    """
    if monthly_cost_usd is None:
        monthly_cost_usd = MONTHLY_PAGERDUTY_COST_USD

    total_alerts = df["alert_id"].nunique()
    # Guard against division by zero when no alerts are in scope.
    cost_per_alert = monthly_cost_usd / total_alerts if total_alerts else 0

    def alerts_and_cost_by(group_columns):
        # dropna=False keeps rows whose group key is missing in their own bucket.
        grouped = (
            df.groupby(group_columns, dropna=False)["alert_id"]
            .nunique()
            .reset_index(name="alert_count")
        )
        grouped["cost_usd"] = grouped["alert_count"] * cost_per_alert
        return grouped

    by_service = alerts_and_cost_by(["service_name", "service_id"])
    by_team = alerts_and_cost_by(["team_name", "team_id"])
    by_escalation = alerts_and_cost_by(["escalation_policy_name", "escalation_policy_id"])

    alerts_over_time = (
        df.groupby("day")["alert_id"]
        .nunique()
        .reset_index(name="alert_count")
        .sort_values("day")
    )

    metrics = {
        "total_alerts": total_alerts,
        "cost_per_alert": cost_per_alert,
    }

    return metrics, by_service, by_team, by_escalation, alerts_over_time


## VISUAL


In [None]:
def _rotate_x_tick_labels(ax):
    """Tilt the x tick labels 45 degrees and right-align them under their ticks."""
    # Axes.tick_params accepts `labelrotation` but has no alignment keyword
    # (passing `ha` raises ValueError), so the alignment is set on the label
    # Text objects themselves.
    ax.tick_params(axis="x", labelrotation=45)
    plt.setp(ax.get_xticklabels(), ha="right")


def _show_cost_bar_chart(frame, label_column, title, xlabel, color):
    """Display one bar chart of ``cost_usd`` per value of ``label_column``."""
    fig, ax = plt.subplots(figsize=(10, 5))
    ax.bar(frame[label_column], frame["cost_usd"], color=color)
    ax.set_title(title)
    ax.set_xlabel(xlabel)
    ax.set_ylabel("Cost (USD)")
    _rotate_x_tick_labels(ax)
    plt.tight_layout()
    display(fig)
    # Close the figure so it is not rendered a second time by the notebook
    # and does not accumulate in memory across runs.
    plt.close(fig)


def render_charts_and_tables(by_service, by_team, by_escalation, alerts_over_time, top_n):
    """Display the cost charts, the detail tables, and the daily alert trend.

    Output order: services chart, teams chart, escalation-policy chart,
    services table, teams table, alerts-over-time chart.

    Args:
        by_service: Per-service frame with ``service_name`` and ``cost_usd``.
        by_team: Per-team frame with ``team_name`` and ``cost_usd``.
        by_escalation: Per-policy frame with ``escalation_policy_name`` and ``cost_usd``.
        alerts_over_time: Frame with ``day`` and ``alert_count``.
        top_n: Number of services to show in the services chart, or ``"All"``
            for no limit. Only the services chart is limited.
    """
    plt.style.use("seaborn-v0_8-whitegrid")

    services_by_cost = by_service.sort_values("cost_usd", ascending=False)
    teams_by_cost = by_team.sort_values("cost_usd", ascending=False)
    policies_by_cost = by_escalation.sort_values("cost_usd", ascending=False)

    top_services = services_by_cost if top_n == "All" else services_by_cost.head(int(top_n))

    _show_cost_bar_chart(
        top_services, "service_name", "Top Services by Attributed Cost", "Service", "#4C72B0"
    )
    _show_cost_bar_chart(teams_by_cost, "team_name", "Cost by Team", "Team", "#55A868")
    _show_cost_bar_chart(
        policies_by_cost,
        "escalation_policy_name",
        "Cost by Escalation Policy",
        "Escalation Policy",
        "#C44E52",
    )

    display(Markdown("### Alerts and Cost by Service"))
    display(services_by_cost)

    display(Markdown("### Alerts and Cost by Team"))
    display(teams_by_cost)

    fig, ax = plt.subplots(figsize=(10, 4))
    ax.plot(alerts_over_time["day"], alerts_over_time["alert_count"], marker="o")
    ax.set_title("Alerts Over Time (Daily)")
    ax.set_xlabel("Day")
    ax.set_ylabel("Alert Count")
    _rotate_x_tick_labels(ax)
    plt.tight_layout()
    display(fig)
    plt.close(fig)


## EXECUTIVE


In [None]:
def render_executive_summary(metrics, by_service, by_team):
    """Build the executive summary as a Markdown bullet list.

    Args:
        metrics: Dict with ``total_alerts`` and ``cost_per_alert``.
        by_service: Per-service frame with a ``cost_usd`` column.
        by_team: Per-team frame with ``team_name`` and ``cost_usd`` columns.

    Returns:
        Four ``"- "``-prefixed lines joined by newlines: the cost share of the
        top three services, the total attributed cost, the alert count with
        cost per alert, and the top three teams by cost.
    """
    def three_most_expensive(frame):
        return frame.sort_values("cost_usd", ascending=False).head(3)

    total_cost = by_service["cost_usd"].sum()
    leading_cost = three_most_expensive(by_service)["cost_usd"].sum()
    # Avoid dividing by zero when nothing was attributed.
    if total_cost:
        cost_share = leading_cost / total_cost * 100
    else:
        cost_share = 0

    leading_teams = three_most_expensive(by_team)
    if leading_teams.empty:
        team_names = "None"
    else:
        team_names = ", ".join(leading_teams["team_name"].tolist())

    total_alerts = metrics["total_alerts"]
    cost_per_alert = metrics["cost_per_alert"]

    bullets = (
        f"Top 3 services account for {cost_share:.1f}% of total PagerDuty attributed cost.",
        f"Total attributed PagerDuty cost for the selected period: ${total_cost:,.2f}.",
        f"Total alerts in period: {total_alerts}; cost per alert: ${cost_per_alert:,.2f}.",
        f"Top teams by attributed cost: {team_names}.",
    )
    return "\n".join(f"- {bullet}" for bullet in bullets)


# In-memory copy of the most recently loaded data, kept between button clicks.
# run_report compares _cached_range with the selected (start, end) dates to
# decide whether the data has to be loaded again.
_cached_range = None
_cached_df = None
_cached_metadata = None


def _reset_select_options(select_widget, options):
    """Replace a multi-select's options and leave only ``"All"`` selected.

    Args:
        select_widget: Widget exposing assignable ``options`` and ``value``
            attributes (the filter ``SelectMultiple`` widgets in this notebook).
        options: Iterable of option labels to offer after the ``"All"`` entry.
    """
    # Assign the options first: a multi-select drops its current selection
    # whenever its options are replaced, so setting the value beforehand would
    # leave nothing selected instead of "All".
    select_widget.options = ["All", *options]
    select_widget.value = ("All",)


def _update_filter_options(metadata, df):
    """Repopulate the three filter widgets from the values present in ``df``.

    Args:
        metadata: Accepted for signature compatibility; not used here.
        df: Incident frame with ``service_name``, ``team_name`` and
            ``escalation_policy_name`` columns. Missing values are ignored.
    """
    targets = (
        (services_select, "service_name"),
        (teams_select, "team_name"),
        (escalation_select, "escalation_policy_name"),
    )
    # Compute every option list before touching any widget, so a missing
    # column fails before the widgets are partially updated.
    option_lists = [sorted(set(df[column].dropna().tolist())) for _, column in targets]

    for (widget, _), choices in zip(targets, option_lists):
        _reset_select_options(widget, choices)


def run_report(_=None):
    """Load data for the selected dates, apply the filters, and render the report.

    Intended as the click handler of the run button, so problems (missing or
    inverted dates, API or network failures, no data) are written to the log
    area instead of being raised, where the UI would not show them.

    Args:
        _: The widget passed by the click event; ignored.
    """
    global _cached_range, _cached_df, _cached_metadata

    with log_output:
        clear_output()
        print("Starting report...")

    since = start_picker.value
    until = end_picker.value
    if since is None or until is None:
        with log_output:
            print("Report failed: Both start and end dates are required.")
        return
    if since > until:
        with log_output:
            print("Report failed: Start date must not be after end date.")
        return

    range_key = (since, until)
    # These keys must match the ones load_data uses for its cache files.
    incidents_key = f"incidents_{since.isoformat()}_{until.isoformat()}"
    metadata_key = f"metadata_services_teams_policies_{since.isoformat()}_{until.isoformat()}"
    incidents_fresh = _is_cache_fresh(_cache_path(incidents_key), CACHE_TTL_HOURS)
    metadata_fresh = _is_cache_fresh(_cache_path(metadata_key), CACHE_TTL_HOURS)

    needs_reload = _cached_range != range_key or not incidents_fresh or not metadata_fresh
    if needs_reload:
        # Load inside the log area so the cache hit/miss messages printed
        # while loading show up there as well.
        with log_output:
            print(f"Loading data for {since} to {until}...")
            try:
                loaded_df, loaded_metadata = load_data(since, until)
            except (RuntimeError, requests.RequestException) as exc:
                # pd_get raises RuntimeError for error responses; timeouts and
                # connection problems surface as requests exceptions.
                print(f"Report failed: {exc}")
                return
        _cached_df, _cached_metadata = loaded_df, loaded_metadata
        _cached_range = range_key
    else:
        with log_output:
            print("Using in-memory data for current date range.")

    if _cached_df.empty:
        # Nothing to aggregate or plot; clear stale output and stop here.
        with log_output:
            print("No incidents found for the selected date range.")
        with executive_output:
            clear_output()
        with report_output:
            clear_output()
        return

    if needs_reload:
        # Refresh the filter choices only when the data changed. Doing it on
        # every run would reset the widgets right before their values are
        # read, so a user's filter selection would never take effect.
        filter_widgets = (teams_select, services_select, escalation_select)
        previous_selections = [widget.value for widget in filter_widgets]
        _update_filter_options(_cached_metadata, _cached_df)
        # Re-apply the parts of the earlier selection that still exist.
        for widget, previous in zip(filter_widgets, previous_selections):
            still_valid = tuple(item for item in previous if item in widget.options)
            if still_valid:
                widget.value = still_valid

    filtered_df = apply_filters(
        _cached_df,
        teams_select.value,
        services_select.value,
        escalation_select.value,
    )

    metrics, by_service, by_team, by_escalation, alerts_over_time = compute_costs_and_aggregations(filtered_df)

    with executive_output:
        clear_output()
        display(Markdown("### Executive Summary"))
        display(Markdown(render_executive_summary(metrics, by_service, by_team)))

    with report_output:
        clear_output()
        render_charts_and_tables(by_service, by_team, by_escalation, alerts_over_time, top_n_dropdown.value)


# Run the report whenever the button is clicked.
# NOTE(review): re-executing this cell presumably registers the handler again
# on the same button, making one click run the report more than once — confirm.
run_button.on_click(run_report)
