# Trajectory Comparison Analysis

This notebook systematically compares step-increasing carbon tax trajectories against a static tax set at each trajectory's time-averaged level. We sweep over starting and ending tax levels to identify when a step increase is Pareto-better than the static tax, i.e. when it yields both lower cumulative CO2 and higher cumulative profit.


## Load Model and Setup


In [None]:
import pysd
import os
from pathlib import Path
import matplotlib.pyplot as plt
import pandas as pd
import yaml
import numpy as np
from pysd.py_backend.output import ModelOutput

# Resolve the project root: if the kernel was started inside a folder named
# 'notebooks', use its parent; otherwise treat the current directory as the root.
current_dir = Path.cwd()
BASE_DIR = current_dir.parent if current_dir.name == 'notebooks' else current_dir
# Switch the working directory to the project root so the relative paths used
# below (e.g. 'src/model.py') resolve from there.
os.chdir(BASE_DIR)

# Load the default parameters
with open(BASE_DIR / 'params.yaml', 'r') as file:
    BASE_PARAMS = yaml.safe_load(file)

# Load the translated PySD model (path is relative to the working directory set above).
model = pysd.load('src/model.py')

# Define return columns
# These names are requested from `model.run(...)` for the static-tax runs.
return_columns = [
    "cumulative_co2",
    "cumulative_profit",
    "viability_flag"
]

# Get tax frontier
# NOTE(review): this import sits after os.chdir(BASE_DIR); presumably `src` is only
# importable once the working directory is the project root — confirm before
# moving it up to the import block.
from src.utils import calculate_max_viable_tax
# Presumably the highest carbon tax at which the model remains viable — confirm in
# src.utils. It is used later as the upper bound of the end-tax grid.
tax_frontier = calculate_max_viable_tax(model, BASE_PARAMS)
print(f"Tax frontier: {tax_frontier}")


## Comparison Function

We create a function to compare a step-increasing trajectory (from `start_tax` to `end_tax`) against a static tax at the time-averaged level.


In [None]:
def compare_trajectory_fixed_end(model, base_params, start_tax, end_tax, final_time=120):
    """
    Compare a step-increasing tax trajectory against a static tax at its time average.

    The stepped run applies ``start_tax`` on the first step and raises the tax by
    ``(end_tax - start_tax) / final_time`` after every step (capped at ``end_tax``).
    The static benchmark is then run with the mean of the tax rates that were
    actually applied during the stepped run, using the same parameters and the
    same ``final_time``.

    Parameters:
    -----------
    model : PySD model
        Model exposing ``set_stepper``, ``step`` and ``run``.
    base_params : dict
        Base parameters for the model; applied to both the stepped and the static run.
    start_tax : float
        Starting carbon tax rate
    end_tax : float
        Ending carbon tax rate (cap for the trajectory)
    final_time : int
        Simulation length in months (also the number of single steps taken)

    Returns:
    --------
    dict with comparison results: the inputs, the time-averaged tax, cumulative
    CO2 / profit / viability flag for both runs, absolute and percentage
    differences (step minus static), and the two Pareto flags.

    Raises:
    -------
    ValueError
        If ``final_time`` is smaller than 1.
    """
    if final_time < 1:
        raise ValueError(f"final_time must be at least 1, got {final_time}")

    # Columns requested from the static run. Kept local so the function does not
    # depend on a notebook-level global being defined.
    static_columns = ["cumulative_co2", "cumulative_profit", "viability_flag"]

    # Calculate step size needed to approach end_tax over the simulation
    step_size = (end_tax - start_tax) / final_time

    # Reset model and set up stepper. base_params is passed explicitly so the
    # stepped run does not rely on parameters left on the model by an earlier run.
    output = ModelOutput()
    model.set_stepper(
        output,
        params=base_params,
        step_vars=["carbon_tax_rate"],
        final_time=final_time,
    )

    # Run step-increasing trajectory
    tax_rate_current = start_tax
    applied_taxes = []

    for _ in range(final_time):
        # Record the rate BEFORE advancing it, so the average below reflects the
        # taxes the model actually experienced (recording after the increment
        # would bias the static benchmark upward by one step_size).
        applied_taxes.append(tax_rate_current)
        model.step(1, {"carbon_tax_rate": tax_rate_current})
        tax_rate_current = np.minimum(tax_rate_current + step_size, end_tax)

    results_step = output.collect(model)
    co2_step = results_step["Cumulative CO2"].iloc[-1]
    profit_step = results_step["Cumulative Profit"].iloc[-1]
    viability_step = results_step["Viability Flag"].iloc[-1]

    # Calculate time-averaged tax of the applied trajectory
    time_avg_tax = float(np.mean(applied_taxes))

    # Run static tax at time-averaged level over the same horizon
    params_static = {**base_params, "carbon_tax_rate": time_avg_tax}
    static_run = model.run(
        params=params_static,
        return_columns=static_columns,
        final_time=final_time,
    )

    co2_static = static_run["cumulative_co2"].iloc[-1]
    profit_static = static_run["cumulative_profit"].iloc[-1]
    viability_static = static_run["viability_flag"].iloc[-1]

    # Calculate differences
    co2_diff = co2_step - co2_static  # Negative means step is better (lower CO2)
    profit_diff = profit_step - profit_static  # Positive means step is better (higher profit)

    def _percent_of(diff, reference):
        # Divide by |reference| so the sign of the percentage always matches the
        # sign of the absolute difference, even when the reference is negative.
        return 100 * diff / abs(reference) if reference != 0 else 0

    # Pareto check (strict improvement / deterioration on both objectives)
    pareto_better = bool((co2_step < co2_static) and (profit_step > profit_static))
    pareto_worse = bool((co2_step > co2_static) and (profit_step < profit_static))

    return {
        "start_tax": start_tax,
        "end_tax": end_tax,
        "tax_range": end_tax - start_tax,
        "time_avg_tax": time_avg_tax,
        "co2_step": co2_step,
        "profit_step": profit_step,
        "co2_static": co2_static,
        "profit_static": profit_static,
        "co2_diff": co2_diff,
        "profit_diff": profit_diff,
        "co2_diff_pct": _percent_of(co2_diff, co2_static),
        "profit_diff_pct": _percent_of(profit_diff, profit_static),
        "pareto_better": pareto_better,
        "pareto_worse": pareto_worse,
        "viability_step": viability_step,
        "viability_static": viability_static,
    }


## Parameter Sweep

We test different combinations of starting taxes and ending taxes to identify patterns in when step increases outperform static taxes.


In [None]:
# Candidate start and end tax levels for the sweep; the end-tax grid tops out
# at the tax frontier computed during setup.
start_taxes = np.linspace(289, 3000, 12)
end_taxes = np.linspace(2000, tax_frontier, 12)

# One result dict per successfully simulated (start, end) pair
trajectory_results = []
params = BASE_PARAMS.copy()
print("Running trajectory comparisons...")
print(f"Total combinations: {len(start_taxes) * len(end_taxes)}")

for levels_done, start_tax in enumerate(start_taxes, start=1):
    for end_tax in end_taxes:
        # Only trajectories that strictly increase are simulated
        if not end_tax > start_tax:
            continue
        try:
            result = compare_trajectory_fixed_end(
                model, params, start_tax, end_tax
            )
        except Exception as e:
            # Report the failing pair and carry on with the rest of the grid
            print(f"Error at start={start_tax:.0f}, end={end_tax:.0f}: {e}")
        else:
            trajectory_results.append(result)
    # Progress message after every third start-tax level
    if levels_done % 3 == 0:
        print(f"Completed {levels_done}/{len(start_taxes)} start tax levels...")

trajectory_df = pd.DataFrame(trajectory_results)
print(f"\nCompleted {len(trajectory_df)} successful comparisons")


## Results Summary


In [None]:
# Headline numbers: how often each policy Pareto-dominates, and the mean
# percentage gaps (step minus static) across all simulated pairs.
step_wins = trajectory_df['pareto_better']
static_wins = trajectory_df['pareto_worse']
mean_co2_gap = trajectory_df['co2_diff_pct'].mean()
mean_profit_gap = trajectory_df['profit_diff_pct'].mean()

print("Summary Statistics:")
print(f"Total comparisons: {len(trajectory_df)}")
print(f"Step Pareto Better: {step_wins.sum()} ({100 * step_wins.mean():.1f}%)")
print(f"Static Pareto Better: {static_wins.sum()} ({100 * static_wins.mean():.1f}%)")
print(f"\nAverage CO2 difference (%): {mean_co2_gap:.2f}")
print(f"Average Profit difference (%): {mean_profit_gap:.2f}")

# Preview of the raw comparison table (rendered as the cell output)
print("\nFirst 10 results:")
trajectory_df.head(10)


## Visualization: Heatmaps

Create heatmaps showing CO2 differences, profit differences, and Pareto-better regions across the parameter space.


In [None]:
# Reshape the long-format results into start_tax (rows) x end_tax (columns) grids.
# Pairs that were not simulated (end_tax <= start_tax or failed runs) become NaN.
pivot_co2_diff = trajectory_df.pivot(index='start_tax', columns='end_tax', values='co2_diff_pct')
pivot_profit_diff = trajectory_df.pivot(index='start_tax', columns='end_tax', values='profit_diff_pct')
pivot_pareto = trajectory_df.pivot(index='start_tax', columns='end_tax', values='pareto_better')


def _symmetric_limit(grid):
    """Return the largest finite |value| in `grid`, or 1.0 if there is none / it is zero."""
    values = grid.astype(float).to_numpy()
    magnitudes = np.abs(values[np.isfinite(values)])
    limit = magnitudes.max() if magnitudes.size else 0.0
    return limit if limit > 0 else 1.0


def _draw_heatmap(ax, grid, title, colorbar_label, cmap, vmin, vmax):
    """Draw one start_tax x end_tax grid on `ax` with axis labels and a colorbar."""
    image = ax.imshow(
        grid.astype(float).to_numpy(),
        aspect='auto',
        cmap=cmap,
        origin='lower',
        vmin=vmin,
        vmax=vmax,
        extent=[grid.columns.min(), grid.columns.max(),
                grid.index.min(), grid.index.max()],
    )
    ax.set_title(title, fontsize=12)
    ax.set_xlabel('End Tax (¥/tCO₂)')
    ax.set_ylabel('Start Tax (¥/tCO₂)')
    plt.colorbar(image, ax=ax, label=colorbar_label)
    return image


# Create figure with multiple subplots
fig, axes = plt.subplots(2, 2, figsize=(16, 12))

# CO2 difference heatmap. Lower CO2 is better, so the REVERSED colormap is used:
# green then means "step better" in every panel. The scale is centred on zero so
# the colormap midpoint marks "no difference".
co2_limit = _symmetric_limit(pivot_co2_diff)
im1 = _draw_heatmap(
    axes[0, 0], pivot_co2_diff,
    'CO2 Difference: Step vs Static (%)\n(Negative = Step Better)',
    'CO2 Diff (%)', 'RdYlGn_r', -co2_limit, co2_limit,
)

# Profit difference heatmap (higher profit is better -> green), centred on zero.
profit_limit = _symmetric_limit(pivot_profit_diff)
im2 = _draw_heatmap(
    axes[0, 1], pivot_profit_diff,
    'Profit Difference: Step vs Static (%)\n(Positive = Step Better)',
    'Profit Diff (%)', 'RdYlGn', -profit_limit, profit_limit,
)

# Pareto better regions (boolean flag drawn as 0/1)
im3 = _draw_heatmap(
    axes[1, 0], pivot_pareto,
    'Pareto Better: Step vs Static\n(Green = Step Pareto Better)',
    'Pareto Better', 'RdYlGn', 0, 1,
)

# Scatter: Tax range vs Pareto outcome
scatter = axes[1, 1].scatter(
    trajectory_df['tax_range'],
    trajectory_df['start_tax'],
    c=trajectory_df['pareto_better'].astype(float),
    cmap='RdYlGn',
    s=50,
    alpha=0.6,
    vmin=0,
    vmax=1
)
axes[1, 1].set_xlabel('Tax Range (End - Start) (¥/tCO₂)')
axes[1, 1].set_ylabel('Start Tax (¥/tCO₂)')
axes[1, 1].set_title('Pareto Better by Tax Range and Start', fontsize=12)
plt.colorbar(scatter, ax=axes[1, 1], label='Pareto Better')

plt.tight_layout()
# Make sure the output folder exists; savefig does not create missing directories.
heatmap_dir = Path(BASE_DIR) / "figures" / "results"
heatmap_dir.mkdir(parents=True, exist_ok=True)
fig.savefig(heatmap_dir / "trajectory_comparison_heatmap.png", dpi=300, bbox_inches='tight')
plt.show()


## Boundary Analysis

Locate the boundary in (start tax, end tax) space between the region where step increases are Pareto-better than the static tax and the region where they are not.


In [None]:
# Plot the boundary
fig, ax = plt.subplots(figsize=(10, 6))

# Split the results on the 'pareto_better' flag. Note that `pareto_worse_df` is
# simply the complement (static better OR mixed outcome); it is NOT the subset
# selected by the 'pareto_worse' column.
pareto_better_df = trajectory_df[trajectory_df['pareto_better']]
pareto_worse_df = trajectory_df[~trajectory_df['pareto_better']]

ax.scatter(
    pareto_better_df['start_tax'],
    pareto_better_df['end_tax'],
    label='Step Pareto Better',
    alpha=0.6,
    s=50,
    color='green'
)
ax.scatter(
    pareto_worse_df['start_tax'],
    pareto_worse_df['end_tax'],
    label='Static Better or Mixed',
    alpha=0.6,
    s=50,
    marker='x',
    color='red'
)

ax.set_xlabel('Start Tax (¥/tCO₂)')
ax.set_ylabel('End Tax (¥/tCO₂)')
ax.set_title('Pareto Better Regions: Step vs Static Tax')
ax.legend()
ax.grid(alpha=0.3)
plt.tight_layout()
# Make sure the output folder exists; savefig does not create missing directories.
boundary_dir = Path(BASE_DIR) / "figures" / "results"
boundary_dir.mkdir(parents=True, exist_ok=True)
fig.savefig(boundary_dir / "trajectory_pareto_boundary.png", dpi=300, bbox_inches='tight')
plt.show()

# Find minimum end tax for Pareto-better by start tax
def min_end_tax_for_pareto(group):
    """Return the smallest `end_tax` among rows flagged `pareto_better`, or NaN if none are."""
    winning_end_taxes = group.loc[group['pareto_better'], 'end_tax']
    if winning_end_taxes.empty:
        return np.nan
    return winning_end_taxes.min()

# For every start tax: is there any Pareto-better trajectory, and if so what is
# the lowest end tax that achieves it? Only the columns the summary needs are
# selected before `apply`, so the grouping column itself is not passed into the
# function (applying over the grouping column is deprecated in recent pandas).
pareto_by_start = (
    trajectory_df.groupby('start_tax')[['pareto_better', 'end_tax']]
    .apply(
        lambda g: pd.Series({
            'has_pareto_better': g['pareto_better'].any(),
            'min_end_tax': min_end_tax_for_pareto(g)
        })
    )
    .reset_index()
)

print("Minimum end tax for Pareto-better step increase by start tax:")
# astype(bool) guards against the flag column coming back as object dtype
has_any = pareto_by_start['has_pareto_better'].astype(bool)
print(pareto_by_start[has_any][['start_tax', 'min_end_tax']])


## Summary Statistics by Tax Range

Analyze how outcomes vary with the tax range (difference between start and end tax).


In [None]:
# Summary by tax range: cut (end_tax - start_tax) into 5 equal-width bins.
# observed=False is passed explicitly so empty bins stay in the table; the
# implicit default for categorical groupers is changing in pandas and otherwise
# triggers a FutureWarning.
trajectory_df['tax_range_bin'] = pd.cut(trajectory_df['tax_range'], bins=5)
summary_range = trajectory_df.groupby('tax_range_bin', observed=False).agg({
    'pareto_better': 'mean',      # share of pairs where the step trajectory is Pareto better
    'co2_diff_pct': 'mean',
    'profit_diff_pct': 'mean',
    'start_tax': ['min', 'max'],  # span of start taxes that fall in each bin
}).round(2)

print("Summary by Tax Range:")
print(summary_range)

# Summary by start tax: same aggregation over 5 equal-width start-tax bins
trajectory_df['start_tax_bin'] = pd.cut(trajectory_df['start_tax'], bins=5)
summary_start = trajectory_df.groupby('start_tax_bin', observed=False).agg({
    'pareto_better': 'mean',
    'co2_diff_pct': 'mean',
    'profit_diff_pct': 'mean',
}).round(2)

print("\nSummary by Start Tax:")
print(summary_start)


## Detailed Comparison: Example Cases

Compare specific examples to understand the differences better.


In [None]:
def _first_row_or_none(flag_column):
    """Return the first row of `trajectory_df` where `flag_column` is True, else None."""
    matches = trajectory_df[trajectory_df[flag_column]]
    return matches.iloc[0] if len(matches) > 0 else None


def _print_example(header, example):
    """Print the inputs and step-vs-static outcomes of one comparison row."""
    print(header)
    print(f"Start Tax: {example['start_tax']:.0f} ¥/tCO₂")
    print(f"End Tax: {example['end_tax']:.0f} ¥/tCO₂")
    print(f"Tax Range: {example['tax_range']:.0f} ¥/tCO₂")
    print(f"Time-Averaged Tax: {example['time_avg_tax']:.0f} ¥/tCO₂")
    print(f"CO2 Difference: {example['co2_diff_pct']:.2f}%")
    print(f"Profit Difference: {example['profit_diff_pct']:.2f}%")
    print(f"\nStep: CO2={example['co2_step']:.2e}, Profit={example['profit_step']:.2e}")
    print(f"Static: CO2={example['co2_static']:.2e}, Profit={example['profit_static']:.2e}")


# Find examples of each case. Either may be absent, so both are guarded
# (taking .iloc[0] of an empty selection would raise IndexError).
example_pareto_better = _first_row_or_none('pareto_better')
example_pareto_worse = _first_row_or_none('pareto_worse')

if example_pareto_better is not None:
    _print_example("Example: Step Increase is Pareto Better", example_pareto_better)
else:
    print("No case found where the step increase is Pareto better.")

if example_pareto_worse is not None:
    print("\n" + "="*50)
    _print_example("Example: Static is Pareto Better", example_pareto_worse)


## Export Results

Save the results DataFrame for further analysis.


In [None]:
# Save results to CSV (export is disabled by default; uncomment the lines below to write the file)
# output_path = BASE_DIR / 'results' / 'trajectory_comparison_results.csv'
# output_path.parent.mkdir(exist_ok=True)
# trajectory_df.to_csv(output_path, index=False)
# print(f"Results saved to {output_path}")
