# Yield Curve PCA Analysis

Simple PCA analysis on Japanese government bond yield curves

## 1. Setup

In [None]:
import os
import sys
import requests
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.interpolate import CubicSpline
from sklearn.decomposition import PCA
from datetime import datetime
from dotenv import load_dotenv

# Load environment
project_root = os.path.abspath(os.path.join(os.getcwd(), '../..'))
load_dotenv(os.path.join(project_root, '.env'))

SUPABASE_URL = os.getenv('SUPABASE_URL')
SUPABASE_KEY = os.getenv('SUPABASE_KEY')

# Plot settings
plt.rcParams['figure.figsize'] = (14, 6)
plt.rcParams['font.size'] = 11

print('Setup complete')

## 2. パラメータ設定（ここを調整）

In [None]:
# ===== パラメータ設定 =====

LOOKBACK_DAYS = 100        # PCA学習に使用する営業日数
MIN_MATURITY = 0.5         # 最小残存期間（年）
N_COMPONENTS = 3           # 主成分の数
DISPLAY_DAYS = 30          # 表示する直近の日数

# =========================

## 3. データ取得関数

In [None]:
def normalize_bond_code(bond_code):
    """bond_codeを9桁に正規化（0パディング）"""
    if bond_code is None or bond_code == 'N/A':
        return 'N/A'
    # 文字列に変換して数値部分のみ抽出
    code_str = str(bond_code).strip()
    # 数値に変換可能かチェック
    try:
        code_int = int(code_str)
        # 9桁に0パディング
        return f'{code_int:09d}'
    except ValueError:
        # 数値に変換できない場合はそのまま返す
        return code_str


def get_latest_dates(limit=200):
    """最新の営業日を取得"""
    headers = {
        'apikey': SUPABASE_KEY,
        'Authorization': f'Bearer {SUPABASE_KEY}'
    }
    
    response = requests.get(
        f'{SUPABASE_URL}/rest/v1/bond_data',
        params={'select': 'trade_date', 'order': 'trade_date.desc', 'limit': limit * 10},
        headers=headers
    )
    
    dates = sorted(list(set([row['trade_date'] for row in response.json()])), reverse=True)
    return dates[:limit]


def get_yield_data_for_date(date, with_bond_code=False):
    """指定日のイールドカーブデータを取得"""
    headers = {
        'apikey': SUPABASE_KEY,
        'Authorization': f'Bearer {SUPABASE_KEY}'
    }
    
    # bond_codeも取得
    select_fields = 'trade_date,due_date,ave_compound_yield,bond_code'
    
    response = requests.get(
        f'{SUPABASE_URL}/rest/v1/bond_data',
        params={
            'select': select_fields,
            'trade_date': f'eq.{date}',
            'ave_compound_yield': 'not.is.null',
            'due_date': 'not.is.null'
        },
        headers=headers
    )
    
    data_list = []
    for row in response.json():
        try:
            trade_dt = datetime.strptime(row['trade_date'], '%Y-%m-%d')
            due_dt = datetime.strptime(row['due_date'], '%Y-%m-%d')
            maturity_years = (due_dt - trade_dt).days / 365.25
            
            if maturity_years >= MIN_MATURITY:
                data_dict = {
                    'maturity': round(maturity_years, 4),
                    'yield': float(row['ave_compound_yield'])
                }
                if with_bond_code:
                    # bond_codeを9桁に正規化
                    bond_code_raw = row.get('bond_code')
                    data_dict['bond_code'] = normalize_bond_code(bond_code_raw)
                data_list.append(data_dict)
        except (ValueError, TypeError):
            continue
    
    df = pd.DataFrame(data_list)
    
    # bond_code列をstring型として明示的に設定
    if with_bond_code and not df.empty:
        df['bond_code'] = df['bond_code'].astype(str)
    
    return df


def reconstruct_date(date_str, date_index, pca_model, mean_vec, common_grid_array):
    """指定日のデータを復元する共通関数"""
    # 実データを取得（正規化済み）
    actual_data = get_yield_data_for_date(date_str, with_bond_code=True)
    actual_data = actual_data.sort_values('maturity').reset_index(drop=True)
    
    # 主成分スコアを取得
    pc_scores = X_pca[date_index, :]
    
    # 完全な復元データ
    reconstructed_full = mean_vec + np.dot(pc_scores, pca_model.components_)
    
    # 実データの各残存期間に対して復元値を計算
    reconstruction_results = []
    
    for idx, row in actual_data.iterrows():
        maturity = row['maturity']
        original_yield = row['yield']
        bond_code = row['bond_code']
        
        # common_gridで最も近いインデックスを探す
        grid_idx = np.argmin(np.abs(common_grid_array - maturity))
        reconstructed_yield = reconstructed_full[grid_idx]
        
        error = original_yield - reconstructed_yield
        
        reconstruction_results.append({
            'maturity': maturity,
            'bond_code': bond_code,
            'original_yield': original_yield,
            'reconstructed_yield': reconstructed_yield,
            'error': error
        })
    
    df = pd.DataFrame(reconstruction_results)
    
    # bond_code列をstring型として明示的に設定
    if not df.empty:
        df['bond_code'] = df['bond_code'].astype(str)
    
    return df

print('関数定義完了（bond_code正規化機能追加）')

## 4. データ収集

In [None]:
# 日付取得
print('日付取得中...')
available_dates = get_latest_dates(limit=200)
target_dates = available_dates[:LOOKBACK_DAYS]

print(f'分析期間: {target_dates[-1]} ～ {target_dates[0]}')
print(f'日数: {len(target_dates)}日')

In [None]:
# イールドカーブデータ取得
print('\nイールドカーブデータ取得中...')
daily_data = {}

for i, date in enumerate(target_dates, 1):
    df = get_yield_data_for_date(date, with_bond_code=False)
    if not df.empty:
        daily_data[date] = df
    if i % 20 == 0:
        print(f'  進捗: {i}/{len(target_dates)}')

print(f'\nデータ収集完了: {len(daily_data)}日分')
total_points = sum(len(df) for df in daily_data.values())
print(f'総データポイント数: {total_points:,}個')

## 5. 残存期間の和集合とスプライン補間

In [None]:
# 残存期間の和集合を作成
all_maturities = set()
for df in daily_data.values():
    all_maturities.update(df['maturity'].values)

common_grid = np.sort(np.array(list(all_maturities)))

print(f'共通残存期間軸: {len(common_grid)}個')
print(f'範囲: {common_grid.min():.2f}年 ～ {common_grid.max():.2f}年')

In [None]:
# 3次スプライン補間
print('\n3次スプライン補間実行中...')

interpolated_data = []
valid_dates = []

for date, df in daily_data.items():
    if len(df) < 2:
        continue
    
    # ソート＆重複除去
    df_sorted = df.sort_values('maturity').drop_duplicates('maturity')
    
    # 3次スプライン補間
    cs = CubicSpline(
        df_sorted['maturity'].values,
        df_sorted['yield'].values,
        extrapolate=False
    )
    
    interpolated = cs(common_grid)
    
    # NaNが50%未満の行のみ採用
    if np.isnan(interpolated).sum() / len(interpolated) < 0.5:
        interpolated_data.append(interpolated)
        valid_dates.append(date)

X = np.array(interpolated_data)

print(f'有効日数: {len(valid_dates)}日')
print(f'データ行列サイズ: {X.shape} (日数 x 残存期間)')
print(f'NaN含有率: {np.isnan(X).sum() / X.size * 100:.2f}%')

## 6. PCA実行

In [None]:
# NaNを列平均で補完
X_filled = X.copy()
col_means = np.nanmean(X, axis=0)
for i in range(X.shape[1]):
    mask = np.isnan(X_filled[:, i])
    X_filled[mask, i] = col_means[i]

print(f'NaN補完完了: {np.isnan(X_filled).sum()}個残存（0であるべき）')

In [None]:
# PCA実行
print('\nPCA実行中...')
pca = PCA(n_components=N_COMPONENTS)
X_pca = pca.fit_transform(X_filled)

print('\n寄与率:')
for i, ratio in enumerate(pca.explained_variance_ratio_, 1):
    print(f'  PC{i}: {ratio*100:.2f}%')

cumsum = np.cumsum(pca.explained_variance_ratio_)
print(f'\n累積寄与率: {cumsum[-1]*100:.2f}%')

## 7. 結果: 主成分ベクトル

In [None]:
# 主成分ベクトルのプロット
fig, axes = plt.subplots(N_COMPONENTS, 1, figsize=(14, 4*N_COMPONENTS))

if N_COMPONENTS == 1:
    axes = [axes]

colors = ['#e74c3c', '#3498db', '#2ecc71', '#f39c12', '#9b59b6']
labels = ['Level', 'Slope', 'Curvature', 'PC4', 'PC5']

for i in range(N_COMPONENTS):
    axes[i].plot(common_grid, pca.components_[i], 
                 linewidth=2, color=colors[i], label=f'PC{i+1} ({labels[i]})')
    axes[i].axhline(y=0, color='black', linestyle='--', linewidth=1, alpha=0.5)
    axes[i].set_xlabel('Maturity (years)', fontsize=12)
    axes[i].set_ylabel('Loading', fontsize=12)
    axes[i].set_title(f'PC{i+1} ({labels[i]}) - Variance: {pca.explained_variance_ratio_[i]*100:.2f}%',
                      fontsize=13, fontweight='bold')
    axes[i].legend(loc='best')
    axes[i].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print('主成分ベクトル表示完了')

## 8. 結果: 主成分スコア（過去30日分）

In [None]:
# 主成分スコアのDataFrame作成
pc_scores_df = pd.DataFrame(
    X_pca,
    columns=[f'PC{i+1}' for i in range(N_COMPONENTS)],
    index=valid_dates
)
pc_scores_df.index.name = 'Date'

# 直近N日分を表示
print(f'主成分スコア（直近{DISPLAY_DAYS}日）:')
print(pc_scores_df.head(DISPLAY_DAYS))

In [None]:
# 主成分スコアの時系列プロット
fig, axes = plt.subplots(N_COMPONENTS, 1, figsize=(14, 3*N_COMPONENTS))

if N_COMPONENTS == 1:
    axes = [axes]

date_objects = [datetime.strptime(d, '%Y-%m-%d') for d in valid_dates[:DISPLAY_DAYS]]

for i in range(N_COMPONENTS):
    axes[i].plot(date_objects, X_pca[:DISPLAY_DAYS, i], 
                 linewidth=2, color=colors[i], marker='o', markersize=4)
    axes[i].axhline(y=0, color='black', linestyle='--', linewidth=1, alpha=0.5)
    axes[i].set_ylabel(f'PC{i+1} Score', fontsize=12)
    axes[i].set_title(f'PC{i+1} ({labels[i]}) Time Series (Last {DISPLAY_DAYS} days)',
                      fontsize=13, fontweight='bold')
    axes[i].grid(True, alpha=0.3)
    plt.setp(axes[i].xaxis.get_majorticklabels(), rotation=45)

axes[-1].set_xlabel('Date', fontsize=12)

plt.tight_layout()
plt.show()

print('主成分スコア時系列表示完了')

## 9. 最新日のデータ復元（実データのみ）

In [None]:
# 最新日の復元
latest_date = valid_dates[0]
latest_index = 0

print(f'最新日: {latest_date}')
print(f'主成分スコア: {X_pca[latest_index, :]}')

# 復元実行
mean_vector = pca.mean_
reconstruction_df_latest = reconstruct_date(latest_date, latest_index, pca, mean_vector, common_grid)

print(f'\n復元完了: {len(reconstruction_df_latest)}銘柄')
print(f'平均絶対誤差: {np.abs(reconstruction_df_latest["error"]).mean():.6f}%')
print(f'最大誤差: {np.abs(reconstruction_df_latest["error"]).max():.6f}%')

## 10. 前日のデータ復元（実データのみ）

In [None]:
# 前日の復元
previous_date = valid_dates[1]
previous_index = 1

print(f'前日: {previous_date}')
print(f'主成分スコア: {X_pca[previous_index, :]}')

# 復元実行
reconstruction_df_previous = reconstruct_date(previous_date, previous_index, pca, mean_vector, common_grid)

print(f'\n復元完了: {len(reconstruction_df_previous)}銘柄')
print(f'平均絶対誤差: {np.abs(reconstruction_df_previous["error"]).mean():.6f}%')
print(f'最大誤差: {np.abs(reconstruction_df_previous["error"]).max():.6f}%')

## 11. 最新日と前日の誤差比較グラフ

In [None]:
# グラフ: 最新日と前日の復元誤差比較
fig, ax = plt.subplots(1, 1, figsize=(14, 6))

ax.plot(reconstruction_df_latest['maturity'], reconstruction_df_latest['error'],
        linewidth=2.5, color='#e74c3c',
        label=f'Latest Date ({latest_date})', marker='o', markersize=5, alpha=0.8)
ax.plot(reconstruction_df_previous['maturity'], reconstruction_df_previous['error'],
        linewidth=2.5, color='#3498db',
        label=f'Previous Date ({previous_date})', marker='s', markersize=5, alpha=0.8)

ax.axhline(y=0, color='black', linestyle='--', linewidth=1.5, alpha=0.7)

ax.set_xlabel('Maturity (years)', fontsize=13)
ax.set_ylabel('Reconstruction Error (%)', fontsize=13)
ax.set_title('Reconstruction Error Comparison: Latest vs Previous Date',
             fontsize=14, fontweight='bold')
ax.legend(loc='best', fontsize=12)
ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print('最新日と前日の誤差比較グラフ表示完了')

### 11.1 誤差比較（残存期間5年~8年、bond_code表示付き）

In [None]:
# グラフ: 残存期間5年~8年にフィルタリングした誤差比較（bond_code表示付き）

# 5年~8年のデータをフィルタリング
filtered_latest = reconstruction_df_latest[
    (reconstruction_df_latest['maturity'] >= 5.0) & 
    (reconstruction_df_latest['maturity'] <= 8.0)
].copy()

filtered_previous = reconstruction_df_previous[
    (reconstruction_df_previous['maturity'] >= 5.0) & 
    (reconstruction_df_previous['maturity'] <= 8.0)
].copy()

# bond_codeを文字列型として確実に扱う
filtered_latest['bond_code'] = filtered_latest['bond_code'].astype(str)
filtered_previous['bond_code'] = filtered_previous['bond_code'].astype(str)

# グラフ作成
fig, ax = plt.subplots(1, 1, figsize=(16, 8))

# 最新日のプロット
ax.plot(filtered_latest['maturity'], filtered_latest['error'],
        linewidth=2.5, color='#e74c3c',
        label=f'Latest Date ({latest_date})', marker='o', markersize=7, alpha=0.8)

# 前日のプロット
ax.plot(filtered_previous['maturity'], filtered_previous['error'],
        linewidth=2.5, color='#3498db',
        label=f'Previous Date ({previous_date})', marker='s', markersize=7, alpha=0.8)

# ゼロ線
ax.axhline(y=0, color='black', linestyle='--', linewidth=1.5, alpha=0.7)

# 最新日のbond_codeラベル追加
for idx, row in filtered_latest.iterrows():
    ax.annotate(
        row['bond_code'],
        xy=(row['maturity'], row['error']),
        xytext=(5, 5),  # オフセット（ポイント単位）
        textcoords='offset points',
        fontsize=9,
        color='#e74c3c',
        alpha=0.8,
        bbox=dict(boxstyle='round,pad=0.3', facecolor='white', edgecolor='#e74c3c', alpha=0.7, linewidth=0.5)
    )

# 前日のbond_codeラベル追加
for idx, row in filtered_previous.iterrows():
    ax.annotate(
        row['bond_code'],
        xy=(row['maturity'], row['error']),
        xytext=(5, -15),  # オフセット（下にずらす）
        textcoords='offset points',
        fontsize=9,
        color='#3498db',
        alpha=0.8,
        bbox=dict(boxstyle='round,pad=0.3', facecolor='white', edgecolor='#3498db', alpha=0.7, linewidth=0.5)
    )

ax.set_xlabel('Maturity (years)', fontsize=13)
ax.set_ylabel('Reconstruction Error (%)', fontsize=13)
ax.set_title('Reconstruction Error Comparison (5-8 years maturity with Bond Codes)',
             fontsize=14, fontweight='bold')
ax.legend(loc='best', fontsize=12)
ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print('=' * 80)
print('残存期間5年~8年のフィルタリング結果:')
print('=' * 80)
print(f'最新日 ({latest_date}): {len(filtered_latest)}銘柄')
print(f'前日 ({previous_date}): {len(filtered_previous)}銘柄')
print('=' * 80)

In [None]:
# 残存期間5年~8年のデータを表形式で整理

# 最新日のデータをベースにして、前日のデータを結合
table_data = []

for idx, row_latest in filtered_latest.iterrows():
    bond_code = str(row_latest['bond_code'])  # 明示的に文字列化（既に正規化済み）
    maturity_latest = row_latest['maturity']
    error_latest_bp = row_latest['error'] * 100  # bp単位に変換
    
    # 前日で同じbond_codeを探す（正規化済みなので完全一致で比較可能）
    row_previous = filtered_previous[filtered_previous['bond_code'] == bond_code]
    
    if len(row_previous) > 0:
        error_previous_bp = row_previous['error'].values[0] * 100  # bp単位に変換
        diff_bp = error_latest_bp - error_previous_bp
    else:
        error_previous_bp = None
        diff_bp = None
    
    table_data.append({
        'maturity': maturity_latest,
        'bond_code': bond_code,
        'error_latest_bp': error_latest_bp,
        'error_previous_bp': error_previous_bp,
        'diff_bp': diff_bp
    })

# DataFrameに変換
comparison_table = pd.DataFrame(table_data)

# 残存年限でソート
comparison_table = comparison_table.sort_values('maturity').reset_index(drop=True)

# 表示用に整形
display_table = pd.DataFrame({
    'Maturity (years)': comparison_table['maturity'].apply(lambda x: f'{x:.4f}'),
    'Bond Code': comparison_table['bond_code'],
    f'Latest {latest_date} (bp)': comparison_table['error_latest_bp'].apply(lambda x: f'{x:.4f}'),
    f'Previous {previous_date} (bp)': comparison_table['error_previous_bp'].apply(lambda x: f'{x:.4f}' if x is not None else '-'),
    'Difference (bp)': comparison_table['diff_bp'].apply(lambda x: f'{x:.4f}' if x is not None else '-')
})

print('=' * 120)
print(f'残存期間5年~8年の復元誤差比較表（bp単位）')
print('=' * 120)
print(display_table.to_string(index=False))
print('=' * 120)
print(f'\n総銘柄数（最新日）: {len(display_table)}')
print(f'両日に存在: {comparison_table["diff_bp"].notna().sum()}銘柄')
print(f'最新日のみ: {comparison_table["error_previous_bp"].isna().sum()}銘柄')

### 11.2 検証: bond_code正規化の確認

In [None]:
# bond_code正規化の動作確認

print('=' * 100)
print('【bond_code正規化の検証】')
print('=' * 100)

# filtered_latest と filtered_previous のbond_codeを確認
print(f'\n最新日 ({latest_date}) のbond_code（先頭10件）:')
print(filtered_latest[['bond_code', 'maturity']].head(10).to_string(index=False))

print(f'\n前日 ({previous_date}) のbond_code（先頭10件）:')
print(filtered_previous[['bond_code', 'maturity']].head(10).to_string(index=False))

# 共通のbond_codeを確認
latest_codes = set(filtered_latest['bond_code'].values)
previous_codes = set(filtered_previous['bond_code'].values)
common_codes = latest_codes & previous_codes

print(f'\n【マッチング結果】')
print(f'最新日のユニークbond_code数: {len(latest_codes)}')
print(f'前日のユニークbond_code数: {len(previous_codes)}')
print(f'両日共通のbond_code数: {len(common_codes)}')

if common_codes:
    print(f'\n共通bond_code（先頭10件）: {sorted(list(common_codes))[:10]}')

print('=' * 100)

## 12. 最新日の詳細分析

### 12.1 元データと復元データの比較（最新日）

In [None]:
# グラフ1: 元データと復元データの比較（最新日のみ）
fig, ax = plt.subplots(1, 1, figsize=(14, 6))

ax.plot(reconstruction_df_latest['maturity'], reconstruction_df_latest['original_yield'],
        linewidth=2.5, color='#2c3e50',
        label='Original (Actual Data)', marker='o', markersize=6, alpha=0.8)
ax.plot(reconstruction_df_latest['maturity'], reconstruction_df_latest['reconstructed_yield'],
        linewidth=2.5, color='#e74c3c',
        label='Reconstructed', linestyle='--', marker='s', markersize=5, alpha=0.8)

ax.set_xlabel('Maturity (years)', fontsize=13)
ax.set_ylabel('Yield (%)', fontsize=13)
ax.set_title(f'Original vs Reconstructed Yield Curve - Actual Data Only ({latest_date})',
             fontsize=14, fontweight='bold')
ax.legend(loc='best', fontsize=12)
ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print('元データと復元データの比較グラフ表示完了')

### 12.2 復元誤差（残差）- 最新日のみ

In [None]:
# グラフ2: 復元誤差（最新日のみ）
fig, ax = plt.subplots(1, 1, figsize=(14, 5))

ax.plot(reconstruction_df_latest['maturity'], reconstruction_df_latest['error'],
        linewidth=2, color='#27ae60',
        label='Residual (Original - Reconstructed)',
        marker='o', markersize=5)
ax.axhline(y=0, color='black', linestyle='--', linewidth=1.5, alpha=0.7)

# 標準偏差の範囲を表示
std_residual = reconstruction_df_latest['error'].std()
ax.axhline(y=std_residual, color='red', linestyle=':', linewidth=1, alpha=0.5, 
           label=f'+1 Std Dev ({std_residual:.4f}%)')
ax.axhline(y=-std_residual, color='red', linestyle=':', linewidth=1, alpha=0.5, 
           label=f'-1 Std Dev ({-std_residual:.4f}%)')

ax.set_xlabel('Maturity (years)', fontsize=13)
ax.set_ylabel('Residual (%)', fontsize=13)
ax.set_title(f'Reconstruction Error - Actual Data Only ({latest_date})',
             fontsize=14, fontweight='bold')
ax.legend(loc='best', fontsize=11)
ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print('復元誤差グラフ表示完了')

### 12.3 統計サマリー（最新日）

In [None]:
# 統計サマリー
print('=' * 60)
print('復元精度の統計サマリー（実データのみ）')
print('=' * 60)
print(f'日付: {latest_date}')
print(f'実データ数: {len(reconstruction_df_latest)}銘柄')
print(f'使用主成分数: {N_COMPONENTS}')
print(f'累積寄与率: {cumsum[-1]*100:.2f}%')
print()
print('誤差統計:')
print(f'  平均絶対誤差 (MAE): {np.abs(reconstruction_df_latest["error"]).mean():.6f}%')
print(f'  平均二乗誤差 (MSE): {(reconstruction_df_latest["error"]**2).mean():.6f}')
print(f'  二乗平均平方根誤差 (RMSE): {np.sqrt((reconstruction_df_latest["error"]**2).mean()):.6f}%')
print(f'  最大誤差: {np.abs(reconstruction_df_latest["error"]).max():.6f}%')
print(f'  標準偏差: {reconstruction_df_latest["error"].std():.6f}%')
print('=' * 60)

### 12.4 復元結果詳細テーブル（残存年限、銘柄名、金利差のみ）

In [None]:
# 復元結果を表形式で出力（3列のみ）
output_df = reconstruction_df_latest[['maturity', 'bond_code', 'error']].copy()
output_df.columns = ['Maturity (years)', 'Bond Code', 'Yield Error (%)']
output_df = output_df.sort_values('Maturity (years)').reset_index(drop=True)

print(f'\n復元結果詳細テーブル（全{len(output_df)}銘柄）')
print('=' * 80)
print(output_df.to_string(index=False))
print('=' * 80)

### 12.5 誤差上位10銘柄（最新日）

In [None]:
# 誤差の大きい上位10銘柄
reconstruction_df_latest['abs_error'] = np.abs(reconstruction_df_latest['error'])
top_errors = reconstruction_df_latest.nlargest(10, 'abs_error')

top_errors_output = top_errors[['maturity', 'bond_code', 'error']].copy()
top_errors_output.columns = ['Maturity (years)', 'Bond Code', 'Yield Error (%)']

print('\n誤差上位10銘柄:')
print('=' * 80)
print(top_errors_output.to_string(index=False))
print('=' * 80)