# Trainingset資料前處理

In [None]:
import os
import random
import numpy as np
import pandas as pd
import joblib
from sklearn.ensemble import RandomForestRegressor
from sklearn.preprocessing import StandardScaler
import shap

# === 1. Global seed (so that runs are deterministic) ===
GLOBAL_SEED = 42
random.seed(GLOBAL_SEED)
np.random.seed(GLOBAL_SEED)

# === 2. Basic parameters and paths ===
MODEL_DIR = '/kaggle/working'
os.makedirs(MODEL_DIR, exist_ok=True)
max_rul = 130      # upper cap applied to the RUL labels
window_size = 12   # window length used to detect the degradation onset
threshold = 0.2    # squared-distance threshold used for degradation detection

# Column names assigned to the headerless data files:
# unit id, cycle counter, three operating settings, sensors s1..s21.
columns = ['unit', 'time', 'op1', 'op2', 'op3'] + [f's{i}' for i in range(1, 22)]

# === 3. Data loading and initial (linear) RUL labelling ===
def load_and_label(filepath, max_rul=130):
    """Load a whitespace-delimited, headerless data file and add a capped linear RUL.

    Args:
        filepath: Path of the text file to read.
        max_rul: Upper bound applied to the linear RUL label.

    Returns:
        DataFrame whose columns are named from the module-level ``columns``
        list, plus ``RUL_linear``: the unit's last cycle minus the current
        cycle, clipped from above at ``max_rul``.
    """
    # Raw string: '\s' inside a normal string literal is an invalid escape
    # sequence and produces a warning on recent Python versions.
    # Everything is parsed as float so later steps need no dtype handling.
    df = pd.read_csv(filepath, sep=r'\s+', header=None, dtype=float)
    df.columns = columns
    last_cycle = df.groupby('unit')['time'].transform('max')
    df['RUL_linear'] = (last_cycle - df['time']).clip(upper=max_rul)
    return df

# Load the FD001 training file and attach the capped linear RUL label.
train = load_and_label('/kaggle/input/cmapssdata/train_FD001.txt', max_rul=max_rul)

# === 4. Piecewise-linear degradation RUL (Algorithm 1) ===
def generate_piecewise_rul_algorithm1(df, feature_cols, window_size=12, threshold=0.2, max_rul=130, verbose=False):
    """Label every unit with a piecewise-linear RUL (flat, then decreasing).

    Each unit's rows are sorted by ``time`` and split into consecutive,
    non-overlapping windows of ``window_size`` rows; the mean feature vector
    (centroid) of every window is computed.  Starting from the third window,
    the first window whose squared Euclidean distance to the first centroid
    reaches ``threshold`` marks the degradation onset.  ``iRUL`` is the number
    of rows from that onset to the end of the unit.  If no onset is found, or
    the unit has fewer than three full windows, ``iRUL`` is the unit length.

    ``RUL_piecewise`` is ``min(iRUL, rows remaining after the current row)``
    clipped to ``[0, max_rul]``.  The last row of every unit is therefore
    labelled 0 in both the short-unit and the regular case, the same
    convention as a linear ``max(time) - time`` label.

    Args:
        df: Frame containing ``unit``, ``time`` and the ``feature_cols``.
        feature_cols: Columns used to compute the window centroids.
        window_size: Number of rows per window.
        threshold: Squared-distance threshold that triggers the onset.
        max_rul: Upper clip for ``RUL_piecewise``.
        verbose: When true, print the distance of every compared window.

    Returns:
        A new DataFrame (units concatenated in group order, index reset) with
        the added columns ``iRUL`` and ``RUL_piecewise``.  An input without
        any unit yields an empty frame carrying both columns.
    """
    result_dfs = []
    for unit_id, group in df.groupby('unit'):
        group = group.sort_values('time').reset_index(drop=True)
        num_cycles = len(group)
        num_windows = num_cycles // window_size

        # Default: no detectable onset, the whole life counts as degradation.
        irul = num_cycles

        if num_windows >= 3:
            sensor_data = group[feature_cols].values
            centroids = [np.mean(sensor_data[i * window_size:(i + 1) * window_size], axis=0)
                         for i in range(num_windows)]
            base = centroids[0]

            # The second window is skipped on purpose; comparison starts at the third.
            for i in range(2, num_windows):
                dist_sq = np.sum((centroids[i] - base) ** 2)
                if verbose:
                    print(f"[Unit {unit_id}] Compare w1 to w{i+1}: Dist^2 = {dist_sq:.4f}")
                if dist_sq >= threshold:
                    irul = num_cycles - i * window_size
                    break

        # Rows remaining after each row: num_cycles-1 for the first row, 0 for the last.
        remaining = np.arange(num_cycles - 1, -1, -1)
        group['iRUL'] = irul
        group['RUL_piecewise'] = np.clip(np.minimum(irul, remaining), 0, max_rul)
        result_dfs.append(group)

    if not result_dfs:
        # pd.concat raises on an empty list; return an empty, well-formed frame instead.
        return df.iloc[0:0].assign(
            iRUL=np.array([], dtype=int),
            RUL_piecewise=np.array([], dtype=int),
        )

    return pd.concat(result_dfs, ignore_index=True)

# === 5. Basic preprocessing (drop low-variance sensors) ===
sensor_cols = [f's{i}' for i in range(1, 22)]
# Keep only sensors whose standard deviation over the training frame is at least 1e-3.
sensor_cols_keep = [col for col in sensor_cols if train[col].std() >= 1e-3]

# === 6. Z-score outlier filtering ===
def z_score_filter(df, cols, threshold=4.0):
    """Overwrite outliers in ``cols`` with the column mean, in place.

    For every column, values whose absolute z-score (computed from that
    column's mean and standard deviation over the whole frame) exceeds
    ``threshold`` are replaced by the column mean.

    Args:
        df: Frame to clean; it is modified in place.
        cols: Names of the columns to filter.
        threshold: Absolute z-score above which a value counts as an outlier.

    Returns:
        The same ``df`` object that was passed in.
    """
    for name in cols:
        column = df[name]
        center = column.mean()
        spread = column.std()
        is_outlier = ((column - center) / spread).abs() > threshold
        df.loc[is_outlier, name] = center
    return df

# Replace values with |z| > 4 (the function's default threshold) by the column mean.
train = z_score_filter(train, sensor_cols_keep)

# === 7. Piecewise-linear RUL labelling ===
# NOTE(review): the window distances are computed on the unscaled sensor values
# (standardisation only happens in step 10), so sensors with large magnitudes
# dominate the threshold test — confirm this is intended.
train = generate_piecewise_rul_algorithm1(
    train, sensor_cols_keep, window_size=window_size, threshold=threshold, max_rul=max_rul, verbose=False
)

# === 8. Feature selection (SHAP + Pearson correlation) ===
# Fit a random forest on the kept sensors to predict the piecewise RUL.
rf = RandomForestRegressor(n_estimators=100, random_state=GLOBAL_SEED, n_jobs=-1)
rf.fit(train[sensor_cols_keep], train['RUL_piecewise'])

# SHAP values are computed on the first 500 rows only, which also serve as background data.
# NOTE(review): these are the first rows of the frame, not a random sample — confirm intended.
explainer = shap.Explainer(rf, train[sensor_cols_keep].iloc[:500])
shap_values = explainer(train[sensor_cols_keep].iloc[:500])
# Rank sensors by mean absolute SHAP value and keep the top 8.
mean_shap = np.abs(shap_values.values).mean(axis=0)
shap_scores = pd.Series(mean_shap, index=sensor_cols_keep).sort_values(ascending=False)
top8_shap = shap_scores.head(8).index.tolist()

# Rank sensors by absolute correlation with the piecewise RUL and keep the top 8.
corrs = train[sensor_cols_keep + ['RUL_piecewise']].corr()['RUL_piecewise'].abs().sort_values(ascending=False)
top8_pearson = corrs.drop('RUL_piecewise').head(8).index.tolist()

# Start with the sensors present in both rankings (in SHAP order), then fill
# up from the correlation ranking until 8 sensors are selected.
# NOTE(review): because the fill-up source is top8_pearson itself, the final
# set always equals top8_pearson; SHAP only influences the order — confirm intended.
selected_sensors = [x for x in top8_shap if x in top8_pearson]
for x in top8_pearson:
    if x not in selected_sensors:
        selected_sensors.append(x)
    if len(selected_sensors) == 8:
        break

print("最終選用的8個感測器:", selected_sensors)

# === 9. Build the final features (operating settings and first differences included) ===
features = selected_sensors + ['op1', 'op2', 'op3']

def multi_exponential_smoothing(series, alphas=(0.1, 0.3)):
    """Average several simple exponential smoothings of ``series``.

    For every ``alpha`` the recursion ``s[0] = x[0]``,
    ``s[n] = alpha * x[n] + (1 - alpha) * s[n - 1]`` is evaluated; the
    smoothed curves are then averaged element-wise.

    Args:
        series: pandas Series to smooth.
        alphas: Smoothing factors to combine (any iterable with a length).

    Returns:
        Series with the same index as ``series``.  An empty input yields an
        empty float Series instead of raising ``IndexError``.
    """
    if len(series) == 0:
        return series.astype('float64')

    # Work on the raw array: per-element ``.iloc`` access is needlessly slow.
    values = series.to_numpy()
    results = []
    for alpha in alphas:
        smoothed = [values[0]]
        for value in values[1:]:
            smoothed.append(alpha * value + (1 - alpha) * smoothed[-1])
        results.append(pd.Series(smoothed, index=series.index))
    return sum(results) / len(results)

# Smooth every base feature (selected sensors and operating settings) per unit.
for col in features:
    train[col] = train.groupby('unit')[col].transform(lambda x: multi_exponential_smoothing(x)).astype('float64')

# First difference of each smoothed sensor within its unit; the first row of a unit gets 0.
for col in selected_sensors:
    train[f'{col}_diff'] = train.groupby('unit')[col].diff().fillna(0)

final_features = features + [f'{col}_diff' for col in selected_sensors]

# === 10. Standardisation ===
# The scaler is fitted on the full training frame and saved for reuse below.
scaler = StandardScaler()
train[final_features] = scaler.fit_transform(train[final_features])

# === 11. Persist everything the later cells need ===
joblib.dump(scaler, f'{MODEL_DIR}/scaler_preprocessed.joblib')
joblib.dump(final_features, f'{MODEL_DIR}/feature_names.pkl')
train.to_csv(f'{MODEL_DIR}/train_with_piecewise_rul.csv', index=False)

print('🚩 已完成分段線性RUL資料與所有特徵處理，檔案儲存：')
print(f'- {MODEL_DIR}/train_with_piecewise_rul.csv')
print(f'- {MODEL_DIR}/feature_names.pkl')
print(f'- {MODEL_DIR}/scaler_preprocessed.joblib')


# FD001訓練

In [None]:
!pip install optuna optuna-integration[tfkeras] -q

In [None]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'  # 只顯示錯誤
import numpy as np
import pandas as pd
import joblib
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
import optuna
from optuna_integration import TFKerasPruningCallback

# ====== 1. Settings ======
DATASET = 'fd001'
VALIDATION_SPLIT = 0.2
GLOBAL_SEED = 42

MODEL_DIR = '/kaggle/working'
DATA_PATH = f'{MODEL_DIR}/train_with_piecewise_rul.csv'
SCALER_PATH = f'{MODEL_DIR}/scaler_preprocessed.joblib'
FEATURE_PATH = f'{MODEL_DIR}/feature_names.pkl'
MODEL_PATH = f'{MODEL_DIR}/best_{DATASET}_lstm_model_mse.keras'
RESULT_CSV = f'{MODEL_DIR}/{DATASET}_optuna_search_results.csv'
VAL_UNITS_PATH = f'{MODEL_DIR}/{DATASET}_val_units.npy'
TRAIN_UNITS_PATH = f'{MODEL_DIR}/{DATASET}_train_units.npy'

np.random.seed(GLOBAL_SEED)
tf.random.set_seed(GLOBAL_SEED)

# ====== 2. Load data and features (exactly the files written by preprocessing) ======
feature_cols = joblib.load(FEATURE_PATH)
scaler = joblib.load(SCALER_PATH)  # already fitted in the preprocessing stage

df_raw = pd.read_csv(DATA_PATH)
rul_col = "RUL_piecewise"
# Smallest iRUL over the whole frame; used later as the upper clip of the
# training/validation labels and saved so that evaluation can reuse it.
min_irul = df_raw['iRUL'].min()
np.save(f'{MODEL_DIR}/{DATASET}_min_irul.npy', min_irul)
print(f"📉 使用動態 CLIP RUL（最小 iRUL）: {min_irul}")

all_units = df_raw['unit'].unique()
rng = np.random.default_rng(GLOBAL_SEED)

# Reuse a previously saved unit-level split when both files exist; otherwise
# draw VALIDATION_SPLIT of the units for validation and save both lists.
if os.path.exists(VAL_UNITS_PATH) and os.path.exists(TRAIN_UNITS_PATH):
    val_units = np.load(VAL_UNITS_PATH)
    train_units = np.load(TRAIN_UNITS_PATH)
    print("✅ 已載入現有的 validation/train 分割名單")
else:
    val_units = rng.choice(all_units, int(len(all_units) * VALIDATION_SPLIT), replace=False)
    train_units = [u for u in all_units if u not in val_units]
    np.save(VAL_UNITS_PATH, val_units)
    np.save(TRAIN_UNITS_PATH, train_units)
    print("✅ 首次建立 validation/train 分割並儲存")

# The split is by unit, so all rows of one engine stay on the same side.
train_df = df_raw[df_raw['unit'].isin(train_units)]
val_df = df_raw[df_raw['unit'].isin(val_units)]

# ====== 3. Building the LSTM training data ======
def make_lstm_dataset(df, feature_cols, window_size, stride=1, target_col='RUL_piecewise', clip_value=None):
    """Cut every unit into sliding windows with the label of the window's last row.

    Args:
        df: Frame with ``unit``, ``time``, the feature columns and ``target_col``.
        feature_cols: Columns that make up each time step of a window.
        window_size: Number of consecutive rows per window.
        stride: Step between the start rows of successive windows.
        target_col: Column holding the label.
        clip_value: If not ``None``, labels are clipped to ``[0, clip_value]``.

    Returns:
        Tuple ``(sequences, labels)`` of numpy arrays.  Units with fewer than
        ``window_size`` rows contribute no windows.
    """
    windows = []
    targets = []
    for _, unit_rows in df.groupby('unit'):
        ordered = unit_rows.sort_values('time')
        feature_matrix = ordered[feature_cols].values
        target_values = ordered[target_col].values
        if clip_value is not None:
            target_values = np.clip(target_values, 0, clip_value)

        last_start = len(ordered) - window_size
        for start in range(0, last_start + 1, stride):
            stop = start + window_size
            windows.append(feature_matrix[start:stop])
            # The label belongs to the final row contained in the window.
            targets.append(target_values[stop - 1])
    return np.array(windows), np.array(targets)

def phm08_score(y_true, y_pred):
    """Return the summed asymmetric exponential score of the prediction errors.

    With ``error = y_pred - y_true``, a negative error contributes
    ``exp(-error / 13) - 1`` and any other error contributes
    ``exp(error / 10) - 1``, so over-estimates are penalised more heavily.
    """
    diff = y_pred - y_true
    under_penalty = np.exp(-diff / 13) - 1
    over_penalty = np.exp(diff / 10) - 1
    per_sample = np.where(diff < 0, under_penalty, over_penalty)
    return np.sum(per_sample)

def build_model(input_shape, lstm_units, dropout_rate, learning_rate, optimizer_name, num_layers):
    """Build and compile a stacked-LSTM regression model.

    The network is: input, ``num_layers`` LSTM layers (each followed by
    dropout), then Dense(64, relu), dropout, Dense(32, relu) and a single
    linear output unit.  It is compiled with MSE loss and an RMSE metric.

    Args:
        input_shape: Shape of one sample, passed to the input layer.
        lstm_units: Units in every LSTM layer.
        dropout_rate: Rate of every dropout layer.
        learning_rate: Learning rate handed to the optimizer.
        optimizer_name: One of ``'adam'``, ``'sgd'``, ``'rmsprop'``, ``'nadam'``;
            any other value raises ``KeyError``.
        num_layers: Number of stacked LSTM layers.

    Returns:
        The compiled model.
    """
    net = Sequential()
    net.add(tf.keras.layers.Input(shape=input_shape))

    final_lstm = num_layers - 1
    for depth in range(num_layers):
        # Every LSTM except the last hands its full sequence to the next one.
        net.add(LSTM(lstm_units, return_sequences=depth < final_lstm))
        net.add(Dropout(dropout_rate))

    # Dense regression head ending in one linear output.
    head = (
        Dense(64, activation='relu'),
        Dropout(dropout_rate),
        Dense(32, activation='relu'),
        Dense(1),
    )
    for layer in head:
        net.add(layer)

    optimizer_by_name = {
        'adam': tf.keras.optimizers.Adam(learning_rate),
        'sgd': tf.keras.optimizers.SGD(learning_rate),
        'rmsprop': tf.keras.optimizers.RMSprop(learning_rate),
        'nadam': tf.keras.optimizers.Nadam(learning_rate)
    }
    chosen_optimizer = optimizer_by_name[optimizer_name]
    net.compile(
        optimizer=chosen_optimizer,
        loss='mse',
        metrics=[tf.keras.metrics.RootMeanSquaredError()],
    )
    return net

# ====== 4. Optuna hyper-parameter search (fixed data split) ======
def objective(trial):
    """Train one LSTM configuration sampled by Optuna and return its validation RMSE.

    Reads the module-level ``train_df``, ``val_df``, ``feature_cols``,
    ``rul_col`` and ``min_irul``.  Labels are clipped to ``[0, min_irul]``.
    Training uses early stopping, learning-rate reduction on plateau and
    Optuna pruning, all monitoring ``val_loss``.

    Args:
        trial: The Optuna trial used to sample hyper-parameters.

    Returns:
        Validation RMSE of the trained model (the value Optuna minimises).
        The RMSE and the PHM08 score are also stored as the user attributes
        ``VAL_RMSE`` and ``VAL_SCORE``.
    """
    # A new model is built in every trial; clear the Keras state left over from
    # the previous one so memory does not keep growing over the whole study.
    tf.keras.backend.clear_session()

    # The order of the suggest_* calls is kept fixed so the sampler stays reproducible.
    seed = trial.suggest_categorical('seed', [11,22,33,44])
    np.random.seed(seed)
    tf.random.set_seed(seed)
    lstm_units = trial.suggest_categorical('lstm_units', [32, 64, 96, 128])
    dropout_rate = trial.suggest_float('dropout_rate', 0.05, 0.2)
    window_size = trial.suggest_categorical('window_size', [12, 32, 64])
    stride = trial.suggest_categorical('stride', [1])
    batch_size = trial.suggest_categorical('batch_size', [32, 64, 128])
    learning_rate = trial.suggest_float('learning_rate', 0.0003, 0.002, log=True)
    optimizer = trial.suggest_categorical('optimizer', ['nadam'])
    num_layers = trial.suggest_int('num_layers', 1, 3)
    epochs = trial.suggest_int('epochs', 80, 150)
    X_train, y_train = make_lstm_dataset(train_df, feature_cols, window_size, stride, target_col=rul_col, clip_value=min_irul)
    X_val, y_val = make_lstm_dataset(val_df, feature_cols, window_size, stride, target_col=rul_col, clip_value=min_irul)

    model = build_model(
        input_shape=(window_size, X_train.shape[2]),
        lstm_units=lstm_units,
        dropout_rate=dropout_rate,
        learning_rate=learning_rate,
        optimizer_name=optimizer,
        num_layers=num_layers
    )
    early_stop = EarlyStopping(monitor='val_loss', patience=8, restore_best_weights=True, verbose=0)
    reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=4, min_lr=1e-6, verbose=0)
    pruning_callback = TFKerasPruningCallback(trial, 'val_loss')

    # The training history is not needed afterwards, so it is not kept.
    model.fit(
        X_train, y_train,
        validation_data=(X_val, y_val),
        epochs=epochs,
        batch_size=batch_size,
        callbacks=[early_stop, reduce_lr, pruning_callback],
        verbose=0
    )

    # evaluate() returns [loss, rmse]; index 1 is the RMSE metric set in build_model.
    val_rmse = float(model.evaluate(X_val, y_val, verbose=0)[1])
    y_pred = model.predict(X_val, verbose=0).flatten()
    # Plain floats keep the user attributes serialisable by any Optuna storage.
    val_score = float(phm08_score(y_val, y_pred))

    trial.set_user_attr("VAL_RMSE", val_rmse)
    trial.set_user_attr("VAL_SCORE", val_score)

    return val_rmse

# ====== 5. Run the Optuna hyper-parameter search ======
# Stops after 100 trials or 21600 seconds, whichever comes first.
study = optuna.create_study(direction="minimize", sampler=optuna.samplers.TPESampler(seed=GLOBAL_SEED))
study.optimize(objective, n_trials=100, timeout=21600)

print("✅ Optuna 搜尋完畢！")
print("Best params:", study.best_trial.params)
print("Best RMSE:", study.best_value)

# Save the search results
optuna_df = study.trials_dataframe()
optuna_df.to_csv(RESULT_CSV, index=False)

# ====== 6. Retrain a model with the best hyper-parameters (same split) ======
best_params = study.best_trial.params
np.random.seed(best_params['seed'])
tf.random.set_seed(best_params['seed'])

X_train, y_train = make_lstm_dataset(train_df, feature_cols, best_params['window_size'], best_params['stride'], target_col=rul_col, clip_value=min_irul)
X_val, y_val = make_lstm_dataset(val_df, feature_cols, best_params['window_size'], best_params['stride'], target_col=rul_col, clip_value=min_irul)

best_model = build_model(
    input_shape=(best_params['window_size'], X_train.shape[2]),
    lstm_units=best_params['lstm_units'],
    dropout_rate=best_params['dropout_rate'],
    learning_rate=best_params['learning_rate'],
    optimizer_name=best_params['optimizer'],
    num_layers=best_params['num_layers']
)
# Same callbacks as in the search, minus pruning; the validation units are
# still used for early stopping and learning-rate scheduling.
best_model.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    epochs=best_params['epochs'],
    batch_size=best_params['batch_size'],
    callbacks=[
        EarlyStopping(monitor='val_loss', patience=8, restore_best_weights=True, verbose=1),
        ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=4, min_lr=1e-6, verbose=1)
    ],
    verbose=1
)
best_model.save(MODEL_PATH)
print(f"📁 最佳模型儲存於：{MODEL_PATH}")


# 測試集資料處理

In [None]:
import numpy as np
import pandas as pd
import joblib
import os

MODEL_DIR = '/kaggle/working'

# === 1. Determine the best sequence length ===
# Take the trial with the lowest validation RMSE (falling back to the
# objective value column) and read its window size.
optuna_df = pd.read_csv(f'{MODEL_DIR}/fd001_optuna_search_results.csv')
sort_col = 'user_attrs_VAL_RMSE' if 'user_attrs_VAL_RMSE' in optuna_df.columns else 'value'
best_trial = optuna_df.loc[optuna_df[sort_col].idxmin()]
SEQ_LEN = int(best_trial['params_window_size'])

print(f"✅ 最佳 SEQ_LEN = {SEQ_LEN}")

# === 2. Load the preprocessing artefacts ===
feature_names = joblib.load(f'{MODEL_DIR}/feature_names.pkl')
scaler = joblib.load(f'{MODEL_DIR}/scaler_preprocessed.joblib')

# === 3. Read the raw test set ===
columns = ['unit', 'time', 'op1', 'op2', 'op3'] + [f's{i}' for i in range(1, 22)]
# Raw string: '\s' in a normal literal is an invalid escape sequence.
test = pd.read_csv('/kaggle/input/cmapssdata/test_FD001.txt', sep=r'\s+', header=None)
test.columns = columns
rul_truth = pd.read_csv('/kaggle/input/cmapssdata/RUL_FD001.txt', header=None, names=['true_RUL'])

# === 4. Feature engineering ===
def multi_exponential_smoothing(series, alphas=(0.1, 0.3)):
    """Average several simple exponential smoothings of ``series``.

    For every ``alpha`` the recursion ``s[0] = x[0]``,
    ``s[n] = alpha * x[n] + (1 - alpha) * s[n - 1]`` is evaluated; the
    smoothed curves are then averaged element-wise.

    Args:
        series: pandas Series to smooth.
        alphas: Smoothing factors to combine (any iterable with a length).

    Returns:
        Series with the same index as ``series``.  An empty input yields an
        empty float Series instead of raising ``IndexError``.
    """
    if len(series) == 0:
        return series.astype('float64')

    # Work on the raw array: per-element ``.iloc`` access is needlessly slow.
    values = series.to_numpy()
    results = []
    for alpha in alphas:
        smoothed = [values[0]]
        for value in values[1:]:
            smoothed.append(alpha * value + (1 - alpha) * smoothed[-1])
        results.append(pd.Series(smoothed, index=series.index))
    return sum(results) / len(results)

# Smooth every base feature per unit, then derive the difference features and
# apply the scaler that was fitted on the training data.
# NOTE(review): the training cell runs z_score_filter before smoothing; no such
# filtering is applied here — confirm the asymmetry is intended.
for col in [c for c in feature_names if not c.endswith('_diff') and c not in ['unit','time']]:
    test[col] = test.groupby('unit')[col].transform(lambda x: multi_exponential_smoothing(x))

for col in [c for c in feature_names if c.endswith('_diff')]:
    base_col = col.replace('_diff', '')
    test[col] = test.groupby('unit')[base_col].diff().fillna(0)

test[feature_names] = scaler.transform(test[feature_names])

# === 5. Assemble the sliding windows ===
# One window per engine: its last SEQ_LEN rows.
# NOTE(review): rows are taken in file order without sorting by 'time', and
# engines with fewer than SEQ_LEN rows are dropped silently, so the metrics
# computed later may cover only part of the test set — confirm intended.
X_test, unit_ids = [], []
for unit in sorted(test['unit'].unique()):
    df_unit = test[test['unit'] == unit]
    arr = df_unit[feature_names].values
    if len(arr) >= SEQ_LEN:
        X_test.append(arr[-SEQ_LEN:])
        unit_ids.append(unit)
X_test = np.stack(X_test).astype(np.float32)
# Row i of the truth file is looked up for unit id i + 1.
y_true = rul_truth.loc[np.array(unit_ids) - 1, 'true_RUL'].values

np.save(f'{MODEL_DIR}/X_test.npy', X_test)
np.save(f'{MODEL_DIR}/unit_ids.npy', np.array(unit_ids))
np.save(f'{MODEL_DIR}/y_true.npy', y_true)

print(f"✅ 測試集預處理完成 X_test.shape={X_test.shape}，unit數量={len(unit_ids)}")


# FD001資料驗證

In [None]:
import numpy as np
import pandas as pd
import joblib
import matplotlib.pyplot as plt
import tensorflow as tf
from sklearn.metrics import mean_squared_error
import random
import os

MODEL_DIR = '/kaggle/working'

# ====== Global seed settings ======
GLOBAL_SEED = 42
random.seed(GLOBAL_SEED)
np.random.seed(GLOBAL_SEED)
tf.random.set_seed(GLOBAL_SEED)

# === PHM08 score function ===
def phm08_score(y_true, y_pred):
    """Return the summed asymmetric exponential score of the prediction errors.

    With ``error = y_pred - y_true``, a negative error contributes
    ``exp(-error / 13) - 1`` and any other error contributes
    ``exp(error / 10) - 1``, so over-estimates are penalised more heavily.
    """
    diff = y_pred - y_true
    under_penalty = np.exp(-diff / 13) - 1
    over_penalty = np.exp(diff / 10) - 1
    per_sample = np.where(diff < 0, under_penalty, over_penalty)
    return np.sum(per_sample)

# === (A) Feature engineering (also reusable for a single engine) ===
def multi_exponential_smoothing(series, alphas=(0.1, 0.3)):
    """Average several simple exponential smoothings of ``series``.

    For every ``alpha`` the recursion ``s[0] = x[0]``,
    ``s[n] = alpha * x[n] + (1 - alpha) * s[n - 1]`` is evaluated; the
    smoothed curves are then averaged element-wise.

    Args:
        series: pandas Series to smooth.
        alphas: Smoothing factors to combine (any iterable with a length).

    Returns:
        Series with the same index as ``series``.  An empty input yields an
        empty float Series instead of raising ``IndexError``.
    """
    if len(series) == 0:
        return series.astype('float64')

    # Work on the raw array: per-element ``.iloc`` access is needlessly slow.
    values = series.to_numpy()
    results = []
    for alpha in alphas:
        smoothed = [values[0]]
        for value in values[1:]:
            smoothed.append(alpha * value + (1 - alpha) * smoothed[-1])
        results.append(pd.Series(smoothed, index=series.index))
    return sum(results) / len(results)

def preprocess_for_engine(df, feature_names, scaler):
    """Apply smoothing, difference features and scaling to one engine's rows.

    Args:
        df: Rows of a single engine; the frame is modified in place.
        feature_names: Final feature list; names ending in ``_diff`` are
            derived from the column with that suffix removed.
        scaler: Fitted scaler exposing ``transform``.

    Returns:
        The same ``df`` with all ``feature_names`` columns filled and scaled.
    """
    diff_cols = [c for c in feature_names if c.endswith('_diff')]
    base_cols = [c for c in feature_names if not c.endswith('_diff') and c not in ['unit','time']]

    # Exponential smoothing of the base features
    for name in base_cols:
        df[name] = multi_exponential_smoothing(df[name])

    # First differences of the already-smoothed base features
    for name in diff_cols:
        source = name.replace('_diff', '')
        df[name] = df[source].diff().fillna(0)

    # Standardisation
    scaled = scaler.transform(df[feature_names])
    df[feature_names] = scaled
    return df

# === 1. Load the preprocessed test set ===
X_test = np.load(f'{MODEL_DIR}/X_test.npy')
unit_ids = np.load(f'{MODEL_DIR}/unit_ids.npy')
y_true = np.load(f'{MODEL_DIR}/y_true.npy')

# === 2. Load the model and the feature names ===
MODEL_PATH = f'{MODEL_DIR}/best_fd001_lstm_model_mse.keras'
feature_cols = joblib.load(f'{MODEL_DIR}/feature_names.pkl')
scaler = joblib.load(f'{MODEL_DIR}/scaler_preprocessed.joblib')
model = tf.keras.models.load_model(MODEL_PATH)

# === 3. Look up the best SEQ_LEN (used for the single-engine plot) ===
optuna_df = pd.read_csv(f'{MODEL_DIR}/fd001_optuna_search_results.csv')
sort_col = 'user_attrs_VAL_RMSE' if 'user_attrs_VAL_RMSE' in optuna_df.columns else 'value'
best_trial = optuna_df.loc[optuna_df[sort_col].idxmin()]
SEQ_LEN = int(best_trial['params_window_size'])

# === 4. Upper clip for predictions ===
# Use the saved minimum iRUL; fall back to 130 if it cannot be loaded.
try:
    clip_upper = int(np.load(f'{MODEL_DIR}/fd001_min_irul.npy'))
    print(f"✅ 自動 clip 上限：{clip_upper}")
except Exception as e:
    clip_upper = 130
    print(f"⚠️ 找不到 min_irul，預設 clip 上限：{clip_upper} ({e})")

# === 5. Predict and score ===
# NOTE(review): predictions are clipped to [0, clip_upper] but y_true is not,
# so engines whose true RUL exceeds clip_upper always carry that gap as error
# — confirm intended.
y_pred = model.predict(X_test, verbose=0).flatten()
y_pred = np.clip(y_pred, 0, clip_upper)

rmse = np.sqrt(mean_squared_error(y_true, y_pred))
score = phm08_score(y_true, y_pred)

# Per-engine results; the overall RMSE and score are repeated on every row.
df_out = pd.DataFrame({
    'engine_id': unit_ids,
    'true_RUL': y_true,
    'predicted_RUL': y_pred,
    'error': y_pred - y_true
})
df_out['score_component'] = np.where(df_out['error'] < 0,
                                     np.exp(-df_out['error'] / 13) - 1,
                                     np.exp(df_out['error'] / 10) - 1)
df_out['RMSE'] = rmse
df_out['Score'] = score
df_out.to_csv(f'{MODEL_DIR}/fd001_test_rul_results.csv', index=False)

# === (a) True vs predicted RUL per engine ===
plt.figure(figsize=(12, 6))
plt.plot(df_out['engine_id'], df_out['true_RUL'], label='True RUL', marker='o')
plt.plot(df_out['engine_id'], df_out['predicted_RUL'], label='Predicted RUL', marker='x')
plt.title(f'FD001: True vs Predicted RUL\nRMSE={rmse:.2f} | Score={score:.2f}')
plt.xlabel('Engine ID')
plt.ylabel('RUL')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.savefig(f'{MODEL_DIR}/fd001_test_rul_plot.png')
plt.close()

# === (b) Histogram of prediction errors ===
plt.figure(figsize=(10, 5))
plt.hist(df_out['error'], bins=25, color='skyblue', edgecolor='black')
plt.title('FD001 Prediction Error Histogram')
plt.xlabel('Prediction Error (Predicted - True)')
plt.ylabel('Count')
plt.tight_layout()
plt.savefig(f'{MODEL_DIR}/fd001_error_histogram.png')
plt.close()

# === (c) Residual plot (error vs true RUL scatter) ===
plt.figure(figsize=(10, 6))
plt.scatter(df_out['true_RUL'], df_out['error'], c='royalblue', alpha=0.7, edgecolors='k')
plt.axhline(0, color='red', linestyle='--')
plt.xlabel('True RUL')
plt.ylabel('Prediction Error (Predicted - True)')
plt.title('Residual Distribution (Error vs True RUL)')
plt.grid(True)
plt.tight_layout()
plt.savefig(f'{MODEL_DIR}/fd001_residual_scatter.png')
plt.close()

# === (d) Life-cycle prediction curve of one engine (largest absolute error) ===
worst_row = df_out.iloc[df_out['error'].abs().argmax()]
SAMPLE_ENGINE_ID = int(worst_row['engine_id'])

# Read the raw test data and preprocess it (required: the model expects the
# same smoothing, difference features and scaling as during training).
columns = ['unit', 'time', 'op1', 'op2', 'op3'] + [f's{i}' for i in range(1, 22)]
# Raw string: '\s' in a normal literal is an invalid escape sequence.
test_raw = pd.read_csv('/kaggle/input/cmapssdata/test_FD001.txt', sep=r'\s+', header=None)
test_raw.columns = columns
sample_df = test_raw[test_raw['unit'] == SAMPLE_ENGINE_ID].sort_values('time').reset_index(drop=True)

sample_df = preprocess_for_engine(sample_df, feature_cols, scaler)
sample_features = sample_df[feature_cols].values

# One window ending at every row from index SEQ_LEN-1 to the last row,
# predicted in a single batch instead of one predict() call per window.
sample_windows = np.stack([
    sample_features[end - SEQ_LEN:end]
    for end in range(SEQ_LEN, len(sample_df) + 1)
]).astype(np.float32)
sample_preds = model.predict(sample_windows, verbose=0).flatten()

# Cycle number of the last row of each window ('time' is not a model feature
# and is left untouched by the preprocessing).
sample_cycles = sample_df['time'].values[SEQ_LEN - 1:]

# The test trajectory stops before failure: the RUL at the last observed row
# is the ground-truth value of this engine, and every earlier row has one
# more cycle left.  The final prediction therefore lines up with true_RUL.
final_true_rul = float(worst_row['true_RUL'])
sample_ruls = final_true_rul + np.arange(len(sample_preds) - 1, -1, -1)

plt.figure(figsize=(12, 6))
plt.plot(sample_cycles, sample_preds, label='Predicted RUL', marker='x')
plt.plot(sample_cycles, sample_ruls, label='True RUL', marker='o')
plt.title(f'Engine {SAMPLE_ENGINE_ID}: RUL Prediction (Full Cycle)')
plt.xlabel('Cycle')
plt.ylabel('RUL')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.savefig(f'{MODEL_DIR}/fd001_sample_engine_curve.png')
plt.close()

# Console summary: overall metrics, error extremes, and the saved figure paths.
print(f"\n✅ 測試集 RMSE = {rmse:.4f}")
print(f"✅ 測試集 Score = {score:.4f}")
print(f"📊 最大誤差：{df_out['error'].max():.2f}")
print(f"📊 最小誤差：{df_out['error'].min():.2f}")
print(f"📊 超過 ±100 預測數量：{np.sum(np.abs(df_out['error']) > 100)}")
print(f"📈 圖片已儲存：")
print(f"{MODEL_DIR}/fd001_test_rul_plot.png")
print(f"{MODEL_DIR}/fd001_error_histogram.png")
print(f"{MODEL_DIR}/fd001_residual_scatter.png")
print(f"{MODEL_DIR}/fd001_sample_engine_curve.png")
