# PostgreSQL データベース検査

以下のスクリプトを実行して、データベース（`dataset`）の格納状況を確認します。

**主な機能:**
1. **テーブル一覧取得**: 指定スキーマ（`public`）内の全テーブルをリストアップ
2. **件数カウント**: 各テーブルの総レコード数を計算
3. **データプレビュー**: 最新データを10件取得し、以下のフォーマットで表示
   - 数値のカンマ区切り
   - 値の大きさに応じた背景色グラデーション（ヒートマップ）

**使用ライブラリ:** `pandas`, `sqlalchemy`, `IPython.display`

In [1]:
import sys
import os
import importlib

# 1. 現在のディレクトリ (tslib) を取得
current_dir = os.getcwd()

# 2. 'db' パッケージが入っている親ディレクトリ (ds/src) のパスを作成
#    実体: /mnt/e/env/ts/tslib/ds/src/db  -> 追加すべきパス: /mnt/e/env/ts/tslib/ds/src
target_path = os.path.join(current_dir, 'ds', 'src')

# 3. パスが通っていなければ追加
if target_path not in sys.path:
    sys.path.append(target_path)

# 確認用出力
print(f"Added path: {target_path}")
if os.path.exists(target_path):
    print(f"Files in target path: {os.listdir(target_path)}") # ここに 'db' が表示されるはず
else:
    print("⚠️ 警告: 指定したパスが存在しません。ディレクトリ構成を確認してください。")

# 4. パスが通った状態でインポート
import db.view.db_app
importlib.reload(db.view.db_app)
from db.view.db_app import DBViewerApp

# DB設定
DB_HOST = os.environ.get("DB_HOST", "127.0.0.1")
DB_PORT = os.environ.get("DB_PORT", "5432")
DB_NAME = os.environ.get("DB_NAME", "foundation_model")
DB_USER = os.environ.get("DB_USER", "postgres")
DB_PASS = os.environ.get("DB_PASS", "z")
DB_SCHEMA = os.environ.get("DB_SCHEMA", "public")

# アプリ起動
app = DBViewerApp(DB_HOST, DB_PORT, DB_NAME, DB_USER, DB_PASS, DB_SCHEMA)
app.ui.display()

print("engine =", getattr(app.state, "engine", None))
print("db_version =", getattr(app.state, "db_version", None))

Added path: /mnt/e/env/ts/tslib/ds/src
Files in target path: ['exog', 'anomalies', '3', '10', '__pycache__', 'db_viewer_patch.zip', 'db_viewer_patch.zip:Zone.Identifier', 'base', 'db']


VBox(children=(Output(), HTML(value="\n        <div style='background:#ffffff; padding: 8px 10px; border-radiu…

engine = Engine(postgresql+psycopg://postgres:***@127.0.0.1:5432/foundation_model)
db_version = PostgreSQL 16.11 (Ubuntu 16.11-0ubuntu0.24.04.1) on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 13.3.0-6ubuntu2~24.04) 13.3.0, 64-bit


# 0. Notebook Setup

この章の目的：
- Notebookを上から再実行しても副作用が残りにくい（再現性が高い）
- `src/` 配下のコード変更が即反映される（autoreload）
- プロジェクトパスが常に通っている（import安定）

実行ルール：
- この章のセルは、Notebook起動後に最初に必ず実行する
- 以後のセルは「この章に依存」する


## ✅ codecell: 0-1. 環境情報とカレント確認

In [3]:
import os, sys, platform
from pathlib import Path

NOTEBOOK_PATH = Path("/mnt/e/env/ts/tslib/neuralforecast.ipynb")
PROJECT_ROOT  = Path("/mnt/e/env/ts/tslib")
SRC_ROOT      = PROJECT_ROOT / "src"

print("Python:", sys.version)
print("Platform:", platform.platform())
print("CWD:", os.getcwd())
print("PROJECT_ROOT exists:", PROJECT_ROOT.exists())
print("SRC_ROOT exists:", SRC_ROOT.exists())
print("NOTEBOOK_PATH:", NOTEBOOK_PATH)


Python: 3.11.14 (main, Oct 21 2025, 18:31:21) [GCC 11.2.0]
Platform: Linux-6.6.87.2-microsoft-standard-WSL2-x86_64-with-glibc2.39
CWD: /mnt/e/env/ts/tslib
PROJECT_ROOT exists: True
SRC_ROOT exists: True
NOTEBOOK_PATH: /mnt/e/env/ts/tslib/neuralforecast.ipynb


## ✅ codecell: 0-2. src/ を import パスに追加（重複なし）＋ autoreload

In [4]:
# 1) src を sys.path の先頭に入れて import を安定化（重複は避ける）
src_str = str(SRC_ROOT)
if src_str not in sys.path:
    sys.path.insert(0, src_str)

print("sys.path[0]:", sys.path[0])

# 2) Notebook上で src 配下の変更を自動反映
%load_ext autoreload
%autoreload 2


sys.path[0]: /mnt/e/env/ts/tslib/src


## ✅ codecell: 0-3. “副作用を残しにくい”ための基本設定（乱数・警告・ログ）

In [5]:
import random
import numpy as np

# ---- 再現性（まずは最小）----
SEED = 42
random.seed(SEED)
np.random.seed(SEED)

# ---- 警告ノイズを抑える（必要に応じて調整）----
import warnings
warnings.filterwarnings("default")

# ---- ログ：Notebook再実行でハンドラが増殖しがちなので、毎回初期化 ----
import logging

def setup_notebook_logging(level=logging.INFO) -> logging.Logger:
    logger = logging.getLogger("nf_app")
    logger.setLevel(level)

    # 既存ハンドラをクリア（再実行しても二重出力にならない）
    if logger.handlers:
        for h in list(logger.handlers):
            logger.removeHandler(h)

    handler = logging.StreamHandler()
    formatter = logging.Formatter(
        fmt="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S"
    )
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.propagate = False
    return logger

logger = setup_notebook_logging()
logger.info("Notebook logging initialized. SEED=%s", SEED)


2026-01-15 08:03:25 | INFO | nf_app | Notebook logging initialized. SEED=42


# 0. Package Bootstrap（nf_app 作成）

目的：
- `src/nf_app` を Python package として成立させる（import 可能にする）
- 最小の `__init__.py` を置いて “import の起点” を作る


## ✅ codecell: 0-4. nf_app/__init__.py を作成（コマンド）

In [6]:
from pathlib import Path

init_path = Path("/mnt/e/env/ts/tslib/src/nf_app/__init__.py")
init_path.parent.mkdir(parents=True, exist_ok=True)

init_code = '''\
"""
nf_app package.

Notebook駆動での実験・運用を支えるアプリ層。
- 実体の処理は src/nf_app 以下のモジュールに切り出す
- Notebookはオーケストレーション（実行・確認）に集中する
"""

__all__ = ["__version__"]
__version__ = "0.1.0"
'''

init_path.write_text(init_code, encoding="utf-8")
print("Wrote:", init_path)
print(init_path.read_text(encoding="utf-8"))


Wrote: /mnt/e/env/ts/tslib/src/nf_app/__init__.py
"""
nf_app package.

Notebook駆動での実験・運用を支えるアプリ層。
- 実体の処理は src/nf_app 以下のモジュールに切り出す
- Notebookはオーケストレーション（実行・確認）に集中する
"""

__all__ = ["__version__"]
__version__ = "0.1.0"



## ✅ codecell: 0-5. import 動作確認（autoreloadの効きもここで確認）

In [7]:
import nf_app
print("nf_app imported:", nf_app)
print("nf_app.__version__:", nf_app.__version__)


nf_app imported: <module 'nf_app' from '/mnt/e/env/ts/tslib/src/nf_app/__init__.py'>
nf_app.__version__: 0.1.0


# 0. 再実行安全のための「リセット関数」

Notebookでは同じセルを何度も回すため、次を標準化する：
- ロガーの再初期化（ハンドラ増殖防止）
- 大きいキャッシュ/グローバル変数を明示的に破棄できる口を用意

DB接続などは後工程でここに“閉じる処理”を追加する。


## ✅ codecell: 0-6. リセット関数（後でDB/セッションをここに登録）

In [9]:
def notebook_reset():
    """
    Notebookの再実行前に呼ぶ想定のリセット。
    後工程で、DB接続/セッション/一時ファイルなどもここで閉じる。
    """
    global logger
    logger = setup_notebook_logging()
    logger.info("Notebook reset done.")

notebook_reset()


2026-01-15 08:04:55 | INFO | nf_app | Notebook reset done.


# 0. Config

目的：
- DB接続URL（dataset/model）・成果物ルート（artifact root）・実行モード（run mode）を一元管理する
- `.env` と環境変数で切り替え可能にする（環境変数が優先）
- pytestで設定の読み取り・優先順位・妥当性を検証する


## ✅ codecell: 0-Config-1. config.py を作成

In [10]:
from pathlib import Path

config_path = Path("/mnt/e/env/ts/tslib/src/nf_app/config.py")
config_path.parent.mkdir(parents=True, exist_ok=True)

config_code = r'''\
from __future__ import annotations

import os
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Optional


class RunMode(str, Enum):
    """
    Notebook/CLIでの実行モード。
    - DRY_RUN: DB書込みやartifact保存を抑制（読み取りと計算の検証）
    - TRAIN_SAVE_PREDICT: 学習→保存→予測→登録まで実施
    - TRAIN_PREDICT: 学習→予測（保存はしない）
    - PREDICT_ONLY: 既存モデルをロードして予測のみ
    """
    DRY_RUN = "DRY_RUN"
    TRAIN_SAVE_PREDICT = "TRAIN_SAVE_PREDICT"
    TRAIN_PREDICT = "TRAIN_PREDICT"
    PREDICT_ONLY = "PREDICT_ONLY"


def _parse_bool(v: Optional[str], default: bool = False) -> bool:
    if v is None:
        return default
    s = v.strip().lower()
    return s in ("1", "true", "t", "yes", "y", "on")


def _load_dotenv_if_present(dotenv_path: Path) -> None:
    """
    `.env` を読み込む。環境変数が既に存在する場合は上書きしない（env優先）。
    - python-dotenv があればそれを使う
    - なければ最小パーサで読み込む
    """
    if not dotenv_path.exists():
        return

    try:
        from dotenv import load_dotenv  # type: ignore
        load_dotenv(dotenv_path=str(dotenv_path), override=False)
        return
    except Exception:
        # フォールバック：KEY=VALUE のみ対応（簡易）
        for line in dotenv_path.read_text(encoding="utf-8").splitlines():
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            if "=" not in line:
                continue
            k, v = line.split("=", 1)
            k = k.strip()
            v = v.strip().strip('"').strip("'")
            if k and (k not in os.environ):
                os.environ[k] = v


@dataclass(frozen=True)
class NFConfig:
    # DB URLs
    dataset_db_url: str
    model_db_url: str

    # artifacts
    artifact_root: Path

    # run behavior
    run_mode: RunMode
    log_level: str = "INFO"
    strict_validation: bool = True

    @staticmethod
    def from_env(
        project_root: Path,
        dotenv_filename: str = ".env",
    ) -> "NFConfig":
        """
        読み込み優先順位：
        1) 既に設定されている環境変数
        2) project_root/.env（存在すれば読み込む）
        3) デフォルト値（ただしDB URLは必須にするのが安全）
        """
        dotenv_path = project_root / dotenv_filename
        _load_dotenv_if_present(dotenv_path)

        dataset_db_url = os.environ.get("DATASET_DB_URL", "").strip()
        model_db_url = os.environ.get("MODEL_DB_URL", "").strip()

        # artifact root default
        artifact_root_str = os.environ.get("NF_ARTIFACT_ROOT", str(project_root / "artifacts")).strip()
        artifact_root = Path(artifact_root_str)

        # run mode default
        run_mode_str = os.environ.get("NF_RUN_MODE", RunMode.DRY_RUN.value).strip()
        try:
            run_mode = RunMode(run_mode_str)
        except ValueError as e:
            raise ValueError(f"NF_RUN_MODE is invalid: {run_mode_str}. allowed={list(RunMode)}") from e

        log_level = os.environ.get("NF_LOG_LEVEL", "INFO").strip()
        strict_validation = _parse_bool(os.environ.get("NF_STRICT_VALIDATION"), default=True)

        cfg = NFConfig(
            dataset_db_url=dataset_db_url,
            model_db_url=model_db_url,
            artifact_root=artifact_root,
            run_mode=run_mode,
            log_level=log_level,
            strict_validation=strict_validation,
        )

        cfg._validate()
        return cfg

    def _validate(self) -> None:
        # DB URLsは“原則必須”。DRY_RUNでもDB読み込みしたいので必須扱いが安全。
        if self.strict_validation:
            if not self.dataset_db_url:
                raise ValueError("DATASET_DB_URL is required (empty).")
            if not self.model_db_url:
                raise ValueError("MODEL_DB_URL is required (empty).")

        # artifact root は作れること（相対も許すが、Notebookでは絶対が安全）
        try:
            self.artifact_root.mkdir(parents=True, exist_ok=True)
        except Exception as e:
            raise ValueError(f"Cannot create artifact_root: {self.artifact_root}") from e

    def to_dict(self) -> dict:
        return {
            "dataset_db_url": self.dataset_db_url,
            "model_db_url": self.model_db_url,
            "artifact_root": str(self.artifact_root),
            "run_mode": self.run_mode.value,
            "log_level": self.log_level,
            "strict_validation": self.strict_validation,
        }
'''
config_path.write_text(config_code, encoding="utf-8")
print("Wrote:", config_path)
print(config_path.read_text(encoding="utf-8")[:800], "...\n")


Wrote: /mnt/e/env/ts/tslib/src/nf_app/config.py
\
from __future__ import annotations

import os
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Optional


class RunMode(str, Enum):
    """
    Notebook/CLIでの実行モード。
    - DRY_RUN: DB書込みやartifact保存を抑制（読み取りと計算の検証）
    - TRAIN_SAVE_PREDICT: 学習→保存→予測→登録まで実施
    - TRAIN_PREDICT: 学習→予測（保存はしない）
    - PREDICT_ONLY: 既存モデルをロードして予測のみ
    """
    DRY_RUN = "DRY_RUN"
    TRAIN_SAVE_PREDICT = "TRAIN_SAVE_PREDICT"
    TRAIN_PREDICT = "TRAIN_PREDICT"
    PREDICT_ONLY = "PREDICT_ONLY"


def _parse_bool(v: Optional[str], default: bool = False) -> bool:
    if v is None:
        return default
    s = v.strip().lower()
    return s in ("1", "true", "t", "yes", "y", "on")


def _load_dotenv_if_present(dotenv_path: Path) -> None:
    """
    `.env` を読み込む。環境変 ...



## ✅ codecell: 0-Config-2. Notebookで動作確認（環境変数→読込）

In [11]:
import os
from pathlib import Path

# Notebook内で一時的に設定（本来は .env に書く）
os.environ["DATASET_DB_URL"] = "postgresql+psycopg://postgres@127.0.0.1:5432/dataset"
os.environ["MODEL_DB_URL"]   = "postgresql+psycopg://postgres@127.0.0.1:5432/model"
os.environ["NF_ARTIFACT_ROOT"] = "/mnt/e/env/ts/tslib/artifacts"
os.environ["NF_RUN_MODE"] = "DRY_RUN"

from nf_app.config import NFConfig

cfg = NFConfig.from_env(project_root=Path("/mnt/e/env/ts/tslib"))
cfg.to_dict()


{'dataset_db_url': 'postgresql+psycopg://postgres@127.0.0.1:5432/dataset',
 'model_db_url': 'postgresql+psycopg://postgres@127.0.0.1:5432/model',
 'artifact_root': '/mnt/e/env/ts/tslib/artifacts',
 'run_mode': 'DRY_RUN',
 'log_level': 'INFO',
 'strict_validation': True}

## ✅ codecell: 0-Config-4. pytest（tests/test_config.py を作成）

In [12]:
from pathlib import Path

test_path = Path("/mnt/e/env/ts/tslib/tests/test_config.py")
test_path.parent.mkdir(parents=True, exist_ok=True)

test_code = r'''\
from pathlib import Path
import os
import pytest

from nf_app.config import NFConfig, RunMode


def test_config_env_overrides_dotenv(tmp_path, monkeypatch):
    # .env を作る
    project_root = tmp_path
    dotenv = project_root / ".env"
    dotenv.write_text(
        "\n".join([
            "DATASET_DB_URL=postgresql+psycopg://dotenv/dataset",
            "MODEL_DB_URL=postgresql+psycopg://dotenv/model",
            f"NF_ARTIFACT_ROOT={project_root / 'artifacts_from_dotenv'}",
            "NF_RUN_MODE=TRAIN_PREDICT",
        ]) + "\n",
        encoding="utf-8",
    )

    # env を上書き（env優先のはず）
    monkeypatch.setenv("DATASET_DB_URL", "postgresql+psycopg://env/dataset")
    monkeypatch.setenv("MODEL_DB_URL", "postgresql+psycopg://env/model")
    monkeypatch.setenv("NF_RUN_MODE", "DRY_RUN")

    cfg = NFConfig.from_env(project_root=project_root)
    assert cfg.dataset_db_url.endswith("/dataset")
    assert "env" in cfg.dataset_db_url
    assert cfg.run_mode == RunMode.DRY_RUN


def test_config_requires_db_urls_when_strict(tmp_path):
    project_root = tmp_path
    # env も .env も無い状態
    with pytest.raises(ValueError):
        NFConfig.from_env(project_root=project_root)


def test_config_allows_empty_db_urls_when_not_strict(tmp_path, monkeypatch):
    project_root = tmp_path
    monkeypatch.setenv("NF_STRICT_VALIDATION", "false")
    cfg = NFConfig.from_env(project_root=project_root)
    # strict_validationがfalseなら空でも通る（ただし実運用では推奨しない）
    assert cfg.strict_validation is False
    assert cfg.dataset_db_url == ""
    assert cfg.model_db_url == ""
    assert cfg.artifact_root.exists()


def test_invalid_run_mode_raises(tmp_path, monkeypatch):
    project_root = tmp_path
    monkeypatch.setenv("DATASET_DB_URL", "postgresql+psycopg://x/dataset")
    monkeypatch.setenv("MODEL_DB_URL", "postgresql+psycopg://x/model")
    monkeypatch.setenv("NF_RUN_MODE", "NO_SUCH_MODE")

    with pytest.raises(ValueError):
        NFConfig.from_env(project_root=project_root)
'''
test_path.write_text(test_code, encoding="utf-8")
print("Wrote:", test_path)
print(test_path.read_text(encoding="utf-8")[:500], "...\n")


Wrote: /mnt/e/env/ts/tslib/tests/test_config.py
\
from pathlib import Path
import os
import pytest

from nf_app.config import NFConfig, RunMode


def test_config_env_overrides_dotenv(tmp_path, monkeypatch):
    # .env を作る
    project_root = tmp_path
    dotenv = project_root / ".env"
    dotenv.write_text(
        "\n".join([
            "DATASET_DB_URL=postgresql+psycopg://dotenv/dataset",
            "MODEL_DB_URL=postgresql+psycopg://dotenv/model",
            f"NF_ARTIFACT_ROOT={project_root / 'artifacts_from_dotenv'}",
            "NF_RU ...



## ✅ codecell: 0-Config-5. pytest 実行（Notebookから）

In [13]:
# Notebookのカレントをプロジェクトルートに合わせるのが安全
%cd /mnt/e/env/ts/tslib

# まずはこのテストだけ
!pytest -q tests/test_config.py


/mnt/e/env/ts/tslib

[31m[1m____________________ ERROR collecting tests/test_config.py _____________________[0m
[31mImportError while importing test module '/mnt/e/env/ts/tslib/tests/test_config.py'.
Hint: make sure your test modules/packages have valid Python names.
Traceback:
[1m[31m/home/az/miniconda3/envs/ts/lib/python3.11/importlib/__init__.py[0m:126: in import_module
    [0m[94mreturn[39;49;00m _bootstrap._gcd_import(name[level:], package, level)[90m[39;49;00m
[1m[31mtests/test_config.py[0m:6: in <module>
    [0m[94mfrom[39;49;00m[90m [39;49;00m[04m[96mnf_app[39;49;00m[04m[96m.[39;49;00m[04m[96mconfig[39;49;00m[90m [39;49;00m[94mimport[39;49;00m NFConfig, RunMode[90m[39;49;00m
[1m[31mE   ModuleNotFoundError: No module named 'nf_app'[0m[0m
[31mERROR[0m tests/test_config.py
!!!!!!!!!!!!!!!!!!!! Interrupted: 1 error during collection !!!!!!!!!!!!!!!!!!!!
[31m[31m[1m1 error[0m[31m in 0.14s[0m[0m


## pytestが `nf_app` を見つけられない問題の修正

pytestはNotebookの `sys.path` 設定を引き継がないため、
`tests/conftest.py` で `/mnt/e/env/ts/tslib/src` を import パスに追加する。


## /mnt/e/env/ts/tslib/tests/conftest.py を作成

In [14]:
from pathlib import Path

conftest_path = Path("/mnt/e/env/ts/tslib/tests/conftest.py")
conftest_path.parent.mkdir(parents=True, exist_ok=True)

conftest_code = '''\
import sys
from pathlib import Path

# /mnt/e/env/ts/tslib/tests/conftest.py
# pytest起動時に src を import path に追加して、`import nf_app` を可能にする

PROJECT_ROOT = Path(__file__).resolve().parents[1]
SRC_ROOT = PROJECT_ROOT / "src"

src_str = str(SRC_ROOT)
if src_str not in sys.path:
    sys.path.insert(0, src_str)
'''

conftest_path.write_text(conftest_code, encoding="utf-8")
print("Wrote:", conftest_path)
print(conftest_path.read_text(encoding="utf-8"))


Wrote: /mnt/e/env/ts/tslib/tests/conftest.py
import sys
from pathlib import Path

# /mnt/e/env/ts/tslib/tests/conftest.py
# pytest起動時に src を import path に追加して、`import nf_app` を可能にする

PROJECT_ROOT = Path(__file__).resolve().parents[1]
SRC_ROOT = PROJECT_ROOT / "src"

src_str = str(SRC_ROOT)
if src_str not in sys.path:
    sys.path.insert(0, src_str)



In [15]:
%cd /mnt/e/env/ts/tslib
!pytest -q tests/test_config.py


/mnt/e/env/ts/tslib
[32m.[0m[31mF[0m[31mF[0m[32m.[0m[31m                                                                     [100%][0m
[31m[1m___________________ test_config_requires_db_urls_when_strict ___________________[0m

tmp_path = PosixPath('/tmp/pytest-of-az/pytest-0/test_config_requires_db_urls_w0')

    [0m[94mdef[39;49;00m[90m [39;49;00m[92mtest_config_requires_db_urls_when_strict[39;49;00m(tmp_path):[90m[39;49;00m
        project_root = tmp_path[90m[39;49;00m
        [90m# env も .env も無い状態[39;49;00m[90m[39;49;00m
>       [94mwith[39;49;00m pytest.raises([96mValueError[39;49;00m):[90m[39;49;00m
[1m[31mE       Failed: DID NOT RAISE <class 'ValueError'>[0m

[1m[31mtests/test_config.py[0m:37: Failed
[31m[1m_______________ test_config_allows_empty_db_urls_when_not_strict _______________[0m

tmp_path = PosixPath('/tmp/pytest-of-az/pytest-0/test_config_allows_empty_db_ur0')
monkeypatch = <_pytest.monkeypatch.MonkeyPatch object at 0x72818c

## pytestの環境変数リークを防ぐ

Notebookやシェルで設定された環境変数がpytestに残り、テストが想定する状態（空のenv）とズレる。
`tests/conftest.py` に autouse fixture を追加して、各テスト開始時に設定系の環境変数を必ずクリアする。


## ✅ codecell: conftest.py を上書き（sys.path追加＋envクリア）

In [17]:
from pathlib import Path

conftest_path = Path("/mnt/e/env/ts/tslib/tests/conftest.py")
conftest_path.parent.mkdir(parents=True, exist_ok=True)

conftest_code = '''\
import sys
from pathlib import Path
import pytest

# pytest起動時に src を import path に追加して、`import nf_app` を可能にする
PROJECT_ROOT = Path(__file__).resolve().parents[1]
SRC_ROOT = PROJECT_ROOT / "src"

src_str = str(SRC_ROOT)
if src_str not in sys.path:
    sys.path.insert(0, src_str)

# テストは「環境変数が汚れていない」前提にしたいので毎回クリアする
_CONFIG_ENV_KEYS = [
    "DATASET_DB_URL",
    "MODEL_DB_URL",
    "NF_ARTIFACT_ROOT",
    "NF_RUN_MODE",
    "NF_STRICT_VALIDATION",
    "NF_LOG_LEVEL",
]

@pytest.fixture(autouse=True)
def _clean_config_env(monkeypatch):
    for k in _CONFIG_ENV_KEYS:
        monkeypatch.delenv(k, raising=False)
'''
conftest_path.write_text(conftest_code, encoding="utf-8")
print("Wrote:", conftest_path)
print(conftest_path.read_text(encoding="utf-8"))


Wrote: /mnt/e/env/ts/tslib/tests/conftest.py
import sys
from pathlib import Path
import pytest

# pytest起動時に src を import path に追加して、`import nf_app` を可能にする
PROJECT_ROOT = Path(__file__).resolve().parents[1]
SRC_ROOT = PROJECT_ROOT / "src"

src_str = str(SRC_ROOT)
if src_str not in sys.path:
    sys.path.insert(0, src_str)

# テストは「環境変数が汚れていない」前提にしたいので毎回クリアする
_CONFIG_ENV_KEYS = [
    "DATASET_DB_URL",
    "MODEL_DB_URL",
    "NF_ARTIFACT_ROOT",
    "NF_RUN_MODE",
    "NF_STRICT_VALIDATION",
    "NF_LOG_LEVEL",
]

@pytest.fixture(autouse=True)
def _clean_config_env(monkeypatch):
    for k in _CONFIG_ENV_KEYS:
        monkeypatch.delenv(k, raising=False)



## ✅ codecell: pytest 再実行

In [18]:
%cd /mnt/e/env/ts/tslib
!pytest -q tests/test_config.py


/mnt/e/env/ts/tslib
[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m                                                                     [100%][0m
[32m[32m[1m4 passed[0m[32m in 0.01s[0m[0m


# 0. Logging（run_id付き）

目的：
- すべてのログ行に `run_id` を含める（実験・実行単位の追跡を可能にする）
- Notebookの再実行でもログが二重出力にならない（ハンドラ増殖防止）
- pytestで「run_idが必ず入る」を担保する


## ✅ codecell: 0-Logging-1. nf_app/logging.py を作成

In [19]:
from pathlib import Path

logging_path = Path("/mnt/e/env/ts/tslib/src/nf_app/logging.py")
logging_path.parent.mkdir(parents=True, exist_ok=True)

logging_code = r'''\
from __future__ import annotations

import logging
import uuid
from dataclasses import dataclass
from typing import Optional


RUN_ID_MDC_KEY = "run_id"


class RunIdFilter(logging.Filter):
    """
    LogRecordに run_id を注入するフィルタ。
    formatterで %(run_id)s を参照できるようにする。
    """
    def __init__(self, run_id: str):
        super().__init__()
        self.run_id = run_id

    def filter(self, record: logging.LogRecord) -> bool:
        # 既に run_id が明示されている場合は尊重
        if not hasattr(record, RUN_ID_MDC_KEY):
            setattr(record, RUN_ID_MDC_KEY, self.run_id)
        return True


@dataclass(frozen=True)
class LoggerSpec:
    name: str = "nf_app"
    level: int = logging.INFO
    fmt: str = "%(asctime)s | %(levelname)s | %(name)s | run_id=%(run_id)s | %(message)s"
    datefmt: str = "%Y-%m-%d %H:%M:%S"


def new_run_id() -> str:
    return str(uuid.uuid4())


def get_logger(
    *,
    run_id: Optional[str] = None,
    spec: LoggerSpec = LoggerSpec(),
) -> logging.Logger:
    """
    run_id付きのロガーを取得する。
    - Notebook再実行でハンドラが増殖しないように毎回クリア
    - run_idはFilterで注入
    """
    if run_id is None:
        run_id = new_run_id()

    logger = logging.getLogger(spec.name)
    logger.setLevel(spec.level)

    # ハンドラ増殖防止
    if logger.handlers:
        for h in list(logger.handlers):
            logger.removeHandler(h)

    handler = logging.StreamHandler()
    handler.setLevel(spec.level)
    handler.setFormatter(logging.Formatter(fmt=spec.fmt, datefmt=spec.datefmt))

    # 既存フィルタが残らないように毎回クリア→付け直し
    if logger.filters:
        for f in list(logger.filters):
            logger.removeFilter(f)

    logger.addFilter(RunIdFilter(run_id))
    logger.addHandler(handler)
    logger.propagate = False

    return logger
'''
logging_path.write_text(logging_code, encoding="utf-8")
print("Wrote:", logging_path)
print(logging_path.read_text(encoding="utf-8")[:900], "...\n")


Wrote: /mnt/e/env/ts/tslib/src/nf_app/logging.py
\
from __future__ import annotations

import logging
import uuid
from dataclasses import dataclass
from typing import Optional


RUN_ID_MDC_KEY = "run_id"


class RunIdFilter(logging.Filter):
    """
    LogRecordに run_id を注入するフィルタ。
    formatterで %(run_id)s を参照できるようにする。
    """
    def __init__(self, run_id: str):
        super().__init__()
        self.run_id = run_id

    def filter(self, record: logging.LogRecord) -> bool:
        # 既に run_id が明示されている場合は尊重
        if not hasattr(record, RUN_ID_MDC_KEY):
            setattr(record, RUN_ID_MDC_KEY, self.run_id)
        return True


@dataclass(frozen=True)
class LoggerSpec:
    name: str = "nf_app"
    level: int = logging.INFO
    fmt: str = "%(asctime)s | %(levelname)s | %(name)s | run_id=%(run_id)s | %(message)s"
    datefmt: str = "%Y-%m-%d %H:%M:%S"


def new_run_id() -> str:
    return str(uuid.uuid4())


def get_logger(
    *,
  ...



## ✅ codecell: 0-Logging-2. Notebookで動作確認（run_idが全行に入る）

In [20]:
from nf_app.logging import get_logger, new_run_id

rid = new_run_id()
logger = get_logger(run_id=rid)

logger.info("hello")
logger.warning("warn")
logger.error("error")

# Notebook再実行や再取得でも二重出力しないことを確認
logger2 = get_logger(run_id=rid)
logger2.info("logger recreated (should appear once)")


2026-01-15 08:13:09 | INFO | nf_app | run_id=b8a2833a-56f5-4974-8c8d-d94fa48e2fe2 | hello
2026-01-15 08:13:09 | ERROR | nf_app | run_id=b8a2833a-56f5-4974-8c8d-d94fa48e2fe2 | error
2026-01-15 08:13:09 | INFO | nf_app | run_id=b8a2833a-56f5-4974-8c8d-d94fa48e2fe2 | logger recreated (should appear once)


## ✅ codecell: 0-Logging-3. pytest（tests/test_logging.py を作成）

In [21]:
from pathlib import Path

test_logging_path = Path("/mnt/e/env/ts/tslib/tests/test_logging.py")
test_logging_path.parent.mkdir(parents=True, exist_ok=True)

test_logging_code = r'''\
import re
from nf_app.logging import get_logger

def test_logger_injects_run_id(caplog):
    run_id = "TEST-RUN-ID-123"
    logger = get_logger(run_id=run_id)

    # caplogはroot側に集約するので、logger名に縛らずメッセージにrun_idが入るかを見る
    # ただし本実装はFormatterに run_id を入れているので、handler出力を検査したい。
    # pytestのcaplogは標準ではhandlerのフォーマット文字列まで捕まえないことがあるため、
    # LogRecord属性として run_id が付与されていることを確認する。
    with caplog.at_level("INFO"):
        logger.info("hello")

    # recordに run_id が付いていること
    assert any(getattr(r, "run_id", None) == run_id for r in caplog.records)

def test_logger_does_not_duplicate_handlers():
    run_id = "RID"
    logger = get_logger(run_id=run_id)
    n1 = len(logger.handlers)

    # 再取得してもハンドラ数が増えない
    logger2 = get_logger(run_id=run_id)
    n2 = len(logger2.handlers)

    assert n1 == 1
    assert n2 == 1
'''
test_logging_path.write_text(test_logging_code, encoding="utf-8")
print("Wrote:", test_logging_path)
print(test_logging_path.read_text(encoding="utf-8"))


Wrote: /mnt/e/env/ts/tslib/tests/test_logging.py
\
import re
from nf_app.logging import get_logger

def test_logger_injects_run_id(caplog):
    run_id = "TEST-RUN-ID-123"
    logger = get_logger(run_id=run_id)

    # caplogはroot側に集約するので、logger名に縛らずメッセージにrun_idが入るかを見る
    # ただし本実装はFormatterに run_id を入れているので、handler出力を検査したい。
    # pytestのcaplogは標準ではhandlerのフォーマット文字列まで捕まえないことがあるため、
    # LogRecord属性として run_id が付与されていることを確認する。
    with caplog.at_level("INFO"):
        logger.info("hello")

    # recordに run_id が付いていること
    assert any(getattr(r, "run_id", None) == run_id for r in caplog.records)

def test_logger_does_not_duplicate_handlers():
    run_id = "RID"
    logger = get_logger(run_id=run_id)
    n1 = len(logger.handlers)

    # 再取得してもハンドラ数が増えない
    logger2 = get_logger(run_id=run_id)
    n2 = len(logger2.handlers)

    assert n1 == 1
    assert n2 == 1



## ✅ codecell: 0-Logging-4. pytest 実行

In [22]:
%cd /mnt/e/env/ts/tslib
!pytest -q tests/test_logging.py


/mnt/e/env/ts/tslib
[31mF[0m[32m.[0m[31m                                                                       [100%][0m
[31m[1m__________________________ test_logger_injects_run_id __________________________[0m

caplog = <_pytest.logging.LogCaptureFixture object at 0x7b2098005790>

    [0m[94mdef[39;49;00m[90m [39;49;00m[92mtest_logger_injects_run_id[39;49;00m(caplog):[90m[39;49;00m
        run_id = [33m"[39;49;00m[33mTEST-RUN-ID-123[39;49;00m[33m"[39;49;00m[90m[39;49;00m
        logger = get_logger(run_id=run_id)[90m[39;49;00m
    [90m[39;49;00m
        [90m# caplogはroot側に集約するので、logger名に縛らずメッセージにrun_idが入るかを見る[39;49;00m[90m[39;49;00m
        [90m# ただし本実装はFormatterに run_id を入れているので、handler出力を検査したい。[39;49;00m[90m[39;49;00m
        [90m# pytestのcaplogは標準ではhandlerのフォーマット文字列まで捕まえないことがあるため、[39;49;00m[90m[39;49;00m
        [90m# LogRecord属性として run_id が付与されていることを確認する。[39;49;00m[90m[39;49;00m
        [94mwith[39;49;00m caplog.at_level([33m"[39;

### pytestのログ捕捉を修正

`nf_app` ロガーは propagate=False なので caplog(root捕捉)ではrecordsが取れない。
テストではロガーに一時ハンドラを付け、LogRecordの run_id 注入を直接検証する。


## ✅ codecell: tests/test_logging.py を修正（上書き）

In [24]:
from pathlib import Path

test_logging_path = Path("/mnt/e/env/ts/tslib/tests/test_logging.py")

test_logging_code = r'''\
import logging
from nf_app.logging import get_logger

class ListHandler(logging.Handler):
    """LogRecordを収集するだけのテスト用ハンドラ"""
    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


def test_logger_injects_run_id():
    run_id = "TEST-RUN-ID-123"
    logger = get_logger(run_id=run_id)

    lh = ListHandler()
    logger.addHandler(lh)

    logger.info("hello")

    assert len(lh.records) >= 1
    assert getattr(lh.records[0], "run_id", None) == run_id


def test_logger_does_not_duplicate_handlers():
    run_id = "RID"
    logger = get_logger(run_id=run_id)
    n1 = len(logger.handlers)

    # 再取得してもハンドラ数が増えない（get_logger内で毎回クリアしている想定）
    logger2 = get_logger(run_id=run_id)
    n2 = len(logger2.handlers)

    assert n1 == 1
    assert n2 == 1
'''
test_logging_path.write_text(test_logging_code, encoding="utf-8")
print("Wrote:", test_logging_path)
print(test_logging_path.read_text(encoding="utf-8"))


Wrote: /mnt/e/env/ts/tslib/tests/test_logging.py
\
import logging
from nf_app.logging import get_logger

class ListHandler(logging.Handler):
    """LogRecordを収集するだけのテスト用ハンドラ"""
    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


def test_logger_injects_run_id():
    run_id = "TEST-RUN-ID-123"
    logger = get_logger(run_id=run_id)

    lh = ListHandler()
    logger.addHandler(lh)

    logger.info("hello")

    assert len(lh.records) >= 1
    assert getattr(lh.records[0], "run_id", None) == run_id


def test_logger_does_not_duplicate_handlers():
    run_id = "RID"
    logger = get_logger(run_id=run_id)
    n1 = len(logger.handlers)

    # 再取得してもハンドラ数が増えない（get_logger内で毎回クリアしている想定）
    logger2 = get_logger(run_id=run_id)
    n2 = len(logger2.handlers)

    assert n1 == 1
    assert n2 == 1



## ✅ codecell: pytest 再実行

In [25]:
%cd /mnt/e/env/ts/tslib
!pytest -q tests/test_logging.py


/mnt/e/env/ts/tslib
[32m.[0m[32m.[0m[32m                                                                       [100%][0m
[32m[32m[1m2 passed[0m[32m in 0.01s[0m[0m


# 0. Errors（例外ポリシー：Data / DB / Model）

目的：
- 失敗を「原因カテゴリ」で分類する（Data / DB / Model / Config / Unknown）
- 例外をDBに保存できる形（dict/JSON）へ正規化する
- run_id を含む context（状況情報）を一緒に持てるようにする
- pytestで分類・シリアライズが担保される


## ✅ codecell: 0-Errors-1. nf_app/errors.py を作成

In [26]:
from pathlib import Path

errors_path = Path("/mnt/e/env/ts/tslib/src/nf_app/errors.py")
errors_path.parent.mkdir(parents=True, exist_ok=True)

errors_code = r'''\
from __future__ import annotations

import traceback as tb
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional


class ErrorType(str, Enum):
    DATA = "DATA"        # データ品質、欠損、リーク、型不整合など
    DB = "DB"            # DB接続/SQL/整合性制約など
    MODEL = "MODEL"      # 学習/推論/保存/ロード、パラメータ不正など
    CONFIG = "CONFIG"    # 設定値不正（env/.env等）
    UNKNOWN = "UNKNOWN"  # 未分類


@dataclass
class NFError(Exception):
    """
    DBに保存できるように構造化した例外。
    - error_type: 失敗カテゴリ
    - error_code: 文字列コード（集計・アラートに使う）
    - message: 人間向けメッセージ
    - context: JSON化可能な補足情報（run_id, table名, unique_idなど）
    - cause: 元例外（例外チェーン）
    """
    error_type: ErrorType
    error_code: str
    message: str
    context: Dict[str, Any] = field(default_factory=dict)
    cause: Optional[BaseException] = None

    def __post_init__(self):
        super().__init__(self.message)

    def to_record(self) -> Dict[str, Any]:
        """
        runsテーブル等にそのまま入れられる形式。
        （SQLAlchemy/psycopgならJSONBにも入れやすい）
        """
        return {
            "error_type": self.error_type.value,
            "error_code": self.error_code,
            "error_message": self.message,
            "error_context": self.context,
            "cause_type": type(self.cause).__name__ if self.cause else None,
            "cause_message": str(self.cause) if self.cause else None,
            "traceback": self.format_traceback(),
        }

    def format_traceback(self) -> str:
        # 自分自身がraiseされた位置のstackも残したいが、
        # causeがあればcauseのtraceback優先で採る運用が多い
        if self.cause is None:
            return "".join(tb.format_exception(type(self), self, self.__traceback__))
        return "".join(tb.format_exception(type(self.cause), self.cause, self.cause.__traceback__))


# --- 便利コンストラクタ（統一したコード体系にしやすい） ---
def data_error(code: str, message: str, *, context: Optional[Dict[str, Any]] = None, cause: Optional[BaseException] = None) -> NFError:
    return NFError(ErrorType.DATA, code, message, context=context or {}, cause=cause)

def db_error(code: str, message: str, *, context: Optional[Dict[str, Any]] = None, cause: Optional[BaseException] = None) -> NFError:
    return NFError(ErrorType.DB, code, message, context=context or {}, cause=cause)

def model_error(code: str, message: str, *, context: Optional[Dict[str, Any]] = None, cause: Optional[BaseException] = None) -> NFError:
    return NFError(ErrorType.MODEL, code, message, context=context or {}, cause=cause)

def config_error(code: str, message: str, *, context: Optional[Dict[str, Any]] = None, cause: Optional[BaseException] = None) -> NFError:
    return NFError(ErrorType.CONFIG, code, message, context=context or {}, cause=cause)


def classify_exception(ex: BaseException) -> NFError:
    """
    既存の例外をNFErrorへ分類ラップする。
    例：SQLAlchemyError, psycopg.Error, ValueError(データ型)などを分類したい。
    ※後工程で依存ライブラリが増えるので、ここは“拡張ポイント”。
    """
    # すでにNFErrorならそのまま
    if isinstance(ex, NFError):
        return ex

    # DB系：psycopg/SQLAlchemyに依存したいが、ここでは文字列ベースの安全分類に留める
    name = type(ex).__name__.lower()
    msg = str(ex)

    if "psycopg" in name or "sqlalchemy" in name or "database" in name or "connection" in msg.lower():
        return db_error("DB_UNKNOWN", msg, cause=ex)

    # Data系っぽい
    if isinstance(ex, (KeyError, TypeError)) or "nan" in msg.lower() or "null" in msg.lower():
        return data_error("DATA_UNKNOWN", msg, cause=ex)

    # Config系っぽい
    if isinstance(ex, ValueError) and ("NF_" in msg or "DATASET_DB_URL" in msg or "MODEL_DB_URL" in msg):
        return config_error("CONFIG_INVALID", msg, cause=ex)

    return NFError(ErrorType.UNKNOWN, "UNKNOWN", msg, cause=ex)
'''
errors_path.write_text(errors_code, encoding="utf-8")
print("Wrote:", errors_path)
print(errors_path.read_text(encoding="utf-8")[:900], "...\n")


Wrote: /mnt/e/env/ts/tslib/src/nf_app/errors.py
\
from __future__ import annotations

import traceback as tb
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional


class ErrorType(str, Enum):
    DATA = "DATA"        # データ品質、欠損、リーク、型不整合など
    DB = "DB"            # DB接続/SQL/整合性制約など
    MODEL = "MODEL"      # 学習/推論/保存/ロード、パラメータ不正など
    CONFIG = "CONFIG"    # 設定値不正（env/.env等）
    UNKNOWN = "UNKNOWN"  # 未分類


@dataclass
class NFError(Exception):
    """
    DBに保存できるように構造化した例外。
    - error_type: 失敗カテゴリ
    - error_code: 文字列コード（集計・アラートに使う）
    - message: 人間向けメッセージ
    - context: JSON化可能な補足情報（run_id, table名, unique_idなど）
    - cause: 元例外（例外チェーン）
    """
    error_type: ErrorType
    error_code: str
    message: str
    context: Dict[str, Any] = field(default_factory=dict)
    cause: Optional[BaseException] = None

    def __post_init__(self):
        super().__init__(self.message)

    def to ...



## ✅ codecell: 0-Errors-2. Notebookで動作確認（分類→to_record）

In [27]:
from nf_app.errors import data_error, db_error, model_error, classify_exception

try:
    1 / 0
except Exception as e:
    nferr = classify_exception(e)
    print(nferr.error_type, nferr.error_code)
    rec = nferr.to_record()
    # DBへそのまま入る形（dict）になっている
    for k in ["error_type", "error_code", "error_message", "cause_type", "traceback"]:
        print(k, "=>", (rec[k][:80] + "...") if isinstance(rec[k], str) and len(rec[k]) > 80 else rec[k])


ErrorType.UNKNOWN UNKNOWN
error_type => UNKNOWN
error_code => UNKNOWN
error_message => division by zero
cause_type => ZeroDivisionError
traceback => Traceback (most recent call last):
  File "/tmp/ipykernel_58523/3620536958.py", ...


## ✅ codecell: 0-Errors-3. pytest（tests/test_errors.py 作成）

In [28]:
from pathlib import Path

test_errors_path = Path("/mnt/e/env/ts/tslib/tests/test_errors.py")
test_errors_path.parent.mkdir(parents=True, exist_ok=True)

test_errors_code = r'''\
from nf_app.errors import (
    NFError, ErrorType,
    data_error, db_error, model_error, config_error,
    classify_exception
)

def test_nferror_to_record_has_required_fields():
    err = data_error("DATA_X", "bad data", context={"run_id": "RID"})
    rec = err.to_record()
    assert rec["error_type"] == ErrorType.DATA.value
    assert rec["error_code"] == "DATA_X"
    assert rec["error_message"] == "bad data"
    assert rec["error_context"]["run_id"] == "RID"
    assert "traceback" in rec

def test_classify_exception_wraps_non_nferror():
    try:
        1 / 0
    except Exception as e:
        wrapped = classify_exception(e)

    assert isinstance(wrapped, NFError)
    # ゼロ除算はここではUNKNOWN分類でよい（後で拡張してもOK）
    assert wrapped.error_type in (ErrorType.UNKNOWN,)

def test_factories_set_error_type():
    assert data_error("A", "m").error_type == ErrorType.DATA
    assert db_error("A", "m").error_type == ErrorType.DB
    assert model_error("A", "m").error_type == ErrorType.MODEL
    assert config_error("A", "m").error_type == ErrorType.CONFIG
'''
test_errors_path.write_text(test_errors_code, encoding="utf-8")
print("Wrote:", test_errors_path)
print(test_errors_path.read_text(encoding="utf-8"))


Wrote: /mnt/e/env/ts/tslib/tests/test_errors.py
\
from nf_app.errors import (
    NFError, ErrorType,
    data_error, db_error, model_error, config_error,
    classify_exception
)

def test_nferror_to_record_has_required_fields():
    err = data_error("DATA_X", "bad data", context={"run_id": "RID"})
    rec = err.to_record()
    assert rec["error_type"] == ErrorType.DATA.value
    assert rec["error_code"] == "DATA_X"
    assert rec["error_message"] == "bad data"
    assert rec["error_context"]["run_id"] == "RID"
    assert "traceback" in rec

def test_classify_exception_wraps_non_nferror():
    try:
        1 / 0
    except Exception as e:
        wrapped = classify_exception(e)

    assert isinstance(wrapped, NFError)
    # ゼロ除算はここではUNKNOWN分類でよい（後で拡張してもOK）
    assert wrapped.error_type in (ErrorType.UNKNOWN,)

def test_factories_set_error_type():
    assert data_error("A", "m").error_type == ErrorType.DATA
    assert db_error("A", "m").error_type == ErrorType.DB
    assert model_error

## ✅ codecell: 0-Errors-4. pytest 実行

In [29]:
%cd /mnt/e/env/ts/tslib
!pytest -q tests/test_errors.py


/mnt/e/env/ts/tslib
[32m.[0m[32m.[0m[32m.[0m[32m                                                                      [100%][0m
[32m[32m[1m3 passed[0m[32m in 0.01s[0m[0m


# 1. DB Connect（SQLAlchemy + psycopg）

目的：
- dataset DB / model DB の両方に接続できることを確認する（Smoke）
- Notebookでもpytestでも同じコードで動く
- 再実行時に接続プールが残り続けないよう、明示的にdisposeできる


# ✅ codecell: 1-DB-1. nf_app/db.py を作成

In [None]:
from pathlib import Path

db_path = Path("/mnt/e/env/ts/tslib/src/nf_app/db.py")

db_code = r'''\
from __future__ import annotations

import os
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Tuple
from urllib.parse import urlparse, urlunparse

from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine
from sqlalchemy.exc import SQLAlchemyError

from nf_app.errors import db_error
from nf_app.config import NFConfig


class DbKind(str, Enum):
    DATASET = "dataset"
    MODEL = "model"


def _inject_password_if_missing(db_url: str, *, kind: DbKind) -> str:
    """
    URLにパスワードが含まれていない場合、環境変数から注入する。
    優先順位：
      1) DATASET_DB_PASSWORD / MODEL_DB_PASSWORD
      2) PGPASSWORD
    ※ URLに既に password があれば何もしない
    """
    parsed = urlparse(db_url)
    if parsed.password:
        return db_url

    # postgresql以外はそのまま
    if not parsed.scheme.startswith("postgresql"):
        return db_url

    pwd_key = "DATASET_DB_PASSWORD" if kind == DbKind.DATASET else "MODEL_DB_PASSWORD"
    pwd = os.environ.get(pwd_key) or os.environ.get("PGPASSWORD")
    if not pwd:
        return db_url  # 注入できない（後で接続時に落ちる/skip対象）

    # netloc: user@host:port 形式に password を入れる -> user:pass@host:port
    username = parsed.username or ""
    hostname = parsed.hostname or ""
    port = f":{parsed.port}" if parsed.port else ""
    if username:
        netloc = f"{username}:{pwd}@{hostname}{port}"
    else:
        netloc = f"{hostname}{port}"

    rebuilt = parsed._replace(netloc=netloc)
    return urlunparse(rebuilt)


def _make_engine(db_url: str, *, echo: bool = False) -> Engine:
    try:
        engine = create_engine(
            db_url,
            echo=echo,
            pool_pre_ping=True,
            future=True,
            connect_args={"connect_timeout": 5},
        )
        return engine
    except Exception as e:
        raise db_error(
            "DB_ENGINE_CREATE_FAILED",
            f"Failed to create engine for url={db_url}",
            context={"db_url": db_url},
            cause=e,
        )


def _ping(engine: Engine) -> None:
    try:
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))
    except SQLAlchemyError as e:
        raise db_error(
            "DB_PING_FAILED",
            "DB ping failed (SELECT 1).",
            context={"dialect": str(engine.dialect.name)},
            cause=e,
        )


def _version(engine: Engine) -> str:
    with engine.connect() as conn:
        r = conn.execute(text("SELECT version()")).scalar_one()
    return str(r)


@dataclass
class DbManager:
    cfg: NFConfig
    echo_sql: bool = False

    _dataset_engine: Optional[Engine] = None
    _model_engine: Optional[Engine] = None

    def dataset_engine(self) -> Engine:
        if self._dataset_engine is None:
            if not self.cfg.dataset_db_url:
                raise db_error("DB_URL_EMPTY", "DATASET_DB_URL is empty.", context={"kind": DbKind.DATASET.value})
            url = _inject_password_if_missing(self.cfg.dataset_db_url, kind=DbKind.DATASET)
            self._dataset_engine = _make_engine(url, echo=self.echo_sql)
        return self._dataset_engine

    def model_engine(self) -> Engine:
        if self._model_engine is None:
            if not self.cfg.model_db_url:
                raise db_error("DB_URL_EMPTY", "MODEL_DB_URL is empty.", context={"kind": DbKind.MODEL.value})
            url = _inject_password_if_missing(self.cfg.model_db_url, kind=DbKind.MODEL)
            self._model_engine = _make_engine(url, echo=self.echo_sql)
        return self._model_engine

    def test_connections(self) -> Tuple[str, str]:
        ds = self.dataset_engine()
        md = self.model_engine()

        _ping(ds)
        _ping(md)

        return _version(ds), _version(md)

    def dispose(self) -> None:
        if self._dataset_engine is not None:
            self._dataset_engine.dispose()
            self._dataset_engine = None
        if self._model_engine is not None:
            self._model_engine.dispose()
            self._model_engine = None
'''
db_path.write_text(db_code, encoding="utf-8")
print("Updated:", db_path)
print(db_path.read_text(encoding="utf-8")[:700], "...\n")


Wrote: /mnt/e/env/ts/tslib/src/nf_app/db.py
\
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum
from typing import Optional, Tuple

from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine
from sqlalchemy.exc import SQLAlchemyError

from nf_app.errors import db_error, NFError
from nf_app.config import NFConfig


class DbKind(str, Enum):
    DATASET = "dataset"
    MODEL = "model"


def _make_engine(db_url: str, *, echo: bool = False) -> Engine:
    """
    SQLAlchemy Engine を生成する。
    - pool_pre_ping=True：切れた接続を再利用して落ちる事故を減らす
    - connect_timeout：ハングを避ける（psycopg側へ渡す）
    """
    try:
        engine = create_engine(
            db_url,
            echo=echo,
            pool_pre_ping=True,
            future=True,
            connect_args={"connect_timeout": 5},
        )
        return engine
    except Exception as e:
        raise db_error(
            "DB_ENGINE_ ...



## ✅ codecell：/mnt/e/env/ts/tslib/tests/test_db_smoke.py を上書き

In [39]:
from pathlib import Path

test_path = Path("/mnt/e/env/ts/tslib/tests/test_db_smoke.py")

test_code = r'''\
from pathlib import Path
import os
import pytest

from nf_app.config import NFConfig
from nf_app.db import DbManager
from nf_app.errors import NFError


def _has_db_creds() -> bool:
    # URLは必須。パスワードは URLに含まれるか、環境変数で供給されればOK。
    if not os.environ.get("DATASET_DB_URL") or not os.environ.get("MODEL_DB_URL"):
        return False
    # パスワード必須環境の可能性が高いので、どれかが無い場合はskipに寄せる
    return bool(
        os.environ.get("DATASET_DB_PASSWORD")
        or os.environ.get("MODEL_DB_PASSWORD")
        or os.environ.get("PGPASSWORD")
        or ("://" in os.environ["DATASET_DB_URL"] and "@" in os.environ["DATASET_DB_URL"] and ":" in os.environ["DATASET_DB_URL"].split("@")[0])
    )


@pytest.mark.integration
def test_db_smoke_connects_dataset_and_model(monkeypatch):
    # conftest が env をクリアするので、ここでセットする。
    # NOTE: パスワードはURL直書きせず、DATASET_DB_PASSWORD/MODEL_DB_PASSWORD か PGPASSWORD を使う。
    monkeypatch.setenv("DATASET_DB_URL", "postgresql+psycopg://postgres@127.0.0.1:5432/dataset")
    monkeypatch.setenv("MODEL_DB_URL",   "postgresql+psycopg://postgres@127.0.0.1:5432/model")
    monkeypatch.setenv("NF_STRICT_VALIDATION", "true")
    monkeypatch.setenv("NF_ARTIFACT_ROOT", "/tmp/tslib_artifacts_for_test")
    monkeypatch.setenv("NF_RUN_MODE", "DRY_RUN")

    # ここが無いなら環境依存で落ちるのでskip
    if not _has_db_creds():
        pytest.skip("DB credentials not provided. Set DATASET_DB_PASSWORD/MODEL_DB_PASSWORD or PGPASSWORD.")

    cfg = NFConfig.from_env(project_root=Path("/mnt/e/env/ts/tslib"))
    dbm = DbManager(cfg=cfg, echo_sql=False)

    try:
        ds_ver, md_ver = dbm.test_connections()
        assert "PostgreSQL" in ds_ver
        assert "PostgreSQL" in md_ver
    except NFError as e:
        # 認証/起動状態など環境要因はskip扱いにする（統合テストのため）
        pytest.skip(f"DB not reachable/auth failed: {e.to_record().get('cause_message')}")
    finally:
        dbm.dispose()
'''
test_path.write_text(test_code, encoding="utf-8")
print("Updated:", test_path)
print(test_path.read_text(encoding="utf-8")[:650], "...\n")


Updated: /mnt/e/env/ts/tslib/tests/test_db_smoke.py
\
from pathlib import Path
import os
import pytest

from nf_app.config import NFConfig
from nf_app.db import DbManager
from nf_app.errors import NFError


def _has_db_creds() -> bool:
    # URLは必須。パスワードは URLに含まれるか、環境変数で供給されればOK。
    if not os.environ.get("DATASET_DB_URL") or not os.environ.get("MODEL_DB_URL"):
        return False
    # パスワード必須環境の可能性が高いので、どれかが無い場合はskipに寄せる
    return bool(
        os.environ.get("DATASET_DB_PASSWORD")
        or os.environ.get("MODEL_DB_PASSWORD")
        or os.environ.get("PGPASSWORD")
        or ("://" in os.environ["DATASET_DB_URL"] and "@" in os.environ["DATASET_DB_URL"] and ":" in os.environ["DATASET_DB_ ...



## ✅ codecell：Notebookで環境変数セット例（推奨）

In [41]:
import os

# URLにパスワードを書かない運用
os.environ["DATASET_DB_URL"] = "postgresql+psycopg://postgres@127.0.0.1:5432/dataset"
os.environ["MODEL_DB_URL"]   = "postgresql+psycopg://postgres@127.0.0.1:5432/model"

# どちらかで供給（推奨はDB別）
os.environ["DATASET_DB_PASSWORD"] = "z"
os.environ["MODEL_DB_PASSWORD"]   = "z"

os.environ["PGPASSWORD"] = "z"


## ✅ codecell: 1-DB-2. Notebookで疎通確認（cfg→DbManager→test）

In [42]:
import os
from pathlib import Path

# Notebook用の一時設定（.envに書いてもOK）
os.environ["DATASET_DB_URL"] = "postgresql+psycopg://postgres@127.0.0.1:5432/dataset"
os.environ["MODEL_DB_URL"]   = "postgresql+psycopg://postgres@127.0.0.1:5432/model"
os.environ["NF_RUN_MODE"] = "DRY_RUN"
os.environ["NF_ARTIFACT_ROOT"] = "/mnt/e/env/ts/tslib/artifacts"
os.environ["NF_STRICT_VALIDATION"] = "true"

from nf_app.config import NFConfig
from nf_app.db import DbManager

cfg = NFConfig.from_env(project_root=Path("/mnt/e/env/ts/tslib"))
dbm = DbManager(cfg=cfg, echo_sql=False)

ds_ver, md_ver = dbm.test_connections()
print("dataset version:", ds_ver)
print("model   version:", md_ver)

# Notebook再実行耐性：最後に必ず閉じる（次工程で“共有”するなら閉じない設計も可）
dbm.dispose()


dataset version: PostgreSQL 16.11 (Ubuntu 16.11-0ubuntu0.24.04.1) on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 13.3.0-6ubuntu2~24.04) 13.3.0, 64-bit
model   version: PostgreSQL 16.11 (Ubuntu 16.11-0ubuntu0.24.04.1) on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 13.3.0-6ubuntu2~24.04) 13.3.0, 64-bit


## ✅ codecell: 1-DB-3. pytest（tests/test_db_smoke.py を作成）

In [43]:
from pathlib import Path

test_path = Path("/mnt/e/env/ts/tslib/tests/test_db_smoke.py")
test_path.parent.mkdir(parents=True, exist_ok=True)

test_code = r'''\
from pathlib import Path

from nf_app.config import NFConfig
from nf_app.db import DbManager


def test_db_smoke_connects_dataset_and_model(monkeypatch):
    # conftestがenvをクリアするので、ここで明示的にセット
    monkeypatch.setenv("DATASET_DB_URL", "postgresql+psycopg://postgres@127.0.0.1:5432/dataset")
    monkeypatch.setenv("MODEL_DB_URL",   "postgresql+psycopg://postgres@127.0.0.1:5432/model")
    monkeypatch.setenv("NF_STRICT_VALIDATION", "true")
    monkeypatch.setenv("NF_ARTIFACT_ROOT", "/tmp/tslib_artifacts_for_test")
    monkeypatch.setenv("NF_RUN_MODE", "DRY_RUN")

    cfg = NFConfig.from_env(project_root=Path("/mnt/e/env/ts/tslib"))
    dbm = DbManager(cfg=cfg, echo_sql=False)

    ds_ver, md_ver = dbm.test_connections()

    assert "PostgreSQL" in ds_ver
    assert "PostgreSQL" in md_ver

    dbm.dispose()
'''
test_path.write_text(test_code, encoding="utf-8")
print("Wrote:", test_path)
print(test_path.read_text(encoding="utf-8"))


Wrote: /mnt/e/env/ts/tslib/tests/test_db_smoke.py
\
from pathlib import Path

from nf_app.config import NFConfig
from nf_app.db import DbManager


def test_db_smoke_connects_dataset_and_model(monkeypatch):
    # conftestがenvをクリアするので、ここで明示的にセット
    monkeypatch.setenv("DATASET_DB_URL", "postgresql+psycopg://postgres@127.0.0.1:5432/dataset")
    monkeypatch.setenv("MODEL_DB_URL",   "postgresql+psycopg://postgres@127.0.0.1:5432/model")
    monkeypatch.setenv("NF_STRICT_VALIDATION", "true")
    monkeypatch.setenv("NF_ARTIFACT_ROOT", "/tmp/tslib_artifacts_for_test")
    monkeypatch.setenv("NF_RUN_MODE", "DRY_RUN")

    cfg = NFConfig.from_env(project_root=Path("/mnt/e/env/ts/tslib"))
    dbm = DbManager(cfg=cfg, echo_sql=False)

    ds_ver, md_ver = dbm.test_connections()

    assert "PostgreSQL" in ds_ver
    assert "PostgreSQL" in md_ver

    dbm.dispose()



## ✅ codecell: 1-DB-4. pytest 実行

In [44]:
%cd /mnt/e/env/ts/tslib
!pytest -q tests/test_db_smoke.py


/mnt/e/env/ts/tslib
[32m.[0m[32m                                                                        [100%][0m
[32m[32m[1m1 passed[0m[32m in 0.23s[0m[0m


# 1. DB Migrate（DDL実行：初回セットアップ）

目的：
- model DB に `neuralforecast` スキーマと管理テーブル群を作成する
- 何度実行しても壊れない（CREATE IF NOT EXISTS / 追記可能）
- pytestで「必須テーブルが存在する」ことを担保する

作成対象（model DB: schema=neuralforecast）
- schema_migrations（適用履歴）
- feature_prefix_rules（hist_/futr_/stat_等のルール）
- join_rules（dataset結合SQLのルール）
- experiments（実験単位の台帳）
- run_plans（実行計画：enabled/priority等）
- runs（実行結果：status/error/metrics）
- model_versions（保存モデルと外生変数リスト）


In [45]:
from pathlib import Path

mig_path = Path("/mnt/e/env/ts/tslib/src/nf_app/migrations.py")
mig_path.parent.mkdir(parents=True, exist_ok=True)

mig_code = r'''\
from __future__ import annotations

from dataclasses import dataclass
from typing import Iterable, List, Optional, Sequence

from sqlalchemy import text
from sqlalchemy.engine import Engine
from sqlalchemy.exc import SQLAlchemyError

from nf_app.errors import db_error


SCHEMA_NAME = "neuralforecast"
MIGRATION_VERSION = 1


def _exec(engine: Engine, statements: Sequence[str]) -> None:
    """
    DDL実行（トランザクション）。
    CREATE IF NOT EXISTS を前提に、繰り返し実行しても安全にする。
    """
    try:
        with engine.begin() as conn:
            for stmt in statements:
                conn.execute(text(stmt))
    except SQLAlchemyError as e:
        raise db_error(
            "DB_MIGRATION_FAILED",
            "Migration failed.",
            context={"schema": SCHEMA_NAME, "version": MIGRATION_VERSION, "statement": statements[-1] if statements else None},
            cause=e,
        )


def migrate_model_db(engine: Engine) -> None:
    """
    model DBに neuralforecast スキーマと必須テーブルを作成する。
    """
    stmts: List[str] = []

    # UUID生成用（gen_random_uuid）
    stmts.append("CREATE EXTENSION IF NOT EXISTS pgcrypto;")

    # schema
    stmts.append(f"CREATE SCHEMA IF NOT EXISTS {SCHEMA_NAME};")

    # migration ledger
    stmts.append(f"""
    CREATE TABLE IF NOT EXISTS {SCHEMA_NAME}.schema_migrations (
        version INTEGER PRIMARY KEY,
        applied_ts TIMESTAMPTZ NOT NULL DEFAULT NOW()
    );
    """)

    # prefix rules: hist_/futr_/stat_ など
    stmts.append(f"""
    CREATE TABLE IF NOT EXISTS {SCHEMA_NAME}.feature_prefix_rules (
        rule_id BIGSERIAL PRIMARY KEY,
        prefix TEXT NOT NULL UNIQUE,
        role TEXT NOT NULL, -- HIST / FUTR / STAT / IGNORE / META
        enabled BOOLEAN NOT NULL DEFAULT TRUE,
        description TEXT,
        created_ts TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        updated_ts TIMESTAMPTZ
    );
    """)

    # join rules: dataset側の結合ビューをSQLで管理（最初はSQL文字列で十分に強い）
    stmts.append(f"""
    CREATE TABLE IF NOT EXISTS {SCHEMA_NAME}.join_rules (
        rule_id BIGSERIAL PRIMARY KEY,
        rule_name TEXT NOT NULL UNIQUE,
        enabled BOOLEAN NOT NULL DEFAULT TRUE,
        priority INTEGER NOT NULL DEFAULT 100,
        sql_text TEXT NOT NULL,
        description TEXT,
        created_ts TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        updated_ts TIMESTAMPTZ
    );
    """)

    # experiments: 実験の台帳（大枠）
    stmts.append(f"""
    CREATE TABLE IF NOT EXISTS {SCHEMA_NAME}.experiments (
        experiment_id BIGSERIAL PRIMARY KEY,
        name TEXT NOT NULL UNIQUE,
        description TEXT,
        created_ts TIMESTAMPTZ NOT NULL DEFAULT NOW()
    );
    """)

    # run plans: 実行計画（enabled/priorityで実行対象を制御）
    stmts.append(f"""
    CREATE TABLE IF NOT EXISTS {SCHEMA_NAME}.run_plans (
        plan_id BIGSERIAL PRIMARY KEY,
        enabled BOOLEAN NOT NULL DEFAULT TRUE,
        priority INTEGER NOT NULL DEFAULT 100,
        plan_name TEXT NOT NULL UNIQUE,

        experiment_id BIGINT REFERENCES {SCHEMA_NAME}.experiments(experiment_id),

        model_name TEXT NOT NULL,           -- 例: AutoNBEATS, AutoTFT ...
        run_mode TEXT NOT NULL,             -- DRY_RUN / TRAIN_SAVE_PREDICT / TRAIN_PREDICT / PREDICT_ONLY
        params_json JSONB NOT NULL DEFAULT '{{}}'::jsonb,        -- モデルパラメータ
        data_json JSONB NOT NULL DEFAULT '{{}}'::jsonb,          -- データ結合/フィルタ設定（join_rule参照など）
        artifact_subdir TEXT,               -- 成果物の相対パス（artifact_root配下）

        created_ts TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        updated_ts TIMESTAMPTZ
    );
    """)

    # runs: 実行結果（失敗も資産化）
    stmts.append(f"""
    CREATE TABLE IF NOT EXISTS {SCHEMA_NAME}.runs (
        run_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
        plan_id BIGINT REFERENCES {SCHEMA_NAME}.run_plans(plan_id),
        experiment_id BIGINT REFERENCES {SCHEMA_NAME}.experiments(experiment_id),

        status TEXT NOT NULL DEFAULT 'PENDING',   -- PENDING/RUNNING/SUCCEEDED/FAILED/SKIPPED
        created_ts TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        started_ts TIMESTAMPTZ,
        ended_ts TIMESTAMPTZ,

        data_signature JSONB,     -- データ署名（期間/件数/列/ハッシュ等）
        metrics_json JSONB,       -- 評価指標（集計）
        resources_json JSONB,     -- GPU/CPU/メモリ/時間など

        error_type TEXT,
        error_code TEXT,
        error_message TEXT,
        error_context JSONB,
        traceback TEXT
    );
    """)

    stmts.append(f"CREATE INDEX IF NOT EXISTS ix_runs_plan_id ON {SCHEMA_NAME}.runs(plan_id);")
    stmts.append(f"CREATE INDEX IF NOT EXISTS ix_runs_experiment_id ON {SCHEMA_NAME}.runs(experiment_id);")
    stmts.append(f"CREATE INDEX IF NOT EXISTS ix_runs_status ON {SCHEMA_NAME}.runs(status);")

    # model_versions: 保存モデルの台帳（外生変数リストも保持）
    stmts.append(f"""
    CREATE TABLE IF NOT EXISTS {SCHEMA_NAME}.model_versions (
        model_version_id BIGSERIAL PRIMARY KEY,
        model_name TEXT NOT NULL,
        run_id UUID REFERENCES {SCHEMA_NAME}.runs(run_id),

        artifact_path TEXT NOT NULL,
        created_ts TIMESTAMPTZ NOT NULL DEFAULT NOW(),

        exog_hist_cols TEXT[] DEFAULT ARRAY[]::TEXT[],
        exog_futr_cols TEXT[] DEFAULT ARRAY[]::TEXT[],
        exog_stat_cols TEXT[] DEFAULT ARRAY[]::TEXT[],

        notes TEXT
    );
    """)
    stmts.append(f"CREATE INDEX IF NOT EXISTS ix_model_versions_model_name ON {SCHEMA_NAME}.model_versions(model_name);")
    stmts.append(f"CREATE INDEX IF NOT EXISTS ix_model_versions_run_id ON {SCHEMA_NAME}.model_versions(run_id);")

    # 実行
    _exec(engine, stmts)

    # version upsert（最小の履歴）
    _exec(engine, [f"""
    INSERT INTO {SCHEMA_NAME}.schema_migrations(version)
    VALUES ({MIGRATION_VERSION})
    ON CONFLICT (version) DO NOTHING;
    """])

    # 初期データ投入：prefix rules（idempotent）
    _exec(engine, [
        f"""
        INSERT INTO {SCHEMA_NAME}.feature_prefix_rules(prefix, role, enabled, description)
        VALUES
            ('hist_', 'HIST', TRUE, 'Historical exogenous features'),
            ('futr_', 'FUTR', TRUE, 'Future exogenous features'),
            ('stat_', 'STAT', TRUE, 'Static exogenous features')
        ON CONFLICT (prefix) DO NOTHING;
        """
    ])


@dataclass
class MigrationRunner:
    """
    今後マイグレーションが増えても拡張しやすいようにrunner化。
    """
    model_engine: Engine

    def apply_all(self) -> None:
        migrate_model_db(self.model_engine)
'''
mig_path.write_text(mig_code, encoding="utf-8")
print("Wrote:", mig_path)
print(mig_path.read_text(encoding="utf-8")[:900], "...\n")


Wrote: /mnt/e/env/ts/tslib/src/nf_app/migrations.py
\
from __future__ import annotations

from dataclasses import dataclass
from typing import Iterable, List, Optional, Sequence

from sqlalchemy import text
from sqlalchemy.engine import Engine
from sqlalchemy.exc import SQLAlchemyError

from nf_app.errors import db_error


SCHEMA_NAME = "neuralforecast"
MIGRATION_VERSION = 1


def _exec(engine: Engine, statements: Sequence[str]) -> None:
    """
    DDL実行（トランザクション）。
    CREATE IF NOT EXISTS を前提に、繰り返し実行しても安全にする。
    """
    try:
        with engine.begin() as conn:
            for stmt in statements:
                conn.execute(text(stmt))
    except SQLAlchemyError as e:
        raise db_error(
            "DB_MIGRATION_FAILED",
            "Migration failed.",
            context={"schema": SCHEMA_NAME, "version": MIGRATION_VERSION, "statement": statements[-1] if statements else None},
            cause=e,
        )


def migrate_model_ ...



## ✅ codecell: 1-Migrate-2. Notebookでmigrate実行（model DB）

In [46]:
import os
from pathlib import Path

# 例：URLはパスワードなし、パスワードは環境変数で供給（推奨）
os.environ["DATASET_DB_URL"] = "postgresql+psycopg://postgres@127.0.0.1:5432/dataset"
os.environ["MODEL_DB_URL"]   = "postgresql+psycopg://postgres@127.0.0.1:5432/model"
# os.environ["MODEL_DB_PASSWORD"] = "..."  # 必要なら

os.environ["NF_ARTIFACT_ROOT"] = "/mnt/e/env/ts/tslib/artifacts"
os.environ["NF_RUN_MODE"] = "DRY_RUN"
os.environ["NF_STRICT_VALIDATION"] = "true"

from nf_app.config import NFConfig
from nf_app.db import DbManager
from nf_app.migrations import MigrationRunner, SCHEMA_NAME

cfg = NFConfig.from_env(project_root=Path("/mnt/e/env/ts/tslib"))
dbm = DbManager(cfg=cfg, echo_sql=False)

runner = MigrationRunner(model_engine=dbm.model_engine())
runner.apply_all()

print("Migration applied for schema:", SCHEMA_NAME)
dbm.dispose()


Migration applied for schema: neuralforecast


## ✅ codecell: 1-Migrate-3. pytest（tests/test_migrations.py を作成）

In [47]:
from pathlib import Path

tpath = Path("/mnt/e/env/ts/tslib/tests/test_migrations.py")
tpath.parent.mkdir(parents=True, exist_ok=True)

tcode = r'''\
from pathlib import Path
import os
import pytest
from sqlalchemy import text

from nf_app.config import NFConfig
from nf_app.db import DbManager
from nf_app.migrations import MigrationRunner, SCHEMA_NAME


def _has_db_creds() -> bool:
    if not os.environ.get("MODEL_DB_URL"):
        return False
    return bool(
        os.environ.get("MODEL_DB_PASSWORD")
        or os.environ.get("PGPASSWORD")
        or ("://" in os.environ["MODEL_DB_URL"] and "@" in os.environ["MODEL_DB_URL"] and ":" in os.environ["MODEL_DB_URL"].split("@")[0])
    )


@pytest.mark.integration
def test_migrations_create_required_tables(monkeypatch):
    # conftestがenvをクリアするのでセット
    monkeypatch.setenv("DATASET_DB_URL", "postgresql+psycopg://postgres@127.0.0.1:5432/dataset")
    monkeypatch.setenv("MODEL_DB_URL",   "postgresql+psycopg://postgres@127.0.0.1:5432/model")
    monkeypatch.setenv("NF_STRICT_VALIDATION", "true")
    monkeypatch.setenv("NF_ARTIFACT_ROOT", "/tmp/tslib_artifacts_for_test")
    monkeypatch.setenv("NF_RUN_MODE", "DRY_RUN")

    # パスワード系が無ければskip（環境依存の統合テスト）
    if not _has_db_creds():
        pytest.skip("DB credentials not provided. Set MODEL_DB_PASSWORD or PGPASSWORD.")

    cfg = NFConfig.from_env(project_root=Path("/mnt/e/env/ts/tslib"))
    dbm = DbManager(cfg=cfg, echo_sql=False)

    try:
        runner = MigrationRunner(model_engine=dbm.model_engine())
        runner.apply_all()

        required = [
            f"{SCHEMA_NAME}.schema_migrations",
            f"{SCHEMA_NAME}.feature_prefix_rules",
            f"{SCHEMA_NAME}.join_rules",
            f"{SCHEMA_NAME}.experiments",
            f"{SCHEMA_NAME}.run_plans",
            f"{SCHEMA_NAME}.runs",
            f"{SCHEMA_NAME}.model_versions",
        ]

        # to_regclass('schema.table') がNULLでなければ存在
        with dbm.model_engine().connect() as conn:
            for t in required:
                exists = conn.execute(text("SELECT to_regclass(:t) IS NOT NULL"), {"t": t}).scalar_one()
                assert exists, f"missing table: {t}"

            # 初期prefixが入っていること
            prefixes = conn.execute(text(f"SELECT prefix FROM {SCHEMA_NAME}.feature_prefix_rules ORDER BY prefix")).scalars().all()
            assert "hist_" in prefixes
            assert "futr_" in prefixes
            assert "stat_" in prefixes

    finally:
        dbm.dispose()
'''
tpath.write_text(tcode, encoding="utf-8")
print("Wrote:", tpath)
print(tpath.read_text(encoding="utf-8")[:700], "...\n")


Wrote: /mnt/e/env/ts/tslib/tests/test_migrations.py
\
from pathlib import Path
import os
import pytest
from sqlalchemy import text

from nf_app.config import NFConfig
from nf_app.db import DbManager
from nf_app.migrations import MigrationRunner, SCHEMA_NAME


def _has_db_creds() -> bool:
    if not os.environ.get("MODEL_DB_URL"):
        return False
    return bool(
        os.environ.get("MODEL_DB_PASSWORD")
        or os.environ.get("PGPASSWORD")
        or ("://" in os.environ["MODEL_DB_URL"] and "@" in os.environ["MODEL_DB_URL"] and ":" in os.environ["MODEL_DB_URL"].split("@")[0])
    )


@pytest.mark.integration
def test_migrations_create_required_tables(monkeypatch):
    # conftestがenvをクリアするのでセット
    monkeypatch.setenv("DATASET_DB_URL" ...



## ✅ codecell: 1-Migrate-4. pytest 実行

In [50]:
%cd /mnt/e/env/ts/tslib
!pytest -q tests/test_migrations.py


/mnt/e/env/ts/tslib
[32m.[0m[32m                                                                        [100%][0m
[32m[32m[1m1 passed[0m[32m in 0.23s[0m[0m


## pytest mark登録

integration マークを pytest.ini に登録して警告を消す。


In [49]:
from pathlib import Path

pytest_ini = Path("/mnt/e/env/ts/tslib/pytest.ini")
pytest_ini.write_text(
    "\n".join([
        "[pytest]",
        "markers =",
        "    integration: integration tests requiring external services (e.g., Postgres)",
        "",
    ]),
    encoding="utf-8",
)
print("Wrote:", pytest_ini)
print(pytest_ini.read_text(encoding="utf-8"))


Wrote: /mnt/e/env/ts/tslib/pytest.ini
[pytest]
markers =
    integration: integration tests requiring external services (e.g., Postgres)



## 1. DB Migrate - join_rules（ID 013）

目的：
- model DB の `neuralforecast.join_rules` に初期ルールを投入（何度実行しても安全）
- pytestで join_rules に INSERT できることを検証する


## ✅ codecell: 1-Migrate-joinrules-1 migrations.py を更新（フル上書き）

In [52]:
from pathlib import Path

mig_path = Path("/mnt/e/env/ts/tslib/src/nf_app/migrations.py")
mig_path.parent.mkdir(parents=True, exist_ok=True)

mig_code = r'''\
from __future__ import annotations

from dataclasses import dataclass
from typing import List, Sequence

from sqlalchemy import text
from sqlalchemy.engine import Engine
from sqlalchemy.exc import SQLAlchemyError

from nf_app.errors import db_error


SCHEMA_NAME = "neuralforecast"
MIGRATION_VERSION = 2


def _exec(engine: Engine, statements: Sequence[str]) -> None:
    """
    DDL/seed実行（トランザクション）
    - できるだけCREATE IF NOT EXISTS / ON CONFLICT で冪等にする
    """
    try:
        with engine.begin() as conn:
            for stmt in statements:
                conn.execute(text(stmt))
    except SQLAlchemyError as e:
        raise db_error(
            "DB_MIGRATION_FAILED",
            "Migration failed.",
            context={
                "schema": SCHEMA_NAME,
                "version": MIGRATION_VERSION,
                "statement_tail": (statements[-1][:400] if statements else None),
            },
            cause=e,
        )


def migrate_model_db(engine: Engine) -> None:
    """
    model DBに neuralforecast スキーマと必須テーブルを作成し、初期データを投入する。
    """
    stmts: List[str] = []

    # UUID生成用
    stmts.append("CREATE EXTENSION IF NOT EXISTS pgcrypto;")

    # schema
    stmts.append(f"CREATE SCHEMA IF NOT EXISTS {SCHEMA_NAME};")

    # migration ledger
    stmts.append(f"""
    CREATE TABLE IF NOT EXISTS {SCHEMA_NAME}.schema_migrations (
        version INTEGER PRIMARY KEY,
        applied_ts TIMESTAMPTZ NOT NULL DEFAULT NOW()
    );
    """)

    # prefix rules
    stmts.append(f"""
    CREATE TABLE IF NOT EXISTS {SCHEMA_NAME}.feature_prefix_rules (
        rule_id BIGSERIAL PRIMARY KEY,
        prefix TEXT NOT NULL UNIQUE,
        role TEXT NOT NULL, -- HIST / FUTR / STAT / IGNORE / META
        enabled BOOLEAN NOT NULL DEFAULT TRUE,
        description TEXT,
        created_ts TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        updated_ts TIMESTAMPTZ
    );
    """)

    # join rules
    stmts.append(f"""
    CREATE TABLE IF NOT EXISTS {SCHEMA_NAME}.join_rules (
        rule_id BIGSERIAL PRIMARY KEY,
        rule_name TEXT NOT NULL UNIQUE,
        enabled BOOLEAN NOT NULL DEFAULT TRUE,
        priority INTEGER NOT NULL DEFAULT 100,
        sql_text TEXT NOT NULL,
        description TEXT,
        created_ts TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        updated_ts TIMESTAMPTZ
    );
    """)

    # experiments
    stmts.append(f"""
    CREATE TABLE IF NOT EXISTS {SCHEMA_NAME}.experiments (
        experiment_id BIGSERIAL PRIMARY KEY,
        name TEXT NOT NULL UNIQUE,
        description TEXT,
        created_ts TIMESTAMPTZ NOT NULL DEFAULT NOW()
    );
    """)

    # run plans
    stmts.append(f"""
    CREATE TABLE IF NOT EXISTS {SCHEMA_NAME}.run_plans (
        plan_id BIGSERIAL PRIMARY KEY,
        enabled BOOLEAN NOT NULL DEFAULT TRUE,
        priority INTEGER NOT NULL DEFAULT 100,
        plan_name TEXT NOT NULL UNIQUE,

        experiment_id BIGINT REFERENCES {SCHEMA_NAME}.experiments(experiment_id),

        model_name TEXT NOT NULL,
        run_mode TEXT NOT NULL,
        params_json JSONB NOT NULL DEFAULT '{{}}'::jsonb,
        data_json JSONB NOT NULL DEFAULT '{{}}'::jsonb,
        artifact_subdir TEXT,

        created_ts TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        updated_ts TIMESTAMPTZ
    );
    """)

    # runs
    stmts.append(f"""
    CREATE TABLE IF NOT EXISTS {SCHEMA_NAME}.runs (
        run_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
        plan_id BIGINT REFERENCES {SCHEMA_NAME}.run_plans(plan_id),
        experiment_id BIGINT REFERENCES {SCHEMA_NAME}.experiments(experiment_id),

        status TEXT NOT NULL DEFAULT 'PENDING',
        created_ts TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        started_ts TIMESTAMPTZ,
        ended_ts TIMESTAMPTZ,

        data_signature JSONB,
        metrics_json JSONB,
        resources_json JSONB,

        error_type TEXT,
        error_code TEXT,
        error_message TEXT,
        error_context JSONB,
        traceback TEXT
    );
    """)
    stmts.append(f"CREATE INDEX IF NOT EXISTS ix_runs_plan_id ON {SCHEMA_NAME}.runs(plan_id);")
    stmts.append(f"CREATE INDEX IF NOT EXISTS ix_runs_experiment_id ON {SCHEMA_NAME}.runs(experiment_id);")
    stmts.append(f"CREATE INDEX IF NOT EXISTS ix_runs_status ON {SCHEMA_NAME}.runs(status);")

    # model_versions
    stmts.append(f"""
    CREATE TABLE IF NOT EXISTS {SCHEMA_NAME}.model_versions (
        model_version_id BIGSERIAL PRIMARY KEY,
        model_name TEXT NOT NULL,
        run_id UUID REFERENCES {SCHEMA_NAME}.runs(run_id),

        artifact_path TEXT NOT NULL,
        created_ts TIMESTAMPTZ NOT NULL DEFAULT NOW(),

        exog_hist_cols TEXT[] DEFAULT ARRAY[]::TEXT[],
        exog_futr_cols TEXT[] DEFAULT ARRAY[]::TEXT[],
        exog_stat_cols TEXT[] DEFAULT ARRAY[]::TEXT[],

        notes TEXT
    );
    """)
    stmts.append(f"CREATE INDEX IF NOT EXISTS ix_model_versions_model_name ON {SCHEMA_NAME}.model_versions(model_name);")
    stmts.append(f"CREATE INDEX IF NOT EXISTS ix_model_versions_run_id ON {SCHEMA_NAME}.model_versions(run_id);")

    # 1) DDL適用
    _exec(engine, stmts)

    # 2) schema_migrations（冪等）
    _exec(engine, [f"""
    INSERT INTO {SCHEMA_NAME}.schema_migrations(version)
    VALUES ({MIGRATION_VERSION})
    ON CONFLICT (version) DO NOTHING;
    """])

    # 3) 初期prefix rules（冪等）
    _exec(engine, [f"""
    INSERT INTO {SCHEMA_NAME}.feature_prefix_rules(prefix, role, enabled, description)
    VALUES
        ('hist_', 'HIST', TRUE, 'Historical exogenous features'),
        ('futr_', 'FUTR', TRUE, 'Future exogenous features'),
        ('stat_', 'STAT', TRUE, 'Static exogenous features')
    ON CONFLICT (prefix) DO NOTHING;
    """])

    # 4) 初期join rule（冪等）
    # NOTE: この sql_text は dataset DB で実行される想定のSQLテンプレ。
    #       後工程で「列の自動選択（prefixで外生変数分離）」を実装して、SELECT句を動的生成する。
    _exec(engine, [f"""
    INSERT INTO {SCHEMA_NAME}.join_rules(rule_name, enabled, priority, sql_text, description)
    VALUES (
        'loto_join_v1',
        TRUE,
        100,
        $SQL$
        SELECT
            h.ds,
            h.unique_id,
            h.y,
            -- prizes: hist_*
            p.hist_b1, p.hist_b2,
            p.hist_stc, p.hist_stm,
            -- futr: futr_*
            f.futr_year, f.futr_month, f.futr_day
        FROM public.loto_hist h
        LEFT JOIN public.loto_prizes p
          ON h.ds = p.ds AND h.unique_id = p.unique_id
        LEFT JOIN public.loto_futr f
          ON h.ds = f.ds
        $SQL$,
        'Default join rule for loto_* tables (dataset DB SQL template).'
    )
    ON CONFLICT (rule_name) DO NOTHING;
    """])


@dataclass
class MigrationRunner:
    model_engine: Engine

    def apply_all(self) -> None:
        migrate_model_db(self.model_engine)
'''
mig_path.write_text(mig_code, encoding="utf-8")
print("Updated:", mig_path)
print(mig_path.read_text(encoding="utf-8")[:900], "...\n")


Updated: /mnt/e/env/ts/tslib/src/nf_app/migrations.py
\
from __future__ import annotations

from dataclasses import dataclass
from typing import List, Sequence

from sqlalchemy import text
from sqlalchemy.engine import Engine
from sqlalchemy.exc import SQLAlchemyError

from nf_app.errors import db_error


SCHEMA_NAME = "neuralforecast"
MIGRATION_VERSION = 2


def _exec(engine: Engine, statements: Sequence[str]) -> None:
    """
    DDL/seed実行（トランザクション）
    - できるだけCREATE IF NOT EXISTS / ON CONFLICT で冪等にする
    """
    try:
        with engine.begin() as conn:
            for stmt in statements:
                conn.execute(text(stmt))
    except SQLAlchemyError as e:
        raise db_error(
            "DB_MIGRATION_FAILED",
            "Migration failed.",
            context={
                "schema": SCHEMA_NAME,
                "version": MIGRATION_VERSION,
                "statement_tail": (statements[-1][:400] if statements else None), ...



## ✅ codecell: 1-Migrate-joinrules-2 test_migrations.py を更新（INSERT検証を追加）

既存の tests/test_migrations.py に join_rulesへINSERTできるテストを追加します（フル上書き）。
パス：/mnt/e/env/ts/tslib/tests/test_migrations.py

In [53]:
from pathlib import Path

tpath = Path("/mnt/e/env/ts/tslib/tests/test_migrations.py")
tpath.parent.mkdir(parents=True, exist_ok=True)

tcode = r'''\
from pathlib import Path
import os
import pytest
from sqlalchemy import text

from nf_app.config import NFConfig
from nf_app.db import DbManager
from nf_app.migrations import MigrationRunner, SCHEMA_NAME


def _has_db_creds() -> bool:
    if not os.environ.get("MODEL_DB_URL"):
        return False
    return bool(
        os.environ.get("MODEL_DB_PASSWORD")
        or os.environ.get("PGPASSWORD")
        or ("://" in os.environ["MODEL_DB_URL"] and "@" in os.environ["MODEL_DB_URL"] and ":" in os.environ["MODEL_DB_URL"].split("@")[0])
    )


@pytest.mark.integration
def test_migrations_create_required_tables(monkeypatch):
    monkeypatch.setenv("DATASET_DB_URL", "postgresql+psycopg://postgres@127.0.0.1:5432/dataset")
    monkeypatch.setenv("MODEL_DB_URL",   "postgresql+psycopg://postgres@127.0.0.1:5432/model")
    monkeypatch.setenv("NF_STRICT_VALIDATION", "true")
    monkeypatch.setenv("NF_ARTIFACT_ROOT", "/tmp/tslib_artifacts_for_test")
    monkeypatch.setenv("NF_RUN_MODE", "DRY_RUN")

    if not _has_db_creds():
        pytest.skip("DB credentials not provided. Set MODEL_DB_PASSWORD or PGPASSWORD.")

    cfg = NFConfig.from_env(project_root=Path("/mnt/e/env/ts/tslib"))
    dbm = DbManager(cfg=cfg, echo_sql=False)

    try:
        runner = MigrationRunner(model_engine=dbm.model_engine())
        runner.apply_all()

        required = [
            f"{SCHEMA_NAME}.schema_migrations",
            f"{SCHEMA_NAME}.feature_prefix_rules",
            f"{SCHEMA_NAME}.join_rules",
            f"{SCHEMA_NAME}.experiments",
            f"{SCHEMA_NAME}.run_plans",
            f"{SCHEMA_NAME}.runs",
            f"{SCHEMA_NAME}.model_versions",
        ]

        with dbm.model_engine().connect() as conn:
            for t in required:
                exists = conn.execute(text("SELECT to_regclass(:t) IS NOT NULL"), {"t": t}).scalar_one()
                assert exists, f"missing table: {t}"

            prefixes = conn.execute(
                text(f"SELECT prefix FROM {SCHEMA_NAME}.feature_prefix_rules ORDER BY prefix")
            ).scalars().all()
            assert "hist_" in prefixes
            assert "futr_" in prefixes
            assert "stat_" in prefixes

            # seed join ruleが入っていること（013にも関係）
            rule_names = conn.execute(
                text(f"SELECT rule_name FROM {SCHEMA_NAME}.join_rules ORDER BY rule_name")
            ).scalars().all()
            assert "loto_join_v1" in rule_names

    finally:
        dbm.dispose()


@pytest.mark.integration
def test_join_rules_allows_insert(monkeypatch):
    """
    ID 013 Done: join_rulesにINSERTできること
    """
    monkeypatch.setenv("DATASET_DB_URL", "postgresql+psycopg://postgres@127.0.0.1:5432/dataset")
    monkeypatch.setenv("MODEL_DB_URL",   "postgresql+psycopg://postgres@127.0.0.1:5432/model")
    monkeypatch.setenv("NF_STRICT_VALIDATION", "true")
    monkeypatch.setenv("NF_ARTIFACT_ROOT", "/tmp/tslib_artifacts_for_test")
    monkeypatch.setenv("NF_RUN_MODE", "DRY_RUN")

    if not _has_db_creds():
        pytest.skip("DB credentials not provided. Set MODEL_DB_PASSWORD or PGPASSWORD.")

    cfg = NFConfig.from_env(project_root=Path("/mnt/e/env/ts/tslib"))
    dbm = DbManager(cfg=cfg, echo_sql=False)

    try:
        runner = MigrationRunner(model_engine=dbm.model_engine())
        runner.apply_all()

        test_name = "pytest_join_rule"
        sql_text = "SELECT 1 AS dummy;"

        with dbm.model_engine().begin() as conn:
            conn.execute(text(f"""
                INSERT INTO {SCHEMA_NAME}.join_rules(rule_name, enabled, priority, sql_text, description)
                VALUES (:name, TRUE, 999, :sql, 'pytest insert check')
                ON CONFLICT (rule_name) DO UPDATE
                SET sql_text = EXCLUDED.sql_text,
                    priority = EXCLUDED.priority,
                    updated_ts = NOW();
            """), {"name": test_name, "sql": sql_text})

        with dbm.model_engine().connect() as conn:
            got = conn.execute(
                text(f"SELECT sql_text, priority FROM {SCHEMA_NAME}.join_rules WHERE rule_name=:name"),
                {"name": test_name}
            ).one()
            assert got[0].strip() == sql_text
            assert int(got[1]) == 999

    finally:
        dbm.dispose()
'''
tpath.write_text(tcode, encoding="utf-8")
print("Updated:", tpath)
print(tpath.read_text(encoding="utf-8")[:800], "...\n")


Updated: /mnt/e/env/ts/tslib/tests/test_migrations.py
\
from pathlib import Path
import os
import pytest
from sqlalchemy import text

from nf_app.config import NFConfig
from nf_app.db import DbManager
from nf_app.migrations import MigrationRunner, SCHEMA_NAME


def _has_db_creds() -> bool:
    if not os.environ.get("MODEL_DB_URL"):
        return False
    return bool(
        os.environ.get("MODEL_DB_PASSWORD")
        or os.environ.get("PGPASSWORD")
        or ("://" in os.environ["MODEL_DB_URL"] and "@" in os.environ["MODEL_DB_URL"] and ":" in os.environ["MODEL_DB_URL"].split("@")[0])
    )


@pytest.mark.integration
def test_migrations_create_required_tables(monkeypatch):
    monkeypatch.setenv("DATASET_DB_URL", "postgresql+psycopg://postgres@127.0.0.1:5432/dataset")
    monkeypatch.setenv("MODEL_DB_URL",   "postgresql+psycopg://postgres@ ...



## ✅ codecell: 1-Migrate-joinrules-3 pytest 実行

In [54]:
%cd /mnt/e/env/ts/tslib
!pytest -q tests/test_migrations.py


/mnt/e/env/ts/tslib
[32m.[0m[32m.[0m[32m                                                                       [100%][0m
[32m[32m[1m2 passed[0m[32m in 0.21s[0m[0m


## ID 014: enabled planだけ抽出できる

目的：
- model DB（neuralforecast.run_plans）に plan を upsert で登録できる
- enabled=true の plan のみを priority昇順で抽出できる
- pytestで担保（DB統合テスト：資格情報なければskip）


## ✅ codecell: 1-DB-014-1 run_plans.py を作成

実装先：/mnt/e/env/ts/tslib/src/nf_app/run_plans.py

In [56]:
from pathlib import Path

p = Path("/mnt/e/env/ts/tslib/src/nf_app/run_plans.py")
p.parent.mkdir(parents=True, exist_ok=True)

code = r'''\
from __future__ import annotations

import json
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

from sqlalchemy import text
from sqlalchemy.engine import Engine
from sqlalchemy.exc import SQLAlchemyError

from nf_app.errors import db_error

SCHEMA_NAME = "neuralforecast"


@dataclass(frozen=True)
class RunPlan:
    plan_id: int
    enabled: bool
    priority: int
    plan_name: str
    experiment_id: Optional[int]
    model_name: str
    run_mode: str
    params_json: Dict[str, Any]
    data_json: Dict[str, Any]
    artifact_subdir: Optional[str]


def ensure_experiment(engine: Engine, *, name: str, description: Optional[str] = None) -> int:
    """
    experimentsに name をupsertし、experiment_idを返す
    """
    try:
        with engine.begin() as conn:
            conn.execute(
                text(f"""
                INSERT INTO {SCHEMA_NAME}.experiments(name, description)
                VALUES (:name, :desc)
                ON CONFLICT (name) DO UPDATE
                SET description = COALESCE(EXCLUDED.description, {SCHEMA_NAME}.experiments.description);
                """),
                {"name": name, "desc": description},
            )
        with engine.connect() as conn:
            eid = conn.execute(
                text(f"SELECT experiment_id FROM {SCHEMA_NAME}.experiments WHERE name=:name"),
                {"name": name},
            ).scalar_one()
        return int(eid)
    except SQLAlchemyError as e:
        raise db_error("DB_EXPERIMENT_UPSERT_FAILED", "Failed to upsert experiment.", context={"name": name}, cause=e)


def upsert_run_plan(
    engine: Engine,
    *,
    plan_name: str,
    enabled: bool = True,
    priority: int = 100,
    experiment_id: Optional[int] = None,
    model_name: str,
    run_mode: str,
    params_json: Optional[Dict[str, Any]] = None,
    data_json: Optional[Dict[str, Any]] = None,
    artifact_subdir: Optional[str] = None,
) -> int:
    """
    run_plansへupsertし、plan_idを返す（冪等）
    - JSONBは文字列として渡して ::jsonb キャスト（psycopg依存を減らす）
    """
    params_s = json.dumps(params_json or {}, ensure_ascii=False)
    data_s = json.dumps(data_json or {}, ensure_ascii=False)

    try:
        with engine.begin() as conn:
            conn.execute(
                text(f"""
                INSERT INTO {SCHEMA_NAME}.run_plans(
                    enabled, priority, plan_name, experiment_id,
                    model_name, run_mode, params_json, data_json, artifact_subdir
                )
                VALUES (
                    :enabled, :priority, :plan_name, :experiment_id,
                    :model_name, :run_mode, :params_json::jsonb, :data_json::jsonb, :artifact_subdir
                )
                ON CONFLICT (plan_name) DO UPDATE
                SET enabled = EXCLUDED.enabled,
                    priority = EXCLUDED.priority,
                    experiment_id = EXCLUDED.experiment_id,
                    model_name = EXCLUDED.model_name,
                    run_mode = EXCLUDED.run_mode,
                    params_json = EXCLUDED.params_json,
                    data_json = EXCLUDED.data_json,
                    artifact_subdir = EXCLUDED.artifact_subdir,
                    updated_ts = NOW();
                """),
                {
                    "enabled": enabled,
                    "priority": priority,
                    "plan_name": plan_name,
                    "experiment_id": experiment_id,
                    "model_name": model_name,
                    "run_mode": run_mode,
                    "params_json": params_s,
                    "data_json": data_s,
                    "artifact_subdir": artifact_subdir,
                },
            )

        with engine.connect() as conn:
            pid = conn.execute(
                text(f"SELECT plan_id FROM {SCHEMA_NAME}.run_plans WHERE plan_name=:plan_name"),
                {"plan_name": plan_name},
            ).scalar_one()
        return int(pid)

    except SQLAlchemyError as e:
        raise db_error("DB_RUN_PLAN_UPSERT_FAILED", "Failed to upsert run_plan.", context={"plan_name": plan_name}, cause=e)


def list_enabled_run_plans(engine: Engine, *, limit: Optional[int] = None) -> List[RunPlan]:
    """
    enabled=true の plan だけを priority昇順（同順位はplan_id昇順）で返す
    """
    try:
        q = f"""
        SELECT
            plan_id, enabled, priority, plan_name, experiment_id,
            model_name, run_mode, params_json, data_json, artifact_subdir
        FROM {SCHEMA_NAME}.run_plans
        WHERE enabled = TRUE
        ORDER BY priority ASC, plan_id ASC
        """
        if limit is not None:
            q += " LIMIT :limit"

        with engine.connect() as conn:
            rows = conn.execute(text(q), {"limit": limit} if limit is not None else {}).mappings().all()

        out: List[RunPlan] = []
        for r in rows:
            out.append(
                RunPlan(
                    plan_id=int(r["plan_id"]),
                    enabled=bool(r["enabled"]),
                    priority=int(r["priority"]),
                    plan_name=str(r["plan_name"]),
                    experiment_id=int(r["experiment_id"]) if r["experiment_id"] is not None else None,
                    model_name=str(r["model_name"]),
                    run_mode=str(r["run_mode"]),
                    params_json=dict(r["params_json"] or {}),
                    data_json=dict(r["data_json"] or {}),
                    artifact_subdir=str(r["artifact_subdir"]) if r["artifact_subdir"] is not None else None,
                )
            )
        return out

    except SQLAlchemyError as e:
        raise db_error("DB_RUN_PLAN_LIST_FAILED", "Failed to list enabled run_plans.", cause=e)
'''
p.write_text(code, encoding="utf-8")
print("Wrote:", p)
print(p.read_text(encoding="utf-8")[:800], "...\n")


Wrote: /mnt/e/env/ts/tslib/src/nf_app/run_plans.py
\
from __future__ import annotations

import json
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

from sqlalchemy import text
from sqlalchemy.engine import Engine
from sqlalchemy.exc import SQLAlchemyError

from nf_app.errors import db_error

SCHEMA_NAME = "neuralforecast"


@dataclass(frozen=True)
class RunPlan:
    plan_id: int
    enabled: bool
    priority: int
    plan_name: str
    experiment_id: Optional[int]
    model_name: str
    run_mode: str
    params_json: Dict[str, Any]
    data_json: Dict[str, Any]
    artifact_subdir: Optional[str]


def ensure_experiment(engine: Engine, *, name: str, description: Optional[str] = None) -> int:
    """
    experimentsに name をupsertし、experiment_idを返す
    """
    try:
        with engine.begin() as conn:
       ...



## ✅ codecell: 1-DB-014-2 test_migrations.py を更新（enabled plan抽出テストを追加）

pytest先：/mnt/e/env/ts/tslib/tests/test_migrations.py
（既存2テストは保持しつつ、3つ目のテストを追加する形でフル上書きします）

In [57]:
from pathlib import Path

tpath = Path("/mnt/e/env/ts/tslib/tests/test_migrations.py")
tpath.parent.mkdir(parents=True, exist_ok=True)

tcode = r'''\
from pathlib import Path
import os
import pytest
from sqlalchemy import text

from nf_app.config import NFConfig
from nf_app.db import DbManager
from nf_app.migrations import MigrationRunner, SCHEMA_NAME
from nf_app.run_plans import ensure_experiment, upsert_run_plan, list_enabled_run_plans


def _has_db_creds() -> bool:
    if not os.environ.get("MODEL_DB_URL"):
        return False
    return bool(
        os.environ.get("MODEL_DB_PASSWORD")
        or os.environ.get("PGPASSWORD")
        or ("://" in os.environ["MODEL_DB_URL"] and "@" in os.environ["MODEL_DB_URL"] and ":" in os.environ["MODEL_DB_URL"].split("@")[0])
    )


@pytest.mark.integration
def test_migrations_create_required_tables(monkeypatch):
    monkeypatch.setenv("DATASET_DB_URL", "postgresql+psycopg://postgres@127.0.0.1:5432/dataset")
    monkeypatch.setenv("MODEL_DB_URL",   "postgresql+psycopg://postgres@127.0.0.1:5432/model")
    monkeypatch.setenv("NF_STRICT_VALIDATION", "true")
    monkeypatch.setenv("NF_ARTIFACT_ROOT", "/tmp/tslib_artifacts_for_test")
    monkeypatch.setenv("NF_RUN_MODE", "DRY_RUN")

    if not _has_db_creds():
        pytest.skip("DB credentials not provided. Set MODEL_DB_PASSWORD or PGPASSWORD.")

    cfg = NFConfig.from_env(project_root=Path("/mnt/e/env/ts/tslib"))
    dbm = DbManager(cfg=cfg, echo_sql=False)

    try:
        runner = MigrationRunner(model_engine=dbm.model_engine())
        runner.apply_all()

        required = [
            f"{SCHEMA_NAME}.schema_migrations",
            f"{SCHEMA_NAME}.feature_prefix_rules",
            f"{SCHEMA_NAME}.join_rules",
            f"{SCHEMA_NAME}.experiments",
            f"{SCHEMA_NAME}.run_plans",
            f"{SCHEMA_NAME}.runs",
            f"{SCHEMA_NAME}.model_versions",
        ]

        with dbm.model_engine().connect() as conn:
            for t in required:
                exists = conn.execute(text("SELECT to_regclass(:t) IS NOT NULL"), {"t": t}).scalar_one()
                assert exists, f"missing table: {t}"

            prefixes = conn.execute(
                text(f"SELECT prefix FROM {SCHEMA_NAME}.feature_prefix_rules ORDER BY prefix")
            ).scalars().all()
            assert "hist_" in prefixes
            assert "futr_" in prefixes
            assert "stat_" in prefixes

            rule_names = conn.execute(
                text(f"SELECT rule_name FROM {SCHEMA_NAME}.join_rules ORDER BY rule_name")
            ).scalars().all()
            assert "loto_join_v1" in rule_names

    finally:
        dbm.dispose()


@pytest.mark.integration
def test_join_rules_allows_insert(monkeypatch):
    monkeypatch.setenv("DATASET_DB_URL", "postgresql+psycopg://postgres@127.0.0.1:5432/dataset")
    monkeypatch.setenv("MODEL_DB_URL",   "postgresql+psycopg://postgres@127.0.0.1:5432/model")
    monkeypatch.setenv("NF_STRICT_VALIDATION", "true")
    monkeypatch.setenv("NF_ARTIFACT_ROOT", "/tmp/tslib_artifacts_for_test")
    monkeypatch.setenv("NF_RUN_MODE", "DRY_RUN")

    if not _has_db_creds():
        pytest.skip("DB credentials not provided. Set MODEL_DB_PASSWORD or PGPASSWORD.")

    cfg = NFConfig.from_env(project_root=Path("/mnt/e/env/ts/tslib"))
    dbm = DbManager(cfg=cfg, echo_sql=False)

    try:
        runner = MigrationRunner(model_engine=dbm.model_engine())
        runner.apply_all()

        test_name = "pytest_join_rule"
        sql_text = "SELECT 1 AS dummy;"

        with dbm.model_engine().begin() as conn:
            conn.execute(text(f"""
                INSERT INTO {SCHEMA_NAME}.join_rules(rule_name, enabled, priority, sql_text, description)
                VALUES (:name, TRUE, 999, :sql, 'pytest insert check')
                ON CONFLICT (rule_name) DO UPDATE
                SET sql_text = EXCLUDED.sql_text,
                    priority = EXCLUDED.priority,
                    updated_ts = NOW();
            """), {"name": test_name, "sql": sql_text})

        with dbm.model_engine().connect() as conn:
            got = conn.execute(
                text(f"SELECT sql_text, priority FROM {SCHEMA_NAME}.join_rules WHERE rule_name=:name"),
                {"name": test_name}
            ).one()
            assert got[0].strip() == sql_text
            assert int(got[1]) == 999

    finally:
        dbm.dispose()


@pytest.mark.integration
def test_run_plans_list_enabled_only(monkeypatch):
    """
    ID 014 Done: enabled planだけ抽出できる（priority昇順）
    """
    monkeypatch.setenv("DATASET_DB_URL", "postgresql+psycopg://postgres@127.0.0.1:5432/dataset")
    monkeypatch.setenv("MODEL_DB_URL",   "postgresql+psycopg://postgres@127.0.0.1:5432/model")
    monkeypatch.setenv("NF_STRICT_VALIDATION", "true")
    monkeypatch.setenv("NF_ARTIFACT_ROOT", "/tmp/tslib_artifacts_for_test")
    monkeypatch.setenv("NF_RUN_MODE", "DRY_RUN")

    if not _has_db_creds():
        pytest.skip("DB credentials not provided. Set MODEL_DB_PASSWORD or PGPASSWORD.")

    cfg = NFConfig.from_env(project_root=Path("/mnt/e/env/ts/tslib"))
    dbm = DbManager(cfg=cfg, echo_sql=False)

    try:
        runner = MigrationRunner(model_engine=dbm.model_engine())
        runner.apply_all()

        eng = dbm.model_engine()
        eid = ensure_experiment(eng, name="pytest_exp_014", description="for enabled plan filtering")

        # enabled: True (priority 20)
        upsert_run_plan(
            eng,
            plan_name="pytest_plan_enabled_20",
            enabled=True,
            priority=20,
            experiment_id=eid,
            model_name="AutoNBEATS",
            run_mode="DRY_RUN",
            params_json={"a": 1},
            data_json={"join_rule": "loto_join_v1"},
            artifact_subdir="pytest/014/enabled20",
        )
        # enabled: True (priority 10) -> 先に来るはず
        upsert_run_plan(
            eng,
            plan_name="pytest_plan_enabled_10",
            enabled=True,
            priority=10,
            experiment_id=eid,
            model_name="AutoTFT",
            run_mode="DRY_RUN",
            params_json={"b": 2},
            data_json={"join_rule": "loto_join_v1"},
            artifact_subdir="pytest/014/enabled10",
        )
        # enabled: False -> 出てこない
        upsert_run_plan(
            eng,
            plan_name="pytest_plan_disabled",
            enabled=False,
            priority=1,
            experiment_id=eid,
            model_name="AutoNBEATS",
            run_mode="DRY_RUN",
            params_json={},
            data_json={},
            artifact_subdir="pytest/014/disabled",
        )

        plans = list_enabled_run_plans(eng)

        names = [p.plan_name for p in plans]
        assert "pytest_plan_disabled" not in names
        assert "pytest_plan_enabled_10" in names
        assert "pytest_plan_enabled_20" in names

        # priority順になっていること（enabled_10 が enabled_20 より前）
        i10 = names.index("pytest_plan_enabled_10")
        i20 = names.index("pytest_plan_enabled_20")
        assert i10 < i20

    finally:
        dbm.dispose()
'''
tpath.write_text(tcode, encoding="utf-8")
print("Updated:", tpath)
print(tpath.read_text(encoding="utf-8")[:900], "...\n")


Updated: /mnt/e/env/ts/tslib/tests/test_migrations.py
\
from pathlib import Path
import os
import pytest
from sqlalchemy import text

from nf_app.config import NFConfig
from nf_app.db import DbManager
from nf_app.migrations import MigrationRunner, SCHEMA_NAME
from nf_app.run_plans import ensure_experiment, upsert_run_plan, list_enabled_run_plans


def _has_db_creds() -> bool:
    if not os.environ.get("MODEL_DB_URL"):
        return False
    return bool(
        os.environ.get("MODEL_DB_PASSWORD")
        or os.environ.get("PGPASSWORD")
        or ("://" in os.environ["MODEL_DB_URL"] and "@" in os.environ["MODEL_DB_URL"] and ":" in os.environ["MODEL_DB_URL"].split("@")[0])
    )


@pytest.mark.integration
def test_migrations_create_required_tables(monkeypatch):
    monkeypatch.setenv("DATASET_DB_URL", "postgresql+psycopg://postgres@127.0.0.1:5432/dataset")
    monkeypatch.setenv("MODEL_DB_URL",   "postgresql+psycopg://postgres@127.0.0.1:54 ...



## ✅ codecell: 1-DB-014-3 pytest 実行

In [58]:
%cd /mnt/e/env/ts/tslib
!pytest -q tests/test_migrations.py


/mnt/e/env/ts/tslib
[32m.[0m[32m.[0m[31mF[0m[31m                                                                      [100%][0m
[31m[1m_______________________ test_run_plans_list_enabled_only _______________________[0m

self = <sqlalchemy.engine.base.Connection object at 0x7222ed5db010>
dialect = <sqlalchemy.dialects.postgresql.psycopg.PGDialect_psycopg object at 0x7222ed5e0410>
context = <sqlalchemy.dialects.postgresql.psycopg.PGExecutionContext_psycopg object at 0x7222ed5da210>
statement = <sqlalchemy.dialects.postgresql.psycopg.PGCompiler_psycopg object at 0x7222ed5d9110>
parameters = [{'artifact_subdir': 'pytest/014/enabled20', 'enabled': True, 'experiment_id': 1, 'model_name': 'AutoNBEATS', ...}]

    [0m[94mdef[39;49;00m[90m [39;49;00m[92m_exec_single_context[39;49;00m([90m[39;49;00m
        [96mself[39;49;00m,[90m[39;49;00m
        dialect: Dialect,[90m[39;49;00m
        context: ExecutionContext,[90m[39;49;00m
        statement: Union[[96mstr[39;49

## run_plans upsert のSQL修正

SQLAlchemy text() で `:param::jsonb` を書くと bind解析されず `:` がSQLに残ってSyntaxErrorになる。
`CAST(:param AS JSONB)` に変更して解消する。


## ✅ codecell（/mnt/e/env/ts/tslib/src/nf_app/run_plans.py をフル上書き）

In [59]:
from pathlib import Path

p = Path("/mnt/e/env/ts/tslib/src/nf_app/run_plans.py")
p.parent.mkdir(parents=True, exist_ok=True)

code = r'''\
from __future__ import annotations

import json
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

from sqlalchemy import text
from sqlalchemy.engine import Engine
from sqlalchemy.exc import SQLAlchemyError

from nf_app.errors import db_error

SCHEMA_NAME = "neuralforecast"


@dataclass(frozen=True)
class RunPlan:
    plan_id: int
    enabled: bool
    priority: int
    plan_name: str
    experiment_id: Optional[int]
    model_name: str
    run_mode: str
    params_json: Dict[str, Any]
    data_json: Dict[str, Any]
    artifact_subdir: Optional[str]


def ensure_experiment(engine: Engine, *, name: str, description: Optional[str] = None) -> int:
    """experimentsに name をupsertし、experiment_idを返す"""
    try:
        with engine.begin() as conn:
            conn.execute(
                text(f"""
                INSERT INTO {SCHEMA_NAME}.experiments(name, description)
                VALUES (:name, :desc)
                ON CONFLICT (name) DO UPDATE
                SET description = COALESCE(EXCLUDED.description, {SCHEMA_NAME}.experiments.description);
                """),
                {"name": name, "desc": description},
            )
        with engine.connect() as conn:
            eid = conn.execute(
                text(f"SELECT experiment_id FROM {SCHEMA_NAME}.experiments WHERE name=:name"),
                {"name": name},
            ).scalar_one()
        return int(eid)
    except SQLAlchemyError as e:
        raise db_error(
            "DB_EXPERIMENT_UPSERT_FAILED",
            "Failed to upsert experiment.",
            context={"name": name},
            cause=e,
        )


def upsert_run_plan(
    engine: Engine,
    *,
    plan_name: str,
    enabled: bool = True,
    priority: int = 100,
    experiment_id: Optional[int] = None,
    model_name: str,
    run_mode: str,
    params_json: Optional[Dict[str, Any]] = None,
    data_json: Optional[Dict[str, Any]] = None,
    artifact_subdir: Optional[str] = None,
) -> int:
    """
    run_plansへupsertし、plan_idを返す（冪等）
    NOTE: SQLAlchemy text() では ':param::jsonb' がbindとして解釈されないことがあるため
          CAST(:param AS JSONB) を使う。
    """
    params_s = json.dumps(params_json or {}, ensure_ascii=False)
    data_s = json.dumps(data_json or {}, ensure_ascii=False)

    try:
        with engine.begin() as conn:
            conn.execute(
                text(f"""
                INSERT INTO {SCHEMA_NAME}.run_plans(
                    enabled, priority, plan_name, experiment_id,
                    model_name, run_mode, params_json, data_json, artifact_subdir
                )
                VALUES (
                    :enabled, :priority, :plan_name, :experiment_id,
                    :model_name, :run_mode,
                    CAST(:params_json AS JSONB),
                    CAST(:data_json AS JSONB),
                    :artifact_subdir
                )
                ON CONFLICT (plan_name) DO UPDATE
                SET enabled = EXCLUDED.enabled,
                    priority = EXCLUDED.priority,
                    experiment_id = EXCLUDED.experiment_id,
                    model_name = EXCLUDED.model_name,
                    run_mode = EXCLUDED.run_mode,
                    params_json = EXCLUDED.params_json,
                    data_json = EXCLUDED.data_json,
                    artifact_subdir = EXCLUDED.artifact_subdir,
                    updated_ts = NOW();
                """),
                {
                    "enabled": enabled,
                    "priority": priority,
                    "plan_name": plan_name,
                    "experiment_id": experiment_id,
                    "model_name": model_name,
                    "run_mode": run_mode,
                    "params_json": params_s,
                    "data_json": data_s,
                    "artifact_subdir": artifact_subdir,
                },
            )

        with engine.connect() as conn:
            pid = conn.execute(
                text(f"SELECT plan_id FROM {SCHEMA_NAME}.run_plans WHERE plan_name=:plan_name"),
                {"plan_name": plan_name},
            ).scalar_one()
        return int(pid)

    except SQLAlchemyError as e:
        raise db_error(
            "DB_RUN_PLAN_UPSERT_FAILED",
            "Failed to upsert run_plan.",
            context={"plan_name": plan_name},
            cause=e,
        )


def list_enabled_run_plans(engine: Engine, *, limit: Optional[int] = None) -> List[RunPlan]:
    """enabled=true の plan だけを priority昇順（同順位はplan_id昇順）で返す"""
    try:
        q = f"""
        SELECT
            plan_id, enabled, priority, plan_name, experiment_id,
            model_name, run_mode, params_json, data_json, artifact_subdir
        FROM {SCHEMA_NAME}.run_plans
        WHERE enabled = TRUE
        ORDER BY priority ASC, plan_id ASC
        """
        if limit is not None:
            q += " LIMIT :limit"

        with engine.connect() as conn:
            rows = conn.execute(text(q), {"limit": limit} if limit is not None else {}).mappings().all()

        out: List[RunPlan] = []
        for r in rows:
            out.append(
                RunPlan(
                    plan_id=int(r["plan_id"]),
                    enabled=bool(r["enabled"]),
                    priority=int(r["priority"]),
                    plan_name=str(r["plan_name"]),
                    experiment_id=int(r["experiment_id"]) if r["experiment_id"] is not None else None,
                    model_name=str(r["model_name"]),
                    run_mode=str(r["run_mode"]),
                    params_json=dict(r["params_json"] or {}),
                    data_json=dict(r["data_json"] or {}),
                    artifact_subdir=str(r["artifact_subdir"]) if r["artifact_subdir"] is not None else None,
                )
            )
        return out

    except SQLAlchemyError as e:
        raise db_error("DB_RUN_PLAN_LIST_FAILED", "Failed to list enabled run_plans.", cause=e)
'''
p.write_text(code, encoding="utf-8")
print("Wrote:", p)


Wrote: /mnt/e/env/ts/tslib/src/nf_app/run_plans.py


## ✅ codecell（pytest再実行）

In [60]:
%cd /mnt/e/env/ts/tslib
!pytest -q tests/test_migrations.py::test_run_plans_list_enabled_only


/mnt/e/env/ts/tslib
[32m.[0m[32m                                                                        [100%][0m
[32m[32m[1m1 passed[0m[32m in 0.25s[0m[0m


## ID 015 dataset: forecast保存テーブル（preds/backtest）

dataset DB側に `neuralforecast.forecast_preds` と `neuralforecast.forecast_backtests` を作成し、
予測（yhat）をINSERTできることをpytestで担保する。

方針:
- 既存のmodel DB migrationは変更せず、migrations.pyに dataset用関数を追記
- JSONBは CAST(:x AS JSONB) で安全にバインドする（:x::jsonb でコケる事故回避）


## codecell（migrations.py に dataset migration 関数を追記）

フルパス: /mnt/e/env/ts/tslib/src/nf_app/migrations.py

In [61]:
from pathlib import Path

path = Path("/mnt/e/env/ts/tslib/src/nf_app/migrations.py")
src = path.read_text(encoding="utf-8")

sentinel = "def apply_dataset_migrations("
if sentinel in src:
    print("apply_dataset_migrations already exists. Skip append.")
else:
    append = r'''

# =========================
# Dataset DB migrations (ID 015)
# =========================
DATASET_MIGRATION_VERSION = 1

def apply_dataset_migrations(dataset_engine):
    """
    dataset DB側の予測保存テーブル（preds/backtest）を作成する（冪等）
    - cross-db FKは張れないので run_id/plan_id は参照キーとして保持するだけ
    """
    import json
    from sqlalchemy import text
    from sqlalchemy.exc import SQLAlchemyError
    from nf_app.errors import db_error

    try:
        with dataset_engine.begin() as conn:
            # schema + schema_migrations
            conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {SCHEMA_NAME};"))
            conn.execute(text(f"""
            CREATE TABLE IF NOT EXISTS {SCHEMA_NAME}.schema_migrations(
                id INTEGER PRIMARY KEY DEFAULT 1 CHECK (id = 1),
                version INTEGER NOT NULL,
                updated_ts TIMESTAMPTZ NOT NULL DEFAULT NOW()
            );
            """))
            # version upsert
            conn.execute(
                text(f"""
                INSERT INTO {SCHEMA_NAME}.schema_migrations(id, version)
                VALUES (1, :v)
                ON CONFLICT (id) DO UPDATE
                SET version = GREATEST({SCHEMA_NAME}.schema_migrations.version, EXCLUDED.version),
                    updated_ts = NOW();
                """),
                {"v": DATASET_MIGRATION_VERSION},
            )

            # -------- forecast_preds (future prediction)
            conn.execute(text(f"""
            CREATE TABLE IF NOT EXISTS {SCHEMA_NAME}.forecast_preds(
                pred_id BIGSERIAL PRIMARY KEY,
                run_id UUID NOT NULL,
                plan_id BIGINT NULL,
                model_name TEXT NOT NULL,
                unique_id TEXT NOT NULL,
                ds DATE NOT NULL,
                horizon INTEGER NULL,
                cutoff DATE NULL,
                yhat DOUBLE PRECISION NOT NULL,
                yhat_lower DOUBLE PRECISION NULL,
                yhat_upper DOUBLE PRECISION NULL,
                extra_json JSONB NOT NULL DEFAULT '{{}}'::jsonb,
                created_ts TIMESTAMPTZ NOT NULL DEFAULT NOW(),
                UNIQUE (run_id, model_name, unique_id, ds)
            );
            """))

            conn.execute(text(f"CREATE INDEX IF NOT EXISTS ix_forecast_preds_run_id ON {SCHEMA_NAME}.forecast_preds(run_id);"))
            conn.execute(text(f"CREATE INDEX IF NOT EXISTS ix_forecast_preds_uid_ds ON {SCHEMA_NAME}.forecast_preds(unique_id, ds);"))
            conn.execute(text(f"CREATE INDEX IF NOT EXISTS ix_forecast_preds_plan_id ON {SCHEMA_NAME}.forecast_preds(plan_id);"))

            # -------- forecast_backtests (historical backtest prediction)
            conn.execute(text(f"""
            CREATE TABLE IF NOT EXISTS {SCHEMA_NAME}.forecast_backtests(
                bt_id BIGSERIAL PRIMARY KEY,
                run_id UUID NOT NULL,
                plan_id BIGINT NULL,
                model_name TEXT NOT NULL,
                unique_id TEXT NOT NULL,
                cutoff DATE NOT NULL,
                ds DATE NOT NULL,
                horizon INTEGER NULL,
                y DOUBLE PRECISION NULL,
                yhat DOUBLE PRECISION NOT NULL,
                extra_json JSONB NOT NULL DEFAULT '{{}}'::jsonb,
                created_ts TIMESTAMPTZ NOT NULL DEFAULT NOW(),
                UNIQUE (run_id, model_name, unique_id, cutoff, ds)
            );
            """))

            conn.execute(text(f"CREATE INDEX IF NOT EXISTS ix_forecast_bt_run_id ON {SCHEMA_NAME}.forecast_backtests(run_id);"))
            conn.execute(text(f"CREATE INDEX IF NOT EXISTS ix_forecast_bt_uid_ds ON {SCHEMA_NAME}.forecast_backtests(unique_id, ds);"))
            conn.execute(text(f"CREATE INDEX IF NOT EXISTS ix_forecast_bt_cutoff ON {SCHEMA_NAME}.forecast_backtests(cutoff);"))
            conn.execute(text(f"CREATE INDEX IF NOT EXISTS ix_forecast_bt_plan_id ON {SCHEMA_NAME}.forecast_backtests(plan_id);"))

    except SQLAlchemyError as e:
        raise db_error(
            "DB_DATASET_MIGRATION_FAILED",
            "Failed to apply dataset migrations (forecast tables).",
            cause=e,
        )
'''
    path.write_text(src + append, encoding="utf-8")
    print("Appended dataset migrations to:", path)


Appended dataset migrations to: /mnt/e/env/ts/tslib/src/nf_app/migrations.py


## codecell（test_migrations.py に ID015 テストを追記）

フルパス: /mnt/e/env/ts/tslib/tests/test_migrations.py

In [62]:
from pathlib import Path

tpath = Path("/mnt/e/env/ts/tslib/tests/test_migrations.py")
tsrc = tpath.read_text(encoding="utf-8")

test_name = "test_dataset_forecast_tables_allow_insert"
if test_name in tsrc:
    print("ID015 test already exists. Skip append.")
else:
    add = r'''

@pytest.mark.integration
def test_dataset_forecast_tables_allow_insert(monkeypatch):
    """
    ID 015 Done: dataset側の forecast_preds / forecast_backtests にINSERTできる
    """
    import json
    import uuid
    from datetime import date
    from pathlib import Path
    from sqlalchemy import text

    monkeypatch.setenv("DATASET_DB_URL", "postgresql+psycopg://postgres@127.0.0.1:5432/dataset")
    monkeypatch.setenv("MODEL_DB_URL",   "postgresql+psycopg://postgres@127.0.0.1:5432/model")
    monkeypatch.setenv("NF_STRICT_VALIDATION", "true")
    monkeypatch.setenv("NF_ARTIFACT_ROOT", "/tmp/tslib_artifacts_for_test")
    monkeypatch.setenv("NF_RUN_MODE", "DRY_RUN")

    if not _has_db_creds():
        pytest.skip("DB credentials not provided. Set MODEL_DB_PASSWORD or PGPASSWORD.")

    cfg = NFConfig.from_env(project_root=Path("/mnt/e/env/ts/tslib"))
    dbm = DbManager(cfg=cfg, echo_sql=False)

    # model側 migrations は既存の runner で適用（既存テストと同じ流れ）
    runner = MigrationRunner(model_engine=dbm.model_engine())
    runner.apply_all()

    # dataset側 migrations（ID015）
    from nf_app.migrations import apply_dataset_migrations
    apply_dataset_migrations(dbm.dataset_engine())

    ds_eng = dbm.dataset_engine()

    run_id = str(uuid.uuid4())
    model_name = "AutoNBEATS"
    unique_id = "pytest_uid_015"

    # --- forecast_preds insert
    with ds_eng.begin() as conn:
        conn.execute(
            text(f"""
            INSERT INTO {SCHEMA_NAME}.forecast_preds(
                run_id, plan_id, model_name, unique_id, ds, horizon, cutoff, yhat, yhat_lower, yhat_upper, extra_json
            ) VALUES (
                :run_id, :plan_id, :model_name, :unique_id, :ds, :h, :cutoff, :yhat, :lo, :hi, CAST(:extra AS JSONB)
            )
            """),
            {
                "run_id": run_id,
                "plan_id": 999,
                "model_name": model_name,
                "unique_id": unique_id,
                "ds": date(2020, 1, 2),
                "h": 1,
                "cutoff": date(2020, 1, 1),
                "yhat": 123.0,
                "lo": 100.0,
                "hi": 150.0,
                "extra": json.dumps({"q": [0.1, 0.9]}, ensure_ascii=False),
            },
        )

    with ds_eng.connect() as conn:
        n = conn.execute(
            text(f"SELECT COUNT(*) FROM {SCHEMA_NAME}.forecast_preds WHERE run_id=:run_id"),
            {"run_id": run_id},
        ).scalar_one()
    assert int(n) == 1

    # --- forecast_backtests insert
    with ds_eng.begin() as conn:
        conn.execute(
            text(f"""
            INSERT INTO {SCHEMA_NAME}.forecast_backtests(
                run_id, plan_id, model_name, unique_id, cutoff, ds, horizon, y, yhat, extra_json
            ) VALUES (
                :run_id, :plan_id, :model_name, :unique_id, :cutoff, :ds, :h, :y, :yhat, CAST(:extra AS JSONB)
            )
            """),
            {
                "run_id": run_id,
                "plan_id": 999,
                "model_name": model_name,
                "unique_id": unique_id,
                "cutoff": date(2020, 1, 1),
                "ds": date(2020, 1, 2),
                "h": 1,
                "y": 120.0,
                "yhat": 123.0,
                "extra": json.dumps({"fold": 0}, ensure_ascii=False),
            },
        )

    with ds_eng.connect() as conn:
        m = conn.execute(
            text(f"SELECT COUNT(*) FROM {SCHEMA_NAME}.forecast_backtests WHERE run_id=:run_id"),
            {"run_id": run_id},
        ).scalar_one()
    assert int(m) == 1
'''
    tpath.write_text(tsrc + add, encoding="utf-8")
    print("Appended ID015 test to:", tpath)


Appended ID015 test to: /mnt/e/env/ts/tslib/tests/test_migrations.py


## codecell（pytest：ID015だけ実行）

In [63]:
%cd /mnt/e/env/ts/tslib
!pytest -q tests/test_migrations.py::test_dataset_forecast_tables_allow_insert


/mnt/e/env/ts/tslib
[32m.[0m[32m                                                                        [100%][0m
[32m[32m[1m1 passed[0m[32m in 0.35s[0m[0m


## ID 020 information_schema内省（列一覧・型・NULL）

目的:
- dataset DBの任意テーブルについて、information_schemaから列メタ情報を取得する
- 列が増減しても壊れない（＝ハードコードで列セットを固定しない）

Done条件:
- テストで一時テーブルを作る → 内省 → ALTERで列追加 → 内省 → 追加列が検出できる


In [64]:
from pathlib import Path

p = Path("/mnt/e/env/ts/tslib/src/nf_app/introspect.py")
p.parent.mkdir(parents=True, exist_ok=True)

code = r'''\
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Dict, List, Optional

from sqlalchemy import text
from sqlalchemy.engine import Engine
from sqlalchemy.exc import SQLAlchemyError

from nf_app.errors import db_error


@dataclass(frozen=True)
class ColumnInfo:
    table_schema: str
    table_name: str
    column_name: str
    ordinal_position: int
    data_type: str
    udt_name: str
    is_nullable: bool
    column_default: Optional[str]
    character_maximum_length: Optional[int]
    numeric_precision: Optional[int]
    numeric_scale: Optional[int]
    datetime_precision: Optional[int]

    def to_dict(self) -> Dict[str, Any]:
        return {
            "table_schema": self.table_schema,
            "table_name": self.table_name,
            "column_name": self.column_name,
            "ordinal_position": self.ordinal_position,
            "data_type": self.data_type,
            "udt_name": self.udt_name,
            "is_nullable": self.is_nullable,
            "column_default": self.column_default,
            "character_maximum_length": self.character_maximum_length,
            "numeric_precision": self.numeric_precision,
            "numeric_scale": self.numeric_scale,
            "datetime_precision": self.datetime_precision,
        }


def list_columns(engine: Engine, *, schema: str, table: str) -> List[ColumnInfo]:
    """
    information_schema.columns を使って列メタ情報を返す（列の増減に強い）
    - PostgreSQLに依存しすぎない範囲で取れる情報を取る
    - ORDER BY ordinal_position で安定順序
    """
    q = text(
        """
        SELECT
            table_schema,
            table_name,
            column_name,
            ordinal_position,
            data_type,
            udt_name,
            is_nullable,
            column_default,
            character_maximum_length,
            numeric_precision,
            numeric_scale,
            datetime_precision
        FROM information_schema.columns
        WHERE table_schema = :schema
          AND table_name = :table
        ORDER BY ordinal_position;
        """
    )

    try:
        with engine.connect() as conn:
            rows = conn.execute(q, {"schema": schema, "table": table}).mappings().all()

        out: List[ColumnInfo] = []
        for r in rows:
            out.append(
                ColumnInfo(
                    table_schema=str(r["table_schema"]),
                    table_name=str(r["table_name"]),
                    column_name=str(r["column_name"]),
                    ordinal_position=int(r["ordinal_position"]),
                    data_type=str(r["data_type"]),
                    udt_name=str(r["udt_name"]),
                    is_nullable=(str(r["is_nullable"]).upper() == "YES"),
                    column_default=str(r["column_default"]) if r["column_default"] is not None else None,
                    character_maximum_length=int(r["character_maximum_length"]) if r["character_maximum_length"] is not None else None,
                    numeric_precision=int(r["numeric_precision"]) if r["numeric_precision"] is not None else None,
                    numeric_scale=int(r["numeric_scale"]) if r["numeric_scale"] is not None else None,
                    datetime_precision=int(r["datetime_precision"]) if r["datetime_precision"] is not None else None,
                )
            )
        return out

    except SQLAlchemyError as e:
        raise db_error(
            "DB_INTROSPECT_COLUMNS_FAILED",
            "Failed to introspect columns via information_schema.",
            context={"schema": schema, "table": table},
            cause=e,
        )
'''
p.write_text(code, encoding="utf-8")
print("Wrote:", p)


Wrote: /mnt/e/env/ts/tslib/src/nf_app/introspect.py


## ✅ codecell（pytest）/mnt/e/env/ts/tslib/tests/test_introspect.py

In [66]:
from pathlib import Path

tp = Path("/mnt/e/env/ts/tslib/tests/test_introspect.py")
tp.parent.mkdir(parents=True, exist_ok=True)

tcode = r'''\
import os
import uuid
from datetime import date
from pathlib import Path

import pytest
from sqlalchemy import text

from nf_app.config import NFConfig
from nf_app.db import DbManager
from nf_app.introspect import list_columns

SCHEMA = "neuralforecast"


def _has_db_creds() -> bool:
    # 既存テストと同じ運用：PGPASSWORD or MODEL_DB_PASSWORD があれば integration を回す
    return bool(os.getenv("PGPASSWORD") or os.getenv("MODEL_DB_PASSWORD") or os.getenv("DATASET_DB_PASSWORD"))


@pytest.mark.integration
def test_introspect_columns_survives_add_remove(monkeypatch):
    """
    ID 020 Done: 列増減で壊れない（増減を“実際に”起こして検証）
    """
    monkeypatch.setenv("DATASET_DB_URL", "postgresql+psycopg://postgres@127.0.0.1:5432/dataset")
    monkeypatch.setenv("MODEL_DB_URL",   "postgresql+psycopg://postgres@127.0.0.1:5432/model")
    monkeypatch.setenv("NF_STRICT_VALIDATION", "true")
    monkeypatch.setenv("NF_ARTIFACT_ROOT", "/tmp/tslib_artifacts_for_test")
    monkeypatch.setenv("NF_RUN_MODE", "DRY_RUN")

    if not _has_db_creds():
        pytest.skip("DB credentials not provided. Set PGPASSWORD / MODEL_DB_PASSWORD / DATASET_DB_PASSWORD.")

    cfg = NFConfig.from_env(project_root=Path("/mnt/e/env/ts/tslib"))
    dbm = DbManager(cfg=cfg, echo_sql=False)
    eng = dbm.dataset_engine()

    tbl = f"pytest_introspect_{uuid.uuid4().hex[:8]}"

    # create
    with eng.begin() as conn:
        conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {SCHEMA};"))
        conn.execute(text(f"""
            CREATE TABLE {SCHEMA}.{tbl}(
                id BIGSERIAL PRIMARY KEY,
                ds DATE NOT NULL,
                y DOUBLE PRECISION NULL
            );
        """))

    cols1 = list_columns(eng, schema=SCHEMA, table=tbl)
    names1 = [c.column_name for c in cols1]
    assert "id" in names1 and "ds" in names1 and "y" in names1
    assert len(cols1) >= 3

    # add column (増分)
    with eng.begin() as conn:
        conn.execute(text(f"ALTER TABLE {SCHEMA}.{tbl} ADD COLUMN hist_x INTEGER NULL;"))

    cols2 = list_columns(eng, schema=SCHEMA, table=tbl)
    names2 = [c.column_name for c in cols2]
    assert "hist_x" in names2
    assert len(cols2) == len(cols1) + 1

    # drop column (減分)
    with eng.begin() as conn:
        conn.execute(text(f"ALTER TABLE {SCHEMA}.{tbl} DROP COLUMN hist_x;"))

    cols3 = list_columns(eng, schema=SCHEMA, table=tbl)
    names3 = [c.column_name for c in cols3]
    assert "hist_x" not in names3
    assert len(cols3) == len(cols1)

    # cleanup
    with eng.begin() as conn:
        conn.execute(text(f"DROP TABLE {SCHEMA}.{tbl};"))
'''
tp.write_text(tcode, encoding="utf-8")
print("Wrote:", tp)


Wrote: /mnt/e/env/ts/tslib/tests/test_introspect.py


## ✅ codecell（pytest実行）

In [67]:
%cd /mnt/e/env/ts/tslib
!pytest -q tests/test_introspect.py


/mnt/e/env/ts/tslib
[32m.[0m[32m                                                                        [100%][0m
[32m[32m[1m1 passed[0m[32m in 0.25s[0m[0m


## ID 021 join_rulesに基づくJOIN SQL生成（衝突回避）

目的:
- join_rules（JSON仕様）から dataset DB で実行可能な JOIN SQL を生成する
- 列名衝突（同名カラム）を必ず回避し、生成SQLが毎回同一（安定）であること

設計ポイント:
- join_rulesテーブルに spec_json (JSONB) があればそれを最優先で使う
- 無い場合でも、join_rulesの列を“推測”してspecに変換（拡張に強い）
- dataset側のinformation_schema内省結果（ordinal_position順）でSELECT順序を固定（安定化）
- 衝突回避ポリシー:
  - JOIN側カラムは prefix があれば prefix+col
  - prefixが無ければ join_alias__col
  - ds/unique_id/y は “核” として固定（JOIN側同名は原則スキップ）


## codecell（実装）/mnt/e/env/ts/tslib/src/nf_app/query_builder.py

In [68]:
from pathlib import Path

p = Path("/mnt/e/env/ts/tslib/src/nf_app/query_builder.py")
p.parent.mkdir(parents=True, exist_ok=True)

code = r'''\
from __future__ import annotations

import json
import re
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple

from sqlalchemy import text
from sqlalchemy.engine import Engine
from sqlalchemy.exc import SQLAlchemyError

from nf_app.errors import db_error
from nf_app.introspect import list_columns


# -------------------------
# Helpers
# -------------------------

_IDENT_RE = re.compile(r"^[a-z_][a-z0-9_]*$")


def _q_ident(name: str) -> str:
    """
    SQL identifier quote (Postgres). ほぼ小文字+_ならクオート不要、そうでなければ "..."。
    """
    if _IDENT_RE.match(name):
        return name
    return '"' + name.replace('"', '""') + '"'


def _q_fqn(schema: str, table: str) -> str:
    return f"{_q_ident(schema)}.{_q_ident(table)}"


def _stable_join_type(s: str) -> str:
    s2 = (s or "left").strip().lower()
    if s2 in ("left", "left join"):
        return "LEFT JOIN"
    if s2 in ("inner", "inner join"):
        return "INNER JOIN"
    if s2 in ("right", "right join"):
        return "RIGHT JOIN"
    if s2 in ("full", "full join", "full outer", "full outer join"):
        return "FULL OUTER JOIN"
    # デフォルトは安全側
    return "LEFT JOIN"


def _json_load_maybe(x: Any) -> Any:
    if x is None:
        return None
    if isinstance(x, (dict, list)):
        return x
    if isinstance(x, str):
        x = x.strip()
        if not x:
            return None
        try:
            return json.loads(x)
        except Exception:
            return None
    return None


# -------------------------
# Spec dataclasses
# -------------------------

@dataclass(frozen=True)
class JoinOn:
    left: str   # base column
    right: str  # join column


@dataclass(frozen=True)
class JoinSpec:
    schema: str
    table: str
    alias: str
    join_type: str
    on: Tuple[JoinOn, ...]
    prefix: str = ""                   # collision回避 prefix (e.g., hist_, futr_, stat_)
    include: Tuple[str, ...] = ("*",)   # ["*"] or explicit column list
    exclude: Tuple[str, ...] = ()       # columns to exclude
    rename: Dict[str, str] = None       # optional rename map (col -> out_name)

    def __post_init__(self):
        if self.rename is None:
            object.__setattr__(self, "rename", {})


@dataclass(frozen=True)
class BaseSpec:
    schema: str
    table: str
    alias: str
    ds_col: str = "ds"
    uid_col: str = "unique_id"
    y_col: str = "y"
    include: Tuple[str, ...] = ()       # base extra columns; empty means only (ds, uid, y)
    exclude: Tuple[str, ...] = ()       # base columns to exclude (rare)


@dataclass(frozen=True)
class QuerySpec:
    rule_name: str
    base: BaseSpec
    joins: Tuple[JoinSpec, ...]


# -------------------------
# Load join_rules from model DB (flexible)
# -------------------------

def load_join_rule_spec(model_engine: Engine, *, schema: str, rule_name: str) -> Dict[str, Any]:
    """
    join_rules から spec を読む。
    1) spec_json / rule_json / join_spec_json みたいな JSONB列があればそれを優先
    2) 無ければ行を丸ごと読み、推測して最低限のspecに変換（拡張に強い）
    """
    try:
        with model_engine.connect() as conn:
            # 列一覧を内省して JSON列の有無を見る
            cols = conn.execute(
                text("""
                SELECT column_name
                FROM information_schema.columns
                WHERE table_schema=:schema AND table_name='join_rules'
                ORDER BY ordinal_position
                """),
                {"schema": schema},
            ).scalars().all()

            if not cols:
                raise db_error("DB_JOIN_RULES_NOT_FOUND", "join_rules table not found.", context={"schema": schema})

            json_candidates = [c for c in cols if c in ("spec_json", "rule_json", "join_spec_json", "spec", "rule")]
            if json_candidates:
                jcol = json_candidates[0]
                row = conn.execute(
                    text(f"SELECT { _q_ident(jcol) } AS j FROM { _q_ident(schema) }.join_rules WHERE rule_name=:rn"),
                    {"rn": rule_name},
                ).mappings().first()
                if not row or row["j"] is None:
                    raise db_error("DB_JOIN_RULE_NOT_FOUND", "join_rule not found.", context={"rule_name": rule_name})
                spec = _json_load_maybe(row["j"])
                if not isinstance(spec, dict):
                    raise db_error("DB_JOIN_RULE_BAD_JSON", "join_rule JSON invalid.", context={"rule_name": rule_name})
                return spec

            # fallback: read whole row, guess
            row = conn.execute(
                text(f"SELECT * FROM { _q_ident(schema) }.join_rules WHERE rule_name=:rn"),
                {"rn": rule_name},
            ).mappings().first()
            if not row:
                raise db_error("DB_JOIN_RULE_NOT_FOUND", "join_rule not found.", context={"rule_name": rule_name})

            # 推測：base_table/base_schema と joins_json っぽいもの
            guessed: Dict[str, Any] = {"rule_name": rule_name}
            for k in ("base_schema", "base_table", "ds_col", "uid_col", "y_col"):
                if k in row and row[k] is not None:
                    guessed[k] = row[k]
            for k in ("joins_json", "joins", "join_json"):
                if k in row and row[k] is not None:
                    j = _json_load_maybe(row[k])
                    if isinstance(j, list):
                        guessed["joins"] = j
                        break
            return guessed

    except SQLAlchemyError as e:
        raise db_error(
            "DB_JOIN_RULE_LOAD_FAILED",
            "Failed to load join_rules spec.",
            context={"schema": schema, "rule_name": rule_name},
            cause=e,
        )


def normalize_query_spec(spec: Dict[str, Any]) -> QuerySpec:
    """
    dict spec -> QuerySpec
    必須:
      - base_schema/base_table
      - joins: list of {schema, table, alias, on:[{left,right}], join_type?, prefix?}
    """
    rule_name = str(spec.get("rule_name") or spec.get("name") or "unnamed_rule")

    base_schema = str(spec.get("base_schema") or spec.get("schema") or "public")
    base_table = str(spec.get("base_table") or spec.get("table") or "")
    if not base_table:
        raise ValueError("join spec missing base_table/base_schema")

    base_alias = str(spec.get("base_alias") or "b")
    ds_col = str(spec.get("ds_col") or "ds")
    uid_col = str(spec.get("uid_col") or "unique_id")
    y_col = str(spec.get("y_col") or "y")

    base_include = tuple(spec.get("base_include") or spec.get("base_columns") or [])
    base_exclude = tuple(spec.get("base_exclude") or [])

    joins_in = spec.get("joins") or []
    if not isinstance(joins_in, list):
        raise ValueError("join spec 'joins' must be list")

    joins: List[JoinSpec] = []
    for i, j in enumerate(joins_in):
        if not isinstance(j, dict):
            continue
        js = str(j.get("schema") or base_schema)
        jt = str(j.get("table") or "")
        if not jt:
            raise ValueError(f"join[{i}] missing table")
        alias = str(j.get("alias") or f"j{i+1}")
        join_type = _stable_join_type(str(j.get("join_type") or j.get("type") or "left"))
        prefix = str(j.get("prefix") or "")

        on_in = j.get("on") or []
        if not isinstance(on_in, list) or not on_in:
            # デフォルトキー（ds, unique_id）
            on = (JoinOn(left=uid_col, right=uid_col), JoinOn(left=ds_col, right=ds_col))
        else:
            ons: List[JoinOn] = []
            for o in on_in:
                if isinstance(o, dict) and "left" in o and "right" in o:
                    ons.append(JoinOn(left=str(o["left"]), right=str(o["right"])))
            if not ons:
                ons = [JoinOn(left=uid_col, right=uid_col), JoinOn(left=ds_col, right=ds_col)]
            on = tuple(ons)

        include = tuple(j.get("include") or ("*",))
        exclude = tuple(j.get("exclude") or ())
        rename = j.get("rename") or {}
        if not isinstance(rename, dict):
            rename = {}

        joins.append(
            JoinSpec(
                schema=js,
                table=jt,
                alias=alias,
                join_type=join_type,
                on=on,
                prefix=prefix,
                include=include,
                exclude=exclude,
                rename=rename,
            )
        )

    return QuerySpec(
        rule_name=rule_name,
        base=BaseSpec(
            schema=base_schema,
            table=base_table,
            alias=base_alias,
            ds_col=ds_col,
            uid_col=uid_col,
            y_col=y_col,
            include=base_include,
            exclude=base_exclude,
        ),
        joins=tuple(joins),
    )


# -------------------------
# SQL builder
# -------------------------

def _select_columns_for_table(
    dataset_engine: Engine,
    *,
    schema: str,
    table: str,
    include: Sequence[str],
    exclude: Sequence[str],
) -> List[str]:
    """
    include/excludeに従って列名を返す。include=["*"]なら全列。
    順序は ordinal_position で固定（安定生成の核）。
    """
    cols = list_columns(dataset_engine, schema=schema, table=table)
    all_names = [c.column_name for c in cols]

    inc = list(include) if include else []
    exc = set(exclude or [])

    if not inc:
        return [n for n in all_names if n not in exc]

    if len(inc) == 1 and inc[0] == "*":
        return [n for n in all_names if n not in exc]

    # 明示列
    return [n for n in inc if n in all_names and n not in exc]


def build_join_sql(
    dataset_engine: Engine,
    *,
    query_spec: QuerySpec,
) -> str:
    """
    dataset DBで実行するJOIN SQLを生成する（安定・衝突回避）
    """
    b = query_spec.base

    # base columns
    base_all = _select_columns_for_table(
        dataset_engine, schema=b.schema, table=b.table,
        include=(["*"] if not b.include else list(b.include)),
        exclude=b.exclude,
    )

    # 核カラム（必須）
    core = [b.uid_col, b.ds_col, b.y_col]
    # base select順序：unique_id, ds, y, その後に残り（安定）
    base_ordered: List[str] = []
    for c in core:
        if c in base_all and c not in base_ordered:
            base_ordered.append(c)
    for c in base_all:
        if c not in base_ordered:
            base_ordered.append(c)

    # 生成SELECTと衝突管理
    used_out_names: set[str] = set()
    select_exprs: List[str] = []

    def add_expr(src_alias: str, col: str, out_name: str) -> None:
        used_out_names.add(out_name)
        select_exprs.append(f"{src_alias}.{_q_ident(col)} AS {_q_ident(out_name)}")

    # base: coreは固定名に正規化（unique_id/ds/y）
    # ここは「ユーザー要件：ds/unique_id/yに設定」を満たす
    if b.uid_col not in base_ordered or b.ds_col not in base_ordered or b.y_col not in base_ordered:
        raise ValueError("base table must contain ds/unique_id/y columns (or mapped via ds_col/uid_col/y_col)")

    add_expr(b.alias, b.uid_col, "unique_id")
    add_expr(b.alias, b.ds_col, "ds")
    add_expr(b.alias, b.y_col, "y")

    # base extra columns（核以外）
    for c in base_ordered:
        if c in (b.uid_col, b.ds_col, b.y_col):
            continue
        out = c
        if out in used_out_names:
            # base側の衝突は join 側で避けるのが基本だが、念のため安定サフィックス
            out = f"b__{out}"
        add_expr(b.alias, c, out)

    # joins
    join_clauses: List[str] = []
    for j in query_spec.joins:
        # join ON
        ons = []
        for o in j.on:
            ons.append(f"{b.alias}.{_q_ident(o.left)} = {j.alias}.{_q_ident(o.right)}")
        on_sql = " AND ".join(ons)

        join_clauses.append(
            f"{j.join_type} {_q_fqn(j.schema, j.table)} {j.alias} ON {on_sql}"
        )

        # join select columns
        join_cols = _select_columns_for_table(
            dataset_engine, schema=j.schema, table=j.table,
            include=j.include, exclude=j.exclude,
        )

        # デフォルトで “キー” と “核(y)” はJOIN側からは出さない（混乱・衝突の元）
        skip = {o.right for o in j.on}
        skip.update({b.ds_col, b.uid_col, b.y_col, "ds", "unique_id", "y"})
        for col in join_cols:
            if col in skip:
                continue

            # rename 優先
            if col in (j.rename or {}):
                out = str(j.rename[col])
            else:
                # prefixがあれば prefix+col、無ければ alias__col
                out = f"{j.prefix}{col}" if j.prefix else f"{j.alias}__{col}"

            # さらに衝突したら安定サフィックス
            if out in used_out_names:
                out2 = out
                k = 2
                while out2 in used_out_names:
                    out2 = f"{out}__{k}"
                    k += 1
                out = out2

            add_expr(j.alias, col, out)

    sql = "\n".join(
        [
            "SELECT",
            "  " + ",\n  ".join(select_exprs),
            f"FROM {_q_fqn(b.schema, b.table)} {b.alias}",
            *(join_clauses),
        ]
    )
    return sql


def build_join_sql_from_rule(
    model_engine: Engine,
    dataset_engine: Engine,
    *,
    model_schema: str,
    rule_name: str,
) -> str:
    """
    model DBの join_rules を読み、dataset DB向け JOIN SQL を返す
    """
    raw = load_join_rule_spec(model_engine, schema=model_schema, rule_name=rule_name)
    qs = normalize_query_spec(raw)
    return build_join_sql(dataset_engine, query_spec=qs)
'''
p.write_text(code, encoding="utf-8")
print("Wrote:", p)


Wrote: /mnt/e/env/ts/tslib/src/nf_app/query_builder.py


## codecell（pytest）/mnt/e/env/ts/tslib/tests/test_query_builder.py

In [70]:
from pathlib import Path

tp = Path("/mnt/e/env/ts/tslib/tests/test_query_builder.py")
tp.parent.mkdir(parents=True, exist_ok=True)

tcode = r'''\
import os
import uuid
from datetime import date
from pathlib import Path

import pytest
from sqlalchemy import text

from nf_app.config import NFConfig
from nf_app.db import DbManager
from nf_app.query_builder import build_join_sql, normalize_query_spec

SCHEMA = "neuralforecast"


def _has_db_creds() -> bool:
    return bool(os.getenv("PGPASSWORD") or os.getenv("MODEL_DB_PASSWORD") or os.getenv("DATASET_DB_PASSWORD"))


@pytest.mark.integration
def test_query_builder_stable_and_collision_safe(monkeypatch):
    """
    ID 021 Done:
    - JOIN SQLが毎回同一（安定生成）
    - 列名衝突が回避される（prefix or alias__col）
    """
    monkeypatch.setenv("DATASET_DB_URL", "postgresql+psycopg://postgres@127.0.0.1:5432/dataset")
    monkeypatch.setenv("MODEL_DB_URL",   "postgresql+psycopg://postgres@127.0.0.1:5432/model")
    monkeypatch.setenv("NF_STRICT_VALIDATION", "true")
    monkeypatch.setenv("NF_ARTIFACT_ROOT", "/tmp/tslib_artifacts_for_test")
    monkeypatch.setenv("NF_RUN_MODE", "DRY_RUN")

    if not _has_db_creds():
        pytest.skip("DB credentials not provided. Set PGPASSWORD / MODEL_DB_PASSWORD / DATASET_DB_PASSWORD.")

    cfg = NFConfig.from_env(project_root=Path("/mnt/e/env/ts/tslib"))
    dbm = DbManager(cfg=cfg, echo_sql=False)
    eng = dbm.dataset_engine()

    base = f"pytest_base_{uuid.uuid4().hex[:8]}"
    hist = f"pytest_hist_{uuid.uuid4().hex[:8]}"
    stat = f"pytest_stat_{uuid.uuid4().hex[:8]}"

    with eng.begin() as conn:
        conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {SCHEMA};"))

        # base（核: unique_id/ds/y + 追加: shared）
        conn.execute(text(f"""
            CREATE TABLE {SCHEMA}.{base}(
                unique_id TEXT NOT NULL,
                ds DATE NOT NULL,
                y DOUBLE PRECISION NULL,
                shared INTEGER NULL
            );
        """))

        # hist（衝突: shared を持つ）
        conn.execute(text(f"""
            CREATE TABLE {SCHEMA}.{hist}(
                unique_id TEXT NOT NULL,
                ds DATE NOT NULL,
                shared INTEGER NULL,
                h1 INTEGER NULL
            );
        """))

        # stat（衝突: shared を持つ）
        conn.execute(text(f"""
            CREATE TABLE {SCHEMA}.{stat}(
                unique_id TEXT NOT NULL,
                ds DATE NOT NULL,
                shared INTEGER NULL,
                s1 INTEGER NULL
            );
        """))

        # data
        conn.execute(text(f"INSERT INTO {SCHEMA}.{base}(unique_id, ds, y, shared) VALUES ('u1', '2020-01-01', 10.0, 7);"))
        conn.execute(text(f"INSERT INTO {SCHEMA}.{hist}(unique_id, ds, shared, h1) VALUES ('u1', '2020-01-01', 70, 1);"))
        conn.execute(text(f"INSERT INTO {SCHEMA}.{stat}(unique_id, ds, shared, s1) VALUES ('u1', '2020-01-01', 700, 2);"))

    spec = {
        "rule_name": "pytest_join_rule",
        "base_schema": SCHEMA,
        "base_table": base,
        "base_alias": "b",
        # base include empty -> core+extrasを自動（実装では "*" 相当）
        "joins": [
            {
                "schema": SCHEMA,
                "table": hist,
                "alias": "h",
                "join_type": "left",
                "on": [{"left": "unique_id", "right": "unique_id"}, {"left": "ds", "right": "ds"}],
                "prefix": "hist_",
                "include": ["*"],
                "exclude": [],
            },
            {
                "schema": SCHEMA,
                "table": stat,
                "alias": "s",
                "join_type": "left",
                "on": [{"left": "unique_id", "right": "unique_id"}, {"left": "ds", "right": "ds"}],
                "prefix": "stat_",
                "include": ["*"],
                "exclude": [],
            },
        ],
    }

    qs = normalize_query_spec(spec)

    sql1 = build_join_sql(eng, query_spec=qs)
    sql2 = build_join_sql(eng, query_spec=qs)
    assert sql1 == sql2  # 安定生成

    # 核は固定で含まれる
    assert "b.unique_id AS unique_id" in sql1
    assert "b.ds AS ds" in sql1
    assert "b.y AS y" in sql1

    # 衝突(shared)は prefixで回避される（JOIN側は hist_shared / stat_shared）
    assert "h.shared AS hist_shared" in sql1
    assert "s.shared AS stat_shared" in sql1

    # 実行して列が期待どおり取れる（Smoke）
    with eng.connect() as conn:
        row = conn.execute(text(sql1)).mappings().first()
        assert row is not None
        assert row["unique_id"] == "u1"
        assert row["ds"].isoformat() == "2020-01-01"
        assert float(row["y"]) == 10.0
        assert int(row["hist_shared"]) == 70
        assert int(row["stat_shared"]) == 700
        assert int(row["hist_h1"]) == 1
        assert int(row["stat_s1"]) == 2

    # cleanup
    with eng.begin() as conn:
        conn.execute(text(f"DROP TABLE {SCHEMA}.{stat};"))
        conn.execute(text(f"DROP TABLE {SCHEMA}.{hist};"))
        conn.execute(text(f"DROP TABLE {SCHEMA}.{base};"))
'''
tp.write_text(tcode, encoding="utf-8")
print("Wrote:", tp)


Wrote: /mnt/e/env/ts/tslib/tests/test_query_builder.py


## codecell（pytest実行）

In [73]:
%cd /mnt/e/env/ts/tslib
!pytest -q tests/test_query_builder.py


/mnt/e/env/ts/tslib
[32m.[0m[32m                                                                        [100%][0m
[32m[32m[1m1 passed[0m[32m in 0.22s[0m[0m


## ID 022 dataset結合結果の取得（pandas read_sql）

目的:
- 生成済みJOIN SQL（query_builderの出力）を pandas.read_sql で DataFrame 化して返す
- 列の増減に強い（列を固定しない）
- 最低限 required_cols = ["unique_id","ds","y"] を満たすことを検証

設計:
- dataset_loader は「SQL→DataFrame」だけ担当（JOIN生成は query_builder 側）
- ds は datetime に正規化（Date/str混在を吸収）
- 必須列が無ければ DataError（NFError）で落とす


## codecell（実装）/mnt/e/env/ts/tslib/src/nf_app/dataset_loader.py

In [None]:
from pathlib import Path

p = Path("/mnt/e/env/ts/tslib/src/nf_app/dataset_loader.py")
p.parent.mkdir(parents=True, exist_ok=True)

code = r'''\
from __future__ import annotations

from typing import Any, Dict, Iterable, Mapping, Optional, Sequence

import pandas as pd
from sqlalchemy import text
from sqlalchemy.engine import Engine
from sqlalchemy.exc import SQLAlchemyError

from nf_app.errors import db_error, data_error


def load_dataframe(
    engine: Engine,
    *,
    sql: str,
    params: Optional[Mapping[str, Any]] = None,
    required_cols: Sequence[str] = ("unique_id", "ds", "y"),
    parse_dates: Sequence[str] = ("ds",),
) -> pd.DataFrame:
    """
    SQLをpandas.read_sqlでDataFrame化して返す（列増減に強い）

    - required_cols が存在しない場合は data_error を投げる
    - ds は datetime に正規化（date/str混在を吸収）
    """
    try:
        with engine.connect() as conn:
            df = pd.read_sql_query(
                text(sql),
                con=conn,
                params=dict(params or {}),
                parse_dates=list(parse_dates) if parse_dates else None,
            )
    except SQLAlchemyError as e:
        raise db_error(
            "DB_READ_SQL_FAILED",
            "Failed to read SQL into DataFrame.",
            context={"sql_head": sql[:200]},
            cause=e,
        )
    except Exception as e:
        raise data_error(
            "DATA_READ_SQL_FAILED",
            "Failed to read SQL into DataFrame.",
            context={"sql_head": sql[:200]},
            cause=e,
        )

    # 必須列チェック（列増減は許容、必須だけ要求）
    missing = [c for c in required_cols if c not in df.columns]
    if missing:
        raise data_error(
            "DATA_MISSING_REQUIRED_COLUMNS",
            f"Missing required columns: {missing}",
            context={"missing": missing, "columns": list(df.columns)},
        )

    # ds 正規化（parse_datesで拾えない型もあるため最後に保険）
    if "ds" in df.columns:
        df["ds"] = pd.to_datetime(df["ds"], errors="coerce")
        bad = int(df["ds"].isna().sum())
        if bad > 0:
            raise data_error(
                "DATA_INVALID_DS",
                "Some ds values could not be parsed as datetime.",
                context={"bad_rows": bad},
            )

    return df
'''
p.write_text(code, encoding="utf-8")
print("Wrote:", p)


## codecell（pytest）/mnt/e/env/ts/tslib/tests/test_dataset_loader_unit.py

In [74]:
from pathlib import Path

tp = Path("/mnt/e/env/ts/tslib/tests/test_dataset_loader_unit.py")
tp.parent.mkdir(parents=True, exist_ok=True)

tcode = r'''\
import pandas as pd
import pytest
from sqlalchemy import create_engine, text

from nf_app.dataset_loader import load_dataframe
from nf_app.errors import NFError


def test_dataset_loader_returns_dataframe_with_required_cols_and_extra_cols():
    # in-memory SQLite で「列増減に壊れない」を検証（DB資格情報不要）
    eng = create_engine("sqlite+pysqlite:///:memory:")

    with eng.begin() as conn:
        conn.execute(text("""
            CREATE TABLE joined(
                unique_id TEXT NOT NULL,
                ds TEXT NOT NULL,
                y REAL NULL,
                hist_x INTEGER NULL
            );
        """))
        conn.execute(
            text("INSERT INTO joined(unique_id, ds, y, hist_x) VALUES (:u, :ds, :y, :hx)"),
            {"u": "A", "ds": "2020-01-02", "y": 123.0, "hx": 7},
        )

    df = load_dataframe(
        eng,
        sql="SELECT unique_id, ds, y, hist_x FROM joined",
        required_cols=("unique_id", "ds", "y"),
    )

    assert isinstance(df, pd.DataFrame)
    assert set(["unique_id", "ds", "y"]).issubset(df.columns)
    # 追加列も落とさない（列増減に強い）
    assert "hist_x" in df.columns
    # ds が datetime に正規化される
    assert pd.api.types.is_datetime64_any_dtype(df["ds"])


def test_dataset_loader_raises_when_required_missing():
    eng = create_engine("sqlite+pysqlite:///:memory:")

    with eng.begin() as conn:
        conn.execute(text("""
            CREATE TABLE joined(
                unique_id TEXT NOT NULL,
                ds TEXT NOT NULL
            );
        """))
        conn.execute(
            text("INSERT INTO joined(unique_id, ds) VALUES (:u, :ds)"),
            {"u": "A", "ds": "2020-01-02"},
        )

    with pytest.raises(NFError):
        # y が無いので落ちる
        load_dataframe(eng, sql="SELECT unique_id, ds FROM joined", required_cols=("unique_id", "ds", "y"))
'''
tp.write_text(tcode, encoding="utf-8")
print("Wrote:", tp)


Wrote: /mnt/e/env/ts/tslib/tests/test_dataset_loader_unit.py


## codecell（pytest実行）

In [75]:
%cd /mnt/e/env/ts/tslib
!pytest -q tests/test_dataset_loader_unit.py


/mnt/e/env/ts/tslib

[31m[1m______________ ERROR collecting tests/test_dataset_loader_unit.py ______________[0m
[31mImportError while importing test module '/mnt/e/env/ts/tslib/tests/test_dataset_loader_unit.py'.
Hint: make sure your test modules/packages have valid Python names.
Traceback:
[1m[31m/home/az/miniconda3/envs/ts/lib/python3.11/importlib/__init__.py[0m:126: in import_module
    [0m[94mreturn[39;49;00m _bootstrap._gcd_import(name[level:], package, level)[90m[39;49;00m
[1m[31mtests/test_dataset_loader_unit.py[0m:6: in <module>
    [0m[94mfrom[39;49;00m[90m [39;49;00m[04m[96mnf_app[39;49;00m[04m[96m.[39;49;00m[04m[96mdataset_loader[39;49;00m[90m [39;49;00m[94mimport[39;49;00m load_dataframe[90m[39;49;00m
[1m[31mE   ModuleNotFoundError: No module named 'nf_app.dataset_loader'[0m[0m
[31mERROR[0m tests/test_dataset_loader_unit.py
!!!!!!!!!!!!!!!!!!!! Interrupted: 1 error during collection !!!!!!!!!!!!!!!!!!!!
[31m[31m[1m1 error[0m[31m

## ✅ codecell 1：まず存在確認（ここで9割決まる）

In [76]:
from pathlib import Path
import pkgutil
import nf_app

print("nf_app path:", list(getattr(nf_app, "__path__", [])))

p = Path("/mnt/e/env/ts/tslib/src/nf_app/dataset_loader.py")
print("dataset_loader.py exists?:", p.exists(), "->", p)

print("modules under nf_app:", [m.name for m in pkgutil.iter_modules(nf_app.__path__)])


nf_app path: ['/mnt/e/env/ts/tslib/src/nf_app']
dataset_loader.py exists?: False -> /mnt/e/env/ts/tslib/src/nf_app/dataset_loader.py
modules under nf_app: ['config', 'db', 'errors', 'introspect', 'logging', 'migrations', 'query_builder', 'run_plans']


## ✅ codecell 2：dataset_loader.py を フルパスで確実に作成

In [77]:
from pathlib import Path

p = Path("/mnt/e/env/ts/tslib/src/nf_app/dataset_loader.py")
p.parent.mkdir(parents=True, exist_ok=True)

code = r'''\
from __future__ import annotations

from typing import Any, Mapping, Optional, Sequence

import pandas as pd
from sqlalchemy import text
from sqlalchemy.engine import Engine
from sqlalchemy.exc import SQLAlchemyError

from nf_app.errors import db_error, data_error


def load_dataframe(
    engine: Engine,
    *,
    sql: str,
    params: Optional[Mapping[str, Any]] = None,
    required_cols: Sequence[str] = ("unique_id", "ds", "y"),
    parse_dates: Sequence[str] = ("ds",),
) -> pd.DataFrame:
    """
    SQLをpandas.read_sqlでDataFrame化して返す（列増減に強い）
    - required_cols が存在しない場合は data_error を投げる
    - ds は datetime に正規化（date/str混在を吸収）
    """
    try:
        with engine.connect() as conn:
            df = pd.read_sql_query(
                text(sql),
                con=conn,
                params=dict(params or {}),
                parse_dates=list(parse_dates) if parse_dates else None,
            )
    except SQLAlchemyError as e:
        raise db_error(
            "DB_READ_SQL_FAILED",
            "Failed to read SQL into DataFrame.",
            context={"sql_head": sql[:200]},
            cause=e,
        )
    except Exception as e:
        raise data_error(
            "DATA_READ_SQL_FAILED",
            "Failed to read SQL into DataFrame.",
            context={"sql_head": sql[:200]},
            cause=e,
        )

    missing = [c for c in required_cols if c not in df.columns]
    if missing:
        raise data_error(
            "DATA_MISSING_REQUIRED_COLUMNS",
            f"Missing required columns: {missing}",
            context={"missing": missing, "columns": list(df.columns)},
        )

    if "ds" in df.columns:
        df["ds"] = pd.to_datetime(df["ds"], errors="coerce")
        bad = int(df["ds"].isna().sum())
        if bad > 0:
            raise data_error(
                "DATA_INVALID_DS",
                "Some ds values could not be parsed as datetime.",
                context={"bad_rows": bad},
            )

    return df
'''
p.write_text(code, encoding="utf-8")
print("Wrote:", p, "size:", p.stat().st_size)
print(p.read_text(encoding="utf-8")[:400])


Wrote: /mnt/e/env/ts/tslib/src/nf_app/dataset_loader.py size: 2066
\
from __future__ import annotations

from typing import Any, Mapping, Optional, Sequence

import pandas as pd
from sqlalchemy import text
from sqlalchemy.engine import Engine
from sqlalchemy.exc import SQLAlchemyError

from nf_app.errors import db_error, data_error


def load_dataframe(
    engine: Engine,
    *,
    sql: str,
    params: Optional[Mapping[str, Any]] = None,
    required_cols: Seq


## ✅ codecell 3：pytest 再実行

In [79]:
%cd /mnt/e/env/ts/tslib
!pytest -q tests/test_dataset_loader_unit.py


/mnt/e/env/ts/tslib
[32m.[0m[32m.[0m[32m                                                                       [100%][0m
[32m[32m[1m2 passed[0m[32m in 0.10s[0m[0m


## ID 023 prefix_rules取得（DB→メモリ辞書）

目的:
- model DB（schema: neuralforecast）の feature_prefix_rules を取得し、メモリ上のルール集合へ変換する
- ルール変更が即反映される（=キャッシュせず毎回DBを読む）

出力（例）:
- rules: List[FeaturePrefixRule]（priority昇順・prefix長降順で安定）
- dict: { "hist_": "hist", "futr_": "futr", "stat_": "stat" } など（下流の自動分類で使用）


## ✅ codecell（実装）/mnt/e/env/ts/tslib/src/nf_app/feature_rules.py

In [80]:
from pathlib import Path

p = Path("/mnt/e/env/ts/tslib/src/nf_app/feature_rules.py")
p.parent.mkdir(parents=True, exist_ok=True)

code = r'''\
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Sequence, Tuple

from sqlalchemy import inspect, text
from sqlalchemy.engine import Engine
from sqlalchemy.exc import SQLAlchemyError

from nf_app.errors import db_error

DEFAULT_SCHEMA = "neuralforecast"
DEFAULT_TABLE = "feature_prefix_rules"


@dataclass(frozen=True)
class FeaturePrefixRule:
    """
    prefix: 例 'hist_' / 'futr_' / 'stat_'
    exog_kind: NeuralForecastの分類用（例 'hist' / 'futr' / 'stat'）
    enabled: 有効フラグ
    priority: 小さいほど優先
    """
    prefix: str
    exog_kind: str
    enabled: bool = True
    priority: int = 100

    def to_dict(self) -> Dict[str, Any]:
        return {
            "prefix": self.prefix,
            "exog_kind": self.exog_kind,
            "enabled": self.enabled,
            "priority": self.priority,
        }


def _full_table(schema: Optional[str], table: str) -> str:
    return f"{schema}.{table}" if schema else table


def _detect_columns(engine: Engine, *, schema: Optional[str], table: str) -> List[str]:
    """
    SQLAlchemy Inspector で列名を取得（DB方言に依存しにくい）
    """
    insp = inspect(engine)
    cols = insp.get_columns(table, schema=schema)
    return [c["name"] for c in cols]


def load_prefix_rules(
    engine: Engine,
    *,
    schema: Optional[str] = DEFAULT_SCHEMA,
    table: str = DEFAULT_TABLE,
    enabled_only: bool = True,
) -> List[FeaturePrefixRule]:
    """
    feature_prefix_rules をDBから取得して返す（※キャッシュしない＝即反映）
    - ORDER: priority ASC, prefix length DESC, prefix ASC で安定
    - 列名が多少変わっても検出して読む（拡張耐性）
    """
    try:
        cols = _detect_columns(engine, schema=schema, table=table)
    except Exception as e:
        raise db_error(
            "DB_PREFIX_RULES_INTROSPECT_FAILED",
            "Failed to introspect feature_prefix_rules columns.",
            context={"schema": schema, "table": table},
            cause=e,
        )

    # できるだけ“揺れ”に耐える（移行や拡張を想定）
    prefix_col_candidates = ["prefix", "feature_prefix", "col_prefix"]
    kind_col_candidates = ["exog_kind", "exog_type", "kind", "feature_type", "group_name"]
    enabled_col_candidates = ["enabled", "is_enabled"]
    priority_col_candidates = ["priority", "order", "rank"]

    def pick(cands: Sequence[str], *, required: bool) -> Optional[str]:
        for c in cands:
            if c in cols:
                return c
        if required:
            raise db_error(
                "DB_PREFIX_RULES_SCHEMA_INVALID",
                "feature_prefix_rules schema is missing required columns.",
                context={"missing_candidates": list(cands), "found_columns": cols},
            )
        return None

    prefix_col = pick(prefix_col_candidates, required=True)
    kind_col = pick(kind_col_candidates, required=True)
    enabled_col = pick(enabled_col_candidates, required=False)
    priority_col = pick(priority_col_candidates, required=False)

    full = _full_table(schema, table)

    where = ""
    if enabled_only and enabled_col:
        where = f"WHERE {enabled_col} = TRUE"

    # SQLite等で enabled_col がない場合も許容（enabled_only=trueでも“全取得”になる）
    order_by = []
    if priority_col:
        order_by.append(f"{priority_col} ASC")
    order_by.append(f"LENGTH({prefix_col}) DESC")
    order_by.append(f"{prefix_col} ASC")
    order_sql = "ORDER BY " + ", ".join(order_by)

    sql = f"""
        SELECT
            {prefix_col} AS prefix,
            {kind_col}   AS exog_kind
            {"," + enabled_col + " AS enabled" if enabled_col else ""}
            {"," + priority_col + " AS priority" if priority_col else ""}
        FROM {full}
        {where}
        {order_sql};
    """

    try:
        with engine.connect() as conn:
            rows = conn.execute(text(sql)).mappings().all()
    except SQLAlchemyError as e:
        raise db_error(
            "DB_PREFIX_RULES_SELECT_FAILED",
            "Failed to select feature_prefix_rules.",
            context={"schema": schema, "table": table},
            cause=e,
        )

    out: List[FeaturePrefixRule] = []
    for r in rows:
        out.append(
            FeaturePrefixRule(
                prefix=str(r["prefix"]),
                exog_kind=str(r["exog_kind"]),
                enabled=bool(r["enabled"]) if "enabled" in r and r["enabled"] is not None else True,
                priority=int(r["priority"]) if "priority" in r and r["priority"] is not None else 100,
            )
        )
    return out


def prefix_rules_dict(
    engine: Engine,
    *,
    schema: Optional[str] = DEFAULT_SCHEMA,
    table: str = DEFAULT_TABLE,
    enabled_only: bool = True,
) -> Dict[str, str]:
    """
    使いやすい辞書形（prefix -> exog_kind）
    ※同一prefixが複数あり得る場合は、load_prefix_rulesの先頭（優先）を採用
    """
    rules = load_prefix_rules(engine, schema=schema, table=table, enabled_only=enabled_only)
    d: Dict[str, str] = {}
    for r in rules:
        if r.prefix not in d:
            d[r.prefix] = r.exog_kind
    return d
'''
p.write_text(code, encoding="utf-8")
print("Wrote:", p)


Wrote: /mnt/e/env/ts/tslib/src/nf_app/feature_rules.py


## ✅ codecell（pytest）/mnt/e/env/ts/tslib/tests/test_feature_rules.py

DB資格情報不要で回るように、SQLite(in-memory)で「UPDATE→再取得→即反映」を検証します（=キャッシュしてたら落ちる）。

In [81]:
from pathlib import Path

tp = Path("/mnt/e/env/ts/tslib/tests/test_feature_rules.py")
tp.parent.mkdir(parents=True, exist_ok=True)

tcode = r'''\
import pytest
from sqlalchemy import create_engine, text

from nf_app.feature_rules import load_prefix_rules, prefix_rules_dict


def test_prefix_rules_reflects_updates_immediately_no_cache():
    eng = create_engine("sqlite+pysqlite:///:memory:")

    # schemaはSQLiteだと無いので schema=None で呼ぶ（本番は neuralforecast）
    with eng.begin() as conn:
        conn.execute(text("""
            CREATE TABLE feature_prefix_rules(
                prefix TEXT PRIMARY KEY,
                exog_kind TEXT NOT NULL,
                enabled BOOLEAN NOT NULL DEFAULT 1,
                priority INTEGER NOT NULL DEFAULT 100
            );
        """))
        conn.execute(
            text("INSERT INTO feature_prefix_rules(prefix, exog_kind, enabled, priority) VALUES (:p, :k, :e, :pri)"),
            {"p": "hist_", "k": "hist", "e": True, "pri": 10},
        )

    d1 = prefix_rules_dict(eng, schema=None, table="feature_prefix_rules")
    assert d1["hist_"] == "hist"

    # ルール変更（UPDATE）
    with eng.begin() as conn:
        conn.execute(
            text("UPDATE feature_prefix_rules SET exog_kind=:k WHERE prefix=:p"),
            {"p": "hist_", "k": "futr"},
        )

    # 再取得で即反映（キャッシュしてたらここが失敗する）
    d2 = prefix_rules_dict(eng, schema=None, table="feature_prefix_rules")
    assert d2["hist_"] == "futr"


def test_prefix_rules_order_is_stable_priority_then_prefixlen():
    eng = create_engine("sqlite+pysqlite:///:memory:")

    with eng.begin() as conn:
        conn.execute(text("""
            CREATE TABLE feature_prefix_rules(
                prefix TEXT PRIMARY KEY,
                exog_kind TEXT NOT NULL,
                enabled BOOLEAN NOT NULL DEFAULT 1,
                priority INTEGER NOT NULL DEFAULT 100
            );
        """))
        conn.execute(text("INSERT INTO feature_prefix_rules(prefix, exog_kind, enabled, priority) VALUES ('h_', 'hist', 1, 10)"))
        conn.execute(text("INSERT INTO feature_prefix_rules(prefix, exog_kind, enabled, priority) VALUES ('hist_', 'hist', 1, 10)"))
        conn.execute(text("INSERT INTO feature_prefix_rules(prefix, exog_kind, enabled, priority) VALUES ('stat_', 'stat', 1, 20)"))

    rules = load_prefix_rules(eng, schema=None, table="feature_prefix_rules")
    # 同priorityなら prefix長が長い方が先（衝突回避の基本）
    assert rules[0].prefix == "hist_"
    assert rules[1].prefix == "h_"
'''
tp.write_text(tcode, encoding="utf-8")
print("Wrote:", tp)


Wrote: /mnt/e/env/ts/tslib/tests/test_feature_rules.py


## ✅ codecell（pytest実行）

In [82]:
%cd /mnt/e/env/ts/tslib
!pytest -q tests/test_feature_rules.py


/mnt/e/env/ts/tslib
[32m.[0m[32m.[0m[32m                                                                       [100%][0m
[32m[32m[1m2 passed[0m[32m in 0.09s[0m[0m


## ID 024 列を役割に分類（hist/futr/stat/meta/ignore）

入力:
- DataFrame列一覧（例: ds, unique_id, y, hist_*, futr_*, stat_* ...）
- prefix_rules（DBから取得した prefix -> exog_kind）

出力（roles）:
- meta_cols: ['unique_id','ds','y']（必須）
- hist_exog_cols / futr_exog_cols / stat_exog_cols（外生変数）
- ignore_cols: 学習に使わない（id, exec_ts など）
- unknown_cols: どれにも当てはまらない列（運用で気づけるように残す）

設計:
- prefixは「長い一致を優先」（衝突回避）
- ルールや列増減に強い（列が増えても分類するだけで落ちない）


## ✅ codecell（実装）/mnt/e/env/ts/tslib/src/nf_app/feature_router.py

In [83]:
from pathlib import Path

p = Path("/mnt/e/env/ts/tslib/src/nf_app/feature_router.py")
p.parent.mkdir(parents=True, exist_ok=True)

code = r'''\
from __future__ import annotations

from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional, Sequence, Set, Tuple

META_DEFAULT = ("unique_id", "ds", "y")

# 運用上よくある「学習に不要」列。増えても害はない（ignoreに落ちるだけ）。
IGNORE_DEFAULT_PREFIXES = (
    "exec_", "updated_", "created_", "proc_", "run_", "job_",
)
IGNORE_DEFAULT_COLS = (
    "id",
)


@dataclass(frozen=True)
class RoutedFeatures:
    meta_cols: Tuple[str, ...]
    hist_exog_cols: Tuple[str, ...]
    futr_exog_cols: Tuple[str, ...]
    stat_exog_cols: Tuple[str, ...]
    ignore_cols: Tuple[str, ...]
    unknown_cols: Tuple[str, ...]


def _sorted_prefixes(prefix_to_kind: Dict[str, str]) -> List[str]:
    # 長いprefixを先に（衝突回避）
    return sorted(prefix_to_kind.keys(), key=lambda s: (-len(s), s))


def route_columns(
    columns: Sequence[str],
    *,
    prefix_to_kind: Dict[str, str],
    meta_cols: Sequence[str] = META_DEFAULT,
    ignore_prefixes: Sequence[str] = IGNORE_DEFAULT_PREFIXES,
    ignore_cols: Sequence[str] = IGNORE_DEFAULT_COLS,
) -> RoutedFeatures:
    """
    列名を役割に分類する（列増減で壊れない）
    - prefix_to_kind: {"hist_":"hist","futr_":"futr","stat_":"stat", ...}
    - meta_cols: 必須（unique_id/ds/y）
    - ignore: 運用列（id/exec_ts等）を学習対象から除外
    """
    cols = list(columns)
    meta_set: Set[str] = set(meta_cols)

    # 必須メタ列チェック（ここは落とす：学習以前の前提が崩れるため）
    missing_meta = [c for c in meta_cols if c not in cols]
    if missing_meta:
        raise ValueError(f"Missing required meta columns: {missing_meta}")

    prefixes = _sorted_prefixes(prefix_to_kind)

    hist: List[str] = []
    futr: List[str] = []
    stat: List[str] = []
    ignore: List[str] = []
    unknown: List[str] = []

    for c in cols:
        if c in meta_set:
            continue

        # ignore（完全一致）
        if c in ignore_cols:
            ignore.append(c)
            continue

        # ignore（prefix一致）
        if any(c.startswith(pfx) for pfx in ignore_prefixes):
            ignore.append(c)
            continue

        # prefixルール（長い一致を優先）
        matched = False
        for pfx in prefixes:
            if c.startswith(pfx):
                kind = prefix_to_kind[pfx]
                if kind == "hist":
                    hist.append(c)
                elif kind == "futr":
                    futr.append(c)
                elif kind == "stat":
                    stat.append(c)
                else:
                    # 将来のkind拡張：未知kindはunknown扱いにして監視可能に
                    unknown.append(c)
                matched = True
                break

        if matched:
            continue

        # どれにも当てはまらない列
        unknown.append(c)

    # 安定化（順序は入力列順を尊重しつつ、テストしやすいようtuple化）
    return RoutedFeatures(
        meta_cols=tuple(meta_cols),
        hist_exog_cols=tuple(hist),
        futr_exog_cols=tuple(futr),
        stat_exog_cols=tuple(stat),
        ignore_cols=tuple(ignore),
        unknown_cols=tuple(unknown),
    )
'''
p.write_text(code, encoding="utf-8")
print("Wrote:", p)


Wrote: /mnt/e/env/ts/tslib/src/nf_app/feature_router.py


## ✅ codecell（pytest）/mnt/e/env/ts/tslib/tests/test_feature_router.py

In [84]:
from pathlib import Path

tp = Path("/mnt/e/env/ts/tslib/tests/test_feature_router.py")
tp.parent.mkdir(parents=True, exist_ok=True)

tcode = r'''\
import pytest
from nf_app.feature_router import route_columns


def test_feature_router_classifies_prefixes_correctly():
    cols = [
        "unique_id", "ds", "y",
        "hist_pn1", "hist_pm1",
        "futr_year", "futr_month",
        "stat_mean_7",
        "id", "exec_ts",
        "other_col",
    ]
    prefix_to_kind = {"hist_": "hist", "futr_": "futr", "stat_": "stat"}

    routed = route_columns(cols, prefix_to_kind=prefix_to_kind)

    assert set(routed.hist_exog_cols) == {"hist_pn1", "hist_pm1"}
    assert set(routed.futr_exog_cols) == {"futr_year", "futr_month"}
    assert set(routed.stat_exog_cols) == {"stat_mean_7"}

    # ignore: id はデフォルトで ignore_cols、exec_ts は ignore_prefixes により ignore
    assert "id" in routed.ignore_cols
    assert "exec_ts" in routed.ignore_cols

    # unknown: ルールにない列は unknown
    assert "other_col" in routed.unknown_cols


def test_feature_router_prefers_longest_prefix_to_avoid_collisions():
    cols = ["unique_id", "ds", "y", "hist_x", "h_x"]
    # 衝突しやすいケース：h_ と hist_
    prefix_to_kind = {"h_": "hist", "hist_": "futr"}  # わざと違うkindにする
    routed = route_columns(cols, prefix_to_kind=prefix_to_kind)

    # hist_x は hist_ が優先（長いprefix）
    assert "hist_x" in routed.futr_exog_cols
    # h_x は h_ にしか一致しない
    assert "h_x" in routed.hist_exog_cols


def test_feature_router_requires_meta_columns():
    cols = ["unique_id", "ds", "hist_x"]
    prefix_to_kind = {"hist_": "hist"}
    with pytest.raises(ValueError):
        route_columns(cols, prefix_to_kind=prefix_to_kind)
'''
tp.write_text(tcode, encoding="utf-8")
print("Wrote:", tp)


Wrote: /mnt/e/env/ts/tslib/tests/test_feature_router.py


## ✅ codecell（pytest実行）

In [85]:
%cd /mnt/e/env/ts/tslib
!pytest -q tests/test_feature_router.py


/mnt/e/env/ts/tslib
[32m.[0m[32m.[0m[32m.[0m[32m                                                                      [100%][0m
[32m[32m[1m3 passed[0m[32m in 0.01s[0m[0m


## ID 025 `df`生成（unique_id/ds/y＋hist外生）

目的:
- join済みの巨大DataFrameから、NeuralForecast入力の df を生成する
- df = [unique_id, ds, y] + hist_exog_cols（feature_routerで分類されたhist_*のみ）
- dsはdatetimeへ正規化、（推奨）unique_idをstrへ正規化
- 安定運用のため、(unique_id, ds)でソート可能にする

注意:
- futr_/stat_ はここでは入れない（次工程で future/static として扱えるように分離）


## ✅ codecell（実装）/mnt/e/env/ts/tslib/src/nf_app/builders/df_builder.py

In [86]:
from pathlib import Path

# package dir
pkg = Path("/mnt/e/env/ts/tslib/src/nf_app/builders")
pkg.mkdir(parents=True, exist_ok=True)
(pkg / "__init__.py").write_text("", encoding="utf-8")

p = pkg / "df_builder.py"

code = r'''\
from __future__ import annotations

from typing import Iterable, List, Sequence

import pandas as pd

from nf_app.feature_router import RoutedFeatures


def build_training_df(
    joined_df: pd.DataFrame,
    *,
    routed: RoutedFeatures,
    sort: bool = True,
    coerce_unique_id_str: bool = True,
    coerce_ds_datetime: bool = True,
) -> pd.DataFrame:
    """
    NeuralForecast学習用 df を生成:
      df = meta_cols(unique_id/ds/y) + hist_exog_cols

    - joined_df: dataset結合結果（巨大でもOK）
    - routed: route_columns() の結果（hist/futr/stat/ignore/unknownなど）
    - sort: (unique_id, ds) でソート（再現性/学習安定性のため推奨）
    """
    if joined_df is None:
        raise ValueError("joined_df is None")

    # 必須メタ列
    meta_cols = list(routed.meta_cols)
    for c in meta_cols:
        if c not in joined_df.columns:
            raise ValueError(f"Missing required meta column in joined_df: {c}")

    # hist外生（存在しない列が混ざっていたら、それは上流の不整合なので落とす）
    hist_cols = list(routed.hist_exog_cols)
    missing_hist = [c for c in hist_cols if c not in joined_df.columns]
    if missing_hist:
        raise ValueError(f"Missing hist_exog columns in joined_df: {missing_hist}")

    cols = meta_cols + hist_cols

    df = joined_df.loc[:, cols].copy()

    # 型正規化
    if coerce_unique_id_str and "unique_id" in df.columns:
        df["unique_id"] = df["unique_id"].astype(str)

    if coerce_ds_datetime and "ds" in df.columns:
        # DATE/文字列/Timestampなど混在に耐える
        df["ds"] = pd.to_datetime(df["ds"], errors="raise")

    if sort:
        # NeuralForecast系は (unique_id, ds) ソートが基本的に安全
        df = df.sort_values(["unique_id", "ds"]).reset_index(drop=True)

    return df
'''
p.write_text(code, encoding="utf-8")
print("Wrote:", p)


Wrote: /mnt/e/env/ts/tslib/src/nf_app/builders/df_builder.py


## ✅ codecell（pytest）/mnt/e/env/ts/tslib/tests/test_df_builder.py

In [89]:
from pathlib import Path

tp = Path("/mnt/e/env/ts/tslib/tests/test_df_builder.py")

tcode = r'''\
import pandas as pd
import pytest

from nf_app.feature_router import route_columns
from nf_app.builders.df_builder import build_training_df


def test_df_builder_columns_are_spec():
    joined = pd.DataFrame(
        {
            "unique_id": [2, 1],
            "ds": ["2020-01-02", "2020-01-01"],
            "y": [10, 20],
            "hist_a": [0.1, 0.2],
            "hist_b": [1, 2],
            "futr_x": [9, 9],
            "stat_s": [7, 7],
            "id": [100, 101],
            "other": ["x", "y"],
        }
    )
    prefix_to_kind = {"hist_": "hist", "futr_": "futr", "stat_": "stat"}
    routed = route_columns(list(joined.columns), prefix_to_kind=prefix_to_kind)

    df = build_training_df(joined, routed=routed, sort=False)

    # dfは meta + hist だけ
    assert list(df.columns) == ["unique_id", "ds", "y", "hist_a", "hist_b"]

    # dsはdatetime化
    assert pd.api.types.is_datetime64_any_dtype(df["ds"])

    # futr/stat/other/idは混ざらない
    assert "futr_x" not in df.columns
    assert "stat_s" not in df.columns
    assert "other" not in df.columns
    assert "id" not in df.columns


def test_df_builder_sorts_by_unique_id_and_ds():
    joined = pd.DataFrame(
        {
            "unique_id": [2, 1, 1],
            "ds": ["2020-01-02", "2020-01-03", "2020-01-01"],
            "y": [10, 30, 20],
            "hist_a": [0.1, 0.3, 0.2],
        }
    )
    routed = route_columns(list(joined.columns), prefix_to_kind={"hist_": "hist"})

    df = build_training_df(joined, routed=routed, sort=True)

    # ソート順を確認
    assert df["unique_id"].tolist() == ["1", "1", "2"]
    assert df["ds"].dt.strftime("%Y-%m-%d").tolist() == ["2020-01-01", "2020-01-03", "2020-01-02"]


def test_router_raises_when_meta_missing():
    """
    meta列の欠落は、df_builderではなく feature_router（route_columns）が落とす設計。
    pipeline上の前提（unique_id/ds/y）を満たさない場合は早期に止める。
    """
    joined = pd.DataFrame({"unique_id": ["a"], "ds": ["2020-01-01"], "hist_a": [1]})
    with pytest.raises(ValueError):
        route_columns(list(joined.columns), prefix_to_kind={"hist_": "hist"})
'''
tp.write_text(tcode, encoding="utf-8")
print("Wrote:", tp)


Wrote: /mnt/e/env/ts/tslib/tests/test_df_builder.py


## ✅ codecell（pytest実行）

In [90]:
%cd /mnt/e/env/ts/tslib
!pytest -q tests/test_df_builder.py


/mnt/e/env/ts/tslib
[32m.[0m[32m.[0m[32m.[0m[32m                                                                      [100%][0m
[32m[32m[1m3 passed[0m[32m in 0.02s[0m[0m


### 3. Build futr_df（unique_id/ds＋futr外生）

- joined（JOIN済みDataFrame）から、各 series（unique_id）ごとに「学習終了日（yが最後に存在するds）」以降の行を future とみなし、
  予測ホライズン `h` 本ぶんの `futr_df` を作る。
- `futr_df` の列は `unique_id`, `ds`, `futr_` 接頭辞列のみ。
- 将来行が `h` 本未満なら ValueError（「予測ホライズン分を満たす」保証）。


## ✅ Codeセル 1（ディレクトリ作成）

In [91]:
!mkdir -p /mnt/e/env/ts/tslib/src/nf_app/builders
!mkdir -p /mnt/e/env/ts/tslib/tests


## ✅ Codeセル 2（実装：futr_builder.py）

In [92]:
%%writefile /mnt/e/env/ts/tslib/src/nf_app/builders/futr_builder.py
from __future__ import annotations

from dataclasses import dataclass
from typing import Iterable, List, Optional

import pandas as pd

from nf_app.feature_router import RoutedFeatures


def _ensure_datetime(df: pd.DataFrame, col: str = "ds") -> pd.DataFrame:
    out = df.copy()
    out[col] = pd.to_datetime(out[col], errors="raise")
    return out


def _last_train_ds(g: pd.DataFrame) -> pd.Timestamp:
    """
    学習終了日（train end）を推定:
    - y列があれば「yが非nullの最大ds」
    - y列がなければ「最大ds」（=全部学習扱い）
    """
    if "y" in g.columns:
        g2 = g[g["y"].notna()]
        if len(g2) > 0:
            return pd.to_datetime(g2["ds"]).max()
    return pd.to_datetime(g["ds"]).max()


def build_futr_df(
    joined: pd.DataFrame,
    routed: RoutedFeatures,
    *,
    horizon: int,
    require_full_horizon: bool = True,
) -> pd.DataFrame:
    """
    futr_df（future exogenous）を生成する。

    入力:
      - joined: JOIN済みDataFrame（最低 unique_id, ds は必須。通常 y も含む）
      - routed: route_columns の結果（routed.futr を利用）
      - horizon: 予測ホライズン（h）
    出力:
      - columns: ['unique_id', 'ds'] + futr_cols
      - rows: 各 unique_id ごとに「学習終了日以降」の先頭 horizon 行

    設計方針（拡張性）:
      - 列増減に強い: futr列は routed.futr を参照するだけ
      - テーブル増減に強い: JOIN結果の列セットが変わっても routed に従う
    """
    if horizon <= 0:
        raise ValueError("horizon must be positive")

    for c in ("unique_id", "ds"):
        if c not in joined.columns:
            raise ValueError(f"Missing required column: {c}")

    futr_cols: List[str] = list(getattr(routed, "futr", []))
    # futr_cols が空でも futr_df 自体は作れる（モデル側で futr_exog を使わないケース）
    base_cols = ["unique_id", "ds"]
    keep_cols = base_cols + futr_cols

    df = _ensure_datetime(joined, "ds")
    # 不要列があっても落ちないように、存在する列だけ抜く
    existing_keep_cols = [c for c in keep_cols if c in df.columns]
    df = df[existing_keep_cols].copy()

    out_parts: List[pd.DataFrame] = []
    for uid, g in df.groupby("unique_id", sort=False):
        g = g.sort_values("ds")
        # 学習終了日推定には joined 全体（y等）も見たいので、元joinedから取る
        g_full = joined[joined["unique_id"] == uid].copy()
        g_full = _ensure_datetime(g_full, "ds").sort_values("ds")
        last_ds = _last_train_ds(g_full)

        future = g[g["ds"] > last_ds]
        if require_full_horizon and len(future) < horizon:
            raise ValueError(
                f"Insufficient future rows for unique_id={uid}: "
                f"need horizon={horizon}, but got {len(future)}"
            )
        future = future.head(horizon)
        out_parts.append(future)

    if not out_parts:
        # unique_id が空など
        return pd.DataFrame(columns=keep_cols)

    out = pd.concat(out_parts, ignore_index=True)
    # 列順を安定化
    out = out[[c for c in keep_cols if c in out.columns]].copy()
    return out


Writing /mnt/e/env/ts/tslib/src/nf_app/builders/futr_builder.py


## ✅ Codeセル 3（pytest：test_futr_builder.py）

In [104]:
from pathlib import Path

p = Path("/mnt/e/env/ts/tslib/tests/test_futr_builder.py")
p.write_text(
"""\
import pandas as pd
import pytest

from nf_app.feature_router import route_columns
from nf_app.builders.futr_builder import build_futr_df


def test_build_futr_df_picks_horizon_rows_per_series():
    joined = pd.DataFrame(
        {
            "unique_id": ["a","a","a","b","b","b"],
            "ds": ["2020-01-01","2020-01-02","2020-01-03","2020-01-01","2020-01-02","2020-01-03"],
            "y": [1, 2, None, 10, 20, None],  # 3日目が未来（yなし）
            "hist_x": [0.1,0.2,0.3, 1.1,1.2,1.3],
            "futr_x": [100,101,102, 200,201,202],
            "stat_z": [9,9,9, 8,8,8],
        }
    )

    routed = route_columns(
        list(joined.columns),
        prefix_to_kind={"hist_":"hist","futr_":"futr","stat_":"stat"},
    )

    futr_df = build_futr_df(joined, routed, horizon=1)

    # 列: unique_id/ds + futr_* のみ（stat_ は含めない）
    assert list(futr_df.columns) == ["unique_id", "ds", "futr_x"]

    # 行: seriesごとに horizon 行（ここでは未来の1行＝2020-01-03が選ばれる）
    assert len(futr_df) == 2
    # ds は datetime に変換されている想定
    ds_str = futr_df["ds"].dt.strftime("%Y-%m-%d").tolist()
    assert ds_str == ["2020-01-03", "2020-01-03"]

    # futr_x が残る
    assert futr_df["futr_x"].tolist() == [102, 202]


def test_build_futr_df_raises_when_not_enough_future_rows():
    joined = pd.DataFrame(
        {
            "unique_id": ["a","a","a"],
            "ds": ["2020-01-01","2020-01-02","2020-01-03"],
            "y": [1, 2, None],     # 未来は1行しかない
            "futr_x": [100,101,102],
        }
    )
    routed = route_columns(list(joined.columns), prefix_to_kind={"futr_":"futr"})
    with pytest.raises(ValueError):
        build_futr_df(joined, routed, horizon=2)


def test_build_futr_df_rejects_non_positive_horizon():
    joined = pd.DataFrame({"unique_id":["a"], "ds":["2020-01-01"], "y":[1], "futr_x":[100]})
    routed = route_columns(list(joined.columns), prefix_to_kind={"futr_":"futr"})
    with pytest.raises(ValueError):
        build_futr_df(joined, routed, horizon=0)
""",
    encoding="utf-8",
)
print("Wrote:", p)
print(p.read_text(encoding="utf-8")[:400])


Wrote: /mnt/e/env/ts/tslib/tests/test_futr_builder.py
import pandas as pd
import pytest

from nf_app.feature_router import route_columns
from nf_app.builders.futr_builder import build_futr_df


def test_build_futr_df_picks_horizon_rows_per_series():
    joined = pd.DataFrame(
        {
            "unique_id": ["a","a","a","b","b","b"],
            "ds": ["2020-01-01","2020-01-02","2020-01-03","2020-01-01","2020-01-02","2020-01-03"],
            "y":


## ✅ Codeセル 4（pytest実行）

In [105]:
%cd /mnt/e/env/ts/tslib
!pytest -q --collect-only -vv tests/test_futr_builder.py


/mnt/e/env/ts/tslib
platform linux -- Python 3.11.14, pytest-8.3.5, pluggy-1.6.0 -- /home/az/miniconda3/envs/ts/bin/python3.11
cachedir: .pytest_cache
metadata: {'Python': '3.11.14', 'Platform': 'Linux-6.6.87.2-microsoft-standard-WSL2-x86_64-with-glibc2.39', 'Packages': {'pytest': '8.3.5', 'pluggy': '1.6.0'}, 'Plugins': {'typeguard': '2.13.3', 'metadata': '3.1.1', 'anyio': '4.12.1', 'dash': '3.3.0', 'fugue': '0.9.4', 'env': '1.1.5', 'html': '4.1.1', 'jaxtyping': '0.2.29', 'hydra-core': '1.3.0'}}
Fugue tests will be initialized with options:
rootdir: /mnt/e/env/ts/tslib
configfile: pytest.ini
plugins: typeguard-2.13.3, metadata-3.1.1, anyio-4.12.1, dash-3.3.0, fugue-0.9.4, env-1.1.5, html-4.1.1, jaxtyping-0.2.29, hydra-core-1.3.0
collected 3 items                                                              [0m

<Dir tslib>
  <Dir tests>
    <Module test_futr_builder.py>
      <Function test_build_futr_df_picks_horizon_rows_per_series>
      <Function test_build_futr_df_raises_when_not

In [106]:
!pytest -q tests/test_futr_builder.py


[32m.[0m[32m.[0m[32m.[0m[32m                                                                      [100%][0m
[32m[32m[1m3 passed[0m[32m in 0.03s[0m[0m


## 3. Build static_df（ID 027）

`joined`（dataset結合結果）から `stat_` 接頭辞の列（= static features）を抽出し、
`unique_id` 単位で 1行に集約した `static_df` を生成する。

仕様:
- 列: unique_id + stat_* のみ
- 行: 1 unique_id = 1 行（重複しない）
- routed が dict/dataclass/任意オブジェクトでも stat 列を吸収できる
- routed が無い/空でも stat_ 接頭辞でフォールバック（列増減に強い）
- stat が行方向に揺れていても、ds昇順で「最後に観測できた値（last non-null）」を採用する


In [107]:
from pathlib import Path

impl = Path("/mnt/e/env/ts/tslib/src/nf_app/builders/static_builder.py")
impl.write_text(
"""\
# /mnt/e/env/ts/tslib/src/nf_app/builders/static_builder.py
from __future__ import annotations

from typing import Any, Dict, List, Sequence

import pandas as pd


def _as_list(x: Any) -> List[str]:
    if x is None:
        return []
    if isinstance(x, list):
        return [str(v) for v in x]
    if isinstance(x, tuple):
        return [str(v) for v in x]
    if isinstance(x, set):
        return [str(v) for v in x]
    if isinstance(x, str):
        return [x]
    try:
        return [str(v) for v in list(x)]
    except Exception:
        return []


def _get_routed_list(routed: Any, candidate_names: Sequence[str]) -> List[str]:
    \"\"\"routed が dict / dataclass / 任意オブジェクトでも stat列を拾えるようにする\"\"\"
    if routed is None:
        return []

    if isinstance(routed, dict):
        for name in candidate_names:
            if name in routed:
                out = _as_list(routed.get(name))
                if out:
                    return out

    for name in candidate_names:
        if hasattr(routed, name):
            out = _as_list(getattr(routed, name))
            if out:
                return out

    return []


def build_static_df(
    joined: pd.DataFrame,
    routed: Any,
    *,
    unique_id_col: str = "unique_id",
    ds_col: str = "ds",
    stat_prefix: str = "stat_",
) -> pd.DataFrame:
    \"\"\"
    static_df を生成する（NeuralForecast の static features 想定）

    仕様:
    - 列: unique_id + stat_* のみ
    - 行: 1 unique_id = 1 行
    - 集約: ds昇順で最後に観測できた値（last non-null）を採用
    - routed が無い/空でも stat_ 接頭辞でフォールバック
    \"\"\"
    if unique_id_col not in joined.columns:
        raise ValueError(f"Missing required column: {unique_id_col}")

    df = joined.copy()

    # ds が無いケースも運用上あり得るので、無ければ「入力順の最後」を使う
    has_ds = ds_col in df.columns
    if has_ds:
        df[ds_col] = pd.to_datetime(df[ds_col], errors="coerce")

    # stat列抽出（routed の揺れに強く）
    stat_cols = _get_routed_list(
        routed,
        candidate_names=[
            "stat",
            "stat_cols",
            "stat_exog",
            "stat_exog_cols",
            "stat_exog_list",
            "static",
            "static_cols",
            "static_features",
            "static_features_cols",
        ],
    )
    if not stat_cols:
        stat_cols = [c for c in df.columns if c.startswith(stat_prefix)]

    # stat列がゼロでも、空の static features として unique_id だけ返す（拡張に強い）
    if not stat_cols:
        out = df[[unique_id_col]].drop_duplicates().reset_index(drop=True)
        return out

    missing = [c for c in stat_cols if c not in df.columns]
    if missing:
        raise ValueError(f"Some stat columns are missing in joined df: {missing}")

    # 並び替え（dsがあれば ds 昇順、無ければ入力順を尊重）
    if has_ds:
        # NaT は末尾に寄せたいので stable sort で
        df = df.sort_values([unique_id_col, ds_col], kind="stable")

    # last non-null を取る関数
    def _last_non_null(s: pd.Series):
        nn = s.dropna()
        if nn.empty:
            return None
        return nn.iloc[-1]

    # unique_id 単位で集約
    agg: Dict[str, Any] = {c: _last_non_null for c in stat_cols}
    out = df.groupby(unique_id_col, dropna=False, sort=False).agg(agg).reset_index()

    # 1 unique_id = 1 行を保証（groupby なので基本成立）
    if out[unique_id_col].duplicated().any():
        raise ValueError("static_df has duplicated unique_id rows (unexpected).")

    # 列順を固定
    out = out[[unique_id_col] + stat_cols]
    return out
""",
    encoding="utf-8",
)
print("Wrote:", impl)
print(impl.read_text(encoding="utf-8")[:300])


Wrote: /mnt/e/env/ts/tslib/src/nf_app/builders/static_builder.py
# /mnt/e/env/ts/tslib/src/nf_app/builders/static_builder.py
from __future__ import annotations

from typing import Any, Dict, List, Sequence

import pandas as pd


def _as_list(x: Any) -> List[str]:
    if x is None:
        return []
    if isinstance(x, list):
        return [str(v) for v in x]
  


## codecell：pytest（テスト作成）

In [108]:
from pathlib import Path

t = Path("/mnt/e/env/ts/tslib/tests/test_static_builder.py")
t.write_text(
"""\
import pandas as pd

from nf_app.feature_router import route_columns
from nf_app.builders.static_builder import build_static_df


def test_build_static_df_aggregates_stat_per_unique_id():
    joined = pd.DataFrame(
        {
            "unique_id": ["a","a","a","b","b","b"],
            "ds": ["2020-01-01","2020-01-02","2020-01-03","2020-01-01","2020-01-02","2020-01-03"],
            "y": [1, 2, None, 10, 20, None],
            "hist_x": [0.1,0.2,0.3, 1.1,1.2,1.3],
            "futr_x": [100,101,102, 200,201,202],
            "stat_z": [9,9,9, 8,8,8],
        }
    )

    routed = route_columns(
        list(joined.columns),
        prefix_to_kind={"hist_":"hist","futr_":"futr","stat_":"stat"},
    )

    static_df = build_static_df(joined, routed)

    assert list(static_df.columns) == ["unique_id", "stat_z"]
    assert len(static_df) == 2
    assert static_df["unique_id"].is_unique

    # 値（unique_id順は保証しないので dict で確認）
    got = dict(zip(static_df["unique_id"].tolist(), static_df["stat_z"].tolist()))
    assert got == {"a": 9, "b": 8}


def test_build_static_df_uses_last_non_null_when_stat_varies():
    joined = pd.DataFrame(
        {
            "unique_id": ["a","a","a"],
            "ds": ["2020-01-01","2020-01-02","2020-01-03"],
            "y": [1, 2, None],
            "stat_z": [1, None, 3],  # last non-null は 3
        }
    )
    routed = route_columns(list(joined.columns), prefix_to_kind={"stat_":"stat"})
    static_df = build_static_df(joined, routed)
    assert static_df.loc[0, "unique_id"] == "a"
    assert static_df.loc[0, "stat_z"] == 3


def test_build_static_df_returns_unique_id_only_when_no_stat_cols():
    joined = pd.DataFrame(
        {"unique_id": ["a","a","b"], "ds": ["2020-01-01","2020-01-02","2020-01-01"], "y":[1,2,3]}
    )
    routed = route_columns(list(joined.columns), prefix_to_kind={"hist_":"hist","futr_":"futr","stat_":"stat"})
    static_df = build_static_df(joined, routed)
    assert list(static_df.columns) == ["unique_id"]
    assert len(static_df) == 2
    assert set(static_df["unique_id"].tolist()) == {"a","b"}
""",
    encoding="utf-8",
)
print("Wrote:", t)
print(t.read_text(encoding="utf-8")[:300])


Wrote: /mnt/e/env/ts/tslib/tests/test_static_builder.py
import pandas as pd

from nf_app.feature_router import route_columns
from nf_app.builders.static_builder import build_static_df


def test_build_static_df_aggregates_stat_per_unique_id():
    joined = pd.DataFrame(
        {
            "unique_id": ["a","a","a","b","b","b"],
            "ds": ["202


## codecell：収集確認 → 実行

In [109]:
%cd /mnt/e/env/ts/tslib
!pytest -q --collect-only -vv tests/test_static_builder.py


/mnt/e/env/ts/tslib
platform linux -- Python 3.11.14, pytest-8.3.5, pluggy-1.6.0 -- /home/az/miniconda3/envs/ts/bin/python3.11
cachedir: .pytest_cache
metadata: {'Python': '3.11.14', 'Platform': 'Linux-6.6.87.2-microsoft-standard-WSL2-x86_64-with-glibc2.39', 'Packages': {'pytest': '8.3.5', 'pluggy': '1.6.0'}, 'Plugins': {'typeguard': '2.13.3', 'metadata': '3.1.1', 'anyio': '4.12.1', 'dash': '3.3.0', 'fugue': '0.9.4', 'env': '1.1.5', 'html': '4.1.1', 'jaxtyping': '0.2.29', 'hydra-core': '1.3.0'}}
Fugue tests will be initialized with options:
rootdir: /mnt/e/env/ts/tslib
configfile: pytest.ini
plugins: typeguard-2.13.3, metadata-3.1.1, anyio-4.12.1, dash-3.3.0, fugue-0.9.4, env-1.1.5, html-4.1.1, jaxtyping-0.2.29, hydra-core-1.3.0
collected 3 items                                                              [0m

<Dir tslib>
  <Dir tests>
    <Module test_static_builder.py>
      <Function test_build_static_df_aggregates_stat_per_unique_id>
      <Function test_build_static_df_uses_last

### 3. Build static_df（stat_ を unique_id で 1行に集約）
- joined から stat_（静的特徴量）だけを抜き出し、unique_id 単位で 1 行にする
- routed（分類結果）が dict/dataclass/任意オブジェクトでも動く
- stat_ が時系列で揺れている場合は「最後の非NULL値」を採用（運用で破綻しにくい）


In [110]:
%%bash
set -euo pipefail

mkdir -p /mnt/e/env/ts/tslib/src/nf_app/builders

cat > /mnt/e/env/ts/tslib/src/nf_app/builders/static_builder.py <<'PY'
from __future__ import annotations

from typing import Any, Dict, List, Optional, Sequence

import pandas as pd


def _as_list(x: Any) -> List[str]:
    if x is None:
        return []
    if isinstance(x, list):
        return [str(v) for v in x]
    if isinstance(x, tuple):
        return [str(v) for v in x]
    if isinstance(x, set):
        return [str(v) for v in x]
    if isinstance(x, str):
        return [x]
    try:
        return [str(v) for v in list(x)]
    except Exception:
        return []


def _get_routed_list(routed: Any, candidate_names: Sequence[str]) -> List[str]:
    """
    routed が dict / dataclass / 任意オブジェクトでも「それっぽい属性名」からリスト抽出する。
    """
    if routed is None:
        return []

    if isinstance(routed, dict):
        for name in candidate_names:
            if name in routed:
                out = _as_list(routed.get(name))
                if out:
                    return out

    for name in candidate_names:
        if hasattr(routed, name):
            out = _as_list(getattr(routed, name))
            if out:
                return out

    return []


def build_static_df(
    joined: pd.DataFrame,
    routed: Any,
    *,
    unique_id_col: str = "unique_id",
    ds_col: str = "ds",
    stat_prefix: str = "stat_",
    validate_constant: bool = False,
) -> pd.DataFrame:
    """
    static_df を生成（NeuralForecast の static_features 用）

    仕様:
    - 1 unique_id = 1 行
    - 列: unique_id + stat_ 列（stat_ が無ければ unique_id のみ）
    - stat_ が時系列で変化しても「最後の非NULL値」を採用（安定運用寄り）
    - validate_constant=True の場合、unique_id 内で stat_ が複数値なら例外（品質ゲート）
    """
    if unique_id_col not in joined.columns:
        raise ValueError(f"Missing required column: {unique_id_col}")

    df = joined.copy()

    if ds_col in df.columns:
        df[ds_col] = pd.to_datetime(df[ds_col], errors="coerce")
        if df[ds_col].isna().any():
            bad = df[df[ds_col].isna()].head(5)
            raise ValueError(f"{ds_col} contains unparseable values. sample=\n{bad}")

    stat_cols = _get_routed_list(
        routed,
        candidate_names=[
            "stat",
            "stat_cols",
            "static",
            "static_cols",
            "static_features",
            "static_features_cols",
        ],
    )
    if not stat_cols:
        stat_cols = [c for c in df.columns if c.startswith(stat_prefix)]

    # stat_ が無い場合も破綻しない（unique_id のみ返す）
    if not stat_cols:
        return df[[unique_id_col]].drop_duplicates().reset_index(drop=True)

    missing = [c for c in stat_cols if c not in df.columns]
    if missing:
        raise ValueError(f"Some stat columns are missing in joined df: {missing}")

    # unique_idごとに最後の行（dsがあればds順、なければ出現順）を取って stat を確定させる
    if ds_col in df.columns:
        df = df.sort_values([unique_id_col, ds_col], kind="stable")
    else:
        df = df.sort_values([unique_id_col], kind="stable")

    last = df.groupby(unique_id_col, dropna=False, sort=False).tail(1)

    out = last[[unique_id_col] + stat_cols].copy()

    # 最後の行が NULL の場合に備え、列ごとに「最後の非NULL値」を補完（series内で最後を優先）
    for c in stat_cols:
        if out[c].isna().any():
            # seriesごとに最後の非NULL
            s = (
                df[[unique_id_col, c]]
                .dropna(subset=[c])
                .groupby(unique_id_col, dropna=False, sort=False)[c]
                .tail(1)
            )
            # s は index が元のdfの行indexなので、unique_id→値に変換して map
            s_map = df.loc[s.index, [unique_id_col, c]].drop_duplicates(subset=[unique_id_col], keep="last")
            out[c] = out[c].where(out[c].notna(), out[unique_id_col].map(dict(zip(s_map[unique_id_col], s_map[c]))))

    if validate_constant:
        # unique_id内で複数値（NA除外）があるなら落とす
        bad_cols: List[str] = []
        for c in stat_cols:
            nun = (
                df[[unique_id_col, c]]
                .dropna(subset=[c])
                .groupby(unique_id_col, dropna=False)[c]
                .nunique()
            )
            if (nun > 1).any():
                bad_cols.append(c)
        if bad_cols:
            raise ValueError(f"static features not constant within series: {bad_cols}")

    return out.drop_duplicates(subset=[unique_id_col], keep="last").reset_index(drop=True)
PY


## ✅ Codeセル（pytest作成）

In [111]:
%%bash
set -euo pipefail

cat > /mnt/e/env/ts/tslib/tests/test_static_builder.py <<'PY'
import pandas as pd

from nf_app.builders.static_builder import build_static_df
from nf_app.feature_router import route_columns


def test_build_static_df_one_row_per_unique_id():
    joined = pd.DataFrame(
        {
            "unique_id": ["a", "a", "b", "b"],
            "ds": ["2020-01-01", "2020-01-02", "2020-01-01", "2020-01-02"],
            "y": [1, 2, 10, 20],
            "stat_z": [9, 9, 8, 8],
            "stat_w": [1.5, 1.5, 2.5, 2.5],
        }
    )
    routed = route_columns(
        list(joined.columns),
        prefix_to_kind={"hist_": "hist", "futr_": "futr", "stat_": "stat"},
    )

    static_df = build_static_df(joined, routed)
    assert list(static_df.columns) == ["unique_id", "stat_z", "stat_w"]
    assert static_df["unique_id"].nunique() == len(static_df) == 2
    # a は stat_z=9, b は stat_z=8
    m = dict(zip(static_df["unique_id"], static_df["stat_z"]))
    assert m["a"] == 9
    assert m["b"] == 8


def test_build_static_df_uses_last_non_null_when_last_row_has_null():
    joined = pd.DataFrame(
        {
            "unique_id": ["a", "a", "a"],
            "ds": ["2020-01-01", "2020-01-02", "2020-01-03"],
            "y": [1, 2, 3],
            "stat_z": [9, 9, None],  # 最終行が欠損
        }
    )
    routed = route_columns(
        list(joined.columns),
        prefix_to_kind={"stat_": "stat"},
    )
    static_df = build_static_df(joined, routed)
    assert static_df.loc[0, "stat_z"] == 9
PY


## ✅ Codeセル（pytest実行）

In [112]:
%%bash
set -euo pipefail
cd /mnt/e/env/ts/tslib
pytest -q tests/test_static_builder.py


[32m.[0m[32m.[0m[32m                                                                       [100%][0m
[32m[32m[1m2 passed[0m[32m in 0.03s[0m[0m


### 3. Dtypes（NeuralForecast投入用の型整形）
- unique_id: str（文字列）
- ds: datetime64[ns]（タイムゾーン付きなら tz を落として naive 化）
- y: float（欠損は NaN）
- 外生変数（hist_/futr_/stat_ など）は基本 float に寄せる（torch で落ちにくくする）


## ✅ Codeセル（実装ファイル作成）

In [113]:
%%bash
set -euo pipefail

cat > /mnt/e/env/ts/tslib/src/nf_app/transforms.py <<'PY'
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Iterable, List, Optional, Sequence

import pandas as pd


@dataclass(frozen=True)
class DtypeSpec:
    unique_id_col: str = "unique_id"
    ds_col: str = "ds"
    y_col: str = "y"


def _to_datetime_naive(s: pd.Series) -> pd.Series:
    dt = pd.to_datetime(s, errors="coerce")
    # tz-aware を naive に戻す（NeuralForecast/torch で扱いやすい）
    try:
        if hasattr(dt.dt, "tz") and dt.dt.tz is not None:
            dt = dt.dt.tz_convert(None)
    except Exception:
        # dt が datetime じゃないケースなど
        pass
    return dt


def normalize_main_df(
    df: pd.DataFrame,
    *,
    spec: DtypeSpec = DtypeSpec(),
    exog_cols: Optional[Sequence[str]] = None,
    coerce_exog_numeric: bool = True,
    copy: bool = True,
) -> pd.DataFrame:
    """
    学習用 df（unique_id/ds/y + hist外生）を NeuralForecast に入れやすい型へ正規化。
    """
    if copy:
        out = df.copy()
    else:
        out = df

    for c in (spec.unique_id_col, spec.ds_col, spec.y_col):
        if c not in out.columns:
            raise ValueError(f"Missing required column: {c}")

    out[spec.unique_id_col] = out[spec.unique_id_col].astype(str)
    out[spec.ds_col] = _to_datetime_naive(out[spec.ds_col])
    if out[spec.ds_col].isna().any():
        bad = out[out[spec.ds_col].isna()].head(5)
        raise ValueError(f"{spec.ds_col} contains unparseable values. sample=\n{bad}")

    out[spec.y_col] = pd.to_numeric(out[spec.y_col], errors="coerce").astype(float)

    if exog_cols is None:
        # meta以外を外生候補として扱う（ただし文字列列が混ざる場合もあるので coercion を条件化）
        exog_cols = [c for c in out.columns if c not in (spec.unique_id_col, spec.ds_col, spec.y_col)]

    if coerce_exog_numeric:
        for c in exog_cols:
            if c in out.columns:
                out[c] = pd.to_numeric(out[c], errors="coerce").astype(float)

    return out


def normalize_futr_df(
    futr_df: pd.DataFrame,
    *,
    spec: DtypeSpec = DtypeSpec(),
    exog_cols: Optional[Sequence[str]] = None,
    coerce_exog_numeric: bool = True,
    copy: bool = True,
) -> pd.DataFrame:
    """
    予測用 futr_df（unique_id/ds + futr外生）を正規化。
    """
    if copy:
        out = futr_df.copy()
    else:
        out = futr_df

    for c in (spec.unique_id_col, spec.ds_col):
        if c not in out.columns:
            raise ValueError(f"Missing required column: {c}")

    out[spec.unique_id_col] = out[spec.unique_id_col].astype(str)
    out[spec.ds_col] = _to_datetime_naive(out[spec.ds_col])
    if out[spec.ds_col].isna().any():
        bad = out[out[spec.ds_col].isna()].head(5)
        raise ValueError(f"{spec.ds_col} contains unparseable values. sample=\n{bad}")

    if exog_cols is None:
        exog_cols = [c for c in out.columns if c not in (spec.unique_id_col, spec.ds_col)]

    if coerce_exog_numeric:
        for c in exog_cols:
            if c in out.columns:
                out[c] = pd.to_numeric(out[c], errors="coerce").astype(float)

    return out


def normalize_static_df(
    static_df: pd.DataFrame,
    *,
    spec: DtypeSpec = DtypeSpec(),
    feature_cols: Optional[Sequence[str]] = None,
    coerce_numeric: bool = True,
    copy: bool = True,
) -> pd.DataFrame:
    """
    static_df（unique_id + stat特徴）を正規化。
    """
    if copy:
        out = static_df.copy()
    else:
        out = static_df

    if spec.unique_id_col not in out.columns:
        raise ValueError(f"Missing required column: {spec.unique_id_col}")

    out[spec.unique_id_col] = out[spec.unique_id_col].astype(str)

    if feature_cols is None:
        feature_cols = [c for c in out.columns if c != spec.unique_id_col]

    if coerce_numeric:
        for c in feature_cols:
            if c in out.columns:
                out[c] = pd.to_numeric(out[c], errors="coerce").astype(float)

    return out
PY


## ✅ Codeセル（pytest作成）

In [114]:
%%bash
set -euo pipefail

cat > /mnt/e/env/ts/tslib/tests/test_transforms.py <<'PY'
import pandas as pd

from nf_app.transforms import normalize_main_df, normalize_futr_df, normalize_static_df, DtypeSpec


def test_normalize_main_df_casts_types():
    df = pd.DataFrame(
        {
            "unique_id": [1, 1, 2],
            "ds": ["2020-01-01", "2020-01-02", "2020-01-01"],
            "y": [10, "20", None],
            "hist_x": [1, 2, 3],
        }
    )
    out = normalize_main_df(df, exog_cols=["hist_x"])
    assert out["unique_id"].dtype == object  # str化
    assert pd.api.types.is_datetime64_any_dtype(out["ds"])
    assert out["y"].dtype == float
    assert out["hist_x"].dtype == float


def test_normalize_futr_df_casts_types():
    futr = pd.DataFrame(
        {"unique_id": ["a", "a"], "ds": ["2020-01-03", "2020-01-04"], "futr_x": [100, "101"]}
    )
    out = normalize_futr_df(futr, exog_cols=["futr_x"])
    assert pd.api.types.is_datetime64_any_dtype(out["ds"])
    assert out["futr_x"].dtype == float


def test_normalize_static_df_casts_features():
    st = pd.DataFrame({"unique_id": ["a", "b"], "stat_z": ["9", 8]})
    out = normalize_static_df(st, feature_cols=["stat_z"])
    assert out["stat_z"].dtype == float
PY


## ✅ Codeセル（pytest実行）

In [115]:
%%bash
set -euo pipefail
cd /mnt/e/env/ts/tslib
pytest -q tests/test_transforms.py


[32m.[0m[32m.[0m[32m.[0m[32m                                                                      [100%][0m
[32m[32m[1m3 passed[0m[32m in 0.01s[0m[0m


### 3. Missing Policy（欠損戦略 drop / impute / flag をRunPlan化）
- RunPlan(params_json) に `missing_policy` を持たせ、欠損処理の再現性を担保する
- y（目的変数）は学習上欠損不可 → 原則 drop（例外的に impute も選べる）
- 外生変数（exog）は drop / impute / flag を切替可能
- flag は「欠損フラグ列を追加しつつ、値は一定値で埋める」方式（モデルが欠損情報を学習できる）


## 🧩 Codeセル（実装：/mnt/e/env/ts/tslib/src/nf_app/missing.py）

In [116]:
%%bash
set -euo pipefail

cat > /mnt/e/env/ts/tslib/src/nf_app/missing.py <<'PY'
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple

import pandas as pd


@dataclass(frozen=True)
class MissingPolicy:
    """
    欠損戦略（RunPlan化する前提）

    mode:
      - "drop"   : 欠損がある行を落とす（主に exog 欠損に対して）
      - "impute" : 欠損を埋める
      - "flag"   : 欠損フラグ列を追加し、値は埋める（通常 constant=0）
    y_action:
      - "drop"   : y 欠損行は落とす（デフォルト）
      - "impute" : y も埋める（推奨しないが再現性は担保する）
    impute_method（mode=impute/flag のとき）:
      - "zero" | "mean" | "median" | "constant" | "ffill" | "bfill"
      ※ ffill/bfill は series（unique_id）内・ds順で処理（必要なら）
    constant_value:
      - "constant" / "flag" の埋め値（デフォルト 0.0）
    add_flags:
      - True の場合、mode="impute" でもフラグ列を追加できる
    """
    mode: str = "impute"
    y_action: str = "drop"
    impute_method: str = "zero"
    constant_value: float = 0.0
    add_flags: bool = False

    @staticmethod
    def from_dict(d: Optional[Dict[str, Any]]) -> "MissingPolicy":
        d = d or {}
        return MissingPolicy(
            mode=str(d.get("mode", "impute")).upper().lower(),  # safety: normalize
            y_action=str(d.get("y_action", "drop")).upper().lower(),
            impute_method=str(d.get("impute_method", "zero")).upper().lower(),
            constant_value=float(d.get("constant_value", 0.0)),
            add_flags=bool(d.get("add_flags", False)),
        )

    def to_dict(self) -> Dict[str, Any]:
        return {
            "mode": self.mode,
            "y_action": self.y_action,
            "impute_method": self.impute_method,
            "constant_value": self.constant_value,
            "add_flags": self.add_flags,
        }


@dataclass(frozen=True)
class MissingReport:
    dropped_rows: int
    imputed_counts: Dict[str, int]
    flags_added: List[str]


def load_missing_policy_from_runplan_params(params_json: Optional[Dict[str, Any]]) -> MissingPolicy:
    """
    run_plans.params_json から missing_policy を取得する想定。
    例:
      params_json = {"missing_policy": {"mode":"flag","impute_method":"constant","constant_value":0}}
    """
    params_json = params_json or {}
    mp = params_json.get("missing_policy")
    if isinstance(mp, dict):
        return MissingPolicy.from_dict(mp)
    return MissingPolicy()  # default


def _ensure_cols_exist(df: pd.DataFrame, cols: Sequence[str], *, label: str) -> None:
    missing = [c for c in cols if c not in df.columns]
    if missing:
        raise ValueError(f"{label}: missing required columns: {missing}")


def _numericize(df: pd.DataFrame, cols: Sequence[str]) -> pd.DataFrame:
    out = df.copy()
    for c in cols:
        if c in out.columns:
            out[c] = pd.to_numeric(out[c], errors="coerce")
    return out


def _compute_fill_value(s: pd.Series, method: str, constant_value: float) -> float:
    # すべて欠損のときも deterministic に落とす（NaN を返さない）
    nonnull = s.dropna()
    if method == "zero":
        return 0.0
    if method == "constant":
        return float(constant_value)
    if nonnull.empty:
        # mean/median のときに NaN で返さない
        return float(constant_value if method in ("mean", "median") else 0.0)
    if method == "mean":
        return float(nonnull.mean())
    if method == "median":
        return float(nonnull.median())
    # ffill/bfill は値じゃなく処理なのでここには来ない
    return 0.0


def apply_missing_policy(
    df: pd.DataFrame,
    *,
    policy: MissingPolicy,
    unique_id_col: str = "unique_id",
    ds_col: str = "ds",
    y_col: Optional[str] = "y",
    exog_cols: Optional[Sequence[str]] = None,
    serieswise_fill: bool = True,
) -> Tuple[pd.DataFrame, MissingReport]:
    """
    欠損処理の本体。
    - df: 学習df / futr_df / static_df いずれにも使える
    - y_col=None の場合は y 処理を行わない（futr_df や static_df 用）
    - exog_cols が None の場合、(unique_id, ds, y) 以外を exog とみなす
    """
    if policy.mode not in ("drop", "impute", "flag"):
        raise ValueError(f"Unknown policy.mode: {policy.mode}")
    if policy.y_action not in ("drop", "impute"):
        raise ValueError(f"Unknown policy.y_action: {policy.y_action}")

    out = df.copy()

    _ensure_cols_exist(out, [unique_id_col], label="apply_missing_policy")
    if ds_col in out.columns:
        out[ds_col] = pd.to_datetime(out[ds_col], errors="coerce")
        if out[ds_col].isna().any():
            bad = out[out[ds_col].isna()].head(5)
            raise ValueError(f"{ds_col} contains unparseable values. sample=\n{bad}")

    if exog_cols is None:
        base = {unique_id_col}
        if ds_col in out.columns:
            base.add(ds_col)
        if y_col is not None:
            base.add(y_col)
        exog_cols = [c for c in out.columns if c not in base]

    # 欠損フラグ（必要なら）
    flags_added: List[str] = []
    want_flags = (policy.mode == "flag") or policy.add_flags
    if want_flags:
        for c in exog_cols:
            if c in out.columns:
                flag_col = f"{c}__isna"
                out[flag_col] = out[c].isna().astype(int)
                flags_added.append(flag_col)

    # numericに寄せる（mean/medianなどのため）
    out = _numericize(out, [c for c in exog_cols if c in out.columns])
    if y_col is not None and y_col in out.columns:
        out[y_col] = pd.to_numeric(out[y_col], errors="coerce")

    dropped_rows = 0
    imputed_counts: Dict[str, int] = {}

    # 1) y の処理（学習df向け）
    if y_col is not None and y_col in out.columns:
        if policy.y_action == "drop":
            before = len(out)
            out = out[out[y_col].notna()].copy()
            dropped_rows += before - len(out)
        else:  # impute
            # y は serieswise_fill の恩恵が薄いので全体で deterministic に埋める
            n = int(out[y_col].isna().sum())
            if n > 0:
                fillv = _compute_fill_value(out[y_col], policy.impute_method, policy.constant_value)
                out[y_col] = out[y_col].fillna(fillv)
                imputed_counts[y_col] = n

    # 2) exog の処理
    if not exog_cols:
        return out.reset_index(drop=True), MissingReport(dropped_rows, imputed_counts, flags_added)

    if policy.mode == "drop":
        # exog のどれかが欠損なら行drop（必要なら列を絞って運用する余地はあるが、まずは明快に）
        cols = [c for c in exog_cols if c in out.columns]
        if cols:
            before = len(out)
            out = out.dropna(subset=cols).copy()
            dropped_rows += before - len(out)
        return out.reset_index(drop=True), MissingReport(dropped_rows, imputed_counts, flags_added)

    # impute / flag は埋める
    method = policy.impute_method

    cols = [c for c in exog_cols if c in out.columns]
    if not cols:
        return out.reset_index(drop=True), MissingReport(dropped_rows, imputed_counts, flags_added)

    # ffill/bfill は series内・ds順でやるのが安全（予測の時間方向を壊しにくい）
    if method in ("ffill", "bfill"):
        if serieswise_fill and ds_col in out.columns:
            out = out.sort_values([unique_id_col, ds_col], kind="stable")
            g = out.groupby(unique_id_col, dropna=False, sort=False)
            for c in cols:
                n = int(out[c].isna().sum())
                if n == 0:
                    continue
                if method == "ffill":
                    out[c] = g[c].ffill()
                else:
                    out[c] = g[c].bfill()
                # 残る欠損は constant で最後に埋める（端の欠損対策）
                rem = int(out[c].isna().sum())
                if rem > 0:
                    out[c] = out[c].fillna(float(policy.constant_value))
                imputed_counts[c] = n
        else:
            # dsがない/serieswiseでない → 全体で fill
            for c in cols:
                n = int(out[c].isna().sum())
                if n == 0:
                    continue
                out[c] = out[c].ffill() if method == "ffill" else out[c].bfill()
                rem = int(out[c].isna().sum())
                if rem > 0:
                    out[c] = out[c].fillna(float(policy.constant_value))
                imputed_counts[c] = n

        return out.reset_index(drop=True), MissingReport(dropped_rows, imputed_counts, flags_added)

    # mean/median/zero/constant
    for c in cols:
        n = int(out[c].isna().sum())
        if n == 0:
            continue
        fillv = _compute_fill_value(out[c], method, policy.constant_value)
        out[c] = out[c].fillna(fillv)
        imputed_counts[c] = n

    # mode=flag のとき、埋め方法は上で決まる（典型は constant/zero）
    return out.reset_index(drop=True), MissingReport(dropped_rows, imputed_counts, flags_added)
PY


## 🧪 Codeセル（pytest：/mnt/e/env/ts/tslib/tests/test_missing.py）

In [117]:
%%bash
set -euo pipefail

cat > /mnt/e/env/ts/tslib/tests/test_missing.py <<'PY'
import pandas as pd

from nf_app.missing import MissingPolicy, apply_missing_policy, load_missing_policy_from_runplan_params


def test_load_missing_policy_from_runplan_params_defaults():
    p = load_missing_policy_from_runplan_params({})
    assert isinstance(p, MissingPolicy)
    assert p.mode in ("drop", "impute", "flag")


def test_drop_policy_drops_rows_with_missing_y_and_exog():
    df = pd.DataFrame(
        {
            "unique_id": ["a", "a", "a"],
            "ds": ["2020-01-01", "2020-01-02", "2020-01-03"],
            "y": [1.0, None, 3.0],
            "hist_x": [10.0, 20.0, None],
        }
    )
    policy = MissingPolicy(mode="drop", y_action="drop")
    out, rep = apply_missing_policy(df, policy=policy, exog_cols=["hist_x"])
    # y欠損行(1行) + exog欠損行(1行) が落ちて 1行だけ残る
    assert len(out) == 1
    assert rep.dropped_rows == 2


def test_impute_mean_is_deterministic():
    df = pd.DataFrame(
        {
            "unique_id": ["a", "a", "b", "b"],
            "ds": ["2020-01-01", "2020-01-02", "2020-01-01", "2020-01-02"],
            "y": [1.0, 2.0, 3.0, 4.0],
            "hist_x": [10.0, None, 30.0, None],
        }
    )
    policy = MissingPolicy(mode="impute", y_action="drop", impute_method="mean", constant_value=0.0)
    out1, rep1 = apply_missing_policy(df, policy=policy, exog_cols=["hist_x"])
    out2, rep2 = apply_missing_policy(df, policy=policy, exog_cols=["hist_x"])
    pd.testing.assert_frame_equal(out1, out2)
    assert rep1.imputed_counts == rep2.imputed_counts
    # mean(10,30)=20 を入れるので欠損2個が20になる
    assert (out1["hist_x"] == 20.0).sum() == 2


def test_flag_adds_indicator_and_fills_constant():
    df = pd.DataFrame(
        {
            "unique_id": ["a", "a", "b"],
            "ds": ["2020-01-01", "2020-01-02", "2020-01-01"],
            "y": [1.0, 2.0, 3.0],
            "hist_x": [None, 5.0, None],
        }
    )
    policy = MissingPolicy(mode="flag", y_action="drop", impute_method="constant", constant_value=0.0)
    out, rep = apply_missing_policy(df, policy=policy, exog_cols=["hist_x"])
    assert "hist_x__isna" in out.columns
    # 欠損だったところが0で埋まる
    assert out.loc[out["hist_x__isna"] == 1, "hist_x"].eq(0.0).all()
    # フラグは欠損の数だけ1
    assert int(out["hist_x__isna"].sum()) == 2
PY


## ▶️ Codeセル（pytest実行）

In [118]:
%%bash
set -euo pipefail
cd /mnt/e/env/ts/tslib
pytest -q tests/test_missing.py


[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m                                                                     [100%][0m
[32m[32m[1m4 passed[0m[32m in 0.03s[0m[0m


In [132]:
%%bash
cd /mnt/e/env/ts/tslib

pytest -q tests/test_basic_checks.py tests/test_leak_audit.py tests/test_signature.py tests/test_preflight.py



[31m[1m_________________ ERROR collecting tests/test_basic_checks.py __________________[0m
[31m[1m[31m/home/az/miniconda3/envs/ts/lib/python3.11/site-packages/_pytest/python.py[0m:493: in importtestmodule
    [0mmod = import_path([90m[39;49;00m
[1m[31m/home/az/miniconda3/envs/ts/lib/python3.11/site-packages/_pytest/pathlib.py[0m:587: in import_path
    [0mimportlib.import_module(module_name)[90m[39;49;00m
[1m[31m/home/az/miniconda3/envs/ts/lib/python3.11/importlib/__init__.py[0m:126: in import_module
    [0m[94mreturn[39;49;00m _bootstrap._gcd_import(name[level:], package, level)[90m[39;49;00m
[1m[31m<frozen importlib._bootstrap>[0m:1204: in _gcd_import
    [0m[04m[91m?[39;49;00m[04m[91m?[39;49;00m[04m[91m?[39;49;00m[90m[39;49;00m
[1m[31m<frozen importlib._bootstrap>[0m:1176: in _find_and_load
    [0m[04m[91m?[39;49;00m[04m[91m?[39;49;00m[04m[91m?[39;49;00m[90m[39;49;00m
[1m[31m<frozen importlib._bootstrap>[0m:1147: in _find_and

CalledProcessError: Command 'b'cd /mnt/e/env/ts/tslib\n\npytest -q tests/test_basic_checks.py tests/test_leak_audit.py tests/test_signature.py tests/test_preflight.py\n'' returned non-zero exit status 2.

In [125]:
%%bash
cd /mnt/e/env/ts/tslib

pytest -q tests/test_basic_checks.py tests/test_leak_audit.py tests/test_signature.py tests/test_preflight.py



[31m[1m_________________ ERROR collecting tests/test_basic_checks.py __________________[0m
[31m[1m[31m/home/az/miniconda3/envs/ts/lib/python3.11/site-packages/_pytest/python.py[0m:493: in importtestmodule
    [0mmod = import_path([90m[39;49;00m
[1m[31m/home/az/miniconda3/envs/ts/lib/python3.11/site-packages/_pytest/pathlib.py[0m:587: in import_path
    [0mimportlib.import_module(module_name)[90m[39;49;00m
[1m[31m/home/az/miniconda3/envs/ts/lib/python3.11/importlib/__init__.py[0m:126: in import_module
    [0m[94mreturn[39;49;00m _bootstrap._gcd_import(name[level:], package, level)[90m[39;49;00m
[1m[31m<frozen importlib._bootstrap>[0m:1204: in _gcd_import
    [0m[04m[91m?[39;49;00m[04m[91m?[39;49;00m[04m[91m?[39;49;00m[90m[39;49;00m
[1m[31m<frozen importlib._bootstrap>[0m:1176: in _find_and_load
    [0m[04m[91m?[39;49;00m[04m[91m?[39;49;00m[04m[91m?[39;49;00m[90m[39;49;00m
[1m[31m<frozen importlib._bootstrap>[0m:1147: in _find_and

CalledProcessError: Command 'b'cd /mnt/e/env/ts/tslib\n\npytest -q tests/test_basic_checks.py tests/test_leak_audit.py tests/test_signature.py tests/test_preflight.py\n'' returned non-zero exit status 2.