## Climate Stress Testing and Portfolio Simulation

This notebook develops an integrated quantitative framework to assess climate-related transition risk at both the asset and portfolio levels. It combines scenario analysis, supervised and unsupervised machine learning, and stochastic simulation to translate climate pathways into financial risk metrics. The objective is to move beyond static carbon metrics toward a forward-looking, distributional view of climate risk consistent with emerging academic and regulatory standards.

### Importing the required libraries

This section gathers the scientific computing, machine learning, and visualization libraries used throughout the notebook. The stack reflects a hybrid methodological approach: NumPy and pandas for data manipulation, scikit-learn for preprocessing, regression, and classification, UMAP and HDBSCAN for dimensionality reduction and density-based clustering, and Plotly for exploratory and diagnostic charts. All of these are open-source, which supports the reproducibility and transparency expected in academic research and regulatory-facing climate risk analysis.

In [1]:
import warnings
warnings.filterwarnings(
    "ignore",
    message="n_jobs value 1 overridden to 1 by setting random_state"
)

In [2]:
from pathlib import Path
import ipywidgets as widgets
widgets.IntProgress()

import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go

from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.impute import SimpleImputer
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, roc_auc_score, silhouette_score
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline

import umap
import hdbscan

### Setup & Loading Data

This part initializes the analytical environment and loads the core datasets used in the study. The configuration cell fixes the random seed, the number of Monte Carlo paths, the admissible range of decarbonization rates, and the scenario identifiers; the loading cells then ingest the four input files. Conceptually, this section anchors the empirical analysis by defining the universe of companies, assets, and scenarios under consideration. From an academic perspective, it corresponds to defining the sample and assumptions that condition all subsequent results, and it therefore plays a critical role in the interpretability of the findings.

In [3]:
# Global configuration
# Set a fixed random seed for reproducibility
DEFAULT_SEED = 42
# Number of Monte Carlo simulation paths
N_MC_PATHS = 500
# Range for decarbonization targets (min, max)
DECARB_TARGET_RANGE = (0.01, 0.06)
# Different climate scenarios to model
SCENARIO_NAMES = ("orderly", "disorderly", "hothouse")

# Directory setup
DATA_DIR = Path.cwd()
# Output directory for simulation results
OUTPUT_DIR = DATA_DIR / "../datasets/output_data/portfolio_simulation"

# File paths for different datasets
DATA_FILES = {
    "low": "../datasets/output_data/stranded_assets/low_risk_opportunities.csv",
    "critical": "../datasets/output_data/stranded_assets/critical_risk_assets.csv",
    "company": "../datasets/output_data/stranded_assets/company_carbon_exposure.csv",
    "divest": "../datasets/output_data/stranded_assets/divestment_candidates.csv",
}

# Ensure output directory exists
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Pandas display configuration
pd.set_option("display.max_columns", 50)
pd.set_option("display.width", 160)

In [4]:
def load_dataset(key: str, data_dir: Path = DATA_DIR) -> pd.DataFrame:
    """
    Load a dataset from disk based on the provided key.
    
    Args:
        key: The key corresponding to the dataset in DATA_FILES
        data_dir: Base directory containing the data files
        
    Returns:
        Loaded DataFrame
        
    Raises:
        KeyError: If the provided key doesn't exist in DATA_FILES
        FileNotFoundError: If the data file doesn't exist
    """
    try:
        filename = DATA_FILES[key]
    except KeyError as exc:
        raise KeyError(
            f"Unknown dataset key '{key}'. Available keys: {list(DATA_FILES)}"
        ) from exc

    path = data_dir / filename
    if not path.exists():
        raise FileNotFoundError(f"Expected dataset at {path}, but it was not found.")
    return pd.read_csv(path)

In [5]:
def summarize_dataframe(df: pd.DataFrame, *, name: str) -> pd.Series:
    """
    Generate a summary of key statistics for a DataFrame.
    
    Args:
        df: Input DataFrame to summarize
        name: Name to identify this dataset in the output
        
    Returns:
        Series containing summary statistics
    """
    numeric_cols = df.select_dtypes(include=[np.number]).shape[1]
    categorical_cols = df.select_dtypes(exclude=[np.number]).shape[1]
    missing_pct = float(df.isna().mean().mean()) * 100
    return pd.Series(
        {
            "rows": df.shape[0],
            "columns": df.shape[1],
            "numeric_cols": numeric_cols,
            "categorical_cols": categorical_cols,
            "avg_missing_pct": round(missing_pct, 2),
        },
        name=name,
    )

In [6]:
def preview_dataset(df: pd.DataFrame, *, name: str, n: int = 5) -> None:
    """
    Display a preview of the dataset with a formatted header.
    
    Args:
        df: DataFrame to display
        name: Name of the dataset for the display caption
        n: Number of rows to show in the preview
    """
    display(
        df.head(n).style.set_caption(f"{name} – first {n} rows"),
    )

In [7]:
def export_dataframe(df: pd.DataFrame, filename: str, *, index: bool = False) -> Path:
    """
    Save a DataFrame to disk, trying Parquet format first with CSV fallback.
    
    Args:
        df: DataFrame to save
        filename: Base filename (without extension)
        index: Whether to save the index
        
    Returns:
        Path to the saved file
    """
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    parquet_path = OUTPUT_DIR / f"{filename}.parquet"
    try:
        df.to_parquet(parquet_path, index=index)
        return parquet_path
    except Exception as exc:  # noqa: BLE001
        warnings.warn(
            f"Parquet export failed ({exc}); falling back to CSV.",
            RuntimeWarning,
        )
        csv_path = OUTPUT_DIR / f"{filename}.csv"
        df.to_csv(csv_path, index=index)
        return csv_path

In [8]:
# Load all required datasets
low = load_dataset("low")  # Low risk opportunities dataset
crit = load_dataset("critical")  # Critical risk assets dataset
comp = load_dataset("company")  # Company carbon exposure data
div = load_dataset("divest")  # Divestment candidates

# Clean up column names in company data by stripping whitespace
comp.columns = [c.strip() for c in comp.columns]

In [9]:
summaries = pd.concat(
    [
        summarize_dataframe(low, name="Low-risk mines"),
        summarize_dataframe(crit, name="Critical-risk mines"),
        summarize_dataframe(comp, name="Company exposures"),
        summarize_dataframe(div, name="Divest candidates"),
    ],
    axis=1,
).T

display(summaries.style.set_caption("Dataset overview"))

for name, df in (
    ("Low-risk mines", low),
    ("Critical-risk mines", crit),
    ("Company exposures", comp),
    ("Divest candidates", div),
):
    preview_dataset(df, name=name, n=3)

Unnamed: 0,rows,columns,numeric_cols,categorical_cols,avg_missing_pct
Low-risk mines,28.0,8.0,4.0,4.0,8.04
Critical-risk mines,21.0,10.0,5.0,5.0,1.9
Company exposures,25.0,10.0,8.0,2.0,0.0
Divest candidates,20.0,4.0,3.0,1.0,0.0


Unnamed: 0,Mine,Country,Parent,Intensity,Capacity,Production,Cost@$100,Confidence
0,Abyz Mine,KAZ,Kazakhmys Holding LLP,0.0056,0.68007,132094466.647848,73972901.322795,very low
1,Akchi-Spassky Mine,KAZ,,0.0056,0.68007,132094466.647848,73972901.322795,very low
2,Aralchinsky Mine,KAZ,,0.0056,0.68007,132094466.647848,73972901.322795,very low


Unnamed: 0,Mine,Country,Parent,Type,Emissions,Intensity,Cost@$100,Cost@$200,Capacity,Confidence
0,Mount Isa Operation,AUS,Government of Qatar,Both,204750.0,0.0455,20475000.0,40950000.0,0.692308,high
1,El Salvador Mine,CHL,Codelco Corp,Both,216433.0,0.3005,21643300.0,43286600.0,0.001062,medium
2,Lomas Bayas Mine,CHL,Glencore PLC,Open Pit,1038655.0,0.0455,103865500.0,207731000.0,0.080379,medium


Unnamed: 0,Company,HQ Country,Mines,Total Emissions (tCO₂),Production (t),$50/t,$100/t,$150/t,$200/t,Portfolio Intensity
0,FreePort-McMoran Inc,USA,11,4858750.019972,468335534.553201,242937500.998614,485875001.997227,728812502.995841,971750003.994455,0.010375
1,Government of Iran,IRN,3,4844845.0,201868552.0,242242250.0,484484500.0,726726750.0,968969000.0,0.024
2,Qatar Investment Authority,QAT,11,2318213.001601,52153138.03519,115910650.080058,231821300.160116,347731950.240174,463642600.320233,0.04445


Unnamed: 0,Company,Assets at Risk,Exposure@$100/t,Emissions
0,Government of Iran,3,484484500.0,4844845.0
1,FreePort-McMoran Inc,6,438491501.99373,4384915.019937
2,Qatar Investment Authority,5,211210300.074502,2112103.000745


### 1. Scenario Analysis & ML-based Decarbonization Rate Estimation

This section establishes the link between macro-level climate scenarios and micro-level firm behavior. It operationalizes transition scenarios by translating them into decarbonization trajectories at the company level, combining deterministic pathways with data-driven estimation.

#### 1.1 Deterministic NGFS-style Scenario

This sub-section implements stylized climate transition scenarios inspired by the NGFS (Network for Greening the Financial System) framework. Each scenario provides an exogenous, economy-wide path for the carbon price and for an emissions-intensity factor over 2025-2040. Their deterministic nature makes them suitable as benchmarks and stress-testing baselines, enabling comparison across portfolios and studies. Academically, this aligns with scenario-based stress testing approaches widely used in climate economics and financial stability research.
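
As a quick sanity check on the orderly price path, the sketch below evaluates a logistic curve with the default parameters used in the next cell. The helper name `logistic_price` is illustrative and not part of the notebook. Because the curve's midpoint sits six years after the first simulated year, the first-year price lies above the lower asymptote of 45 $/t, and the final-year price stays slightly below the upper asymptote of 220 $/t.

```python
import numpy as np

def logistic_price(t, start=45.0, target=220.0, k=0.45, midpoint=6.0):
    """Logistic S-curve between a lower asymptote (start) and an upper asymptote (target)."""
    t = np.asarray(t, dtype=float)
    return start + (target - start) / (1.0 + np.exp(-k * (t - midpoint)))

t = np.arange(16)  # 16 annual steps, mirroring 2025-2040
prices = logistic_price(t)

print(round(float(prices[0]), 2))   # first-year price
print(round(float(prices[6]), 2))   # midpoint: halfway between the two asymptotes
print(round(float(prices[-1]), 2))  # final-year price
```

The first-year value agrees with the 2025 orderly price shown later in the company-level output (about 56.02 $/t).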

In [10]:
# Define the time horizon for the simulation (2025-2040)
YEARS = list(range(2025, 2041))

def make_scenarios(
    years,
    *,
    # Orderly transition parameters
    orderly_start: float = 45,  # Lower asymptote of the logistic price curve ($/ton); the first-year price is higher
    orderly_target: float = 220,  # Upper asymptote of the logistic price curve ($/ton)
    orderly_inflection: int = 6,  # Midpoint of the S-curve, in years after the first simulated year
    orderly_k: float = 0.45,  # Logistic growth rate for orderly transition
    
    # Disorderly transition parameters
    disorderly_floor: float = 38,  # Pre-shock baseline price in the first year ($/ton)
    disorderly_pre_growth: float = 0.012,  # Annual growth rate before shock
    disorderly_shock_year: int = 6,  # Index of the shock year (years after the first simulated year)
    disorderly_shock: float = 190,  # Price level set in the shock year ($/ton)
    disorderly_long_run: float = 230,  # Long-term equilibrium price ($/ton)
    disorderly_rebound: float = 0.35,  # Speed of reversion to long-run price
    
    # Hothouse scenario parameters
    hothouse_baseline: float = 35,  # Initial price ($/ton)
    hothouse_growth: float = 0.018,  # Base growth rate
    hothouse_damage: float = 0.012,  # Additional growth from climate damages
    
    # Carbon intensity reduction parameters
    intensity_half_life_orderly: int = 7,  # Years to halve intensity gap in orderly
    intensity_half_life_disorderly: int = 9,  # Years to halve in disorderly
    intensity_delay_disorderly: int = 3,  # Years before intensity reduction starts in disorderly
    intensity_floor_orderly: float = 0.33,  # Minimum intensity factor (orderly)
    intensity_floor_disorderly: float = 0.42,  # Minimum intensity factor (disorderly)
    intensity_growth_hothouse: float = 0.008  # Annual intensity increase in hothouse
) -> dict[str, pd.DataFrame]:
    """
    Generate NGFS-inspired climate transition scenarios with dynamic parameters.
    
    Returns:
        Dictionary mapping scenario names to DataFrames with 'Price' and 'Intensity_factor' columns
    """
    years = list(years)
    t = np.arange(len(years), dtype=float)  # Time array for calculations

    def logistic_transition(start: float, target: float, k: float, midpoint: float) -> np.ndarray:
        """Generate S-curve transition using logistic function."""
        return start + (target - start) / (1 + np.exp(-k * (t - midpoint)))

    def mean_reverting_shock(
        baseline: np.ndarray,
        shock_year: int,
        shock_level: float,
        long_run: float,
        rebound: float
    ) -> np.ndarray:
        """
        Generate a shock pattern with mean reversion.
        
        Args:
            baseline: Array of baseline values
            shock_year: When the shock occurs (index in array)
            shock_level: Peak shock value
            long_run: Long-term equilibrium value
            rebound: Speed of reversion to long-run value (0-1)
        """
        path = baseline.copy()
        for idx in range(len(path)):
            if idx < shock_year:
                continue
            if idx == shock_year:
                path[idx] = shock_level
                continue
            previous = path[idx - 1]
            path[idx] = previous + rebound * (long_run - previous)
        return path

    def policy_decay(
        *,
        floor: float,
        half_life: float,
        delay: int = 0,
        overshoot: float = 0.0
    ) -> np.ndarray:
        """
        Generate exponential decay with configurable floor and delay.
        
        Used for modeling policy-driven intensity reductions.
        """
        ramp = np.maximum(0, t - delay)
        rate = np.log(2) / half_life  # Convert half-life to decay rate
        decay = (1 - floor) * np.exp(-rate * ramp)
        if overshoot:
            decay *= 1 - overshoot * np.exp(-rate * ramp)
        return floor + decay

    # Generate price paths for each scenario
    # 1. Orderly transition: Smooth S-curve transition
    price_orderly = logistic_transition(orderly_start, orderly_target, 
                                      orderly_k, orderly_inflection)

    # 2. Disorderly transition: Initial shock followed by mean reversion
    baseline_disorderly = disorderly_floor * (1 + disorderly_pre_growth) ** t
    price_disorderly = mean_reverting_shock(
        baseline_disorderly,
        shock_year=disorderly_shock_year,
        shock_level=disorderly_shock,
        long_run=disorderly_long_run,
        rebound=disorderly_rebound,
    )

    # 3. Hothouse scenario: Accelerating price growth due to climate damages
    normalized_time = t / max(t.max(), 1.0)  # 0 to 1 over simulation period; guard avoids 0/0 for a single-year horizon
    damage_multiplier = 1 + hothouse_damage * normalized_time ** 1.5  # Accelerating damage
    price_hothouse = hothouse_baseline * (1 + hothouse_growth * damage_multiplier) ** t

    # Calculate intensity reduction factors for each scenario
    factor_orderly = policy_decay(
        floor=intensity_floor_orderly,
        half_life=intensity_half_life_orderly,
        delay=0,
        overshoot=0.05,  # Scales the decaying gap down early on, so the factor starts near 0.967 instead of 1.0
    )
    
    factor_disorderly = policy_decay(
        floor=intensity_floor_disorderly,
        half_life=intensity_half_life_disorderly,
        delay=intensity_delay_disorderly,  # Delayed policy response
        overshoot=0.02,
    )
    
    # In hothouse scenario, intensity actually increases due to lack of policy
    factor_hothouse = 1 + intensity_growth_hothouse * normalized_time ** 1.5

    # Package results into DataFrames
    idx = pd.Index(years, name="Year")
    return {
        "orderly": pd.DataFrame(
            {"Price": price_orderly, "Intensity_factor": factor_orderly}, 
            index=idx
        ),
        "disorderly": pd.DataFrame(
            {"Price": price_disorderly, "Intensity_factor": factor_disorderly}, 
            index=idx
        ),
        "hothouse": pd.DataFrame(
            {"Price": price_hothouse, "Intensity_factor": factor_hothouse}, 
            index=idx
        ),
    }

# Generate scenarios using default parameters
scenarios = make_scenarios(YEARS)
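
The two building blocks behind the intensity and disorderly price paths can be checked in isolation. The sketch below is a simplified restatement under stated assumptions: `decay_to_floor` omits the `overshoot` term of `policy_decay`, and `revert_to_long_run` uses the closed form of the post-shock recursion rather than the loop. Both helper names are hypothetical.

```python
import numpy as np

def decay_to_floor(t, floor, half_life):
    """Factor that starts at 1 and closes half of its gap to `floor` every `half_life` years."""
    rate = np.log(2) / half_life
    return floor + (1.0 - floor) * np.exp(-rate * np.asarray(t, dtype=float))

def revert_to_long_run(shock_level, long_run, rebound, n_steps):
    """Closed form of p[n] = p[n-1] + rebound * (long_run - p[n-1]) with p[0] = shock_level."""
    n = np.arange(n_steps + 1)
    return long_run + (shock_level - long_run) * (1.0 - rebound) ** n

# Orderly defaults: floor 0.33, half-life 7 years (overshoot ignored here)
factor = decay_to_floor([0, 7, 14], floor=0.33, half_life=7)

# Disorderly defaults: shock to 190 $/t, reverting towards 230 $/t at speed 0.35
post_shock = revert_to_long_run(190.0, 230.0, 0.35, n_steps=3)

print(factor)      # gap to the floor: 0.67, then 0.335, then 0.1675
print(post_shock)  # remaining distance to 230 shrinks by 35% per year
```

The closed form makes explicit that the distance to the long-run price shrinks geometrically, by a factor of `1 - rebound` each year after the shock.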

In [11]:
scenario_frames = []
for scen_name, scen_df in scenarios.items():
    scenario_frames.append(scen_df.reset_index().assign(Scenario=scen_name))

scenarios_long = (
    pd.concat(scenario_frames, ignore_index=True)
    .melt(id_vars=["Year", "Scenario"], var_name="Metric", value_name="Value")
)

fig = px.line(
    scenarios_long,
    x="Year",
    y="Value",
    color="Scenario",
    facet_row="Metric",
    title="NGFS-style scenario paths: price vs. intensity factor",
    height=600,
)
fig.update_traces(mode="lines+markers")
fig.update_yaxes(matches=None)
fig.update_xaxes(title="")
fig.update_yaxes(title="")
fig.update_layout(margin=dict(l=40, r=40, t=60, b=40))
fig.show()

#### 1.2 ML proxy for Company Decarbonization Rates

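Here, a regression model assigns each company a decarbonization rate based on observable firm characteristics. This relaxes the assumption of homogeneous behavior within sectors and introduces firm-level heterogeneity. In the implementation below, the training target is itself a min-max rescaling of portfolio intensity into `DECARB_TARGET_RANGE`, and the model is a standardized linear regression, so the fitted rates essentially reproduce that linear mapping. The pipeline should therefore be read as a scalable placeholder: capturing nonlinear relationships and interaction effects would require replacing the synthetic target with observed decarbonization data and the linear model with a nonlinear learner.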
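
To make the behaviour of a linear proxy concrete, the sketch below fits ordinary least squares to a target that is a min-max rescaling of intensity. It uses `np.linalg.lstsq` as a stand-in for the notebook's scikit-learn pipeline, and the intensity values are hypothetical. Under these assumptions the fit recovers the target up to floating-point error, which is why the predicted rates carry no information beyond the intensity ranking.

```python
import numpy as np

# Hypothetical portfolio intensities (tCO2 per tonne of output)
intensity = np.array([0.0045, 0.0056, 0.0104, 0.0240, 0.0445])
low_target, high_target = 0.01, 0.06  # same range as DECARB_TARGET_RANGE

# Target rate: min-max rescaling of intensity into the target range
norm = (intensity - intensity.min()) / (intensity.max() - intensity.min())
target = low_target + (high_target - low_target) * norm

# Ordinary least squares with an intercept (stand-in for the notebook's pipeline)
X = np.column_stack([np.ones_like(intensity), intensity])
coef, *_ = np.linalg.lstsq(X, target, rcond=None)
pred = X @ coef

max_abs_error = float(np.abs(pred - target).max())
print(max_abs_error)  # numerically zero: the fit reproduces the rescaling
```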

In [12]:
# Calculate emissions intensity (emissions per unit of production)
comp["emissions_intensity"] = comp["Total Emissions (tCO₂)"] / comp["Production (t)"]

# Get the range of portfolio intensity values for normalization
intensity_min, intensity_max = comp["Portfolio Intensity"].agg(["min", "max"])

# Normalize portfolio intensity to [0, 1] range
nrm_intensity = (comp["Portfolio Intensity"] - intensity_min) / (intensity_max - intensity_min)

# Set target decarbonization rate range from global config
low_target, high_target = DECARB_TARGET_RANGE

# Calculate target decarbonization rate for each company
# Higher intensity companies get higher target rates within the specified range
comp["target_decarb_rate"] = low_target + (high_target - low_target) * nrm_intensity

# Create a machine learning pipeline for predicting decarbonization rates
# 1. Standardize features (zero mean, unit variance)
# 2. Apply linear regression to predict decarbonization rates
reg_pipeline = Pipeline(
    steps=[
        ("scaler", StandardScaler()),
        ("model", LinearRegression()),
    ]
)

# Prepare features (X) and target (y) for the model
# Features: Portfolio Intensity and emissions intensity (numerically identical in the rows previewed above)
X_int = comp[["Portfolio Intensity", "emissions_intensity"]]
# Target: Pre-calculated target decarbonization rates
y_target = comp["target_decarb_rate"]

# Train the model
reg_pipeline.fit(X_int, y_target)

# Predict decarbonization rates using the trained model
# Clip predictions to ensure they stay within the defined target range
comp["decarb_rate_ml"] = reg_pipeline.predict(X_int).clip(*DECARB_TARGET_RANGE)

# Display the first few rows of company data with their portfolio intensity and ML-predicted decarbonization rates
display(comp[["Company", "Portfolio Intensity", "decarb_rate_ml"]].head())

Unnamed: 0,Company,Portfolio Intensity,decarb_rate_ml
0,FreePort-McMoran Inc,0.010375,0.016875
1,Government of Iran,0.024,0.028885
2,Qatar Investment Authority,0.04445,0.046911
3,Kazakhmys Holding LLP,0.0056,0.012666
4,The Vanguard Group Inc,0.004528,0.011721


#### 1.3 Company-level Trajectories per Scenario

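This sub-section combines scenario-level constraints with firm-level ML estimates to generate emissions and carbon-cost trajectories for each company under each scenario. In each year, emissions equal base-year emissions multiplied by the scenario intensity factor and by the company's compounded reduction factor `(1 - r) ** i`, where `i` counts years from the first simulated year; the carbon cost is emissions times the scenario price. The resulting trajectories form the core forward-looking climate state variables used later in the portfolio and risk analysis. From a methodological standpoint, this step bridges top-down scenario narratives with bottom-up financial modeling.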
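
The per-year calculation can be written in vectorized form. The sketch below uses small hypothetical inputs (three years, round numbers) purely to show how the scenario factor and the company-specific reduction combine; it is not the notebook's simulation loop.

```python
import numpy as np

base_emissions = 1_000.0                      # hypothetical base-year emissions (tCO2)
decarb_rate = 0.10                            # hypothetical annual company decarbonization rate
intensity_factor = np.array([1.0, 0.9, 0.8])  # hypothetical scenario intensity factors
price = np.array([50.0, 60.0, 70.0])          # hypothetical carbon prices ($/tCO2)

years_elapsed = np.arange(len(price))
company_factor = (1.0 - decarb_rate) ** years_elapsed  # compounded reduction: 1.0, 0.9, 0.81

emissions = base_emissions * intensity_factor * company_factor
cost = emissions * price

print(emissions)  # 1000, 810, 648
print(cost)       # 50000, 48600, 45360
```

Note that the two reductions multiply, so a company's emissions fall faster than either the scenario factor or its own rate alone would imply.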

In [13]:
def simulate_company_scenario(row, scen_name, scen_df, years):
    """
    Simulate emissions and costs for a company under a specific climate scenario.
    
    Args:
        row: Company data row containing emissions and decarbonization rate
        scen_name: Name of the climate scenario (e.g., 'orderly', 'disorderly', 'hothouse')
        scen_df: DataFrame with scenario parameters (Price and Intensity_factor) by year
        years: List of years to simulate
        
    Returns:
        List of dictionaries containing yearly simulation results
    """
    base_e = row["Total Emissions (tCO₂)"]  # Base year emissions
    r = row["decarb_rate_ml"]  # Annual decarbonization rate from ML model
    out = []
    
    # Simulate each year in the forecast period
    for i, y in enumerate(years):
        # Get scenario parameters for the current year
        p = scen_df.loc[y, "Price"]  # Carbon price ($/tCO₂)
        f_scen = scen_df.loc[y, "Intensity_factor"]  # Scenario intensity factor
        
        # Calculate cumulative decarbonization factor
        f_co = (1 - r) ** i  # Compounded annual reduction
        
        # Calculate emissions for current year
        e_t = base_e * f_scen * f_co  # Apply both scenario and company-specific reductions
        
        # Calculate carbon cost for current year
        c_t = e_t * p  # Total cost = emissions * price per ton
        
        # Store results
        out.append({
            "Company": row["Company"],
            "Year": y,
            "Scenario": scen_name,
            "Price": p,
            "Emissions_t": e_t,
            "Cost_t": c_t
        })
    return out

paths_all = []
for scen_name, scen_df in scenarios.items():
    for _, r in comp.iterrows():
        paths_all.extend(simulate_company_scenario(r, scen_name, scen_df, YEARS))

paths_df = pd.DataFrame(paths_all)
paths_df.head()

Unnamed: 0,Company,Year,Scenario,Price,Emissions_t,Cost_t
0,FreePort-McMoran Inc,2025,orderly,56.020337,4695982.0,263070500.0
1,FreePort-McMoran Inc,2026,orderly,61.686156,4343763.0,267950000.0
2,FreePort-McMoran Inc,2027,orderly,69.823936,4024981.0,281040000.0
3,FreePort-McMoran Inc,2028,orderly,81.027315,3736528.0,302760800.0
4,FreePort-McMoran Inc,2029,orderly,95.583837,3475533.0,332204800.0


In [14]:
top_companies = (
    comp.sort_values("Total Emissions (tCO₂)", ascending=False)
        .head(5)["Company"].tolist()
)

paths_plot = paths_df[paths_df["Company"].isin(top_companies)].copy()
paths_plot["Cost_mUSD"] = paths_plot["Cost_t"] / 1_000_000

fig = px.line(
    paths_plot,
    x="Year",
    y="Cost_mUSD",
    color="Company",
    facet_col="Scenario",
    facet_col_wrap=1,
    title="Cost trajectories (top 5 emitters) across scenarios"
)
fig.update_traces(mode="lines+markers")
fig.update_yaxes(matches=None)
fig.for_each_yaxis(lambda axis: axis.update(title=""))
fig.add_annotation(
    text="<b>Cost (million USD)</b>",
    x=-0.08,
    xref="paper",
    y=0.5,
    yref="paper",
    textangle=-90,
    showarrow=False,
    font=dict(size=12),
)
fig.show()

### 2. Portfolio Definition & Trajectories

This section shifts the analysis from individual companies to portfolios, reflecting the perspective of an investor or financial institution. It formalizes how asset-level trajectories aggregate into portfolio-level outcomes.

#### 2.1 Portfolio Weights Strategy

This sub-section defines the portfolio construction rule through three weighting schemes: equal weights, production weights, and a carbon-intensity tilt. The choice of weights is not neutral: it embeds implicit preferences and constraints that materially affect climate exposure. Academically, this highlights how portfolio design interacts with transition risk and can either amplify or mitigate climate-related vulnerabilities.
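
A minimal sketch of three such weighting rules on hypothetical data is shown below. The production and intensity figures are invented for illustration; the point is that each rule yields weights summing to one, while ranking the same companies very differently.

```python
import numpy as np

production = np.array([100.0, 300.0, 600.0])  # hypothetical production volumes (t)
intensity = np.array([0.01, 0.02, 0.04])      # hypothetical carbon intensities

# Equal weighting: every company receives 1/N
w_equal = np.full(len(production), 1.0 / len(production))

# Production weighting: share of total production
w_prod = production / production.sum()

# Carbon tilt: weights proportional to inverse intensity
inv_intensity = 1.0 / intensity
w_tilt = inv_intensity / inv_intensity.sum()

print(w_prod)  # 0.1, 0.3, 0.6
print(w_tilt)  # 4/7, 2/7, 1/7
```

In this toy example the largest producer is also the most carbon-intensive, so the production rule gives it the largest weight and the tilt gives it the smallest.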

In [15]:
# Different approaches to allocate capital across companies based on various criteria

# 1. Equal Weighting: Simple diversification strategy
#    - Each company gets the same weight
#    - Maximizes diversification, ignores company characteristics
comp["w_equal"] = 1.0 / len(comp)

# 2. Production-Weighted: Allocate based on production volume
#    - Larger producers get proportionally more weight
#    - Represents market share in terms of production capacity
comp["w_prod"] = comp["Production (t)"] / comp["Production (t)"].sum()

# 3. Carbon-Intensity Tilt: Underweight high carbon intensity companies
#    - Uses inverse of portfolio intensity (lower intensity = higher weight)
#    - Treats zero intensities as missing and fills them with the median inverse intensity
#    - Normalizes to sum to 1 for valid portfolio weights
inv_intensity = 1 / comp["Portfolio Intensity"].replace(0, np.nan)
inv_intensity = inv_intensity.fillna(inv_intensity.median())
comp["w_carbon_tilt"] = inv_intensity / inv_intensity.sum()

MC_WEIGHT_STRATEGIES = {
    "Equal": "w_equal",
    "Production": "w_prod",
    "Carbon Tilt": "w_carbon_tilt",
}

comp[["Company", "w_equal", "w_prod", "w_carbon_tilt"]].head()

Unnamed: 0,Company,w_equal,w_prod,w_carbon_tilt
0,FreePort-McMoran Inc,0.04,0.162674,0.032709
1,Government of Iran,0.04,0.070118,0.014139
2,Qatar Investment Authority,0.04,0.018115,0.007634
3,Kazakhmys Holding LLP,0.04,0.140316,0.060597
4,The Vanguard Group Inc,0.04,0.160419,0.074948


#### 2.2 Portfolio Trajectories

Here, company-level trajectories are aggregated using the defined weights to produce portfolio-level emissions and cost paths. This aggregation step is critical for translating micro-level dynamics into metrics that are meaningful for portfolio management, reporting, and regulatory stress testing.
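
The aggregation can be sketched on a toy example as a left merge of weights onto the paths, followed by a weighted sum per scenario and year. The data and the helper name `aggregate` are hypothetical. One detail worth showing: after a left merge, a company without a weight yields a `NaN` in the weight column rather than a missing column, so the check has to look for `NaN` values.

```python
import pandas as pd

paths = pd.DataFrame({
    "Company": ["A", "B", "A", "B"],
    "Year": [2030, 2030, 2040, 2040],
    "Scenario": ["orderly"] * 4,
    "Cost_t": [100.0, 200.0, 80.0, 160.0],
})
weights = pd.Series({"A": 0.25, "B": 0.75}, name="weight").rename_axis("Company")

def aggregate(paths: pd.DataFrame, weights: pd.Series) -> pd.DataFrame:
    """Weighted sum of company costs per scenario and year."""
    merged = paths.merge(weights.reset_index(), on="Company", how="left")
    # Unmatched companies show up as NaN weights after a left merge
    missing = merged.loc[merged["weight"].isna(), "Company"].unique()
    if len(missing) > 0:
        raise ValueError(f"No weight for: {sorted(missing)}")
    merged["Cost_weighted"] = merged["Cost_t"] * merged["weight"]
    return (
        merged.groupby(["Scenario", "Year"], as_index=False)["Cost_weighted"]
        .sum()
        .rename(columns={"Cost_weighted": "Portfolio_Cost"})
    )

portfolio = aggregate(paths, weights)
print(portfolio)
```

Without the `NaN` check, a company missing from the weights would silently drop out of the sum, because `NaN` values are skipped during aggregation.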

In [16]:
def compute_portfolio_trajectory(paths_df: pd.DataFrame, weights_col: str, comp_df: pd.DataFrame) -> pd.DataFrame:
    """
    Aggregate company-level climate trajectories into a single portfolio-level view.
    
    This function combines individual company emissions and cost projections with
    portfolio weights to create an aggregated view of portfolio performance under
    different climate scenarios.
    
    Parameters:
    -----------
    paths_df : pd.DataFrame
        DataFrame containing company-level simulation results with columns:
        - Company: Company identifier
        - Year: Simulation year
        - Scenario: Climate scenario name
        - Cost_t: Company's carbon cost for the year
        - Emissions_t: Company's emissions for the year
    
    weights_col : str
        Name of the column in comp_df containing the portfolio weights.
        Weights should sum to 1 for proper portfolio aggregation.
    
    comp_df : pd.DataFrame
        Company metadata DataFrame containing at least:
        - Company: Company identifier (must match paths_df)
        - [weights_col]: The portfolio weights to apply
    
    Returns:
    --------
    pd.DataFrame
        Aggregated portfolio metrics with columns:
        - Scenario: Climate scenario name
        - Year: Simulation year
        - Portfolio_Cost: Weighted sum of company carbon costs
        - Portfolio_Emissions: Weighted sum of company emissions
    
    Raises:
    -------
    ValueError
        If any companies in paths_df are missing from the weights, or if the
        weights column is not found in comp_df.
    """
    # Fail early if the requested weights column is absent
    if weights_col not in comp_df.columns:
        raise ValueError(f"Weights column '{weights_col}' not found in comp_df.")
    # Extract weights and index by company for efficient lookup
    weights = (
        comp_df
        .set_index("Company")[weights_col]
        .rename("weight")
    )
    # Merge weights with simulation paths
    merged = paths_df.merge(weights, on="Company", how="left")
    
    # Validate that all companies have weights.
    # A left merge always creates the 'weight' column; unmatched companies get NaN.
    missing = merged.loc[merged["weight"].isna(), "Company"].unique()
    if len(missing) > 0:
        raise ValueError(
            f"No portfolio weight found for companies: {sorted(missing)}"
        )
    # Calculate weighted metrics
    merged["Cost_weighted"] = merged["Cost_t"] * merged["weight"]
    merged["Emissions_weighted"] = merged["Emissions_t"] * merged["weight"]
    # Aggregate to portfolio level by scenario and year
    portfolio = (
        merged
        .groupby(["Scenario", "Year"], as_index=False)[
            ["Cost_weighted", "Emissions_weighted"]
        ]
        .sum()
        .rename(
            columns={
                "Cost_weighted": "Portfolio_Cost",
                "Emissions_weighted": "Portfolio_Emissions",
            }
        )
    )
    return portfolio

In [17]:
strategy_frames = []
for strategy_name, weight_col in MC_WEIGHT_STRATEGIES.items():
    port = compute_portfolio_trajectory(paths_df, weight_col, comp)
    port["Strategy"] = strategy_name
    strategy_frames.append(port)

port_all = pd.concat(strategy_frames, ignore_index=True)

fig = px.line(
    port_all,
    x="Year",
    y="Portfolio_Cost",
    color="Strategy",
    facet_col="Scenario",
    facet_col_wrap=1,
    title="Portfolio cost trajectories by strategy & scenario"
)
fig.update_traces(mode="lines+markers")
fig.update_yaxes(matches=None)
fig.for_each_yaxis(lambda axis: axis.update(title=""))
fig.add_annotation(
    text="<b>Portfolio cost</b>",
    x=-0.08,
    xref="paper",
    y=0.5,
    yref="paper",
    textangle=-90,
    showarrow=False,
    font=dict(size=12),
)
fig.show()

fig = px.line(
    port_all,
    x="Year",
    y="Portfolio_Emissions",
    color="Strategy",
    facet_col="Scenario",
    facet_col_wrap=1,
    title="Portfolio emissions trajectories by strategy & scenario"
)
fig.update_traces(mode="lines+markers")
fig.update_yaxes(matches=None)
fig.for_each_yaxis(lambda axis: axis.update(title=""))
fig.add_annotation(
    text="<b>Portfolio emissions</b>",
    x=-0.08,
    xref="paper",
    y=0.5,
    yref="paper",
    textangle=-90,
    showarrow=False,
    font=dict(size=12),
)
fig.show()

port_summary = (
    port_all.loc[port_all["Year"].isin([2030, 2040])]
    .pivot_table(
        index=["Scenario", "Strategy"],
        columns="Year",
        values=["Portfolio_Cost", "Portfolio_Emissions"],
    )
)
port_summary.columns = [f"{metric}_{year}" for metric, year in port_summary.columns]
port_summary = port_summary.reset_index()

format_cols = port_summary.select_dtypes(include=[np.number]).columns
style = port_summary.style.format({col: "{0:,.0f}" for col in format_cols})
style = style.set_caption("Key portfolio metrics by strategy")
display(style)

Unnamed: 0,Scenario,Strategy,Portfolio_Cost_2030,Portfolio_Cost_2040,Portfolio_Emissions_2030,Portfolio_Emissions_2040
0,disorderly,Carbon Tilt,34904654,120633176,865361,526388
1,disorderly,Equal,38752106,125710597,960747,548544
2,disorderly,Production,74477771,252982874,1846463,1103902
3,hothouse,Carbon Tilt,36504916,37551426,953797,818389
4,hothouse,Equal,40528760,39131956,1058931,852834
5,hothouse,Production,77892328,78750041,2035162,1716264
6,orderly,Carbon Tilt,78214633,84566555,691320,389702
7,orderly,Equal,86836033,88125941,767522,406104
8,orderly,Production,166890395,177346654,1475103,817253


### 3. Monte Carlo Simulation & Climate VaR

This section introduces stochasticity to account for uncertainty in prices, decarbonization paths, and other model components. It moves the analysis from point estimates to full distributions.
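
Once a distribution of simulated outcomes is available, tail metrics can be read off directly. The sketch below assumes, for illustration only, that Climate VaR is taken as an upper quantile of simulated carbon costs and that expected shortfall is the mean cost at or beyond that quantile; the notebook's own definition may differ. The cost array is a hypothetical placeholder for Monte Carlo output.

```python
import numpy as np

# Hypothetical simulated costs, one value per Monte Carlo path
simulated_costs = np.arange(1.0, 101.0)

confidence = 0.95
# Assumed definition: VaR as the 95th percentile of the cost distribution
var_95 = float(np.quantile(simulated_costs, confidence))

# Expected shortfall: average cost in the tail at or beyond the VaR level
tail = simulated_costs[simulated_costs >= var_95]
expected_shortfall = float(tail.mean())

print(var_95, expected_shortfall)
```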

#### 3.1 Price Randomization & Decarbonization Paths

This sub-section specifies the stochastic processes used to randomize prices and/or decarbonization trajectories. By allowing for correlated shocks and variability around baseline paths, the model acknowledges that transition outcomes are uncertain and path-dependent. This is consistent with modern risk theory, which emphasizes distributions rather than deterministic forecasts.
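
The correlated shocks can be illustrated by drawing from a bivariate normal distribution whose covariance matrix is built from the two volatilities and the correlation. The sketch below uses the default values from the function signature that follows and a large sample so the empirical moments are close to their targets; how the shocks are then applied to the base paths is left to the function itself.

```python
import numpy as np

price_vol, intensity_vol, corr = 0.25, 0.15, 0.3

# Covariance matrix: variances on the diagonal, corr * vol_1 * vol_2 off the diagonal
cov = np.array([
    [price_vol ** 2, corr * price_vol * intensity_vol],
    [corr * price_vol * intensity_vol, intensity_vol ** 2],
])

rng = np.random.default_rng(42)  # seeded generator for reproducibility
draws = rng.multivariate_normal(np.zeros(2), cov, size=100_000)

empirical_corr = float(np.corrcoef(draws, rowvar=False)[0, 1])
empirical_std = draws.std(axis=0)

print(empirical_corr)  # close to 0.3
print(empirical_std)   # close to [0.25, 0.15]
```

Passing a seeded `np.random.Generator` explicitly, as the function below does through its `rng` argument, keeps each simulation path reproducible without relying on global random state.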

In [18]:
def sample_price_intensity_paths(
    years: list[int],
    base_scen_df: pd.DataFrame,
    *,
    rng: np.random.Generator,
    price_vol: float = 0.25,
    intensity_vol: float = 0.15,
    corr: float = 0.3,
    intensity_bounds: tuple[float, float] = (0.6, 1.4),
    draws: np.ndarray | None = None,
) -> tuple[pd.Series, np.ndarray, np.ndarray]:
    """
    Generate correlated random paths for carbon prices and intensity factors.
    
    This function draws one Monte Carlo path for carbon prices and intensity
    factors around a base scenario, with correlated shocks in each year.
    Prices receive mean-one log-normal shocks; intensity factors receive
    additive normal shocks clipped to `intensity_bounds`.
    
    Parameters:
    -----------
    years : list[int]
        List of years to simulate
    base_scen_df : pd.DataFrame
        Base scenario DataFrame with 'Price' and 'Intensity_factor' columns
    rng : np.random.Generator
        Numpy random number generator for reproducible results
    price_vol : float, default=0.25
        Annual volatility of carbon prices (standard deviation of log returns)
    intensity_vol : float, default=0.15
        Annual volatility of intensity factors
    corr : float, default=0.3
        Correlation between price and intensity shocks
    intensity_bounds : tuple[float, float], default=(0.6, 1.4)
        Min and max bounds for intensity factor multipliers
    draws : np.ndarray, optional
        Pre-computed random draws (for testing or reproducibility)
        
    Returns:
    --------
    tuple containing:
        - price_path: pd.Series of simulated carbon prices
        - intensity_path: np.ndarray of simulated intensity factors
        - draws: The random draws used for the simulation
        
    Notes:
    ------
    - Price shocks are log-normal with unit mean, applied independently to each year's base price
    - Intensity multipliers are 1 plus a normal shock, clipped to the specified bounds
    - Price and intensity shocks are correlated within each year
    - Shocks are independent across years, so paths fluctuate around the base scenario
    """
    # Define mean and covariance matrix for correlated normal random variables
    mean = np.zeros(2)  # Zero mean for the normal distribution
    cov = np.array([
        [price_vol ** 2, corr * price_vol * intensity_vol],
        [corr * price_vol * intensity_vol, intensity_vol ** 2],
    ])
    # Generate or use provided random draws
    if draws is None:
        draws = rng.multivariate_normal(mean, cov, size=len(years))
    else:
        draws = np.asarray(draws)
    # Extract base scenario values
    price_base = base_scen_df["Price"].values
    intensity_base = base_scen_df["Intensity_factor"].values
    # Generate price path with log-normal shocks
    price_shock = np.exp(draws[:, 0] - 0.5 * price_vol ** 2)  # Adjust for lognormal mean
    price_path = price_base * price_shock  # Apply shocks to base prices
    # Generate intensity path with bounded shocks
    intensity_noise = np.clip(1 + draws[:, 1], *intensity_bounds)  # Keep within bounds
    intensity_path = intensity_base * intensity_noise  # Apply shocks to base intensities
    return (
        pd.Series(price_path, index=years, name="Price"),
        intensity_path,
        draws,
    )
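
As a quick sanity check on the shock construction used in `sample_price_intensity_paths`, the sketch below draws a large sample from the same bivariate normal and looks at three properties: the price shock has a mean close to one, the sampled correlation is close to the requested value, and the intensity multiplier stays inside its bounds. The seed and sample size are illustrative assumptions, not values from the notebook.

```python
import numpy as np

rng = np.random.default_rng(0)  # illustrative seed (assumption)
price_vol, intensity_vol, corr = 0.25, 0.15, 0.3
lower, upper = 0.6, 1.4

# Same covariance structure as in the function above
cov = np.array([
    [price_vol ** 2, corr * price_vol * intensity_vol],
    [corr * price_vol * intensity_vol, intensity_vol ** 2],
])
draws = rng.multivariate_normal(np.zeros(2), cov, size=200_000)

# Mean-one log-normal price shock and clipped intensity multiplier
price_shock = np.exp(draws[:, 0] - 0.5 * price_vol ** 2)
raw_multiplier = 1 + draws[:, 1]
intensity_noise = np.clip(raw_multiplier, lower, upper)

mean_price_shock = price_shock.mean()
sample_corr = np.corrcoef(draws[:, 0], draws[:, 1])[0, 1]
clipped_share = np.mean((raw_multiplier < lower) | (raw_multiplier > upper))

print(f"mean price shock : {mean_price_shock:.4f}")
print(f"sample corr      : {sample_corr:.4f}")
print(f"share clipped    : {clipped_share:.4%}")
```

With the default bounds, only a small share of intensity draws is clipped, so the clipping mainly guards against extreme outliers rather than reshaping the distribution.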

#### 3.2 Monte Carlo Simulation at Portfolio Level

Here, repeated simulations are run to generate a distribution of portfolio outcomes under climate transition uncertainty. Monte Carlo methods are particularly well-suited for this task, as they naturally accommodate nonlinearities and complex dependencies between variables.
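
The simulation function below accepts an `antithetic` flag. As a minimal illustration of why mirroring each draw can help, the sketch compares the spread of a plain Monte Carlo estimate of the mean price shock with an antithetic one. It assumes a single log-normal shock; `mean_shock_estimate` is a hypothetical helper, and the seed, volatility, and path counts are illustrative.

```python
import numpy as np


def mean_shock_estimate(rng, n_paths, vol, antithetic):
    """Estimate E[exp(Z - vol**2 / 2)] (true value 1) from n_paths draws."""
    if antithetic:
        half = rng.normal(0.0, vol, size=n_paths // 2)
        z = np.concatenate([half, -half])  # pair every draw with its mirror image
    else:
        z = rng.normal(0.0, vol, size=n_paths)
    return np.exp(z - 0.5 * vol ** 2).mean()


rng = np.random.default_rng(42)  # illustrative seed (assumption)
n_paths, vol, n_repeats = 200, 0.25, 2000

# Repeat each experiment many times to measure the estimator's spread
plain = np.array([mean_shock_estimate(rng, n_paths, vol, False) for _ in range(n_repeats)])
anti = np.array([mean_shock_estimate(rng, n_paths, vol, True) for _ in range(n_repeats)])

plain_se, anti_se = plain.std(), anti.std()
print(f"plain MC      : mean={plain.mean():.4f}, spread={plain_se:.4f}")
print(f"antithetic MC : mean={anti.mean():.4f}, spread={anti_se:.4f}")
```

Both estimators target the same mean, but the antithetic one uses negatively correlated pairs, which lowers the spread of the mean estimate for a monotone payoff such as this one.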

In [19]:
def simulate_portfolio_mc(
    comp_df: pd.DataFrame,
    base_scen_df: pd.DataFrame,
    years: list[int],
    weights_col: str,
    *,
    n_paths: int = N_MC_PATHS,
    seed: int | None = DEFAULT_SEED,
    price_vol: float = 0.25,
    intensity_vol: float = 0.15,
    price_intensity_corr: float = 0.3,
    intensity_bounds: tuple[float, float] = (0.6, 1.4),
    antithetic: bool = True,
    control_variate: bool = True,
    strategy_name: str = "Equal"
    ) -> pd.DataFrame:
    """
    Perform Monte Carlo simulation of portfolio carbon costs under uncertainty.
    
    This function simulates multiple potential future paths for portfolio carbon costs
    by modeling random variations in carbon prices and intensity factors, accounting for
    company-specific decarbonization rates and portfolio weights.
    
    Parameters:
    -----------
    comp_df : pd.DataFrame
        Company data with weights, emissions, and decarbonization rates
    base_scen_df : pd.DataFrame
        Base scenario with 'Price' and 'Intensity_factor' columns
    years : list[int]
        List of years to simulate
    weights_col : str
        Column name in comp_df containing portfolio weights
    n_paths : int, default=N_MC_PATHS
        Number of Monte Carlo paths to simulate
    seed : int, optional
        Random seed for reproducibility
    price_vol : float, default=0.25
        Volatility of carbon prices
    intensity_vol : float, default=0.15
        Volatility of intensity factors
    price_intensity_corr : float, default=0.3
        Correlation between price and intensity shocks
    intensity_bounds : tuple, default=(0.6, 1.4)
        Bounds for intensity factor multipliers
    antithetic : bool, default=True
        Whether to use antithetic variates for variance reduction
    control_variate : bool, default=True
        Whether to use control variates for variance reduction
    strategy_name : str, default="Equal"
        Name of the portfolio strategy being simulated
        
    Returns:
    --------
    pd.DataFrame
        Simulation results with columns:
        - MC_id: Path identifier
        - Year: Simulation year
        - Scenario: Base scenario name
        - Strategy: Portfolio strategy name
        - Portfolio_Cost: Simulated carbon cost
        - Price: Simulated carbon price
        - Intensity_factor: Simulated intensity factor
        - Portfolio_Cost_CV: Control variate adjusted cost (if control_variate=True)
    """
    # Initialize random number generator
    rng = np.random.default_rng(seed)
    results = []
    # Aggregate company data and calculate portfolio weights
    grouped = (
        comp_df
        .groupby("Company")
        .agg(
            weight=(weights_col, "sum"),
            emissions=("Total Emissions (tCO₂)", "sum"),
            decarb=("decarb_rate_ml", "mean"),
        )
    )
    # Input validation
    if grouped.empty:
        raise ValueError("Input dataframe produced no company aggregates for simulation.")
    # Normalize weights to sum to 1
    grouped["weight"] = grouped["weight"].fillna(0)
    weight_total = grouped["weight"].sum()
    if weight_total <= 0:
        raise ValueError("Sum of weights is zero; cannot simulate portfolio.")
    grouped["weight"] = grouped["weight"] / weight_total
    # Clean and validate data
    grouped = grouped.dropna(subset=["emissions", "decarb"])
    if grouped.empty:
        raise ValueError("No companies with emissions and decarb data after cleaning.")
    # Convert to numpy arrays for faster computation
    weights_arr = grouped["weight"].values
    base_emissions_arr = grouped["emissions"].values
    decarb_arr = grouped["decarb"].values
    # Get base scenario values
    base_prices = base_scen_df.loc[years, "Price"].values
    base_factor = base_scen_df.loc[years, "Intensity_factor"].values
    # Calculate price sensitivity for control variates
    price_sensitivity = np.zeros(len(years))
    for step in range(len(years)):
        decay = (1 - decarb_arr) ** step
        emissions_year = base_emissions_arr * base_factor[step] * decay
        price_sensitivity[step] = np.sum(emissions_year * weights_arr)
    # Define covariance matrix for correlated random variables
    cov = np.array([
        [price_vol ** 2, price_intensity_corr * price_vol * intensity_vol],
        [price_intensity_corr * price_vol * intensity_vol, intensity_vol ** 2],
    ])
    # Adjust number of paths if using antithetic variates
    total_paths = n_paths if not antithetic else int(np.ceil(n_paths / 2))
    # Main simulation loop
    for pair_id in range(total_paths):
        # Generate random draws for price and intensity
        draws = rng.multivariate_normal(np.zeros(2), cov, size=len(years))
        # Generate antithetic paths if enabled
        scenarios_draws = [draws]
        if antithetic:
            scenarios_draws.append(-draws)
        # Process each path in the current pair
        for local_id, current_draws in enumerate(scenarios_draws):
            path_id = pair_id * (2 if antithetic else 1) + local_id
            if path_id >= n_paths:
                break
            # Generate price and intensity paths
            price_path, intensity_path, _ = sample_price_intensity_paths(
                years,
                base_scen_df,
                rng=rng,
                price_vol=price_vol,
                intensity_vol=intensity_vol,
                corr=price_intensity_corr,
                intensity_bounds=intensity_bounds,
                draws=current_draws,
            )
            # Calculate portfolio metrics for each year
            for step, year in enumerate(years):
                price_year = price_path.iloc[step]
                factor_year = intensity_path[step]
                # Calculate emissions considering decarbonization
                decay = (1 - decarb_arr) ** step
                emissions_year = base_emissions_arr * factor_year * decay
                cost_year = np.sum(emissions_year * weights_arr) * price_year
                # Store results
                record = {
                    "MC_id": path_id,
                    "Year": year,
                    "Scenario": base_scen_df.name,
                    "Strategy": strategy_name,
                    "Portfolio_Cost": cost_year,
                    "Price": price_year,
                    "Intensity_factor": factor_year,
                }
                # Apply control variate adjustment if enabled
                if control_variate:
                    record["Portfolio_Cost_CV"] = (
                        cost_year
                        - price_sensitivity[step] * (price_year - base_prices[step])
                    )
                results.append(record)
                
    return pd.DataFrame(results)


mc_experiments = []
for strategy_name, weight_col in MC_WEIGHT_STRATEGIES.items():
    for scen_name, scen_df in scenarios.items():
        scen_df = scen_df.copy()
        scen_df.name = scen_name
        mc_sim = simulate_portfolio_mc(
            comp,
            scen_df,
            YEARS,
            weight_col,
            price_vol=0.2,
            intensity_vol=0.1,
            price_intensity_corr=0.25,
            n_paths=N_MC_PATHS,
            strategy_name=strategy_name,
        )
        mc_experiments.append(mc_sim)

mc_results = pd.concat(mc_experiments, ignore_index=True)
mc_results.head()

|   | MC_id | Year | Scenario | Strategy | Portfolio_Cost | Price | Intensity_factor | Portfolio_Cost_CV |
|---|---:|---:|---|---|---:|---:|---:|---:|
| 0 | 0 | 2025 | orderly | Equal | 53806050.0 | 52.505599 | 0.861863 | 57845120.0 |
| 1 | 0 | 2026 | orderly | Equal | 57681970.0 | 51.31434 | 0.968199 | 68632230.0 |
| 2 | 0 | 2027 | orderly | Equal | 94111550.0 | 103.030673 | 0.80559 | 61840730.0 |
| 3 | 0 | 2028 | orderly | Equal | 67361690.0 | 77.800627 | 0.78175 | 70253940.0 |
| 4 | 0 | 2029 | orderly | Equal | 72584950.0 | 95.244972 | 0.704318 | 72865710.0 |
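
The `Portfolio_Cost_CV` column subtracts a price-driven term from each simulated cost. The sketch below reproduces that adjustment on a toy single-asset example to show its effect: the sample mean is essentially unchanged, while the dispersion and the upper quantile of the adjusted sample both shrink. All numbers (base price, emissions, volatilities, seed) are illustrative assumptions.

```python
import numpy as np

rng = np.random.default_rng(7)  # illustrative seed (assumption)
n = 50_000
base_price, base_emissions = 100.0, 1_000.0  # illustrative values (assumption)
price_vol, intensity_vol = 0.2, 0.1

# Mean-one log-normal price shock and clipped intensity multiplier
price = base_price * np.exp(rng.normal(0, price_vol, n) - 0.5 * price_vol ** 2)
factor = np.clip(1 + rng.normal(0, intensity_vol, n), 0.6, 1.4)
cost = base_emissions * factor * price

# Control variate: the price, whose expected value (base_price) is known.
# beta is the sensitivity of cost to price at the base intensity factor.
beta = base_emissions
cost_cv = cost - beta * (price - base_price)

raw_mean, cv_mean = cost.mean(), cost_cv.mean()
raw_std, cv_std = cost.std(), cost_cv.std()
raw_var95, cv_var95 = np.quantile(cost, 0.95), np.quantile(cost_cv, 0.95)

print(f"mean  raw={raw_mean:,.0f}  cv={cv_mean:,.0f}")
print(f"std   raw={raw_std:,.0f}  cv={cv_std:,.0f}")
print(f"q95   raw={raw_var95:,.0f}  cv={cv_var95:,.0f}")
```

A control variate is designed to sharpen the estimate of the mean. Because the adjustment removes price-driven variation from every path, a quantile of the adjusted sample describes the adjusted variable and is not an estimate of the corresponding quantile of the raw cost.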


#### 3.3 Climate VaR at Horizon `T`

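This sub-section derives a Climate Value-at-Risk (Climate VaR) metric at a specified horizon. Climate VaR summarizes downside transition risk in a single, interpretable statistic, analogous to traditional financial VaR but grounded in climate-specific drivers. Here, VaR is the `alpha`-quantile of simulated portfolio cost at the horizon year, and CVaR is the mean of the outcomes at or above that quantile. The code uses the control-variate-adjusted cost column when it is available and reports the raw-cost mean and standard deviation alongside for comparison. Academically, Climate VaR represents a key link between climate economics and financial risk management.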

In [20]:
# Simulation horizon year for the analysis
horizon = 2040
# Confidence level for risk metrics (e.g., 95% VaR or CVaR)
alpha = 0.95

cost_column = (
    "Portfolio_Cost_CV"
    if "Portfolio_Cost_CV" in mc_results.columns
    else "Portfolio_Cost"
)

var_summary = []
for (scen_name, strategy), group in mc_results.groupby(["Scenario", "Strategy"]):
    horizon_slice = group[group["Year"] == horizon]
    horizon_costs = horizon_slice[cost_column].values
    raw_costs = horizon_slice["Portfolio_Cost"].values

    if len(horizon_costs) == 0:
        continue

    VaR = np.quantile(horizon_costs, alpha)
    CVaR = horizon_costs[horizon_costs >= VaR].mean()

    var_summary.append({
        "Scenario": scen_name,
        "Strategy": strategy,
        "Cost_metric": cost_column,
        "VaR_95": VaR,
        "CVaR_95": CVaR,
        "Mean": horizon_costs.mean(),
        "Std": horizon_costs.std(),
        "Raw_Mean": raw_costs.mean(),
        "Raw_Std": raw_costs.std(),
    })

var_df = (
    pd.DataFrame(var_summary)
    .sort_values(["Scenario", "Strategy"])
    .reset_index(drop=True)
)

format_cols = var_df.select_dtypes(include=[np.number]).columns
style = var_df.style.format({col: "{0:,.0f}" for col in format_cols})
style = style.set_caption(
    f"Portfolio risk metrics at Year {horizon} (alpha={alpha:.2f}) using {cost_column}"
)
display(style)

|   | Scenario | Strategy | Cost_metric | VaR_95 | CVaR_95 | Mean | Std | Raw_Mean | Raw_Std |
|---|---|---|---|---:|---:|---:|---:|---:|---:|
| 0 | disorderly | Carbon Tilt | Portfolio_Cost_CV | 144480247 | 150771519 | 121389443 | 12582585 | 121647283 | 32088950 |
| 1 | disorderly | Equal | Portfolio_Cost_CV | 150588578 | 157145831 | 126521541 | 13114551 | 126790282 | 33445605 |
| 2 | disorderly | Production | Portfolio_Cost_CV | 303075698 | 316272875 | 254638200 | 26394443 | 255179070 | 67312876 |
| 3 | hothouse | Carbon Tilt | Portfolio_Cost_CV | 44974686 | 46933071 | 37786841 | 3916783 | 37867103 | 9988843 |
| 4 | hothouse | Equal | Portfolio_Cost_CV | 46876124 | 48917306 | 39384392 | 4082377 | 39468047 | 10411151 |
| 5 | hothouse | Production | Portfolio_Cost_CV | 94343239 | 98451336 | 79265321 | 8216222 | 79433686 | 20953560 |
| 6 | orderly | Carbon Tilt | Portfolio_Cost_CV | 101283886 | 105694208 | 85096715 | 8820673 | 85277467 | 22495072 |
| 7 | orderly | Equal | Portfolio_Cost_CV | 105565962 | 110162743 | 88694431 | 9193594 | 88882824 | 23446118 |
| 8 | orderly | Production | Portfolio_Cost_CV | 212462844 | 221714361 | 178507075 | 18503095 | 178886237 | 47187832 |
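
To make the two tail metrics concrete, here is a small worked example that applies the same quantile-and-tail-mean logic to a toy sample of 100 equally spaced costs. The helper name `var_cvar` and the toy data are illustrative assumptions.

```python
import numpy as np


def var_cvar(costs, alpha):
    """Return (VaR, CVaR): the alpha-quantile and the mean of costs at or above it."""
    costs = np.asarray(costs, dtype=float)
    var = np.quantile(costs, alpha)      # default linear interpolation
    cvar = costs[costs >= var].mean()    # average of the tail beyond VaR
    return var, cvar


toy_costs = np.arange(1, 101)  # costs 1, 2, ..., 100
var_95, cvar_95 = var_cvar(toy_costs, 0.95)
print(f"VaR_95 = {var_95:.2f}, CVaR_95 = {cvar_95:.2f}")
```

With NumPy's default interpolation, the 95% quantile of this sample is 95.05, and the five costs at or above it (96 to 100) average 98. CVaR is therefore never below VaR, which matches the ordering seen in the table above.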


In [21]:
def convergence_profile(costs: np.ndarray, *, alpha: float, min_samples: int = 50, step: int = 50) -> pd.DataFrame:
    """
    Compute convergence profile for VaR and CVaR estimates.
    
    Parameters:
    -----------
    costs : np.ndarray
        Array of cost samples
    alpha : float
        Confidence level for VaR (e.g., 0.95 for 95% VaR)
    min_samples : int
        Minimum number of samples to start analysis
    step : int
        Step size for increasing sample size
        
    Returns:
    --------
    pd.DataFrame
        DataFrame with convergence metrics
    """
    metrics = []
    for n in range(min_samples, len(costs) + 1, step):
        sample = costs[:n]
        var = np.quantile(sample, alpha)
        cvar = sample[sample >= var].mean()
        metrics.append({"Samples": n, "VaR": var, "CVaR": cvar})
    return pd.DataFrame(metrics)


horizon_df = mc_results[mc_results["Year"] == horizon].copy()
conv_frames = []
for (scen_name, strategy), group in horizon_df.groupby(["Scenario", "Strategy"]):
    costs = group.sort_values("MC_id")[cost_column].values
    conv = convergence_profile(costs, alpha=alpha)
    conv["Scenario"] = scen_name
    conv["Strategy"] = strategy
    conv_frames.append(conv)

conv_df = pd.concat(conv_frames, ignore_index=True)
conv_long = conv_df.melt(
    id_vars=["Samples", "Scenario", "Strategy"],
    value_vars=["VaR", "CVaR"],
    var_name="Metric",
    value_name="Value",
)

fig = px.line(
    conv_long,
    x="Samples",
    y="Value",
    color="Strategy",
    facet_row="Metric",
    facet_col="Scenario",
    title=f"Convergence of VaR/CVaR estimates at horizon {horizon}",
    height=600,
)
fig.update_traces(mode="lines+markers")
fig.update_yaxes(matches=None)
fig.update_xaxes(title="")
fig.update_yaxes(title="")
fig.show()

***Convergence of Climate VaR and CVaR Estimates at the 2040 Horizon***

- This figure illustrates the convergence behavior of Climate Value-at-Risk (VaR) and Conditional Value-at-Risk (CVaR) estimates at the 2040 horizon as a function of the number of Monte Carlo simulations. Results are reported across climate scenarios (Disorderly, Hothouse, Orderly) and portfolio strategies (Carbon Tilt, Equal-weighted, Production-weighted), thereby providing a comprehensive view of both numerical stability and risk heterogeneity.

- The upper row reports VaR estimates, while the lower row presents CVaR estimates, which capture the expected loss in the tail beyond the VaR threshold. Across all scenarios and strategies, VaR and CVaR curves stabilize rapidly as the number of simulations increases, indicating satisfactory convergence of the Monte Carlo estimator. Beyond a few hundred simulations, incremental changes in both risk measures become marginal, suggesting that the simulation budget is sufficient to obtain robust tail-risk estimates at the chosen confidence level.

- Several structural patterns are consistent across scenarios. The Production-weighted strategy exhibits the highest VaR and CVaR levels, reflecting its systematic exposure to emission-intensive assets and, consequently, elevated transition cost risk. In contrast, the Carbon Tilt strategy consistently produces the lowest tail-risk estimates, confirming that carbon-aware portfolio construction reduces not only expected transition costs but also extreme downside outcomes. The Equal-weighted strategy lies between these two extremes, serving as a useful benchmark.

- Differences across scenarios are also pronounced at the 2040 horizon. According to the risk-metric table above, tail-risk levels are highest in the Disorderly scenario, intermediate in the Orderly scenario, and lowest in the Hothouse scenario; for the Carbon Tilt strategy, for example, VaR_95 is roughly 144 million, 101 million, and 45 million respectively. The ranking of portfolio strategies nevertheless remains stable: within each scenario, Production carries about twice the tail risk of Carbon Tilt. Portfolio construction therefore determines relative tail-risk exposure, while the scenario sets the absolute level.

- The close alignment between VaR and CVaR trajectories across increasing simulation counts further suggests that the loss distributions are not excessively unstable in the extreme tails, reinforcing confidence in the reliability of the estimated Climate VaR metrics. From a methodological standpoint, this convergence analysis provides an essential validation step, demonstrating that the reported climate risk measures are not artifacts of Monte Carlo noise but reflect underlying structural risk characteristics.
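
The visual judgment that the curves "stabilize" can be complemented by a simple numerical rule. The sketch below, a hypothetical helper rather than part of the notebook, returns the first sample size after which every successive relative change in an estimate stays below a tolerance. The 1% tolerance and the toy numbers are assumptions.

```python
import numpy as np


def first_stable_sample_size(sizes, estimates, rel_tol=0.01):
    """Smallest sample size after which all successive relative changes
    in the estimate are below rel_tol; None if that never happens."""
    sizes = np.asarray(sizes)
    estimates = np.asarray(estimates, dtype=float)
    # Relative change between consecutive estimates
    rel_change = np.abs(np.diff(estimates)) / np.abs(estimates[:-1])
    for i in range(len(rel_change)):
        if np.all(rel_change[i:] < rel_tol):
            return int(sizes[i])
    return None


# Toy convergence profile (illustrative numbers, not notebook output)
toy_sizes = [50, 100, 150, 200, 250]
toy_var = [100.0, 110.0, 104.0, 104.5, 104.4]
stable_from = first_stable_sample_size(toy_sizes, toy_var)
print(f"Estimate is stable from n = {stable_from}")
```

Applied to the output of `convergence_profile`, the `Samples` and `VaR` (or `CVaR`) columns for one scenario and strategy would play the role of `toy_sizes` and `toy_var`.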

#### 3.4 Portfolio Distribution Analysis

This part analyzes and visualizes the simulated portfolio outcome distributions. Beyond summary statistics, distributional analysis reveals asymmetries, tail risks, and scenario-dependent features that are central to understanding climate-related financial risk.

In [22]:
fig = px.histogram(
    mc_results[mc_results["Year"] == horizon],
    x=cost_column,
    color="Strategy",
    facet_col="Scenario",
    nbins=100,
    marginal="box",
    title=f"Distribution of portfolio cost at {horizon} under correlated MC paths",
)

fig.update_xaxes(matches=None)
fig.for_each_xaxis(lambda axis: axis.update(title=""))
fig.update_layout(
    title_x=0.5,
    margin=dict(b=120, t=100),  # Top margin leaves room for the title
    title_y=0.92,  # Vertical position of the title
    title_xanchor='center',
    title_yanchor='top',
    height=500  # Fixed height for consistency across figures
)
fig.add_annotation(
    x=0.5,
    y=-0.15,
    xref="paper",
    yref="paper",
    text="<b>Portfolio cost</b>",
    showarrow=False,
    font=dict(size=13),
    xanchor="center",
)
fig.show()

agg_stats = (
    mc_results
    .groupby(["Scenario", "Strategy", "Year"])["Portfolio_Cost"]
    .agg(["mean", "std"])
    .reset_index()
)

fig = px.line(
    agg_stats,
    x="Year",
    y="mean",
    color="Strategy",
    facet_col="Scenario",
    error_y="std",
    title="MC expected portfolio cost with ±1 std band",
)
fig.update_traces(mode="lines+markers")
fig.update_yaxes(matches=None)
fig.for_each_xaxis(lambda axis: axis.update(title=""))
fig.update_layout(
    title_x=0.5,
    margin=dict(b=80, t=100),  # Top margin leaves room for the title
    title_y=0.92,  # Vertical position of the title
    height=500  # Fixed height for consistency across figures
)
fig.add_annotation(
    x=0.5,
    y=-0.12,
    xref="paper",
    yref="paper",
    text="<b>Year</b>",
    showarrow=False,
    font=dict(size=13),
    xanchor="center",
)
fig.show()

***Distribution of Portfolio Cost at 2040 under Correlated Monte Carlo Paths***

- The upper panel presents the simulated distribution of total portfolio transition costs in 2040 under correlated Monte Carlo paths, disaggregated by climate scenario (Orderly, Disorderly, Hothouse) and portfolio strategy (Equal, Production-weighted, Carbon Tilt). The histograms and associated boxplots provide a distributional view of transition risk rather than a single-point estimate.

- Several key patterns emerge. First, across all scenarios, the Production-weighted strategy exhibits the highest expected costs and the widest dispersion. This reflects its structural exposure to high-emission and high-output assets, which amplifies sensitivity to carbon prices and decarbonization constraints. The pronounced right tails observed in the Orderly and especially Disorderly scenarios indicate substantial downside risk, with extreme outcomes driven by delayed or abrupt policy adjustments.

- Second, the Carbon Tilt strategy consistently shifts the distribution leftward relative to the Equal-weighted portfolio, indicating lower expected transition costs and reduced tail risk. This effect is particularly visible in the Orderly scenario, where early and gradual policy action allows carbon-tilted allocations to benefit from smoother adjustment dynamics. The narrowing of the distribution suggests that carbon-aware allocation not only lowers mean costs but also reduces uncertainty.

- Finally, the Hothouse scenario displays markedly lower absolute cost levels across all strategies. However, this apparent reduction should not be interpreted as lower economic risk overall; rather, it reflects the absence of strong transition policies, which postpones transition costs at the expense of higher long-term physical risk not captured in this framework. The tighter distributions in this scenario emphasize that transition risk is scenario-dependent and highly sensitive to policy assumptions.

***Monte Carlo Expected Portfolio Cost with ±1 Standard Deviation Band***

- The lower panel complements the static 2040 distributions by showing the dynamic evolution of expected portfolio costs over time, together with ±1 standard deviation bands. This time-resolved view highlights both the timing and uncertainty of transition impacts.

- In the Disorderly scenario, a sharp increase in expected costs occurs around 2030 across all strategies, corresponding to a delayed but abrupt policy tightening. The widening uncertainty bands during this period indicate heightened volatility and path dependency, as portfolios are exposed to sudden repricing and accelerated decarbonization requirements. The Carbon Tilt strategy remains consistently below the Equal and Production strategies, demonstrating greater resilience to transition shocks.

- The Orderly scenario exhibits a smoother, hump-shaped cost profile, with costs gradually increasing before stabilizing or declining toward the horizon. This reflects early policy action that allows firms and portfolios to adjust progressively. Notably, uncertainty increases during the transition phase but remains more contained than in the disorderly case, underscoring the stabilizing role of predictable climate policy.

- In contrast, the Hothouse scenario shows relatively flat expected costs over time, with persistent but moderate uncertainty bands. While transition costs remain low and stable, this trajectory implicitly assumes limited mitigation efforts, highlighting a trade-off between near-term transition risk and unmodeled long-term physical risk.
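
The asymmetry and tail features discussed above can also be quantified. The sketch below computes the mean, skewness, and the ratio of the 95th percentile to the median for each strategy on synthetic log-normal costs. The strategy names "A" and "B", the distribution parameters, and the helper `tail_stats` are illustrative assumptions, not notebook results.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(1)  # illustrative seed (assumption)
n = 5_000
toy = pd.DataFrame({
    "Strategy": np.repeat(["A", "B"], n),
    "Portfolio_Cost": np.concatenate([
        100 * np.exp(rng.normal(0, 0.2, n)),  # narrower distribution
        150 * np.exp(rng.normal(0, 0.4, n)),  # wider, heavier right tail
    ]),
})


def tail_stats(costs: pd.Series) -> pd.Series:
    """Summarize location, asymmetry, and upper-tail stretch of a cost sample."""
    q50, q95 = costs.quantile([0.5, 0.95])
    return pd.Series({
        "mean": costs.mean(),
        "skew": costs.skew(),
        "p95_over_median": q95 / q50,
    })


# One row of statistics per strategy
summary = pd.DataFrame({
    name: tail_stats(group)
    for name, group in toy.groupby("Strategy")["Portfolio_Cost"]
}).T
print(summary.round(3))
```

Positive skewness and a 95th-percentile-to-median ratio well above one both indicate a stretched right tail. The same summary could be applied to `mc_results` at the horizon year, grouped by scenario and strategy.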

In [23]:
# Save Monte Carlo results
output_path = export_dataframe(mc_results, "mc_results", index=False)
print(f"Saved Monte Carlo results to {output_path.as_posix()}")

Saved Monte Carlo results to c:/Users/kerri/OneDrive/Documents/Travail/AIDAMS/ESSEC Y3/Research & Emerging Topics/Project/Research-Emerging-Topics-Project/notebooks/../datasets/output_data/portfolio_simulation/mc_results.parquet


### 4. Clustering Analysis (UMAP + HDBSCAN)

This section applies unsupervised learning to uncover latent structure in the data. The objective is to identify groups of assets or companies with similar climate-risk profiles without imposing ex-ante classifications.

#### 4.1 Mine Clustering

This sub-section focuses on clustering at the asset or mine level, using dimensionality reduction (UMAP) combined with density-based clustering (HDBSCAN). This approach is well-suited for high-dimensional, nonlinear data and allows for the identification of outliers and heterogeneous risk clusters.

In [24]:
def prepare_mines_dataframe(low_df: pd.DataFrame, critical_df: pd.DataFrame) -> pd.DataFrame:
    """
    Combine and preprocess mine data with risk labels and derived metrics.
    
    This function merges low-risk and critical-risk mine datasets, adds risk labels,
    calculates emissions for low-risk mines, and computes capital efficiency metrics.
    
    Parameters:
    -----------
    low_df : pd.DataFrame
        DataFrame containing low-risk mine data with columns including 'Cost@$100'
    critical_df : pd.DataFrame
        DataFrame containing critical-risk mine data with similar structure to low_df
        
    Returns:
    --------
    pd.DataFrame
        Combined DataFrame with additional columns:
        - risk_label: 'low' or 'critical' indicating risk category
        - Emissions: Estimated emissions (derived from cost for low-risk mines)
        - Capex_per_tonne: Capital expenditure per tonne of capacity
        
    Notes:
    ------
    - For low-risk mines, emissions are estimated as Cost@$100 / 100
    - Capex_per_tonne is calculated as Cost@$100 / Capacity, with division by zero handled
    - Missing Capex_per_tonne values are filled with the median
    """
    # Create copies of input dataframes and add risk labels
    low_mines = low_df.copy()
    low_mines["risk_label"] = "low"

    crit_mines = critical_df.copy()
    crit_mines["risk_label"] = "critical"

    # Combine both datasets
    mines = pd.concat([low_mines, crit_mines], ignore_index=True)

    # Initialize Emissions column if it doesn't exist
    if "Emissions" not in mines.columns:
        mines["Emissions"] = np.nan

    # Back out emissions for low-risk mines from cost data
    # (assuming Cost@$100 is the carbon cost at $100 per tonne, emissions = cost / 100)
    mask_low = mines["risk_label"] == "low"
    mines.loc[mask_low, "Emissions"] = (
        mines.loc[mask_low, "Cost@$100"] / 100.0
    )

    # Calculate capital efficiency metric (Capex per tonne of capacity)
    # Handle division by zero by replacing 0 with NaN, then fill missing with median
    mines["Capex_per_tonne"] = mines["Cost@$100"] / mines["Capacity"].replace(0, np.nan)
    mines["Capex_per_tonne"] = mines["Capex_per_tonne"].fillna(mines["Capex_per_tonne"].median())

    return mines
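
The zero-capacity handling in `prepare_mines_dataframe` can be seen on a three-row toy table. The values below are illustrative assumptions.

```python
import numpy as np
import pandas as pd

toy_mines = pd.DataFrame({
    "Cost@$100": [200.0, 300.0, 750.0],
    "Capacity": [10.0, 0.0, 25.0],  # the second mine reports zero capacity
})

# Naive division yields an infinite ratio for the zero-capacity mine
naive_ratio = toy_mines["Cost@$100"] / toy_mines["Capacity"]

# Replacing 0 with NaN first turns that ratio into a missing value,
# which is then imputed with the median of the valid ratios
safe_ratio = toy_mines["Cost@$100"] / toy_mines["Capacity"].replace(0, np.nan)
filled_ratio = safe_ratio.fillna(safe_ratio.median())

print(naive_ratio.tolist())
print(filled_ratio.tolist())
```

Here the valid ratios are 20 and 30, so the zero-capacity mine receives the median value 25. Median imputation keeps the row available for clustering, at the cost of placing it at a typical rather than an observed value.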

In [25]:
def run_umap_hdbscan(
    data: pd.DataFrame,
    *,
    feature_cols: list[str],
    n_neighbors: int = 15,
    min_dist: float = 0.1,
    min_cluster_size: int = 10,
    random_state: int = DEFAULT_SEED,
) -> tuple[pd.DataFrame, dict[str, float], hdbscan.HDBSCAN]:
    """
    Perform UMAP dimensionality reduction followed by HDBSCAN clustering.
    
    Reduces high-dimensional feature space to 2D for visualization while
    preserving local structure, then applies density-based clustering.
    
    Parameters:
    -----------
    data : pd.DataFrame
        Input dataset containing features to cluster
    feature_cols : list[str]
        Column names to use as clustering features
    n_neighbors : int, default=15
        UMAP parameter controlling local neighborhood size
    min_dist : float, default=0.1
        UMAP parameter controlling minimum distance between points in embedding
    min_cluster_size : int, default=10
        HDBSCAN parameter for minimum cluster membership
    random_state : int
        Random seed for reproducibility
        
    Returns:
    --------
    tuple containing:
        - enriched: Original data with added UMAP coordinates (umap1, umap2) and cluster labels
        - diagnostics: Dictionary with silhouette score, cluster count, and noise fraction
        - clusterer: The fitted HDBSCAN clusterer object
    """
    # Extract and prepare features for dimensionality reduction
    features = data[feature_cols].copy()
    # Standardize features to zero mean and unit variance
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(features)
    # Perform UMAP dimensionality reduction to 2D space
    reducer = umap.UMAP(
        n_neighbors=n_neighbors,
        min_dist=min_dist,
        n_components=2,
        random_state=random_state,
    )
    umap_embedding = reducer.fit_transform(X_scaled)
    # Apply density-based clustering on the UMAP embedding
    clusterer = hdbscan.HDBSCAN(
        min_cluster_size=min_cluster_size,
        metric="euclidean",
        cluster_selection_method="eom",
        prediction_data=True  # Enable prediction for new data points
    )
    labels = clusterer.fit_predict(umap_embedding)
    # Attach embedding coordinates and cluster assignments to original data
    enriched = data.copy()
    enriched["umap1"] = umap_embedding[:, 0]
    enriched["umap2"] = umap_embedding[:, 1]
    enriched["cluster"] = labels
    # Calculate clustering quality metrics
    mask_core = labels >= 0  # Mask for points in clusters (exclude noise points labeled -1)
    diagnostics = {}
    
    # Silhouette score measures how well-separated clusters are
    if mask_core.sum() > 1 and np.unique(labels[mask_core]).size > 1:
        diagnostics["silhouette"] = float(silhouette_score(
            umap_embedding[mask_core], labels[mask_core]
        ))
    else:
        diagnostics["silhouette"] = np.nan
    # Count of distinct clusters (excluding noise points)
    diagnostics["n_clusters"] = int(np.unique(labels[labels >= 0]).size)
    
    # Fraction of points classified as noise (-1 label)
    diagnostics["noise_fraction"] = float((labels == -1).mean())
    return enriched, diagnostics, clusterer

In [26]:
def add_cluster_labels(df: pd.DataFrame) -> pd.DataFrame:
    """
    Convert numeric cluster assignments to categorical labels with "Noise" designation.
    
    This function transforms cluster IDs into string labels for visualization and analysis,
    treating points with label -1 (noise points from HDBSCAN) as a separate "Noise" category.
    Cluster labels are ordered with numbered clusters first (in sorted order), followed by "Noise".
    
    Parameters:
    -----------
    df : pd.DataFrame
        Input DataFrame with a "cluster" column containing integer cluster IDs
        (where -1 represents noise points)
    
    Returns:
    --------
    pd.DataFrame
        DataFrame with added "cluster_label" column as a categorical (not flagged as ordered) with string values
    """
    labeled = df.copy()
    labeled["cluster_label"] = np.where(
        labeled["cluster"] >= 0,
        labeled["cluster"].astype(int).astype(str),
        "Noise",
    )
    labeled["cluster_label"] = pd.Categorical(
        labeled["cluster_label"],
        # Sort numbered clusters numerically so that "10" follows "9", then append "Noise"
        categories=sorted(
            {c for c in labeled["cluster_label"] if c != "Noise"}, key=int
        ) + ["Noise"],
        ordered=False,
    )
    return labeled
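
One detail worth keeping in mind when cluster IDs are converted to strings: plain `sorted` compares strings character by character, so with ten or more clusters the label "10" lands before "2". The toy labels below are an assumption used to show the difference a numeric sort key makes.

```python
toy_labels = ["0", "10", "2", "1", "Noise"]
numbered = {c for c in toy_labels if c != "Noise"}

# String sort: character-by-character comparison
lexicographic = sorted(numbered) + ["Noise"]

# Numeric sort: compare the integer values of the labels
numeric = sorted(numbered, key=int) + ["Noise"]

print(lexicographic)  # ['0', '1', '10', '2', 'Noise']
print(numeric)        # ['0', '1', '2', '10', 'Noise']
```

The category order determines the legend order in Plotly figures, so the numeric key keeps legends in natural cluster order.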

In [27]:
mines = prepare_mines_dataframe(low, crit)

mine_feature_cols = [
    "Emissions",
    "Intensity",
    "Capacity",
    "Cost@$100",
    "Capex_per_tonne",
]

mines_embedded, mines_diag, k = run_umap_hdbscan(
    mines,
    feature_cols=mine_feature_cols,
    n_neighbors=18,
    min_dist=0.05,
    min_cluster_size=12,
    random_state=DEFAULT_SEED,
)
mines_embedded = add_cluster_labels(mines_embedded)

print("Mine clustering diagnostics:")
for key, value in mines_diag.items():
    if isinstance(value, float):
        print(f"  {key}: {value:.3f}")
    else:
        print(f"  {key}: {value}")

sil_m = mines_diag["silhouette"]

Mine clustering diagnostics:
  silhouette: 0.975
  n_clusters: 2
  noise_fraction: 0.000


In [28]:
fig = px.scatter(
    mines_embedded,
    x="umap1",
    y="umap2",
    color="cluster_label",
    symbol="risk_label",  # low vs critical
    hover_data={
        "Mine": True,          # existing name column
        "Country": True,
        "Parent": True,
        "risk_label": True,
        "cluster_label": True,
        "Emissions": ":.2f",
        "Intensity": ":.2f",
        "Capacity": ":.2f",
        "Cost@$100": ":.0f",
        "Capex_per_tonne": ":.0f",
    },
    color_discrete_sequence=px.colors.qualitative.Set2,
)

fig.update_layout(
    title=f"Mine clusters in UMAP space (silhouette={sil_m:.3f})",
    legend_title="Cluster / Risk",
    template="plotly_white",
)

fig.show()

#### 4.2 Company Clustering & Bootstrap Stability

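Here, clustering is extended to the company level, and bootstrap resampling is used to assess how stable the identified clusters are. Each bootstrap run resamples companies with replacement, re-clusters the sample, maps every bootstrap cluster to the base cluster most of its members came from, and records which companies end up in their original cluster. Stability analysis of this kind provides evidence on whether the clusters are artifacts of sampling noise or of specific parameter choices.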
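
The bootstrap logic can be sketched on toy data as follows. This is an illustration under stated assumptions, not the notebook's implementation: `toy_cluster` is a hypothetical stand-in for HDBSCAN (it splits on the midpoint of the first coordinate), the two-blob data are synthetic, and the score is normalized by the number of runs in which a point was actually drawn rather than by the total number of runs.

```python
import numpy as np

def toy_cluster(X, rng):
    """Hypothetical stand-in clusterer: split on the midpoint of the first coordinate.
    Label IDs are randomly swapped to mimic arbitrary cluster numbering."""
    mid = (X[:, 0].min() + X[:, 0].max()) / 2
    labels = (X[:, 0] > mid).astype(int)
    return 1 - labels if rng.random() < 0.5 else labels

def bootstrap_stability(X, base_labels, n_boot=50, seed=0):
    rng = np.random.default_rng(seed)
    n = len(X)
    hits = np.zeros(n)   # runs in which a point fell back into its base cluster
    draws = np.zeros(n)  # runs in which a point was drawn at least once
    for _ in range(n_boot):
        idx = rng.choice(n, size=n, replace=True)
        boot_labels = toy_cluster(X[idx], rng)
        draws[np.unique(idx)] += 1
        for c in np.unique(boot_labels):
            members = idx[boot_labels == c]
            # Map the bootstrap cluster to the base cluster by majority vote
            vals, counts = np.unique(base_labels[members], return_counts=True)
            majority = vals[np.argmax(counts)]
            hits[np.unique(members[base_labels[members] == majority])] += 1
    # Normalize by appearances; points never drawn stay NaN
    stability = np.divide(hits, draws, out=np.full(n, np.nan), where=draws > 0)
    return stability, draws / n_boot

# Synthetic data: two well-separated blobs of 20 points each
data_rng = np.random.default_rng(42)
X_toy = np.vstack([
    data_rng.normal(loc=(-5.0, 0.0), scale=0.5, size=(20, 2)),
    data_rng.normal(loc=(5.0, 0.0), scale=0.5, size=(20, 2)),
])
base = np.repeat([0, 1], 20)

stability, inclusion = bootstrap_stability(X_toy, base)
print(np.nanmin(stability), inclusion.mean())
```

On blobs this cleanly separated every drawn point should be fully stable. The inclusion rate should sit near 1 - 1/e ≈ 0.63, which is the share of bootstrap samples a given point is expected to appear in.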

In [29]:
def prepare_company_dataset(companies_df: pd.DataFrame, div_df: pd.DataFrame) -> tuple[pd.DataFrame, list[str]]:
    """
    Create company-level feature matrix augmented with divestment metrics.
    
    This function merges company data with aggregated divestment candidate metrics,
    derives key financial and emissions ratios, handles missing values, and identifies
    features for downstream clustering and classification tasks.
    
    Parameters:
    -----------
    companies_df : pd.DataFrame
        Company-level data with columns including:
        - Total Emissions (tCO₂), Production (t), Portfolio Intensity
        - Price exposure columns ($50/t, $100/t, $150/t, $200/t)
    div_df : pd.DataFrame
        Divestment candidates DataFrame with columns:
        - Company, Assets at Risk, Exposure@$100/t, Emissions
    
    Returns:
    --------
    tuple containing:
        - dataset: Enhanced DataFrame with original features, divestment metrics, and derived ratios
        - feature_cols: List of feature column names for clustering/ML pipelines
    
    Notes:
    ------
    - Missing divestment metrics are filled with zeros (assuming no divestment exposure)
    - Derived ratios handle division-by-zero by replacing zeros with NaN before computing
    - Infinite values are replaced with NaN
    - All remaining missing values are imputed using the median of each column
    """
    div_agg = div_df.groupby("Company", as_index=False).agg({
        "Assets at Risk": "sum",
        "Exposure@$100/t": "sum",
        "Emissions": "sum",
    })

    dataset = companies_df.merge(div_agg, on="Company", how="left", suffixes=("", "_div"))
    for col in ["Assets at Risk", "Exposure@$100/t", "Emissions"]:
        dataset[col] = dataset[col].fillna(0)

    dataset["emissions_per_production"] = (
        dataset["Total Emissions (tCO₂)"] / dataset["Production (t)"].replace(0, np.nan)
    )
    dataset["capex_density"] = (
        dataset["Assets at Risk"] / dataset["Production (t)"].replace(0, np.nan)
    )
    dataset["exposure_ratio"] = (
        dataset["Exposure@$100/t"] / dataset["Assets at Risk"].replace(0, np.nan)
    )

    dataset = dataset.replace([np.inf, -np.inf], np.nan)

    fill_values = dataset.median(numeric_only=True)
    dataset = dataset.fillna(fill_values)

    feature_cols = [
        "Total Emissions (tCO₂)",
        "Production (t)",
        "$50/t", "$100/t", "$150/t", "$200/t",
        "Portfolio Intensity",
        "Assets at Risk",
        "Exposure@$100/t",
        "Emissions",
        "emissions_per_production",
        "capex_density",
        "exposure_ratio",
    ]
    return dataset, feature_cols
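
One consequence of the ratio handling is worth seeing on a small example. The sketch below uses made-up numbers (only the column names are borrowed from the function above) to show that a zero denominator becomes NaN and is then replaced by the column median, so a company with no assets at risk receives the sample-median `exposure_ratio` rather than zero:

```python
import numpy as np
import pandas as pd

# Hypothetical values; the second company has no assets at risk
toy = pd.DataFrame({
    "Exposure@$100/t": [100.0, 0.0, 60.0],
    "Assets at Risk": [4.0, 0.0, 2.0],
})

# Replace zeros in the denominator with NaN to avoid division by zero
ratio = toy["Exposure@$100/t"] / toy["Assets at Risk"].replace(0, np.nan)

# Median imputation, as in the final step of the function
filled = ratio.fillna(ratio.median())
print(filled.tolist())
```

Whether the median is the right fill for companies without divestment candidates is a modeling choice. Filling with zero or adding a missingness indicator are possible alternatives.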

In [30]:
# Prepare company data and features
company_data, company_feature_cols = prepare_company_dataset(comp, div)

# Define clustering parameters
company_cluster_params = dict(
    n_neighbors=14,
    min_dist=0.08,
    min_cluster_size=6,
    random_state=DEFAULT_SEED,
)

# Initial clustering attempt
companies_embedded, company_diag, company_clusterer = run_umap_hdbscan(
    company_data,
    feature_cols=company_feature_cols,
    **company_cluster_params,
)

# Relax parameters if no clusters found
if company_diag["n_clusters"] == 0:
    company_cluster_params.update({
        "n_neighbors": max(6, company_cluster_params["n_neighbors"] - 6),
        "min_dist": min(0.2, company_cluster_params["min_dist"] * 1.5),
        "min_cluster_size": max(3, company_cluster_params["min_cluster_size"] // 2),
        "random_state": company_cluster_params["random_state"] + 97,
    })
    companies_embedded, company_diag, company_clusterer = run_umap_hdbscan(
        company_data,
        feature_cols=company_feature_cols,
        **company_cluster_params,
    )

company_cluster_params_final = dict(company_cluster_params)

# Display diagnostics
print("Company clustering diagnostics:")
for key, value in company_diag.items():
    if isinstance(value, float):
        print(f"  {key}: {value:.3f}")
    else:
        print(f"  {key}: {value}")

# Extract clustering results
companies_embedded = add_cluster_labels(companies_embedded)
company_embedding = companies_embedded[["umap1", "umap2"]].values
company_labels = companies_embedded["cluster"].values

if hasattr(company_clusterer, 'probabilities_'):
    companies_embedded["cluster_confidence"] = company_clusterer.probabilities_


sil_c = company_diag["silhouette"]
labels_base = company_labels.copy()
X_comp_umap = company_embedding
X_comp_scaled = StandardScaler().fit_transform(company_data[company_feature_cols])

Company clustering diagnostics:
  silhouette: 0.639
  n_clusters: 2
  noise_fraction: 0.120


In [31]:
def bootstrap_cluster_stability(
    X_scaled: np.ndarray,
    base_labels: np.ndarray,
    cluster_fn,
    *,
    n_boot: int,
    random_state: int,
    min_cluster_size: int,
) -> tuple[np.ndarray, pd.DataFrame]:
    """
    Estimate point-wise cluster stability via bootstrapping.

    Repeatedly resamples the data with replacement, reclusters each bootstrap
    sample, and tracks how often each point is assigned to the same cluster
    as in the base clustering. Also returns a dataframe describing how
    bootstrap clusters map back to base clusters, along with basic diagnostics
    such as the silhouette score.

    Parameters
    ----------
    X_scaled : np.ndarray
        Preprocessed data of shape (n_samples, n_features).
    base_labels : np.ndarray
        Cluster labels from the base clustering; noise or unassigned points
        should have negative labels (e.g. -1).
    cluster_fn : callable
        Function taking (X_boot, random_state=...) and returning a dict with:
        - "labels": np.ndarray of cluster labels
        - "embedding": np.ndarray low-dimensional embedding for diagnostics
    n_boot : int
        Number of bootstrap iterations.
    random_state : int
        Seed for the bootstrap sampler and clustering.
    min_cluster_size : int
        Currently unused placeholder for downstream filtering logic.

    Returns
    -------
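    stability_scores : np.ndarray
        Array of shape (n_samples,) with, for each point, the fraction of
        bootstrap runs in which it was reassigned to its original cluster.
        The denominator is n_boot, so runs in which a point was not drawn
        count as misses. A point appears in roughly 63% of bootstrap
        samples, which biases the scores downward.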
    transitions_df : pd.DataFrame
        Per-bootstrap cluster mapping with columns:
        ['bootstrap_id', 'boot_cluster', 'base_cluster',
         'members', 'matching_members', 'silhouette'].
    """

    def compute_cluster_diagnostics(embedding, labels):
        """Compute clustering diagnostics including silhouette score.
        
        Parameters:
        -----------
        embedding : np.ndarray
            The low-dimensional embedding of the data
        labels : np.ndarray
            Cluster labels for each point in the embedding
            
        Returns:
        --------
        dict
            Dictionary containing clustering metrics
        """
        if len(np.unique(labels[labels >= 0])) > 1:  # Need at least 2 clusters
            return {
                "silhouette": float(silhouette_score(embedding, labels))
            }
        return {"silhouette": np.nan}
        
    rng = np.random.default_rng(random_state)
    n = len(base_labels)
    stability_counts = np.zeros(n, dtype=int)
    transition_records: list[dict[str, object]] = []
    for b in range(n_boot):
        idx_boot = rng.choice(np.arange(n), size=n, replace=True)
        X_boot = X_scaled[idx_boot]
        result = cluster_fn(X_boot, random_state=random_state + b + 1)
        labels_boot = result["labels"]
        embedding_boot = result["embedding"]
        mask_core_boot = labels_boot >= 0
        if mask_core_boot.sum() > 1:
            boot_diag = compute_cluster_diagnostics(
                embedding_boot[mask_core_boot],
                labels_boot[mask_core_boot],
            )
        else:
            boot_diag = {"silhouette": np.nan}
        unique_boot_clusters = np.unique(labels_boot[labels_boot >= 0])
        bootstrap_cluster_map: dict[int, tuple[int, int]] = {}
        for c_boot in unique_boot_clusters:
            mask_cluster_boot = labels_boot == c_boot
            original_indices = idx_boot[mask_cluster_boot]
            base_cluster_labels = base_labels[original_indices]
            base_valid = base_cluster_labels[base_cluster_labels >= 0]
            if base_valid.size == 0:
                continue
            base_vals, counts = np.unique(base_valid, return_counts=True)
            best_base_cluster = base_vals[np.argmax(counts)]
            transition_records.append({
                "bootstrap_id": b,
                "boot_cluster": int(c_boot),
                "base_cluster": int(best_base_cluster),
                "members": int(mask_cluster_boot.sum()),
                "matching_members": int(counts.max()),
                "silhouette": boot_diag.get("silhouette", np.nan),
            })
            bootstrap_cluster_map[c_boot] = (best_base_cluster, counts.max())
            mask_matching = base_labels[original_indices] == best_base_cluster
            stability_counts[original_indices[mask_matching]] += 1
    stability_scores = stability_counts / n_boot
    transitions_df = pd.DataFrame(transition_records)
    return stability_scores, transitions_df

In [32]:
def company_cluster_fn(X, random_state):
    """
    Cluster points with HDBSCAN in the space they are supplied in.

    Parameters
    ----------
    X : np.ndarray
        Array of shape (n_samples, n_features). No UMAP step is applied
        here, so clustering happens in whatever space X lives in: pass a
        2D UMAP embedding to cluster in embedding space, or a scaled
        feature matrix to cluster in feature space.
    random_state : int
        Unused placeholder for API compatibility.

    Returns
    -------
    dict
        Dictionary with:
        - "labels": cluster labels from HDBSCAN
        - "embedding": the input array X, returned unchanged
    """
    # No re-embedding is performed: X is used directly as the clustering space
    embedding = X

    # Apply HDBSCAN directly on X
    clusterer = hdbscan.HDBSCAN(
        min_cluster_size=company_cluster_params_final["min_cluster_size"],
        metric="euclidean",
        cluster_selection_method="eom",
    )
    labels = clusterer.fit_predict(embedding)

    # Return the format expected by bootstrap_cluster_stability
    return {
        "labels": labels,
        "embedding": embedding,  # Return the embedding used for clustering
    }

In [33]:
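# Run stability analysis.
# Note: X_comp_scaled is the standardized feature matrix, so company_cluster_fn
# clusters in feature space here; pass X_comp_umap to bootstrap in UMAP space instead.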
n_boot = 60
stability_scores, transitions_df = bootstrap_cluster_stability(
    X_comp_scaled,
    labels_base,
    company_cluster_fn,
    n_boot=n_boot,
    random_state=DEFAULT_SEED,
    min_cluster_size=company_cluster_params_final["min_cluster_size"],
)

# Update stability scores in the DataFrame
companies_embedded["cluster_stability"] = stability_scores

# Display a sample of the bootstrap transitions
print("Bootstrap transitions sample:")
display(transitions_df.head())

# Summarize stability metrics per base cluster
stability_summary = transitions_df.groupby("base_cluster", as_index=False).agg(
    avg_matching=("matching_members", "mean"),
    max_matching=("matching_members", "max"),
    boot_occurrences=("matching_members", "count"),
)
display(stability_summary)

# Show the first rows of the data with stability scores
companies_embedded.head()

Bootstrap transitions sample:


Unnamed: 0,bootstrap_id,boot_cluster,base_cluster,members,matching_members,silhouette
0,0,0,0,4,2,0.636523
1,0,1,1,7,7,0.636523
2,0,2,1,4,2,0.636523
3,1,0,1,4,3,0.595043
4,1,1,1,17,12,0.595043


Unnamed: 0,base_cluster,avg_matching,max_matching,boot_occurrences
0,0,4.153846,12,65
1,1,5.706522,12,92


Unnamed: 0,Company,HQ Country,Mines,Total Emissions (tCO₂),Production (t),$50/t,$100/t,$150/t,$200/t,Portfolio Intensity,emissions_intensity,target_decarb_rate,decarb_rate_ml,w_equal,w_prod,w_carbon_tilt,Assets at Risk,Exposure@$100/t,Emissions,emissions_per_production,capex_density,exposure_ratio,umap1,umap2,cluster,cluster_label,cluster_confidence,cluster_stability
0,FreePort-McMoran Inc,USA,11,4858750.0,468335500.0,242937500.0,485875000.0,728812500.0,971750000.0,0.010375,0.010375,0.016875,0.016875,0.04,0.162674,0.032709,6.0,438491500.0,4384915.0,0.010375,1.281133e-08,73081920.0,-5.717703,-6.973085,0,0,1.0,0.15
1,Government of Iran,IRN,3,4844845.0,201868600.0,242242200.0,484484500.0,726726700.0,968969000.0,0.024,0.024,0.028885,0.028885,0.04,0.070118,0.014139,3.0,484484500.0,4844845.0,0.024,1.486116e-08,161494800.0,-5.289029,-6.271599,0,0,0.903357,0.15
2,Qatar Investment Authority,QAT,11,2318213.0,52153140.0,115910700.0,231821300.0,347732000.0,463642600.0,0.04445,0.04445,0.046911,0.046911,0.04,0.018115,0.007634,5.0,211210300.0,2112103.0,0.04445,9.587151e-08,42242060.0,-5.133463,-6.846146,0,0,1.0,0.183333
3,Kazakhmys Holding LLP,KAZ,4,2262216.0,403967100.0,113110800.0,226221600.0,339332400.0,452443200.0,0.0056,0.0056,0.012666,0.012666,0.04,0.140316,0.060597,0.0,0.0,0.0,0.0056,0.0,42242060.0,-5.880744,-7.594727,0,0,0.802535,0.233333
4,The Vanguard Group Inc,USA,14,2091091.0,461844800.0,104554600.0,209109100.0,313663700.0,418218200.0,0.004528,0.004528,0.011721,0.011721,0.04,0.160419,0.074948,3.0,113411100.0,1134111.0,0.004528,6.495689e-09,37803700.0,-6.259082,-6.072741,0,0,0.92534,0.216667


In [34]:
mask_clusters = companies_embedded["cluster"] >= 0
cluster_summary_source = companies_embedded.loc[mask_clusters, [
    "cluster_label",
    "Company",
    "cluster_confidence",
    "cluster_stability",
    "Portfolio Intensity",
    "Total Emissions (tCO₂)",
]].copy()
cluster_summary_source["cluster_label"] = cluster_summary_source["cluster_label"].astype(str)

cluster_stats = (
    cluster_summary_source
    .groupby(["cluster_label"], observed=False)
    .agg(
        n_companies=("Company", "size"),
        avg_confidence=("cluster_confidence", "mean"),
        avg_stability=("cluster_stability", "mean"),
        intensity_mean=("Portfolio Intensity", "mean"),
        emissions_mean=("Total Emissions (tCO₂)", "mean"),
    )
    .reset_index()
)
cluster_stats = cluster_stats[cluster_stats["n_companies"] > 0]
cluster_stats["avg_confidence"] = cluster_stats["avg_confidence"].round(2)
cluster_stats["avg_stability"] = cluster_stats["avg_stability"].round(2)
cluster_stats["intensity_mean"] = cluster_stats["intensity_mean"].round(3)
cluster_stats["emissions_mean"] = cluster_stats["emissions_mean"].round(0)
display(
    cluster_stats.style.set_caption("Company cluster summary")
)

noise_mask = companies_embedded["cluster"] < 0
noise_stats = companies_embedded.loc[noise_mask, [
    "cluster_confidence",
    "cluster_stability",
    "Portfolio Intensity",
    "Total Emissions (tCO₂)",
]]
if not noise_stats.empty:
    noise_summary = pd.DataFrame({
        "category": ["Noise"],
        "n_companies": [len(noise_stats)],
        "avg_confidence": [noise_stats["cluster_confidence"].mean()],
        "avg_stability": [noise_stats["cluster_stability"].mean()],
        "intensity_mean": [noise_stats["Portfolio Intensity"].mean()],
        "emissions_mean": [noise_stats["Total Emissions (tCO₂)"].mean()],
    })
    noise_summary[["avg_confidence", "avg_stability"]] = noise_summary[["avg_confidence", "avg_stability"]].round(2)
    noise_summary[["intensity_mean"]] = noise_summary[["intensity_mean"]].round(3)
    noise_summary[["emissions_mean"]] = noise_summary[["emissions_mean"]].round(0)
    display(noise_summary.style.set_caption("Noise summary"))

companies_embedded["cluster_type"] = np.where(companies_embedded["cluster"] >= 0, "Cluster", "Noise")
companies_embedded["marker_size"] = np.where(
    companies_embedded["cluster"] >= 0,
    np.clip(companies_embedded["cluster_stability"], 0.2, 1.0),
    0.5,
)

Unnamed: 0,cluster_label,n_companies,avg_confidence,avg_stability,intensity_mean,emissions_mean
0,0,11,0.87,0.2,0.013,2064120.0
1,1,11,0.84,0.48,0.023,325999.0


Unnamed: 0,category,n_companies,avg_confidence,avg_stability,intensity_mean,emissions_mean
0,Noise,3,0.0,0.0,0.022,911278.0


In [None]:
cluster_categories = list(companies_embedded["cluster_label"].cat.categories)
color_map = {}
palette = px.colors.qualitative.Set2
idx_palette = 0
for category in cluster_categories:
    if category == "Noise":
        color_map[category] = "#545454"
    else:
        color_map[category] = palette[idx_palette % len(palette)]
        idx_palette += 1

fig = px.scatter(
    companies_embedded,
    x="umap1",
    y="umap2",
    color="cluster_label",
    color_discrete_map=color_map,
    size="marker_size",
    size_max=16,
    symbol="cluster_type",
    category_orders={
        "cluster_label": cluster_categories,
        "cluster_type": ["Cluster", "Noise"],
    },
    hover_name="Company",
    hover_data={
        "cluster_label": True,
        "cluster_type": True,
        "cluster_confidence": ":.2f",
        "cluster_stability": ":.2f",
        "Portfolio Intensity": ":.2f",
        "Total Emissions (tCO₂)": ":,.0f",
        "Assets at Risk": ":,.0f",
    },
    title=(
        "Companies – UMAP + HDBSCAN "
        f"(silhouette={sil_c:.2f}, clusters={company_diag['n_clusters']}, "
        f"noise={company_diag['noise_fraction']:.0%}, bootstrap n={n_boot})"
    ),
)
fig.update_layout(
    height=600,
    margin=dict(l=40, r=40, t=80, b=120),
    legend=dict(orientation="h", yanchor="bottom", y=-0.25, xanchor="center", x=0.5),
    legend_title_text="",
)
fig.update_traces(marker=dict(line=dict(color="black", width=1)))
fig.show()

### 5. Risk Label & Supervised Learning

This final section transitions from exploratory analysis to predictive modeling, enabling classification and potential decision support.

#### 5.1 Risk Label from Cost & Intensity

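This sub-section defines a binary climate risk label from cumulative transition costs and emissions intensity. Cumulative costs at the 2030 and 2040 horizons are computed per scenario, each horizon-scenario column is min-max scaled, and the scaled columns are averaged and multiplied by portfolio intensity to give a composite score. Companies at or above the 80th percentile of that score are labeled high risk. The cutoff and the multiplicative form are normative choices about what constitutes “high” or “low” climate risk, and the label provides a bridge between continuous risk measures and categorical decision frameworks.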
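
The label construction can be sketched on a toy table. The five companies, their values, and the scenario name `S1` are hypothetical. The scaling is written out with pandas and is meant to mirror the default behavior of `MinMaxScaler`.

```python
import pandas as pd

# Hypothetical companies with two cumulative-cost columns and an intensity
toy = pd.DataFrame({
    "Company": list("ABCDE"),
    "Portfolio Intensity": [0.05, 0.01, 0.04, 0.02, 0.03],
    "Cost_cum_2030_S1": [0.0, 10.0, 20.0, 30.0, 40.0],
    "Cost_cum_2040_S1": [0.0, 5.0, 10.0, 15.0, 20.0],
})

cost_cols = [c for c in toy.columns if c.startswith("Cost_cum_")]

# Min-max scale each cost column to [0, 1]
col_min = toy[cost_cols].min()
col_max = toy[cost_cols].max()
scaled = (toy[cost_cols] - col_min) / (col_max - col_min)

# Composite score: average scaled cost times intensity
toy["score_composite"] = scaled.mean(axis=1) * toy["Portfolio Intensity"]

# Flag companies at or above the 80th percentile of the score
thr = toy["score_composite"].quantile(0.8)
toy["high_risk"] = (toy["score_composite"] >= thr).astype(int)
print(toy[["Company", "score_composite", "high_risk"]])
```

Company A has the highest intensity but the lowest cost, so its score is zero. Because the score is a product, a company is only flagged when cost and intensity are both material.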

In [36]:
# Build cost-by-horizon features
horizons = [2030, 2040]
max_horizon = max(horizons)

horizon_labels = {year: f"Cost_cum_{year}" for year in horizons}

cost_cum = (
    paths_df.loc[paths_df["Year"] <= max_horizon, ["Company", "Scenario", "Year", "Cost_t"]]
    .sort_values(["Company", "Scenario", "Year"])
    .assign(Cost_cum=lambda df: df.groupby(["Company", "Scenario"])["Cost_t"].cumsum())
)

cost_pivot = (
    cost_cum[cost_cum["Year"].isin(horizons)]
    .assign(horizon=lambda df: df["Year"].map(horizon_labels))
    .pivot_table(
        index="Company",
        columns=["horizon", "Scenario"],
        values="Cost_cum",
        aggfunc="last",
        fill_value=0,
    )
    .reset_index()
)

In [37]:
def flatten_column(col):
    if isinstance(col, str):
        return col
    pieces = [str(part) for part in col if part not in (None, "")]
    if len(pieces) == 1 and pieces[0] == "Company":
        return "Company"
    return "_".join(pieces)
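
As a quick illustration of what this helper does to the pivoted columns, the sketch below applies it to a few tuples of the kind a two-level column index produces after `reset_index()`. The scenario names `Orderly` and `Disorderly` are placeholders rather than values from the dataset, and the function is repeated so the snippet runs on its own.

```python
def flatten_column(col):
    # Same logic as the helper above, repeated for a self-contained example
    if isinstance(col, str):
        return col
    pieces = [str(part) for part in col if part not in (None, "")]
    if len(pieces) == 1 and pieces[0] == "Company":
        return "Company"
    return "_".join(pieces)

# Hypothetical MultiIndex column tuples: (horizon, Scenario)
cols = [
    ("Company", ""),
    ("Cost_cum_2030", "Orderly"),
    ("Cost_cum_2040", "Disorderly"),
]
flat = [flatten_column(c) for c in cols]
print(flat)
```

Every flattened cost column keeps the `Cost_cum_` prefix, which is what the later `startswith("Cost_cum_")` filter relies on.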

In [38]:
cost_pivot.columns = [flatten_column(col) for col in cost_pivot.columns]
ordered_cols = ["Company"] + sorted(c for c in cost_pivot.columns if c != "Company")
cost_pivot = cost_pivot[ordered_cols]

label_df = comp[["Company", "Portfolio Intensity"]].merge(cost_pivot, on="Company", how="left")

cost_cols = [c for c in label_df.columns if c.startswith("Cost_cum_")]
label_df[cost_cols] = label_df[cost_cols].fillna(0.0)

scaler_cost = MinMaxScaler()
label_df[cost_cols] = scaler_cost.fit_transform(label_df[cost_cols])

label_df["score_composite"] = label_df[cost_cols].mean(axis=1) * label_df["Portfolio Intensity"]
thr = label_df["score_composite"].quantile(0.8)
label_df["high_risk_refined"] = (label_df["score_composite"] >= thr).astype(int)

label_df[["Company", "score_composite", "high_risk_refined"]].head()

Unnamed: 0,Company,score_composite,high_risk_refined
0,FreePort-McMoran Inc,0.010375,1
1,Government of Iran,0.022378,1
2,Qatar Investment Authority,0.017336,1
3,Kazakhmys Holding LLP,0.002585,0
4,The Vanguard Group Inc,0.001933,0


#### 5.2 RandomForest & UMAP

Here, a Random Forest classifier is trained to predict the climate risk label from company-level exposure features, while the UMAP embedding from Section 4.2 is reused to visualize the predictions. The data are split 70/30 with stratification, and class imbalance is handled through balanced class weights. Because the test split contains only nine companies, the reported metrics should be read as indicative rather than precise.
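
Since the label flags roughly the top 20% of companies, the classes are imbalanced. The sketch below reproduces the weighting heuristic that scikit-learn documents for `class_weight="balanced"`, namely `n_samples / (n_classes * np.bincount(y))`. The label vector is illustrative and is not the notebook's training split.

```python
import numpy as np

# Illustrative labels: 20 low-risk and 5 high-risk companies
y = np.array([0] * 20 + [1] * 5)

counts = np.bincount(y)                     # samples per class
weights = len(y) / (len(counts) * counts)   # balanced class weights

for cls, w in enumerate(weights):
    print(f"class {cls}: weight {w:.3f}")
```

Under this heuristic each class contributes the same total weight, so an error on the minority class costs proportionally more during tree construction.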

In [47]:
# Company-level classification of refined climate risk labels
cost_total = (
    paths_df.groupby("Company", as_index=False)["Cost_t"]
    .sum()
    .rename(columns={"Cost_t": "Cost_total_all_scen"})
)

comp_ml = (
    companies_embedded
    .merge(cost_total, on="Company", how="left")
    .merge(
        label_df[["Company", "score_composite", "high_risk_refined"]],
        on="Company",
        how="left",
    )
)

feature_cols = [
    "Total Emissions (tCO₂)",
    "Production (t)",
    "$50/t", "$100/t", "$150/t", "$200/t",
    #"Portfolio Intensity",
    #"Assets at Risk",
    "Exposure@$100/t",
    #"Emissions",
    "cluster_stability",
    #"score_composite",
]

missing_features = [col for col in feature_cols if col not in comp_ml.columns]
if missing_features:
    raise KeyError(f"Missing required feature columns: {missing_features}")

X = comp_ml[feature_cols]
y = comp_ml["high_risk_refined"].fillna(0).astype(int)

if y.nunique() < 2:
    raise ValueError("Target 'high_risk_refined' must contain at least two classes.")

X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.3,
    random_state=DEFAULT_SEED,
    stratify=y,
)

rf_pipeline = Pipeline(
    steps=[
        ("imputer", SimpleImputer(strategy="median")),
        (
            "model",
            RandomForestClassifier(
                n_estimators=400,
                max_depth=None,
                random_state=DEFAULT_SEED,
                class_weight="balanced",
                n_jobs=-1,
            ),
        ),
    ]
)

rf_pipeline.fit(X_train, y_train)

y_pred = rf_pipeline.predict(X_test)
prob_model = rf_pipeline.named_steps["model"]
y_proba = rf_pipeline.predict_proba(X_test)[:, 1]

report = classification_report(y_test, y_pred, output_dict=True)
report_df = pd.DataFrame(report).T.round(3)
print("Classification report (RandomForestClassifier):")
display(report_df)

roc_auc = roc_auc_score(y_test, y_proba) if y_test.nunique() > 1 else np.nan
print(f"ROC-AUC: {roc_auc:.3f}" if not np.isnan(roc_auc) else "ROC-AUC: undefined (single class in test split)")

feature_importances = (
    pd.Series(prob_model.feature_importances_, index=feature_cols, name="importance")
    .sort_values(ascending=False)
)
display(feature_importances.to_frame())

Classification report (RandomForestClassifier):


Unnamed: 0,precision,recall,f1-score,support
0,0.875,1.0,0.933,7.0
1,1.0,0.5,0.667,2.0
accuracy,0.889,0.889,0.889,0.889
macro avg,0.938,0.75,0.8,9.0
weighted avg,0.903,0.889,0.874,9.0


ROC-AUC: 0.929


Unnamed: 0,importance
cluster_stability,0.263638
$50/t,0.1396
$150/t,0.11645
Total Emissions (tCO₂),0.110885
$100/t,0.103839
$200/t,0.100568
Production (t),0.084794
Exposure@$100/t,0.080226
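
With a support of 7 and 2 in the classification report above, the ROC-AUC is computed over only 7 × 2 = 14 positive-negative pairs, so it can only move in coarse steps. The sketch below computes AUC as the share of correctly ordered pairs, using hypothetical scores chosen so that exactly one pair is misordered. The model's actual predicted probabilities are not shown in the notebook.

```python
import numpy as np

def pairwise_auc(y_true, scores):
    """AUC as the fraction of (positive, negative) pairs ranked correctly; ties count 0.5."""
    y_true = np.asarray(y_true)
    scores = np.asarray(scores, dtype=float)
    pos = scores[y_true == 1]
    neg = scores[y_true == 0]
    diff = pos[:, None] - neg[None, :]
    return (np.sum(diff > 0) + 0.5 * np.sum(diff == 0)) / (len(pos) * len(neg))

# Same class balance as the test split: 7 negatives, 2 positives
y_toy = [0] * 7 + [1] * 2
# Hypothetical scores: one negative (0.60) outranks one positive (0.55)
scores_toy = [0.05, 0.10, 0.15, 0.20, 0.25, 0.30, 0.60, 0.55, 0.90]

auc = pairwise_auc(y_toy, scores_toy)
print(round(auc, 3))
```

A single misordered pair out of 14 already yields 13/14 ≈ 0.929, which is consistent with the reported value. The estimate is therefore very sensitive to individual companies in the test split.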


In [48]:
y_proba_all = rf_pipeline.predict_proba(X)[:, 1]
y_pred_all = rf_pipeline.predict(X)

comp_ml["pred_high_risk"] = y_pred_all
comp_ml["pred_score"] = y_proba_all

viz_df = companies_embedded[["Company", "umap1", "umap2", "cluster"]].merge(
    comp_ml[["Company", "high_risk_refined", "pred_high_risk", "pred_score"]],
    on="Company", how="left"
)

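# Cast the 0/1 prediction to string so Plotly treats it as a discrete category;
# a numeric column gets a continuous color scale and color_discrete_map is ignored
fig = px.scatter(
    viz_df.assign(pred_high_risk=viz_df["pred_high_risk"].astype(int).astype(str)),
    x="umap1", y="umap2",
    color="pred_high_risk",
    hover_name="Company",
    title="UMAP – predicted refined high_risk (0/1)",
    color_discrete_map={"0": "blue", "1": "red"},
    category_orders={"pred_high_risk": ["0", "1"]},
)
fig.show()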

fig = px.scatter(
    viz_df,
    x="umap1", y="umap2",
    color="pred_score",
    hover_name="Company",
    title="UMAP – continuous risk score",
    color_continuous_scale="Magma"
)
fig.show()

### Conclusion

This notebook presents an integrated, forward-looking framework for assessing climate transition risk at both the asset and portfolio levels. By combining scenario analysis, machine learning, stochastic simulation, and clustering techniques, the analysis moves beyond static carbon metrics toward a dynamic and distributional characterization of climate-related financial risk.

At the core of the approach is the explicit linkage between macro-level climate scenarios and micro-level firm behavior. Deterministic NGFS-style pathways provide transparent stress-testing benchmarks, while machine learning–based proxies introduce firm-level heterogeneity in decarbonization dynamics. This hybrid design allows the framework to remain both interpretable and empirically grounded, addressing a key tension in climate risk modeling.

The portfolio aggregation and Monte Carlo simulation stages extend the analysis from individual trajectories to probabilistic portfolio outcomes. By incorporating uncertainty in prices and decarbonization paths, the model generates full distributions of transition costs and enables the computation of Climate Value-at-Risk metrics. These results highlight the importance of tail risks and nonlinear effects, which are often understated in deterministic or average-based assessments.

Unsupervised learning further reveals latent structures in climate risk exposure across assets and companies. UMAP and HDBSCAN identify heterogeneous risk clusters and outliers, while bootstrap stability analysis tests how robust those clusters are. In this run the two company clusters have average stability scores of 0.20 and 0.48, so company-level assignments should be interpreted with caution. This clustering perspective complements traditional sector-based analyses and offers a data-driven alternative for risk segmentation.

Finally, the introduction of climate risk labels and supervised learning demonstrates how complex, continuous climate risk signals can be translated into actionable classifications. The combination of Random Forest models with low-dimensional embeddings supports both predictive performance and interpretability, paving the way for integration into investment decision-making and risk monitoring processes.

Overall, this notebook illustrates how modern data science techniques can be coherently embedded within climate stress-testing frameworks. The proposed methodology is flexible and extensible, allowing for richer scenarios, alternative asset classes, and additional sources of uncertainty. As climate transition risk becomes increasingly central to financial stability and portfolio management, such integrated and probabilistic approaches will be essential for robust, forward-looking risk assessment.