In [None]:
# Mount Google Drive at /content/drive; the later cells read and write the
# prediction CSVs under /content/drive/MyDrive/deepfakedetection_csv.
# force_remount=True asks Colab to mount again even if a mount already exists.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)


# Learned Weights for base models:
#  - model_A_prob: 0.0163
#  - model_B_prob: 0.9710
#  - model_C_prob: 0.0127
#
# --- 설정 (사용자 환경에 맞게 수정) ---
# CSV_FILE_PATHS = {
#    'FreqNet': '/content/drive/MyDrive/deepfakedetection_csv/FreqNet_Predictions.csv',
#    'GenConViT': '/content/drive/MyDrive/deepfakedetection_csv/GenConViT_Predictions.csv',
#    'CaFft': '/content/drive/MyDrive/deepfakedetection_csv/CaFft_predictions.csv'
    # model: PATH
#}
# COLUMN NAME SETTING
#COLUMN_NAMES = {
#     'video_id': 'video_id',
#     'FreqNet': {
#         'fake_prob': 'fake_prob',
#         'true_label': 'true_label'
#     },
#     'GenConViT': {
#         'fake_prob': 'fake_prob',
#         # 'true_label': 'true_label'
#     },
#     'CaFft': {
#         'fake_prob': 'fake_prob',
#         # 'true_label': 'true_label'
#     }
# }

# # Soft Voting 시 각 모델에 부여할 가중치
# MODEL_WEIGHTS = {
#     'FreqNet': 0.0163,
#     'GenConViT': 0.9710,
#     'CaFft': 0.0127
# } # or None

Mounted at /content/drive


In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
import os

def prepare_ensemble_csv(
    model_csv_paths: dict,
    output_dir: str = "./",
    test_size: float = 0.3,
    label_col: str = "true_label",
    prob_col: str = "fake_prob",
    id_col: str = "video_id",
    random_state: int = 42
):
    """
    Merge per-model prediction CSVs and write a stratified validation/test split.

    Each CSV contributes one column named ``{model_name}_prob``. The ground-truth
    label is taken from the first CSV (in dict order) that has ``label_col``.
    Only ids present in every CSV survive the inner joins.

    Args:
        model_csv_paths: mapping such as {'model_A': 'path/to/csv1.csv', ...}.
        output_dir: directory that receives ``validation_predictions.csv`` and
            ``test_predictions.csv`` (created if missing).
        test_size: fraction of the merged rows assigned to the test split.
        label_col: name of the ground-truth column.
        prob_col: name of the fake-probability column in every CSV.
        id_col: name of the join key in every CSV.
        random_state: seed forwarded to ``train_test_split``.

    Returns:
        (val_df, test_df) as produced by ``train_test_split``.

    Raises:
        ValueError: if no CSV is given, a CSV lacks ``prob_col``/``id_col``,
            no CSV carries ``label_col``, or a non-numeric label is not
            REAL/FAKE (case-insensitive).
    """
    if not model_csv_paths:
        raise ValueError("model_csv_paths must contain at least one model CSV.")

    dfs = {}
    label_source_df = None

    # Load every model's predictions and rename the probability column.
    for model_name, path in model_csv_paths.items():
        df = pd.read_csv(path)

        # Validate first: selecting [id_col, label_col] below would otherwise
        # raise a bare KeyError when the id column is missing.
        if prob_col not in df.columns or id_col not in df.columns:
            raise ValueError(f"{model_name} 파일에 '{prob_col}' 또는 '{id_col}' 컬럼이 없음")

        # The first CSV that has the label column becomes the label source.
        if label_source_df is None and label_col in df.columns:
            label_source_df = df[[id_col, label_col]].copy()

        dfs[model_name] = df[[id_col, prob_col]].rename(columns={prob_col: f"{model_name}_prob"})

    if label_source_df is None:
        raise ValueError("true_label 컬럼을 가진 CSV 파일이 하나 이상 필요합니다.")

    # Inner-join all probability columns, then attach the labels.
    frames = list(dfs.values())
    merged_df = frames[0]
    for df in frames[1:]:
        merged_df = pd.merge(merged_df, df, on=id_col, how="inner")
    merged_df = pd.merge(merged_df, label_source_df, on=id_col, how="inner")

    # String labels -> 0/1. A dtype-agnostic check is used because string
    # columns are not guaranteed to have dtype ``object``.
    if not pd.api.types.is_numeric_dtype(merged_df[label_col]):
        label_map = {"REAL": 0, "FAKE": 1}
        normalized = merged_df[label_col].astype(str).str.strip().str.upper()
        mapped = normalized.map(label_map)
        unmapped = merged_df.loc[mapped.isna(), label_col]
        if not unmapped.empty:
            # Fail here rather than letting NaN labels reach the stratified split.
            raise ValueError(
                f"Unrecognised values in '{label_col}': {unmapped.unique().tolist()} "
                "(expected REAL or FAKE)."
            )
        merged_df[label_col] = mapped.astype(int)

    # Sort by id so the split depends only on the data and random_state.
    merged_df = merged_df.sort_values(by=id_col).reset_index(drop=True)

    # Stratified validation/test split.
    val_df, test_df = train_test_split(
        merged_df, test_size=test_size,
        stratify=merged_df[label_col], random_state=random_state
    )

    # Persist both splits.
    os.makedirs(output_dir, exist_ok=True)
    val_path = os.path.join(output_dir, "validation_predictions.csv")
    test_path = os.path.join(output_dir, "test_predictions.csv")
    val_df.to_csv(val_path, index=False)
    test_df.to_csv(test_path, index=False)

    print(f"[✓] 저장 완료:\n  Validation: {val_path}\n  Test: {test_path}")
    return val_df, test_df

# --- Usage example ---
# Each key becomes the prefix of that model's "<key>_prob" column in the merged CSVs.
model_csvs = dict(
    model_A="/content/drive/MyDrive/deepfakedetection_csv/FreqNet_Predictions.csv",
    model_B="/content/drive/MyDrive/deepfakedetection_csv/GenConViT_Predictions.csv",
    model_C="/content/drive/MyDrive/deepfakedetection_csv/CaFft_predictions.csv",
    model_D="/content/drive/MyDrive/deepfakedetection_csv/face-x-ray_predictions.csv",
    model_E="/content/drive/MyDrive/deepfakedetection_csv/ResNext&LSTM_predictions.csv",
)

# Merge the five prediction files and write the validation/test splits next to them.
val_df, test_df = prepare_ensemble_csv(
    model_csv_paths=model_csvs,
    output_dir="/content/drive/MyDrive/deepfakedetection_csv",
)


[✓] 저장 완료:
  Validation: /content/drive/MyDrive/deepfakedetection_csv/validation_predictions.csv
  Test: /content/drive/MyDrive/deepfakedetection_csv/test_predictions.csv


In [None]:
# adaptive_ensemble_train.py

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import pandas as pd
import numpy as np
from sklearn.metrics import roc_auc_score, accuracy_score, f1_score
import logging

# --- Logging setup ---
# INFO level; every record is prefixed with its timestamp and level name.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Adaptive ensemble model definition ---
class AdaptiveWeightedEnsemble(nn.Module):
    """Weighted average of base-model outputs with learnable weights.

    One raw weight per base model is stored as a parameter. A softmax over the
    raw weights yields the mixing weights, so they are positive and sum to 1.
    """

    def __init__(self, num_base_models):
        super().__init__()
        # Equal raw weights, so the softmax starts out uniform.
        initial_raw_weights = torch.ones(num_base_models) / num_base_models
        self.base_model_weights = nn.Parameter(initial_raw_weights)

    def _mixing_weights(self):
        """Softmax-normalised view of the raw weight parameter."""
        return torch.softmax(self.base_model_weights, dim=0)

    def forward(self, base_model_predictions):
        """Return (weighted sum over dim 1 kept as a column, mixing weights).

        Assumes ``base_model_predictions`` is (batch, num_base_models) — TODO confirm
        against callers; the last dimension must broadcast against the weights.
        """
        mixing_weights = self._mixing_weights()
        weighted_predictions = base_model_predictions * mixing_weights
        ensemble_prediction_prob = weighted_predictions.sum(dim=1, keepdim=True)
        return ensemble_prediction_prob, mixing_weights

    def get_learned_weights(self):
        """Return the current mixing weights as a NumPy array on the CPU."""
        return self._mixing_weights().detach().cpu().numpy()

# --- Settings and hyperparameters ---
# Splits written by prepare_ensemble_csv; the ensemble weights are fitted on the
# validation file and scored on the test file.
VALIDATION_CSV_PATH = '/content/drive/MyDrive/deepfakedetection_csv/validation_predictions.csv'
TEST_CSV_PATH = '/content/drive/MyDrive/deepfakedetection_csv/test_predictions.csv'
MODEL_SAVE_PATH = 'adaptive_ensemble_weights.pth'
BASE_MODEL_PROB_COLUMNS = ['model_A_prob', 'model_B_prob', 'model_C_prob', 'model_D_prob', 'model_E_prob']
# Derived from the column list so the weight vector always has one entry per
# input column. It was hard-coded to 3 while five columns are fed in, which
# cannot broadcast in AdaptiveWeightedEnsemble.forward.
NUM_BASE_MODELS = len(BASE_MODEL_PROB_COLUMNS)
LABEL_COLUMN = 'true_label'
VIDEO_ID_COLUMN = 'video_id'

LEARNING_RATE = 0.01
NUM_EPOCHS = 100
BATCH_SIZE = 64
# Number of consecutive non-improving epochs tolerated before training stops.
PATIENCE_EARLY_STOPPING = 10

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
logging.info(f"Using device: {DEVICE}")

# --- Data loading helper ---
def load_data_from_csv(csv_path, prob_columns, label_column):
    """Read a predictions CSV and return ``(features, labels)`` as float32 tensors.

    ``features`` holds the ``prob_columns`` values, one row per CSV row;
    ``labels`` holds ``label_column`` reshaped to a single column.
    """
    frame = pd.read_csv(csv_path)
    feature_array = frame[prob_columns].to_numpy().astype(np.float32)
    # reshape(-1, 1) turns the label vector into an (n_rows, 1) column.
    label_array = frame[label_column].to_numpy().astype(np.float32).reshape(-1, 1)
    return (
        torch.tensor(feature_array, dtype=torch.float32),
        torch.tensor(label_array, dtype=torch.float32),
    )

# --- Main execution ---
# Fits the ensemble weights, keeps the best checkpoint, then scores it on the test CSV.
if __name__ == '__main__':
    logging.info("Loading validation data...")
    X_val, y_val = load_data_from_csv(VALIDATION_CSV_PATH, BASE_MODEL_PROB_COLUMNS, LABEL_COLUMN)
    # This loader is used both for the weight updates and for the per-epoch "Val Loss" below.
    val_loader = DataLoader(TensorDataset(X_val, y_val), batch_size=BATCH_SIZE, shuffle=False)

    logging.info("Loading test data...")
    X_test, y_test = load_data_from_csv(TEST_CSV_PATH, BASE_MODEL_PROB_COLUMNS, LABEL_COLUMN)
    test_loader = DataLoader(TensorDataset(X_test, y_test), batch_size=BATCH_SIZE, shuffle=False)

    # NOTE(review): the weight vector has NUM_BASE_MODELS entries while X_val/X_test have
    # len(BASE_MODEL_PROB_COLUMNS) columns; the two must be equal for the product in forward().
    ensemble_model = AdaptiveWeightedEnsemble(num_base_models=NUM_BASE_MODELS).to(DEVICE)
    # NOTE(review): BCELoss is applied directly to the weighted sum, so this assumes every
    # base-model column is a probability in [0, 1] — TODO confirm for all input CSVs.
    criterion = nn.BCELoss()
    optimizer = optim.AdamW(ensemble_model.parameters(), lr=LEARNING_RATE)

    # Early-stopping bookkeeping.
    best_val_loss = float('inf')
    epochs_no_improve = 0

    logging.info("Starting training of ensemble weights...")
    for epoch in range(NUM_EPOCHS):
        # --- weight update pass (over the validation split) ---
        ensemble_model.train()
        train_loss_epoch = 0
        for batch_X, batch_y in val_loader:
            batch_X, batch_y = batch_X.to(DEVICE), batch_y.to(DEVICE)
            optimizer.zero_grad()
            predictions, _ = ensemble_model(batch_X)
            loss = criterion(predictions, batch_y)
            loss.backward()
            optimizer.step()
            train_loss_epoch += loss.item()

        # Mean of the per-batch losses seen during the update pass.
        avg_train_loss = train_loss_epoch / len(val_loader)

        # --- evaluation pass: same loader, no gradient tracking, post-update weights ---
        ensemble_model.eval()
        val_loss_epoch = 0
        with torch.no_grad():
            for batch_X_val, batch_y_val in val_loader:
                batch_X_val, batch_y_val = batch_X_val.to(DEVICE), batch_y_val.to(DEVICE)
                val_preds, _ = ensemble_model(batch_X_val)
                loss = criterion(val_preds, batch_y_val)
                val_loss_epoch += loss.item()

        avg_val_loss = val_loss_epoch / len(val_loader)
        logging.info(f"Epoch [{epoch+1}/{NUM_EPOCHS}], Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}")

        # Checkpoint on strict improvement; otherwise count towards early stopping.
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            torch.save(ensemble_model.state_dict(), MODEL_SAVE_PATH)
            logging.info(f"Model saved to {MODEL_SAVE_PATH}")
            epochs_no_improve = 0
        else:
            epochs_no_improve += 1
            if epochs_no_improve == PATIENCE_EARLY_STOPPING:
                logging.info("Early stopping triggered.")
                break

    logging.info("Training finished.")

    logging.info("Evaluating on test set with learned weights...")
    # Restore the best checkpoint saved above rather than using the last-epoch weights.
    ensemble_model.load_state_dict(torch.load(MODEL_SAVE_PATH))
    ensemble_model.eval()

    # Collect per-sample probabilities and labels across all test batches.
    all_test_preds_probs = []
    all_test_labels = []
    with torch.no_grad():
        for batch_X_test, batch_y_test in test_loader:
            batch_X_test = batch_X_test.to(DEVICE)
            test_preds_probs, _ = ensemble_model(batch_X_test)
            all_test_preds_probs.extend(test_preds_probs.cpu().numpy().flatten())
            all_test_labels.extend(batch_y_test.cpu().numpy().flatten())

    all_test_preds_probs = np.array(all_test_preds_probs)
    all_test_labels = np.array(all_test_labels)
    # Hard decisions at a fixed 0.5 threshold (strictly greater than).
    all_test_preds_binary = (all_test_preds_probs > 0.5).astype(int)

    auc = roc_auc_score(all_test_labels, all_test_preds_probs)
    accuracy = accuracy_score(all_test_labels, all_test_preds_binary)
    f1 = f1_score(all_test_labels, all_test_preds_binary)

    # Softmax-normalised weights, in the same order as BASE_MODEL_PROB_COLUMNS.
    learned_weights = ensemble_model.get_learned_weights()

from sklearn.metrics import confusion_matrix

# Confusion matrix at the 0.5 threshold. labels=[0, 1] pins the matrix to 2x2 so
# the four-way unpack still works when the test split contains a single class;
# the float labels are cast to int so they match the integer label list.
tn, fp, fn, tp = confusion_matrix(
    all_test_labels.astype(int), all_test_preds_binary, labels=[0, 1]
).ravel()

# Per-class hit rates; NaN when the class is absent instead of dividing by zero.
fake_detection_accuracy = tp / (tp + fn) if (tp + fn) else float('nan')
real_detection_accuracy = tn / (tn + fp) if (tn + fp) else float('nan')

print("\n====== Final Test Set Results ======")
print(f"AUC Score     : {auc:.4f}")
print(f"Accuracy      : {accuracy:.4f}")
print(f"F1 Score      : {f1:.4f}")
print("\nConfusion Matrix (Threshold=0.5):")
print(f"  TP (Fake predicted as Fake) : {tp}")
print(f"  TN (Real predicted as Real) : {tn}")
print(f"  FP (Real predicted as Fake) : {fp}")
print(f"  FN (Fake predicted as Real) : {fn}")
print(f"  -> Fake detection accuracy   : {fake_detection_accuracy:.4f}")
print(f"  -> Real detection accuracy   : {real_detection_accuracy:.4f}")

# Weights are reported in the same order as the input probability columns.
print("\nLearned Weights for base models:")
for model_name, weight in zip(BASE_MODEL_PROB_COLUMNS, learned_weights):
    print(f"  - {model_name}: {weight:.4f}")
print("====================================\n")



AUC Score     : 1.0000
Accuracy      : 0.9667
F1 Score      : 0.9789

Confusion Matrix (Threshold=0.5):
  TP (Fake predicted as Fake) : 93
  TN (Real predicted as Real) : 23
  FP (Real predicted as Fake) : 0
  FN (Fake predicted as Real) : 4
  -> Fake detection accuracy   : 0.9588
  -> Real detection accuracy   : 1.0000

Learned Weights for base models:
  - model_A_prob: 0.0082
  - model_B_prob: 0.9606
  - model_C_prob: 0.0057
  - model_D_prob: 0.0095
  - model_E_prob: 0.0160



In [None]:
# Mount Google Drive for the soft-voting cell below (no force_remount here,
# unlike the mount call in the first cell).
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# softVoting

import pandas as pd
import numpy as np
from sklearn.metrics import roc_auc_score, accuracy_score, f1_score, precision_score, recall_score
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Settings (adjust to your environment) ---
# Prediction CSV for each base model; the keys are the model names used everywhere below.
CSV_FILE_PATHS = {
    'FreqNet': '/content/drive/MyDrive/deepfakedetection_csv/FreqNet_Predictions.csv',
    'GenConViT': '/content/drive/MyDrive/deepfakedetection_csv/GenConViT_Predictions.csv',
    'CaFft': '/content/drive/MyDrive/deepfakedetection_csv/CaFft_predictions.csv',
    'face-x-ray': '/content/drive/MyDrive/deepfakedetection_csv/face-x-ray_predictions.csv',
    'cnn-rnn': '/content/drive/MyDrive/deepfakedetection_csv/ResNext&LSTM_predictions.csv',
}

# Column names inside each CSV. Every model exposes 'fake_prob'; only FreqNet's
# CSV is additionally used as the source of 'true_label'.
COLUMN_NAMES = {
    'video_id': 'video_id',
    **{model_name: {'fake_prob': 'fake_prob'} for model_name in CSV_FILE_PATHS},
}
COLUMN_NAMES['FreqNet']['true_label'] = 'true_label'

# Per-model weights for soft voting (set to None for equal weights).
# To learn the weights AWARE-NET style, use the earlier train_adaptive_ensemble.py code.
MODEL_WEIGHTS = {
    'FreqNet': 0.0082,
    'GenConViT': 0.9613,
    'CaFft': 0.0057,
    'face-x-ray': 0.0088,
    'cnn-rnn': 0.016,
}

# Decision threshold applied to the ensemble probability.
THRESHOLD = 0.5
# --- End of settings ---

# Soft-voting pipeline: load each model's CSV, merge on video_id, form the
# weighted ensemble probability, evaluate it, and save the merged frame.
if __name__ == '__main__':
    logging.info("--- Soft Voting 앙상블 시작 ---")

    loaded_dfs = {}
    model_names_loaded = []
    # True once a successfully loaded CSV has supplied 'true_label'. Tracking the
    # label (rather than "no model loaded yet") lets a later model provide it when
    # the first loaded model has no label column configured.
    label_loaded = False

    for model_name, csv_path in CSV_FILE_PATHS.items():
        try:
            df = pd.read_csv(csv_path)
            # Keep only the needed columns and normalise their names to
            # video_id / fake_prob_{model_name} / true_label.
            cols_to_select = {COLUMN_NAMES['video_id']: 'video_id'}
            if 'fake_prob' in COLUMN_NAMES[model_name]:
                cols_to_select[COLUMN_NAMES[model_name]['fake_prob']] = f'fake_prob_{model_name}'
            else:
                logging.error(f"'{model_name}'에 대한 fake_prob 컬럼 이름이 COLUMN_NAMES에 정의되지 않았습니다.")
                continue

            takes_label = not label_loaded and 'true_label' in COLUMN_NAMES[model_name]
            if takes_label:
                cols_to_select[COLUMN_NAMES[model_name]['true_label']] = 'true_label'

            df_selected = df[list(cols_to_select.keys())].rename(columns=cols_to_select)
            loaded_dfs[model_name] = df_selected
            model_names_loaded.append(model_name)
            # Only mark the label as available after the selection above succeeded.
            label_loaded = label_loaded or takes_label
            logging.info(f"'{csv_path}' 로드 완료. {len(df_selected)}개 샘플, 컬럼: {df_selected.columns.tolist()}")
        except FileNotFoundError:
            logging.error(f"파일을 찾을 수 없습니다: {csv_path}. 이 모델은 앙상블에서 제외됩니다.")
        except KeyError as e:
            logging.error(f"'{csv_path}' 파일에서 필요한 컬럼을 찾을 수 없습니다: {e}. 이 모델은 앙상블에서 제외됩니다.")
        except Exception as e:
            logging.error(f"'{csv_path}' 파일 로드 중 에러 발생: {e}. 이 모델은 앙상블에서 제외됩니다.")

    if len(model_names_loaded) < 2:
        logging.error("앙상블을 수행하기에 충분한 모델 예측 결과를 로드하지 못했습니다. (최소 2개 필요)")
        exit()

    label_mapping = {'REAL': 0, 'FAKE': 1}
    for model_name, df in loaded_dfs.items():
        # Map only non-numeric labels: .map() on labels that are already 0/1
        # would turn every value into NaN and empty the evaluation set.
        if 'true_label' in df.columns and not pd.api.types.is_numeric_dtype(df['true_label']):
            df['true_label'] = df['true_label'].map(label_mapping)

    # Start from the first loaded model; fetch the label from a later model if needed.
    # Merges use the normalised 'video_id' name because every frame was renamed above.
    final_df = loaded_dfs[model_names_loaded[0]].copy()
    if 'true_label' not in final_df.columns:
        for mn in model_names_loaded[1:]:
            if 'true_label' in loaded_dfs[mn].columns:
                final_df = pd.merge(final_df, loaded_dfs[mn][['video_id', 'true_label']], on='video_id', how='left')
                logging.info(f"'true_label' 컬럼을 '{mn}' 모델의 CSV에서 가져왔습니다.")
                break
        if 'true_label' not in final_df.columns:
            logging.error("'true_label' 컬럼을 어떤 CSV에서도 찾을 수 없습니다. 성능 평가가 불가능합니다.")
            exit()

    # Inner-join the remaining models' probability columns.
    for model_name_to_merge in model_names_loaded[1:]:
        df_to_merge = loaded_dfs[model_name_to_merge][['video_id', f'fake_prob_{model_name_to_merge}']]
        final_df = pd.merge(final_df, df_to_merge, on='video_id', how='inner')

    logging.info(f"모든 CSV 병합 완료. 최종 {len(final_df)}개 샘플로 앙상블 수행.")
    if final_df.empty:
        logging.error("병합 후 남은 데이터가 없습니다. video_id가 일치하는지 확인하세요.")
        exit()

    # Weights: equal when MODEL_WEIGHTS is None or does not match the loaded models.
    if MODEL_WEIGHTS is None:
        num_models = len(model_names_loaded)
        weights = {model_name: 1/num_models for model_name in model_names_loaded}
        logging.info(f"동일 가중치 사용: {weights}")
    else:
        weights = MODEL_WEIGHTS
        logging.info(f"사용자 정의 가중치 사용: {weights}")
        if set(weights.keys()) != set(model_names_loaded):
            logging.warning("MODEL_WEIGHTS에 정의된 모델과 실제 로드된 모델이 일치하지 않습니다. 동일 가중치로 대체합니다.")
            num_models = len(model_names_loaded)
            weights = {model_name: 1/num_models for model_name in model_names_loaded}

    # Ensemble probability = weighted sum of the per-model probabilities.
    # Missing probabilities are treated as 0 (unchanged from the original behaviour).
    # NOTE(review): the notebook output shows fake_prob_face-x-ray == -1.0 for at
    # least one row, which looks like a sentinel rather than a probability; it
    # enters the sum as-is — confirm whether such rows should be excluded.
    final_df['ensemble_fake_prob'] = 0.0
    for model_name in model_names_loaded:
        prob_col = f'fake_prob_{model_name}'
        if prob_col in final_df.columns:
            final_df['ensemble_fake_prob'] += final_df[prob_col].fillna(0) * weights.get(model_name, 0)
        else:
            logging.warning(f"'{prob_col}' 컬럼이 병합된 DataFrame에 없습니다. '{model_name}' 모델은 앙상블에서 제외됩니다.")

    final_df['ensemble_prediction'] = (final_df['ensemble_fake_prob'] > THRESHOLD).astype(int)

    if 'true_label' in final_df.columns:
        # Evaluate only rows that have both a label and an ensemble probability.
        eval_df = final_df.dropna(subset=['true_label', 'ensemble_fake_prob'])
        if not eval_df.empty:
            y_true_eval = eval_df['true_label']
            y_pred_prob_eval = eval_df['ensemble_fake_prob']
            y_pred_binary_eval = eval_df['ensemble_prediction']

            auc = roc_auc_score(y_true_eval, y_pred_prob_eval)
            accuracy = accuracy_score(y_true_eval, y_pred_binary_eval)
            f1 = f1_score(y_true_eval, y_pred_binary_eval)
            precision = precision_score(y_true_eval, y_pred_binary_eval, zero_division=0)
            recall = recall_score(y_true_eval, y_pred_binary_eval, zero_division=0)

            logging.info("\n--- Soft Voting 앙상블 성능 ---")
            logging.info(f"사용된 모델: {model_names_loaded}")
            logging.info(f"가중치: {weights}")
            logging.info(f"AUC: {auc:.4f}")
            logging.info(f"Accuracy: {accuracy:.4f}")
            logging.info(f"F1 Score: {f1:.4f}")
            logging.info(f"Precision: {precision:.4f}")
            logging.info(f"Recall: {recall:.4f}")
            logging.info(f"평가에 사용된 샘플 수: {len(eval_df)}")
        else:
            logging.warning("평가할 수 있는 유효한 데이터가 없습니다 (NaN 값 등으로 인해).")

        # Per-model metrics for reference, on rows where that model has a value.
        logging.info("\n--- 개별 모델 성능 (참고용) ---")
        for model_name in model_names_loaded:
            prob_col = f'fake_prob_{model_name}'
            if prob_col in final_df.columns:
                model_eval_df = final_df.dropna(subset=['true_label', prob_col])
                if not model_eval_df.empty:
                    m_y_true = model_eval_df['true_label']
                    m_y_pred_prob = model_eval_df[prob_col]
                    m_y_pred_binary = (m_y_pred_prob > THRESHOLD).astype(int)

                    m_auc = roc_auc_score(m_y_true, m_y_pred_prob)
                    m_acc = accuracy_score(m_y_true, m_y_pred_binary)
                    m_f1 = f1_score(m_y_true, m_y_pred_binary)
                    logging.info(f"모델: {model_name} | AUC: {m_auc:.4f}, Acc: {m_acc:.4f}, F1: {m_f1:.4f} (샘플 수: {len(model_eval_df)})")
                else:
                    logging.info(f"모델: {model_name} | 평가할 유효 데이터 없음.")
    else:
        logging.warning("'true_label' 컬럼이 없어 성능 평가를 수행할 수 없습니다.")

    logging.info("\n--- 앙상블 결과 DataFrame (상위 5개 행) ---")
    print(final_df.head())

    logging.info("\n--- Soft Voting 앙상블 완료 ---")

    # Written to the current working directory.
    final_df.to_csv('ensemble_predictions.csv', index=False)

from sklearn.metrics import confusion_matrix

# The metric names below only exist when the evaluation set was non-empty,
# so the summary is skipped instead of failing with a NameError.
if eval_df.empty:
    print("No rows with both a label and an ensemble probability; skipping the metric summary.")
else:
    print("\n--- Soft Voting 앙상블 성능 ---")
    print(f"사용된 모델: {model_names_loaded}")
    print(f"가중치: {weights}")
    print(f"AUC: {auc:.4f}")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"F1 Score: {f1:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"평가에 사용된 샘플 수: {len(eval_df)}")

    # Confusion matrix. labels=[0, 1] keeps it 2x2 even when only one class is
    # present, so the four-way unpack cannot fail; labels are cast to int to
    # match the integer label list.
    cm = confusion_matrix(y_true_eval.astype(int), y_pred_binary_eval, labels=[0, 1])
    tn, fp, fn, tp = cm.ravel()
    print("\n--- Confusion Matrix ---")
    print(f"True Negatives (REAL correctly predicted): {tn}")
    print(f"False Positives (REAL misclassified as FAKE): {fp}")
    print(f"False Negatives (FAKE misclassified as REAL): {fn}")
    print(f"True Positives (FAKE correctly predicted): {tp}")

# Build the (video_id, rounded ensemble probability) list by iterating the two
# columns directly instead of materialising every row with iterrows().
results = [
    (video_id, round(fake_prob, 4))
    for video_id, fake_prob in zip(final_df['video_id'], final_df['ensemble_fake_prob'])
]

# Print the list as-is.
print("\n--- Raw Results (video_id, fake_prob) ---")
print(results)


         video_id  fake_prob_FreqNet  true_label  fake_prob_GenConViT  \
0  aagfhgtpmv.mp4             0.0000           1             0.946487   
1  aapnvogymq.mp4             0.0000           1             0.770607   
2  abarnvbtwb.mp4             0.0011           0             0.076319   
3  abofeumbvv.mp4             0.9903           1             0.500000   
4  abqwwspghj.mp4             0.0000           1             0.992096   

   fake_prob_CaFft  fake_prob_face-x-ray  fake_prob_cnn-rnn  \
0         0.205174              0.203639           0.983453   
1         0.222363              0.203538           0.001762   
2         0.167179              0.210317           0.062986   
3         0.489749             -1.000000           0.000248   
4         0.520626              0.281806           0.002953   

   ensemble_fake_prob  ensemble_prediction  
0            0.928555                    1  
1            0.743871                    1  
2            0.077186                    0  
3 