# Speaker-Invariant Deepfake Detector Evaluation

이 노트북은 다양한 n_speaker_components 값에 대해 deepfake detector의 성능을 평가합니다.

## 실험 목표
1. n_speaker_components = 1, 5, 10, 16에 대해 모델 학습
2. Train/Test 데이터셋 분할
3. 각 설정별 정확도, Precision, Recall, F1-Score 측정
4. 결과 비교 및 시각화

In [None]:
import torch
import numpy as np
import pandas as pd
import librosa
from pathlib import Path
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, classification_report
from transformers import WavLMModel, Wav2Vec2FeatureExtractor
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm.auto import tqdm

# Configuration: run on the GPU when CUDA is available, otherwise on the CPU
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Using device: {device}")

## 1. SpeakerInvariantDetector 클래스 정의

In [None]:
class SpeakerInvariantDetector:
    """Real/fake audio classifier that removes a speaker subspace before classifying.

    Pipeline: mean-pooled WavLM embedding -> StandardScaler -> orthogonal
    projection that removes the top PCA directions of the per-speaker
    centroids -> LogisticRegression.
    """

    def __init__(self, model_name="microsoft/wavlm-large", device=None):
        """Load the WavLM backbone and create the (still unfitted) scaler and classifier.

        Args:
            model_name: Model identifier passed to ``from_pretrained``.
            device: Torch device string. When falsy, "cuda" is used if
                available, otherwise "cpu".
        """
        self.device = device if device else ("cuda" if torch.cuda.is_available() else "cpu")

        print(f"Loading WavLM model ({self.device})...")
        self.feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained(model_name)
        self.model = WavLMModel.from_pretrained(model_name).to(self.device)
        self.model.eval()

        # Projection matrix (removes speaker information) and the classifier
        self.projection_matrix = None
        self.scaler = StandardScaler()
        self.classifier = LogisticRegression(random_state=42, solver='liblinear', max_iter=1000)
        self.pca = None
        self.is_fitted = False

    @staticmethod
    def _check_component_count(n_speaker_components, n_speakers):
        """Raise ValueError when an integer component count exceeds the speaker count."""
        # Only integer counts are checked; any other value is handed to PCA untouched.
        if isinstance(n_speaker_components, (int, np.integer)) and n_speaker_components > n_speakers:
            raise ValueError(
                f"n_speaker_components={n_speaker_components} exceeds the number of "
                f"speakers ({n_speakers}); PCA on speaker centroids cannot return "
                "more components than there are centroids."
            )

    def _extract_feature(self, audio_input):
        """Return the mean-pooled WavLM embedding of one utterance, or None on failure.

        Args:
            audio_input: A ``str``/``Path`` that is loaded with librosa at
                16 kHz mono, or any other object, which is used as the
                waveform as-is (assumed to already be a 16 kHz numpy array).

        Returns:
            The last hidden state averaged over the time axis as a numpy
            array, or ``None`` when any step raised.
        """
        try:
            # Load from disk when given a path; otherwise use the input directly
            if isinstance(audio_input, (str, Path)):
                audio, _ = librosa.load(str(audio_input), sr=16000, mono=True)
            else:
                audio = audio_input  # assumed to already be a numpy array

            inputs = self.feature_extractor(
                audio, sampling_rate=16000, return_tensors="pt", padding=True
            )
            input_values = inputs.input_values.to(self.device)

            with torch.no_grad():
                outputs = self.model(input_values)

            # (Batch, Time, Dim) -> mean over Time -> drop the batch axis -> (Dim,)
            pooled_features = outputs.last_hidden_state.mean(dim=1).squeeze(0).cpu().numpy()
            return pooled_features

        except Exception as e:
            # Deliberate best-effort: one unreadable file must not abort a long
            # extraction run, so the error is reported and the caller gets None.
            print(f"Feature extraction error: {e}")
            return None

    def fit(self, audio_paths, labels, speaker_ids, n_speaker_components=10):
        """Compute the speaker-removal projection and train the classifier.

        Args:
            audio_paths: Audio file paths (or waveforms), one per sample.
            labels: Class label per sample, e.g. 0 (Real) / 1 (Fake).
            speaker_ids: Speaker id per sample; used to build the speaker subspace.
            n_speaker_components: Number of speaker principal components to remove.

        Returns:
            Accuracy of the fitted classifier on the training samples.

        Raises:
            ValueError: If the inputs are empty or differ in length, if an
                integer ``n_speaker_components`` exceeds the number of
                speakers, or if feature extraction failed for every sample.
        """
        audio_paths = list(audio_paths)
        labels = list(labels)
        speaker_ids = list(speaker_ids)

        # Validate before the expensive extraction pass: zip() would silently
        # truncate mismatched inputs, and an oversized component count would
        # otherwise only fail inside PCA after every file has been processed.
        if not (len(audio_paths) == len(labels) == len(speaker_ids)):
            raise ValueError(
                "audio_paths, labels and speaker_ids must have the same length "
                f"(got {len(audio_paths)}, {len(labels)}, {len(speaker_ids)})."
            )
        if not audio_paths:
            raise ValueError("No training samples were provided.")
        self._check_component_count(n_speaker_components, len(set(speaker_ids)))

        print("Extracting features for training...")
        X_raw = []
        y = []
        spk_map = {}  # {speaker_id: [row indices into X_raw]}

        # 1. Feature Extraction
        samples = list(zip(audio_paths, labels, speaker_ids))
        for path, label, spk in tqdm(samples, desc="Extracting features"):
            feat = self._extract_feature(path)
            if feat is not None:
                X_raw.append(feat)
                y.append(label)
                spk_map.setdefault(spk, []).append(len(X_raw) - 1)

        if not X_raw:
            raise ValueError("Feature extraction failed for every training sample.")
        n_failed = len(samples) - len(X_raw)
        if n_failed:
            print(f"Warning: skipped {n_failed} sample(s) whose features could not be extracted.")
        # Failed extractions can remove whole speakers, so check again.
        self._check_component_count(n_speaker_components, len(spk_map))

        X_raw = np.array(X_raw)
        y = np.array(y)

        # 2. Scaling
        print("Scaling features...")
        X_scaled = self.scaler.fit_transform(X_raw)

        # 3. Compute Speaker Subspace (PCA on Speaker Centroids)
        print(f"Computing Speaker Subspace (removing top {n_speaker_components} components)...")
        # One centroid per speaker: the mean of that speaker's scaled utterances
        speaker_centroids = np.array(
            [np.mean(X_scaled[indices], axis=0) for indices in spk_map.values()]
        )

        # PCA over the centroids yields the main "speaker directions" (basis)
        self.pca = PCA(n_components=n_speaker_components)
        self.pca.fit(speaker_centroids)

        # Orthogonal projection matrix: P_perp = I - U @ U.T
        # U: speaker basis vectors, shape (n_features, n_components)
        U = self.pca.components_.T
        identity = np.eye(U.shape[0])
        self.projection_matrix = identity - (U @ U.T)

        # 4. Project Features (Remove Speaker Info): X_proj = X @ P_perp
        X_projected = X_scaled @ self.projection_matrix

        # 5. Train Simple Classifier (Logistic Regression)
        print("Training Decision Boundary (Logistic Regression)...")
        self.classifier.fit(X_projected, y)
        self.is_fitted = True

        # Training summary
        acc = self.classifier.score(X_projected, y)
        print(f"Training Complete. Accuracy on Train Set: {acc:.4f}")

        return acc

    def predict(self, audio_path):
        """Classify one utterance: extract -> scale -> project -> predict.

        Args:
            audio_path: Audio file path (or waveform) to classify.

        Returns:
            ``None`` when feature extraction failed, otherwise a dict with
            ``"label"`` (predicted class), ``"probability"`` (per-class
            probabilities) and ``"feature_vector"`` (projected features,
            shape ``(1, Dim)``).

        Raises:
            ValueError: If ``fit()`` has not been called yet.
        """
        if not self.is_fitted:
            raise ValueError("Model has not been fitted yet. Call fit() first.")

        # 1. Extract
        feat = self._extract_feature(audio_path)
        if feat is None:
            return None

        # 2. Scale (the scaler expects a 2-D (1, Dim) input)
        feat = feat.reshape(1, -1)
        feat_scaled = self.scaler.transform(feat)

        # 3. Project (Remove Speaker Info): x_new = x_old @ (I - U U^T)
        feat_projected = feat_scaled @ self.projection_matrix

        # 4. Predict
        prob = self.classifier.predict_proba(feat_projected)[0]
        pred_label = self.classifier.predict(feat_projected)[0]

        return {
            "label": pred_label,              # predicted class
            "probability": prob,              # [Prob_Class0, Prob_Class1]
            "feature_vector": feat_projected  # projected vector (for visualisation)
        }

    def predict_batch(self, audio_paths):
        """Run ``predict`` on every path and collect the results.

        Args:
            audio_paths: Iterable of audio file paths (or waveforms).

        Returns:
            Tuple ``(predictions, probabilities)`` of numpy arrays. A sample
            whose prediction failed is reported as label ``-1`` with
            probabilities ``[0, 0]``, so callers must filter ``-1`` before
            computing binary metrics.
        """
        predictions = []
        probabilities = []

        for path in tqdm(audio_paths, desc="Predicting"):
            result = self.predict(path)
            if result is not None:
                predictions.append(result['label'])
                probabilities.append(result['probability'])
            else:
                predictions.append(-1)  # Error marker
                probabilities.append([0, 0])

        return np.array(predictions), np.array(probabilities)

## 2. 데이터 준비

In [None]:
from libri_dataframe import build_librispeech_dataframe

# Where the LibriSpeech corpus lives and which subset to index
LIBRISPEECH_ROOT = "./my_raw_audio/LibriSpeech"
LIBRISPEECH_SUBSET = "test-clean"

dataframe = build_librispeech_dataframe(
    subset=LIBRISPEECH_SUBSET,
    librispeech_root=LIBRISPEECH_ROOT,
)

# Keep only the first 1000 rows.
# The name `dataframe_10` is kept because the dataset-preparation cell reads it.
dataframe_10 = dataframe[:1000]
print(f"Total samples: {len(dataframe_10)}")
print(f"Number of speakers: {dataframe_10['speaker_id'].nunique()}")

In [None]:
# Dataset preparation (Real + Fake): three parallel lists, one entry per audio file
all_paths = []
all_labels = []
all_speakers = []

print("Preparing dataset...")
for row in dataframe_10.to_dict('records'):
    real_path = Path(row['audio_path'])
    # The generated (Gen 2) counterpart is looked up by the real file's stem
    fake_path = Path(f"generated_results/speaker_libri_transcript_{real_path.stem}.wav")

    # Real first (label 0), then fake (label 1); each is added only if the file exists.
    # The fake sample carries the speaker id of the utterance it was derived from.
    for candidate, label in ((real_path, 0), (fake_path, 1)):
        if candidate.exists():
            all_paths.append(str(candidate))
            all_labels.append(label)
            all_speakers.append(row['speaker_id'])

print(f"Total samples collected: {len(all_paths)}")
print(f"Real samples: {all_labels.count(0)}")
print(f"Fake samples: {all_labels.count(1)}")

## 3. Train/Test Split

데이터를 80% Train, 20% Test로 분할합니다.
**중요**: `stratify=all_labels`는 Real/Fake 레이블 비율만 유지합니다. 화자 단위로 분리하지는 않으므로 같은 화자가 Train과 Test 양쪽에 포함될 수 있습니다.

In [None]:
# Train/Test split (80/20), stratified on the Real/Fake label.
# NOTE: stratification is by label only; nothing here groups samples by
# speaker, so the same speaker can end up in both splits.
(
    X_train_paths, X_test_paths,
    y_train, y_test,
    spk_train, spk_test,
) = train_test_split(
    all_paths,
    all_labels,
    all_speakers,
    test_size=0.2,
    random_state=42,
    stratify=all_labels,  # keep the label ratio
)

print(f"\nTrain set: {len(X_train_paths)} samples")
print(f"  Real: {sum(label == 0 for label in y_train)}, Fake: {sum(label == 1 for label in y_train)}")
print(f"\nTest set: {len(X_test_paths)} samples")
print(f"  Real: {sum(label == 0 for label in y_test)}, Fake: {sum(label == 1 for label in y_test)}")
print(f"\nUnique speakers in train: {len(set(spk_train))}")
print(f"Unique speakers in test: {len(set(spk_test))}")

## 4. 실험: n_speaker_components = 1, 5, 10, 16

각 n값에 대해:
1. 모델 학습 (Train set)
2. 예측 수행 (Test set)
3. 성능 지표 계산 (Accuracy, Precision, Recall, F1)
4. Confusion Matrix 시각화

In [None]:
# n_speaker_components values to evaluate
n_components_list = [1, 5, 10, 16]

# Result store: one independent list per tracked quantity, appended to once per experiment
results = {
    key: []
    for key in (
        'n_components',
        'train_accuracy',
        'test_accuracy',
        'precision',
        'recall',
        'f1_score',
        'confusion_matrix',
        'predictions',
        'probabilities',
    )
}

In [None]:
# Load the WavLM model only once (intended to be shared)
print("Initializing WavLM model (this will be shared across experiments)...")
base_detector = SpeakerInvariantDetector(device=device)

### 실험 루프

In [None]:
# Train and evaluate one detector configuration per n_speaker_components value.
for n in n_components_list:
    print(f"\n{'='*70}")
    print(f"Experiment: n_speaker_components = {n}")
    print(f"{'='*70}\n")

    # Reuse the already-loaded WavLM backbone instead of loading it again for
    # every configuration. fit() refits the scaler and classifier and builds
    # a new PCA / projection matrix, so nothing carries over between runs.
    detector = base_detector

    # Train
    print("\n[1/3] Training...")
    train_acc = detector.fit(X_train_paths, y_train, spk_train, n_speaker_components=n)

    # Predict on the test set
    print("\n[2/3] Testing...")
    y_pred, y_prob = detector.predict_batch(X_test_paths)

    # Evaluate
    print("\n[3/3] Evaluating...")
    # predict_batch marks failed samples with -1. Left in place, that third
    # "class" makes the binary precision/recall/F1 calls raise and breaks the
    # 2-class confusion matrix, so those samples are excluded from the metrics.
    y_true_all = np.asarray(y_test)
    valid = y_pred != -1
    n_failed = int((~valid).sum())
    if n_failed:
        print(f"Warning: {n_failed} test sample(s) could not be predicted and are excluded from the metrics.")
    if not valid.any():
        raise RuntimeError("Prediction failed for every test sample; nothing to evaluate.")
    y_true_eval = y_true_all[valid]
    y_pred_eval = y_pred[valid]

    test_acc = accuracy_score(y_true_eval, y_pred_eval)
    precision = precision_score(y_true_eval, y_pred_eval, average='binary')
    recall = recall_score(y_true_eval, y_pred_eval, average='binary')
    f1 = f1_score(y_true_eval, y_pred_eval, average='binary')
    # labels=[0, 1] pins the matrix to 2x2 (Real, Fake) even if a class is absent
    cm = confusion_matrix(y_true_eval, y_pred_eval, labels=[0, 1])

    # Store the results (raw predictions keep their -1 markers)
    results['n_components'].append(n)
    results['train_accuracy'].append(train_acc)
    results['test_accuracy'].append(test_acc)
    results['precision'].append(precision)
    results['recall'].append(recall)
    results['f1_score'].append(f1)
    results['confusion_matrix'].append(cm)
    results['predictions'].append(y_pred)
    results['probabilities'].append(y_prob)

    # Report
    print(f"\n{'='*70}")
    print(f"Results for n = {n}:")
    print(f"{'='*70}")
    print(f"Train Accuracy: {train_acc:.4f}")
    print(f"Test Accuracy:  {test_acc:.4f}")
    print(f"Precision:      {precision:.4f}")
    print(f"Recall:         {recall:.4f}")
    print(f"F1-Score:       {f1:.4f}")
    print(f"\nConfusion Matrix:")
    print(cm)
    print(f"\n{classification_report(y_true_eval, y_pred_eval, labels=[0, 1], target_names=['Real', 'Fake'])}")

## 5. 결과 요약 및 시각화

### 5.1 결과 테이블

In [None]:
# Collect the scalar metrics into a DataFrame (display column -> key in `results`)
results_df = pd.DataFrame({
    column: results[key]
    for column, key in (
        ('n_components', 'n_components'),
        ('Train Accuracy', 'train_accuracy'),
        ('Test Accuracy', 'test_accuracy'),
        ('Precision', 'precision'),
        ('Recall', 'recall'),
        ('F1-Score', 'f1_score'),
    )
})

# Print the table framed by horizontal rules
print("\n" + "="*80)
print("Summary of All Experiments")
print("="*80)
print(results_df.to_string(index=False))
print("="*80)

### 5.2 성능 지표 비교 그래프

In [None]:
# Plot every metric against n_speaker_components on a 2x2 grid
fig, axes = plt.subplots(2, 2, figsize=(16, 12))

# One entry per panel: (axis, y-label, title, [(results key, line-specific kwargs), ...])
panels = [
    (axes[0, 0], 'Accuracy', 'Train vs Test Accuracy', [
        ('train_accuracy', {'marker': 'o', 'label': 'Train Accuracy'}),
        ('test_accuracy', {'marker': 's', 'label': 'Test Accuracy'}),
    ]),
    (axes[0, 1], 'Precision', 'Precision (Fake Detection)', [
        ('precision', {'marker': 'o', 'color': 'green'}),
    ]),
    (axes[1, 0], 'Recall', 'Recall (Fake Detection)', [
        ('recall', {'marker': 'o', 'color': 'orange'}),
    ]),
    (axes[1, 1], 'F1-Score', 'F1-Score (Fake Detection)', [
        ('f1_score', {'marker': 'o', 'color': 'red'}),
    ]),
]

for ax, y_label, title, series in panels:
    for key, line_kwargs in series:
        ax.plot(results['n_components'], results[key], linewidth=2, markersize=8, **line_kwargs)
    ax.set_xlabel('n_speaker_components', fontsize=12)
    ax.set_ylabel(y_label, fontsize=12)
    ax.set_title(title, fontsize=14, fontweight='bold')
    # Only the accuracy panel has labelled lines, so only it gets a legend
    if any('label' in line_kwargs for _, line_kwargs in series):
        ax.legend(fontsize=11)
    ax.grid(True, linestyle='--', alpha=0.5)
    ax.set_xticks(results['n_components'])

fig.suptitle('Speaker-Invariant Detector Performance Comparison', fontsize=18, fontweight='bold', y=0.995)
plt.tight_layout()
plt.show()

### 5.3 Confusion Matrix 비교

In [None]:
# Confusion-matrix heatmaps, one panel per evaluated configuration.
# The panel count follows the stored results instead of a fixed 4, so the
# cell keeps working when n_components_list changes or the loop ran partially.
n_panels = len(results['n_components'])

if n_panels == 0:
    print("No experiment results to plot.")
else:
    # squeeze=False always returns a 2-D axes array, even for a single panel
    fig, axes = plt.subplots(1, n_panels, figsize=(5 * n_panels, 5), squeeze=False)

    for idx, (n, cm) in enumerate(zip(results['n_components'], results['confusion_matrix'])):
        ax = axes[0, idx]
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=ax,
                    xticklabels=['Real', 'Fake'], yticklabels=['Real', 'Fake'],
                    cbar=True, square=True)
        ax.set_xlabel('Predicted', fontsize=11)
        ax.set_ylabel('Actual', fontsize=11)
        ax.set_title(f'n = {n}\nAcc: {results["test_accuracy"][idx]:.3f}', fontsize=12, fontweight='bold')

    fig.suptitle('Confusion Matrices Comparison', fontsize=16, fontweight='bold', y=1.02)
    plt.tight_layout()
    plt.show()

### 5.4 최적 n_components 선택

In [None]:
# Pick the configuration with the highest test accuracy (first one wins on ties)
best_idx = np.argmax(results['test_accuracy'])
best_n, best_acc, best_f1 = (
    results[key][best_idx]
    for key in ('n_components', 'test_accuracy', 'f1_score')
)
best_precision = results['precision'][best_idx]
best_recall = results['recall'][best_idx]

print(f"\n{'='*80}")
print(f"Best Configuration")
print(f"{'='*80}")
print(f"n_speaker_components: {best_n}")
print(f"Test Accuracy:        {best_acc:.4f}")
print(f"F1-Score:             {best_f1:.4f}")
print(f"Precision:            {best_precision:.4f}")
print(f"Recall:               {best_recall:.4f}")
print(f"{'='*80}")

## 6. 분석 및 결론

### 예상 결과:

1. **n = 1 (매우 약한 제거)**
   - 화자 정보가 많이 남아 있어 overfitting 가능성
   - Train accuracy는 높지만 Test accuracy는 낮을 수 있음

2. **n = 5~10 (적절한 제거)**
   - 화자 정보는 충분히 제거하면서 Fake 탐지 정보는 보존
   - 가장 균형 잡힌 성능 기대

3. **n = 16 (강한 제거)**
   - 화자 정보는 거의 제거되지만
   - Fake 탐지에 필요한 정보까지 손실될 가능성

### 관찰 포인트:
- Train/Test accuracy gap이 작을수록 일반화 성능이 좋음
- Precision과 Recall의 균형 (F1-Score로 확인)
- Confusion Matrix에서 False Positive/Negative 비율

## 7. 결과 저장 (선택 사항)

In [None]:
# Save the summary table as CSV (without the index column)
results_df.to_csv('detector_evaluation_results.csv', index=False)
print("Results saved to 'detector_evaluation_results.csv'")

## 10. CLI 사용법

저장된 모델은 커맨드라인에서도 사용할 수 있습니다:

```bash
# 단일 파일 예측
python inference.py --audio_path /path/to/audio.wav --model_path ./models/detector_n10.pkl

# 출력 예시:
# Prediction: FAKE
# Confidence: 87.34%
# Detailed Probabilities:
#   Real: 0.1266
#   Fake: 0.8734
```

### Python 코드에서 사용:

```python
from inference import DeepfakeDetector

# 모델 로드
detector = DeepfakeDetector(model_path="./models/detector_n10.pkl")

# 단일 파일 예측
result = detector.predict("new_audio.wav")
print(f"Is Synthetic: {result['is_fake']}")
print(f"Confidence: {result['confidence']:.2%}")

# 배치 예측
audio_files = ["audio1.wav", "audio2.wav", "audio3.wav"]
results = detector.predict_batch(audio_files)
for audio_file, result in zip(audio_files, results):
    print(f"{audio_file}: {'FAKE' if result['is_fake'] else 'REAL'} ({result['confidence']:.2%})")
```

In [None]:
# Run inference on a few test-set samples and compare with the ground truth.
# NOTE: `inference_detector` is created by the model-loading cell that appears
# further down in this file; that cell has to be executed before this one.
test_samples = X_test_paths[:5]  # first five samples

print("\n" + "="*80)
print("Inference Examples")
print("="*80)

# test_samples is a prefix of X_test_paths, so pairing it positionally with
# y_test gives each sample its label without searching the list.
for i, (audio_path, actual_label) in enumerate(zip(test_samples, y_test), 1):
    result = inference_detector.predict(audio_path)

    if actual_label == 0:
        actual_str = "REAL"
    else:
        actual_str = "FAKE"

    print(f"\n[Sample {i}]")
    print(f"File: {Path(audio_path).name}")
    print(f"Actual:     {actual_str}")
    print(f"Predicted:  {'FAKE' if result['is_fake'] else 'REAL'}")
    print(f"Confidence: {result['confidence']:.2%}")
    print(f"Probabilities: Real={result['probabilities']['real']:.4f}, Fake={result['probabilities']['fake']:.4f}")

    # Correct when a "fake" verdict meets label 1, or a "real" verdict meets label 0
    is_correct = (actual_label == 1) if result['is_fake'] else (actual_label == 0)
    print(f"Result: {'✅ CORRECT' if is_correct else '❌ WRONG'}")

print("\n" + "="*80)

In [None]:
# Import the inference.py module
from inference import DeepfakeDetector

# Load the saved model
# NOTE(review): `model_save_path` is assigned in the retrain-and-save cell that
# appears later in this file, so that cell presumably has to run first; confirm
# the intended execution order.
print("Loading saved model for inference...")
inference_detector = DeepfakeDetector(model_path=model_save_path, device=device)

## 9. Inference 예제

저장된 모델을 불러와서 새로운 오디오 파일에 대해 예측하는 방법을 보여줍니다.

In [None]:
# Retrain the best-performing configuration and save it
# NOTE(review): `save_trained_model` is defined in a cell that appears later in
# this file, so that cell presumably has to run first; confirm the intended
# execution order.
print(f"\nRetraining best model (n={best_n}) for saving...")
print("="*80)

# A fresh detector is constructed and fitted on the training split with best_n
best_detector = SpeakerInvariantDetector(device=device)
best_detector.fit(X_train_paths, y_train, spk_train, n_speaker_components=best_n)

# Save the model; the file name encodes the chosen component count
model_save_path = f"./models/detector_n{best_n}.pkl"
save_trained_model(best_detector, model_save_path, best_n)

print("\n✅ Best model has been saved!")
print(f"   Model path: {model_save_path}")
print(f"   n_speaker_components: {best_n}")
print(f"   Test Accuracy: {best_acc:.4f}")

In [None]:
import pickle

def save_trained_model(detector, save_path, n_speaker_components, model_name="microsoft/wavlm-large"):
    """
    Pickle the fitted parts of a SpeakerInvariantDetector to disk.

    Only the scaler, projection matrix, classifier and PCA are written,
    together with the component count and model name passed in.

    Args:
        detector: Fitted SpeakerInvariantDetector instance
        save_path: Destination file path (.pkl); parent directories are created
        n_speaker_components: Number of speaker components that was used
        model_name: Name of the WavLM model that was used

    Raises:
        ValueError: If the detector has not been fitted.
    """
    if not detector.is_fitted:
        raise ValueError("Detector must be fitted before saving!")

    # Gather the payload first, in the key order the stored dict is expected to have
    payload = {
        attr: getattr(detector, attr)
        for attr in ('scaler', 'projection_matrix', 'classifier')
    }
    payload['n_speaker_components'] = n_speaker_components
    payload['model_name'] = model_name
    payload['pca'] = detector.pca  # extra information (for analysis)

    # Create the target directory if it does not exist yet
    destination = Path(save_path)
    destination.parent.mkdir(parents=True, exist_ok=True)

    with destination.open('wb') as handle:
        pickle.dump(payload, handle)

    print(f"Model saved to {save_path}")
    print(f"  n_speaker_components: {n_speaker_components}")
    print(f"  Model name: {model_name}")

## 8. 최고 성능 모델 저장

최고 성능을 보인 모델을 저장하여 나중에 inference에 사용할 수 있습니다.