# Interactive PCE & NGD Visualization

Use the slider to step through iterations and see how both algorithms evolve.

**Cost function:** $J(x) = (x - 5)^2 + 0.1 x^2$ (state cost + control cost)

**Optimal:** $x^* = \frac{50}{11} \approx 4.545$

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.gridspec import GridSpec
from ipywidgets import interact, IntSlider
from IPython.display import display
from enum import Enum
import casadi as ca

# Seed NumPy's global RNG so the np.random.randn draws made by the sampling
# algorithms below are reproducible from one notebook run to the next.
np.random.seed(42)

## Cost Functions

In [None]:
def quadratic_state_cost(x: np.ndarray, x_star: float = 5.0) -> np.ndarray:
    """Return the element-wise squared distance of ``x`` from the target ``x_star``."""
    deviation = x - x_star
    return deviation ** 2

def control_cost(x: np.ndarray, R: float = 0.1) -> np.ndarray:
    """Return the element-wise quadratic penalty ``R * x**2``."""
    squared = x ** 2
    return R * squared

def total_cost(x: np.ndarray, x_star: float = 5.0, R: float = 0.1) -> np.ndarray:
    """Return the full objective: state cost around ``x_star`` plus the ``R``-weighted control cost."""
    state_term = quadratic_state_cost(x, x_star)
    control_term = control_cost(x, R)
    return state_term + control_term

# Closed-form minimiser of total_cost with its defaults (x_star=5, R=0.1):
# dJ/dx = 2(x - 5) + 0.2x = 0  =>  x = 10 / 2.2 = 50 / 11.
X_STAR = 50 / 11
# total_cost operates on arrays, so wrap the scalar and unwrap the single result.
OPTIMAL_COST = total_cost(np.array([X_STAR]))[0]
print(f"Optimal x* = {X_STAR:.4f}")
print(f"Optimal cost J(x*) = {OPTIMAL_COST:.4f}")

## Covariance Scheduling

In [None]:
class CovarianceSchedule(Enum):
    """Selects how compute_covariance_scale evolves the sampling-scale multiplier."""

    # Scale stays at its initial value for every iteration.
    CONSTANT = 0
    # Straight-line interpolation from the initial scale towards the final scale.
    LINEAR = 1
    # Initial scale multiplied by the decay rate once per iteration, floored at the final scale.
    EXPONENTIAL = 2
    # Half-cosine anneal from the initial scale towards the final scale.
    COSINE = 3
    # Initial scale multiplied by the step factor once per step interval, floored at the final scale.
    STEP = 4
    # Shrinks when the relative cost improvement exceeds a threshold, grows slightly when the cost worsens.
    ADAPTIVE = 5

def compute_covariance_scale(
    iteration: int, n_iterations: int, schedule: CovarianceSchedule,
    cov_scale_initial: float = 1.0, cov_scale_final: float = 0.01,
    cov_decay_rate: float = 0.95, cov_step_factor: float = 0.5,
    cov_step_interval: int = 10, cov_adaptive_threshold: float = 0.01,
    cov_scale_current: float = None, prev_cost: float = 0.0, curr_cost: float = 0.0,
) -> float:
    """Return the sampling-scale multiplier for a 1-based ``iteration``.

    Args:
        iteration: Current iteration, counted from 1.
        n_iterations: Total number of iterations; used as the denominator of
            the LINEAR and COSINE progress fraction.
        schedule: Which rule to apply.
        cov_scale_initial: Scale at the first iteration; also the upper clip
            bound for ADAPTIVE.
        cov_scale_final: Target / floor value of the scale.
        cov_decay_rate: Per-iteration factor for EXPONENTIAL and the shrink
            factor for ADAPTIVE.
        cov_step_factor: Factor applied once per ``cov_step_interval`` for STEP.
        cov_step_interval: Number of iterations between STEP reductions.
        cov_adaptive_threshold: Relative improvement above which ADAPTIVE shrinks.
        cov_scale_current: Scale returned by the previous call (ADAPTIVE only);
            ``None`` means "start from ``cov_scale_initial``".
        prev_cost: Cost before the previous update (ADAPTIVE only).
        curr_cost: Cost after the previous update (ADAPTIVE only).

    Returns:
        The scale multiplier. Schedules other than ADAPTIVE are stateless.

    Note:
        The progress fraction is ``(iteration - 1) / n_iterations``, so when
        ``iteration`` runs over ``1..n_iterations`` it stops at
        ``(n_iterations - 1) / n_iterations`` and LINEAR/COSINE end just short
        of ``cov_scale_final``.
    """
    elapsed = float(iteration - 1)
    horizon = float(n_iterations)

    if schedule == CovarianceSchedule.LINEAR:
        return cov_scale_initial + (cov_scale_final - cov_scale_initial) * (elapsed / horizon)

    if schedule == CovarianceSchedule.EXPONENTIAL:
        decayed = cov_scale_initial * (cov_decay_rate ** elapsed)
        return max(cov_scale_final, decayed)

    if schedule == CovarianceSchedule.COSINE:
        phase = np.pi * elapsed / horizon
        half_range = 0.5 * (cov_scale_initial - cov_scale_final)
        return cov_scale_final + half_range * (1.0 + np.cos(phase))

    if schedule == CovarianceSchedule.STEP:
        completed_steps = np.floor(elapsed / cov_step_interval)
        stepped = cov_scale_initial * (cov_step_factor ** completed_steps)
        return max(cov_scale_final, stepped)

    if schedule == CovarianceSchedule.ADAPTIVE:
        scale = cov_scale_initial if cov_scale_current is None else cov_scale_current
        # No cost history exists yet on the first iteration, so leave the scale alone.
        if iteration <= 1:
            return scale
        # Relative improvement; the small constant avoids dividing by a zero cost.
        improvement = (prev_cost - curr_cost) / (abs(prev_cost) + 1e-6)
        if improvement > cov_adaptive_threshold:
            scale = scale * cov_decay_rate
        elif improvement < 0:
            # Cost got worse: widen by a tenth of the amount a shrink would remove.
            scale = scale * (1.0 + 0.1 * (1.0 - cov_decay_rate))
        return np.clip(scale, cov_scale_final, cov_scale_initial)

    # CONSTANT, and any unrecognised schedule value, keep the initial scale.
    return cov_scale_initial

## PCE Algorithm

In [None]:
def run_pce_with_history(
    cost_fn, y_init: float = 0.0, sigma_init: float = 2.0,
    n_samples: int = 100, n_iterations: int = 50, elite_ratio: float = 0.1,
    ema_alpha: float = 0.5, gamma: float = 1.0, temperature: float = 1.5,
    temperature_final: float = 0.05, R: float = 0.1,
    cov_schedule: CovarianceSchedule = CovarianceSchedule.COSINE,
    cov_scale_initial: float = 1.0, cov_scale_final: float = 0.01,
    cov_decay_rate: float = 0.95,
):
    """Run the sampling-based PCE update on a scalar variable and record every step.

    Each iteration draws ``n_samples`` Gaussian perturbations around the
    current mean ``y``, keeps the lowest-cost ("elite") samples, assigns them
    softmax weights at the current temperature, and moves ``y`` towards the
    weighted elite mean with an exponential moving average.

    Args:
        cost_fn: Callable mapping a 1-D array of candidate values to their costs.
        y_init: Starting mean.
        sigma_init: Base standard deviation; multiplied by the scheduled scale.
        n_samples: Number of perturbations drawn per iteration.
        n_iterations: Number of update steps.
        elite_ratio: Fraction of samples kept as elites (at least one is kept).
        ema_alpha: Blend factor between the old mean and the weighted elite mean.
        gamma: Multiplier on the (regularised) cost inside the softmax exponent.
        temperature: Initial softmax temperature.
        temperature_final: Temperature reached at the last iteration
            (geometric interpolation from ``temperature``).
        R: Coefficient of the ``epsilon * R * y`` term added to each elite cost.
        cov_schedule: Schedule passed to ``compute_covariance_scale``.
        cov_scale_initial: Initial scale passed to ``compute_covariance_scale``.
        cov_scale_final: Final scale passed to ``compute_covariance_scale``.
        cov_decay_rate: Decay rate passed to ``compute_covariance_scale``.

    Returns:
        dict of lists. ``'y'``, ``'sigma'``, ``'cov_scale'``, ``'cost'`` and
        ``'temperature'`` hold the initial value plus one entry per iteration
        (``n_iterations + 1`` entries). ``'samples'``, ``'sample_costs'``,
        ``'elite_indices'``, ``'elite_weights'`` and ``'weighted_y'`` hold one
        entry per iteration.
    """
    y = y_init
    sigma_base = sigma_init
    n_elites = max(1, int(n_samples * elite_ratio))
    cov_scale_current = cov_scale_initial
    prev_cost = cost_fn(np.array([y]))[0]

    history = {
        'y': [y], 'sigma': [sigma_base * cov_scale_initial],
        'cov_scale': [cov_scale_initial], 'cost': [prev_cost],
        'temperature': [temperature], 'samples': [], 'sample_costs': [],
        'elite_indices': [], 'elite_weights': [], 'weighted_y': [],
    }

    for iteration in range(1, n_iterations + 1):
        # Geometric annealing: progress runs 0 -> 1, so the last iteration
        # uses exactly temperature_final (max(1, ...) guards n_iterations == 1).
        progress = (iteration - 1) / max(1, n_iterations - 1)
        current_temp = temperature * (temperature_final / temperature) ** progress

        curr_cost = cost_fn(np.array([y]))[0]
        cov_scale = compute_covariance_scale(
            iteration, n_iterations, cov_schedule,
            cov_scale_initial, cov_scale_final, cov_decay_rate,
            cov_scale_current=cov_scale_current, prev_cost=prev_cost, curr_cost=curr_cost,
        )
        cov_scale_current = cov_scale
        effective_sigma = sigma_base * cov_scale

        epsilon = effective_sigma * np.random.randn(n_samples)
        samples = y + epsilon
        # asarray keeps the integer-array indexing below valid even if cost_fn
        # hands back a plain sequence.
        costs = np.asarray(cost_fn(samples))
        elite_indices = np.argsort(costs)[:n_elites]

        # Softmax weights over the elites only, computed in one vectorised pass.
        elite_eps = epsilon[elite_indices]
        log_weights = (-gamma * (costs[elite_indices] + elite_eps * R * y)) / current_temp
        # Subtract the max before exponentiating so exp() cannot overflow.
        elite_weights = np.exp(log_weights - np.max(log_weights))
        # The small constant keeps the division defined if every weight underflows.
        elite_weights /= (np.sum(elite_weights) + 1e-10)

        x_weighted = np.dot(elite_weights, y + elite_eps)

        history['samples'].append(samples.copy())
        history['sample_costs'].append(costs.copy())
        # argsort()[:k] is a view; copy so the stored indices own their memory.
        history['elite_indices'].append(elite_indices.copy())
        history['elite_weights'].append(elite_weights)
        history['weighted_y'].append(x_weighted)

        y = (1 - ema_alpha) * y + ema_alpha * x_weighted

        history['y'].append(y)
        history['sigma'].append(effective_sigma)
        history['cov_scale'].append(cov_scale)
        history['cost'].append(cost_fn(np.array([y]))[0])
        history['temperature'].append(current_temp)
        # Cost of the mean before this update; consumed by the ADAPTIVE schedule.
        prev_cost = curr_cost

    return history

## NGD Algorithm

In [None]:
def run_ngd_with_history(
    cost_fn, y_init: float = 0.0, sigma_init: float = 2.0,
    n_samples: int = 100, n_iterations: int = 50, learning_rate: float = 0.1,
    temperature: float = 1.0, cov_schedule: CovarianceSchedule = CovarianceSchedule.COSINE,
    cov_scale_initial: float = 1.0, cov_scale_final: float = 0.01,
    cov_decay_rate: float = 0.95,
):
    """Run the sampled gradient-descent update on a scalar variable and record every step.

    Each iteration perturbs the mean with Gaussian noise, estimates the update
    direction as ``mean((cost / temperature) * noise)`` and takes a
    ``learning_rate``-sized step against it.

    Args:
        cost_fn: Callable mapping a 1-D array of candidate values to their costs.
        y_init: Starting mean.
        sigma_init: Base standard deviation; multiplied by the scheduled scale.
        n_samples: Number of perturbations drawn per iteration.
        n_iterations: Number of update steps.
        learning_rate: Step size applied to the gradient estimate.
        temperature: Divisor applied to the sample costs.
        cov_schedule: Schedule passed to ``compute_covariance_scale``.
        cov_scale_initial: Initial scale passed to ``compute_covariance_scale``.
        cov_scale_final: Final scale passed to ``compute_covariance_scale``.
        cov_decay_rate: Decay rate passed to ``compute_covariance_scale``.

    Returns:
        dict of lists. ``'y'``, ``'sigma'``, ``'cov_scale'``, ``'cost'``,
        ``'temperature'`` and ``'learning_rate'`` hold the initial value plus
        one entry per iteration. ``'samples'``, ``'sample_costs'``,
        ``'gradient'`` and ``'epsilon'`` hold one entry per iteration.
    """
    mean = y_init
    last_cost = cost_fn(np.array([mean]))[0]
    scale_state = cov_scale_initial

    history = {
        'y': [mean],
        'sigma': [sigma_init * cov_scale_initial],
        'cov_scale': [cov_scale_initial],
        'cost': [last_cost],
        'temperature': [temperature],
        'learning_rate': [learning_rate],
        'samples': [],
        'sample_costs': [],
        'gradient': [],
        'epsilon': [],
    }

    for step in range(1, n_iterations + 1):
        cost_now = cost_fn(np.array([mean]))[0]
        scale_state = compute_covariance_scale(
            step, n_iterations, cov_schedule,
            cov_scale_initial, cov_scale_final, cov_decay_rate,
            cov_scale_current=scale_state, prev_cost=last_cost, curr_cost=cost_now,
        )
        sigma_now = sigma_init * scale_state

        noise = sigma_now * np.random.randn(n_samples)
        candidates = mean + noise
        raw_costs = cost_fn(candidates)
        # Replace NaN/inf costs with a large finite penalty so the mean below stays finite.
        sample_costs = np.where(np.isfinite(raw_costs), raw_costs, 1e6)

        grad_estimate = np.mean((sample_costs / temperature) * noise)

        history['samples'].append(candidates.copy())
        history['sample_costs'].append(sample_costs.copy())
        history['gradient'].append(grad_estimate)
        history['epsilon'].append(noise.copy())

        mean = mean - learning_rate * grad_estimate

        history['y'].append(mean)
        history['sigma'].append(sigma_now)
        history['cov_scale'].append(scale_state)
        history['cost'].append(cost_fn(np.array([mean]))[0])
        history['temperature'].append(temperature)
        history['learning_rate'].append(learning_rate)
        # Cost of the mean before this update; consumed by the ADAPTIVE schedule.
        last_cost = cost_now

    return history

## CasADi Solver (Newton's Method)

In [None]:
def run_casadi_with_history(y_init: float = 0.0, x_star: float = 5.0, R: float = 0.1,
                            max_iterations: int = 20, grad_tol: float = 1e-10):
    """
    Minimise ``(x - x_star)**2 + R * x**2`` with Newton's method, using CasADi
    for the symbolic gradient and Hessian.

    Args:
        y_init: Starting point.
        x_star: Target of the quadratic state cost.
        R: Weight of the quadratic control cost.
        max_iterations: Upper bound on the number of Newton steps.
        grad_tol: Stop once the absolute gradient at the current point is below this.

    Returns:
        dict with ``'y'`` and ``'cost'`` (the starting point followed by one
        entry per Newton step actually taken), plus ``'x_opt'`` and ``'f_opt'``
        (the last entries of those lists).
    """
    x = ca.SX.sym('x')
    J = (x - x_star)**2 + R * x**2
    grad_J = ca.gradient(J, x)
    # ca.hessian returns the Hessian together with the gradient; keep the Hessian only.
    hess_J = ca.hessian(J, x)[0]

    cost_fn = ca.Function('J', [x], [J])
    grad_fn = ca.Function('dJ', [x], [grad_J])
    hess_fn = ca.Function('ddJ', [x], [hess_J])

    y = y_init
    ys = [y]
    costs = [float(cost_fn(y))]

    for _ in range(max_iterations):
        g = float(grad_fn(y))
        # Test convergence BEFORE stepping: a converged iterate must not add a
        # zero-length step to the history (that would inflate the step count).
        if abs(g) < grad_tol:
            break

        H = float(hess_fn(y))
        # A (numerically) zero Hessian leaves the Newton step undefined.
        if abs(H) <= 1e-12:
            break

        y = y - g / H
        ys.append(y)
        costs.append(float(cost_fn(y)))

    return {
        'y': ys,
        'cost': costs,
        'x_opt': ys[-1],
        'f_opt': costs[-1],
    }

## Run All Algorithms

In [None]:
# Shared experiment settings used by all three solvers.
Y_INIT = 0.0
SIGMA = 2.0
N_SAMPLES = 100
N_ITERATIONS = 50
COV_SCHEDULE = CovarianceSchedule.COSINE

# PCE and NGD both draw from NumPy's global RNG, so the samples NGD sees
# depend on PCE having run first; keep this order for reproducible output.
pce_history = run_pce_with_history(
    cost_fn=total_cost, y_init=Y_INIT, sigma_init=SIGMA,
    n_samples=N_SAMPLES, n_iterations=N_ITERATIONS,
    cov_schedule=COV_SCHEDULE,
)

ngd_history = run_ngd_with_history(
    cost_fn=total_cost, y_init=Y_INIT, sigma_init=SIGMA,
    n_samples=N_SAMPLES, n_iterations=N_ITERATIONS,
    cov_schedule=COV_SCHEDULE,
)

# Deterministic Newton reference solution (uses the default x_star and R).
casadi_history = run_casadi_with_history(y_init=Y_INIT)

# Final iterate and cost of each solver next to the closed-form optimum.
print(f"Covariance schedule: {COV_SCHEDULE.name}")
print(f"PCE:    y={pce_history['y'][-1]:.4f}, cost={pce_history['cost'][-1]:.4f}")
print(f"NGD:    y={ngd_history['y'][-1]:.4f}, cost={ngd_history['cost'][-1]:.4f}")
# len(...) - 1 because the first history entry is the starting point, not a step.
print(f"CasADi: y={casadi_history['x_opt']:.4f}, cost={casadi_history['f_opt']:.6f} ({len(casadi_history['y'])-1} iters)")
print(f"Optimal: x*={X_STAR:.4f}, cost={OPTIMAL_COST:.4f}")

## Interactive Visualization

**Layout:**
- Top row: PCE (left), NGD (right, spanning two columns)
- Bottom row: Cost Convergence, Covariance Schedule, CasADi

In [None]:
def plot_iteration(iteration: int):
    """Render the five-panel dashboard for one recorded iteration.

    Reads the module-level ``pce_history``, ``ngd_history``, ``casadi_history``,
    ``X_STAR``, ``OPTIMAL_COST`` and ``COV_SCHEDULE``. The top row shows the
    cost curve with each sampler's samples and trajectory; the bottom row
    shows cost convergence, the effective sigma, and the CasADi Newton run.

    Args:
        iteration: Index into the recorded histories (0 = state before the
            first update).
    """
    fig = plt.figure(figsize=(16, 10))
    grid = GridSpec(2, 3, figure=fig, height_ratios=[1.2, 1])

    xs = np.linspace(-3, 10, 500)
    landscape = total_cost(xs)

    # ---- Top left: PCE -------------------------------------------------
    pce_ax = fig.add_subplot(grid[0, 0])
    pce_ax.plot(xs, landscape, 'k-', lw=2, label='J(x)', zorder=1)
    pce_ax.axvline(X_STAR, color='red', ls='--', lw=2, alpha=0.7, label=f'x* = {X_STAR:.2f}')

    if iteration < len(pce_history['samples']):
        pts = pce_history['samples'][iteration]
        pt_costs = pce_history['sample_costs'][iteration]
        elites = pce_history['elite_indices'][iteration]
        elite_w = pce_history['elite_weights'][iteration]
        elite_mean = pce_history['weighted_y'][iteration]

        pce_ax.scatter(
            pts, pt_costs, c='lightgreen', s=40, alpha=0.6, label='Samples',
            zorder=2, edgecolors='gray', linewidths=0.5,
        )
        # Marker area grows with the elite's weight relative to the heaviest elite.
        marker_sizes = 50 + 300 * elite_w / elite_w.max()
        pce_ax.scatter(
            pts[elites], pt_costs[elites], c='green', s=marker_sizes, label='Elites',
            zorder=3, edgecolors='darkgreen', linewidths=1.5, alpha=0.8,
        )
        pce_ax.axvline(
            elite_mean, color='darkgreen', ls='-', lw=2,
            label=f'Weighted y = {elite_mean:.2f}', zorder=4,
        )

    # Trajectory up to and including the mean produced by this iteration.
    pce_trail = pce_history['y'][:iteration+2]
    pce_trail_cost = [total_cost(np.array([val]))[0] for val in pce_trail]
    pce_ax.plot(pce_trail, pce_trail_cost, 'g-', lw=2, alpha=0.5, zorder=5)
    pce_ax.scatter(pce_trail[:-1], pce_trail_cost[:-1], c='green', s=60, alpha=0.4, zorder=5, marker='o')
    pce_ax.scatter(
        [pce_history['y'][iteration]], [pce_history['cost'][iteration]],
        c='green', s=200, zorder=6, marker='*', edgecolors='darkgreen', linewidths=2,
    )

    # NOTE(review): history["sigma"/"temperature"][iteration] were recorded one
    # loop step before the samples shown above were drawn (for iteration >= 1);
    # confirm this is the intended pairing.
    pce_note = (
        f'Iter {iteration}\n'
        f'Temp = {pce_history["temperature"][iteration]:.3f}\n'
        f'σ = {pce_history["sigma"][iteration]:.3f}\n'
        f'Cost = {pce_history["cost"][iteration]:.4f}'
    )
    pce_ax.annotate(
        pce_note, xy=(0.02, 0.98), xycoords='axes fraction', fontsize=10,
        verticalalignment='top',
        bbox={'boxstyle': 'round', 'facecolor': 'lightgreen', 'alpha': 0.7},
    )
    pce_ax.set_xlabel('x')
    pce_ax.set_ylabel('J(x)')
    pce_ax.set_title('PCE', fontsize=14, fontweight='bold', color='green')
    pce_ax.legend(loc='upper right', fontsize=8)
    pce_ax.set_xlim(-3, 10)
    pce_ax.set_ylim(-1, 30)
    pce_ax.grid(True, alpha=0.3)

    # ---- Top right: NGD (spans two columns) -----------------------------
    ngd_ax = fig.add_subplot(grid[0, 1:])
    ngd_ax.plot(xs, landscape, 'k-', lw=2, label='J(x)', zorder=1)
    ngd_ax.axvline(X_STAR, color='red', ls='--', lw=2, alpha=0.7, label=f'x* = {X_STAR:.2f}')

    if iteration < len(ngd_history['samples']):
        pts = ngd_history['samples'][iteration]
        pt_costs = ngd_history['sample_costs'][iteration]
        noise = ngd_history['epsilon'][iteration]

        ngd_ax.scatter(
            pts, pt_costs, c='lightblue', s=40, alpha=0.6, label='Samples',
            zorder=2, edgecolors='gray', linewidths=0.5,
        )
        # Highlight the samples whose gradient contribution is in the top 10% by magnitude.
        per_sample = (pt_costs / ngd_history['temperature'][iteration]) * noise
        magnitude_cutoff = np.percentile(np.abs(per_sample), 90)
        dominant = np.abs(per_sample) > magnitude_cutoff
        ngd_ax.scatter(
            pts[dominant], pt_costs[dominant], c='blue', s=80, label='High contrib.',
            zorder=3, edgecolors='darkblue', linewidths=1.5, alpha=0.8,
        )

    ngd_trail = ngd_history['y'][:iteration+2]
    ngd_trail_cost = [total_cost(np.array([val]))[0] for val in ngd_trail]
    ngd_ax.plot(ngd_trail, ngd_trail_cost, 'b-', lw=2, alpha=0.5, zorder=5)
    ngd_ax.scatter(ngd_trail[:-1], ngd_trail_cost[:-1], c='blue', s=60, alpha=0.4, zorder=5, marker='o')
    ngd_ax.scatter(
        [ngd_history['y'][iteration]], [ngd_history['cost'][iteration]],
        c='blue', s=200, zorder=6, marker='*', edgecolors='darkblue', linewidths=2,
    )

    # The gradient list has no entry for the final state, so skip the note there.
    if iteration < len(ngd_history['gradient']):
        ngd_note = (
            f'Iter {iteration}\n'
            f'∇ = {ngd_history["gradient"][iteration]:.3f}\n'
            f'σ = {ngd_history["sigma"][iteration]:.3f}\n'
            f'Cost = {ngd_history["cost"][iteration]:.4f}'
        )
        ngd_ax.annotate(
            ngd_note, xy=(0.02, 0.98), xycoords='axes fraction', fontsize=10,
            verticalalignment='top',
            bbox={'boxstyle': 'round', 'facecolor': 'lightblue', 'alpha': 0.7},
        )
    ngd_ax.set_xlabel('x')
    ngd_ax.set_ylabel('J(x)')
    ngd_ax.set_title('NGD', fontsize=14, fontweight='bold', color='blue')
    ngd_ax.legend(loc='upper right', fontsize=8)
    ngd_ax.set_xlim(-3, 10)
    ngd_ax.set_ylim(-1, 30)
    ngd_ax.grid(True, alpha=0.3)

    # ---- Bottom left: cost convergence ----------------------------------
    conv_ax = fig.add_subplot(grid[1, 0])
    conv_ax.plot(pce_history['cost'], 'g-', lw=2, label='PCE')
    conv_ax.plot(ngd_history['cost'], 'b-', lw=2, label='NGD')
    conv_ax.axhline(OPTIMAL_COST, color='red', ls='--', lw=1.5, alpha=0.7, label='Optimal')
    conv_ax.scatter(
        [iteration], [pce_history['cost'][iteration]],
        c='green', s=150, zorder=10, marker='*', edgecolors='darkgreen', linewidths=2,
    )
    conv_ax.scatter(
        [iteration], [ngd_history['cost'][iteration]],
        c='blue', s=150, zorder=10, marker='*', edgecolors='darkblue', linewidths=2,
    )
    conv_ax.axvline(iteration, color='gray', ls='--', lw=1, alpha=0.5)
    conv_ax.set_xlabel('Iteration')
    conv_ax.set_ylabel('Cost J(y)')
    conv_ax.set_title('Cost Convergence', fontsize=14, fontweight='bold')
    conv_ax.set_yscale('log')
    conv_ax.set_xlim(-1, len(pce_history['cost']))
    conv_ax.legend(loc='upper right', fontsize=9)
    conv_ax.grid(True, alpha=0.3)

    # ---- Bottom middle: covariance schedule -----------------------------
    sigma_ax = fig.add_subplot(grid[1, 1])
    sigma_ax.plot(pce_history['sigma'], 'g-', lw=2, label='PCE σ')
    sigma_ax.plot(ngd_history['sigma'], 'b-', lw=2, label='NGD σ')
    sigma_ax.scatter(
        [iteration], [pce_history['sigma'][iteration]],
        c='green', s=150, zorder=10, marker='*', edgecolors='darkgreen', linewidths=2,
    )
    sigma_ax.scatter(
        [iteration], [ngd_history['sigma'][iteration]],
        c='blue', s=150, zorder=10, marker='*', edgecolors='darkblue', linewidths=2,
    )
    sigma_ax.axvline(iteration, color='gray', ls='--', lw=1, alpha=0.5)
    sigma_ax.set_xlabel('Iteration')
    sigma_ax.set_ylabel('σ (effective)')
    sigma_ax.set_title(f'Covariance ({COV_SCHEDULE.name})', fontsize=14, fontweight='bold')
    sigma_ax.set_xlim(-1, len(pce_history['sigma']))
    sigma_ax.legend(loc='upper right', fontsize=9)
    sigma_ax.grid(True, alpha=0.3)

    # ---- Bottom right: CasADi (static, independent of the slider) --------
    newton_ax = fig.add_subplot(grid[1, 2])
    newton_points = len(casadi_history['cost'])
    newton_ax.plot(
        np.arange(newton_points), casadi_history['cost'], 'purple',
        lw=2, marker='o', markersize=8, label='CasADi (Newton)',
    )
    newton_ax.axhline(OPTIMAL_COST, color='red', ls='--', lw=1.5, alpha=0.7, label='Optimal')
    newton_ax.scatter(
        [newton_points - 1], [casadi_history['f_opt']],
        c='purple', s=200, zorder=10, marker='*', edgecolors='darkmagenta', linewidths=2,
    )
    newton_ax.set_xlabel('Iteration')
    newton_ax.set_ylabel('Cost J(x)')
    newton_ax.set_title(
        f'CasADi (Newton) - {newton_points - 1} iters',
        fontsize=14, fontweight='bold', color='purple',
    )
    newton_ax.set_yscale('log')
    newton_ax.set_xlim(-0.5, max(newton_points, 5))
    newton_ax.legend(loc='upper right', fontsize=9)
    newton_ax.grid(True, alpha=0.3)
    newton_ax.annotate(
        f'Final: x={casadi_history["x_opt"]:.4f}\nCost={casadi_history["f_opt"]:.6f}',
        xy=(0.98, 0.98), xycoords='axes fraction', fontsize=10,
        verticalalignment='top', horizontalalignment='right',
        bbox={'boxstyle': 'round', 'facecolor': 'lavender', 'alpha': 0.7},
    )

    plt.tight_layout()
    plt.show()

In [None]:
# Bind plot_iteration to a slider. The upper bound is the last index for which
# BOTH samplers recorded samples, so every slider position has data to draw.
# continuous_update=False means the figure is redrawn on release, not while dragging.
# NOTE(review): the trailing semicolon is presumably there to suppress the
# notebook's echo of interact()'s return value.
interact(
    plot_iteration,
    iteration=IntSlider(
        min=0, max=min(len(pce_history['samples']), len(ngd_history['samples'])) - 1,
        step=1, value=0, description='Iteration:',
        continuous_update=False, style={'description_width': 'initial'}
    )
);

## Animation (optional)

In [None]:
from IPython.display import clear_output
import time

def animate(delay: float = 0.3):
    """Replay every recorded iteration in order, pausing ``delay`` seconds between frames."""
    pce_frames = len(pce_history['samples'])
    ngd_frames = len(ngd_history['samples'])
    # Only play frames for which both samplers have recorded samples.
    for frame in range(min(pce_frames, ngd_frames)):
        # wait=True holds the old figure until the new one is ready.
        clear_output(wait=True)
        plot_iteration(frame)
        time.sleep(delay)

# Uncomment the next line to run the animation:
# animate(delay=0.2)