In [18]:
import pandas as pd
import numpy as np
from sklearn.model_selection import GroupKFold
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import confusion_matrix

# 讀資料

In [None]:
# Load the two clip-level tables: the main one and the "_unpaired" one.
df = pd.read_csv("../指標計算/20250725/sorted_clips_with_info.csv")
df_unpaired = pd.read_csv("../指標計算/20250725/sorted_clips_with_info_unpaired.csv")

# Bookkeeping (non-feature) column names.
# NOTE(review): presumably these are the first five columns of the CSVs — confirm.
meta_cols = ['filename', 'prefix', 'number', 'label', 'clips']

# Names of the columns at positions 5..64 of the unpaired table.
numeric_cols = list(df_unpaired.columns[5:65])

# Hand-picked feature subset, listed as left/right pairs.
selected_cols = [
    'LAnkle_centroid_of_motion_x',
    'RAnkle_centroid_of_motion_x',
    'LShoulder_angular_velocities',
    'RShoulder_angular_velocities',
    'LHip_angular_velocities',
    'RHip_angular_velocities',
    'LShoulder_angles',
    'RShoulder_angles',
    'LKnee_angles',
    'RKnee_angles',
    'LHip_angles',
    'RHip_angles',
]

# 建模

In [37]:
# Classifiers to compare, keyed by the display name that is also used as the
# prediction column name in `results`.
models = {}
models['Logistic Regression'] = LogisticRegression(max_iter=1000)
models['Random Forest'] = RandomForestClassifier(n_estimators=100, random_state=42)
models['XGBoost'] = XGBClassifier(eval_metric='logloss', random_state=42)

# Start `results` from the identifying columns of `df`, then add one NaN-filled
# column per model; the cross-validation loop fills in the predictions later.
results = df.loc[:, ['prefix', 'filename', 'clips', 'label']].copy()
results = results.assign(**{model_name: np.nan for model_name in models})

In [None]:
cols = selected_cols # If the loaded files are the ones containing "_s", use selected_cols

In [39]:
# Main table: feature matrix, clip-level labels, and the `prefix` of each clip,
# which serves as the group key for the split below.
X, y, groups = df[cols].values, df['label'].values, df['prefix'].values

# Same feature columns and labels taken from the unpaired table.
X_unpaired, y_unpaired = df_unpaired[cols].values, df_unpaired['label'].values

# 5-fold GroupKFold: clips sharing a `prefix` never straddle train and test.
gkf = GroupKFold(n_splits=5)

In [40]:
# Grouped cross-validation: for every fold, fit each model on the training
# clips and store its predictions for the held-out clips in `results`.
for fold, (train_idx, test_idx) in enumerate(gkf.split(X, y, groups)):
    print(f"Fold {fold+1}")
    X_train, X_test = X[train_idx], X[test_idx]
    y_train = y[train_idx]

    # To train WITHOUT the unpaired data, keep these two lines and leave the
    # two np.concatenate lines below commented out.
    X_train_full = X_train.copy()
    y_train_full = y_train.copy()

    # To ADD the unpaired data to the training set, use these instead:
    # X_train_full = np.concatenate([X_train, X_unpaired], axis=0)
    # y_train_full = np.concatenate([y_train, y_unpaired], axis=0)

    # Report the class balance of this fold's training set.
    unique, counts = np.unique(y_train_full, return_counts=True)
    class_counts = dict(zip(unique, counts))
    print(f"Fold {fold+1} class counts: {class_counts}")

    # Compute class weights; leave commented out when not used.
    # classes = np.unique(y_train_full)
    # class_weight = compute_class_weight('balanced', classes=classes, y=y_train_full)
    # class_weight_dict = dict(zip(classes, class_weight))
    # print(f"Class weights: {class_weight_dict}")
    # scale_pos_weight = class_weight_dict.get(1, 1) / class_weight_dict.get(0, 1) if 0 in class_weight_dict and 1 in class_weight_dict else 1

    for name, model in models.items():
        # Leave this if-elif commented out when class weights are not used.
        # if name == 'Logistic Regression':
        #     print(f"Training {name} during fold {fold+1}")
        #     model.set_params(class_weight=class_weight_dict)
        # elif name == 'Random Forest':
        #     print(f"Training {name} during fold {fold+1}")
        #     model.set_params(class_weight=class_weight_dict)
        # elif name == 'XGBoost':
        #     print(f"Training {name} during fold {fold+1}")
        #     model.set_params(scale_pos_weight=scale_pos_weight)
        model.fit(X_train_full, y_train_full)
        preds = model.predict(X_test)
        # `test_idx` holds row POSITIONS (it indexes the numpy array X), so the
        # predictions are written by position; a label-based `.loc` write is
        # only correct while the index of `results` is exactly 0..n-1.
        results.iloc[test_idx, results.columns.get_loc(name)] = preds

# Cast the prediction columns to int. Every row should have been in exactly one
# test fold; fail with a clear message instead of an opaque cast error if not.
for name in models:
    if results[name].isna().any():
        raise ValueError(f"{name}: some rows never received a prediction")
    results[name] = results[name].astype(int)

Fold 1
Fold 1 class counts: {0: 1062, 1: 324}
Fold 2
Fold 2 class counts: {0: 1008, 1: 381}
Fold 3
Fold 3 class counts: {0: 1110, 1: 276}
Fold 4
Fold 4 class counts: {0: 1110, 1: 279}
Fold 5
Fold 5 class counts: {0: 1062, 1: 324}


合併 clips：以「連續 clip 的判定結果」來決定整部影片是否為異常

In [41]:
model_cols = ['Logistic Regression', 'Random Forest', 'XGBoost']
gap = 5
thresholds = [5, 10, 15]


def _has_dense_run(flags, threshold, gap):
    """Return True if `flags` contains a dense enough run of positives.

    A run is a sequence of truthy entries in which each entry lies at most
    `gap` positions after the previous truthy entry. A truthy entry further
    away than that starts a new run of length 1.

    Parameters
    ----------
    flags : iterable of bool
        Per-clip positive flags, in clip order.
    threshold : int
        Number of positives a single run needs in order to count.
    gap : int
        Largest allowed position difference between neighbouring positives.

    Returns
    -------
    bool
        True as soon as some run reaches `threshold` positives, else False.
    """
    run_len = 0
    last_pos = None  # position of the most recent positive, if any
    for pos, flagged in enumerate(flags):
        if not flagged:
            continue
        if last_pos is not None and pos - last_pos <= gap:
            run_len += 1
        else:
            run_len = 1
        last_pos = pos
        if run_len >= threshold:
            return True
    return False


# Build one video-level column per (model, threshold): 1 if the video's clip
# predictions contain a dense run of positives, 0 if not, and -1 if the video
# was skipped.
for model in model_cols:
    for threshold in thresholds:
        pred_col = f'pred_{model.replace(" ", "_")}_{threshold}'
        skipped = 0
        video_pred = {}  # prefix -> 0/1 decision for the non-skipped videos

        for prefix, group in results.groupby('prefix'):
            # NOTE(review): `clips` is read from the group's first row and
            # compared with the threshold — presumably the video's clip count; confirm.
            if group['clips'].iloc[0] < threshold:
                skipped += 1
                continue  # group too short to be considered
            is_abnormal = group[model].values == 1  # 1 means abnormal
            video_pred[prefix] = int(_has_dense_run(is_abnormal, threshold, gap))

        # Broadcast each decision to every clip of its video in one pass;
        # prefixes without a decision get -1.
        results[pred_col] = results['prefix'].map(lambda p: video_pred.get(p, -1))

        print(f"[{model}] Threshold {threshold}: Skipped {skipped} (clips < threshold)")

[Logistic Regression] Threshold 5: Skipped 0 (clips < threshold)
[Logistic Regression] Threshold 10: Skipped 8 (clips < threshold)
[Logistic Regression] Threshold 15: Skipped 13 (clips < threshold)
[Random Forest] Threshold 5: Skipped 0 (clips < threshold)
[Random Forest] Threshold 10: Skipped 8 (clips < threshold)
[Random Forest] Threshold 15: Skipped 13 (clips < threshold)
[XGBoost] Threshold 5: Skipped 0 (clips < threshold)
[XGBoost] Threshold 10: Skipped 8 (clips < threshold)
[XGBoost] Threshold 15: Skipped 13 (clips < threshold)


計算分數

In [42]:
def _ratio(numerator, denominator):
    """Return numerator / denominator, or 0 when the denominator is not positive."""
    return numerator / denominator if denominator > 0 else 0


# One row per prefix, taking the first value of each column within the group.
df_grouped = results.groupby('prefix').first().reset_index()

metrics_all = {}

for model in model_cols:
    for threshold in thresholds:
        key = f'{model}_{threshold}'
        pred_col = f'pred_{model.replace(" ", "_")}_{threshold}'

        # Rows whose prediction is -1 are excluded from scoring.
        valid_mask = df_grouped[pred_col] != -1
        scored = df_grouped.loc[valid_mask, ['label', pred_col]]
        y_true = scored['label'].values
        y_pred = scored[pred_col].values

        # Nothing left to score for this threshold: record empty metrics.
        if len(y_true) == 0:
            metrics_all[key] = dict.fromkeys(
                ['Precision', 'Sensitivity', 'Specificity', 'Accuracy']
            )
            continue

        tn, fp, fn, tp = confusion_matrix(y_true, y_pred, labels=[0, 1]).ravel()

        precision = _ratio(tp, tp + fp)
        sensitivity = _ratio(tp, tp + fn)
        specificity = _ratio(tn, tn + fp)
        accuracy = (tp + tn) / (tp + tn + fp + fn)

        metrics_all[key] = {
            metric: round(value, 3)
            for metric, value in (
                ('Precision', precision),
                ('Sensitivity', sensitivity),
                ('Specificity', specificity),
                ('Accuracy', accuracy),
            )
        }

# Collect into a DataFrame: one row per model/threshold, one column per metric.
df_metrics = pd.DataFrame(metrics_all).T
df_metrics.index.name = 'Model_Threshold'
df_metrics

Model_Threshold,Precision,Sensitivity,Specificity,Accuracy
Logistic Regression_5,0.0,0.0,0.833,0.472
Logistic Regression_10,0.0,0.0,0.84,0.467
Logistic Regression_15,0.0,0.0,0.87,0.5
Random Forest_5,0.381,0.348,0.567,0.472
Random Forest_10,0.556,0.25,0.84,0.578
Random Forest_15,0.75,0.176,0.957,0.625
XGBoost_5,0.381,0.348,0.567,0.472
XGBoost_10,0.571,0.2,0.88,0.578
XGBoost_15,0.4,0.118,0.87,0.55


# 分析模型對「動作障礙」個案的預測結果

In [None]:
# Pull out every clip-level row of one specific case for manual inspection.
is_target_case = results['prefix'].eq('81_N_M_SETD2_infantile spasm_0.39y(mosaic)')
case_analysis = results.loc[is_target_case]
# Optional export of the selected rows (left disabled):
# case_analysis.to_excel("./case_analysis_filter_2.5_s_withunpaired.xlsx", index=False)