In [None]:
# --- Cell 17: End-to-End Inference Pipeline (Mock -> True COP) [PLS Version] ---
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import Ridge
from sklearn.cross_decomposition import PLSRegression  # ← ここを修正
from sklearn.metrics import r2_score, mean_squared_error

# ==========================================
# 0. 設定 (Model A/B の共通定義)
# ==========================================
# 列定義
drift_sensitive_X = ['F1', 'F2', 'F3', 'F4']
context_cols = ['pedal', 'steer']
target_features_A = ['圧力中心列_2', '荷重(N)_1', '圧力中心行_1', '圧力中心列_1']
seat_features = target_features_A 
z_targets = ['COP_X', 'COP_Y']

# 前処理パラメータ
smooth_window = 70   # ノイズ除去
detrend_window = 300 # ドリフト除去
best_shift = -65     # ラグ補正
n_components_A = 3   # PLSの成分数 (グリッドサーチ等で決めた値)

# ==========================================
# 1. 学習フェーズ (Train Phase)
# ==========================================
split_idx = int(len(df_all) * 0.8)
df_train = df_all.iloc[:split_idx].copy()
df_test = df_all.iloc[split_idx:].copy()

print(f"Train samples: {len(df_train)}, Test samples: {len(df_test)}")

# --- Step 1: Model B の学習 (Y + Context -> Z) ---
df_train_B = df_train.dropna(subset=seat_features + context_cols + z_targets)
X_train_B = df_train_B[seat_features + context_cols]
y_train_B = df_train_B[z_targets]

model_B = Pipeline([
    ('scaler', StandardScaler()),
    ('regressor', Ridge(alpha=1.0))
])
model_B.fit(X_train_B, y_train_B)
print("✅ Model B Trained (Ridge).")

# --- Step 2: Model A の学習 (X + Context -> Delta Y) ---
# 前処理関数
def apply_preprocessing_A(df):
    df_out = df.copy()
    # Smooth X
    df_out[drift_sensitive_X] = df_out[drift_sensitive_X].rolling(smooth_window, center=True, min_periods=1).mean()
    # Detrend X
    x_mean = df_out[drift_sensitive_X].rolling(detrend_window, min_periods=1).mean()
    df_out[drift_sensitive_X] = df_out[drift_sensitive_X] - x_mean
    # Detrend Y (学習用)
    y_mean = df_out[target_features_A].rolling(detrend_window, min_periods=1).mean()
    df_out[target_features_A] = df_out[target_features_A] - y_mean
    return df_out

# 学習データ作成
df_train_A = apply_preprocessing_A(df_train)
df_train_A[target_features_A] = df_train_A[target_features_A].shift(best_shift)
df_train_A = df_train_A.dropna(subset=drift_sensitive_X + context_cols + target_features_A)

X_train_A = df_train_A[drift_sensitive_X + context_cols]
Y_train_A = df_train_A[target_features_A]

# ★ここをPLSに変更しました
model_A = Pipeline([
    ('scaler', StandardScaler()),
    ('regressor', PLSRegression(n_components=n_components_A))
])
model_A.fit(X_train_A, Y_train_A)
print("✅ Model A Trained (PLS).")

# 復元用のベースライン保存
y_baseline = df_train[target_features_A].mean()


# ==========================================
# 2. 推論フェーズ (Inference on Test Data)
# ==========================================
print("\n--- Starting End-to-End Inference ---")

# Step 1: 入力データの前処理
df_test_proc = df_test.copy()
# Smooth X
df_test_proc[drift_sensitive_X] = df_test_proc[drift_sensitive_X].rolling(smooth_window, min_periods=1).mean()
# Detrend X
x_mean_test = df_test_proc[drift_sensitive_X].rolling(detrend_window, min_periods=1).mean()
df_test_proc[drift_sensitive_X] = df_test_proc[drift_sensitive_X] - x_mean_test

# Model A入力
X_test_input = df_test_proc[drift_sensitive_X + context_cols].dropna()
valid_idx = X_test_input.index

# Step 2: Model A 推論 (Delta Y)
Y_pred_delta = model_A.predict(X_test_input)

# Step 3: 絶対値復元
Y_pred_abs = Y_pred_delta + y_baseline.values
df_Y_pred_abs = pd.DataFrame(Y_pred_abs, columns=seat_features, index=valid_idx)

# Step 4: Model B 推論 (Z)
X_test_B_input = pd.concat([
    df_Y_pred_abs,
    df_test.loc[valid_idx, context_cols]
], axis=1)

Z_pred_final = model_B.predict(X_test_B_input)


# ==========================================
# 3. 評価 (Evaluation)
# ==========================================
Z_true = df_test.loc[valid_idx, z_targets]

r2_x = r2_score(Z_true['COP_X'], Z_pred_final[:, 0])
r2_y = r2_score(Z_true['COP_Y'], Z_pred_final[:, 1])

print("-" * 40)
print(f"★ End-to-End Test Result (PLS Version) ★")
print(f"COP X (Left-Right) : R2 = {r2_x:.4f}")
print(f"COP Y (Front-Back) : R2 = {r2_y:.4f}")
print("-" * 40)

# 可視化
fig, ax = plt.subplots(2, 1, figsize=(15, 10), sharex=True)

ax[0].plot(Z_true.index, Z_true['COP_X'], label='True COP X', color='black', alpha=0.6)
ax[0].plot(Z_true.index, Z_pred_final[:, 0], label='Pred COP X', color='red', linestyle='--', alpha=0.8)
ax[0].set_title(f"End-to-End (PLS): COP X | R2={r2_x:.3f}")
ax[0].legend()
ax[0].grid(True)

ax[1].plot(Z_true.index, Z_true['COP_Y'], label='True COP Y', color='black', alpha=0.6)
ax[1].plot(Z_true.index, Z_pred_final[:, 1], label='Pred COP Y', color='blue', linestyle='--', alpha=0.8)
ax[1].set_title(f"End-to-End (PLS): COP Y | R2={r2_y:.3f}")
ax[1].legend()
ax[1].grid(True)

plt.tight_layout()
plt.show()