## TimesFM（timesfm）公開API（関数・クラス）一覧を自動収集

`inspect`（内省）と `pkgutil`（パッケージ走査）で、`timesfm` パッケージの公開API（関数・クラス）を列挙し、`pandas.DataFrame` に格納して表示します。

- 収集対象：トップレベル + サブモジュール（importできる範囲）
- import失敗（オプション依存など）はログとして別途確認可能


In [None]:
%%bash
python "C:/moinfo/libs/timesfm/03_scripts/introspect_timesfm_features.py" \
  --pkg timesfm \
  --out "C:/moinfo/libs/timesfm/04_outputs/api/timesfm_public_api.json" \
  --print


## TimesFM（timesfm）公開API（関数・クラス）を内省して DataFrame 化

`inspect`（内省）と `pkgutil`（パッケージ走査）で、`timesfm` の公開API（関数・クラス）を抽出し、`pandas.DataFrame` に格納して表示します。

- 収集対象：トップレベル + import可能なサブモジュール
- import失敗（オプション依存など）は `modules_failed` に記録


In [None]:
import sys
import pandas as pd
from IPython.display import display

# 共通モジュールを import できるようにする
COMMON_DIR = r"C:\moinfo\common"
if COMMON_DIR not in sys.path:
    sys.path.insert(0, COMMON_DIR)

from moinfo_introspect.public_api import collect_public_api, to_dataframe

# 公開APIを収集
data = collect_public_api("timesfm", include_submodules=True)

# DataFrame化
df = to_dataframe(data)

# 概要（メトリクス）
summary_df = pd.DataFrame({
    "metric": ["api_items", "modules_ok", "modules_failed"],
    "value": [len(data["api"]), len(data["modules_ok"]), len(data["modules_failed"])]
})
display(summary_df)

# 種別内訳（class/function）
display(df["kind"].value_counts(dropna=False).to_frame("count"))

# テーブル本体（重いなら df.head(50) に変更）
display(df)


## 生成された JSON を読み込み、DataFrame として可視化

CLIで書き出した `timesfm_public_api.json` を読み込んで表示します（NotebookとCLIの結果一致確認用）。


In [None]:
import json
import pandas as pd
from IPython.display import display

json_path = r"C:\moinfo\libs\timesfm\04_outputs\api\timesfm_public_api.json"

with open(json_path, "r", encoding="utf-8") as f:
    data_from_json = json.load(f)

df2 = pd.DataFrame(data_from_json.get("api", []))

display(pd.DataFrame({
    "metric": ["api_items", "modules_ok", "modules_failed"],
    "value": [len(df2), len(data_from_json.get("modules_ok", [])), len(data_from_json.get("modules_failed", []))]
}))
display(df2["kind"].value_counts(dropna=False).to_frame("count"))
display(df2)


# TimesFM 機能（公開API）イントロスペクト

このノートは `timesfm` パッケージの関数・クラス・メソッドを機械的に列挙し、`pandas.DataFrame` に格納します。  
副作用（import時に重い依存関係がロードされる等）があり得るため、必要なら「サブモジュール走査」をオフにします。


In [1]:
from pathlib import Path
import sys

ROOT = Path(r"C:\moinfo")
SRC_DIR = ROOT / "libs" / "timesfm" / "02_src"

# ローカルモジュール（moinfo_timesfm_ext）を import できるようにする
if str(SRC_DIR) not in sys.path:
    sys.path.insert(0, str(SRC_DIR))

import pandas as pd

# timesfm がインストールされているか確認（失敗したら pip/conda 側の問題）
import importlib
timesfm = importlib.import_module("timesfm")

print("timesfm module:", timesfm.__name__)
print("timesfm file  :", getattr(timesfm, "__file__", "n/a"))
print("python        :", sys.version)


timesfm module: timesfm
timesfm file  : c:\Users\hashimoto.ryohei\miniconda3\envs\kaiseki\Lib\site-packages\timesfm\__init__.py
python        : 3.11.13 | packaged by Anaconda, Inc. | (main, Jun  5 2025, 13:03:15) [MSC v.1929 64 bit (AMD64)]


## API 解析（関数・クラス・メソッド一覧）→ DataFrame 化


In [None]:
from pathlib import Path
import sys

ROOT = Path(r"C:\moinfo")
PKG_ROOT = ROOT / "libs" / "timesfm" / "02_src" / "moinfo_timesfm_ext"  # ★ここが重要

if str(PKG_ROOT) not in sys.path:
    sys.path.insert(0, str(PKG_ROOT))

# 動作確認
import moinfo_timesfm_ext
print("moinfo_timesfm_ext:", moinfo_timesfm_ext.__file__)


In [None]:
from moinfo_timesfm_ext.moinfo_timesfm_ext.introspect_timesfm import collect_public_api, save_api_df

df = collect_public_api(
    package_name="timesfm",
    include_submodules=True,
    include_private=False,
    include_class_members=True,
    max_doc_chars=250,
)

df.to_csv(r"C:\moinfo\libs\timesfm\04_outputs\tables\timesfm_public_api_from_ext.csv", index=False)


## TimesFM 公開APIの概要分析・分類・可視化

- 種別（class/function/method/attribute/import_error）の分布
- モジュール階層ごとの規模（どこに機能が集中しているか）
- キーワードベースの機能分類（forecast / config / torch / util など）
- ヒートマップで「種別×分類」の密度を見る
- （任意）docstring を TF-IDF + k-means でクラスタリングして特徴語を抽出


In [None]:
# 任意：doc_summary を TF-IDF + k-means でクラスタリング
# 目的：ルール分類では拾いきれない「自然な機能グループ」を見つける

try:
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.cluster import KMeans

    texts = df2["doc_summary"].fillna("").astype(str)
    # 空文は落とす（クラスタリングのノイズになる）
    mask = texts.str.len() > 20
    texts2 = texts[mask]
    df_text = df2.loc[mask].copy()

    vectorizer = TfidfVectorizer(
        lowercase=True,
        stop_words="english",
        max_features=5000,
        ngram_range=(1, 2),
    )
    X = vectorizer.fit_transform(texts2)

    k = 8  # ざっくり。増やすと細かく割れる
    km = KMeans(n_clusters=k, n_init=10, random_state=0)
    labels = km.fit_predict(X)
    df_text["cluster"] = labels

    # クラスタごとの代表語（上位）
    terms = np.array(vectorizer.get_feature_names_out())
    top_words = []
    for i in range(k):
        center = km.cluster_centers_[i]
        top_idx = center.argsort()[::-1][:12]
        top_words.append(", ".join(terms[top_idx]))

    cluster_sizes = df_text["cluster"].value_counts().sort_index()
    summary = pd.DataFrame({
        "cluster": range(k),
        "size": [cluster_sizes.get(i, 0) for i in range(k)],
        "top_terms": top_words,
    }).sort_values("size", ascending=False)

    display(summary)

    # 可視化：クラスタサイズ
    plt.figure()
    plt.bar(summary["cluster"].astype(str), summary["size"])
    plt.title("Docstring clusters (k-means) sizes")
    plt.tight_layout()
    plt.savefig(OUT_FIG / "timesfm_api_doc_clusters_sizes.png", dpi=200)
    plt.show()

    # 保存
    summary.to_csv(OUT_TBL / "timesfm_api_doc_clusters_summary.csv", index=False, encoding="utf-8-sig")
    df_text.to_csv(OUT_TBL / "timesfm_api_with_doc_clusters.csv", index=False, encoding="utf-8-sig")

    print("Saved cluster outputs to:", OUT_TBL)

except Exception as e:
    print("Clustering skipped (scikit-learn not available or other error):", repr(e))


In [None]:
import plotly.express as px

# 1) kind 分布（class/method/function...）
kind_counts = df2["kind"].value_counts().reset_index()
kind_counts.columns = ["kind", "count"]

fig = px.bar(kind_counts, x="kind", y="count", title="TimesFM API: kind distribution")
fig.update_layout(xaxis_tickangle=-45)
fig.show()
fig.write_html(str(OUT_FIG / "timesfm_kind_distribution.html"))

# 2) category 分布（ルール分類）
cat_counts = df2["category"].value_counts().reset_index()
cat_counts.columns = ["category", "count"]

fig = px.bar(cat_counts, x="category", y="count", title="TimesFM API: category distribution (rule-based)")
fig.update_layout(xaxis_tickangle=-45)
fig.show()
fig.write_html(str(OUT_FIG / "timesfm_category_distribution.html"))

# 3) 上位モジュール（どこに機能が集中してる？）
top_modules = (
    df2.groupby("module_l2")
       .size()
       .sort_values(ascending=False)
       .head(25)
       .reset_index(name="count")
)

fig = px.bar(top_modules[::-1], x="count", y="module_l2", orientation="h",
             title="Top modules (level2) by API surface")
fig.show()
fig.write_html(str(OUT_FIG / "timesfm_top_modules_l2.html"))

# 4) ヒートマップ：category × kind
pivot = df2.pivot_table(index="category", columns="kind", values="qualname", aggfunc="count", fill_value=0)
pivot = pivot.loc[pivot.sum(axis=1).sort_values(ascending=False).index, :]  # 行を多い順

fig = px.imshow(
    pivot,
    title="Heatmap: category × kind (counts)",
    labels=dict(x="kind", y="category", color="count"),
    aspect="auto",
)
fig.show()
fig.write_html(str(OUT_FIG / "timesfm_heatmap_category_kind.html"))

print("Saved HTML figures to:", OUT_FIG)


In [None]:
from __future__ import annotations

import re
from pathlib import Path
import pandas as pd

ROOT = Path(r"C:\moinfo")
OUT_TBL = ROOT / "libs" / "timesfm" / "04_outputs" / "tables"
OUT_TBL.mkdir(parents=True, exist_ok=True)

# --- どのDFに "cluster" が入っているか吸収（df_text があれば優先）---
df_clustered = None
if "df_text" in globals() and isinstance(df_text, pd.DataFrame) and "cluster" in df_text.columns:
    df_clustered = df_text.copy()
elif "df2" in globals() and isinstance(df2, pd.DataFrame) and "cluster" in df2.columns:
    df_clustered = df2.copy()
elif "df" in globals() and isinstance(df, pd.DataFrame) and "cluster" in df.columns:
    df_clustered = df.copy()

if df_clustered is None:
    raise RuntimeError("cluster 列が見つかりません。先に TF-IDF+kmeans のセルを実行して df_text を作ってください。")

# --- summary（cluster/size/top_terms）がある前提 ---
if "summary" not in globals():
    raise RuntimeError("summary(DataFrame: cluster/size/top_terms) が見つかりません。先にクラスタ要約を作ってください。")

# --- top_terms からテーマ推定（“テーブルから取得”のため、top_terms依存で判定）---
def infer_theme(top_terms: str) -> tuple[str, str]:
    t = (top_terms or "").lower()

    if re.search(r"\bcompile\b|share_mem|torch", t):
        return ("Torch統合・高速化", "モデルのコンパイル(compile)やTorch実行系（高速デコード等）")
    if re.search(r"state_dict|hook", t):
        return ("状態管理・フック", "重み(state_dict)の保存/読込、Hook(フック)で挙動を差し込む")
    if re.search(r"yields|iterator|return iterator", t):
        return ("探索・列挙", "モジュール/サブモジュール/パラメータをイテレータで列挙する")
    if re.search(r"parameters|buffers", t):
        return ("パラメータ・バッファ", "parameters/buffers の取得・操作（学習/推論の材料）")
    if re.search(r"modifies|module place|in place", t):
        return ("in-place変換", "dtype/device 変更など“その場で”モジュールを書き換える操作")
    if re.search(r"target exists|exists|error", t):
        return ("検証・例外", "存在チェックやエラー処理（主にユーティリティ/検証）")
    if re.search(r"evaluation mode|evaluation|mode", t):
        return ("モード切替", "train/eval など推論モード/学習モードの切り替え")
    if re.search(r"child|accessed|using given|get_", t):
        return ("階層アクセス", "子モジュール/特定パラメータへ名前でアクセス（get_submodule等）")

    return ("その他", "雑多な補助機能")

# --- 各クラスタの代表API（qualname）を抽出 ---
# 代表の選び方：doc_summaryが長い＝説明がありそう、さらに timesfm/forecast/compile を優先
def pick_examples(g: pd.DataFrame, k: int = 6) -> pd.DataFrame:
    g = g.copy()
    txt = (g["qualname"].fillna("") + " " + g["doc_summary"].fillna("")).str.lower()
    score = (
        g["doc_summary"].fillna("").str.len()
        + txt.str.contains("forecast").astype(int) * 200
        + txt.str.contains("compile").astype(int) * 150
        + txt.str.contains("timesfm").astype(int) * 80
    )
    g["_score"] = score
    return g.sort_values("_score", ascending=False).head(k)

examples = (
    df_clustered.groupby("cluster", group_keys=False)
    .apply(pick_examples, k=6)
    .loc[:, ["cluster", "qualname", "kind", "module", "doc_summary"]]
)

# --- 概要テーブル作成 ---
ov = summary.copy()
ov["theme"], ov["what_you_can_do"] = zip(*ov["top_terms"].apply(infer_theme))

# 代表APIを文字列にまとめる
ex_str = (
    examples.assign(ex=lambda d: d["qualname"] + " [" + d["kind"].astype(str) + "]")
    .groupby("cluster")["ex"]
    .apply(lambda s: "\n".join(s.tolist()))
    .reset_index()
    .rename(columns={"ex": "representative_api"})
)

overview = ov.merge(ex_str, on="cluster", how="left").sort_values("size", ascending=False)

display(overview)

# 保存（フルパス）
out_csv = OUT_TBL / "timesfm_cluster_overview.csv"
overview.to_csv(out_csv, index=False, encoding="utf-8-sig")
print("saved:", out_csv)


In [None]:
from __future__ import annotations

from pathlib import Path
import re

from IPython.display import Markdown, display

ROOT = Path(r"C:\moinfo")
OUT_FIG = ROOT / "libs" / "timesfm" / "04_outputs" / "figs"
OUT_FIG.mkdir(parents=True, exist_ok=True)

# Mermaidで壊れやすい文字を軽くエスケープ
def esc(s: str) -> str:
    s = (s or "")
    s = s.replace('"', "'")
    s = re.sub(r"[\r\n]+", " ", s)
    return s

lines = []
lines.append("graph TD")
lines.append('  R["timesfm API"]')

# overview がある前提
for _, r in overview.iterrows():
    c = int(r["cluster"])
    theme = esc(r["theme"])
    size = int(r["size"])
    node_c = f"C{c}"
    lines.append(f'  R --> {node_c}["C{c}: {theme}\\n(size={size})"]')

    reps = (r.get("representative_api") or "").splitlines()
    reps = reps[:6]  # 多すぎると見づらいので上限
    for i, api in enumerate(reps, start=1):
        api = esc(api)
        node_a = f"{node_c}_{i}"
        lines.append(f'  {node_c} --> {node_a}["{api}"]')

mermaid_code = "\n".join(lines)

# 1) Notebook上に表示（Mermaid対応ビューアがある環境ならそのまま描画される）
display(Markdown("```mermaid\n" + mermaid_code + "\n```"))

# 2) ファイル保存（フルパス）
out_mmd = OUT_FIG / "timesfm_api_clusters.mmd"
out_md  = OUT_FIG / "timesfm_api_clusters_mermaid.md"
out_mmd.write_text(mermaid_code, encoding="utf-8")
out_md.write_text("```mermaid\n" + mermaid_code + "\n```", encoding="utf-8")

print("saved:", out_mmd)
print("saved:", out_md)


# 1. サンプルデータセットの作成
TimesFMの機能検証を行うため、明確な**トレンド（傾向）**と**季節性（周期性）**を持つ人工的な時系列データを生成します。

モデルのコンテキスト長（入力）と予測ホライゾン（出力）をカバーできる十分な長さのデータを、以下の仕様で作成します。

* **データ構造**: Pandas DataFrame (ロング形式)
    * `date`: 日時インデックス
    * `unique_id`: 系列を識別するID（多変量/複数系列対応のため）
    * `value`: 観測値
* **データ内容**:
    * `series_01`: サイン波 + 上昇トレンド（基本的な周期的変動）
    * `series_02`: コサイン波 + 緩やかなトレンド（位相の異なる変動）

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

def generate_synthetic_series(length=1000, start_date="2024-01-01", freq="h"):
    """
    トレンド、季節性、ノイズを含む人工的な時系列データを生成する関数
    """
    # 日時インデックスの作成
    dates = pd.date_range(start=start_date, periods=length, freq=freq)
    x = np.linspace(0, 8 * np.pi, length)

    # 系列1: サイン波 + 強いトレンド
    y1 = np.sin(x) + np.linspace(0, 5, length) + np.random.normal(0, 0.1, length)
    df1 = pd.DataFrame({
        "unique_id": "series_01",
        "date": dates,
        "value": y1
    })

    # 系列2: コサイン波 + 弱いトレンド + 異なるノイズ
    y2 = np.cos(x) + np.linspace(0, 2, length) + np.random.normal(0, 0.15, length)
    df2 = pd.DataFrame({
        "unique_id": "series_02",
        "date": dates,
        "value": y2
    })

    # 結合してロング形式へ
    df_combined = pd.concat([df1, df2], ignore_index=True)
    return df_combined

# データ生成（TimesFMのデフォルトコンテキスト長 512 + 予測 128 をカバーできるサイズ）
DATA_LENGTH = 1024
df_sample = generate_synthetic_series(length=DATA_LENGTH)

# データの確認
print(f"Data Shape: {df_sample.shape}")
display(df_sample.head())

# 可視化
plt.figure(figsize=(12, 5))
for uid, group in df_sample.groupby("unique_id"):
    plt.plot(group["date"], group["value"], label=uid, alpha=0.8)
    
plt.title("Synthetic Time Series Data for TimesFM Testing")
plt.xlabel("Date")
plt.ylabel("Value")
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

# 2. モデルのロード (修正版)

`timesfm` ライブラリの仕様に合わせて、クラス名を `TimesFm` (mは小文字) に修正し、パラメータを `TimesFmHparams` 経由で設定します。

**主な修正点:**
1. クラス名: `TimesFM` -> `TimesFm`
2. パラメータ指定: `hparams` 引数に `TimesFmHparams` オブジェクトを渡す形式に変更
3. チェックポイント指定: `checkpoint` 引数に `TimesFmCheckpoint` オブジェクトを渡す形式に変更

In [None]:
import timesfm
import numpy as np

# 1. 予測設定 (ForecastConfig) の定義
# 実際の入力長や予測したい期間に合わせて設定します
forecast_config = timesfm.configs.ForecastConfig(
    max_context=512,       # 入力系列の最大長
    max_horizon=128,       # 予測期間の最大長
    per_core_batch_size=1  # 1回の推論で処理するバッチサイズ
)

# 2. モデルのコンパイル (必須手順)
print("Compiling model (this may take a few seconds)...")
tfm.compile(forecast_config)
print("Model compiled successfully.")

# 3. 動作確認: ダミーデータによる予測
# サイン波のダミーデータを作成 (長さ512)
dummy_input = [np.sin(np.linspace(0, 20, 512))]

# 予測実行
# 戻り値: (点予測の結果, その他の情報) のタプル
forecast_result = tfm.forecast(horizon=128, inputs=dummy_input)

# 結果の確認
# forecast_result[0] が予測値のメイン配列です
print(f"Forecast result type: {type(forecast_result)}")
print(f"Point forecast shape: {forecast_result[0].shape}")  # (Batch, Horizon) -> (1, 128) expected

# 簡易プロット (オプション)
import matplotlib.pyplot as plt
plt.plot(np.arange(512), dummy_input[0], label="History")
plt.plot(np.arange(512, 512+128), forecast_result[0][0], label="Forecast")
plt.legend()
plt.show()

# TimesFM.load_checkpoint 引数・データ型一覧

`inspect` モジュールを使用して、現在インストールされている `timesfm` ライブラリから `load_checkpoint` メソッドの引数定義を動的に抽出し、データ型とデフォルト値を確認します。

In [6]:
import inspect
import pandas as pd
import timesfm

def get_method_arguments_table(method):
    """
    指定されたメソッドの引数、型ヒント、デフォルト値を抽出し、DataFrameで返します。
    """
    if not callable(method):
        raise ValueError(f"{method} is not callable.")

    # シグネチャ（引数定義）を取得
    sig = inspect.signature(method)
    
    # Docstringの取得（概要表示用）
    doc = inspect.getdoc(method)
    print(f"--- Docstring Summary ---\n{doc}\n-------------------------\n")

    arg_list = []
    for name, param in sig.parameters.items():
        # 型ヒントの整形 (typing.Unionsなどを読みやすくする処理は適宜追加可能)
        type_hint = param.annotation
        if type_hint == inspect.Parameter.empty:
            type_hint = "Unspecified"
        else:
            # クラス型などの表記を少しきれいにする
            type_hint = str(type_hint).replace("<class '", "").replace("'>", "").replace("typing.", "")

        # デフォルト値の確認
        default_val = param.default
        if default_val == inspect.Parameter.empty:
            default_val = "**Required**"
        
        arg_list.append({
            "引数名 (Name)": name,
            "データ型 (Type Hint)": type_hint,
            "デフォルト値 (Default)": str(default_val),
            "引数の種類 (Kind)": str(param.kind)
        })

    return pd.DataFrame(arg_list)

# 1. クラスからメソッドを取得
# 注: 提供されたログに基づき、標準的な TimesFM クラス、または内部実装クラスをターゲットにします
try:
    target_method = timesfm.TimesFM.load_checkpoint
except AttributeError:
    # 万が一 TimesFM がラッパー関数等の場合、具体的なクラス（ログに基づく）を試行
    # ※環境に合わせて適宜調整してください
    try:
        from timesfm import TimesFM_2p5_200M_torch
        target_method = TimesFM_2p5_200M_torch.load_checkpoint
    except ImportError:
        print("Warning: Could not import specific class directly. Using timesfm.TimesFM interface.")
        target_method = timesfm.TimesFM.load_checkpoint

# 2. テーブルの生成と表示
df_args = get_method_arguments_table(target_method)

# 見やすく表示 (Stylerを使用)
df_args.style.set_properties(**{'text-align': 'left'}).set_table_styles([
    dict(selector='th', props=[('text-align', 'left')])
])

--- Docstring Summary ---
Loads a TimesFM model from a checkpoint.
-------------------------



Unnamed: 0,引数名 (Name),データ型 (Type Hint),デフォルト値 (Default),引数の種類 (Kind)
0,self,Unspecified,**Required**,POSITIONAL_OR_KEYWORD
1,path,str,**Required**,POSITIONAL_OR_KEYWORD


In [11]:
import inspect
import pandas as pd
import timesfm
import re
from typing import get_type_hints

# ==========================================
# 1. 知識ベース: TimesFM 2.5 の具体的な制約・推奨値
# ==========================================
# 自動抽出だけでは分からない「正解値」をここで定義します
KNOWN_SPECS = {
    ("ForecastConfig", "max_context"): "32 ~ 512 (推奨: 512)",
    ("ForecastConfig", "max_horizon"): "1 ~ 512+ (推奨: 128)",
    ("from_pretrained", "repo_id"): "例: 'google/timesfm-2.5-200m-pytorch'",
    ("from_pretrained", "backend"): "['cpu', 'gpu', 'cuda', 'mps']",
    ("forecast", "inputs"): "List[float] 形状: (Batch, Length)",
    ("forecast", "freq"): "不要 (v2.5はZero-shot自動対応)",
    ("forecast", "horizon"): "int (max_horizon以下)",
    ("compile", "forecast_config"): "ForecastConfigオブジェクト",
    ("TimesFM_2p5_200M_torch", "backend"): "['cpu', 'gpu', 'cuda']",
}

# ==========================================
# 2. 高度なDocstringパーサー
# ==========================================
def parse_docstring_args(docstring):
    """Google/NumPyスタイルのDocstringから引数説明を抽出する"""
    if not docstring:
        return {}
    
    args_info = {}
    # Args: セクションを探す
    match = re.search(r'Args:\s*(.*?)(?:Raises:|Returns:|Example:|$)', docstring, re.DOTALL)
    if match:
        content = match.group(1)
        # "  arg_name (type): description" のパターンを抽出
        # 行頭の空白と引数名をキャプチャ
        lines = content.split('\n')
        current_arg = None
        for line in lines:
            line = line.strip()
            if not line: continue
            
            # 引数定義行の検出 (例: "input_len (int): ...")
            arg_match = re.match(r'^(\w+)(?:\s*\(.*?\))?:\s*(.*)', line)
            if arg_match:
                current_arg = arg_match.group(1)
                desc = arg_match.group(2)
                args_info[current_arg] = desc
            elif current_arg:
                # 前の引数の説明の続き
                args_info[current_arg] += " " + line
    return args_info

# ==========================================
# 3. 解析ロジック
# ==========================================
def analyze_meta_table(targets):
    data = []
    
    for category, items in targets.items():
        for func_or_class in items:
            name = func_or_class.__name__
            
            # クラスなら __init__ を対象にする
            if inspect.isclass(func_or_class):
                target_func = func_or_class.__init__
                is_class = True
                display_name = name + " (__init__)"
            else:
                target_func = func_or_class
                is_class = False
                display_name = name

            try:
                sig = inspect.signature(target_func)
                doc = inspect.getdoc(target_func)
                doc_args = parse_docstring_args(doc) # Docstringから説明を抽出
            except Exception:
                continue

            for param_name, param in sig.parameters.items():
                if param_name in ["self", "kwargs", "args"]:
                    continue

                # --- 情報収集 ---
                # 1. 型ヒント
                type_hint = str(param.annotation).replace("typing.", "").replace("<class '", "").replace("'>", "")
                if type_hint == "inspect._empty": type_hint = "Any"

                # 2. 必須チェック
                is_required = param.default == inspect.Parameter.empty
                default_val = "**Required**" if is_required else str(param.default)

                # 3. 知識ベースからの補完 (具体的な値)
                # キー: (関数名, 引数名) または (クラス名, 引数名)
                verified_val = KNOWN_SPECS.get((name, param_name), "")
                if not verified_val and is_class:
                     verified_val = KNOWN_SPECS.get((func_or_class.__name__, param_name), "")

                # 4. Docstring説明
                description = doc_args.get(param_name, "")
                if not description and doc:
                     # 簡易検索フォールバック
                     if param_name in doc:
                         description = "See docstring"

                data.append({
                    "Category": category,
                    "Function/Class": display_name,
                    "Argument": param_name,
                    "Required": "✅" if is_required else "",
                    "Type Hint": type_hint,
                    "Default": default_val,
                    "✅ Verified Specs / Constraints": verified_val,  # ★ここが重要
                    "Description": description[:100] + "..." if len(description) > 100 else description
                })

    return pd.DataFrame(data)

# ==========================================
# 4. 実行設定
# ==========================================
# クラスの特定 (環境に合わせて自動選択)
try:
    MainClass = timesfm.TimesFM_2p5_200M_torch
except AttributeError:
    MainClass = timesfm.TimesFM

targets = {
    "1. Initialization": [MainClass],
    "2. Loading": [MainClass.from_pretrained, MainClass.load_checkpoint],
    "3. Config": [timesfm.configs.ForecastConfig],
    "4. Execution": [MainClass.compile, MainClass.forecast],
    "5. Saving": [MainClass.save_pretrained],
}

# 解析実行
df_meta = analyze_meta_table(targets)

# ==========================================
# 5. 見やすいスタイルで表示
# ==========================================
# pandas Stylerを使って「確認すべき場所」を強調
def highlight_required(s):
    return ['background-color: #ffcccc' if v == '✅' else '' for v in s]

def highlight_verified(s):
    return ['font-weight: bold; color: #005a9c' if v != '' else '' for v in s]

styler = df_meta.style\
    .apply(highlight_required, subset=['Required'])\
    .apply(highlight_verified, subset=['✅ Verified Specs / Constraints'])\
    .set_properties(**{'text-align': 'left'})\
    .set_table_styles([
        {'selector': 'th', 'props': [('text-align', 'left'), ('background-color', '#f0f0f0')]},
        {'selector': 'td', 'props': [('padding', '8px')]}
    ])

print("=== TimesFM Meta Execution Table ===")
display(styler)

# CSV保存
df_meta.to_csv("timesfm_meta_execution_table.csv", index=False)

=== TimesFM Meta Execution Table ===


Unnamed: 0,Category,Function/Class,Argument,Required,Type Hint,Default,✅ Verified Specs / Constraints,Description
0,2. Loading,from_pretrained,pretrained_model_name_or_path,✅,"Union[str, pathlib.Path]",**Required**,,"- Either the `model_id` (string) of a model hosted on the Hub, e.g. `bigscience/bloom`. - Or a path..."
1,2. Loading,from_pretrained,force_download,,bool,False,,"Whether to force (re-)downloading the model weights and configuration files from the Hub, overridin..."
2,2. Loading,from_pretrained,resume_download,,Optional[bool],,,
3,2. Loading,from_pretrained,proxies,,Optional[Dict],,,"A dictionary of proxy servers to use by protocol or endpoint, e.g., `{'http': 'foo.bar:3128', 'http..."
4,2. Loading,from_pretrained,token,,"Union[bool, str, NoneType]",,,"The token to use as HTTP bearer authorization for remote files. By default, it will use the token c..."
5,2. Loading,from_pretrained,cache_dir,,"Union[str, pathlib.Path, NoneType]",,,Path to the folder where cached files are stored.
6,2. Loading,from_pretrained,local_files_only,,bool,False,,"If `True`, avoid downloading the file and return the path to the local cached file if it exists."
7,2. Loading,from_pretrained,revision,,Optional[str],,,"Revision of the model on the Hub. Can be a branch name, a git tag or any commit id. Defaults to the..."
8,2. Loading,from_pretrained,model_kwargs,✅,Any,**Required**,,Additional kwargs to pass to the model during initialization.
9,2. Loading,load_checkpoint,path,✅,str,**Required**,,


# 2-1. モデルロード方法の比較検証

ご要望の通り、以下の2つのロード方法を両方試して検証します。

1.  **`from_pretrained()`**: Hugging Face Hub からモデル定義と重みを一括でロードする（標準的方法）。
2.  **`load_checkpoint()`**: ローカルに保存された重みファイルを、初期化済みのインスタンスに読み込む。

**検証フロー:**
1.  `from_pretrained` でモデル (A) をロード。
2.  モデル (A) をローカルディレクトリに保存 (`save_pretrained`)。
3.  新しい空のモデルインスタンス (B) を作成。
4.  モデル (B) に `load_checkpoint` を使用して、保存したローカルの重みを読み込む。

In [None]:
import timesfm
import torch
import os

# デバイス設定
device = "cuda" if torch.cuda.is_available() else "cpu"

# --- 準備: 元モデルのロード ---
print("=== Loading Source Model ===")
tfm_hub = timesfm.TimesFM_2p5_200M_torch.from_pretrained(
    "google/timesfm-2.5-200m-pytorch",
    backend=device
)

# =========================================================
# アプローチ1: 正攻法 (save_pretrained -> from_pretrained)
# =========================================================
print("\n=== Approach 1: save_pretrained -> from_pretrained (Recommended) ===")
save_dir = "./timesfm_hf_local"
tfm_hub.save_pretrained(save_dir)
print(f"Model saved to (HF format): {save_dir}")

# ローカルパスを指定してロード
try:
    tfm_local_hf = timesfm.TimesFM_2p5_200M_torch.from_pretrained(save_dir, backend=device)
    print("Success: Loaded locally using from_pretrained()!")
except Exception as e:
    print(f"Failed: {e}")

# =========================================================
# アプローチ2: load_checkpoint の検証 (torch.save -> load_checkpoint)
# =========================================================
print("\n=== Approach 2: torch.save -> load_checkpoint (Low-level) ===")
pt_file = "./timesfm_state_dict.pt"

# 1. 生の state_dict を .pt ファイルとして保存
# (Hugging Faceの形式ではなく、PyTorch標準の単一ファイル形式)
torch.save(tfm_hub.model.state_dict(), pt_file)
print(f"Model weights saved to (PyTorch format): {pt_file}")

# 2. 空のモデルを作成して load_checkpoint で読み込む
tfm_local_pt = timesfm.TimesFM_2p5_200M_torch(backend=device)

try:
    # load_checkpoint は内部で torch.load を呼んでいる可能性が高い
    tfm_local_pt.load_checkpoint(pt_file)
    print("Success: Loaded locally using load_checkpoint()!")
    
    # コンパイル確認
    print("Compiling loaded model...")
    tfm_local_pt.compile(timesfm.configs.ForecastConfig(max_context=64, max_horizon=32))
    print("Compilation successful.")
    
except Exception as e:
    print(f"Failed to load checkpoint: {e}")
    # 失敗した場合、モデルの構造(キー名)を確認するデバッグ情報を出す
    try:
        loaded_dict = torch.load(pt_file)
        print("Debug: Keys in saved file (top 5):", list(loaded_dict.keys())[:5])
    except:
        pass

=== Loading Source Model ===


config.json:   0%|          | 0.00/475 [00:00<?, ?B/s]

Downloaded.

=== Approach 1: save_pretrained -> from_pretrained (Recommended) ===
Model saved to (HF format): ./timesfm_hf_local


config.json:   0%|          | 0.00/475 [00:00<?, ?B/s]

Downloaded.
Success: Loaded locally using from_pretrained()!

=== Approach 2: torch.save -> load_checkpoint (Low-level) ===
Model weights saved to (PyTorch format): ./timesfm_state_dict.pt
Failed to load checkpoint: 
Debug: Keys in saved file (top 5): ['tokenizer.hidden_layer.weight', 'tokenizer.hidden_layer.bias', 'tokenizer.output_layer.weight', 'tokenizer.output_layer.bias', 'tokenizer.residual_layer.weight']


# 3. 予測設定 (Forecast Configuration)

モデルのコンパイルを行う前に、予測時の挙動を制御する **`ForecastConfig`** オブジェクトを作成します。
ここで設定するパラメータは、メモリ使用量や推論速度、予測の精度に影響します。

主な設定項目:
* `max_context`: モデルに入力する過去データの最大長（これより長い系列は切り詰められます）。
* `max_horizon`: 一度に予測する未来の期間（予測ホライゾン）。
* `per_core_batch_size`: 1回の推論処理でまとめて処理する系列の数（バッチサイズ）。
* `normalize_inputs`: 入力データを正規化するかどうか（デフォルトは `False` ですが、データによっては有効）。
* `use_continuous_quantile_head`: 分位点予測（不確実性の推定）を行う場合の設定。

In [None]:
import timesfm

# 予測設定の定義
# API定義に基づき、timesfm.configs.ForecastConfig を使用します
forecast_config = timesfm.configs.ForecastConfig(
    max_context=512,       # 入力コンテキスト長 (例: 過去512ステップ)
    max_horizon=128,       # 予測ホライゾン (例: 未来128ステップ)
    per_core_batch_size=32,# バッチサイズ (GPUメモリに応じて調整)
    
    # その他のオプション設定 (必要に応じて有効化)
    normalize_inputs=True, # 入力系列の正規化を行うか
    # quantiles=[0.1, 0.5, 0.9] # 分位点予測が必要な場合 (モデルの対応状況による)
)

# 設定内容の確認
print("Forecast Config created:")
print(f"  Max Context: {forecast_config.max_context}")
print(f"  Max Horizon: {forecast_config.max_horizon}")
print(f"  Batch Size : {forecast_config.per_core_batch_size}")
print(f"  Normalize  : {forecast_config.normalize_inputs}")

# 3-1. ForecastConfig 引数の網羅的確認

`ForecastConfig` で設定可能なすべての引数を確認します。
解析結果およびライブラリの仕様に基づく設定項目は以下の通りです。

| 引数名 | 型 | デフォルト値 | 概要 (推定) |
| :--- | :--- | :--- | :--- |
| **`max_context`** | `int` | `0` (必須) | 入力系列の最大長。これを超える過去データは切り捨てられます。 |
| **`max_horizon`** | `int` | `0` (必須) | 予測する未来の期間（予測ホライゾン）の最大長。 |
| **`per_core_batch_size`** | `int` | `1` | 1回の推論ステップで処理する系列数（バッチサイズ）。 |
| **`normalize_inputs`** | `bool` | `False` | 入力系列をモデルに入力する前に正規化（平均0、分散1など）するかどうか。 |
| **`window_size`** | `int` | `0` | (高度な設定) 特定のウィンドウ処理を行う場合のサイズ。 |
| **`use_continuous_quantile_head`** | `bool` | `False` | 連続的な分位点予測ヘッドを使用するかどうか（不確実性予測用）。 |
| **`force_flip_invariance`** | `bool` | `True` | 上下反転に対する不変性を強制するかどうか。 |
| **`infer_is_positive`** | `bool` | `True` | 値が正であることを推論時に考慮するかどうか。 |
| **`fix_quantile_crossing`** | `bool` | `False` | 分位点同士の交差（矛盾）を修正するかどうか。 |
| **`return_backcast`** | `bool` | `False` | 予測だけでなく、入力期間の適合値（バックキャスト）も返すかどうか。 |

以下のコードで、実際のライブラリ定義からドキュメントとシグネチャを出力して確認します。

In [7]:
import timesfm
import inspect

# ForecastConfig クラスのシグネチャ（引数定義）を取得して表示
print("--- ForecastConfig Signature ---")
sig = inspect.signature(timesfm.configs.ForecastConfig)
for name, param in sig.parameters.items():
    print(f"{name}: {param.annotation} = {param.default}")

print("\n--- Docstring ---")
print(timesfm.configs.ForecastConfig.__doc__)

--- ForecastConfig Signature ---
max_context: <class 'int'> = 0
max_horizon: <class 'int'> = 0
normalize_inputs: <class 'bool'> = False
window_size: <class 'int'> = 0
per_core_batch_size: <class 'int'> = 1
use_continuous_quantile_head: <class 'bool'> = False
force_flip_invariance: <class 'bool'> = True
infer_is_positive: <class 'bool'> = True
fix_quantile_crossing: <class 'bool'> = False
return_backcast: <class 'bool'> = False

--- Docstring ---
Options for forecasting.

  Attributes:
    max_context: The maximum context length. This is used by the complied decode
      function at inference time during batched inference. Any input time series
      with length less than max_context will be padded with zeros, and with
      length greater than max_context will be truncated.
    max_horizon: The maximum horizon length. This is used by the complied decode
      function at inference time during batched inference. The compiled cached
      decoding function will by default forecast till m

# 3-2. 設定パラメータの一覧化と網羅的検証

`ForecastConfig` に設定可能なすべての引数をカラム（列）として持ち、検証したい値を格納した一覧表（DataFrame）を作成します。
この表データを基に、実際に `ForecastConfig` オブジェクトを生成し、すべての値が正しく反映されるか検証します。

**検証パターン:**
1. **Case 0 (Standard)**: 一般的な推奨設定（正規化ON、非負制約ONなど）。
2. **Case 1 (Experimental)**: すべてのオプション機能を有効化または変更した設定（分位点ヘッド使用、バックキャスト取得など）。

In [8]:
import pandas as pd
import timesfm

# 1. 検証用パラメータセットの定義
# 引数名をキー、設定したい値のリストを値とする辞書を作成
# Case 0: 実用的で標準的な設定
# Case 1: オプション機能を網羅的に変更した設定
validation_data = {
    "max_context":                  [512,   1024],
    "max_horizon":                  [128,   256],
    "per_core_batch_size":          [32,    1],
    "normalize_inputs":             [True,  False],
    "infer_is_positive":            [True,  False],
    "force_flip_invariance":        [True,  False],
    "use_continuous_quantile_head": [False, True],
    "fix_quantile_crossing":        [False, True],
    "return_backcast":              [False, True],
    "window_size":                  [0,     10]  # TODO機能だが引数としては存在
}

# 一覧表 (DataFrame) の作成
df_configs = pd.DataFrame(validation_data)
df_configs.index.name = "Case"

print("--- ForecastConfig 検証用パラメータ一覧表 ---")
display(df_configs)

# 2. 網羅的な検証実行
print("\n--- 設定反映の検証結果 ---")

generated_configs = []

for idx, row in df_configs.iterrows():
    # DataFrameの行を辞書に変換
    params = row.to_dict()
    
    # 型の修正 (PandasはIntをFloatにすることがあるため、明示的にintへキャスト)
    int_fields = ["max_context", "max_horizon", "per_core_batch_size", "window_size"]
    for field in int_fields:
        params[field] = int(params[field])
    
    try:
        # Configオブジェクトの生成（引数展開 **params を使用）
        config = timesfm.configs.ForecastConfig(**params)
        
        # 検証ロジック: 入力値とオブジェクトの属性値がすべて一致するか確認
        mismatches = []
        for key, expected_val in params.items():
            actual_val = getattr(config, key)
            if actual_val != expected_val:
                mismatches.append(f"{key}: expected {expected_val}, got {actual_val}")
        
        if not mismatches:
            print(f"[Case {idx}] OK: 全てのパラメータが正常に設定されました。")
            generated_configs.append(config)
        else:
            print(f"[Case {idx}] FAILED: パラメータの不一致があります -> {mismatches}")
            
    except Exception as e:
        print(f"[Case {idx}] ERROR: 設定中にエラーが発生しました -> {e}")

# 最後に、検証に使用する設定（Case 0）を変数に保持
forecast_config = generated_configs[0]
print(f"\n最終的に採用する設定 (Case 0):\n{forecast_config}")

--- ForecastConfig 検証用パラメータ一覧表 ---


Unnamed: 0_level_0,max_context,max_horizon,per_core_batch_size,normalize_inputs,infer_is_positive,force_flip_invariance,use_continuous_quantile_head,fix_quantile_crossing,return_backcast,window_size
Case,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
0,512,128,32,True,True,True,False,False,False,0
1,1024,256,1,False,False,False,True,True,True,10



--- 設定反映の検証結果 ---
[Case 0] OK: 全てのパラメータが正常に設定されました。
[Case 1] OK: 全てのパラメータが正常に設定されました。

最終的に採用する設定 (Case 0):
ForecastConfig(max_context=512, max_horizon=128, normalize_inputs=True, window_size=0, per_core_batch_size=32, use_continuous_quantile_head=False, force_flip_invariance=True, infer_is_positive=True, fix_quantile_crossing=False, return_backcast=False)


# 3-3. 予測設定クラスのアクセス方法検証

TimesFMのAPIには、トップレベルの `timesfm.ForecastConfig` と、サブモジュールの `timesfm.configs.ForecastConfig` の2つのアクセス経路が存在します。
両方を使用して設定オブジェクトを生成し、動作に違いがないか、およびこれらが同一のクラス定義を指しているかを確認します。

In [None]:
import timesfm

# テスト用の設定値
test_params = {
    "max_context": 512,
    "max_horizon": 128,
    "per_core_batch_size": 16,
    "normalize_inputs": True
}

print("=== 1. timesfm.ForecastConfig (Top-level) ===")
try:
    config_v1 = timesfm.ForecastConfig(**test_params)
    print("Success: Instance created via timesfm.ForecastConfig")
    print(f"Object type: {type(config_v1)}")
    print(f"Values: context={config_v1.max_context}, horizon={config_v1.max_horizon}")
except AttributeError:
    print("Error: timesfm.ForecastConfig not found.")

print("\n=== 2. timesfm.configs.ForecastConfig (Sub-module) ===")
try:
    config_v2 = timesfm.configs.ForecastConfig(**test_params)
    print("Success: Instance created via timesfm.configs.ForecastConfig")
    print(f"Object type: {type(config_v2)}")
    print(f"Values: context={config_v2.max_context}, horizon={config_v2.max_horizon}")
except AttributeError:
    print("Error: timesfm.configs.ForecastConfig not found.")

print("\n=== 3. Identity Verification ===")
# 両者が同じクラス定義を指しているか確認
try:
    is_same_class = timesfm.ForecastConfig is timesfm.configs.ForecastConfig
    print(f"Are they the same class definition? : {is_same_class}")
    
    # データの内容が一致するか確認 (属性レベル)
    if 'config_v1' in locals() and 'config_v2' in locals():
        # __dict__ を比較して属性が全て同じかチェック
        attrs_match = config_v1.__dict__ == config_v2.__dict__
        print(f"Do the instances have identical attributes? : {attrs_match}")
        
except Exception as e:
    print(f"Comparison failed: {e}")

# 引数確認

In [9]:
import inspect
import pandas as pd
import timesfm

# 解析対象のクラスやメソッドを定義
# (必要に応じてクラス名を timesfm.TimesFM_2p5_200M_torch などに変更してください)
try:
    # メインクラスの特定（バージョンによって異なる場合があるため）
    TargetClass = timesfm.TimesFM_2p5_200M_torch 
except AttributeError:
    try:
        TargetClass = timesfm.TimesFM
    except AttributeError:
        print("Error: Could not find the main TimesFM class. Please check imports.")
        TargetClass = None

# 解析したい機能のリスト (カテゴリ, メソッド/クラス)
targets = {
    "1. Model Initialization": [TargetClass],  # __init__
    "2. Loading": [TargetClass.from_pretrained, TargetClass.load_checkpoint],
    "3. Saving": [TargetClass.save_pretrained],
    "4. Configuration": [timesfm.configs.ForecastConfig],
    "5. Forecasting": [TargetClass.forecast, TargetClass.forecast_with_covariates],
    "6. Compilation": [TargetClass.compile],
}

def analyze_signature(func_or_class):
    """関数またはクラスのシグネチャを解析してDataFrameを返す"""
    if func_or_class is None:
        return pd.DataFrame()
    
    name = func_or_class.__name__
    try:
        sig = inspect.signature(func_or_class)
    except ValueError:
        return pd.DataFrame([{"Arg": "Error", "Type": "N/A", "Default": "Could not inspect"}])

    arg_data = []
    for param_name, param in sig.parameters.items():
        # 型ヒントの整形
        type_hint = param.annotation
        if type_hint == inspect.Parameter.empty:
            type_hint = ""
        else:
            type_hint = str(type_hint).replace("typing.", "").replace("<class '", "").replace("'>", "")
        
        # デフォルト値の整形
        default_val = param.default
        if default_val == inspect.Parameter.empty:
            default_val = "**Required**"
        
        arg_data.append({
            "Method/Class": name,
            "Argument": param_name,
            "Type Hint": type_hint,
            "Default": str(default_val)
        })
    
    return pd.DataFrame(arg_data)

# 一括実行と表示
print(f"=== TimesFM API Inspection (Target: {TargetClass.__name__}) ===\n")

all_dfs = []

for category, func_list in targets.items():
    print(f"--- {category} ---")
    for func in func_list:
        df = analyze_signature(func)
        if not df.empty:
            # 見やすく表示
            display(df.style.set_caption(f"Function: {func.__name__}").set_properties(**{'text-align': 'left'}))
            all_dfs.append(df)
        else:
            print(f"Skipping {func.__name__}: No signature found.")
    print("\n")

=== TimesFM API Inspection (Target: TimesFM_2p5_200M_torch) ===

--- 1. Model Initialization ---


Unnamed: 0,Method/Class,Argument,Type Hint,Default
0,TimesFM_2p5_200M_torch,args,,**Required**
1,TimesFM_2p5_200M_torch,kwargs,,**Required**




--- 2. Loading ---


Unnamed: 0,Method/Class,Argument,Type Hint,Default
0,from_pretrained,pretrained_model_name_or_path,"Union[str, pathlib.Path]",**Required**
1,from_pretrained,force_download,bool,False
2,from_pretrained,resume_download,Optional[bool],
3,from_pretrained,proxies,Optional[Dict],
4,from_pretrained,token,"Union[bool, str, NoneType]",
5,from_pretrained,cache_dir,"Union[str, pathlib.Path, NoneType]",
6,from_pretrained,local_files_only,bool,False
7,from_pretrained,revision,Optional[str],
8,from_pretrained,model_kwargs,,**Required**


Unnamed: 0,Method/Class,Argument,Type Hint,Default
0,load_checkpoint,self,,**Required**
1,load_checkpoint,path,str,**Required**




--- 3. Saving ---


Unnamed: 0,Method/Class,Argument,Type Hint,Default
0,save_pretrained,self,,**Required**
1,save_pretrained,save_directory,"Union[str, pathlib.Path]",**Required**
2,save_pretrained,config,"Union[dict, huggingface_hub.hub_mixin.DataclassInstance, NoneType]",
3,save_pretrained,repo_id,Optional[str],
4,save_pretrained,push_to_hub,bool,False
5,save_pretrained,model_card_kwargs,"Optional[Dict[str, Any]]",
6,save_pretrained,push_to_hub_kwargs,,**Required**




--- 4. Configuration ---


Unnamed: 0,Method/Class,Argument,Type Hint,Default
0,ForecastConfig,max_context,int,0
1,ForecastConfig,max_horizon,int,0
2,ForecastConfig,normalize_inputs,bool,False
3,ForecastConfig,window_size,int,0
4,ForecastConfig,per_core_batch_size,int,1
5,ForecastConfig,use_continuous_quantile_head,bool,False
6,ForecastConfig,force_flip_invariance,bool,True
7,ForecastConfig,infer_is_positive,bool,True
8,ForecastConfig,fix_quantile_crossing,bool,False
9,ForecastConfig,return_backcast,bool,False




--- 5. Forecasting ---


Unnamed: 0,Method/Class,Argument,Type Hint,Default
0,forecast,self,,**Required**
1,forecast,horizon,int,**Required**
2,forecast,inputs,list[numpy.ndarray],**Required**


Unnamed: 0,Method/Class,Argument,Type Hint,Default
0,forecast_with_covariates,self,,**Required**
1,forecast_with_covariates,inputs,list[Sequence[float]],**Required**
2,forecast_with_covariates,dynamic_numerical_covariates,"dict[str, Sequence[Sequence[float]]] | None",
3,forecast_with_covariates,dynamic_categorical_covariates,"dict[str, Sequence[Sequence[int | str]]] | None",
4,forecast_with_covariates,static_numerical_covariates,"dict[str, Sequence[float]] | None",
5,forecast_with_covariates,static_categorical_covariates,"dict[str, Sequence[int | str]] | None",
6,forecast_with_covariates,xreg_mode,str,xreg + timesfm
7,forecast_with_covariates,normalize_xreg_target_per_input,bool,True
8,forecast_with_covariates,ridge,float,0.0
9,forecast_with_covariates,max_rows_per_col,int,0




--- 6. Compilation ---


Unnamed: 0,Method/Class,Argument,Type Hint,Default
0,compile,self,,**Required**
1,compile,forecast_config,timesfm.configs.ForecastConfig,**Required**
2,compile,kwargs,,**Required**






In [10]:
import inspect
import pandas as pd
import timesfm
import re

# ==========================================
# 1. 解析対象の定義
# ==========================================
# 環境に合わせてクラスを取得 (v2.5系を優先)
try:
    MainClass = timesfm.TimesFM_2p5_200M_torch
    print(f"Target Class: {MainClass.__name__}")
except AttributeError:
    try:
        MainClass = timesfm.TimesFM
        print(f"Target Class: {MainClass.__name__}")
    except AttributeError:
        print("Error: Could not find TimesFM class.")
        MainClass = None

# 解析したいメソッドやクラスのリスト
targets = {
    "1. Initialization": [MainClass], # __init__
    "2. Loading": [MainClass.from_pretrained, MainClass.load_checkpoint],
    "3. Configuration": [timesfm.configs.ForecastConfig],
    "4. Compilation": [MainClass.compile],
    "5. Forecasting": [MainClass.forecast, MainClass.forecast_with_covariates],
    "6. Saving": [MainClass.save_pretrained],
}

# ==========================================
# 2. 解析関数の定義
# ==========================================
def extract_doc_info(docstring, arg_name):
    """Docstringから引数に関する説明や制約を簡易抽出する"""
    if not docstring:
        return ""
    
    # 引数名が含まれる行を探す（簡易的な検索）
    lines = docstring.split('\n')
    for i, line in enumerate(lines):
        if arg_name in line:
            # その行と次の数行を取得して説明とする
            desc = line.strip() + " " + " ".join([l.strip() for l in lines[i+1:i+3] if l.strip()])
            return desc[:200] + "..." if len(desc) > 200 else desc
    return ""

def analyze_arguments(category, func_or_class):
    """関数/クラスのシグネチャとドキュメントを解析して辞書リストを返す"""
    if func_or_class is None:
        return []

    name = func_or_class.__name__
    
    # クラスの場合は __init__ を解析対象にする
    if inspect.isclass(func_or_class):
        target_func = func_or_class.__init__
        is_class = True
    else:
        target_func = func_or_class
        is_class = False

    try:
        sig = inspect.signature(target_func)
        doc = inspect.getdoc(target_func)
    except (ValueError, TypeError):
        return [{"Category": category, "Function": name, "Argument": "Error", "Type": "-", "Default": "-"}]

    arg_list = []
    for param_name, param in sig.parameters.items():
        if param_name == "self":
            continue

        # 1. 型ヒントの整理
        type_hint = param.annotation
        if type_hint == inspect.Parameter.empty:
            type_str = "Any"
        else:
            type_str = str(type_hint).replace("typing.", "").replace("<class '", "").replace("'>", "")

        # 2. デフォルト値の整理
        default_val = param.default
        if default_val == inspect.Parameter.empty:
            default_str = "**Required**"
        else:
            default_str = str(default_val)

        # 3. Docstringからの情報抽出（制約や範囲のヒント）
        doc_extract = extract_doc_info(doc, param_name)

        arg_list.append({
            "Category": category,
            "Function/Class": name,
            "Argument": param_name,
            "Type Hint": type_str,
            "Default": default_str,
            "Docstring Hint": doc_extract, # ドキュメント内の記述
            "Verified Range/Options": ""   # ★ここに後で手動検証結果を埋める
        })
    
    return arg_list

# ==========================================
# 3. 実行とテーブル作成
# ==========================================
all_data = []

if MainClass:
    for cat, func_list in targets.items():
        for func in func_list:
            all_data.extend(analyze_arguments(cat, func))

# DataFrame化
df_api = pd.DataFrame(all_data)

# 見やすく表示するためのスタイル設定
# (長すぎるDocstringは省略表示など、実用的なフォーマット)
pd.set_option('display.max_colwidth', 100)
pd.set_option('display.max_rows', 100)

print(f"Total Arguments Found: {len(df_api)}")
display(df_api)

# CSVとして保存（これをベースにExcel等で編集可能）
csv_path = "timesfm_api_verification_table.csv"
df_api.to_csv(csv_path, index=False)
print(f"\nTable saved to: {csv_path}")

Target Class: TimesFM_2p5_200M_torch
Total Arguments Found: 42


Unnamed: 0,Category,Function/Class,Argument,Type Hint,Default,Docstring Hint,Verified Range/Options
0,1. Initialization,TimesFM_2p5_200M_torch,args,Any,**Required**,,
1,1. Initialization,TimesFM_2p5_200M_torch,kwargs,Any,**Required**,,
2,2. Loading,from_pretrained,pretrained_model_name_or_path,"Union[str, pathlib.Path]",**Required**,"pretrained_model_name_or_path (`str`, `Path`): - Either the `model_id` (string) of a model hoste...",
3,2. Loading,from_pretrained,force_download,bool,False,"force_download (`bool`, *optional*, defaults to `False`): Whether to force (re-)downloading the ...",
4,2. Loading,from_pretrained,resume_download,Optional[bool],,,
5,2. Loading,from_pretrained,proxies,Optional[Dict],,"proxies (`Dict[str, str]`, *optional*): A dictionary of proxy servers to use by protocol or endp...",
6,2. Loading,from_pretrained,token,"Union[bool, str, NoneType]",,"token (`str` or `bool`, *optional*): The token to use as HTTP bearer authorization for remote fi...",
7,2. Loading,from_pretrained,cache_dir,"Union[str, pathlib.Path, NoneType]",,"cache_dir (`str`, `Path`, *optional*): Path to the folder where cached files are stored. local_f...",
8,2. Loading,from_pretrained,local_files_only,bool,False,"local_files_only (`bool`, *optional*, defaults to `False`): If `True`, avoid downloading the fil...",
9,2. Loading,from_pretrained,revision,Optional[str],,"revision (`str`, *optional*): Revision of the model on the Hub. Can be a branch name, a git tag ...",



Table saved to: timesfm_api_verification_table.csv


# 3-4. ForecastConfig クラス定義と引数の完全一致検証

`timesfm.ForecastConfig` と `timesfm.configs.ForecastConfig` が、エイリアス（同一のクラス）であるか、あるいは異なる定義であるかを検証します。
特に「設定できる引数」に差異がないか、`inspect` モジュールを使ってシグネチャレベルで比較します。

In [None]:
import timesfm
import inspect
import pandas as pd

print("=== ForecastConfig Class Verification ===\n")

# 1. クラスオブジェクトの取得
try:
    cls_top = timesfm.ForecastConfig
    print("Found: timesfm.ForecastConfig")
except AttributeError:
    cls_top = None
    print("Not Found: timesfm.ForecastConfig")

try:
    cls_sub = timesfm.configs.ForecastConfig
    print("Found: timesfm.configs.ForecastConfig")
except AttributeError:
    cls_sub = None
    print("Not Found: timesfm.configs.ForecastConfig")

# 2. 同一クラスかの判定 (Identity Check)
if cls_top and cls_sub:
    is_same = cls_top is cls_sub
    print(f"\n[Identity Check] Are they the same object? : {is_same}")
    if is_same:
        print(">> 結論: 完全に同一のクラスです（トップレベルはエイリアスです）。引数に違いはありません。")
    else:
        print(">> 結論: 異なるクラスオブジェクトです。詳細な引数比較を行います。")

# 3. 引数(シグネチャ)の厳密比較
if cls_top and cls_sub:
    sig_top = inspect.signature(cls_top)
    sig_sub = inspect.signature(cls_sub)
    
    print(f"\n[Signature Check]")
    print(f"Top-level args: {len(sig_top.parameters)} items")
    print(f"Sub-module args: {len(sig_sub.parameters)} items")
    
    # 比較用データフレーム作成
    params_top = {k: str(v) for k, v in sig_top.parameters.items()}
    params_sub = {k: str(v) for k, v in sig_sub.parameters.items()}
    
    # 全ての引数キーを統合
    all_keys = sorted(set(params_top.keys()) | set(params_sub.keys()))
    
    comparison_data = []
    for key in all_keys:
        val_top = params_top.get(key, "(Not Present)")
        val_sub = params_sub.get(key, "(Not Present)")
        match = (val_top == val_sub)
        comparison_data.append([key, val_top, val_sub, match])
        
    df_comparison = pd.DataFrame(comparison_data, columns=["Argument", "timesfm.ForecastConfig", "timesfm.configs.ForecastConfig", "Match"])
    
    # 不一致がある場合のみ表示、または全一致を表示
    if df_comparison["Match"].all():
        print(">> 引数定義は完全に一致しています。")
    else:
        print(">> 差異が見つかりました：")
        display(df_comparison[~df_comparison["Match"]])

    # 全体一覧（確認用）
    print("\n--- 引数一覧 ---")
    display(df_comparison)

elif not cls_top:
    print("\n検証不可: timesfm.ForecastConfig が存在しません。timesfm.configs.ForecastConfig を使用してください。")

# 4. コンパイルと予測実行 (Compile & Forecast)

検証済みの設定 (`forecast_config`) を使用して、以下のフローを実行します。

1.  **コンパイル (`compile`)**: 設定に基づき、モデルの推論プロセスを最適化します（**必須**）。
2.  **データ準備**: サンプル DataFrame (`df_sample`) を、モデルが受け付ける `List[numpy.ndarray]` 形式に変換します。
3.  **予測 (`forecast`)**: コンパイルされたモデルで未来の値を予測します。
4.  **結合・可視化**: 予測結果を元の DataFrame と結合し、グラフで確認します。

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# --- 1. モデルのコンパイル (必須) ---
# 前のステップで作成した推奨設定 (Case 0) を使用
print(f"Compiling model with config: {forecast_config} ...")
tfm.compile(forecast_config)
print("Model compiled successfully.")

# --- 2. データ準備 (DataFrame -> List[np.array]) ---
# TimesFM v2.5 の forecast メソッドはリスト形式の numpy 配列を受け取ります
# unique_id ごとにデータを分割してリスト化
input_data_map = {}
input_arrays = []

# 系列IDの順序を保持するためにリストを作成
unique_ids = df_sample['unique_id'].unique()

for uid in unique_ids:
    # 各系列の値を抽出
    series_values = df_sample[df_sample['unique_id'] == uid]['value'].values
    input_arrays.append(series_values)
    input_data_map[uid] = series_values

print(f"Prepared {len(input_arrays)} input series for forecasting.")

# --- 3. 予測実行 (Forecast) ---
# horizon は config の max_horizon と同じか、それ以下である必要があります
forecast_horizon = forecast_config.max_horizon
print(f"Forecasting next {forecast_horizon} steps...")

# 戻り値: (点予測[Batch, Horizon], その他情報)
forecast_result = tfm.forecast(
    inputs=input_arrays,
    horizon=forecast_horizon
)

# 予測値の取得 (0番目の要素が点予測)
point_forecasts = forecast_result[0]
print(f"Forecast shape: {point_forecasts.shape}") # (2, 128) expected

# --- 4. 結果の整理と可視化 ---
# 予測結果をDataFrame化して可視化
plt.figure(figsize=(15, 6))

for i, uid in enumerate(unique_ids):
    history = input_arrays[i]
    forecast = point_forecasts[i]
    
    # 日時軸の作成 (簡易的に履歴の続きとして生成)
    last_date = df_sample[df_sample['unique_id'] == uid]['date'].max()
    freq = "h" # サンプルデータの頻度
    future_dates = pd.date_range(start=last_date + pd.Timedelta(hours=1), periods=forecast_horizon, freq=freq)
    
    # 履歴のプロット (直近の一部のみ表示)
    display_len = 200
    plt.plot(
        df_sample[df_sample['unique_id'] == uid]['date'].iloc[-display_len:], 
        history[-display_len:], 
        label=f"{uid} (History)", 
        linestyle="--"
    )
    
    # 予測のプロット
    plt.plot(future_dates, forecast, label=f"{uid} (Forecast)", linewidth=2)

plt.title("TimesFM Forecast Result")
plt.xlabel("Date")
plt.ylabel("Value")
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

# 5. 単変量予測の比較検証 (Univariate Forecasting)

`forecast()` と `forecast_on_df()` の両方のアプローチを比較します。

1.  **`forecast()`**:
    * **概要**: NumPy配列のリストを入力とする基本API。
    * **特徴**: 高速でオーバーヘッドが少ないが、日付やIDの管理は自分で行う必要がある。
    * **現状**: `TimesFM_2p5_200M_torch` クラス標準実装。

2.  **`forecast_on_df()` (再現実装)**:
    * **概要**: Pandas DataFrame (Long形式) を直接受け取り、予測結果もDataFrameで返す。
    * **特徴**: 前処理・後処理（日付生成、ID結合）を自動化し、分析フローに組み込みやすい。
    * **現状**: v2.5のモデルクラスには直接実装されていないため、ヘルパー関数として定義して使用する。

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# --- A. 標準機能: forecast() の実行 ---
print("=== A. Testing forecast() (Array Input) ===")

# 1. データの準備 (DataFrame -> List[np.ndarray])
# 各系列の値をリストに格納
inputs_list = []
ids = df_sample['unique_id'].unique()
for uid in ids:
    series = df_sample[df_sample['unique_id'] == uid]['value'].values
    inputs_list.append(series)

# 2. 予測実行
# コンパイル済みの設定 (max_horizon=128) を使用
print(f"Input type: List of {len(inputs_list)} arrays")
raw_forecast = tfm.forecast(inputs=inputs_list, horizon=128)

# 結果は (PointForecast, ...) のタプル
point_forecast_array = raw_forecast[0]
print(f"Output shape: {point_forecast_array.shape}")
print("forecast() test passed.\n")


# --- B. 拡張機能: forecast_on_df() の再現と実行 ---
print("=== B. Testing forecast_on_df() (DataFrame Input) ===")

def forecast_on_df(model, df, unique_id_col="unique_id", value_col="value", date_col="date", freq="h"):
    """
    TimesFM v2.5向けに forecast_on_df の機能を再現するラッパー関数
    """
    # 1. 前処理: データフレームから配列リストへ変換
    unique_ids = df[unique_id_col].unique()
    inputs = []
    last_dates = []
    
    for uid in unique_ids:
        sub_df = df[df[unique_id_col] == uid].sort_values(date_col)
        inputs.append(sub_df[value_col].values)
        last_dates.append(sub_df[date_col].max())
    
    # 2. 推論実行
    # configから予測期間を取得
    horizon = model.forecast_config.max_horizon
    forecast_tuple = model.forecast(inputs=inputs, horizon=horizon)
    forecast_values = forecast_tuple[0] # (N, Horizon)
    
    # 3. 後処理: 結果をDataFrame形式に復元
    results = []
    for idx, uid in enumerate(unique_ids):
        # 未来の日付インデックスを作成
        start_date = last_dates[idx] + pd.Timedelta(1, unit=freq)
        future_dates = pd.date_range(start=start_date, periods=horizon, freq=freq)
        
        # 結果用DF作成
        res_df = pd.DataFrame({
            unique_id_col: uid,
            date_col: future_dates,
            "timesfm_forecast": forecast_values[idx]
        })
        results.append(res_df)
        
    return pd.concat(results, ignore_index=True)

# 実行テスト
df_forecast_result = forecast_on_df(
    model=tfm, 
    df=df_sample, 
    unique_id_col="unique_id", 
    value_col="value", 
    date_col="date",
    freq="h"
)

# 結果確認
print("Forecast DataFrame Head:")
display(df_forecast_result.head())

# 可視化 (forecast_on_df の結果を使用)
plt.figure(figsize=(12, 5))
for uid in ids:
    # 実績
    history = df_sample[df_sample['unique_id'] == uid]
    plt.plot(history['date'].iloc[-100:], history['value'].iloc[-100:], label=f"{uid} History", linestyle="--", alpha=0.7)
    
    # 予測
    pred = df_forecast_result[df_forecast_result['unique_id'] == uid]
    plt.plot(pred['date'], pred['timesfm_forecast'], label=f"{uid} Forecast", linewidth=2)

plt.title("Forecast Result using 'forecast_on_df' wrapper")
plt.xlabel("Date")
plt.ylabel("Value")
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

# 6. 外部変数付き予測 (再修正版: Backcast有効化)
このノートでは `forecast_with_covariates()` を使って、以下4種類の共変量（Covariates）を与えた予測を試す。

- 動的・数値（dynamic numerical）：例）プロモ有無、気温など（時刻ごとに変化）
- 動的・カテゴリ（dynamic categorical）：例）曜日、祝日フラグなど（時刻ごとに変化）
- 静的・数値（static numerical）：例）店舗面積、平均単価など（系列ごとに固定）
- 静的・カテゴリ（static categorical）：例）国、地域、店舗IDなど（系列ごとに固定）

重要：動的共変量は「過去(context) + 未来(horizon)」の長さが必要。
つまり `len(cov[t]) == context_len + horizon_len`。

返り値：
- `cov_forecast`：既定では `"xreg + timesfm"`（XRegで補正したTimesFM予測）
- `ols_forecast`：`"xreg"` 成分のみ（回帰だけの予測、比較用）

※ TimesFM 2.5 は XReg による共変量サポートが復活している（`.[xreg]` が必要）。


In [None]:
import sys

print("sys.executable =", sys.executable)

try:
    import jax, jaxlib
    print("jax =", jax.__version__)
    print("jaxlib =", jaxlib.__version__)
except Exception as e:
    print("JAX import failed:", repr(e))


In [None]:
%pip uninstall -y jax jaxlib
%pip install -U pip
%pip install -U jax


In [None]:
import jax, jaxlib
print(jax.__version__, jaxlib.__version__)


In [None]:
import os
import random
import inspect
import numpy as np
import torch
import matplotlib.pyplot as plt
import timesfm

torch.set_float32_matmul_precision("high")

# --- 1) 設定 ---
BATCH_SIZE  = 8
CONTEXT_LEN = 512
HORIZON_LEN = 128
TOTAL_LEN   = CONTEXT_LEN + HORIZON_LEN

MODEL_LOCAL_DIR = r"C:\moinfo\timesfm_v2.5_local"
MODEL_HF_ID     = "google/timesfm-2.5-200m-pytorch"

# --- 2) モデル読込（ローカル -> HF の順で試す） ---
def load_timesfm_2p5():
    try:
        if os.path.isdir(MODEL_LOCAL_DIR):
            print(f"Loading model from local dir: {MODEL_LOCAL_DIR}")
            return timesfm.TimesFM_2p5_200M_torch.from_pretrained(MODEL_LOCAL_DIR)
    except Exception as e:
        print("Local load failed, fallback to HF:", repr(e))

    print(f"Loading model from HF: {MODEL_HF_ID}")
    return timesfm.TimesFM_2p5_200M_torch.from_pretrained(MODEL_HF_ID)

model = load_timesfm_2p5()

# ★重要：XRegを使うなら return_backcast=True が必須
bench_cfg = timesfm.ForecastConfig(
    max_context=CONTEXT_LEN,
    max_horizon=HORIZON_LEN,
    per_core_batch_size=BATCH_SIZE,
    normalize_inputs=True,
    return_backcast=True,   # ← これを追加！
)

print("Compiling with return_backcast=True ...")
model.compile(bench_cfg)

# --- 3) ダミー入力（ターゲット系列） ---
inputs = [np.sin(np.linspace(0, 30, CONTEXT_LEN)).astype(np.float32) for _ in range(BATCH_SIZE)]

# --- 4) 共変量を作る（最小例） ---
t = np.arange(TOTAL_LEN, dtype=np.int32)

dynamic_numerical_covariates = {
    "promo": [(np.random.rand(TOTAL_LEN) < 0.1).astype(np.float32) for _ in range(BATCH_SIZE)]
}

dynamic_categorical_covariates = {
    "week_day": [((t + np.random.randint(0, 7)) % 7).astype(np.int32) for _ in range(BATCH_SIZE)]
}

static_numerical_covariates = {
    "store_size": [float(np.random.uniform(0.5, 2.0)) for _ in range(BATCH_SIZE)]
}

static_categorical_covariates = {
    "region": [random.choice(["JP", "US", "EU"]) for _ in range(BATCH_SIZE)]
}

# --- 5) 互換ラッパ（引数差を吸収） ---
def forecast_with_covariates_compat(
    model,
    inputs,
    horizon,
    dynamic_numerical_covariates=None,
    dynamic_categorical_covariates=None,
    static_numerical_covariates=None,
    static_categorical_covariates=None,
    xreg_mode="xreg + timesfm",
    ridge=0.0,
    force_on_cpu=False,
    normalize_xreg_target_per_input=True,
):
    if not hasattr(model, "forecast_with_covariates"):
        raise AttributeError("この model に forecast_with_covariates が見つかりません。timesfm を xreg 対応で入れているか確認してください。")

    fn = model.forecast_with_covariates
    sig = inspect.signature(fn)
    kwargs = dict(
        inputs=inputs,
        dynamic_numerical_covariates=dynamic_numerical_covariates or {},
        dynamic_categorical_covariates=dynamic_categorical_covariates or {},
        static_numerical_covariates=static_numerical_covariates or {},
        static_categorical_covariates=static_categorical_covariates or {},
        xreg_mode=xreg_mode,
        ridge=ridge,
        force_on_cpu=force_on_cpu,
        normalize_xreg_target_per_input=normalize_xreg_target_per_input,
    )

    if "horizon" in sig.parameters:
        kwargs["horizon"] = horizon
    if "freq" in sig.parameters:
        kwargs["freq"] = [0] * len(inputs)

    return fn(**kwargs)

# --- 6) TimesFM 単体の予測 ---
try:
    base_point, _ = model.forecast(horizon=HORIZON_LEN, inputs=inputs)
except TypeError:
    base_point, _ = model.forecast(inputs=inputs, freq=[0]*len(inputs))

base_point = np.asarray(base_point)
print("base_point shape:", base_point.shape)

# --- 7) 共変量付き予測（XReg）---
cov_forecast, ols_forecast = forecast_with_covariates_compat(
    model=model,
    inputs=inputs,
    horizon=HORIZON_LEN,
    dynamic_numerical_covariates=dynamic_numerical_covariates,
    dynamic_categorical_covariates=dynamic_categorical_covariates,
    static_numerical_covariates=static_numerical_covariates,
    static_categorical_covariates=static_categorical_covariates,
    xreg_mode="xreg + timesfm",
    ridge=0.0,
    force_on_cpu=False,
    normalize_xreg_target_per_input=True,
)

cov_forecast = np.asarray(cov_forecast)
ols_forecast = np.asarray(ols_forecast)

print("cov_forecast shape:", cov_forecast.shape)
print("ols_forecast shape:", ols_forecast.shape)

# --- 8) 可視化（先頭系列だけ） ---

base_point = np.asarray(base_point)
cov_forecast = np.asarray(cov_forecast)
ols_forecast = np.asarray(ols_forecast)

# ★ 未来だけ取り出す（return_backcast=True のとき必須）
base_future = base_point[:, -HORIZON_LEN:]   # shape: (B, 128)

# OLS側は (B, 128, 10) なので、代表として中央値(0.5)っぽい列を選ぶ
# どの列が中央値かは実装依存なので、まず真ん中を取る（10本→ index=5）
ols_median = ols_forecast[:, :, ols_forecast.shape[-1] // 2]  # shape: (B, 128)

x_ctx = np.arange(CONTEXT_LEN)
x_fut = np.arange(CONTEXT_LEN, CONTEXT_LEN + HORIZON_LEN)

plt.figure(figsize=(12, 4))
plt.plot(x_ctx, inputs[0], label="input (context)")
plt.plot(x_fut, base_future[0], label="TimesFM (base, future only)")
plt.plot(x_fut, cov_forecast[0], label="XReg + TimesFM (cov_forecast)")
plt.plot(x_fut, ols_median[0], label="XReg only (ols median)", linestyle="--")
plt.title("TimesFM forecast_with_covariates: base vs covariates")
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()



In [None]:
backcast_len = base_point.shape[1] - HORIZON_LEN
base_backcast = base_point[:, :backcast_len]

plt.figure(figsize=(12, 4))
plt.plot(np.arange(CONTEXT_LEN), inputs[0], label="input (context)")
plt.plot(np.arange(CONTEXT_LEN - backcast_len, CONTEXT_LEN), base_backcast[0], label="TimesFM (backcast)")
plt.plot(np.arange(CONTEXT_LEN, CONTEXT_LEN + HORIZON_LEN), base_future[0], label="TimesFM (forecast)")
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()


# 7. ベクトル抽出 (構造特定と自動フック)

モデルの内部構造を表示し、特徴量抽出に最適なレイヤー（通常は最後のTransformerブロックや正規化層）を特定してフックします。

**アプローチ:**
1. `named_modules()` を走査し、モデルの階層構造をテキストで出力して確認します。
2. 名前リストの中から、`layers` や `blocks` を含む最も深いインデックスを持つ層を自動検出し、ターゲットにします。

In [None]:
import torch
import timesfm
import numpy as np
import matplotlib.pyplot as plt

# --- 1. モデル構造の「レントゲン撮影」 ---
print("=== Inspecting Model Internal Layers ===")

# モデル内の主要なモジュール名を表示 (深さ2まで)
# これで layers, blocks, transformer などの名前を確認します
found_layers = []
for name, module in tfm.model.named_modules():
    # 名前が空（自分自身）はスキップ
    if not name: continue
    
    # 全て表示すると多すぎるので、主要なコンポーネントのみ記録
    found_layers.append(name)
    
    # トップレベル付近のみ表示
    if name.count('.') <= 1:
        print(f" - {name}: {module.__class__.__name__}")

print(f"\nTotal modules found: {len(found_layers)}")

# --- 2. ターゲット層の自動選定 ---
# 「layers.数字」のようなパターンを探し、その最大番号（最終層）を特定します
target_name = None
max_layer_idx = -1

for name in found_layers:
    parts = name.split('.')
    # "layers.19" や "blocks.19" のようなパターンを検出
    for i, part in enumerate(parts):
        if part.isdigit() and i > 0:
            # 数字の前の部分がコンテナ名 (layers, blocks等)
            container_name = parts[i-1]
            idx = int(part)
            
            # コンテナ名が一般的かチェック
            if container_name in ['layers', 'blocks', 'h', 'transformer']:
                if idx > max_layer_idx:
                    max_layer_idx = idx
                    # この数字までのパスをターゲット候補とする
                    target_name = ".".join(parts[:i+1])

# もし数字付きレイヤーが見つからない場合、output_layerやnormを探す
if target_name is None:
    for name in found_layers:
        if 'output' in name or 'final_norm' in name or 'ln_f' in name:
            target_name = name
            break

print(f"\nTarget Layer Auto-Selected: '{target_name}'")

# --- 3. 選択した層へのフックとベクトル抽出 ---
if target_name:
    # 名前から実際のモジュールオブジェクトを取得
    target_module = dict(tfm.model.named_modules())[target_name]
    
    embeddings_storage = {}
    
    # 堅牢なフック関数 (再定義)
    def robust_hook(name):
        def hook(model, input, output):
            # Tensorを探す
            if isinstance(output, torch.Tensor):
                tensor = output
            elif isinstance(output, (tuple, list)):
                tensor = output[0] if len(output) > 0 and isinstance(output[0], torch.Tensor) else None
            else:
                tensor = None
                
            if tensor is not None:
                embeddings_storage[name] = tensor.detach().cpu().numpy()
        return hook

    # フック登録
    handle = target_module.register_forward_hook(robust_hook('feature_embedding'))
    print(f"Hook registered on: {target_module.__class__.__name__}")

    # 予測実行
    print("Running forecast to extract features...")
    try:
        _ = tfm.forecast(inputs=input_data, horizon=128)
    except Exception as e:
        print(f"Forecast warning: {e}")

    # フック解除
    handle.remove()

    # --- 4. 可視化 ---
    if 'feature_embedding' in embeddings_storage:
        emb = embeddings_storage['feature_embedding']
        print(f"\nSuccess! Extracted Embedding Shape: {emb.shape}")
        
        # ヒートマップ
        feature_map = emb[0, :, :200].T
        plt.figure(figsize=(12, 5))
        plt.imshow(feature_map, aspect='auto', cmap='magma', origin='lower')
        plt.colorbar(label='Activation')
        plt.title(f"TimesFM Feature Embeddings\nLayer: {target_name}")
        plt.xlabel("Time Patch Index")
        plt.ylabel("Feature Dimension (First 200)")
        plt.show()
    else:
        print("Error: Failed to capture embeddings (Tensor not found in output).")

else:
    print("Error: Could not automatically identify a target layer.")

# 7-2. 特徴量のテーブル表示 (DataFrame)

モデルの内部構造解析により、`stacked_xf` という名前でTransformerブロックが積み重なっていることが分かりました。
最も抽象度の高い特徴量を持つ **`stacked_xf.19` (最終層)** をターゲットにして再度ベクトル抽出を行い、その数値を**テーブル（DataFrame）**として整形・表示します。

* **行 (Index)**: 時系列のパッチ（時間ステップ）
* **列 (Columns)**: 特徴量の次元（0 ~ 1279）

In [None]:
import pandas as pd
import torch
import timesfm
import matplotlib.pyplot as plt
import seaborn as sns

# --- 1. ターゲット層 (stacked_xf.19) へのフック登録 ---
target_layer_name = 'stacked_xf.19'  # 構造解析で見つかった最終Transformer層
print(f"=== Hooking into target: {target_layer_name} ===")

# モジュールの取得
target_module = dict(tfm.model.named_modules())[target_layer_name]

# データ格納用
embeddings_storage = {}

# 堅牢なフック関数
def robust_hook(name):
    def hook(model, input, output):
        # Tensorを探す (Tupleの中に隠れている場合があるため)
        if isinstance(output, torch.Tensor):
            tensor = output
        elif isinstance(output, (tuple, list)):
            tensor = output[0] if len(output) > 0 and isinstance(output[0], torch.Tensor) else None
        else:
            tensor = None
            
        if tensor is not None:
            embeddings_storage[name] = tensor.detach().cpu().numpy()
    return hook

# フック登録
handle = target_module.register_forward_hook(robust_hook('deep_features'))

# --- 2. 予測実行 (特徴量抽出) ---
print("Running forecast...")
try:
    # バッチサイズ1で実行
    _ = tfm.forecast(inputs=[input_data[0]], horizon=128)
except Exception as e:
    # forecastの戻り値処理でエラーが出ても、フックさえ動けばOK
    print(f"Forecast processed (Warning: {e})")

handle.remove()

# --- 3. テーブル (DataFrame) 化と表示 ---
if 'deep_features' in embeddings_storage:
    # Shape: (Batch, Patch_Length, Dim) -> (1, 16, 1280)
    emb = embeddings_storage['deep_features'][0]  # バッチ先頭を取り出し
    
    # DataFrame作成
    # 行: パッチ(時間), 列: 次元
    df_features = pd.DataFrame(emb)
    df_features.index.name = "Time_Patch_Index"
    df_features.columns.name = "Feature_Dim"
    
    print(f"\nFeature Table Shape: {df_features.shape}")
    print("=== Feature Matrix (Head) ===")
    
    # 大きすぎるので一部を表示 (最初の10列だけなど)
    display(df_features.iloc[:, :10].head())
    
    # 統計情報の確認
    print("\n=== Basic Statistics of Features ===")
    display(df_features.describe().iloc[:, :10]) # 記述統計
    
    # --- (オプション) クラスタリング用データの作成 ---
    # このテーブルを使えば、類似度計算などが可能です
    # 例: パッチごとの相関行列
    corr_matrix = df_features.T.corr()
    
    plt.figure(figsize=(8, 6))
    sns.heatmap(corr_matrix, cmap='coolwarm', center=0)
    plt.title("Time Patch Correlation (Similarity)")
    plt.xlabel("Patch Time")
    plt.ylabel("Patch Time")
    plt.show()

else:
    print("Error: Failed to capture features.")

# 8. 最適化と高速化の検証 (Optimization & Benchmark)

TimesFM v2.5 の `compile()` による最適化効果を検証します。
モデルの計算グラフを事前に構築・最適化することで、2回目以降の推論速度が大幅に向上する（キャッシュ効果）ことを確認します。

**検証項目:**
1.  **Compilation Time**: `tfm.compile()` にかかる時間。
2.  **Warm-up Inference**: コンパイル後、最初の推論にかかる時間（初期化オーバーヘッドを含む）。
3.  **Cached Inference**: 2回目以降の推論にかかる時間（ここが実運用時の速度）。

In [None]:


import time
import numpy as np
import torch
import matplotlib.pyplot as plt

import timesfm


# =========================
# 設定
# =========================
BATCH_SIZE = 32
CONTEXT_LEN = 512
HORIZON_LEN = 128

# hookで中間表現を取りたい場合だけ True（ベンチだけなら False 推奨）
CAPTURE_ACTIVATIONS = False

# 使うチェックポイント
# - TimesFM 2.5が使える環境なら 2.5 をデフォルト
# - 2.5が無い（=2.0系）なら 2.0 をデフォルト
HF_REPO_ID_2P5 = "google/timesfm-2.5-200m-pytorch"
HF_REPO_ID_2P0 = "google/timesfm-2.0-500m-pytorch"


# =========================
# ユーティリティ
# =========================
def sync_cuda():
    """CUDAの非同期実行で計測がズレるのを防ぐ。"""
    if torch.cuda.is_available():
        torch.cuda.synchronize()


def pick_first_tensor(x):
    """入れ子構造(tuple/list/dict)から最初の torch.Tensor を拾う。無ければ None."""
    if isinstance(x, torch.Tensor):
        return x
    if isinstance(x, (tuple, list)):
        for v in x:
            t = pick_first_tensor(v)
            if t is not None:
                return t
    if isinstance(x, dict):
        for v in x.values():
            t = pick_first_tensor(v)
            if t is not None:
                return t
    return None


def get_activation_hook(name: str, storage: dict):
    """
    forward hook: outputがtupleでも落ちない版。
    あなたのエラー（'tuple' object has no attribute 'detach'）を潰す主役。
    """
    def hook(_module, _inputs, output):
        t = pick_first_tensor(output)
        if t is None:
            return
        storage[name] = t.detach().cpu().numpy()
    return hook


def make_forecast_config(**kwargs):
    """
    timesfm.ForecastConfig があればそれを使う。
    互換性のため、受け付けない引数があれば落として再作成。
    """
    ForecastConfig = getattr(timesfm, "ForecastConfig", None)
    if ForecastConfig is None and hasattr(timesfm, "configs"):
        ForecastConfig = getattr(timesfm.configs, "ForecastConfig", None)
    if ForecastConfig is None:
        raise RuntimeError("ForecastConfig が見つかりません。timesfm のバージョンを確認してください。")

    try:
        return ForecastConfig(**kwargs)
    except TypeError:
        # バージョン差で受け付けないキーがある場合の保険
        # まずは per_core_batch_size を落として再試行
        kwargs.pop("per_core_batch_size", None)
        return ForecastConfig(**kwargs)


def load_timesfm_model():
    """
    TimesFM 2.5 API を優先してロード。
    2.5クラスが無ければ 2.0 API へフォールバック。
    """
    torch.set_float32_matmul_precision("high")  # 公式例 :contentReference[oaicite:2]{index=2}

    if hasattr(timesfm, "TimesFM_2p5_200M_torch"):
        model = timesfm.TimesFM_2p5_200M_torch.from_pretrained(HF_REPO_ID_2P5)  # 公式例 :contentReference[oaicite:3]{index=3}
        api_ver = "2.5"
        return model, api_ver

    # ---- フォールバック: TimesFM 2.0 API（TimesFm + TimesFmHparams） ----
    if hasattr(timesfm, "TimesFm") and hasattr(timesfm, "TimesFmHparams") and hasattr(timesfm, "TimesFmCheckpoint"):
        model = timesfm.TimesFm(
            hparams=timesfm.TimesFmHparams(
                backend="gpu" if torch.cuda.is_available() else "cpu",
                per_core_batch_size=BATCH_SIZE,
                horizon_len=HORIZON_LEN,
                # 2.0系は context_len を持つ（max 2048 推奨） :contentReference[oaicite:4]{index=4}
                context_len=2048,
                num_layers=50,
                use_positional_embedding=False,
            ),
            checkpoint=timesfm.TimesFmCheckpoint(huggingface_repo_id=HF_REPO_ID_2P0),
        )
        api_ver = "2.0"
        return model, api_ver

    raise RuntimeError("TimesFM のロード方法が見つかりません。timesfm のインストール状況を確認してください。")


# =========================
# メイン
# =========================
def main():
    print("=== Starting Optimization Benchmark ===")
    print(f"torch: {torch.__version__}")
    print(f"cuda available: {torch.cuda.is_available()}")

    # ダミーデータ生成（Batch, Length）
    # 正弦波を32本用意
    benchmark_input = [np.sin(np.linspace(0, 100, CONTEXT_LEN)).astype(np.float32) for _ in range(BATCH_SIZE)]

    # モデルロード
    tfm, api_ver = load_timesfm_model()
    print(f"Loaded TimesFM API version: {api_ver}")

    # eval相当（内部に torch.nn.Module を持っていれば）
    if hasattr(tfm, "model") and isinstance(tfm.model, torch.nn.Module):
        tfm.model.eval()

    # forward hook（任意）
    embeddings_storage = {}
    hook_handles = []
    if CAPTURE_ACTIVATIONS:
        # 2.5系は tfm.model が本体モジュールになっていることが多い
        target = tfm.model if hasattr(tfm, "model") and isinstance(tfm.model, torch.nn.Module) else None
        if target is not None:
            h = target.register_forward_hook(get_activation_hook("tfm.model:first_tensor", embeddings_storage))
            hook_handles.append(h)
            print("Activation hook enabled (safe for tuple outputs).")
        else:
            print("Activation hook requested but no torch.nn.Module found; skipping hooks.")

    # 2.5系：compile + ForecastConfig が使える :contentReference[oaicite:5]{index=5}
    compile_time = 0.0
    if hasattr(tfm, "compile"):
        print(f"Compiling model (Batch Size: {BATCH_SIZE})...")
        bench_config = make_forecast_config(
            max_context=CONTEXT_LEN,
            max_horizon=HORIZON_LEN,
            per_core_batch_size=BATCH_SIZE,  # バージョン差で無効なら自動で落とす
            normalize_inputs=True,
        )

        sync_cuda()
        t0 = time.perf_counter()
        tfm.compile(bench_config)
        sync_cuda()
        compile_time = time.perf_counter() - t0
        print(f"Compilation Time: {compile_time:.6f} seconds")
    else:
        print("compile() not available in this TimesFM version; skipping compilation timing.")

    # 推論ベンチ
    print("Running inference benchmark...")

    def run_once():
        # 推論専用モード（autograd無効でオーバーヘッド削減） :contentReference[oaicite:6]{index=6}
        with torch.inference_mode():
            _ = tfm.forecast(inputs=benchmark_input, horizon=HORIZON_LEN)

    # A. 1st Run (Warm-up)
    sync_cuda()
    t0 = time.perf_counter()
    run_once()
    sync_cuda()
    warmup_time = time.perf_counter() - t0
    print(f"1st Run (Warm-up): {warmup_time:.6f} seconds")

    # B. 2nd Run (Cached)
    sync_cuda()
    t0 = time.perf_counter()
    run_once()
    sync_cuda()
    optimized_time = time.perf_counter() - t0
    print(f"2nd Run (Cached) : {optimized_time:.6f} seconds")

    # C. Average (Stable)
    num_loops = 10
    total = 0.0
    for _ in range(num_loops):
        sync_cuda()
        t0 = time.perf_counter()
        run_once()
        sync_cuda()
        total += (time.perf_counter() - t0)

    avg_time = total / num_loops
    print(f"Avg Run ({num_loops} loops): {avg_time:.6f} seconds")

    # 可視化
    times = [compile_time, warmup_time, optimized_time, avg_time]
    labels = ['Compilation', 'Warm-up\n(1st)', 'Optimized\n(2nd)', 'Average\n(Stable)']

    plt.figure(figsize=(10, 6))
    bars = plt.bar(labels, times)

    for bar in bars:
        h = bar.get_height()
        plt.text(bar.get_x() + bar.get_width()/2., h, f"{h:.6f}s", ha="center", va="bottom", fontweight="bold")

    plt.ylabel("Execution Time (seconds)")
    plt.title(f"TimesFM Speed Benchmark (API {api_ver})\n(Batch Size: {BATCH_SIZE}, Horizon: {HORIZON_LEN})")
    plt.grid(axis='y', linestyle='--', alpha=0.5)
    plt.show()

    # 速度向上率
    if optimized_time > 0:
        speedup = warmup_time / optimized_time
        print(f"\n>> Speedup Factor (Warm-up / Optimized): {speedup:.2f}x")

    # hook後片付け
    for h in hook_handles:
        h.remove()

    if CAPTURE_ACTIVATIONS:
        print(f"Captured activations keys: {list(embeddings_storage.keys())[:5]} ...")


if __name__ == "__main__":
    main()


## TimesFM `save_pretrained()` 引数の網羅抽出と、保存結果の差分検証

このノートでは、いま使っている `TimesFM` インスタンスの `save_pretrained()` について

1) 実際のシグネチャ（引数一覧）を `inspect.signature()` で抽出  
2) 主要な引数（例：`max_shard_size`, `variant`）を変えて保存  
3) 生成されたファイル一覧・合計サイズ・再ロード可否を比較

を自動で行う。


In [None]:
import os
import json
import time
import hashlib
from pathlib import Path
import timesfm

# ===== 設定（フルパス）=====
MODEL_LOCAL_DIR = r"C:\moinfo\timesfm_v2.5_local"
MODEL_HF_ID     = "google/timesfm-2.5-200m-pytorch"

OUT_ROOT = Path(r"C:\moinfo\_artifacts\timesfm_save_pretrained_matrix_tests")
OUT_ROOT.mkdir(parents=True, exist_ok=True)

def load_model():
    if os.path.isdir(MODEL_LOCAL_DIR):
        print(f"[load] local: {MODEL_LOCAL_DIR}")
        return timesfm.TimesFM_2p5_200M_torch.from_pretrained(MODEL_LOCAL_DIR)
    print(f"[load] hf: {MODEL_HF_ID}")
    return timesfm.TimesFM_2p5_200M_torch.from_pretrained(MODEL_HF_ID)

model = load_model()

# ===== ユーティリティ =====
def sha256_file(p: Path) -> str:
    h = hashlib.sha256()
    with p.open("rb") as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            h.update(chunk)
    return h.hexdigest()

def list_files_with_stats(d: Path):
    rows = []
    for p in sorted(d.rglob("*")):
        if p.is_file():
            rows.append({
                "path": str(p.relative_to(d)),
                "bytes": p.stat().st_size,
                "sha256": sha256_file(p),
            })
    return rows

def save_and_report(name: str, **kwargs):
    out_dir = OUT_ROOT / name
    if out_dir.exists():
        # 既存があると差分が紛れるので消す
        for p in out_dir.rglob("*"):
            if p.is_file():
                p.unlink()
        for p in sorted(out_dir.rglob("*"), reverse=True):
            if p.is_dir():
                try:
                    p.rmdir()
                except OSError:
                    pass
    out_dir.mkdir(parents=True, exist_ok=True)

    print(f"\n=== EXP: {name} ===")
    print("save_dir:", str(out_dir))
    print("kwargs   :", kwargs)

    t0 = time.time()
    ret = model.save_pretrained(str(out_dir), **kwargs)  # ここが本体
    dt = time.time() - t0

    files = list_files_with_stats(out_dir)
    total = sum(r["bytes"] for r in files)
    files_sorted = sorted(files, key=lambda r: r["bytes"], reverse=True)

    print(f"saved_files: {len(files)} total: {total/1024/1024:.2f}MB save_time: {dt:.3f}s")
    print("top files:")
    for r in files_sorted[:5]:
        print(f"  - {r['path']}: {r['bytes']/1024/1024:.2f}MB sha256={r['sha256'][:12]}...")

    return {"name": name, "dir": str(out_dir), "ret": ret, "files": files, "total_bytes": total, "save_sec": dt}

# ===== 実験メニュー =====
# 1) baseline：何も渡さない
exp1 = save_and_report("baseline_default")

# 2) README（モデルカード）だけ変える：model_card_kwargs
# ※ docs上、ModelHubMixin 側が model card を生成するための kwargs です。:contentReference[oaicite:9]{index=9}
exp2 = save_and_report(
    "model_card_kwargs_test",
    model_card_kwargs={
        "language": ["ja"],
        "tags": ["moinfo-test", "timesfm"],
        "license": "apache-2.0",
    },
)

# 3) config.json だけ変える：config
# ※ ここは「再ロード互換」を壊す可能性があるので、まずは“差分が出る”の確認に徹します。
exp3 = save_and_report(
    "config_override_test",
    config={
        "_moinfo_note": "config override test",
        "_saved_at": time.strftime("%Y-%m-%d %H:%M:%S"),
    },
)

# ===== 差分比較（baseline vs others）=====
def diff_from_baseline(base, other):
    b = {r["path"]: r for r in base["files"]}
    o = {r["path"]: r for r in other["files"]}
    all_paths = sorted(set(b) | set(o))

    changed = []
    for p in all_paths:
        if p not in b:
            changed.append((p, "ADDED"))
        elif p not in o:
            changed.append((p, "REMOVED"))
        else:
            if b[p]["sha256"] != o[p]["sha256"] or b[p]["bytes"] != o[p]["bytes"]:
                changed.append((p, "CHANGED"))
    return changed

print("\n=== DIFF vs baseline ===")
for exp in [exp2, exp3]:
    changed = diff_from_baseline(exp1, exp)
    print(f"\n[{exp['name']}] changed_files={len(changed)}")
    for p, st in changed:
        print(f"  - {st}: {p}")


In [None]:
from pathlib import Path
p1 = Path(r"C:\moinfo\_artifacts\timesfm_save_pretrained_matrix_tests\baseline_default\README.md")
p2 = Path(r"C:\moinfo\_artifacts\timesfm_save_pretrained_matrix_tests\model_card_kwargs_test\README.md")

print("same?", p1.read_bytes() == p2.read_bytes())
print("baseline head:\n", p1.read_text(encoding="utf-8", errors="ignore")[:400])
print("\nkwargs head:\n", p2.read_text(encoding="utf-8", errors="ignore")[:400])


In [None]:
import os
import torch
from pathlib import Path
from safetensors.torch import save_file

OUT_DIR = Path(r"C:\moinfo\_artifacts\timesfm_fp16_export")
OUT_DIR.mkdir(parents=True, exist_ok=True)

# TimesFM 2.5の内部が model ならそれを使う
core = model.model if hasattr(model, "model") else None
if core is None:
    raise RuntimeError("内部 torch.nn.Module が見つかりません（model.model が無い）。")

core.eval()

# fp16化した state_dict を作る
sd = core.state_dict()
sd_fp16 = {}
for k, v in sd.items():
    if torch.is_floating_point(v):
        sd_fp16[k] = v.detach().cpu().half()
    else:
        sd_fp16[k] = v.detach().cpu()

out_path = OUT_DIR / "model.fp16.safetensors"
save_file(sd_fp16, str(out_path))

print("saved:", out_path, "size(MB)=", out_path.stat().st_size/1024/1024)


## state_dict を保存する（.pt と .safetensors 両対応）

In [None]:
import os
from pathlib import Path
import torch

def _get_torch_module(timesfm_obj):
    """TimesFMラッパ or nn.Module どちらでも受けて、内部torch.nn.Moduleを返す。"""
    if isinstance(timesfm_obj, torch.nn.Module):
        return timesfm_obj
    if hasattr(timesfm_obj, "model") and isinstance(timesfm_obj.model, torch.nn.Module):
        return timesfm_obj.model
    raise TypeError("torch.nn.Module が見つかりません。timesfm_obj か timesfm_obj.model を確認してください。")

def save_state_dict(
    timesfm_obj,
    out_path: str,
    *,
    dtype: str | None = None,   # None / "fp16" / "bf16" / "fp32"
    use_safetensors: bool = False,
):
    """
    推論・学習で使う「重み」を保存する。
    dtype を指定すると、保存前に重みの型を変換して容量を削減できる（例: fp16）。
    """
    out_path = Path(out_path)
    out_path.parent.mkdir(parents=True, exist_ok=True)

    m = _get_torch_module(timesfm_obj)
    sd = m.state_dict()

    # dtype変換（容量削減・ただし出力が微妙に変わる可能性あり）
    if dtype is not None:
        if dtype.lower() == "fp16":
            cast = torch.float16
        elif dtype.lower() == "bf16":
            cast = torch.bfloat16
        elif dtype.lower() == "fp32":
            cast = torch.float32
        else:
            raise ValueError("dtype は None / 'fp16' / 'bf16' / 'fp32' のみ対応です。")

        sd2 = {}
        for k, v in sd.items():
            if torch.is_floating_point(v):
                sd2[k] = v.detach().cpu().to(cast)
            else:
                sd2[k] = v.detach().cpu()
        sd = sd2
    else:
        # pickle/safetensorsのため、CPUに移しておくのが安全
        sd = {k: v.detach().cpu() for k, v in sd.items()}

    if use_safetensors:
        from safetensors.torch import save_file
        if out_path.suffix.lower() != ".safetensors":
            out_path = out_path.with_suffix(".safetensors")
        save_file(sd, str(out_path))
    else:
        if out_path.suffix.lower() != ".pt":
            out_path = out_path.with_suffix(".pt")
        torch.save(sd, str(out_path))  # PyTorch標準（pickle）

    size_mb = out_path.stat().st_size / 1024 / 1024
    print(f"[saved] {out_path} ({size_mb:.2f} MB)")
    return str(out_path)


In [None]:
# 例：いま使っている TimesFM インスタンスを model だとする
# fp32（元のまま）
save_state_dict(model, r"C:\moinfo\_artifacts\state_dict\timesfm_2p5_fp32.pt", dtype=None, use_safetensors=False)

# fp16（容量削減）
save_state_dict(model, r"C:\moinfo\_artifacts\state_dict\timesfm_2p5_fp16.safetensors", dtype="fp16", use_safetensors=True)


## state_dict をロードする（strict/map_location 含む）

In [None]:
from pathlib import Path
import torch

def load_state_dict(
    timesfm_obj,
    weight_path: str,
    *,
    strict: bool = True,
    map_location: str = "cpu",
):
    """
    保存した state_dict をロードしてモデルへ反映する。
    strict=True：キー不一致があればエラー（再現性重視）
    strict=False：多少違っても読み進める（改造・版違いの暫定対応）
    """
    weight_path = Path(weight_path)
    m = _get_torch_module(timesfm_obj)

    if weight_path.suffix.lower() == ".safetensors":
        from safetensors.torch import load_file
        sd = load_file(str(weight_path))  # safetensorsは安全に読める
    else:
        sd = torch.load(str(weight_path), map_location=map_location)  # PyTorch標準

    missing, unexpected = m.load_state_dict(sd, strict=strict)
    print("[loaded]", str(weight_path))
    print("  missing_keys   =", len(missing))
    print("  unexpected_keys=", len(unexpected))
    if len(missing) > 0:
        print("  sample missing:", missing[:10])
    if len(unexpected) > 0:
        print("  sample unexpected:", unexpected[:10])
    return missing, unexpected


In [None]:
# # 新しく model2 を作る（あなたの既存ロード関数を使う想定）
# model2 = load_timesfm_2p5()

# fp16を読み込む（注意：ロード後に推論時のdtypeやdeviceの扱いは自分で決める）
load_state_dict(model2, r"C:\moinfo\_artifacts\state_dict\timesfm_2p5_fp16.safetensors", strict=True, map_location="cpu")


In [None]:
import torch
from pathlib import Path

def save_checkpoint(
    timesfm_obj,
    out_path: str,
    *,
    optimizer=None,
    scheduler=None,
    scaler=None,     # AMP（自動混合精度）を使う場合
    step: int | None = None,
    epoch: int | None = None,
):
    out_path = Path(out_path)
    out_path.parent.mkdir(parents=True, exist_ok=True)

    m = _get_torch_module(timesfm_obj)

    ckpt = {
        "model_state_dict": {k: v.detach().cpu() for k, v in m.state_dict().items()},
        "step": step,
        "epoch": epoch,
    }
    if optimizer is not None:
        ckpt["optimizer_state_dict"] = optimizer.state_dict()
    if scheduler is not None:
        ckpt["scheduler_state_dict"] = scheduler.state_dict()
    if scaler is not None:
        ckpt["scaler_state_dict"] = scaler.state_dict()

    torch.save(ckpt, str(out_path))
    print(f"[saved checkpoint] {out_path} ({out_path.stat().st_size/1024/1024:.2f} MB)")
    return str(out_path)

def load_checkpoint(
    timesfm_obj,
    ckpt_path: str,
    *,
    optimizer=None,
    scheduler=None,
    scaler=None,
    strict: bool = True,
    map_location: str = "cpu",
):
    ckpt = torch.load(ckpt_path, map_location=map_location)
    m = _get_torch_module(timesfm_obj)

    missing, unexpected = m.load_state_dict(ckpt["model_state_dict"], strict=strict)

    if optimizer is not None and "optimizer_state_dict" in ckpt:
        optimizer.load_state_dict(ckpt["optimizer_state_dict"])
    if scheduler is not None and "scheduler_state_dict" in ckpt:
        scheduler.load_state_dict(ckpt["scheduler_state_dict"])
    if scaler is not None and "scaler_state_dict" in ckpt:
        scaler.load_state_dict(ckpt["scaler_state_dict"])

    print("[loaded checkpoint]", ckpt_path)
    print("  step =", ckpt.get("step"), "epoch =", ckpt.get("epoch"))
    print("  missing_keys =", len(missing), "unexpected_keys =", len(unexpected))
    return ckpt


In [None]:
import numpy as np
import time
import timesfm

# --- 設定（あなたのノートに合わせて） ---
BATCH_SIZE  = 8
CONTEXT_LEN = 512
HORIZON_LEN = 128

# 既に作ってある前提：
# model  = load_timesfm_2p5()           # fp32（元）
# model2 = load_timesfm_2p5(); load_state_dict(model2, fp16_path)  # fp16重みをロード済み

# --- compile を強制（両方に必要） ---
def compile_timesfm(m):
    cfg = timesfm.ForecastConfig(
        max_context=CONTEXT_LEN,
        max_horizon=HORIZON_LEN,
        per_core_batch_size=BATCH_SIZE,
        normalize_inputs=True,
        # XReg（forecast_with_covariates）も使うなら True のままが安全
        return_backcast=True,
    )
    m.compile(cfg)
    return cfg

print("Compiling model (fp32) ...")
cfg1 = compile_timesfm(model)

print("Compiling model2 (fp16 weights loaded) ...")
cfg2 = compile_timesfm(model2)

# --- 入力 ---
inputs = [np.sin(np.linspace(0, 30, CONTEXT_LEN)).astype(np.float32) for _ in range(BATCH_SIZE)]

# --- 予測（互換：horizon引数 or freq引数の差分を吸収） ---
def forecast_point_only(m):
    try:
        y, _ = m.forecast(horizon=HORIZON_LEN, inputs=inputs)
    except TypeError:
        y, _ = m.forecast(inputs=inputs, freq=[0]*len(inputs))
    y = np.asarray(y)
    # return_backcast=True の場合 (B, backcast+horizon) なので末尾だけ取る
    return y[:, -HORIZON_LEN:]

# --- 差分評価（fp32 vs fp16）---
y32 = forecast_point_only(model)
y16 = forecast_point_only(model2)

abs_err = np.abs(y32 - y16)
max_abs = float(abs_err.max())
mae = float(abs_err.mean())
rmse = float(np.sqrt(((y32 - y16) ** 2).mean()))

den = np.maximum(np.abs(y32), 1e-6)
rel = abs_err / den
max_rel = float(rel.max())
mean_rel = float(rel.mean())

print("\n=== fp32 vs fp16 difference (forecast horizon only) ===")
print("max_abs =", max_abs)
print("mae     =", mae)
print("rmse    =", rmse)
print("max_rel =", max_rel)
print("mean_rel=", mean_rel)

# --- 速度（簡易）---
def bench(m, loops=10):
    _ = forecast_point_only(m)  # warm-up
    t0 = time.time()
    for _ in range(loops):
        _ = forecast_point_only(m)
    return (time.time() - t0) / loops

t32 = bench(model, loops=10)
t16 = bench(model2, loops=10)

print("\n=== speed (sec / call) ===")
print("fp32:", t32)
print("fp16:", t16)
print("speedup(fp32/fp16):", (t32 / t16) if t16 > 0 else None)


In [None]:
import torch
import timesfm
import numpy as np
import time

# ====== 設定 ======
BATCH_SIZE  = 8
CONTEXT_LEN = 512
HORIZON_LEN = 128

# ====== dtype 分布の可視化 ======
def summarize_param_dtypes(timesfm_obj):
    core = timesfm_obj.model if hasattr(timesfm_obj, "model") else timesfm_obj
    if not isinstance(core, torch.nn.Module):
        raise TypeError("内部 torch.nn.Module が見つかりません（timesfm_obj.model を確認）。")

    d = {}
    for _, v in core.state_dict().items():
        if torch.is_floating_point(v):
            key = str(v.dtype)
            d[key] = d.get(key, 0) + v.numel()

    total = sum(d.values()) or 1
    print("float param dtype distribution (by numel):")
    for k in sorted(d, key=lambda x: -d[x]):
        print(f"  {k}: {d[k] / total * 100:.2f}%")

# ====== compile 設定 ======
def compile_cfg():
    return timesfm.ForecastConfig(
        max_context=CONTEXT_LEN,
        max_horizon=HORIZON_LEN,
        per_core_batch_size=BATCH_SIZE,
        normalize_inputs=True,
        return_backcast=True,
    )

# ====== fp16 化（in-place） ======
def to_fp16_inplace(timesfm_obj):
    core = timesfm_obj.model if hasattr(timesfm_obj, "model") else timesfm_obj
    if not isinstance(core, torch.nn.Module):
        raise TypeError("内部 torch.nn.Module が見つかりません（timesfm_obj.model を確認）。")
    core.eval()
    core.half()
    return timesfm_obj

# ====== 予測（末尾128だけ） ======
inputs = [np.sin(np.linspace(0, 30, CONTEXT_LEN)).astype(np.float32) for _ in range(BATCH_SIZE)]

def forecast_point_only(m):
    y, _ = m.forecast(horizon=HORIZON_LEN, inputs=inputs)
    y = np.asarray(y)
    return y[:, -HORIZON_LEN:]

def bench(m, loops=10):
    _ = forecast_point_only(m)  # warm-up
    t0 = time.time()
    for _ in range(loops):
        _ = forecast_point_only(m)
    return (time.time() - t0) / loops

# ====== 実行：fp16化→compile→検証 ======
print("=== BEFORE fp16 cast ===")
summarize_param_dtypes(model2)

print("\nCasting model2 to fp16 ...")
to_fp16_inplace(model2)

print("\nCompiling model2 after fp16 cast ...")
model2.compile(compile_cfg())

print("\n=== AFTER fp16 cast ===")
summarize_param_dtypes(model2)

# 速度だけ軽く測る（fp16が本当に効いてれば変化が出る可能性）
t16 = bench(model2, loops=10)
print("\nfp16 model2 sec/call:", t16)

# 参考：GPU確認（使えるなら fp16の恩恵が出やすい）
print("\ncuda available:", torch.cuda.is_available())
if torch.cuda.is_available():
    print("gpu:", torch.cuda.get_device_name(0))



In [None]:
import torch
if hasattr(model2, "model") and isinstance(model2.model, torch.nn.Module):
    model2.model = model2.model.half()


In [None]:
import timesfm
import torch
import numpy as np

# 1. モデルのロード (TimesFM 2.5 - 200Mパラメータ PyTorch版)
# GPUが利用可能な場合は自動的にGPUを使用します
print("Loading model...")
model = timesfm.TimesFM_2p5_200M_torch.from_pretrained(
    "google/timesfm-2.5-200m-pytorch"
)

# 2. 予測設定のコンパイル (高速化のため)
# max_context: 入力の最大長 (これより長い系列はトリミング)
# max_horizon: 予測したい未来の最大ステップ数
config = timesfm.ForecastConfig(
    max_context=512,       # 入力系列の最大長
    max_horizon=128,       # 予測期間の長さ
    per_core_batch_size=1, # バッチサイズ
)
model.compile(config)

# 3. ダミーデータの作成 (正弦波など)
# 実際にはPandas DataFrameやNumPy配列のリストを渡します
t = np.linspace(0, 20, 512)
input_signal = np.sin(t) * t  # 振幅が大きくなる正弦波
inputs = [input_signal]       # リスト形式で複数の時系列を渡せます

# 4. 予測の実行
print("Forecasting...")
# freq: 頻度 (0:高頻度/不明, 1:中頻度, 2:低頻度)。v2.5では自動推論も可。
point_forecast, quantile_forecast = model.forecast(
    inputs=inputs,
    #freq=[0],  # 0は一般的な時系列(秒, 分, 時間, 日など)を指します
    horizon=128
)

# 5. 結果の表示
print(f"Input shape: {inputs[0].shape}")
print(f"Forecast shape: {point_forecast.shape}")
print(f"First 5 predictions: {point_forecast[0, :5]}")

# (オプション) 可視化
import matplotlib.pyplot as plt
plt.figure(figsize=(10, 5))
plt.plot(np.arange(512), inputs[0], label="History")
plt.plot(np.arange(512, 512+128), point_forecast[0], label="Forecast")
plt.legend()
plt.title("TimesFM Forecast Sample")
plt.show()

In [1]:
import torch
import torch.nn as nn
from timesfm import TimesFM_2p5_200M_torch
from peft import get_peft_model, LoraConfig

# 1. モデルのロード
print("Loading model...")
tfm = TimesFM_2p5_200M_torch.from_pretrained("google/timesfm-2.5-200m-pytorch")

# 2. モデル本体の取り出し
# 【重要】ライブラリの仕様上、tfm.model を直接書き換えるとクラス全体に影響するため
# 念のため構造確認だけ行い、get_peft_model はこのインスタンスに対して適用します。
pytorch_model = tfm.model
pytorch_model.train()

# 3. LoRA設定
peft_config = LoraConfig(
    r=16,
    lora_alpha=32,
    target_modules="all-linear",
    lora_dropout=0.05,
    bias="none",
)

# 4. LoRA適用
# ※ここで pytorch_model が書き換わります
model = get_peft_model(pytorch_model, peft_config)
model.print_trainable_parameters()

# --- 以降、学習ループ ---
# ...
# 3. ダミーデータと学習ループの準備
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
criterion = nn.MSELoss()

# 【修正ポイント】
# TimesFMの入力パッチサイズ (v2.5は 32)
patch_len = 32

# 入力データを (Batch, Num_Patches, Patch_Len) に変形します
# 512ステップ / 32 = 16パッチ
train_inputs = torch.randn(32, 512).reshape(32, -1, patch_len).to(tfm.model.device)
train_masks = torch.zeros(32, 512).reshape(32, -1, patch_len).to(tfm.model.device)

# ターゲットデータ
# 出力次元に合わせてターゲットを用意 (簡易的な例)
# TimesFMの出力は (Batch, Num_Patches, Output_Dim) です
# Output_Dim = 1280 (hidden_dims と同じ)
targets = torch.randn(32, 16, 1280).to(tfm.model.device)

# 4. 学習ループ (簡易版)
print("Starting training loop...")
for epoch in range(3):
    optimizer.zero_grad()
    
    # Forwardパス
    # output: (input_embeddings, output_embeddings, output_ts, output_quantiles)
    outputs, _ = model(train_inputs, train_masks)
    
    # ここでは出力埋め込み(output_embeddings)をターゲットと比較する例
    # 実際には output_ts (予測値のlogits) など、目的に応じた出力を使用します
    output_embeddings = outputs[1] # Shape: (32, 16, 1280)
    
    loss = criterion(output_embeddings, targets)
    
    loss.backward()
    optimizer.step()
    
    print(f"Epoch {epoch+1}: Loss = {loss.item():.4f}")

print("Finetuning finished.")


Loading model...


config.json:   0%|          | 0.00/475 [00:00<?, ?B/s]

Downloaded.
trainable params: 4,712,448 || all params: 236,001,728 || trainable%: 1.9968
Starting training loop...
Epoch 1: Loss = 1.5212
Epoch 2: Loss = 1.5057
Epoch 3: Loss = 1.4902
Finetuning finished.


In [3]:
from moinfo_timesfm_ext import FineTuneConfig, finetune_from_csv, make_dummy_train_csv


In [4]:
from moinfo_timesfm_ext import make_dummy_train_csv
make_dummy_train_csv(r"C:\moinfo\data\train.csv", n_points=40000)


WindowsPath('C:/moinfo/data/train.csv')

In [5]:
from moinfo_timesfm_ext import FineTuneConfig, finetune_from_csv

cfg = FineTuneConfig(
    model_path=r"C:\moinfo\timesfm_v2.5_local",   # or r"C:\moinfo\timesfm_hf_local"
    csv_path=r"C:\moinfo\data\train.csv",
    value_col="y",
    series_id_col=None,    # 多系列なら "series_id" 等
    context_len=512,
    horizon_len=128,
    batch_size=8,
    epochs=3,
    lr=1e-4,
    output_dir=r"C:\moinfo\_artifacts\timesfm_finetuned",
    device="auto",
    dtype="auto",
)

out_dir = finetune_from_csv(cfg)
out_dir


`torch_dtype` is deprecated! Use `dtype` instead!
Some weights of the model checkpoint at C:\moinfo\timesfm_v2.5_local were not used when initializing TimesFmModelForPrediction: ['output_projection_point.hidden_layer.weight', 'output_projection_point.output_layer.weight', 'output_projection_point.residual_layer.weight', 'output_projection_quantiles.hidden_layer.weight', 'output_projection_quantiles.output_layer.weight', 'output_projection_quantiles.residual_layer.weight', 'stacked_xf.0.attn.key_ln.scale', 'stacked_xf.0.attn.out.weight', 'stacked_xf.0.attn.per_dim_scale.per_dim_scale', 'stacked_xf.0.attn.qkv_proj.weight', 'stacked_xf.0.attn.query_ln.scale', 'stacked_xf.0.ff0.weight', 'stacked_xf.0.ff1.weight', 'stacked_xf.0.post_attn_ln.scale', 'stacked_xf.0.post_ff_ln.scale', 'stacked_xf.0.pre_attn_ln.scale', 'stacked_xf.0.pre_ff_ln.scale', 'stacked_xf.1.attn.key_ln.scale', 'stacked_xf.1.attn.out.weight', 'stacked_xf.1.attn.per_dim_scale.per_dim_scale', 'stacked_xf.1.attn.qkv_proj.weig

: 

# amazon/chronos-2

In [12]:
import inspect
import importlib
import pkgutil
import pandas as pd
import re
import types
from typing import List, Dict, Any, Optional

class LibraryFeatureInspector:
    """
    指定されたライブラリを探索し、特定の機能（キーワード）に関連する
    クラス、メソッド、ドキュメントを抽出してファクトチェックを行うクラス。
    """
    
    def __init__(self, library_name: str):
        self.library_name = library_name
        self.found_items = []
        try:
            self.module = importlib.import_module(library_name)
            print(f"✅ Successfully imported library: {library_name}")
        except ImportError:
            print(f"❌ Library '{library_name}' not found. Please install it first (e.g., !pip install chronos-forecasting).")
            self.module = None

    def _recursive_inspect(self, module: Any, visited: set, depth: int = 0, max_depth: int = 3):
        """モジュールを再帰的に探索してクラスと関数を収集"""
        if module in visited or depth > max_depth:
            return
        visited.add(module)

        # モジュール内の属性を走査
        for name, obj in inspect.getmembers(module):
            if name.startswith("_"): continue

            # クラスまたは関数の場合
            if inspect.isclass(obj) or inspect.isfunction(obj):
                if hasattr(obj, '__module__') and obj.__module__ and obj.__module__.startswith(self.library_name):
                    doc = inspect.getdoc(obj) or ""
                    # ソースコードの取得（可能な場合）
                    try:
                        source = inspect.getsource(obj)
                    except (OSError, TypeError):
                        source = ""
                    
                    self.found_items.append({
                        "name": name,
                        "type": "class" if inspect.isclass(obj) else "function",
                        "module": obj.__module__,
                        "docstring": doc,
                        "source_preview": source[:500] # 最初の500文字を確認用に保持
                    })
            
            # サブモジュールの探索
            elif isinstance(obj, types.ModuleType):
                if hasattr(obj, '__name__') and obj.__name__.startswith(self.library_name):
                    self._recursive_inspect(obj, visited, depth + 1, max_depth)

    def scan_library(self):
        """ライブラリ全体をスキャン"""
        if not self.module: return
        print(f"🔍 Scanning {self.library_name} structure... (this may take a moment)")
        visited = set()
        self._recursive_inspect(self.module, visited)
        print(f"📊 Found {len(self.found_items)} accessible objects (classes/functions).")

    def verify_features(self, feature_queries: Dict[str, List[str]]) -> pd.DataFrame:
        """
        特定の機能キーワードがドキュメントやコード内に存在するか検証する
        
        Args:
            feature_queries: { "機能カテゴリ": ["検索キーワード1", "検索キーワード2"] }
        """
        results = []
        
        df_items = pd.DataFrame(self.found_items)
        if df_items.empty:
            return pd.DataFrame()

        for category, keywords in feature_queries.items():
            found_count = 0
            evidence = []
            
            for keyword in keywords:
                # ドキュメントまたはソースコードにキーワードが含まれるか検索 (Case Insensitive)
                mask = (
                    df_items["docstring"].str.contains(keyword, case=False, na=False) | 
                    df_items["name"].str.contains(keyword, case=False, na=False) |
                    df_items["source_preview"].str.contains(keyword, case=False, na=False)
                )
                matched = df_items[mask]
                
                if not matched.empty:
                    found_count += len(matched)
                    # 証拠として上位3つのオブジェクト名を記録
                    top_matches = matched["name"].head(3).tolist()
                    evidence.extend([f"{m} ({keyword})" for m in top_matches])

            # 結果の判定
            status = "✅ Confirmed" if found_count > 0 else "❓ Not Found"
            confidence = "High" if found_count > 5 else ("Low" if found_count > 0 else "None")
            
            results.append({
                "Category": category,
                "Status": status,
                "Hit Count": found_count,
                "Keywords Used": ", ".join(keywords),
                "Evidence (Sample Objects)": ", ".join(list(set(evidence))[:5]) # 重複排除して表示
            })
            
        return pd.DataFrame(results)

# --- 実行設定 ---

# 1. 解析対象のライブラリを指定 ('chronos' または 比較のために 'timesfm' も可)
# ※ Chronos-2のパッケージ名は通常 'chronos' です
TARGET_LIBRARY = "chronos" 

# 2. 検証したい機能（ファクトチェック対象）と検索キーワードの定義
feature_definitions = {
    "Multivariate (多変量)": [
        "multivariate", "group_attention", "jointly predict", "co-evolving", "ndim"
    ],
    "Covariates (共変量/外部変数)": [
        "covariate", "exogenous", "future_features", "past_features", "feat_dynamic", "static_features"
    ],
    "Probabilistic/Quantile (確率的予測)": [
        "quantile", "probabilistic", "prediction_interval", "forecast_distribution", "sample"
    ],
    "Zero-Shot / Pretrained (事前学習)": [
        "pretrained", "zero-shot", "pipeline", "from_pretrained"
    ],
    "Handling Missing Data (欠損値)": [
        "mask", "missing", "nan", "pad"
    ],
    "Arbitrary Dimensions (任意次元)": [
        "arbitrary", "dimension", "flexible"
    ]
}

# --- 実行プロセス ---
inspector = LibraryFeatureInspector(TARGET_LIBRARY)

# ライブラリがインストールされている場合のみ実行
if inspector.module:
    inspector.scan_library()
    df_verification = inspector.verify_features(feature_definitions)
    
    print("\n=== 🧪 Feature Verification Report: Chronos-2 Claims ===")
    display(df_verification) # Jupyter環境での表示用
    
    # 必要に応じてCSV保存
    # df_verification.to_csv("chronos_feature_verification.csv", index=False)
else:
    print("\nℹ️ Chronos-2を検証するには、以下のコマンドでインストールしてください:")
    print("!pip install chronos-forecasting")

✅ Successfully imported library: chronos
🔍 Scanning chronos structure... (this may take a moment)
📊 Found 90 accessible objects (classes/functions).

=== 🧪 Feature Verification Report: Chronos-2 Claims ===


Unnamed: 0,Category,Status,Hit Count,Keywords Used,Evidence (Sample Objects)
0,Multivariate (多変量),✅ Confirmed,3,"multivariate, group_attention, jointly predict, co-evolving, ndim",left_pad_and_stack_1D (ndim)
1,Covariates (共変量/外部変数),✅ Confirmed,11,"covariate, exogenous, future_features, past_features, feat_dynamic, static_features","Chronos2Dataset (covariate), Chronos2Model (covariate)"
2,Probabilistic/Quantile (確率的予測),✅ Confirmed,36,"quantile, probabilistic, prediction_interval, forecast_distribution, sample","ForecastType (sample), Chronos2Pipeline (quantile), Chronos2ForecastingConfig (quantile), Chrono..."
3,Zero-Shot / Pretrained (事前学習),✅ Confirmed,34,"pretrained, zero-shot, pipeline, from_pretrained","Chronos2Model (pretrained), Chronos2Pipeline (pipeline), ChronosBoltModelForForecasting (from_pr..."
4,Handling Missing Data (欠損値),✅ Confirmed,29,"mask, missing, nan, pad","validate_and_prepare_single_dict_task (nan), TimeCrossAttention (mask), Chronos2CoreConfig (miss..."
5,Arbitrary Dimensions (任意次元),✅ Confirmed,6,"arbitrary, dimension, flexible","InstanceNorm (dimension), interpolate_quantiles (dimension), weighted_quantile (dimension)"


In [13]:
import pkgutil
import importlib
import inspect
import pandas as pd
import chronos

def discover_all_capabilities(library_module):
    """
    ライブラリ内の全ての公開クラス・関数を探索し、
    その「機能（Docstring）」を一覧化して隠れた機能を発掘する。
    """
    catalog = []
    
    # パッケージのパスを取得
    package_path = library_module.__path__
    prefix = library_module.__name__ + "."

    print(f"🚀 Exploring library: {library_module.__name__}")

    # 再帰的にモジュールを探索
    for _, name, ispkg in pkgutil.walk_packages(package_path, prefix):
        try:
            module = importlib.import_module(name)
            
            # モジュール内のメンバーを検査
            for member_name, obj in inspect.getmembers(module):
                # プライベートメンバと他所からのインポートを除外
                if member_name.startswith("_"): continue
                
                # クラスまたは関数のみ対象
                if inspect.isclass(obj) or inspect.isfunction(obj):
                    # そのライブラリ等で定義されたものに限定（外部ライブラリの除外）
                    if hasattr(obj, '__module__') and obj.__module__ and obj.__module__.startswith(library_module.__name__):
                        
                        # Docstringの取得と整形
                        doc = inspect.getdoc(obj)
                        summary = doc.split('\n')[0] if doc else "(No description available)"
                        
                        # 引数（シグネチャ）の取得 - パラメータから機能を推測するため
                        try:
                            sig = str(inspect.signature(obj))
                        except (ValueError, TypeError):
                            sig = "(...)"

                        catalog.append({
                            "Module": name,
                            "Name": member_name,
                            "Type": "Class" if inspect.isclass(obj) else "Function",
                            "Signature": sig[:100], # 長すぎる場合はカット
                            "Description": summary[:150] # 長すぎる場合はカット
                        })
        except ImportError:
            continue
        except Exception as e:
            # エラーは無視して続行
            pass

    df = pd.DataFrame(catalog)
    # 重複を除去（異なるモジュールでimportされている場合があるため）
    df = df.drop_duplicates(subset=["Name", "Type"])
    return df

# --- 実行 ---
df_catalog = discover_all_capabilities(chronos)

# 機能カテゴリを推測して整理表示
print("\n=== 📦 Chronos-2 Full Capability Catalog ===")

# 1. Pipeline / Model 関連（中核機能）
print("\n🔹 Core Models & Pipelines:")
display(df_catalog[df_catalog["Name"].str.contains("Model|Pipeline|Chronos", case=False, regex=True)].sort_values("Name"))

# 2. Config / Settings 関連（設定項目から隠し機能が見えることが多い）
print("\n🔹 Configuration & Settings:")
display(df_catalog[df_catalog["Name"].str.contains("Config", case=False)].sort_values("Name"))

# 3. Utilities / Data 関連（データ処理、評価、変換など）
print("\n🔹 Utilities & Data Processing:")
# 上記以外を抽出
others = df_catalog[
    ~df_catalog["Name"].str.contains("Model|Pipeline|Chronos|Config", case=False, regex=True)
].sort_values("Name")
display(others)

# 4. "Bolt" (高速化モデル) 関連の確認
print("\n⚡ Check for Chronos-Bolt (Fast Inference):")
bolt_features = df_catalog[df_catalog["Name"].str.contains("Bolt", case=False)]
if not bolt_features.empty:
    display(bolt_features)
else:
    print("No explicit 'Bolt' components found in the API structure.")

🚀 Exploring library: chronos

=== 📦 Chronos-2 Full Capability Catalog ===

🔹 Core Models & Pipelines:


Unnamed: 0,Module,Name,Type,Signature,Description
0,chronos.base,BaseChronosPipeline,Class,(inner_model: 'PreTrainedModel'),(No description available)
15,chronos.chronos2,Chronos2CoreConfig,Class,"(d_model: int = 512, d_kv: int = 64, d_ff: int = 2048, num_layers: int = 6, num_heads: int = 8, ...","HF transformers-style pretrained model config for Chronos-2.0, based on T5Config."
16,chronos.chronos2,Chronos2Dataset,Class,"(inputs: Sequence[Mapping[str, Union[torch.Tensor, numpy.ndarray, Mapping[str, torch.Tensor | nu...",A dataset wrapper for Chronos-2 models.
42,chronos.chronos2.model,Chronos2Encoder,Class,(config: chronos.chronos2.config.Chronos2CoreConfig),Base class for all neural network modules.
43,chronos.chronos2.model,Chronos2EncoderBlock,Class,(config: chronos.chronos2.config.Chronos2CoreConfig),Base class for all neural network modules.
44,chronos.chronos2.model,Chronos2EncoderBlockOutput,Class,"(hidden_states: torch.Tensor | None = None, time_self_attn_weights: torch.Tensor | None = None, ...","Chronos2EncoderBlockOutput(hidden_states: torch.Tensor | None = None, time_self_attn_weights: to..."
45,chronos.chronos2.model,Chronos2EncoderOutput,Class,"(last_hidden_state: torch.Tensor | None = None, all_time_self_attn_weights: tuple[torch.Tensor, ...","Chronos2EncoderOutput(last_hidden_state: torch.Tensor | None = None, all_time_self_attn_weights:..."
17,chronos.chronos2,Chronos2ForecastingConfig,Class,"(context_length: int, output_patch_size: int, input_patch_size: int, input_patch_stride: int, qu...","Chronos2ForecastingConfig(context_length: int, output_patch_size: int, input_patch_size: int, in..."
31,chronos.chronos2.layers,Chronos2LayerNorm,Class,"(hidden_size: int, eps: float = 1e-06)",Base class for all neural network modules.
18,chronos.chronos2,Chronos2Model,Class,(config: chronos.chronos2.config.Chronos2CoreConfig),Base class for all models.



🔹 Configuration & Settings:


Unnamed: 0,Module,Name,Type,Signature,Description
15,chronos.chronos2,Chronos2CoreConfig,Class,"(d_model: int = 512, d_kv: int = 64, d_ff: int = 2048, num_layers: int = 6, num_heads: int = 8, ...","HF transformers-style pretrained model config for Chronos-2.0, based on T5Config."
17,chronos.chronos2,Chronos2ForecastingConfig,Class,"(context_length: int, output_patch_size: int, input_patch_size: int, input_patch_stride: int, qu...","Chronos2ForecastingConfig(context_length: int, output_patch_size: int, input_patch_size: int, in..."
71,chronos.chronos_bolt,ChronosBoltConfig,Class,"(context_length: int, prediction_length: int, input_patch_size: int, input_patch_stride: int, qu...","ChronosBoltConfig(context_length: int, prediction_length: int, input_patch_size: int, input_patc..."
8,chronos.chronos,ChronosConfig,Class,"(tokenizer_class: str, tokenizer_kwargs: Dict[str, Any], context_length: int, prediction_length:...",This class holds all the configuration parameters to be used



🔹 Utilities & Data Processing:


Unnamed: 0,Module,Name,Type,Signature,Description
29,chronos.chronos2.layers,AttentionOutput,Class,"(hidden_states: torch.Tensor | None = None, attn_weights: torch.Tensor | None = None) -> None","AttentionOutput(hidden_states: torch.Tensor | None = None, attn_weights: torch.Tensor | None = N..."
23,chronos.chronos2.dataset,DatasetMode,Class,"(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)",str(object='') -> str
68,chronos.chronos2.trainer,EvaluateAndSaveFinalStepCallback,Class,(),Callback to evaluate and save the model at last training step.
32,chronos.chronos2.layers,FeedForward,Class,(config: chronos.chronos2.config.Chronos2CoreConfig),Base class for all neural network modules.
1,chronos.base,ForecastType,Class,"(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)",Create a collection of name/value pairs.
33,chronos.chronos2.layers,GroupSelfAttention,Class,(config: chronos.chronos2.config.Chronos2CoreConfig),Self-attention applied along the batch axis masked by the group attention mask
52,chronos.chronos2.model,InstanceNorm,Class,"(eps: float = 1e-05, use_arcsinh: bool = False) -> None",Apply standardization along the last dimension and optionally apply arcsinh after standardization.
34,chronos.chronos2.layers,MHA,Class,"(config: chronos.chronos2.config.Chronos2CoreConfig, use_rope: bool = True)",Multi-head Attention Layer
35,chronos.chronos2.layers,MLP,Class,(config: chronos.chronos2.config.Chronos2CoreConfig),Base class for all neural network modules.
13,chronos.chronos,MeanScaleUniformBins,Class,"(low_limit: float, high_limit: float, config: chronos.chronos.ChronosConfig) -> None",A ``ChronosTokenizer`` defines how time series are mapped into token IDs



⚡ Check for Chronos-Bolt (Fast Inference):


Unnamed: 0,Module,Name,Type,Signature,Description
71,chronos.chronos_bolt,ChronosBoltConfig,Class,"(context_length: int, prediction_length: int, input_patch_size: int, input_patch_stride: int, qu...","ChronosBoltConfig(context_length: int, prediction_length: int, input_patch_size: int, input_patc..."
72,chronos.chronos_bolt,ChronosBoltModelForForecasting,Class,(config: transformers.models.t5.configuration_t5.T5Config),This model inherits from [`PreTrainedModel`]. Check the superclass documentation for the generic...
73,chronos.chronos_bolt,ChronosBoltOutput,Class,"(loss: Optional[torch.Tensor] = None, quantile_preds: Optional[torch.Tensor] = None, attentions:...","ChronosBoltOutput(loss: Optional[torch.Tensor] = None, quantile_preds: Optional[torch.Tensor] = ..."
74,chronos.chronos_bolt,ChronosBoltPipeline,Class,(model: chronos.chronos_bolt.ChronosBoltModelForForecasting),(No description available)


In [14]:
import pkgutil
import importlib
import inspect
import pandas as pd
import chronos
import os

def discover_all_capabilities(library_module):
    """
    ライブラリ内の全ての公開クラス・関数を探索し、
    その「機能（Docstring）」を一覧化して隠れた機能を発掘する。
    """
    catalog = []
    
    # パッケージのパスを取得
    # 注意: インストール環境によっては __path__ がリストの場合があるため対応
    package_path = library_module.__path__
    prefix = library_module.__name__ + "."

    print(f"🚀 Exploring library: {library_module.__name__}...")

    # 再帰的にモジュールを探索
    for _, name, ispkg in pkgutil.walk_packages(package_path, prefix):
        try:
            module = importlib.import_module(name)
            
            # モジュール内のメンバーを検査
            for member_name, obj in inspect.getmembers(module):
                # プライベートメンバと他所からのインポートを除外
                if member_name.startswith("_"): continue
                
                # クラスまたは関数のみ対象
                if inspect.isclass(obj) or inspect.isfunction(obj):
                    # そのライブラリ等で定義されたものに限定（外部ライブラリの除外）
                    if hasattr(obj, '__module__') and obj.__module__ and obj.__module__.startswith(library_module.__name__):
                        
                        # Docstringの取得と整形
                        doc = inspect.getdoc(obj)
                        summary = doc.split('\n')[0] if doc else "(No description available)"
                        
                        # シグネチャ（引数定義）の取得
                        try:
                            sig = str(inspect.signature(obj))
                        except (ValueError, TypeError):
                            sig = "(...)"

                        catalog.append({
                            "Category": "Class" if inspect.isclass(obj) else "Function",
                            "Name": member_name,
                            "Module": name,
                            "Signature": sig,  # 引数構成（機能の推測に有用）
                            "Description": summary # 機能概要
                        })
        except ImportError:
            continue
        except Exception as e:
            # 読み込みエラーは無視
            pass

    df = pd.DataFrame(catalog)
    if not df.empty:
        # 重複を除去（異なるモジュールでimportされている場合があるため）
        df = df.drop_duplicates(subset=["Name", "Category"])
        # 見やすいようにソート（モジュール順 -> 名前順）
        df = df.sort_values(by=["Module", "Name"])
    
    return df

# --- 実行 & CSV保存 ---

# 1. 探索実行
df_catalog = discover_all_capabilities(chronos)

if not df_catalog.empty:
    # 2. CSVファイル名
    output_filename = "chronos_capabilities.csv"
    
    # 3. CSV出力 (utf-8-sig はExcelでの文字化けを防ぎます)
    df_catalog.to_csv(output_filename, index=False, encoding='utf-8-sig')
    
    print(f"\n✅ CSV Output Generated: {output_filename}")
    print(f"📊 Total items found: {len(df_catalog)}")
    print(f"📂 Location: {os.path.abspath(output_filename)}")
    
    # (オプション) 上位5行を表示して確認
    print("\n--- Preview (Top 5) ---")
    display(df_catalog.head())
else:
    print("❌ No capabilities found. Please check if 'chronos' is installed correctly.")

🚀 Exploring library: chronos...

✅ CSV Output Generated: chronos_capabilities.csv
📊 Total items found: 51
📂 Location: c:\moinfo\chronos_capabilities.csv

--- Preview (Top 5) ---


Unnamed: 0,Category,Name,Module,Signature,Description
0,Class,BaseChronosPipeline,chronos.base,(inner_model: 'PreTrainedModel'),(No description available)
1,Class,ForecastType,chronos.base,"(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)",Create a collection of name/value pairs.
2,Class,PipelineRegistry,chronos.base,"(name, bases, attrs)",type(object) -> the object's type
3,Function,left_pad_and_stack_1D,chronos.base,(tensors: List[torch.Tensor]) -> torch.Tensor,(No description available)
4,Function,cache_model_from_s3,chronos.boto_utils,"(s3_uri: str, force_download: bool = False, boto3_session: boto3.session.Session | None = None)",(No description available)


# 📚 Chronos-2 機能カタログの階層化と可視化

取得した `df_catalog` (機能一覧) を分析し、ライブラリの構造を「カスケード（階層）構造」で整理します。
これにより、「どの機能がどのコンポーネントに属しているか」を体系的に把握します。

### 分類階層の定義
* **Level 1 (Component)**: 製品ライン（Chronos-2 / Bolt / Base / Utils）
* **Level 2 (Role)**: 役割（Pipeline / Model / Config / Data / Layers）
* **Level 3 (Detail)**: 具体的な機能詳細（推論、学習、Attention機構、入出力など）

In [19]:
import pandas as pd
import os

# --- 1. データの準備 ---
# 前のステップで作成したCSVがあれば読み込み、なければメモリ上のdf_catalogを使用
csv_path = "chronos_capabilities.csv"
if os.path.exists(csv_path):
    df_catalog = pd.read_csv(csv_path)
    print(f"✅ Loaded catalog from {csv_path}")
else:
    # df_catalogがメモリにない場合のフォールバック（通常は前のセルから引き継ぎます）
    print("⚠️ 'df_catalog' variable or CSV file not found. Please run the discovery step first.")

# --- 2. カスケード分類ロジックの実装 ---
def categorize_cascade(row):
    name = str(row['Name'])
    module = str(row['Module'])
    desc = str(row['Description']).lower()
    
    # --- Level 1: Major Component (製品ライン・大分類) ---
    if 'Bolt' in name:
        l1 = "⚡ Chronos-Bolt (Speed)"
    elif 'Chronos2' in name or 'chronos2' in module:
        l1 = "🚀 Chronos-2 (Core)"
    elif 'boto' in module:
        l1 = "☁️ AWS Integration"
    elif 'base' in module:
        l1 = "🏗️ Base Infrastructure"
    elif 'df_utils' in module or 'dataset' in module:
        l1 = "📊 Data Engineering"
    elif any(x in name for x in ['Attention', 'Layer', 'Norm', 'Embedding', 'ResNet']):
        l1 = "🧠 Model Internals"
    else:
        l1 = "🛠️ Utilities & Common"

    # --- Level 2: Role (役割・中分類) ---
    if 'Pipeline' in name:
        l2 = "1. Inference Pipeline"
    elif 'Config' in name:
        l2 = "0. Configuration"
    elif 'Model' in name or 'Encoder' in name:
        l2 = "2. Model Architecture"
    elif 'Dataset' in name or 'convert' in name or 'validate' in name:
        l2 = "3. Data Processing"
    elif 'Output' in name:
        l2 = "4. Output Structures"
    elif 'Attention' in name:
        l2 = "Attention Mechanism"
    elif 'download' in name or 'cache' in name:
        l2 = "S3 / IO"
    else:
        l2 = "Misc / Components"

    # --- Level 3: Detail (詳細・小分類) ---
    if 'Multivariate' in desc or 'group' in name.lower():
        l3 = "Multivariate (Group Interaction)"
    elif 'Quantile' in desc or 'quantiles' in name.lower() or 'probabilistic' in desc:
        l3 = "Probabilistic / Uncertainty"
    elif 'Covariate' in desc or 'cross' in name.lower():
        l3 = "Covariate Support"
    elif 'Pipeline' in name:
        l3 = "End-to-End Workflow"
    elif 'Config' in name:
        l3 = "Hyperparameters"
    else:
        # シグネチャや名前から推測
        l3 = "Component Implementation"

    return pd.Series([l1, l2, l3], index=['L1_Component', 'L2_Role', 'L3_Detail'])

# --- 3. 適用と整形 ---
# 分類を適用
cascade_df = df_catalog.apply(categorize_cascade, axis=1)
df_tree = pd.concat([df_catalog, cascade_df], axis=1)

# 見やすい順序にソート
df_tree = df_tree.sort_values(by=['L1_Component', 'L2_Role', 'Name'])

# 必要な列だけに絞り込み
display_cols = ['L1_Component', 'L2_Role', 'L3_Detail', 'Name', 'Description']
df_final = df_tree[display_cols]

# --- 4. 視覚化 (Tree View) ---
def print_tree_view(df):
    """テキスト形式でツリー構造を表示する"""
    print("\n🌳 Chronos-2 Library Structure Tree")
    print("="*60)
    
    current_l1 = ""
    current_l2 = ""
    
    for _, row in df.iterrows():
        # L1 (Component) Change
        if row['L1_Component'] != current_l1:
            print(f"\n📦 {row['L1_Component']}")
            current_l1 = row['L1_Component']
            current_l2 = "" # Reset L2
            
        # L2 (Role) Change
        if row['L2_Role'] != current_l2:
            print(f"  ├─ 📂 {row['L2_Role']}")
            current_l2 = row['L2_Role']
            
        # Leaf (Item)
        # 詳細説明が長い場合は切り詰め
        desc_short = (row['Description'][:60] + '...') if len(str(row['Description'])) > 60 else row['Description']
        print(f"  │    ├─ {row['Name']:<35} : {desc_short}")

# ツリー表示の実行
print_tree_view(df_final)

# --- 5. Pandas Stylerによるテーブル表示 (Notebook用) ---
# インタラクティブなテーブルとして表示
print("\n📊 Detailed Interactive Table")
(df_final.style
    .set_properties(**{'text-align': 'left'})
    .set_table_styles([
        {'selector': 'th', 'props': [('text-align', 'left'), ('background-color', '#f0f0f0')]}
    ])
    .hide(axis='index')
)

✅ Loaded catalog from chronos_capabilities.csv

🌳 Chronos-2 Library Structure Tree

📦 ☁️ AWS Integration
  ├─ 📂 S3 / IO
  │    ├─ cache_model_from_s3                 : (No description available)
  │    ├─ download_model_files_from_cloudfront : (No description available)
  │    ├─ download_model_files_from_s3        : (No description available)

📦 ⚡ Chronos-Bolt (Speed)
  ├─ 📂 0. Configuration
  │    ├─ ChronosBoltConfig                   : ChronosBoltConfig(context_length: int, prediction_length: in...
  ├─ 📂 1. Inference Pipeline
  │    ├─ ChronosBoltPipeline                 : (No description available)
  ├─ 📂 2. Model Architecture
  │    ├─ ChronosBoltModelForForecasting      : This model inherits from [`PreTrainedModel`]. Check the supe...
  ├─ 📂 4. Output Structures
  │    ├─ ChronosBoltOutput                   : ChronosBoltOutput(loss: Optional[torch.Tensor] = None, quant...

📦 🏗️ Base Infrastructure
  ├─ 📂 1. Inference Pipeline
  │    ├─ BaseChronosPipeline                 : (No 

L1_Component,L2_Role,L3_Detail,Name,Description
☁️ AWS Integration,S3 / IO,Component Implementation,cache_model_from_s3,(No description available)
☁️ AWS Integration,S3 / IO,Component Implementation,download_model_files_from_cloudfront,(No description available)
☁️ AWS Integration,S3 / IO,Component Implementation,download_model_files_from_s3,(No description available)
⚡ Chronos-Bolt (Speed),0. Configuration,Hyperparameters,ChronosBoltConfig,"ChronosBoltConfig(context_length: int, prediction_length: int, input_patch_size: int, input_patch_stride: int, quantiles: List[float], use_reg_token: bool = False)"
⚡ Chronos-Bolt (Speed),1. Inference Pipeline,End-to-End Workflow,ChronosBoltPipeline,(No description available)
⚡ Chronos-Bolt (Speed),2. Model Architecture,Component Implementation,ChronosBoltModelForForecasting,This model inherits from [`PreTrainedModel`]. Check the superclass documentation for the generic methods the
⚡ Chronos-Bolt (Speed),4. Output Structures,Component Implementation,ChronosBoltOutput,"ChronosBoltOutput(loss: Optional[torch.Tensor] = None, quantile_preds: Optional[torch.Tensor] = None, attentions: Optional[torch.Tensor] = None, cross_attentions: Optional[torch.Tensor] = None)"
🏗️ Base Infrastructure,1. Inference Pipeline,End-to-End Workflow,BaseChronosPipeline,(No description available)
🏗️ Base Infrastructure,1. Inference Pipeline,End-to-End Workflow,PipelineRegistry,type(object) -> the object's type
🏗️ Base Infrastructure,Misc / Components,Component Implementation,ForecastType,Create a collection of name/value pairs.


# 📅 Chronos-2 ライフサイクル（イベント）別機能マップ

ライブラリの全機能を、機械学習プロジェクトの**実行プロセス（イベント）**ごとに再分類します。
これにより、「今、データ準備フェーズだからこの関数を使う」といった実践的なナビゲーションが可能になります。

### 🏷️ イベント分類の定義
1.  **🏁 Setup & Config (設定・準備)**:
    * ハイパーパラメータ設定、環境構築、S3からのダウンロードなど。
2.  **🧹 Data Preparation (データ前処理)**:
    * DataFrameの検証、テンソル変換、パディング、データセット作成。
3.  **🧠 Model Loading (モデル読込・構築)**:
    * モデルアーキテクチャの定義、事前学習済みモデルのロード。
4.  **🏋️ Training & Tuning (学習・ファインチューニング)**:
    * Trainer、損失計算、学習ループ関連（追加学習を行う場合）。
5.  **🚀 Inference & Forecast (推論・予測)**:
    * パイプライン実行、未来予測、分位点（Quantile）計算。
6.  **🧩 Internal Processing (内部計算メカニズム)**:
    * Attention機構、正規化、エンコーダーなど、推論の裏側で動く部品。

In [20]:
import pandas as pd
import os

# --- 1. データ読み込み ---
csv_path = "chronos_capabilities.csv"
if os.path.exists(csv_path):
    df_catalog = pd.read_csv(csv_path)
else:
    # CSVがない場合は前のステップの変数を利用（エラーハンドリング）
    print("⚠️ CSV file not found. Using 'df_catalog' from memory if available.")

# --- 2. イベントベース分類ロジック ---
def classify_by_event(row):
    name = str(row['Name'])
    module = str(row['Module'])
    desc = str(row['Description']).lower()
    
    # Prioritize specific keywords to map to lifecycle events
    
    # 1. 🏁 Setup & Config (設定・環境・ダウンロード)
    if 'Config' in name or 'Registry' in name:
        return "1. 🏁 Setup & Config"
    if 'download' in name or 'cache' in name or 'boto' in module:
        return "1. 🏁 Setup & Config (Cloud/IO)"

    # 2. 🧹 Data Preparation (データ準備・前処理)
    if 'Dataset' in name or 'dataset' in module:
        return "2. 🧹 Data Preparation"
    if 'convert' in name or 'validate' in name or 'pad' in name:
        return "2. 🧹 Data Preparation (Utils)"
    if 'Tokenizer' in name or 'Bin' in name:
        return "2. 🧹 Data Preparation (Tokenization)"

    # 3. 🧠 Model Loading (モデル定義・構築)
    if ('Model' in name or 'Chronos' in name) and 'Pipeline' not in name and 'Config' not in name and 'Trainer' not in name:
         # Model class definition itself
         if 'Architecture' in desc or 'PreTrainedModel' in desc or 'Base class' in desc:
             return "3. 🧠 Model Definition"

    # 4. 🏋️ Training & Tuning (学習)
    if 'Trainer' in name or 'Loss' in name or 'Callback' in name:
        return "4. 🏋️ Training & Tuning"
    
    # 5. 🚀 Inference & Forecast (推論・予測)
    if 'Pipeline' in name:
        return "5. 🚀 Inference (High-Level)"
    if 'Forecast' in name or 'predict' in name or 'quantiles' in name or 'Output' in name:
        return "5. 🚀 Inference (Result)"

    # 6. 🧩 Internal Processing (内部計算・レイヤー)
    if any(x in name for x in ['Attention', 'Layer', 'Norm', 'Embedding', 'Block', 'Encoder', 'Decoder', 'MHA', 'MLP', 'RoPE']):
        return "6. 🧩 Internal Layers (Low-Level)"

    return "7. 🛠️ Misc Utilities"

# --- 3. 適用と整形 ---
df_event = df_catalog.copy()
df_event['Event_Phase'] = df_event.apply(classify_by_event, axis=1)

# Sort strictly by Event Phase then Name
df_event = df_event.sort_values(by=['Event_Phase', 'Name'])

# --- 4. 可視化 (Event Map) ---
def print_event_map(df):
    print("\n📅 Chronos-2 Workflow Event Map")
    print("="*70)
    
    current_phase = ""
    
    for _, row in df.iterrows():
        if row['Event_Phase'] != current_phase:
            print(f"\n{row['Event_Phase']}")
            print("-" * 50)
            current_phase = row['Event_Phase']
            
        # 機能名と短い説明を表示
        desc = row['Description'] if pd.notna(row['Description']) else ""
        desc = (desc[:70] + '...') if len(desc) > 70 else desc
        print(f"  🔹 {row['Name']:<40} : {desc}")

print_event_map(df_event)

# --- 5. 保存 (オプション) ---
# df_event.to_csv("chronos_event_map.csv", index=False, encoding='utf-8-sig')


📅 Chronos-2 Workflow Event Map

1. 🏁 Setup & Config
--------------------------------------------------
  🔹 Chronos2CoreConfig                       : HF transformers-style pretrained model config for Chronos-2.0, based o...
  🔹 Chronos2ForecastingConfig                : Chronos2ForecastingConfig(context_length: int, output_patch_size: int,...
  🔹 ChronosBoltConfig                        : ChronosBoltConfig(context_length: int, prediction_length: int, input_p...
  🔹 ChronosConfig                            : This class holds all the configuration parameters to be used
  🔹 PipelineRegistry                         : type(object) -> the object's type

1. 🏁 Setup & Config (Cloud/IO)
--------------------------------------------------
  🔹 cache_model_from_s3                      : (No description available)
  🔹 download_model_files_from_cloudfront     : (No description available)
  🔹 download_model_files_from_s3             : (No description available)

2. 🧹 Data Preparation
----------------

In [21]:
import os
from pathlib import Path

def create_project_structure(base_path: str):
    """
    Chronos-2開発環境用のディレクトリ構造を、既存のTimesFM環境と整合性を保ちつつ構築します。
    
    Args:
        base_path (str): プロジェクトのルートパス (例: C:/moinfo)
    """
    root = Path(base_path)
    
    # 1. 作成するディレクトリの定義 (階層構造)
    directories = [
        # モデル格納用 (Root直下)
        "chronos_hf_local",
        "chronos_bolt_local",
        
        # ライブラリ開発領域
        "libs/chronos/00_raw",
        "libs/chronos/01_sandbox",
        "libs/chronos/02_src/moinfo_chronos_ext/adapter",
        "libs/chronos/02_src/moinfo_chronos_ext/pipeline",
        "libs/chronos/02_src/moinfo_chronos_ext/utils",
        "libs/chronos/03_scripts",
        "libs/chronos/04_outputs/figs",
        "libs/chronos/04_outputs/logs",
        "libs/chronos/04_outputs/tables",
        "libs/chronos/04_outputs/api",
        "libs/chronos/05_reports",
        "libs/chronos/06_tests",
        "libs/chronos/07_configs",
        "libs/chronos/08_logs",
        "libs/chronos/09_docs",
    ]

    # 2. 空ファイル作成定義 (Pythonパッケージ化など)
    files_to_create = {
        # Keepファイル (Git管理用)
        "chronos_hf_local/.keep": "",
        "chronos_bolt_local/.keep": "",
        
        # Python Package Init
        "libs/chronos/02_src/moinfo_chronos_ext/__init__.py": 
            '"""moinfo_chronos_ext package initialized."""\n__version__ = "0.1.0"',
        "libs/chronos/02_src/moinfo_chronos_ext/adapter/__init__.py": "",
        "libs/chronos/02_src/moinfo_chronos_ext/pipeline/__init__.py": "",
        "libs/chronos/02_src/moinfo_chronos_ext/utils/__init__.py": "",
        
        # README templates
        "libs/chronos/README.md": 
            "# Chronos-2 Forecasting Workspace\n\nAmazon Chronos-2 & Bolt models integration workspace.\n",
        "libs/chronos/07_configs/default_config.yaml": 
            "# Default configuration for Chronos Pipeline\nmodel_id: 'amazon/chronos-t5-small'\ndevice: 'cpu'\n"
    }

    print(f"🚀 Starting environment setup at: {root.absolute()}")
    print("-" * 50)

    # 3. ディレクトリ作成実行
    for dir_path in directories:
        full_path = root / dir_path
        try:
            full_path.mkdir(parents=True, exist_ok=True)
            print(f"✅ Created Directory: {dir_path}")
        except Exception as e:
            print(f"❌ Failed to create {dir_path}: {e}")

    # 4. ファイル作成実行
    for file_rel_path, content in files_to_create.items():
        full_path = root / file_rel_path
        try:
            if not full_path.exists():
                full_path.write_text(content, encoding='utf-8')
                print(f"📄 Created File:      {file_rel_path}")
            else:
                print(f"⏩ Skipped (Exists):  {file_rel_path}")
        except Exception as e:
            print(f"❌ Failed to create file {file_rel_path}: {e}")

    print("-" * 50)
    print("🎉 Setup completed successfully.")
    print("   Next Step: Run 'pip install chronos-forecasting' if not installed.")

if __name__ == "__main__":
    # カレントディレクトリをルートとして実行
    create_project_structure(".")

🚀 Starting environment setup at: c:\moinfo
--------------------------------------------------
✅ Created Directory: chronos_hf_local
✅ Created Directory: chronos_bolt_local
✅ Created Directory: libs/chronos/00_raw
✅ Created Directory: libs/chronos/01_sandbox
✅ Created Directory: libs/chronos/02_src/moinfo_chronos_ext/adapter
✅ Created Directory: libs/chronos/02_src/moinfo_chronos_ext/pipeline
✅ Created Directory: libs/chronos/02_src/moinfo_chronos_ext/utils
✅ Created Directory: libs/chronos/03_scripts
✅ Created Directory: libs/chronos/04_outputs/figs
✅ Created Directory: libs/chronos/04_outputs/logs
✅ Created Directory: libs/chronos/04_outputs/tables
✅ Created Directory: libs/chronos/04_outputs/api
✅ Created Directory: libs/chronos/05_reports
✅ Created Directory: libs/chronos/06_tests
✅ Created Directory: libs/chronos/07_configs
✅ Created Directory: libs/chronos/08_logs
✅ Created Directory: libs/chronos/09_docs
📄 Created File:      chronos_hf_local/.keep
📄 Created File:      chronos_bol



## 機能一覧（公式ドキュメント由来で再分類）

| 分類     | 機能（何ができる？）                              | 具体の使い方/設定（見えている公式API）                                                                                  | 根拠                              |
| ------ | --------------------------------------- | ------------------------------------------------------------------------------------------------------ | ------------------------------- |
| 予測タスク  | 単変量（univariate：1系列）予測                   | 同一APIで実行                                                                                               | ([Hugging Face][1])             |
| 予測タスク  | 多変量（multivariate：複数系列を同時）予測             | 同一アーキテクチャ内で対応                                                                                          | ([Hugging Face][1])             |
| 予測タスク  | 共変量付き（covariate-informed）予測             | **過去のみ共変量**＋**既知の未来共変量**＋**カテゴリ共変量**に対応                                                                | ([Amazon Science][2])           |
| 予測タスク  | アイテム間クロス学習（cross-learning across items） | 複数系列から学習を共有（コールドスタートに効くと説明）                                                                            | ([Hugging Face][1])             |
| 推論特性   | ゼロショット（zero-shot：追加学習なしで予測）             | 事前学習モデルをそのまま `predict`/`predict_df`                                                                    | ([Hugging Face][1])             |
| 出力     | 確率予測：分位点（quantile）をマルチステップで出す           | `quantile_levels=[0.1,0.5,0.9]` のように指定                                                                 | ([Hugging Face][1])             |
| スケール   | 最大コンテキスト長 8192                          | 仕様表に明記                                                                                                 | ([Hugging Face][1])             |
| スケール   | 最大予測長 1024                              | 仕様表に明記                                                                                                 | ([Hugging Face][1])             |
| 性能/効率  | GPU/CPU 推論、A10Gで「毎秒300系列超」主張            | モデルカードに性能記載（※再現条件は別途確認推奨）                                                                              | ([Hugging Face][1])             |
| ローカル利用 | Pythonパッケージ提供                           | `pip install "chronos-forecasting>=2.0"`                                                               | ([Hugging Face][1])             |
| 入力I/F  | pandas API（DataFrame入力）                 | `Chronos2Pipeline.from_pretrained(...)` → `predict_df(...)`                                            | ([Hugging Face][1])             |
| 入力I/F  | 時系列識別子・時刻列・ターゲット列を指定                    | `id_column`, `timestamp_column`, `target` を渡す                                                          | ([Hugging Face][1])             |
| 入力I/F  | 未来共変量（known future covariates）を別DFで渡す   | `future_df=...`（ターゲット列を落としたDF例あり）                                                                      | ([Hugging Face][1])             |
| 運用     | 本番は JumpStart 推奨（少ないコードでエンドポイント）        | READMEで「本番はJumpStart」推奨                                                                                | ([GitHub][3])                   |
| デプロイ   | SageMaker へデプロイ（例：JumpStartModel）       | `JumpStartModel(model_id="pytorch-forecasting-chronos-2", instance_type="ml.g5.2xlarge")` → `deploy()` | ([Hugging Face][1])             |
| デプロイ形態 | リアルタイム（GPU/CPU）/サーバレス/バッチ変換             | “新ガイドが3方式をカバー” と明記                                                                                     | ([GitHub][3])                   |
| エコシステム | AWS公式ドキュメント側でも Chronos ノートブック導線         | 「JumpStartの例ノートブック」案内あり                                                                                | ([Amazon Web Services Docs][4]) |
| 研究的裏付け | グループ注意（group attention）で系列/共変量間の情報共有    | 技術レポート要旨で明記                                                                                            | ([arXiv][5])                    |

---

## どういう「機能の塊」だと理解すると速いか（初心者向け）

* **入力**：
  過去のターゲット（target：予測したい値）に加えて、共変量（covariate：販促、天気、祝日など）を **過去分**・**未来既知分**として与えられる。 ([Amazon Science][2])
* **中身**：
  ICL（in-context learning：文脈内学習）で「今見えている過去のパターン」から、その場で予測を組み立てる。多変量や共変量の“関係性”は **group attention（グループ注意）**で取り込む、という立て付け。 ([Amazon Science][2])
* **出力**：
  点予測だけでなく、分位点（quantile）で不確実性つきの予測を返す（例：0.1/0.5/0.9）。 ([Hugging Face][1])
* **運用**：
  実験はローカル、本番は JumpStart → SageMaker エンドポイント化、という導線が公式に用意されている。 ([GitHub][3])

---


## Model Downloader

In [24]:
import inspect
import torch
from chronos import Chronos2Pipeline, ChronosBoltPipeline

# --- 🛠️ 引数確認用ヘルパー関数 ---
def show_args(func):
    """関数の引数（シグネチャ）を見やすく表示する"""
    try:
        sig = inspect.signature(func)
        print(f"\n📋 Arguments for `{func.__name__}`:")
        print("-" * 70)
        for name, param in sig.parameters.items():
            # デフォルト値の取得
            if param.default == inspect.Parameter.empty:
                default = "(required)"
            else:
                default = f"= {param.default}"
            
            # 型ヒントの取得
            if param.annotation != inspect.Parameter.empty:
                try:
                    # クラス名などが取れる場合
                    ann = f": {param.annotation.__name__}"
                except AttributeError:
                    # Typingオブジェクトなどの場合
                    ann = f": {str(param.annotation).replace('typing.', '')}"
            else:
                ann = ""

            print(f"  • {name:<20} {ann} {default}")
        print("-" * 70)
    except Exception as e:
        print(f"Could not inspect signature: {e}")

# ==========================================
# 1. Chronos-2 (Base) のダウンロード
# ==========================================
print("\n⬇️ Downloading Chronos-2 (Base)...")

# from_pretrained の引数を確認
show_args(Chronos2Pipeline.from_pretrained)

pipeline_c2 = Chronos2Pipeline.from_pretrained(
    "amazon/chronos-2", 
    device_map="cpu", 
    torch_dtype=torch.float32
)

# 【修正点】tokenizerの保存は不要（削除）
pipeline_c2.model.save_pretrained("chronos_hf_local")
print("✅ Saved Chronos-2 model config & weights to: chronos_hf_local/")


# ==========================================
# 2. Chronos-Bolt (Small) のダウンロード
# ==========================================
print("\n⬇️ Downloading Chronos-Bolt (Small)...")

# from_pretrained の引数を確認
show_args(ChronosBoltPipeline.from_pretrained)

pipeline_bolt = ChronosBoltPipeline.from_pretrained(
    "amazon/chronos-bolt-small",
    device_map="cpu",
    torch_dtype=torch.float32
)

# 【修正点】こちらもtokenizerの保存は不要（削除）
pipeline_bolt.model.save_pretrained("chronos_bolt_local")
print("✅ Saved Chronos-Bolt model config & weights to: chronos_bolt_local/")


# ==========================================
# 3. 予測メソッド (predict) の引数を確認
# ==========================================
# 次のステップで使う重要なメソッドです
print("\n🔍 Checking Arguments for Chronos-2 Prediction:")
show_args(pipeline_c2.predict)

print("\n🔍 Checking Arguments for Bolt Prediction:")
show_args(pipeline_bolt.predict)


⬇️ Downloading Chronos-2 (Base)...

📋 Arguments for `from_pretrained`:
----------------------------------------------------------------------
  • pretrained_model_name_or_path  (required)
  • args                  (required)
  • kwargs                (required)
----------------------------------------------------------------------
✅ Saved Chronos-2 model config & weights to: chronos_hf_local/

⬇️ Downloading Chronos-Bolt (Small)...

📋 Arguments for `from_pretrained`:
----------------------------------------------------------------------
  • pretrained_model_name_or_path  (required)
  • args                  (required)
  • kwargs                (required)
----------------------------------------------------------------------


config.json: 0.00B [00:00, ?B/s]

model.safetensors:   0%|          | 0.00/191M [00:00<?, ?B/s]

✅ Saved Chronos-Bolt model config & weights to: chronos_bolt_local/

🔍 Checking Arguments for Chronos-2 Prediction:

📋 Arguments for `predict`:
----------------------------------------------------------------------
  • inputs               : Union (required)
  • prediction_length    : int | None = None
  • batch_size           : int = 256
  • context_length       : int | None = None
  • cross_learning       : bool = False
  • limit_prediction_length : bool = False
  • kwargs                (required)
----------------------------------------------------------------------

🔍 Checking Arguments for Bolt Prediction:

📋 Arguments for `predict`:
----------------------------------------------------------------------
  • inputs               : Union (required)
  • prediction_length    : Optional = None
  • limit_prediction_length : bool = False
----------------------------------------------------------------------
