# Unit20｜RUL（剩餘壽命）預測：把模型接進維護決策

本 Notebook 用 NASA Turbofan（本倉庫已包含資料）示範：RUL 標籤、避免群組洩漏（按引擎分割）、baseline 模型、交付圖表。

In [None]:
import os

# 路徑設定：使用 Part_4 本地 data 資料夾
NOTEBOOK_DIR = Path(__file__).parent if '__file__' in globals() else Path.cwd()
DATA_DIR = NOTEBOOK_DIR / 'data'
OUTPUT_DIR = NOTEBOOK_DIR / 'outputs'
OUTPUT_DIR.mkdir(exist_ok=True)
os.chdir(OUTPUT_DIR)

os.makedirs('P4_Unit20_Results', exist_ok=True)
print('Outputs:', (Path.cwd() / 'P4_Unit20_Results').resolve())


In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import mean_squared_error, mean_absolute_error
from sklearn.linear_model import Ridge
from sklearn.ensemble import RandomForestRegressor
from sklearn.neural_network import MLPRegressor

data_dir = DATA_DIR / 'NASA_Turbofan_Data' / 'CMaps'
train_path = data_dir / 'train_FD001.txt'
test_path = data_dir / 'test_FD001.txt'
rul_path = data_dir / 'RUL_FD001.txt'

assert train_path.exists(), f'Missing: {train_path}'
assert test_path.exists(), f'Missing: {test_path}'
assert rul_path.exists(), f'Missing: {rul_path}'

cols = ['unit', 'cycle', 'op1', 'op2', 'op3'] + [f's{i}' for i in range(1, 22)]

def read_cmaps(path: Path):
    df = pd.read_csv(path, sep='\\s+', header=None)
    df = df.dropna(axis=1, how='all')
    df.columns = cols[: df.shape[1]]
    return df

train = read_cmaps(train_path)
test = read_cmaps(test_path)
true_rul_test = pd.read_csv(rul_path, header=None, names=['RUL']).dropna()

print('train shape:', train.shape)
print('test shape:', test.shape)
train.head()


In [None]:
# Build RUL labels (piecewise cap)
max_cycle = train.groupby('unit')['cycle'].max().rename('max_cycle')
train = train.join(max_cycle, on='unit')
train['RUL'] = train['max_cycle'] - train['cycle']

RUL_CAP = 125
train['RUL_cap'] = train['RUL'].clip(upper=RUL_CAP)

# Select feature columns (drop constants)
feature_cols = [c for c in train.columns if c.startswith('op') or c.startswith('s')]
stds = train[feature_cols].std(numeric_only=True)
feature_cols = [c for c in feature_cols if stds.get(c, 0.0) > 1e-6]
print('n_features:', len(feature_cols))

# Avoid group leakage: split by engine (unit)
units = train['unit'].unique()
u_tr, u_te = train_test_split(units, test_size=0.2, random_state=42)
tr = train[train['unit'].isin(u_tr)].copy()
va = train[train['unit'].isin(u_te)].copy()

X_tr = tr[feature_cols].to_numpy()
y_tr = tr['RUL_cap'].to_numpy()
X_va = va[feature_cols].to_numpy()
y_va = va['RUL_cap'].to_numpy()

def nasa_score(y_true, y_pred):
    # asymmetric penalty: over-estimate is often worse near failure
    s = 0.0
    for e in (y_pred - y_true):
        if e < 0:
            s += np.exp(-e/13.0) - 1.0
        else:
            s += np.exp(e/10.0) - 1.0
    return float(s)

models = {
    'Ridge': Pipeline([('scaler', StandardScaler()), ('m', Ridge(alpha=1.0))]),
    'RandomForest': RandomForestRegressor(n_estimators=400, random_state=42, n_jobs=-1),
    'MLP': Pipeline([('scaler', StandardScaler()),
                    ('m', MLPRegressor(hidden_layer_sizes=(64, 32), random_state=42,
                                      max_iter=250, early_stopping=True))]),
}

rows = []
for name, model in models.items():
    model.fit(X_tr, y_tr)
    pred = model.predict(X_va)
    rows.append({
        'model': name,
        'MAE': mean_absolute_error(y_va, pred),
        'RMSE': mean_squared_error(y_va, pred, squared=False),
        'NASA_score': nasa_score(y_va, pred),
    })

score = pd.DataFrame(rows).sort_values('RMSE')
score.to_csv('P4_Unit20_Results/01_holdout_scores.csv', index=False)
score

## 不確定度（區間）示範：Quantile regression

RUL 上線常用保守策略（用下界觸發維護）。這裡用 sklearn 的 quantile regression（GBDT）示範 10%/90% 區間。

In [None]:
from sklearn.ensemble import GradientBoostingRegressor
import numpy as np
import pandas as pd

q_lo, q_hi = 0.1, 0.9
g_lo = GradientBoostingRegressor(loss='quantile', alpha=q_lo, random_state=42)
g_hi = GradientBoostingRegressor(loss='quantile', alpha=q_hi, random_state=42)

# train on the same train split (unit holdout)
g_lo.fit(X_tr, y_tr)
g_hi.fit(X_tr, y_tr)
lo = g_lo.predict(X_va)
hi = g_hi.predict(X_va)
pred_mid = (lo + hi) / 2

coverage = float(((y_va >= lo) & (y_va <= hi)).mean())
width = float(np.mean(hi - lo))
print('interval coverage:', coverage, 'avg width:', width)

df_int = pd.DataFrame({'y_true': y_va, 'q10': lo, 'q90': hi, 'mid': pred_mid})
df_int.to_csv('P4_Unit20_Results/05_interval_holdout.csv', index=False)
print('Saved: P4_Unit20_Results/05_interval_holdout.csv')


## 風險/成本：用保守門檻做維護觸發

示範一個最小成本模型：
- 太早維護（false early）→ 成本 `C_early`
- 太晚維護（miss near-failure）→ 成本 `C_late`（通常更大）

用 RUL 下界（例如 q10）做觸發門檻通常更保守。

In [None]:
import numpy as np
import pandas as pd

C_early = 1.0
C_late = 8.0

# choose a trigger threshold (cycles)
thresholds = np.arange(10, 130, 10)
rows = []
for T in thresholds:
    # trigger maintenance if predicted lower bound <= T
    trig = lo <= T
    true_need = y_va <= T
    false_early = trig & (~true_need)
    miss = (~trig) & true_need
    cost = C_early * false_early.mean() + C_late * miss.mean()
    rows.append({'T': int(T), 'false_early_rate': float(false_early.mean()), 'miss_rate': float(miss.mean()), 'cost': float(cost)})
df_cost = pd.DataFrame(rows)
df_cost.to_csv('P4_Unit20_Results/06_cost_curve.csv', index=False)
print('Saved: P4_Unit20_Results/06_cost_curve.csv')
df_cost


In [None]:
# Visualize predictions for a few held-out engines
best_name = score.iloc[0]['model']
best_model = models[best_name]
best_model.fit(X_tr, y_tr)

units_show = sorted(va['unit'].unique())[:4]
fig, axes = plt.subplots(len(units_show), 1, figsize=(10, 8), sharex=False)
if len(units_show) == 1:
    axes = [axes]

for ax, uid in zip(axes, units_show):
    sub = va[va['unit'] == uid].sort_values('cycle')
    pred = best_model.predict(sub[feature_cols].to_numpy())
    ax.plot(sub['cycle'], sub['RUL_cap'], label='true', lw=1)
    ax.plot(sub['cycle'], pred, label='pred', lw=1)
    ax.set_title(f'Unit {uid} — {best_name}')
    ax.set_ylabel('RUL (cap)')
    ax.legend(loc='upper right')

axes[-1].set_xlabel('cycle')
plt.tight_layout()
plt.savefig('P4_Unit20_Results/02_rul_curves_examples.png', dpi=150)
print('Saved: P4_Unit20_Results/02_rul_curves_examples.png')

In [None]:
# Optional: evaluate on official test set (predict RUL at last observed cycle)
last_rows = test.sort_values(['unit','cycle']).groupby('unit').tail(1).copy()
X_last = last_rows[feature_cols].to_numpy()
pred_rul = best_model.predict(X_last)

df_test = pd.DataFrame({
    'unit': last_rows['unit'].to_numpy(),
    'pred_RUL': pred_rul,
    'true_RUL': true_rul_test['RUL'].to_numpy()[:len(pred_rul)],
})
df_test['err'] = df_test['pred_RUL'] - df_test['true_RUL']
df_test.to_csv('P4_Unit20_Results/03_test_rul_predictions.csv', index=False)
print('Saved: P4_Unit20_Results/03_test_rul_predictions.csv')

rmse_test = mean_squared_error(df_test['true_RUL'], df_test['pred_RUL'], squared=False)
mae_test = mean_absolute_error(df_test['true_RUL'], df_test['pred_RUL'])
print('Test RMSE:', rmse_test, 'MAE:', mae_test)

fig, ax = plt.subplots(1, 1, figsize=(6,5))
ax.scatter(df_test['true_RUL'], df_test['pred_RUL'], s=12, alpha=0.7)
lims = [0, max(df_test['true_RUL'].max(), df_test['pred_RUL'].max())]
ax.plot(lims, lims, 'k--', lw=1)
ax.set_xlabel('true RUL')
ax.set_ylabel('pred RUL')
ax.set_title('RUL at last cycle (test set)')
plt.tight_layout()
plt.savefig('P4_Unit20_Results/04_test_scatter.png', dpi=150)
print('Saved: P4_Unit20_Results/04_test_scatter.png')

df_test.head()

## （選讀）深度序列模型（RNN/LSTM）

若你的環境有 TensorFlow，你可以把每台引擎的序列切成滑動視窗，輸入 LSTM 來預測 RUL；並把「高估/低估」轉成維護成本做決策。

In [None]:
try:
    import tensorflow as tf
    print('TensorFlow version:', tf.__version__)
except Exception as e:
    print('TensorFlow not available. Skip LSTM RUL demo.')
    print('Reason:', e)