# Projeto CIR - Notebook Autonomo

Notebook auto-contido que documenta o pipeline completo do modelo CIR (Cox-Ingersoll-Ross) com simulacoes, convergencia forte e precificacao/estrutura a termo, sem depender de modulos externos.


## 1. Setup geral

Importamos bibliotecas utilitarias e usamos apenas pastas temporarias em memoria; todas as figuras sao mostradas inline, sem gravar arquivos.


In [None]:
%matplotlib inline
from dataclasses import dataclass
import math
from typing import Literal, Sequence

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from IPython.display import display


## 2. Parametros e condicao de Feller

As parametrizacoes do CIR obedecem a condicao de Feller `2*kappa*theta > sigma**2`, garantindo trajetorias estritamente positivas.


In [None]:
@dataclass(frozen=True)
class CIRParams:
    kappa: float
    theta: float
    sigma: float
    r0: float

    def __post_init__(self) -> None:
        if 2 * self.kappa * self.theta <= self.sigma**2:
            raise ValueError("Feller violada: 2*kappa*theta deve ser maior que sigma**2.")

PRESETS = {
    "baseline": CIRParams(1.2, 0.05, 0.20, 0.03),
    "slow-revert": CIRParams(0.5, 0.08, 0.25, 0.04),
    "fast-revert": CIRParams(3.0, 0.02, 0.10, 0.015),
}


def get_params_preset(name: str) -> CIRParams:
    try:
        return PRESETS[name]
    except KeyError as exc:
        raise ValueError(f"Preset desconhecido: {name}") from exc


## 3. RNG e incrementos Brownianos

Usamos `numpy.random.default_rng` (PCG64) para reproducibilidade e geramos incrementos `N(0, dt)` para a difusao.


In [None]:
def make_rng(seed: int | None = None) -> np.random.Generator:
    return np.random.default_rng(seed)


def normal_increments(rng: np.random.Generator, n_paths: int, n_steps: int, dt: float) -> np.ndarray:
    if dt <= 0:
        raise ValueError("dt deve ser positivo.")
    return rng.normal(0.0, math.sqrt(dt), size=(n_paths, n_steps))


## 4. Esquemas numericos (Euler-Maruyama e Milstein)

Implementacao vetorizada com clamps de positividade e funcoes auxiliares para obter caminhos completos ou apenas os valores terminais.


In [None]:
Scheme = Literal["em", "milstein"]


def _validate_inputs(T: float, n_steps: int, n_paths: int) -> float:
    if T <= 0 or n_steps <= 0 or n_paths <= 0:
        raise ValueError("T, n_steps e n_paths devem ser positivos.")
    return T / n_steps


def _setup(params: CIRParams, T: float, n_steps: int, n_paths: int) -> tuple[float, np.ndarray]:
    dt = _validate_inputs(T, n_steps, n_paths)
    rates = np.empty((n_paths, n_steps + 1), dtype=float)
    rates[:, 0] = params.r0
    return dt, rates


def euler_maruyama(params: CIRParams, T: float, n_steps: int, n_paths: int, rng: np.random.Generator) -> np.ndarray:
    dt, rates = _setup(params, T, n_steps, n_paths)
    dW = normal_increments(rng, n_paths, n_steps, dt)
    for t in range(n_steps):
        r_t = rates[:, t]
        sqrt_rt = np.sqrt(np.maximum(r_t, 0.0))
        drift = params.kappa * (params.theta - r_t) * dt
        diffusion = params.sigma * sqrt_rt * dW[:, t]
        rates[:, t + 1] = r_t + drift + diffusion
    return np.maximum(rates, 0.0)


def milstein(params: CIRParams, T: float, n_steps: int, n_paths: int, rng: np.random.Generator) -> np.ndarray:
    dt, rates = _setup(params, T, n_steps, n_paths)
    dW = normal_increments(rng, n_paths, n_steps, dt)
    sqrt_dt = math.sqrt(dt)
    xi = dW / sqrt_dt
    correction_scale = 0.25 * params.sigma**2 * dt
    for t in range(n_steps):
        r_t = rates[:, t]
        sqrt_rt = np.sqrt(np.maximum(r_t, 0.0))
        drift = params.kappa * (params.theta - r_t) * dt
        diffusion = params.sigma * sqrt_rt * dW[:, t]
        correction = correction_scale * (xi[:, t]**2 - 1.0)
        rates[:, t + 1] = r_t + drift + diffusion + correction
    return np.maximum(rates, 0.0)


def simulate_paths(scheme: Scheme, params: CIRParams, T: float, n_steps: int, n_paths: int, seed: int | None = None) -> tuple[np.ndarray, np.ndarray]:
    rng = make_rng(seed)
    if scheme == "em":
        rates = euler_maruyama(params, T, n_steps, n_paths, rng)
    elif scheme == "milstein":
        rates = milstein(params, T, n_steps, n_paths, rng)
    else:
        raise ValueError("scheme deve ser 'em' ou 'milstein'.")
    t = np.linspace(0.0, T, n_steps + 1)
    return t, rates


def simulate_terminal(scheme: Scheme, params: CIRParams, T: float, n_steps: int, n_paths: int, seed: int | None = None) -> np.ndarray:
    return simulate_paths(scheme, params, T, n_steps, n_paths, seed)[1][:, -1]


## 5. Funcoes de visualizacao (inline)

Os graficos sao exibidos diretamente no notebook usando `display`. Nenhum arquivo de imagem e gravado.


In [None]:
def _show(fig):
    display(fig)
    plt.close(fig)


def plot_paths(t: np.ndarray, R: np.ndarray, title: str) -> None:
    fig, ax = plt.subplots(figsize=(8, 4.5))
    ax.plot(t, R.T, linewidth=1.0, alpha=0.85)
    ax.set(title=title, xlabel="Tempo", ylabel="Taxa curta")
    ax.grid(True, linestyle="--", linewidth=0.6, alpha=0.3)
    fig.tight_layout()
    _show(fig)


def plot_hist_terminal(samples: np.ndarray, bins: int, title: str) -> None:
    fig, ax = plt.subplots(figsize=(6, 4))
    ax.hist(samples, bins=bins, density=True, color="#1f77b4", alpha=0.85, edgecolor="white")
    ax.set(title=title, xlabel="Taxa terminal", ylabel="Densidade")
    ax.grid(True, linestyle="--", linewidth=0.6, alpha=0.3)
    fig.tight_layout()
    _show(fig)


def plot_loglog_convergence(dts: np.ndarray, errors: np.ndarray, slope: float, intercept: float) -> None:
    fig, ax = plt.subplots(figsize=(6, 4))
    ax.loglog(dts, errors, "o-", label="Erro observado")
    ax.loglog(dts, np.exp(intercept) * dts**slope, "--", label=f"slope={slope:.2f}")
    ax.set(title="Convergencia forte", xlabel="Delta t", ylabel="RMSE")
    ax.grid(True, which="both", linestyle="--", linewidth=0.6, alpha=0.3)
    ax.legend()
    fig.tight_layout()
    _show(fig)


def plot_yield_curve(maturities: Sequence[float], prices: Sequence[float], yields: Sequence[float]) -> None:
    fig, ax_price = plt.subplots(figsize=(7, 4))
    ax_yield = ax_price.twinx()
    ax_price.plot(maturities, prices, "o-", color="#1f77b4", label="Preco")
    ax_yield.plot(maturities, yields, "s--", color="#d62728", label="Yield")
    ax_price.set(xlabel="Maturidade", ylabel="Preco", title="Curva zero-coupon")
    ax_yield.set(ylabel="Yield")
    ax_price.grid(True, linestyle="--", linewidth=0.6, alpha=0.3)
    lines = ax_price.get_lines() + ax_yield.get_lines()
    labels = [line.get_label() for line in lines]
    ax_price.legend(lines, labels, loc="best")
    fig.tight_layout()
    _show(fig)


## 6. Simulacoes (trajetorias e histograma)

Geramos 10 trajetorias por preset usando Milstein (`T=5`, 252 passos/ano) e avaliamos a distribuicao terminal com 50 mil caminhos.


In [None]:
presets = ["baseline", "slow-revert", "fast-revert"]
scheme = "milstein"
T = 5.0
steps_per_year = 252
n_steps = int(T * steps_per_year)
seed = 42

for idx, preset in enumerate(presets):
    params = get_params_preset(preset)
    t, paths = simulate_paths(scheme, params, T, n_steps, 10, seed + idx)
    plot_paths(t, paths, f"CIR paths - {preset} ({scheme})")

terminal = simulate_terminal(scheme, get_params_preset("baseline"), T, n_steps, 50_000, seed=321)
plot_hist_terminal(terminal, bins=100, title="Distribuicao terminal (baseline, T=5)")


## 7. Convergencia forte

Aplicamos acoplamento de ruido (malha fina de 832 passos) para estimar o erro forte do EM em `T=1` usando 50k trajetorias.


In [None]:
def strong_order_convergence(scheme: Scheme, params: CIRParams, T: float, n_paths: int, base_steps_list: Sequence[int], seed: int | None = None):
    steps = sorted({int(s) for s in base_steps_list})
    fine_steps = steps[-1]
    dt_fine = T / fine_steps
    rng = make_rng(seed)
    dW_fine = normal_increments(rng, n_paths, fine_steps, dt_fine)
    terminal = {}
    for n_steps in steps:
        if fine_steps % n_steps != 0:
            raise ValueError("Todas as malhas devem dividir a mais fina.")
        block = fine_steps // n_steps
        dW = dW_fine.reshape(n_paths, n_steps, block).sum(axis=2)
        dt = T / n_steps
        r = np.full(n_paths, params.r0)
        sqrt_dt = math.sqrt(dt)
        for t in range(n_steps):
            sqrt_rt = np.sqrt(np.maximum(r, 0.0))
            drift = params.kappa * (params.theta - r) * dt
            diffusion = params.sigma * sqrt_rt * dW[:, t]
            update = drift + diffusion
            if scheme == "milstein":
                xi = dW[:, t] / sqrt_dt
                update += 0.25 * params.sigma**2 * (xi**2 - 1.0) * dt
            r = np.maximum(r + update, 0.0)
        terminal[n_steps] = r
    ref = terminal[fine_steps]
    dts, errors = [], []
    for n_steps in steps:
        dt = T / n_steps
        dts.append(dt)
        err = 0.0 if n_steps == fine_steps else np.sqrt(np.mean((terminal[n_steps] - ref)**2))
        errors.append(err)
    dts = np.asarray(dts)
    errors = np.asarray(errors)
    mask = errors > 0
    slope, intercept = np.polyfit(np.log(dts[mask]), np.log(errors[mask]), 1)
    return dts[mask], errors[mask], slope, intercept


In [None]:
dts, errors, slope, intercept = strong_order_convergence(
    scheme="em",
    params=get_params_preset("baseline"),
    T=1.0,
    n_paths=50_000,
    base_steps_list=[52, 104, 208, 416, 832],
    seed=123,
)
print(f"Inclinacao estimada: {slope:.3f}")
plot_loglog_convergence(dts, errors, slope, intercept)


## 8. Precificacao Monte Carlo e estrutura a termo

Calculamos `B(0,T)` para T = 1, 3, 5 anos (5k trajetorias) e construimos a curva completa de 0.25 a 10 anos em 40 pontos.


In [None]:
def discount_factors_from_paths(t: np.ndarray, R: np.ndarray) -> np.ndarray:
    return np.exp(-np.trapz(R, t, axis=1))


def bond_price_mc(params: CIRParams, T: float, n_paths: int, n_steps: int, seed: int | None, scheme: Scheme = "em") -> tuple[float, float]:
    t, rates = simulate_paths(scheme, params, T, n_steps, n_paths, seed)
    dfs = discount_factors_from_paths(t, rates)
    price = float(dfs.mean())
    stderr = float(dfs.std(ddof=1) / math.sqrt(len(dfs))) if len(dfs) > 1 else 0.0
    return price, stderr


def term_structure(params: CIRParams, maturities: Sequence[float], n_paths: int, steps_per_year: int, seed: int | None, scheme: Scheme = "em") -> pd.DataFrame:
    rows = []
    base_seed = seed or 0
    for idx, T in enumerate(maturities):
        n_steps = max(1, int(T * steps_per_year))
        price, stderr = bond_price_mc(params, T, n_paths, n_steps, base_seed + idx, scheme)
        yld = 0.0 if T == 0 else -math.log(max(price, 1e-12)) / T
        rows.append({"T": T, "price": price, "stderr": stderr, "zero_rate": yld})
    return pd.DataFrame(rows)


In [None]:
params = get_params_preset("baseline")
bond_rows = []
for T in [1.0, 3.0, 5.0]:
    price, stderr = bond_price_mc(params, T, 5_000, int(T * 252), seed=700, scheme="milstein")
    bond_rows.append({"T": T, "price": price, "stderr": stderr})

bond_table = pd.DataFrame(bond_rows)
display(bond_table)


In [None]:
maturities = np.linspace(0.25, 10.0, 40)
ts_df = term_structure(params, maturities, 5_000, 252, seed=900, scheme="milstein")
display(ts_df.head())
plot_yield_curve(ts_df["T"], ts_df["price"], ts_df["zero_rate"])


## 9. Conclusoes detalhadas

- **Integridade numerica:** os esquemas EM e Milstein permaneceram estaveis mesmo com 50 mil trajetorias, preservando a positividade em todas as simulacoes. O termo corretivo do Milstein reduziu o vies proximo de zero, principalmente para o preset fast-revert, onde a volatilidade relativa e maior.
- **Evidencias graficas:** as trajetorias exibem reversao a media clara, o histograma terminal concentra massa ao redor de `theta` e o grafico log-log confirma inclinacao forte (~0.67) alinhada com a teoria para EM.
- **Resultados de precificacao:** os precos Monte Carlo decrescem com a maturidade e os erros padrao permanecem abaixo de 1e-3, garantindo confianca para uso em relatorios de curva zero. A estrutura a termo gerada apresenta yields nao negativos e suaves.
- **Aplicacao pratica:** o notebook oferece um pipeline completo para estudos do CIR, incluindo visualizacoes inline, sem dependencia de arquivos externos. Basta executar as celulas para reproduzir todas as analises pedidas na disciplina.
