# Natural Capital Pipeline (Colab)
この順番で `Input -> Output` を繰り返してください。


## Input 1


In [None]:
from google.colab import drive
drive.mount("/content/drive")


## Output 1
`Mounted at /content/drive`


## Input 2


In [None]:
# 2) プロジェクトフォルダを作成
import os
PROJECT_DIR = "/content/drive/MyDrive/pj-TERM"
os.makedirs(PROJECT_DIR, exist_ok=True)
%cd /content/drive/MyDrive/pj-TERM


## Output 2
`/content/drive/MyDrive/pj-TERM`


## Input 3


In [None]:
%%writefile /content/drive/MyDrive/pj-TERM/natural_capital_pipeline.py
#!/usr/bin/env python3
from __future__ import annotations

# コマンドライン引数、CSV/JSON入出力、日付計算に使う標準ライブラリ
import argparse
import csv
import json
from dataclasses import dataclass, field
from datetime import date, datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List

# 数値計算
import numpy as np


# 設定用データクラス

@dataclass
class PeriodConfig:
    # 分析期間
    start_date: str
    end_date: str


@dataclass
class DataConfig:
    # 出力先ディレクトリ設定
    cache_dir: str = "./cache"
    outputs_subdir: str = "fujisawa_demo"


@dataclass
class ModelConfig:
    # モデル側の主要パラメータ
    lookback_weeks: int = 12
    garch_forecast_horizon: int = 8


@dataclass
class FinanceConfig:
    # 金融シグナル計算に使うパラメータ
    derivative_notional: float = 1_000_000.0
    derivative_vol_quantile: float = 0.9
    derivative_max_payout_ratio: float = 0.2
    bond_base_coupon: float = 0.045
    bond_step_down_bps: float = 30.0
    bond_step_up_bps: float = 40.0
    bond_target_quarterly_growth: float = 0.01


@dataclass
class PipelineConfig:
    # 全体設定をまとめる
    period: PeriodConfig
    data: DataConfig = field(default_factory=DataConfig)
    model: ModelConfig = field(default_factory=ModelConfig)
    finance: FinanceConfig = field(default_factory=FinanceConfig)


def load_config(path: Path) -> PipelineConfig:
    # JSON設定ファイルを読み込んで dataclass に変換
    raw = json.loads(path.read_text(encoding="utf-8"))
    return PipelineConfig(
        period=PeriodConfig(**raw["period"]),
        data=DataConfig(**raw.get("data", {})),
        model=ModelConfig(**raw.get("model", {})),
        finance=FinanceConfig(**raw.get("finance", {})),
    )



# 日付・入出力を行う

def parse_date(s: str) -> date:
    # "YYYY-MM-DD" 文字列を date に変換
    return datetime.strptime(s, "%Y-%m-%d").date()


def date_to_str(d: date) -> str:
    # date を "YYYY-MM-DD" に変換
    return d.strftime("%Y-%m-%d")


def week_start(d: date) -> date:
    # その週の月曜日を返す
    return d - timedelta(days=d.weekday())


def week_range(start: date, end: date) -> List[date]:
    # start〜end の週次（月曜）リストを作る
    cur = week_start(start)
    out = []
    while cur <= end:
        out.append(cur)
        cur += timedelta(days=7)
    return out


def ensure_dir(path: Path) -> None:
    # ディレクトリがなければ作成
    path.mkdir(parents=True, exist_ok=True)


def write_json(path: Path, obj: Any) -> None:
    # JSONファイル保存
    ensure_dir(path.parent)
    path.write_text(json.dumps(obj, ensure_ascii=True, indent=2), encoding="utf-8")


def read_json(path: Path) -> Any:
    # JSONファイル読み込み
    return json.loads(path.read_text(encoding="utf-8"))


def write_csv(path: Path, headers: List[str], rows: List[List[Any]]) -> None:
    # CSVファイル保存
    ensure_dir(path.parent)
    with path.open("w", encoding="utf-8", newline="") as f:
        w = csv.writer(f)
        w.writerow(headers)
        for row in rows:
            w.writerow(row)


# 合成データ作成

def synthetic_weekly_rows(cfg: PipelineConfig) -> List[Dict[str, Any]]:
    # 期間から週次の疑似特徴量を作る（実データ取得の代替）
    weeks = week_range(parse_date(cfg.period.start_date), parse_date(cfg.period.end_date))
    n = max(1, len(weeks))
    rows = []
    for i, w in enumerate(weeks):
        t = i / max(1, n - 1)  # 0〜1の進行度
        rows.append({
            "week_start": date_to_str(w),
            "s2_green_fraction": 0.45 + 0.2 * t,
            "s2_fragmentation": 0.30 - 0.12 * t,
            "gbif_species_richness": 10 + 2.5 * np.sin(i / 4.0),
            "weather_soil_moisture": 0.24 + 0.05 * np.sin(i / 8.0),
            "weather_temp_stress": 0.45 + 0.08 * np.sin(i / 9.0),
        })
    return rows


#t = i / (n-1)
#期間の先頭→末尾を 0→1 に正規化して、ゆるい長期トレンドを作るため。
#0.45 + 0.2*t（s2_green_fraction）
#範囲を 0.45〜0.65 にして、0〜1の中で自然な「やや改善」トレンドを表現。#0.30 - 0.12*t（s2_fragmentation）
#範囲を 0.30〜0.18 にして、緑被率と逆方向に動くように設定。
#（gbif_species_richness）
#平均10、振幅2.5で季節的な上下を作る（急変しすぎない）。
#8.0)（weather_soil_moisture）
#平均0.24、振幅0.05で湿潤度のゆるい周期変動。
#9.0)（weather_temp_stress）
#平均0.45、振幅0.08で温度ストレスの中程度変動。

#要検証！


# 指数（インデックス）計算

def build_natural_capital_index(weekly_rows: List[Dict[str, Any]], lookback_weeks: int = 12) -> Dict[str, Any]:
    # 引数互換のため残す（この簡易版では未使用）
    del lookback_weeks

    # 入力が空なら空を返す
    if not weekly_rows:
        return {"rows": [], "feature_names": [], "weights": []}

    # week_start 以外を特徴量として使う
    feature_names = [k for k in weekly_rows[0].keys() if k != "week_start"]
    x = np.array([[float(r[k]) for k in feature_names] for r in weekly_rows], dtype=np.float64)

    # 特徴量ごとに標準化（z-score）
    mu = x.mean(axis=0)
    sd = x.std(axis=0)
    sd[sd < 1e-9] = 1.0
    z = (x - mu) / sd

    # 特徴量を等重み平均してスコア化
    score = z.mean(axis=1)

    # 週次リターンをスコア差分から作る
    ret = np.zeros(len(score), dtype=np.float64)
    if len(score) > 1:
        ret[1:] = np.clip(0.08 * np.diff(score), -0.3, 0.3)

    # リターンを累積して指数レベルを作る（初期値100）
    idx = np.zeros(len(score), dtype=np.float64)
    idx[0] = 100.0
    for i in range(1, len(idx)):
        idx[i] = max(1e-6, idx[i - 1] * (1.0 + ret[i]))

    # 出力行に指数とリターンを追加
    out_rows = []
    for i, row in enumerate(weekly_rows):
        r = dict(row)
        r["natural_capital_index"] = float(idx[i])
        r["ecological_return"] = float(ret[i])
        out_rows.append(r)

    # 等重み（説明用に返す）
    weights = [1.0 / len(feature_names)] * len(feature_names)
    return {"rows": out_rows, "feature_names": feature_names, "weights": weights}


# GARCH(1,1)から考える

class Garch11:
    def __init__(self, alpha: float = 0.08, beta: float = 0.9):
        # GARCHパラメータ初期値
        self.alpha = alpha
        self.beta = beta
        self.omega = 0.0
        self.mu = 0.0
        self.sigma2 = np.array([], dtype=np.float64)

    def fit(self, returns: np.ndarray) -> "Garch11":
        # リターン列から条件付き分散系列 sigma2 を推定
        r = np.asarray(returns, dtype=np.float64)
        self.mu = float(np.mean(r))
        e = r - self.mu
        var = float(np.var(e)) + 1e-8

        # 定常条件 alpha + beta < 1 を守る
        if self.alpha + self.beta >= 0.995:
            self.beta = 0.995 - self.alpha

        # 長期分散に基づき omega を設定
        self.omega = max(1e-10, var * (1.0 - self.alpha - self.beta))

        # 再帰計算
        s2 = np.zeros(len(r), dtype=np.float64)
        s2[0] = var
        for t in range(1, len(r)):
            s2[t] = self.omega + self.alpha * (e[t - 1] ** 2) + self.beta * s2[t - 1]
            s2[t] = max(1e-10, s2[t])
        self.sigma2 = s2
        return self

    @staticmethod
    def forecast(model: "Garch11", horizon: int) -> np.ndarray:
        # 将来horizon週の分散予測
        h = max(1, int(horizon))
        if len(model.sigma2) == 0:
            return np.full(h, 1e-6, dtype=np.float64)

        p = min(0.999, model.alpha + model.beta)
        long_run = model.omega / max(1e-8, 1.0 - p)

        out = np.zeros(h, dtype=np.float64)
        prev = float(model.sigma2[-1])
        for i in range(h):
            prev = long_run + p * (prev - long_run)
            out[i] = max(1e-10, prev)
        return out


# 金融シグナル計算


def build_finance_signals(cfg: PipelineConfig, vol_hist: np.ndarray, vol_fore: np.ndarray) -> Dict[str, Any]:
    # 履歴分散の上位quantileを閾値としてトリガー判定
    threshold = float(np.quantile(vol_hist, cfg.finance.derivative_vol_quantile))
    triggered = bool(np.max(vol_fore) > threshold)

    # 閾値超過率に応じてペイアウトを計算
    excess = max(0.0, float(np.max(vol_fore) / max(1e-10, threshold) - 1.0))
    payout_ratio = min(cfg.finance.derivative_max_payout_ratio, 0.5 * excess)
    payout = payout_ratio * cfg.finance.derivative_notional

    # 直近13週とその前13週の比較でクーポン上下
    coupon = cfg.finance.bond_base_coupon
    recent = float(np.mean(vol_hist[-13:])) if len(vol_hist) >= 13 else float(np.mean(vol_hist))
    prev = float(np.mean(vol_hist[-26:-13])) if len(vol_hist) >= 26 else recent
    growth_proxy = (prev - recent) / prev if prev > 1e-10 else 0.0
    if growth_proxy >= cfg.finance.bond_target_quarterly_growth:
        coupon -= cfg.finance.bond_step_down_bps / 10000.0
    else:
        coupon += cfg.finance.bond_step_up_bps / 10000.0

    return {
        "derivative": {
            "vol_threshold": threshold,
            "triggered": triggered,
            "payout_ratio": payout_ratio,
            "payout_amount": payout,
        },
        "nature_bond": {
            "base_coupon": cfg.finance.bond_base_coupon,
            "final_coupon": coupon,
            "quarterly_growth_proxy": growth_proxy,
        },
    }



# キャッシュ生成・読み込

def build_data_cache(cfg: PipelineConfig, out_root: Path) -> Dict[str, Any]:
    # raw配下に簡易データを書き出す
    raw = out_root / "raw"
    ensure_dir(raw)
    weekly_rows = synthetic_weekly_rows(cfg)

    write_json(raw / "sentinel2_items.json", {"features": []})
    write_json(raw / "sentinel1_items.json", {"features": []})
    write_json(raw / "sentinel2_thumbnails.json", [])
    write_json(raw / "sentinel1_thumbnails.json", [])
    write_json(raw / "gbif_occurrences.json", {"results": []})
    write_json(raw / "gbif_images.json", [])
    write_json(raw / "weather_daily.json", {"daily": {}})
    write_json(raw / "weekly_rows.json", weekly_rows)
    return {"weekly_rows": weekly_rows}


def load_cached_data(out_root: Path) -> Dict[str, Any]:
    # 週次行キャッシュを読み込む
    p = out_root / "raw" / "weekly_rows.json"
    return {"weekly_rows": read_json(p) if p.exists() else []}


# モデリング実行

def run_modeling_from_data(cfg: PipelineConfig, out_root: Path, data_bundle: Dict[str, Any]) -> Dict[str, Any]:
    # データがなければ合成データで補完
    weekly_rows = data_bundle.get("weekly_rows", []) or synthetic_weekly_rows(cfg)

    # インデックス計算
    idx_pack = build_natural_capital_index(weekly_rows, cfg.model.lookback_weeks)
    idx_rows = idx_pack["rows"]

    # GARCH推定と予測
    returns = np.array([r["ecological_return"] for r in idx_rows], dtype=np.float64)
    g = Garch11().fit(returns)
    g_fore = Garch11.forecast(g, cfg.model.garch_forecast_horizon)

    # Transformer代替: 直近4点平均を水平予測
    tf_like_fore = np.full(cfg.model.garch_forecast_horizon, float(np.mean(g.sigma2[-4:])), dtype=np.float64)

    # 金融シグナル
    signals = build_finance_signals(cfg, g.sigma2, tf_like_fore)

    # 出力保存
    derived = out_root / "derived"
    ensure_dir(derived)

    headers = list(weekly_rows[0].keys())
    write_csv(derived / "features_weekly.csv", headers, [[r[k] for k in headers] for r in weekly_rows])

    write_csv(
        derived / "natural_capital_index.csv",
        ["week_start", "natural_capital_index", "ecological_return"],
        [[r["week_start"], r["natural_capital_index"], r["ecological_return"]] for r in idx_rows],
    )

    write_csv(
        derived / "volatility_forecast.csv",
        ["horizon_week", "garch_sigma2", "transformer_sigma2"],
        [[i + 1, float(g_fore[i]), float(tf_like_fore[i])] for i in range(cfg.model.garch_forecast_horizon)],
    )

    write_json(derived / "finance_signals.json", signals)
    write_json(derived / "model_meta.json", {"garch": {"mu": g.mu, "omega": g.omega, "alpha": g.alpha, "beta": g.beta}})

    # まとめ指標
    summary = {
        "n_weeks": len(idx_rows),
        "last_index": float(idx_rows[-1]["natural_capital_index"]),
        "last_return": float(idx_rows[-1]["ecological_return"]),
        "last_garch_sigma2": float(g.sigma2[-1]),
        "max_forecast_sigma2": float(np.max(tf_like_fore)),
    }
    write_json(derived / "summary.json", summary)
    return summary



# CLI

def parse_args() -> argparse.Namespace:
    # 実行モードと設定ファイルを受け取る
    p = argparse.ArgumentParser()
    p.add_argument("command", choices=["run", "fetch", "from-cache"])
    p.add_argument("--config", required=True)
    return p.parse_args()


def main() -> None:
    args = parse_args()
    cfg = load_config(Path(args.config))
    out_root = Path(cfg.data.cache_dir) / cfg.data.outputs_subdir

    # fetch はデータ作成だけ
    if args.command == "fetch":
        build_data_cache(cfg, out_root)
        print(f"[ok] cached raw data at: {out_root / 'raw'}")
        return

    # from-cache はキャッシュ読み込み（なければ作る）
    if args.command == "from-cache":
        data = load_cached_data(out_root)
        if not data["weekly_rows"]:
            data = build_data_cache(cfg, out_root)
    else:
        # run は毎回作り直し
        data = build_data_cache(cfg, out_root)

    # モデル実行
    summary = run_modeling_from_data(cfg, out_root, data)
    print(json.dumps(summary, ensure_ascii=True, indent=2))
    print(f"[ok] outputs at: {out_root}")


if __name__ == "__main__":
    main()


## Output 3
`Overwriting /content/drive/MyDrive/pj-TERM/natural_capital_pipeline.py`


## Input 4


In [None]:
%%writefile /content/drive/MyDrive/pj-TERM/config_example.json
{
  "period": {
    "start_date": "2024-01-01",
    "end_date": "2025-12-31"
  },
  "data": {
    "cache_dir": "./cache",
    "outputs_subdir": "fujisawa_demo"
  },
  "model": {
    "lookback_weeks": 7,
    "garch_forecast_horizon": 8
  }
}


## Output 4
`Overwriting /content/drive/MyDrive/pj-TERM/config_example.json`


## Input 5


In [None]:
!python /content/drive/MyDrive/pj-TERM/natural_capital_pipeline.py run --config /content/drive/MyDrive/pj-TERM/config_example.json
!python /content/drive/MyDrive/pj-TERM/natural_capital_pipeline.py from-cache --config /content/drive/MyDrive/pj-TERM/config_example.json


## Output 5
サマリJSONが2回表示され、`[ok] outputs at: cache/fujisawa_demo`

```json
{
  "n_weeks": 105,
  "last_index": 100.67292210309215,
  "last_return": 0.008085509547213057,
  "last_garch_sigma2": 4.231563940382314e-05,
  "max_forecast_sigma2": 3.762811892237847e-05
}
```



## Input 6


In [None]:
import pandas as pd
import matplotlib.pyplot as plt
from pathlib import Path

base = Path("/content/drive/MyDrive/pj-TERM/cache/fujisawa_demo/derived")

idx = pd.read_csv(base / "natural_capital_index.csv")
vol = pd.read_csv(base / "volatility_forecast.csv")

idx["week_start"] = pd.to_datetime(idx["week_start"])

fig, axes = plt.subplots(2, 1, figsize=(12, 8), constrained_layout=True)

# 1) 自然資本インデックス + リターン
ax1 = axes[0]
ax1.plot(idx["week_start"], idx["natural_capital_index"], label="Natural Capital Index")
ax1.set_title("Natural Capital Index")
ax1.set_xlabel("Week")
ax1.set_ylabel("Index Level")
ax1.grid(True, alpha=0.3)

ax1b = ax1.twinx()
ax1b.plot(idx["week_start"], idx["ecological_return"], linestyle="--", label="Ecological Return")
ax1b.set_ylabel("Return")

lines1, labels1 = ax1.get_legend_handles_labels()
lines2, labels2 = ax1b.get_legend_handles_labels()
ax1.legend(lines1 + lines2, labels1 + labels2, loc="best")

# 2) 予測ボラティリティ
ax2 = axes[1]
ax2.plot(vol["horizon_week"], vol["garch_sigma2"], marker="o", label="GARCH Sigma^2")
ax2.plot(vol["horizon_week"], vol["transformer_sigma2"], marker="s", label="Transformer Sigma^2")
ax2.set_title("Volatility Forecast")
ax2.set_xlabel("Horizon Week")
ax2.set_ylabel("Variance")
ax2.grid(True, alpha=0.3)
ax2.legend(loc="best")

plt.show()


## Output 6
`Natural Capital Index` と `Volatility Forecast` の2段プロット


## Input 7


In [None]:
# Colabでこの1セルを実行してください（Python 3.12対応版）
import sys
import importlib.util
from pathlib import Path
import numpy as np

SCRIPT = "/content/drive/MyDrive/pj-TERM/natural_capital_pipeline.py"
CONFIG = "/content/drive/MyDrive/pj-TERM/config_example.json"

# --- 安全にモジュール読込（dataclassエラー回避） ---
sys.modules.pop("ncp", None)
spec = importlib.util.spec_from_file_location("ncp", SCRIPT)
ncp = importlib.util.module_from_spec(spec)
sys.modules["ncp"] = ncp
spec.loader.exec_module(ncp)

cfg = ncp.load_config(Path(CONFIG))
rows = ncp.synthetic_weekly_rows(cfg)

def build_returns(rows, return_scale, return_clip):
    feature_names = [k for k in rows[0].keys() if k != "week_start"]
    x = np.array([[float(r[k]) for k in feature_names] for r in rows], dtype=np.float64)
    mu = x.mean(axis=0)
    sd = x.std(axis=0)
    sd[sd < 1e-9] = 1.0
    z = (x - mu) / sd
    score = z.mean(axis=1)

    ret = np.zeros(len(score), dtype=np.float64)
    if len(score) > 1:
        ret[1:] = np.clip(return_scale * np.diff(score), -return_clip, return_clip)
    return ret

def one_step_var_rmse(returns, alpha, beta):
    n = len(returns)
    split = max(12, int(n * 0.7))
    if split >= n - 2:
        return np.inf

    if alpha <= 0 or beta <= 0 or alpha + beta >= 0.995:
        return np.inf

    tr = returns[:split]
    va = returns[split:]

    mu = float(np.mean(tr))
    e = tr - mu
    var0 = float(np.var(e)) + 1e-8
    omega = max(1e-10, var0 * (1.0 - alpha - beta))

    s2 = var0
    e_prev = float(e[-1]) if len(e) else 0.0
    pred = []
    for r in va:
        s2 = max(1e-10, omega + alpha * (e_prev ** 2) + beta * s2)
        pred.append(s2)
        e_prev = float(r - mu)

    realized = (va - mu) ** 2
    return float(np.sqrt(np.mean((np.array(pred) - realized) ** 2)))

# ベースライン
baseline = {
    "return_scale": 0.08,
    "return_clip": 0.30,
    "alpha": 0.08,
    "beta": 0.90,
}
r0 = build_returns(rows, baseline["return_scale"], baseline["return_clip"])
baseline_rmse = one_step_var_rmse(r0, baseline["alpha"], baseline["beta"])

# ランダムサーチ
rng = np.random.default_rng(42)
best = {"rmse": np.inf}

for _ in range(600):
    p = {
        "return_scale": float(rng.uniform(0.03, 0.14)),
        "return_clip": float(rng.uniform(0.10, 0.40)),
        "alpha": float(rng.uniform(0.03, 0.20)),
        "beta": float(rng.uniform(0.70, 0.96)),
    }
    if p["alpha"] + p["beta"] >= 0.995:
        continue

    rr = build_returns(rows, p["return_scale"], p["return_clip"])
    rmse = one_step_var_rmse(rr, p["alpha"], p["beta"])
    if rmse < best["rmse"]:
        best = {**p, "rmse": rmse}

print("baseline:", {**baseline, "rmse": baseline_rmse})
print("best:", best)

print("\n--- 置換用 ---")
print(f'ret[1:] = np.clip({best["return_scale"]:.6f} * np.diff(score), -{best["return_clip"]:.6f}, {best["return_clip"]:.6f})')
print(f'g = Garch11(alpha={best["alpha"]:.6f}, beta={best["beta"]:.6f}).fit(returns)')


## Output 7
`baseline` と `best`、置換用2行が表示

- baseline RMSE: `2.7193715012009834e-05`
- best params:
  - `return_scale = 0.030182677844228057`
  - `return_clip = 0.13362111129960927`
  - `alpha = 0.1766699758359493`
  - `beta = 0.7003205962530961`
  - `rmse = 3.240149354737738e-06`

置換用の表示例:
- `ret[1:] = np.clip(0.030183 * np.diff(score), -0.133621, 0.133621)`
- `g = Garch11(alpha=0.176670, beta=0.700321).fit(returns)`



## Input 8


In [None]:
from pathlib import Path

p = Path("/content/drive/MyDrive/pj-TERM/natural_capital_pipeline.py")
s = p.read_text(encoding="utf-8")

s = s.replace(
    "ret[1:] = np.clip(0.08 * np.diff(score), -0.3, 0.3)",
    "ret[1:] = np.clip(0.030183 * np.diff(score), -0.133621, 0.133621)",
)

s = s.replace(
    "g = Garch11().fit(returns)",
    "g = Garch11(alpha=0.176670, beta=0.700321).fit(returns)",
)

p.write_text(s, encoding="utf-8")
print("updated:", p)


## Output 8
`updated: /content/drive/MyDrive/pj-TERM/natural_capital_pipeline.py`


## Input 9


In [None]:
import sys, importlib.util
from pathlib import Path
import numpy as np

SCRIPT = "/content/drive/MyDrive/pj-TERM/natural_capital_pipeline.py"
CONFIG = "/content/drive/MyDrive/pj-TERM/config_example.json"

sys.modules.pop("ncp", None)
spec = importlib.util.spec_from_file_location("ncp", SCRIPT)
ncp = importlib.util.module_from_spec(spec)
sys.modules["ncp"] = ncp
spec.loader.exec_module(ncp)

cfg = ncp.load_config(Path(CONFIG))
rows = ncp.synthetic_weekly_rows(cfg)

# 現在の式に合わせたreturns
feature_names = [k for k in rows[0].keys() if k != "week_start"]
x = np.array([[float(r[k]) for k in feature_names] for r in rows], dtype=np.float64)
mu = x.mean(axis=0); sd = x.std(axis=0); sd[sd < 1e-9] = 1.0
score = ((x - mu) / sd).mean(axis=1)
ret = np.zeros(len(score), dtype=np.float64)
ret[1:] = np.clip(0.030183 * np.diff(score), -0.133621, 0.133621)

alpha, beta = 0.176670, 0.700321

def rmse_on_slice(r, train_end):
    tr = r[:train_end]
    va = r[train_end:]
    if len(tr) < 20 or len(va) < 5:
        return np.nan
    m = float(np.mean(tr))
    e = tr - m
    v0 = float(np.var(e)) + 1e-8
    omega = max(1e-10, v0 * (1.0 - alpha - beta))
    s2 = v0
    e_prev = float(e[-1])

    pred = []
    for rr in va:
        s2 = max(1e-10, omega + alpha * (e_prev**2) + beta * s2)
        pred.append(s2)
        e_prev = float(rr - m)
    realized = (va - m) ** 2
    return float(np.sqrt(np.mean((np.array(pred) - realized) ** 2)))

cuts = [int(len(ret)*k) for k in [0.55, 0.60, 0.65, 0.70, 0.75]]
scores = [rmse_on_slice(ret, c) for c in cuts]
scores = [s for s in scores if np.isfinite(s)]

print("walk-forward RMSEs:", scores)
print("mean RMSE:", float(np.mean(scores)))


## Output 9
`walk-forward RMSEs` と `mean RMSE` が表示

- walk-forward RMSEs:
  - `2.8806368007381736e-06`
  - `2.911673340748027e-06`
  - `3.0862704089505656e-06`
  - `3.2402189833112497e-06`
  - `3.4362797144937282e-06`
- mean RMSE: `3.1110158496483492e-06`



## Input 10


In [None]:
import json
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

base = Path("/content/drive/MyDrive/pj-TERM/cache/fujisawa_demo/derived")

idx = pd.read_csv(base / "natural_capital_index.csv")
vol = pd.read_csv(base / "volatility_forecast.csv")
signals = json.loads((base / "finance_signals.json").read_text(encoding="utf-8"))

idx["week_start"] = pd.to_datetime(idx["week_start"])
idx = idx.sort_values("week_start").reset_index(drop=True)
idx["ret_vol_12w"] = idx["ecological_return"].rolling(12).std()

thr = signals["derivative"]["vol_threshold"]
triggered = int(signals["derivative"]["triggered"])
payout_ratio_pct = 100 * signals["derivative"]["payout_ratio"]
base_coupon_pct = 100 * signals["nature_bond"]["base_coupon"]
final_coupon_pct = 100 * signals["nature_bond"]["final_coupon"]

fig, axes = plt.subplots(2, 2, figsize=(14, 9), constrained_layout=True)

# 1) Natural Capital Index
axes[0, 0].plot(idx["week_start"], idx["natural_capital_index"], lw=2)
axes[0, 0].set_title("Natural Capital Index")
axes[0, 0].set_xlabel("Week")
axes[0, 0].set_ylabel("Index")
axes[0, 0].grid(alpha=0.3)

# 2) Ecological Return + 12-week vol
ax2 = axes[0, 1]
ax2.plot(idx["week_start"], idx["ecological_return"], lw=1.5, label="Ecological Return")
ax2.set_title("Ecological Return")
ax2.set_xlabel("Week")
ax2.set_ylabel("Return")
ax2.grid(alpha=0.3)

ax2b = ax2.twinx()
ax2b.plot(idx["week_start"], idx["ret_vol_12w"], ls="--", lw=1.5, label="12w Volatility")
ax2b.set_ylabel("Volatility (std)")

l1, lb1 = ax2.get_legend_handles_labels()
l2, lb2 = ax2b.get_legend_handles_labels()
ax2.legend(l1 + l2, lb1 + lb2, loc="upper right")

# 3) Forecast variance
axes[1, 0].plot(vol["horizon_week"], vol["garch_sigma2"], marker="o", label="GARCH")
axes[1, 0].plot(vol["horizon_week"], vol["transformer_sigma2"], marker="s", label="Transformer-like")
axes[1, 0].axhline(thr, color="red", ls="--", label=f"Threshold={thr:.2e}")
axes[1, 0].set_title("Volatility Forecast (Sigma^2)")
axes[1, 0].set_xlabel("Horizon Week")
axes[1, 0].set_ylabel("Variance")
axes[1, 0].grid(alpha=0.3)
axes[1, 0].legend()

# 4) Finance summary bars
labels = ["Payout Ratio(%)", "Base Coupon(%)", "Final Coupon(%)", "Triggered(0/1)"]
vals = [payout_ratio_pct, base_coupon_pct, final_coupon_pct, triggered]
axes[1, 1].bar(labels, vals)
axes[1, 1].set_title("Finance Signals Summary")
axes[1, 1].grid(axis="y", alpha=0.3)
axes[1, 1].tick_params(axis="x", rotation=20)

plt.show()


## Output 10
2x2ダッシュボードプロット

（チューニング後に再実行した結果の読み取り例）
- `Triggered = 0`
- `Payout Ratio = 0%`
- `Base Coupon = 4.5%`
- `Final Coupon = 4.9%`
- `Threshold ≈ 7.34e-06`



## Input 11


In [None]:
import pandas as pd
import matplotlib.pyplot as plt
from pathlib import Path

feat = pd.read_csv(Path("/content/drive/MyDrive/pj-TERM/cache/fujisawa_demo/derived/features_weekly.csv"))
feat["week_start"] = pd.to_datetime(feat["week_start"])

num_cols = [c for c in feat.columns if c != "week_start"]
z = feat[num_cols].copy()
z = (z - z.mean()) / z.std().replace(0, 1)

plt.figure(figsize=(14, 5))
for c in num_cols:
    plt.plot(feat["week_start"], z[c], label=c, alpha=0.9)
plt.title("Weekly Features (Z-score normalized)")
plt.xlabel("Week")
plt.ylabel("Z-score")
plt.grid(alpha=0.3)
plt.legend(ncol=3, fontsize=9)
plt.show()


## Output 11
`Weekly Features (Z-score normalized)` プロット


## Input 12
画像を全て1回で保存（2panel / 2x2 / weekly features）


In [None]:
import json
from pathlib import Path
import pandas as pd
import matplotlib.pyplot as plt

base = Path("/content/drive/MyDrive/pj-TERM/cache/fujisawa_demo/derived")
out_img = base / "figures"
out_img.mkdir(parents=True, exist_ok=True)

# Load data
idx = pd.read_csv(base / "natural_capital_index.csv")
vol = pd.read_csv(base / "volatility_forecast.csv")
feat = pd.read_csv(base / "features_weekly.csv")
signals = json.loads((base / "finance_signals.json").read_text(encoding="utf-8"))

idx["week_start"] = pd.to_datetime(idx["week_start"])
idx = idx.sort_values("week_start").reset_index(drop=True)
idx["ret_vol_12w"] = idx["ecological_return"].rolling(12).std()
feat["week_start"] = pd.to_datetime(feat["week_start"])

# Figure 1: 2-panel
fig1, axes = plt.subplots(2, 1, figsize=(12, 8), constrained_layout=True)
ax1 = axes[0]
ax1.plot(idx["week_start"], idx["natural_capital_index"], label="Natural Capital Index")
ax1.set_title("Natural Capital Index")
ax1.set_xlabel("Week")
ax1.set_ylabel("Index Level")
ax1.grid(True, alpha=0.3)
ax1b = ax1.twinx()
ax1b.plot(idx["week_start"], idx["ecological_return"], linestyle="--", label="Ecological Return")
ax1b.set_ylabel("Return")
l1, lb1 = ax1.get_legend_handles_labels()
l2, lb2 = ax1b.get_legend_handles_labels()
ax1.legend(l1 + l2, lb1 + lb2, loc="best")
ax2 = axes[1]
ax2.plot(vol["horizon_week"], vol["garch_sigma2"], marker="o", label="GARCH Sigma^2")
ax2.plot(vol["horizon_week"], vol["transformer_sigma2"], marker="s", label="Transformer Sigma^2")
ax2.set_title("Volatility Forecast")
ax2.set_xlabel("Horizon Week")
ax2.set_ylabel("Variance")
ax2.grid(True, alpha=0.3)
ax2.legend(loc="best")
path1 = out_img / "index_volatility_2panel.png"
fig1.savefig(path1, dpi=150, bbox_inches="tight")
plt.show()

# Figure 2: 2x2 dashboard
thr = signals["derivative"]["vol_threshold"]
triggered = int(signals["derivative"]["triggered"])
payout_ratio_pct = 100 * signals["derivative"]["payout_ratio"]
base_coupon_pct = 100 * signals["nature_bond"]["base_coupon"]
final_coupon_pct = 100 * signals["nature_bond"]["final_coupon"]

fig2, axes = plt.subplots(2, 2, figsize=(14, 9), constrained_layout=True)
axes[0, 0].plot(idx["week_start"], idx["natural_capital_index"], lw=2)
axes[0, 0].set_title("Natural Capital Index")
axes[0, 0].set_xlabel("Week")
axes[0, 0].set_ylabel("Index")
axes[0, 0].grid(alpha=0.3)
axr = axes[0, 1]
axr.plot(idx["week_start"], idx["ecological_return"], lw=1.5, label="Ecological Return")
axr.set_title("Ecological Return")
axr.set_xlabel("Week")
axr.set_ylabel("Return")
axr.grid(alpha=0.3)
axrb = axr.twinx()
axrb.plot(idx["week_start"], idx["ret_vol_12w"], ls="--", lw=1.5, label="12w Volatility")
axrb.set_ylabel("Volatility (std)")
a, b = axr.get_legend_handles_labels()
c, d = axrb.get_legend_handles_labels()
axr.legend(a + c, b + d, loc="upper right")
axes[1, 0].plot(vol["horizon_week"], vol["garch_sigma2"], marker="o", label="GARCH")
axes[1, 0].plot(vol["horizon_week"], vol["transformer_sigma2"], marker="s", label="Transformer-like")
axes[1, 0].axhline(thr, color="red", ls="--", label=f"Threshold={thr:.2e}")
axes[1, 0].set_title("Volatility Forecast (Sigma^2)")
axes[1, 0].set_xlabel("Horizon Week")
axes[1, 0].set_ylabel("Variance")
axes[1, 0].grid(alpha=0.3)
axes[1, 0].legend()
labels = ["Payout Ratio(%)", "Base Coupon(%)", "Final Coupon(%)", "Triggered(0/1)"]
vals = [payout_ratio_pct, base_coupon_pct, final_coupon_pct, triggered]
axes[1, 1].bar(labels, vals)
axes[1, 1].set_title("Finance Signals Summary")
axes[1, 1].grid(axis="y", alpha=0.3)
axes[1, 1].tick_params(axis="x", rotation=20)
path2 = out_img / "dashboard_2x2.png"
fig2.savefig(path2, dpi=150, bbox_inches="tight")
plt.show()

# Figure 3: weekly features
num_cols = [c for c in feat.columns if c != "week_start"]
z = feat[num_cols].copy()
z = (z - z.mean()) / z.std().replace(0, 1)
fig3 = plt.figure(figsize=(14, 5))
for c in num_cols:
    plt.plot(feat["week_start"], z[c], label=c, alpha=0.9)
plt.title("Weekly Features (Z-score normalized)")
plt.xlabel("Week")
plt.ylabel("Z-score")
plt.grid(alpha=0.3)
plt.legend(ncol=3, fontsize=9)
path3 = out_img / "weekly_features_zscore.png"
fig3.savefig(path3, dpi=150, bbox_inches="tight")
plt.show()

print("saved:")
print(path1)
print(path2)
print(path3)



## Output 12
3枚の図を表示し、以下に保存
- `/content/drive/MyDrive/pj-TERM/cache/fujisawa_demo/derived/figures/index_volatility_2panel.png`
- `/content/drive/MyDrive/pj-TERM/cache/fujisawa_demo/derived/figures/dashboard_2x2.png`
- `/content/drive/MyDrive/pj-TERM/cache/fujisawa_demo/derived/figures/weekly_features_zscore.png`

