In [27]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import IsolationForest
from sklearn.metrics import classification_report, accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import precision_recall_curve
import matplotlib.pyplot as plt
import seaborn as sns
import kagglehub

# 設定隨機種子和參數
RANDOM_SEED = 42
TEST_SIZE = 0.3
BATCH_SIZE = 512
EPOCHS = 100
LEARNING_RATE = 0.001

In [16]:
# 設定隨機種子
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed(RANDOM_SEED)

# 檢查GPU可用性
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'使用設備: {device}')
if torch.cuda.is_available():
    print(f'GPU名稱: {torch.cuda.get_device_name(0)}')

使用設備: cuda
GPU名稱: NVIDIA GeForce RTX 4080 SUPER


In [17]:
print("載入數據集...")
path = kagglehub.dataset_download("mlg-ulb/creditcardfraud")
data = pd.read_csv(f"{path}/creditcard.csv")
data['Class'] = data['Class'].astype(int)

# 數據預處理
print("數據預處理...")
data = data.drop(['Time'], axis=1)

# 標準化 Amount 欄位
scaler_amount = StandardScaler()
data['Amount'] = scaler_amount.fit_transform(data['Amount'].values.reshape(-1, 1))

# 顯示數據分布
fraud = data[data['Class'] == 1]
nonfraud = data[data['Class'] == 0]
print(f'詐欺交易: {len(fraud)}, 正常交易: {len(nonfraud)}')
print(f'詐欺交易比例: {len(fraud)/(len(fraud) + len(nonfraud))*100:.3f}%')

# 準備特徵和標籤
X = data.drop(columns=['Class']).values
y = data['Class'].values

# 分割訓練集和測試集
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=TEST_SIZE, random_state=RANDOM_SEED, stratify=y
)

# 標準化特徵
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

print(f"訓練集大小: {X_train_scaled.shape}")
print(f"測試集大小: {X_test_scaled.shape}")

載入數據集...
數據預處理...
詐欺交易: 492, 正常交易: 284315
詐欺交易比例: 0.173%
訓練集大小: (199364, 29)
測試集大小: (85443, 29)


In [18]:
# 步驟1: 非監督學習 - Isolation Forest
print("\n=== 步驟1: 使用 Isolation Forest 進行異常檢測 ===")

# 使用正常交易數據訓練 Isolation Forest
normal_data = X_train_scaled[y_train == 0]
# 使用更多數據但設置合理的樣本數
sample_size = min(10000, len(normal_data))  # 使用更多樣本
normal_sample = normal_data[:sample_size]
print(f"用於訓練 Isolation Forest 的正常交易數量: {len(normal_sample)}")

# 訓練 Isolation Forest
isolation_forest = IsolationForest(
    contamination=0.02,  # 降低預期異常比例至2%，更貼近實際
    random_state=RANDOM_SEED,
    n_estimators=200,    # 增加樹的數量提高穩定性
    max_samples=1000,    # 固定樣本數
    max_features=0.8,    # 使用80%的特徵
    n_jobs=-1
)

isolation_forest.fit(normal_data)

# 獲取異常分數
train_anomaly_scores = isolation_forest.decision_function(X_train_scaled)
test_anomaly_scores = isolation_forest.decision_function(X_test_scaled)

# 預測異常（-1為異常，1為正常）
train_anomaly_pred = isolation_forest.predict(X_train_scaled)
test_anomaly_pred = isolation_forest.predict(X_test_scaled)

# 將異常分數轉換為特徵（分數越低越可能是異常）
train_anomaly_features = train_anomaly_scores.reshape(-1, 1)
test_anomaly_features = test_anomaly_scores.reshape(-1, 1)

print(f"Isolation Forest 檢測到的異常數量 (訓練集): {np.sum(train_anomaly_pred == -1)}")
print(f"Isolation Forest 檢測到的異常數量 (測試集): {np.sum(test_anomaly_pred == -1)}")


=== 步驟1: 使用 Isolation Forest 進行異常檢測 ===
用於訓練 Isolation Forest 的正常交易數量: 10000


Isolation Forest 檢測到的異常數量 (訓練集): 4250
Isolation Forest 檢測到的異常數量 (測試集): 1863


In [19]:
# 步驟2: 組合特徵
print("\n=== 步驟2: 組合原始特徵和異常分數特徵 ===")

# 將異常分數作為新特徵加入
X_train_enhanced = np.concatenate([X_train_scaled, train_anomaly_features], axis=1)
X_test_enhanced = np.concatenate([X_test_scaled, test_anomaly_features], axis=1)

print(f"增強後的特徵維度: {X_train_enhanced.shape[1]}")


=== 步驟2: 組合原始特徵和異常分數特徵 ===
增強後的特徵維度: 30


In [22]:
# 步驟3: 定義深度神經網路模型
class FraudDetectionNet(nn.Module):
    def __init__(self, input_dim, hidden_dims=[128, 64, 32]):  # 減少模型複雜度
        super(FraudDetectionNet, self).__init__()
        
        layers = []
        prev_dim = input_dim
        
        for i, hidden_dim in enumerate(hidden_dims):
            layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                nn.BatchNorm1d(hidden_dim),
                nn.ReLU(),
                nn.Dropout(0.4 if i < len(hidden_dims)-1 else 0.2)  # 遞減 dropout
            ])
            prev_dim = hidden_dim
        
        # 輸出層
        layers.append(nn.Linear(prev_dim, 1))
        # 移除 Sigmoid，在損失函數中處理
        
        self.network = nn.Sequential(*layers)
    
    def forward(self, x):
        return self.network(x)

In [23]:
# 創建模型
input_dim = X_train_enhanced.shape[1]
model = FraudDetectionNet(input_dim).to(device)

print(f"\n=== 步驟3: 創建深度神經網路模型 ===")
print(f"模型結構:")
print(model)

# 計算類別權重以處理不平衡數據 - 調整權重策略
class_weights = compute_class_weight('balanced', classes=np.unique(y_train), y=y_train)
# 降低正類權重，避免過度預測詐欺
pos_weight = torch.tensor([class_weights[1]/class_weights[0] * 0.1], dtype=torch.float32).to(device)  # 降低10倍

print(f"類別權重: {class_weights}")
print(f"正類權重: {pos_weight.item():.2f}")


=== 步驟3: 創建深度神經網路模型 ===
模型結構:
FraudDetectionNet(
  (network): Sequential(
    (0): Linear(in_features=30, out_features=128, bias=True)
    (1): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): Dropout(p=0.4, inplace=False)
    (4): Linear(in_features=128, out_features=64, bias=True)
    (5): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (6): ReLU()
    (7): Dropout(p=0.4, inplace=False)
    (8): Linear(in_features=64, out_features=32, bias=True)
    (9): BatchNorm1d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (10): ReLU()
    (11): Dropout(p=0.2, inplace=False)
    (12): Linear(in_features=32, out_features=1, bias=True)
  )
)
類別權重: [  0.50086423 289.77325581]
正類權重: 57.85


In [24]:
# 定義損失函數和優化器
criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE, weight_decay=1e-5)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=10, factor=0.5)


In [25]:
# 準備數據載入器
train_dataset = TensorDataset(
    torch.FloatTensor(X_train_enhanced).to(device),
    torch.FloatTensor(y_train).to(device)
)
test_dataset = TensorDataset(
    torch.FloatTensor(X_test_enhanced).to(device),
    torch.FloatTensor(y_test).to(device)
)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [28]:
# 步驟4: 訓練模型
print(f"\n=== 步驟4: 訓練深度神經網路模型 ===")

train_losses = []
val_losses = []
best_f1 = 0
best_model_state = None
patience_counter = 0
patience = 20

model.train()
for epoch in range(EPOCHS):
    epoch_train_loss = 0
    for batch_X, batch_y in train_loader:
        optimizer.zero_grad()
        
        outputs = model(batch_X).squeeze()
        loss = criterion(outputs, batch_y)
        
        loss.backward()
        # 梯度裁剪防止梯度爆炸
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        
        epoch_train_loss += loss.item()
    
    # 驗證 - 使用動態閾值
    model.eval()
    val_loss = 0
    all_probs = []
    all_labels = []
    
    with torch.no_grad():
        for batch_X, batch_y in test_loader:
            outputs = model(batch_X).squeeze()
            loss = criterion(outputs, batch_y)
            val_loss += loss.item()
            
            probs = torch.sigmoid(outputs)
            all_probs.extend(probs.cpu().numpy())
            all_labels.extend(batch_y.cpu().numpy())
    
    # 計算最佳閾值和F1分數
    all_probs = np.array(all_probs)
    all_labels = np.array(all_labels)
    
    # 使用 PR 曲線找最佳閾值
    precision, recall, thresholds = precision_recall_curve(all_labels, all_probs)
    f1_scores = 2 * (precision * recall) / (precision + recall + 1e-8)
    best_idx = np.argmax(f1_scores)
    current_f1 = f1_scores[best_idx]
    
    train_losses.append(epoch_train_loss / len(train_loader))
    val_losses.append(val_loss / len(test_loader))
    
    # 保存最佳模型和早停
    if current_f1 > best_f1:
        best_f1 = current_f1
        best_model_state = model.state_dict().copy()
        patience_counter = 0
    else:
        patience_counter += 1
    
    scheduler.step(val_loss)
    
    if (epoch + 1) % 10 == 0:
        print(f'Epoch [{epoch+1}/{EPOCHS}], Train Loss: {train_losses[-1]:.4f}, '
              f'Val Loss: {val_losses[-1]:.4f}, F1: {current_f1:.4f}')
    
    # 早停
    if patience_counter >= patience:
        print(f"早停於第 {epoch+1} 個 epoch")
        break
    
    model.train()


=== 步驟4: 訓練深度神經網路模型 ===
Epoch [10/100], Train Loss: 0.0558, Val Loss: 0.0974, F1: 0.8029
Epoch [20/100], Train Loss: 0.0346, Val Loss: 0.1086, F1: 0.8058
Epoch [30/100], Train Loss: 0.0283, Val Loss: 0.1234, F1: 0.8127
Epoch [40/100], Train Loss: 0.0230, Val Loss: 0.1322, F1: 0.8169
Epoch [50/100], Train Loss: 0.0210, Val Loss: 0.1372, F1: 0.8085
早停於第 52 個 epoch


In [29]:
# 載入最佳模型
model.load_state_dict(best_model_state)
print(f"最佳 F1 分數: {best_f1:.4f}")

最佳 F1 分數: 0.8198


In [30]:
# 步驟5: 評估模型
print(f"\n=== 步驟5: 模型評估 ===")

def evaluation(y_true, y_pred, model_name="Model"):
    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred, zero_division=0)
    recall = recall_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred)
    
    print(f'\n{model_name} 評估結果:')
    print('===' * 20)
    print(f' 準確率 (Accuracy): {accuracy:.4f}')
    print(f' 精確率 (Precision): {precision:.4f}')
    print(f' 召回率 (Recall): {recall:.4f}')
    print(f' F1 分數: {f1:.4f}')
    print("\n詳細分類報告:")
    print(classification_report(y_true, y_pred))
    
    return accuracy, precision, recall, f1


=== 步驟5: 模型評估 ===


In [31]:
# 最終預測
model.eval()
final_probs = []

with torch.no_grad():
    for batch_X, _ in test_loader:
        outputs = model(batch_X).squeeze()
        probs = torch.sigmoid(outputs)
        final_probs.extend(probs.cpu().numpy())

final_probs = np.array(final_probs)

In [32]:
# 動態找出最佳閾值
from sklearn.metrics import precision_recall_curve
precision, recall, thresholds = precision_recall_curve(y_test, final_probs)

In [33]:
# 選擇 F1 分數最高的閾值
f1_scores = 2 * (precision * recall) / (precision + recall + 1e-8)
best_threshold_idx = np.argmax(f1_scores)
optimal_threshold = thresholds[best_threshold_idx]

print(f"最佳閾值: {optimal_threshold:.4f}")
print(f"該閾值下的 F1 分數: {f1_scores[best_threshold_idx]:.4f}")

# 使用最佳閾值進行預測
final_preds = (final_probs > optimal_threshold).astype(int)

最佳閾值: 0.9863
該閾值下的 F1 分數: 0.8085


In [34]:
# 新增：閾值優化和多種評估方法
print(f"\n=== 閾值優化和多種評估 ===")

# 1. PR曲線分析
from sklearn.metrics import precision_recall_curve, average_precision_score
import matplotlib.pyplot as plt

precision, recall, thresholds = precision_recall_curve(y_test, final_probs)
ap_score = average_precision_score(y_test, final_probs)

# 找到最佳 F1 閾值
f1_scores = 2 * (precision * recall) / (precision + recall + 1e-8)
best_f1_idx = np.argmax(f1_scores)
best_f1_threshold = thresholds[best_f1_idx]

# 找到精確率90%時的閾值（高精確率策略）
high_precision_idx = np.where(precision >= 0.9)[0]
if len(high_precision_idx) > 0:
    high_precision_threshold = thresholds[high_precision_idx[0]]
else:
    high_precision_threshold = 0.9

# 找到召回率80%時的閾值（平衡策略）
balanced_recall_idx = np.where(recall >= 0.8)[0]
if len(balanced_recall_idx) > 0:
    balanced_threshold = thresholds[balanced_recall_idx[-1]]
else:
    balanced_threshold = 0.3

print(f"Average Precision Score: {ap_score:.4f}")
print(f"最佳 F1 閾值: {best_f1_threshold:.4f} (F1: {f1_scores[best_f1_idx]:.4f})")
print(f"高精確率閾值: {high_precision_threshold:.4f}")
print(f"平衡閾值: {balanced_threshold:.4f}")


=== 閾值優化和多種評估 ===
Average Precision Score: 0.7498
最佳 F1 閾值: 0.9863 (F1: 0.8085)
高精確率閾值: 0.9997
平衡閾值: 0.9564


In [35]:
# 2. 使用不同閾值進行評估
thresholds_to_test = [best_f1_threshold, high_precision_threshold, balanced_threshold, 0.5]
threshold_names = ['最佳F1', '高精確率', '平衡策略', '預設0.5']

results_comparison = []
for i, (threshold, name) in enumerate(zip(thresholds_to_test, threshold_names)):
    preds = (final_probs > threshold).astype(int)
    acc = accuracy_score(y_test, preds)
    prec = precision_score(y_test, preds, zero_division=0)
    rec = recall_score(y_test, preds)
    f1 = f1_score(y_test, preds)
    
    results_comparison.append({
        '策略': name,
        '閾值': threshold,
        '準確率': acc,
        '精確率': prec,
        '召回率': rec,
        'F1分數': f1
    })
    
    print(f"\n{name} (閾值={threshold:.4f}):")
    print(f"  準確率: {acc:.4f}, 精確率: {prec:.4f}, 召回率: {rec:.4f}, F1: {f1:.4f}")



最佳F1 (閾值=0.9863):
  準確率: 0.9994, 精確率: 0.8496, 召回率: 0.7635, F1: 0.8043

高精確率 (閾值=0.9997):
  準確率: 0.9988, 精確率: 0.9057, 召回率: 0.3243, F1: 0.4776

平衡策略 (閾值=0.9564):
  準確率: 0.9992, 精確率: 0.7712, 召回率: 0.7973, F1: 0.7841

預設0.5 (閾值=0.5000):
  準確率: 0.9988, 精確率: 0.6080, 召回率: 0.8176, F1: 0.6974


In [36]:
# 創建比較表格
comparison_df = pd.DataFrame(results_comparison)
print(f"\n=== 不同閾值策略比較 ===")
print(comparison_df.round(4))

# 比較 Isolation Forest 單獨的結果
iso_pred_binary = (test_anomaly_pred == -1).astype(int)
print(f"\n=== Isolation Forest 單獨結果比較 ===")
evaluation(y_test, iso_pred_binary, "Isolation Forest (單獨)")


=== 不同閾值策略比較 ===
      策略      閾值     準確率     精確率     召回率    F1分數
0   最佳F1  0.9863  0.9994  0.8496  0.7635  0.8043
1   高精確率  0.9997  0.9988  0.9057  0.3243  0.4776
2   平衡策略  0.9564  0.9992  0.7712  0.7973  0.7841
3  預設0.5  0.5000  0.9988  0.6080  0.8176  0.6974

=== Isolation Forest 單獨結果比較 ===

Isolation Forest (單獨) 評估結果:
 準確率 (Accuracy): 0.9790
 精確率 (Precision): 0.0590
 召回率 (Recall): 0.7432
 F1 分數: 0.1094

詳細分類報告:
              precision    recall  f1-score   support

           0       1.00      0.98      0.99     85295
           1       0.06      0.74      0.11       148

    accuracy                           0.98     85443
   macro avg       0.53      0.86      0.55     85443
weighted avg       1.00      0.98      0.99     85443



(0.9790386573505144,
 0.059044551798174985,
 0.7432432432432432,
 0.10939830929885629)