In [1]:
!mkdir -p /content/datasets/asvspoof
!mkdir -p /content/datasets/wavefake

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
!unzip -q "/content/drive/MyDrive/AI활용 소프트웨어 개발/프로젝트/data/ASVspoof 2019.zip" -d "/content/datasets/asvspoof"

In [6]:
!unzip -q "/content/drive/MyDrive/AI활용 소프트웨어 개발/프로젝트/data/WaveFake_subset.zip" -d "/content/datasets/wavefake"

In [7]:
# 1) 필수 패키지 설치 (transformers, datasets, torchaudio 등)
!pip install -q transformers==4.40.0 datasets soundfile torchaudio==2.2.1 librosa sklearn tqdm

# ffmpeg(오디오 변환에 사용)
!apt-get update -qq && apt-get install -y -qq ffmpeg

[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/137.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m137.6/137.6 kB[0m [31m10.4 MB/s[0m eta [36m0:00:00[0m
[?25h  [1;31merror[0m: [1msubprocess-exited-with-error[0m
  
  [31m×[0m [32mpython setup.py egg_info[0m did not run successfully.
  [31m│[0m exit code: [1;36m1[0m
  [31m╰─>[0m See above for output.
  
  [1;35mnote[0m: This error originates from a subprocess, and is likely not a problem with pip.
  Preparing metadata (setup.py) ... [?25l[?25herror
[1;31merror[0m: [1mmetadata-generation-failed[0m

[31m×[0m Encountered error while generating package metadata.
[31m╰─>[0m See above for output.

[1;35mnote[0m: This is an issue with the package mentioned above, not pip.
[1;36mhint[0m: See above for details.
W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRele

In [8]:
# 실행환경 확인
import torch, os
print("Device:", "cuda" if torch.cuda.is_available() else "cpu")
!nvidia-smi -L || true

Device: cuda
GPU 0: NVIDIA A100-SXM4-80GB (UUID: GPU-ca5ec486-039f-c0ff-927b-48d7781d4e55)


# **매니페스트 생성 (파일 경로, 라벨, attack_type, speaker_id, duration)**

In [9]:
# Colab 셀: manifest 생성 (한 번만 실행)
import os, csv, soundfile as sf
from pathlib import Path

In [10]:
ROOT = Path("/content/datasets")
OUT = ROOT / "manifests"
OUT.mkdir(parents=True, exist_ok=True)

manifest_path = OUT / "all_manifest.csv"
fields = ["path","label","attack_type","speaker_id","duration"]

rows = []

In [11]:
# 1) ASVspoof LA & PA (bona_fide=0, spoof=1)
for part in ["LA_subset","PA_subset"]:
    base = ROOT / "asvspoof" / part
    for cls in ["bona_fide","spoof"]:
        label = 0 if cls == "bona_fide" else 1
        folder = base / cls
        if not folder.exists():
            continue
        for fp in folder.glob("*"):
            if fp.suffix.lower() not in [".flac",".wav",".ogg",".mp3"]:
                continue
            # speaker id (폴더 구조에 따라 다르지만 ASVspoof 파일명에서 speaker 토큰 추출)
            # 예: file 'LA_T_1138215.flac' -> speaker token은 상위 폴더 또는 protocol에서 따로 추출 가능.
            # 여기선 파일명 prefix(예: LA_T_1138215 -> LA_T) 사용(간단한 heuristic)
            stem = fp.stem
            # speaker token guess: prefix before last '_' (ex: LA_T_1138215 -> LA_T)
            spk = "_".join(stem.split("_")[:2]) if len(stem.split("_"))>=2 else stem
            try:
                # duration 얻기 (soundfile)
                info = sf.info(str(fp))
                duration = float(info.duration)
            except Exception:
                duration = 0.0
            rows.append([str(fp), label, f"ASVspoof_{part}", spk, duration])

In [12]:
# 2) WaveFake (attack folders)
wf_root = ROOT / "wavefake"
if wf_root.exists():
    for attack_folder in sorted(wf_root.iterdir()):
        if not attack_folder.is_dir(): continue
        attack_type = attack_folder.name
        for fp in attack_folder.glob("*"):
            if fp.suffix.lower() not in [".wav",".flac",".mp3",".ogg"]: continue
            stem = fp.stem
            spk = stem.split("_")[0] if "_" in stem else stem
            try:
                info = sf.info(str(fp))
                duration = float(info.duration)
            except Exception:
                duration = 0.0
            rows.append([str(fp), 1, f"WaveFake_{attack_type}", spk, duration])

In [13]:
# write CSV
with open(manifest_path, "w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f)
    writer.writerow(fields)
    writer.writerows(rows)

In [14]:
print("Manifest saved to:", manifest_path)
print("Total samples:", len(rows))

Manifest saved to: /content/datasets/manifests/all_manifest.csv
Total samples: 30000


# **스피커-디스조인트로 split 만들기 (train/val/test)**

In [15]:
# Colab 셀: speaker-disjoint split (train/val/test) 생성
import pandas as pd, random
manifest_path = "/content/datasets/manifests/all_manifest.csv"
df = pd.read_csv(manifest_path)

In [16]:
# speakers list
speakers = df['speaker_id'].unique().tolist()
random.seed(42)
random.shuffle(speakers)

In [17]:
n = len(speakers)
n_train = int(0.8 * n)
n_val = int(0.1 * n)
train_spks = set(speakers[:n_train])
val_spks = set(speakers[n_train:n_train+n_val])
test_spks = set(speakers[n_train+n_val:])

In [18]:
def assign_split(spk):
    if spk in train_spks: return "train"
    if spk in val_spks: return "val"
    return "test"

In [19]:
df['split'] = df['speaker_id'].apply(assign_split)
# 만약 WaveFake의 attack-specific speakers가 소량이면 다른 split 정책을 택하세요(예: attack-level holdout)
df.to_csv(manifest_path.replace(".csv","_split.csv"), index=False)
print("Split manifest saved")
print("counts:", df['split'].value_counts())

Split manifest saved
counts: split
train    14706
test     13018
val       2276
Name: count, dtype: int64


# **전처리: 16kHz mono 변환 & optional augment 스크립트**

In [None]:
# Colab 셀: ffmpeg로 16k mono 변환 (병렬성을 위해 간단히 for loop; 대용량이면 multiprocessing 권장)
import subprocess, os
from pathlib import Path
import pandas as pd

In [None]:
manifest_split = "/content/datasets/manifests/all_manifest_split.csv"
df = pd.read_csv(manifest_split)
OUT_ROOT = Path("/content/datasets/preprocessed")
OUT_ROOT.mkdir(parents=True, exist_ok=True)

In [None]:
# 예: train만 변환하거나 전체 변환 가능(디스크 확인)
subset = df  # or df[df['split']=='train']
for i,row in subset.iterrows():
    src = Path(row['path'])
    rel = src.name  # 단순히 basename 유지, 필요시 폴더별 보존
    dst = OUT_ROOT / rel
    if dst.exists():
        continue
    # ffmpeg 변환 명령 (16kHz, mono, s16)
    cmd = [
        "ffmpeg", "-y", "-loglevel", "error",
        "-i", str(src),
        "-ar", "16000", "-ac", "1",
        "-sample_fmt", "s16",
        str(dst)
    ]
    try:
        subprocess.run(cmd, check=True)
    except Exception as e:
        print("convert error:", src, e)

In [None]:
#변환된 파일 저장용
import shutil

# 압축할 폴더 경로
folder_path = "/content/datasets/preprocessed"

# 압축 결과 파일명 (zip 확장자는 자동으로 붙음)
zip_path = "/content/datasets/preprocessed"

# ZIP 압축
shutil.make_archive(zip_path, 'zip', folder_path)

print(f"ZIP 파일 생성 완료: {zip_path}.zip")

ZIP 파일 생성 완료: /content/datasets/preprocessed.zip


# **PyTorch Dataset / DataLoader (WhisperFeatureExtractor 사용)**

In [20]:
!unzip -q "/content/drive/MyDrive/AI활용 소프트웨어 개발/프로젝트/data/preprocessed.zip" -d "/content/datasets/preprocessed"

In [22]:
# Colab 셀: Dataset & DataLoader
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import WhisperFeatureExtractor, WhisperModel

In [23]:
# -----------------------------
# 1) Whisper Feature Extractor 준비
# -----------------------------
FE_MODEL = "openai/whisper-base"   # 메모리/속도 상황에 따라 openai/whisper-tiny, small 등으로 교체 가능
# from_pretrained: 허깅페이스 허브에서 해당 모델의 전처리기(샘플링레이트, 멜필터뱅크 등 파이프라인 설정)를 가져옴
feature_extractor = WhisperFeatureExtractor.from_pretrained(FE_MODEL)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Fetching 1 files:   0%|          | 0/1 [00:00<?, ?it/s]

preprocessor_config.json: 0.00B [00:00, ?B/s]

In [24]:
# Dataset: preprocessed 폴더 사용
import pandas as pd
from pathlib import Path
# -----------------------------
# 2) 매니페스트(csv) 읽고, 전처리된 경로(pp_path) 컬럼 생성
# -----------------------------
# manifest_split: 각 오디오 파일 경로(path), 레이블(label), 데이터 분할(split=train/val 등)을 담은 CSV
manifest_split = "/content/datasets/manifests/all_manifest_split.csv"
df = pd.read_csv(manifest_split)
# Map paths to preprocessed locations (same basename)
# 전처리 폴더(preprocessed)에 동일한 파일명이 존재한다고 가정하고, 그 절대경로를 매핑
# 예: 원본 path가 /a/b/c/xyz.wav 라면, pp_path는 /content/drive/.../preprocessed/xyz.wav
df['pp_path'] = df['path'].apply(lambda p: str(Path("/content/datasets/preprocessed") / Path(p).name))

In [25]:
# -----------------------------
# 3) 커스텀 Dataset
# -----------------------------
class SpoofDataset(Dataset):
    def __init__(self, df):
        # df: 특정 split(train/val 등)만 필터링된 DataFrame
        self.df = df.reset_index(drop=True) # 인덱스 정리(셔플 후 안정적 접근을 위해)
    def __len__(self): return len(self.df) # 전체 샘플 개수
    def __getitem__(self, idx):
        # 인덱스 idx에 해당하는 한 샘플을 로드해서 dict로 반환
        row = self.df.iloc[idx]

        # load numpy float waveform
        # 오디오 파일 로드
        # - sf.read는 파형(waveform)을 numpy(float32)로 반환하고, sr(샘플링레이트)도 함께 반환
        # - dtype='float32'로 고정해서 이후 torch.tensor로 옮길 때 타입 일관성 유지
        wav, sr = sf.read(row['pp_path'], dtype='float32')

        # 반환 dict:
        #   audio: numpy 1D(or 2D) 파형. stereo인 경우 (T, 2)일 수 있으니 사전에 mono로 전처리된 파일 권장
        #   label: 정답(스푸핑=1 / 정상=0 등). 아래 collate_fn에서 float32 텐서로 묶음
        #   meta : 디버깅/추적용 메타데이터(해당 row 전체를 dict로)
        return {"audio": wav, "label": int(row['label']), "meta": row.to_dict()}

In [26]:
# -----------------------------
# 4) collate_fn: 배치 단위로 묶는 함수
# -----------------------------
# DataLoader는 batch_size개 샘플을 모아 이 함수로 전달
# 여기서 WhisperFeatureExtractor를 이용해 "파형 리스트 -> 모델 입력 텐서"로 변환
def collate_fn(batch):
    # batch: [{"audio": np.ndarray, "label": int, "meta": dict}, ...] 길이 B 리스트

    # (1) 파형과 라벨만 먼저 분리
    audios = [b['audio'] for b in batch]  # 길이 B의 numpy 배열 리스트 (각기 길이 다를 수 있음)
    labels = torch.tensor([b['label'] for b in batch], dtype=torch.float32) # (B,) float 텐서

    # (2) Whisper 전처리기 호출
    # - sampling_rate=16000: Whisper는 16kHz 입력을 기대. 전처리 단계에서 16kHz로 리샘플된 파일을 쓰는 게 안전
    # - padding=True: 배치 내에서 가장 긴 길이에 맞춰 패딩. attention_mask도 함께 생성됨
    # - return_tensors="pt": PyTorch 텐서로 반환
    inputs = feature_extractor(
        audios,
        sampling_rate=16000,
        return_tensors="pt",
        padding=True
    )
    # 일반적으로 WhisperFeatureExtractor는 아래 키를 반환:
    #   inputs["input_features"]: (B, 80, T) 형태의 멜 스펙트럼 텐서 (80은 멜필터 수)
    #   inputs["attention_mask"]: (B, T) 형태로 패딩 마스크(있을 때)
    # (버전에 따라 텐서 차원 순서가 다르게 보이는 경우가 있어, 모델에 맞춰 확인 필요)


    # (3) 메타데이터는 리스트로 그대로 전달(로깅/디버깅용)
    metas = [b['meta'] for b in batch]

    # -----------------------
    # (중요) Whisper 고정 입력 길이 맞추기
    target_length = 3000
    input_features = inputs["input_features"]
    B, n_mels, T = input_features.shape
    if T < target_length:
        pad_amount = target_length - T
        input_features = torch.nn.functional.pad(input_features, (0, pad_amount))
    elif T > target_length:
        input_features = input_features[:, :, :target_length]

    inputs["input_features"] = input_features
    # attention_mask도 필요하면 같은 방식으로 패딩/자르기 가능
    if "attention_mask" in inputs:
        attn_mask = inputs["attention_mask"]
        if T < target_length:
            pad_amount = target_length - T
            attn_mask = torch.nn.functional.pad(attn_mask, (0, pad_amount), value=0)
        elif T > target_length:
            attn_mask = attn_mask[:, :target_length]
        inputs["attention_mask"] = attn_mask


    # DataLoader가 한 스텝에 넘겨줄 값
    # - inputs: 모델 입력 텐서를 담은 dict-like 객체(BatchFeature)
    # - labels: (B,) 라벨 텐서
    # - metas : (길이 B) 샘플별 메타데이터 리스트
    return inputs, labels, metas

In [27]:
# -----------------------------
# 5) train/val 분할 및 DataLoader 준비
# -----------------------------
# CSV의 'split' 컬럼을 기준으로 데이터프레임 분리
train_df = df[df['split']=='train']
val_df = df[df['split']=='val']

# DataLoader:
# - batch_size=8: GPU 메모리/속도 상황에 맞게 조절
# - shuffle=True : 학습셋은 매 epoch마다 섞어서 일반화 성능 향상
# - collate_fn   : 위에서 정의한 Whisper 전처리 + 패딩 로직 사용
train_loader = DataLoader(
    SpoofDataset(train_df),
    batch_size=8,
    shuffle=True,
    collate_fn=collate_fn
)

val_loader = DataLoader(
    SpoofDataset(val_df),
    batch_size=8,
    shuffle=False,      # 검증셋은 순서 고정(재현성/디버깅)
    collate_fn=collate_fn
)

# **모델 정의 (Whisper encoder + classifier head)**

In [28]:
# =========[ 모델 정의 (Whisper encoder 기반 스푸핑 분류기) ]=========
import torch.nn as nn

# HuggingFace의 WhisperModel을 불러와 encoder만 활용
from transformers import WhisperModel

In [29]:
# FE_MODEL은 이전 셀에서 정의된 Whisper feature-extractor 모델명과 동일해야 합니다.
# encoder_model에는 전체 WhisperModel 객체가 들어옵니다.
encoder_model = WhisperModel.from_pretrained(FE_MODEL)  # 모델(인코더+디코더, 기타 구성 포함) 로드

config.json: 0.00B [00:00, ?B/s]

model.safetensors:   0%|          | 0.00/290M [00:00<?, ?B/s]

In [30]:
# --- 모델 클래스 ---
class WhisperSpoofClassifier(nn.Module):
    def __init__(self, whisper_model, hidden_dim=256, dropout=0.2):
        # whisper_model: HuggingFace에서 불러온 WhisperModel 객체 (전체 모델)
        # hidden_dim    : 분류기 내부 히든 유닛 수
        # dropout       : 분류기 드롭아웃

        super().__init__()

        # 1) encoder 모듈만 가져오기 (디코더는 사용하지 않음)
        #    일부 버전/구현에서는 whisper_model.encoder 대신 whisper_model 를 그대로 사용하여
        #    forward(input_features=...) 하는 편이 더 호환성이 좋습니다. (아래에 대안 표기)
        self.encoder = whisper_model.encoder

        # 2) 모델의 은닉 차원(d_model)을 저장
        #    Whisper config의 d_model이 encoder의 출력 히든 크기입니다.
        self.dim = whisper_model.config.d_model

        # 3) (선택적) 1D adaptive pooling을 선언했지만 아래 기본 forward에서는 사용하지 않음.
        #    AdaptiveAvgPool1d은 (N, C, L) 형태 입력을 기대하므로 사용하려면 permute 필요.
        self.pool = nn.AdaptiveAvgPool1d(1)

        # 4) 간단한 분류기 헤드: d_model -> hidden_dim -> 1 (binary logits)
        #    출력이 1개이므로 BCEWithLogitsLoss를 쓰기에 적합 (로짓을 바로 넘김)
        self.classifier = nn.Sequential(
            nn.Linear(self.dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, 1)   # 최종 로짓 (B, 1)
        )

    def forward(self, input_features):
        # input_features: collate_fn에서 생성된 feature_extractor의 반환 dict
        #                보통 input_features["input_features"] 키에 텐서가 들어있음.
        #                (버전에 따라 shape/키 이름이 조금 다를 수 있음.)
        # 반환: (B,) 또는 (B,) 크기의 로짓 텐서 (squeezed)


        # 0) input을 device로 옮기기
        #    (여기서 device를 직접 참조하므로 클래스 밖에서 device 변수를 정의해두어야 함)
        x = input_features["input_features"].to(device)  # e.g. shape (B, T, feat_dim) 또는 (B, feat_dim, T)

        # --- 주의: HuggingFace 버전에 따라 encoder를 직접 호출하는 방식이 다를 수 있습니다. ---
        # 방법 A (버전에 따라 동작): encoder 모듈에 바로 입력 -> encoder(...) 사용
        # enc = self.encoder(x).last_hidden_state  # 기대 shape: (B, T, H)
        #
        # 방법 B (권장, 호환성): whisper_model 전체에 input_features를 넣고 반환에서 last_hidden_state 사용
        # (만약 self.encoder(...) 호출이 안되면 아래처럼 whisper_model(input_features=...) 호출)
        # enc = whisper_model(input_features=x).last_hidden_state
        #
        # 현재 코드는 방법 A 형태로 작성되어 있으므로, 만약 실행 중 오류가 나면 방법 B로 바꿔보세요.

        # (원 코드) encoder forward 호출 — 일부 HF 버전에서는 이 호출이 바로 동작합니다.
        enc = self.encoder(x).last_hidden_state  # 결과 예: (B, T, H)

        # 시간축(T)에 대해 평균 풀링을 수행 (간단하고 자주 쓰이는 방법)
        # pooled shape: (B, H)
        pooled = enc.mean(dim=1)

        # 분류기 헤드에 통과시키고 마지막 차원을 제거
        logits = self.classifier(pooled).squeeze(-1)  # (B, 1) -> (B,)

        return logits

# --- 장치 설정 및 모델 인스턴스화 ---
device = "cuda" if torch.cuda.is_available() else "cpu"
model = WhisperSpoofClassifier(encoder_model).to(device)

# **학습 루틴 (Stage A: head-only, Stage B: partial fine-tune)**

In [31]:
# Colab 셀: Training loop (간단화 버전, 주석 풍부)
import torch.optim as optim
from tqdm.auto import tqdm
from sklearn.metrics import roc_curve
import numpy as np
from torch.cuda.amp import autocast, GradScaler
import torch.nn.utils as nn_utils

In [32]:
# ====== 손실함수 설정 ======
# 이진 분류(로짓 출력)를 위해 BCEWithLogitsLoss 사용 (sigmoid + BCE를 안정적으로 결합)
criterion = torch.nn.BCEWithLogitsLoss()

In [33]:
# ====== Stage A: encoder 파라미터 freeze (feature-extractor 고정) ======
# - 먼저 encoder의 모든 파라미터를 학습 불가로 설정(grad 계산 안 함)
# - 이렇게 하여 분류기 헤드만 빠르게 학습시켜 fine-tuning 안정화
for p in model.encoder.parameters():
    p.requires_grad = False

In [34]:
# 옵티마이저: requires_grad True인 파라미터들만 (여기서는 classifier 등)
optimizer = optim.AdamW([p for p in model.parameters() if p.requires_grad],
                        lr=1e-3, weight_decay=1e-6)

# mixed precision scaler
scaler = GradScaler()

# Stage A epoch 수
EPOCHS_A = 3

  scaler = GradScaler()


In [35]:
# ====== EER 계산 함수 (검증 지표) ======
def compute_eer(y_true, y_score):
    # y_true: 1D numpy array of 0/1 labels
    # y_score: 1D numpy array of predicted scores (확률 또는 로짓 기반 점수)
    # 반환: (eer, threshold) - EER 값과 그때의 threshold
    # 설명:
    #   - ROC 곡선을 통해 FPR, TPR 계산
    #   - FNR = 1 - TPR
    #   - FPR과 FNR이 같아지는 지점(또는 최소 차이 지점)을 EER로 삼음
    #   - sklearn.metrics.roc_curve는 양성/음성 클래스가 모두 있어야 동작함.

    # 안전성 체크: 검증 레이블에 단일 클래스만 있으면 roc_curve가 실패
    if len(np.unique(y_true)) < 2:
        # 한 클래스밖에 없을 때는 EER를 계산할 수 없음. NaN 반환 또는 0.5 등 임의값 선택
        return float("nan"), float("nan")

    fpr, tpr, thr = roc_curve(y_true, y_score)
    fnr = 1 - tpr

    # FPR과 FNR 차이가 최소가 되는 인덱스
    idx = np.nanargmin(np.abs(fnr - fpr))
    eer = fpr[idx]
    return eer, thr[idx]

In [36]:
# ====== Stage A: 학습 루프(분류기 헤드만 학습) ======
import soundfile as sf
best_eer = 1.0  # 모델 저장을 위한 기준값(낮을수록 좋음)
for epoch in range(EPOCHS_A):
    model.train()
    pbar = tqdm(train_loader, desc=f"StageA Epoch {epoch}")
    for inputs, labels, metas in pbar:
        # 1) 옵티마이저 초기화
        optimizer.zero_grad()

        # 2) mixed precision로 forward/backward (속도와 메모리 이득)
        with autocast():
            # model.forward 내부에서 input_features["input_features"]를 device로 옮기고 있음
            logits = model(inputs)               # (B,) 로짓
            # labels는 float(0.0/1.0)로 가정. device로 옮겨 손실 계산
            loss = criterion(logits, labels.to(device))

        # 3) 스케일된 역전파
        scaler.scale(loss).backward()

        # (권장) gradient clipping — 큰 그래디언트로 인한 발산 방지
        # 반드시 scaler.unscale_(optimizer) 후에 수행해야 함.
        scaler.unscale_(optimizer)
        nn_utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

        # 4) 옵티마이저 스텝 및 scaler 업데이트
        scaler.step(optimizer)
        scaler.update()

        # 진행바에 손실 표시 (항상 .item()로 가져오기)
        pbar.set_postfix(loss=loss.item())

    # ====== validation: EER 계산 ======
    model.eval()
    all_scores, all_labels = [], []
    with torch.no_grad():
        for inputs, labels, metas in val_loader:
            # 검증에서도 autocast를 활용하면 속도/메모리 이득
            with autocast():
                logits = model(inputs)                   # (B,)
                probs = torch.sigmoid(logits).cpu().numpy()  # (B,) 확률로 변환
            all_scores.extend(probs.tolist())
            all_labels.extend(labels.numpy().tolist())

    eer, thr = compute_eer(np.array(all_labels), np.array(all_scores))
    print(f"[StageA] Epoch {epoch} Val EER: {eer:.4f} thr={thr:.4f}")

    # 모델 저장 예시: EER가 개선되면 best로 체크하여 저장
    # EER이 NaN이 아닐 때만 저장 로직 실행
    if not np.isnan(eer) and eer < best_eer:
        best_eer = eer
        torch.save(model.state_dict(), f"best_stageA_eer_{eer:.4f}.pt")
        print(f"  -> saved best StageA model (EER {eer:.4f})")

StageA Epoch 0:   0%|          | 0/1839 [00:00<?, ?it/s]

  with autocast():
  with autocast():


[StageA] Epoch 0 Val EER: 0.3278 thr=0.6982
  -> saved best StageA model (EER 0.3278)


StageA Epoch 1:   0%|          | 0/1839 [00:00<?, ?it/s]

  with autocast():
  with autocast():


[StageA] Epoch 1 Val EER: 0.3061 thr=0.0966
  -> saved best StageA model (EER 0.3061)


StageA Epoch 2:   0%|          | 0/1839 [00:00<?, ?it/s]

  with autocast():
  with autocast():


[StageA] Epoch 2 Val EER: 0.2958 thr=0.5034
  -> saved best StageA model (EER 0.2958)


In [37]:
# ====== Stage B: encoder 일부/전체 unfreeze 및 더 작은 LR로 fine-tuning ======
# 여기서는 간단히 전체 encoder를 unfreeze (주의: 메모리/오버핏 주의)
for p in model.encoder.parameters():
    p.requires_grad = True

In [38]:
# 파라미터 그룹: encoder는 작은 LR, classifier는 큰 LR
# named_parameters()의 name에 'encoder'가 포함되는 경우를 가정
params = [
    {"params": [p for n, p in model.named_parameters() if "encoder" in n], "lr": 2e-5},
    {"params": [p for n, p in model.named_parameters() if "encoder" not in n], "lr": 1e-4}
]

In [39]:
# 새로운 옵티마이저 (weight_decay는 동일)
optimizer = optim.AdamW(params, weight_decay=1e-6)

# (선택) LR 스케줄러를 추가하면 안정적. 예: cosine, OneCycle 등
# from torch.optim.lr_scheduler import CosineAnnealingLR
# scheduler = CosineAnnealingLR(optimizer, T_max=EPOCHS_B)

EPOCHS_B = 6
best_eer_stageB = best_eer  # Stage A에서 저장한 best 값 고려

In [40]:
for epoch in range(EPOCHS_B):
    model.train()
    pbar = tqdm(train_loader, desc=f"StageB Epoch {epoch}")
    for inputs, labels, metas in pbar:
        optimizer.zero_grad()
        with autocast():
            logits = model(inputs)
            loss = criterion(logits, labels.to(device))

        scaler.scale(loss).backward()

        # gradient clipping (unscale 후)
        scaler.unscale_(optimizer)
        nn_utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

        scaler.step(optimizer)
        scaler.update()

        pbar.set_postfix(loss=loss.item())

    # (선택) scheduler.step() -- 스케줄러를 설정한 경우
    # scheduler.step()

    # ====== validation: EER 측정 (Stage A와 동일) ======
    model.eval()
    all_scores, all_labels = [], []
    with torch.no_grad():
        for inputs, labels, metas in val_loader:
            with autocast():
                logits = model(inputs)
                probs = torch.sigmoid(logits).cpu().numpy()
            all_scores.extend(probs.tolist()); all_labels.extend(labels.numpy().tolist())

    eer, thr = compute_eer(np.array(all_labels), np.array(all_scores))
    print(f"[StageB] Epoch {epoch} Val EER: {eer:.4f} thr={thr:.4f}")

    # 모델 저장 (Stage B 베스트 기준)
    if not np.isnan(eer) and eer < best_eer_stageB:
        best_eer_stageB = eer
        torch.save(model.state_dict(), f"best_stageB_eer_{eer:.4f}.pt")
        print(f"  -> saved best StageB model (EER {eer:.4f})")

StageB Epoch 0:   0%|          | 0/1839 [00:00<?, ?it/s]

  with autocast():
  with autocast():


[StageB] Epoch 0 Val EER: 0.2048 thr=0.0120
  -> saved best StageB model (EER 0.2048)


StageB Epoch 1:   0%|          | 0/1839 [00:00<?, ?it/s]

  with autocast():
  with autocast():


[StageB] Epoch 1 Val EER: 0.2099 thr=0.0021


StageB Epoch 2:   0%|          | 0/1839 [00:00<?, ?it/s]

  with autocast():
  with autocast():


[StageB] Epoch 2 Val EER: 0.1923 thr=0.0026
  -> saved best StageB model (EER 0.1923)


StageB Epoch 3:   0%|          | 0/1839 [00:00<?, ?it/s]

  with autocast():
  with autocast():


[StageB] Epoch 3 Val EER: 0.1634 thr=0.0014
  -> saved best StageB model (EER 0.1634)


StageB Epoch 4:   0%|          | 0/1839 [00:00<?, ?it/s]

  with autocast():
  with autocast():


[StageB] Epoch 4 Val EER: 0.1696 thr=0.0016


StageB Epoch 5:   0%|          | 0/1839 [00:00<?, ?it/s]

  with autocast():
  with autocast():


[StageB] Epoch 5 Val EER: 0.1551 thr=0.0085
  -> saved best StageB model (EER 0.1551)


# **평가 (test, per-attack breakdown, unseen-attack)**

In [41]:
# 테스트 전용 데이터프레임(매니페스트에서 split=='test'인 것만 필터)
test_df = df[df['split'] == 'test']

# DataLoader 생성
# - SpoofDataset(test_df): __getitem__이 preprocessed np.wave를 반환
# - collate_fn: feature_extractor로 멜/입력 텐서로 변환하고 패딩 처리
# - batch_size: GPU 메모리에 맞게 조절하세요
test_loader = DataLoader(SpoofDataset(test_df), batch_size=8, collate_fn=collate_fn)

# 모델을 평가 모드로 설정: dropout/batchnorm 등 동작을 평가용으로 고정
model.eval()

WhisperSpoofClassifier(
  (encoder): WhisperEncoder(
    (conv1): Conv1d(80, 512, kernel_size=(3,), stride=(1,), padding=(1,))
    (conv2): Conv1d(512, 512, kernel_size=(3,), stride=(2,), padding=(1,))
    (embed_positions): Embedding(1500, 512)
    (layers): ModuleList(
      (0-5): 6 x WhisperEncoderLayer(
        (self_attn): WhisperAttention(
          (k_proj): Linear(in_features=512, out_features=512, bias=False)
          (v_proj): Linear(in_features=512, out_features=512, bias=True)
          (q_proj): Linear(in_features=512, out_features=512, bias=True)
          (out_proj): Linear(in_features=512, out_features=512, bias=True)
        )
        (self_attn_layer_norm): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
        (activation_fn): GELUActivation()
        (fc1): Linear(in_features=512, out_features=2048, bias=True)
        (fc2): Linear(in_features=2048, out_features=512, bias=True)
        (final_layer_norm): LayerNorm((512,), eps=1e-05, elementwise_affine=True

In [42]:
# pandas는 이미 임포트되어 있을 가능성이 크지만, 명시적으로 적어도 안전합니다.
import pandas as pd

# 결과 저장용 리스트 초기화
scores, labs, attacks = [], [], []

In [43]:
# gradient 계산을 끄면 메모리 절약 (평가시 필수)
with torch.no_grad():
    # test_loader 순회: (inputs, labels, metas) 형식으로 collate_fn에서 반환됨
    for inputs, labels, metas in test_loader:
        # 1) 모델에 입력을 넣어 logits 얻기
        #    주의: model.forward 내부에서 input_features를 device로 옮기는 구현이면
        #           여기에서 추가로 옮길 필요 없음.
        #    만약 forward가 입력을 옮기지 않는다면:
        #      inputs = {k: v.to(device) for k,v in inputs.items() if torch.is_tensor(v)}
        logits = model(inputs)                        # logits shape: (B,)

        # 2) 로짓 -> probability (sigmoid)
        #    cpu()로 옮겨 numpy로 변환 (텐서가 GPU에 있을 경우 .cpu() 필수)
        probs = torch.sigmoid(logits).cpu().numpy()   # (B,)

        # 3) 결과를 파이썬 리스트로 이어붙임
        scores.extend(probs.tolist())

        # labels는 DataLoader에서 CPU torch.tensor로 받는 경우가 대부분이지만,
        # 안전하게 .cpu() 해주면 GPU/CPU 모두에서 동작
        labs.extend(labels.cpu().numpy().tolist())

        # metas는 row.to_dict() 형태의 dict 리스트. 여기서는 'attack_type' 키를 사용
        # 안전하게 .get(...)를 사용해 KeyError 방지
        attacks.extend([m.get('attack_type', 'unknown') for m in metas])

# pandas DataFrame으로 정리: 한 줄에 score, label, attack type 저장
res_df = pd.DataFrame({"score": scores, "label": labs, "attack": attacks})

In [44]:
# ===========================
# 1) 전체(Overall) EER 계산
# ===========================
# compute_eer 함수(이전 셀에서 정의)를 그대로 사용.
# 함수 내부에서 레이블 모노톤(한 클래스만 존재) 체크가 있다면 NaN 반환될 수 있음.
overall_eer, _ = compute_eer(res_df['label'].values, res_df['score'].values)
print("Overall EER:", overall_eer)

Overall EER: 0.10710760118460019


In [45]:
# ===========================
# 2) 공격유형(attack)별 EER 계산
# ===========================
for atk, g in res_df.groupby("attack"):
    # (안전장치) 그룹에 샘플이 너무 적으면 EER 신뢰도 낮음 -> 건너뜀
    # 임계값(여기선 20)은 자유롭게 변경 가능
    if len(g) < 20:
        continue

    # compute_eer는 (labels, scores) numpy array를 받음
    # 그룹 내부 레이블이 하나의 클래스만 있으면 EER 계산 불가 -> NaN 반환 가능
    e, _ = compute_eer(g['label'].values, g['score'].values)

    # 출력: 공격명, 샘플 수, EER(소수 4자리 반올림)
    print(atk, "count", len(g), "EER", round(e, 4))

ASVspoof_LA_subset count 5896 EER 0.0148
ASVspoof_PA_subset count 6122 EER 0.2222
WaveFake_ljspeech_melgan count 1000 EER nan


# **지식증류 (Teacher → Student (LCNN)) 개요 & 코드 스캐폴드**
- 학습된 Teacher(Whisper)로 train set에 대해 logits(soft targets) 생성하고 저장.

- Student(경량 LCNN) 모델을 정의.

- Student 학습 시 loss = α * BCE(student_logits, hard_labels) + β * KLDiv(soft_logits/T , student_logits/T) 사용.

In [46]:
# ===========================
# 1) Teacher logits 생성 및 저장
# ===========================

import numpy as np

# teacher 모델의 출력(logits), ground-truth label, 경로를 저장할 리스트 준비
teacher_logits = []   # 모델이 예측한 로짓(softmax/sigmoid 하기 전 값)
teacher_labels = []   # 원래 정답 라벨 (0/1)
paths = []            # 오디오 파일 경로(나중에 매칭하기 위해 저장)

# 평가 모드로 전환: dropout, batchnorm 고정
model.eval()

# autograd 끔: 메모리 절약 + 속도 향상 (평가시 필수)
with torch.no_grad():
    # train_loader 순회
    # inputs: feature_extractor가 변환한 배치 입력
    # labels: 정답 라벨 (0=real, 1=fake 등)
    # metas: 원래 manifest의 row dict (여기서 파일 경로 등 메타 정보 추출 가능)
    for inputs, labels, metas in tqdm(train_loader):
        # 1) 모델 forward
        #    결과는 로짓(logits) (B,) shape
        #    여기서는 teacher 모델이기 때문에 gradient 불필요
        logits = model(inputs).cpu().numpy()

        # 2) Python list로 변환해서 누적 저장
        teacher_logits.extend(logits.tolist())             # 각 샘플별 로짓
        teacher_labels.extend(labels.numpy().tolist())     # 각 샘플별 라벨

        # 3) 메타정보에서 전처리된 파일 경로(pp_path)를 꺼내 저장
        #    나중에 student 학습 시 어떤 오디오가 어떤 로짓인지 매칭 가능
        paths.extend([m['pp_path'] for m in metas])

# ===========================
# 2) numpy npz 파일로 저장
# ===========================
# np.savez: 여러 개의 배열을 하나의 압축된 npz 파일로 저장 가능
# - paths: 파일 경로 문자열 리스트
# - logits: teacher 모델 출력 로짓 (float array)
# - labels: 정답 라벨 (int array)
np.savez(
    "/content/teacher_train_logits.npz",
    paths=np.array(paths),                   # 문자열 배열
    logits=np.array(teacher_logits),         # (N,) 또는 (N,1) float array
    labels=np.array(teacher_labels)          # (N,) int array
)

  0%|          | 0/1839 [00:00<?, ?it/s]

In [47]:
# ==============================
# 2) 간단한 LCNN Student (예시)
# ==============================
class SimpleLCNN(nn.Module):
    def __init__(self, in_channels=1, num_filters=64):
        #Args:
        #    in_channels: 입력 채널 수 (waveform은 mono라서 보통 1)
        #    num_filters: 첫 번째 Conv 레이어의 필터 개수 (기본 64)

        super().__init__()

        # CNN feature extractor 블록 정의
        # Conv1d → ReLU → MaxPool → Conv1d → ReLU → MaxPool → AdaptiveAvgPool
        self.net = nn.Sequential(
            # 1. Conv1D: waveform을 시간축에서 필터링
            nn.Conv1d(in_channels, num_filters, kernel_size=7, stride=1, padding=3),
            nn.ReLU(),              # 비선형 활성화
            nn.MaxPool1d(4),        # 다운샘플링 (길이 T → T/4)

            # 2. 두 번째 Conv 블록: 채널 수를 2배로 증가
            nn.Conv1d(num_filters, num_filters*2, kernel_size=5, padding=2),
            nn.ReLU(),
            nn.MaxPool1d(4),        # 다시 다운샘플링 (T/4 → T/16)

            # 3. Adaptive Average Pooling: 출력 길이를 항상 1로 맞춤
            #    즉, (B, C, T') → (B, C, 1)
            nn.AdaptiveAvgPool1d(1)
        )

        # Fully Connected layer (Linear)
        # - 입력 차원: 마지막 Conv 채널 수 = num_filters*2
        # - 출력 차원: 1 (이진 분류용 로짓)
        self.fc = nn.Linear(num_filters*2, 1)

    def forward(self, x):
        # Args:
        #     x: 입력 waveform tensor
        #        보통 shape은 (B, T) → (배치, 시퀀스 길이)
        # Returns:
        #     logits: (B,) shape 의 로짓 값 (sigmoid 전)

        # 입력 차원 보정: Conv1d는 (B, C, T) 형태를 기대
        # waveform은 (B, T)이므로 channel 차원(C=1)을 추가
        if x.dim() == 2:             # e.g. (B, T)
            x = x.unsqueeze(1)       # → (B, 1, T)

        # CNN feature extractor 통과
        out = self.net(x)            # (B, C_out, 1)

        # 시간축 길이가 1이라 squeeze로 차원 제거
        out = out.squeeze(-1)        # (B, C_out)

        # Linear layer 통과 후 로짓 반환
        return self.fc(out).squeeze(-1)  # (B,)

In [48]:
# ===========================
# Knowledge Distillation 학습을 위한 Student DataLoader 준비
# ===========================
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader

# 1) 저장된 Teacher logits 및 데이터 로드
teacher_data = np.load("/content/teacher_train_logits.npz")
student_train_paths = teacher_data["paths"]
teacher_train_logits = teacher_data["logits"]
student_train_labels = teacher_data["labels"]

# 2) Student 학습용 커스텀 Dataset 정의
#    이전 SpoofDataset과 유사하지만, teacher_logit을 함께 반환
class StudentSpoofDataset(Dataset):
    def __init__(self, paths, labels, teacher_logits):
        self.paths = paths
        self.labels = labels
        self.teacher_logits = teacher_logits # Teacher 로짓 추가

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, idx):
        # 오디오 파일 로드 (preprocessed 경로 사용)
        # Note: SimpleLCNN은 waveform 입력을 기대하므로 feature extraction 불필요
        wav, sr = sf.read(self.paths[idx], dtype='float32') # sf는 이미 임포트되어 있어야 함

        # Pytorch tensor로 변환
        wav_tensor = torch.tensor(wav, dtype=torch.float32)
        label_tensor = torch.tensor(self.labels[idx], dtype=torch.long) # 라벨은 Long 또는 float

        # Teacher 로짓은 float32 텐서로 반환
        teacher_logit_tensor = torch.tensor(self.teacher_logits[idx], dtype=torch.float32)


        return {"audio": wav_tensor, "label": label_tensor, "teacher_logit": teacher_logit_tensor}

# 3) Student 학습용 collate_fn 정의
#    SimpleLCNN은 waveform 자체를 입력받으므로, Whisper feature extraction은 하지 않음.
#    다만, 배치 내에서 길이가 다를 수 있으므로 패딩 처리는 필요.
def student_collate_fn(batch):
    # batch: [{"audio": tensor, "label": tensor, "teacher_logit": tensor}, ...]

    # (1) 오디오, 라벨, teacher 로짓 분리
    audios = [b['audio'] for b in batch]
    labels = torch.stack([b['label'] for b in batch]) # (B,)
    teacher_logits = torch.stack([b['teacher_logit'] for b in batch]) # (B,)

    # (2) 오디오 패딩 (가장 긴 오디오에 맞춰 패딩)
    #     pad_sequence는 리스트의 텐서들을 묶고 패딩 (batch_first=True 중요)
    padded_audios = torch.nn.utils.rnn.pad_sequence(audios, batch_first=True, padding_value=0.0) # (B, T_max)

    return padded_audios, labels, teacher_logits


# 4) Student 학습용 DataLoader 생성
#    batch_size는 GPU 메모리에 맞게 조절
student_dataset = StudentSpoofDataset(student_train_paths, student_train_labels, teacher_train_logits)
student_train_loader = DataLoader(
    student_dataset,
    batch_size=64, # Student 모델이 가벼우므로 Batch Size를 늘릴 수 있습니다.
    shuffle=True,
    collate_fn=student_collate_fn
)

print("Student train loader created with", len(student_dataset), "samples.")

Student train loader created with 14706 samples.


In [49]:
# =========================================
# 3) Knowledge Distillation 학습 스케치
# =========================================
import torch.nn.functional as F
import torch.optim as optim

# 1) student 모델 생성 및 device로 이동
student = SimpleLCNN().to(device)

# 2) optimizer 정의
optimizer = optim.AdamW(student.parameters(), lr=1e-3)

# 3) Distillation 하이퍼파라미터
alpha = 0.5     # hard label loss 비중
beta  = 0.5     # teacher soft label loss 비중
T     = 2.0     # temperature for softening logits

# ===========================
# 4) 학습 루프 (epoch 단위)
# ===========================
for epoch in range(10):
    student.train()  # student를 학습 모드로 설정

    # student_train_loader는 배치 단위로
    # (waveform tensor, hard label tensor, teacher logit tensor) 반환한다고 가정
    for batch in student_train_loader:
        wavs, hard, teacher_logit = batch

        # ===========================
        # 4-1) student forward
        # ===========================
        # wavs: (B, T), hard: (B,), teacher_logit: (B,) 또는 (B,1)
        logits = student(wavs.to(device))   # student raw logits

        # ===========================
        # 4-2) Hard label loss (BCEWithLogits)
        # ===========================
        # 라벨의 데이터 타입을 float32로 변환
        loss_hard = F.binary_cross_entropy_with_logits(
            logits, hard.to(device).float()
        )

        # ===========================
        # 4-3) Soft label loss (KL divergence)
        # ===========================
        # Temperature 적용: T>1 → softening logits
        # 1) Student log-probabilities
        s_log_prob = F.log_softmax(logits / T, dim=-1)  # (B,) → log_prob
        # 2) Teacher probabilities
        t_prob = F.softmax(teacher_logit.to(device) / T, dim=-1)  # (B,)
        # 3) KL divergence (batchmean)
        #    * T*T로 scaling (Hinton et al., 2015)
        loss_kd = F.kl_div(s_log_prob, t_prob, reduction='batchmean') * (T*T)

        # ===========================
        # 4-4) Total loss
        # ===========================
        loss = alpha * loss_hard + beta * loss_kd

        # ===========================
        # 4-5) Backprop & optimizer step
        # ===========================
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

###성능 향상 작업


In [88]:
# ======================================
# 라이브러리 임포트
# ======================================
import torch
import torchaudio
import random
from torch.utils.data import DataLoader
from torch.cuda.amp import autocast

# ✅ V2에서 함수 구분
import torchaudio.functional as AF   # 오디오 신호 처리용
import torch.nn.functional as NF     # 텐서 연산/패딩용


# ======================================
# IR 기반 리버브 함수
# ======================================

# --- V1 (기존) ---
# def simple_reverb(waveform):
#     channels = waveform.shape[0]  # 입력 채널 수
#     ir = torch.zeros(channels, 4000)
#     ir[:, 0] = 1.0
#     ir[:, 500] = 0.6
#     ir[:, 1500] = 0.3
#     ir[:, 2500] = 0.1
#     return F.convolve(waveform, ir)   # ❌ torch.nn.functional.F로 되어 있어서 오류 발생

# --- V2 (수정) ---
def simple_reverb(waveform):
    channels = waveform.shape[0]
    ir = torch.zeros(channels, 4000)
    ir[:, 0] = 1.0
    ir[:, 500] = 0.6
    ir[:, 1500] = 0.3
    ir[:, 2500] = 0.1

    out = AF.fftconvolve(waveform, ir, mode="full")  # ✅ torchaudio.functional 사용
    return out[:, :waveform.shape[-1]]


# ======================================
# Waveform 보정 함수 (mono + 고정 길이)
# ======================================
def fix_waveform_shape(waveform, target_len=16000):
    if waveform.ndim == 2 and waveform.shape[0] > 1:
        waveform = waveform.mean(0, keepdim=True)
    if waveform.ndim == 1:
        waveform = waveform.unsqueeze(0)

    length = waveform.shape[1]
    if length < target_len:
        pad_size = target_len - length
        # --- V1 (기존) ---
        # waveform = torch.nn.functional.pad(waveform, (0, pad_size))
        # --- V2 (수정) ---
        waveform = NF.pad(waveform, (0, pad_size))
    elif length > target_len:
        waveform = waveform[:, :target_len]

    return waveform


# ======================================
# 데이터 증강 함수 (일반)
# ======================================

# --- V1 (기존) ---
# def augment_audio(waveform, sample_rate=16000):
#     if waveform.ndim == 1:
#         waveform = waveform.unsqueeze(0)
#     if random.random() < 0.3:
#         noise = torch.randn_like(waveform) * 0.005
#         waveform = waveform + noise
#     if random.random() < 0.3:
#         waveform = simple_reverb(waveform)
#     if random.random() < 0.3:
#         waveform = F.lowpass_biquad(waveform, sample_rate, cutoff_freq=4000)
#         waveform = F.highpass_biquad(waveform, sample_rate, cutoff_freq=100)
#         waveform = F.resample(waveform, sample_rate, 8000)
#         waveform = F.resample(waveform, 8000, sample_rate)
#     waveform = fix_waveform_shape(waveform, target_len=16000)
#     return waveform.squeeze(0)

# --- V2 (수정) ---
def augment_audio(waveform, sample_rate=16000):
    if waveform.ndim == 1:
        waveform = waveform.unsqueeze(0)

    if random.random() < 0.3:  # 잡음
        noise = torch.randn_like(waveform) * 0.005
        waveform = waveform + noise

    if random.random() < 0.3:  # 리버브
        waveform = simple_reverb(waveform)

    if random.random() < 0.3:  # 대체 코덱 효과
        waveform = AF.lowpass_biquad(waveform, sample_rate, cutoff_freq=4000)
        waveform = AF.highpass_biquad(waveform, sample_rate, cutoff_freq=100)
        waveform = AF.resample(waveform, sample_rate, 8000)
        waveform = AF.resample(waveform, 8000, sample_rate)

    waveform = fix_waveform_shape(waveform, target_len=16000)
    return waveform.squeeze(0)


# ======================================
# 데이터 증강 함수 (Physical Access 대응)
# ======================================

# --- V1 (기존) ---
# def augment_audio_pa(waveform, sample_rate=16000):
#     if waveform.ndim == 1:
#         waveform = waveform.unsqueeze(0)
#     if random.random() < 0.5:
#         snr_db = random.choice([5, 10, 15, 20])
#         noise = torch.randn_like(waveform)
#         waveform = waveform + noise * (10 ** (-snr_db / 20))
#     if random.random() < 0.3:
#         waveform = simple_reverb(waveform)
#     if random.random() < 0.3:
#         waveform = F.lowpass_biquad(waveform, sample_rate, cutoff_freq=4000)
#         waveform = F.highpass_biquad(waveform, sample_rate, cutoff_freq=100)
#         waveform = F.resample(waveform, sample_rate, 8000)
#         waveform = F.resample(waveform, 8000, sample_rate)
#     waveform = fix_waveform_shape(waveform, target_len=16000)
#     return waveform.squeeze(0)

# --- V2 (수정) ---
def augment_audio_pa(waveform, sample_rate=16000):
    if waveform.ndim == 1:
        waveform = waveform.unsqueeze(0)

    if random.random() < 0.5:  # 잡음 + SNR
        snr_db = random.choice([5, 10, 15, 20])
        noise = torch.randn_like(waveform)
        waveform = waveform + noise * (10 ** (-snr_db / 20))

    if random.random() < 0.3:  # 리버브
        waveform = simple_reverb(waveform)

    if random.random() < 0.3:  # 대체 코덱 효과
        waveform = AF.lowpass_biquad(waveform, sample_rate, cutoff_freq=4000)
        waveform = AF.highpass_biquad(waveform, sample_rate, cutoff_freq=100)
        waveform = AF.resample(waveform, sample_rate, 8000)
        waveform = AF.resample(waveform, 8000, sample_rate)

    waveform = fix_waveform_shape(waveform, target_len=16000)
    return waveform.squeeze(0)


In [None]:
# ======================================
# 학습 루프 (AMP + tqdm 진행 표시 + 체크포인트 저장)
# ======================================
from torch import amp
from tqdm import tqdm

EPOCHS_B2 = 10  # 필요에 따라 조정

for epoch in range(EPOCHS_B2):
    model.train()
    total_loss, num_batches = 0, 0

    # tqdm으로 DataLoader 감싸기
    progress_bar = tqdm(train_loader_aug, desc=f"Epoch {epoch+1}/{EPOCHS_B2}", unit="batch")

    for batch in progress_bar:
        inputs, labels, metas = batch

        # dict → device
        inputs = {k: v.to(device, non_blocking=True) for k, v in inputs.items()}
        labels = labels.float().to(device, non_blocking=True)

        optimizer.zero_grad()

        # ✅ AMP 적용
        with amp.autocast("cuda"):
            logits = model(inputs)
            loss = criterion(logits, labels)

        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        num_batches += 1

        # 🔹 tqdm에 현재 평균 loss 표시
        avg_loss = total_loss / num_batches
        progress_bar.set_postfix({"loss": f"{avg_loss:.4f}"})

    scheduler.step()

    print(f"[StageB+Aug] Epoch {epoch+1}/{EPOCHS_B2} | Loss: {avg_loss:.4f}")

    # ✅ 체크포인트 저장 (에폭마다 저장)
    checkpoint_path = f"checkpoint_epoch{epoch+1}.pt"
    torch.save({
        "epoch": epoch + 1,
        "model_state_dict": model.state_dict(),
        "optimizer_state_dict": optimizer.state_dict(),
        "scheduler_state_dict": scheduler.state_dict(),
    }, checkpoint_path)
    print(f"💾 체크포인트 저장 완료: {checkpoint_path}")

Epoch 1/10: 100%|██████████| 1839/1839 [56:09<00:00,  1.83s/batch, loss=0.9785]


[StageB+Aug] Epoch 1/10 | Loss: 0.9785
💾 체크포인트 저장 완료: checkpoint_epoch1.pt


Epoch 2/10: 100%|██████████| 1839/1839 [53:35<00:00,  1.75s/batch, loss=1.0113]


[StageB+Aug] Epoch 2/10 | Loss: 1.0113
💾 체크포인트 저장 완료: checkpoint_epoch2.pt


Epoch 3/10: 100%|██████████| 1839/1839 [52:48<00:00,  1.72s/batch, loss=1.0028]


[StageB+Aug] Epoch 3/10 | Loss: 1.0028
💾 체크포인트 저장 완료: checkpoint_epoch3.pt


Epoch 4/10: 100%|██████████| 1839/1839 [55:36<00:00,  1.81s/batch, loss=0.9910]


[StageB+Aug] Epoch 4/10 | Loss: 0.9910
💾 체크포인트 저장 완료: checkpoint_epoch4.pt


Epoch 5/10: 100%|██████████| 1839/1839 [53:20<00:00,  1.74s/batch, loss=0.9891]


[StageB+Aug] Epoch 5/10 | Loss: 0.9891
💾 체크포인트 저장 완료: checkpoint_epoch5.pt


Epoch 6/10: 100%|██████████| 1839/1839 [53:28<00:00,  1.74s/batch, loss=0.9882]


[StageB+Aug] Epoch 6/10 | Loss: 0.9882
💾 체크포인트 저장 완료: checkpoint_epoch6.pt


Epoch 7/10: 100%|██████████| 1839/1839 [53:28<00:00,  1.74s/batch, loss=1.0020]


[StageB+Aug] Epoch 7/10 | Loss: 1.0020
💾 체크포인트 저장 완료: checkpoint_epoch7.pt


Epoch 8/10:  29%|██▉       | 535/1839 [16:13<20:50,  1.04batch/s, loss=1.0381]

In [61]:
# ======================================
# Optimizer & Scheduler 정의 (checkpoint 로드 전에 필수)
# ======================================
EPOCHS_B2 = 10  # 총 학습 epoch 수

# Optimizer 새로 초기화
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4, weight_decay=1e-5)

# Scheduler도 새로 정의 (이어가도록 last_epoch 설정)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
    optimizer,
    T_max=EPOCHS_B2,
    eta_min=1e-6,
    last_epoch=-1  # checkpoint 불러온 후 조정
)

# ======================================
# 체크포인트 로드
# ======================================
resume_path = "/content/checkpoint_epoch7.pt"
checkpoint = torch.load(resume_path, map_location=device)

# 모델 파라미터 불러오기
model.load_state_dict(checkpoint["model_state_dict"])

# Optimizer는 state_dict 불러오지 않고 새로 시작
print("⚠️ Optimizer state는 새로 초기화됩니다.")

# Scheduler state_dict는 불러와도 되지만, optimizer랑 mismatch 날 수 있으므로 skip
start_epoch = checkpoint["epoch"]

# Scheduler epoch 위치를 맞추기 위해 step 여러 번 호출
for _ in range(start_epoch):
    scheduler.step()

print(f"🔄 체크포인트 로드 완료: {start_epoch} epoch부터 재개")

# ======================================
# 학습 루프 (AMP + tqdm 진행 표시 + 이어서 학습)
# ======================================
from torch import amp
from tqdm import tqdm

for epoch in range(start_epoch, EPOCHS_B2):
    model.train()
    total_loss, num_batches = 0, 0

    progress_bar = tqdm(train_loader_aug, desc=f"Epoch {epoch+1}/{EPOCHS_B2}", unit="batch")

    for batch in progress_bar:
        inputs, labels, metas = batch
        inputs = {k: v.to(device, non_blocking=True) for k, v in inputs.items()}
        labels = labels.float().to(device, non_blocking=True)

        optimizer.zero_grad()

        with amp.autocast("cuda"):
            logits = model(inputs)
            loss = criterion(logits, labels)

        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        num_batches += 1

        avg_loss = total_loss / num_batches
        progress_bar.set_postfix({"loss": f"{avg_loss:.4f}"})

    scheduler.step()
    print(f"[StageB+Aug] Epoch {epoch+1}/{EPOCHS_B2} | Loss: {avg_loss:.4f}")

    checkpoint_path = f"checkpoint_epoch{epoch+1}.pt"
    torch.save({
        "epoch": epoch + 1,
        "model_state_dict": model.state_dict(),
    }, checkpoint_path)
    print(f"💾 체크포인트 저장 완료: {checkpoint_path}")

⚠️ Optimizer state는 새로 초기화됩니다.
🔄 체크포인트 로드 완료: 7 epoch부터 재개


Epoch 8/10: 100%|██████████| 1839/1839 [53:54<00:00,  1.76s/batch, loss=0.2298]


[StageB+Aug] Epoch 8/10 | Loss: 0.2298
💾 체크포인트 저장 완료: checkpoint_epoch8.pt


Epoch 9/10:  72%|███████▏  | 1319/1839 [39:33<07:02,  1.23batch/s, loss=0.1847]Exception ignored in: <function _MultiProcessingDataLoaderIter.__del__ at 0x78738d9ba480>
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/torch/utils/data/dataloader.py", line 1664, in __del__
    self._shutdown_workers()
  File "/usr/local/lib/python3.12/dist-packages/torch/utils/data/dataloader.py", line 1647, in _shutdown_workers
    if w.is_alive():
       ^^^^^^^^^^^^
  File "/usr/lib/python3.12/multiprocessing/process.py", line 160, in is_alive
    assert self._parent_pid == os.getpid(), 'can only test a child process'
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AssertionError: can only test a child process
Epoch 9/10:  72%|███████▏  | 1319/1839 [39:34<15:35,  1.80s/batch, loss=0.1847]


RuntimeError: DataLoader worker (pid(s) 30093) exited unexpectedly

In [65]:
# ======================================
# Optimizer & Scheduler 정의 (checkpoint 로드 전에 필수)
# ======================================
EPOCHS_B2 = 10  # 총 학습 epoch 수

# Optimizer 새로 초기화
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4, weight_decay=1e-5)

# Scheduler도 새로 정의 (이어가도록 last_epoch 설정)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
    optimizer,
    T_max=EPOCHS_B2,
    eta_min=1e-6,
    last_epoch=-1  # checkpoint 불러온 후 조정
)

# ======================================
# 체크포인트 로드
# ======================================
resume_path = "/content/checkpoint_epoch8.pt"
checkpoint = torch.load(resume_path, map_location=device)

# 모델 파라미터 불러오기
model.load_state_dict(checkpoint["model_state_dict"])

# Optimizer는 state_dict 불러오지 않고 새로 시작
print("⚠️ Optimizer state는 새로 초기화됩니다.")

# Scheduler state_dict는 불러와도 되지만, optimizer랑 mismatch 날 수 있으므로 skip
start_epoch = checkpoint["epoch"]

# Scheduler epoch 위치를 맞추기 위해 step 여러 번 호출
for _ in range(start_epoch):
    scheduler.step()

print(f"🔄 체크포인트 로드 완료: {start_epoch} epoch부터 재개")

# ======================================
# 학습 루프 (AMP + tqdm 진행 표시 + 이어서 학습)
# ======================================
from torch import amp
from tqdm import tqdm

for epoch in range(start_epoch, EPOCHS_B2):
    model.train()
    total_loss, num_batches = 0, 0

    progress_bar = tqdm(train_loader_aug, desc=f"Epoch {epoch+1}/{EPOCHS_B2}", unit="batch")

    for batch in progress_bar:
        inputs, labels, metas = batch
        inputs = {k: v.to(device, non_blocking=True) for k, v in inputs.items()}
        labels = labels.float().to(device, non_blocking=True)

        optimizer.zero_grad()

        with amp.autocast("cuda"):
            logits = model(inputs)
            loss = criterion(logits, labels)

        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        num_batches += 1

        avg_loss = total_loss / num_batches
        progress_bar.set_postfix({"loss": f"{avg_loss:.4f}"})

    scheduler.step()
    print(f"[StageB+Aug] Epoch {epoch+1}/{EPOCHS_B2} | Loss: {avg_loss:.4f}")

    checkpoint_path = f"checkpoint_epoch{epoch+1}.pt"
    torch.save({
        "epoch": epoch + 1,
        "model_state_dict": model.state_dict(),
    }, checkpoint_path)
    print(f"💾 체크포인트 저장 완료: {checkpoint_path}")

⚠️ Optimizer state는 새로 초기화됩니다.
🔄 체크포인트 로드 완료: 8 epoch부터 재개


Epoch 9/10: 100%|██████████| 1839/1839 [2:16:35<00:00,  4.46s/batch, loss=0.1889]


[StageB+Aug] Epoch 9/10 | Loss: 0.1889
💾 체크포인트 저장 완료: checkpoint_epoch9.pt


Epoch 10/10: 100%|██████████| 1839/1839 [2:24:23<00:00,  4.71s/batch, loss=0.1766]

[StageB+Aug] Epoch 10/10 | Loss: 0.1766
💾 체크포인트 저장 완료: checkpoint_epoch10.pt





In [66]:
# ======================================
# 마지막 Epoch 종료 후 모델 저장 (Google Drive)
# ======================================
import os

# 저장 경로 (본인 드라이브 내 원하는 경로로 바꿔도 됨)
save_dir = "/content/drive/MyDrive/AI활용 소프트웨어 개발/프로젝트/data"
os.makedirs(save_dir, exist_ok=True)

# 파일 이름에 epoch 번호와 loss 기록
checkpoint_path = os.path.join(save_dir, f"stageB_aug_epoch{epoch+1}_loss{avg_loss:.4f}.pt")

# 모델, 옵티마이저, 스케줄러 상태 저장
torch.save({
    "epoch": epoch+1,
    "model_state_dict": model.state_dict(),
    "optimizer_state_dict": optimizer.state_dict(),
    "scheduler_state_dict": scheduler.state_dict(),
}, checkpoint_path)

print(f"✅ 모델 저장 완료: {checkpoint_path}")

✅ 모델 저장 완료: /content/drive/MyDrive/AI활용 소프트웨어 개발/프로젝트/data/stageB_aug_epoch10_loss0.1766.pt


In [67]:
# ======================================
# 공격 유형별 성능 분석 (증강 후)
# ======================================
import pandas as pd

model.eval()
all_scores, all_labels, attack_types = [], [], []

with torch.no_grad():
    for inputs, labels, metas in test_loader:
        inputs = {k: v.to(device, non_blocking=True) for k, v in inputs.items()}
        labels = labels.float().to(device, non_blocking=True)

        with torch.amp.autocast("cuda"):
            logits = model(inputs)
            probs = torch.sigmoid(logits).cpu().numpy()

        all_scores.extend(probs.tolist())
        all_labels.extend(labels.cpu().numpy().tolist())
        attack_types.extend([m.get("attack_type", "unknown") for m in metas])

res_df = pd.DataFrame({
    "score": all_scores,
    "label": all_labels,
    "attack": attack_types
})

final_eer, final_thr = compute_eer(res_df["label"].values, res_df["score"].values)
print(f"\n===== [증강 적용 후 Test Set 성능] =====")
print(f"Overall EER : {final_eer:.4f} | Thr : {final_thr:.4f}")

print("\n===== [공격 유형별 EER] =====")
for attack, df_sub in res_df.groupby("attack"):
    if len(df_sub) < 20:
        print(f"[{attack}] Skip (too few samples: {len(df_sub)})")
        continue
    eer, _ = compute_eer(df_sub["label"].values, df_sub["score"].values)
    print(f"[{attack}] EER: {eer:.4f} | 샘플 수: {len(df_sub)}")


===== [증강 적용 후 Test Set 성능] =====
Overall EER : 0.1211 | Thr : 0.2866

===== [공격 유형별 EER] =====
[ASVspoof_LA_subset] EER: 0.0364 | 샘플 수: 5896
[ASVspoof_PA_subset] EER: 0.2128 | 샘플 수: 6122
[WaveFake_ljspeech_melgan] EER: nan | 샘플 수: 1000


In [74]:
# ======================================
# 증강 전 vs 증강 후 EER 비교
# ======================================
def evaluate_model(model, loader, desc=""):
    model.eval()
    scores, labels, attacks = [], [], []
    with torch.no_grad():
        for inputs, batch_labels, metas in loader:
            inputs = {k: v.to(device, non_blocking=True) for k, v in inputs.items()}
            batch_labels = batch_labels.float().to(device, non_blocking=True)

            with torch.amp.autocast("cuda"):
                logits = model(inputs)
                probs = torch.sigmoid(logits).cpu().numpy()

            scores.extend(probs.tolist())
            labels.extend(batch_labels.cpu().numpy().tolist())
            attacks.extend([m.get("attack_type", "unknown") for m in metas])

    df = pd.DataFrame({"score": scores, "label": labels, "attack": attacks})
    eer, thr = compute_eer(df["label"].values, df["score"].values)
    return eer, thr, df


# best_stageB_eer_0.2048.pt 파일은 모델 state_dict 자체로 저장되었으므로 바로 로드
checkpoint_pre = torch.load("/content/best_stageB_eer_0.2048.pt")
model.load_state_dict(checkpoint_pre) # directly load the state_dict
eer_pre, thr_pre, df_pre = evaluate_model(model, test_loader, desc="Pre-Aug")

checkpoint_post = torch.load("/content/checkpoint_epoch10.pt")
model.load_state_dict(checkpoint_post["model_state_dict"]) # checkpoint_epoch10.pt saves a dict
eer_post, thr_post, df_post = evaluate_model(model, test_loader, desc="Post-Aug")

print("\n===== [Overall EER 비교] =====")
print(f"증강 전 EER : {eer_pre:.4f} (Thr={thr_pre:.4f})")
print(f"증강 후 EER : {eer_post:.4f} (Thr={thr_post:.4f})")

print("\n===== [공격별 EER 비교] =====")
attacks = sorted(set(df_pre["attack"]) & set(df_post["attack"]))
results = []
for attack in attacks:
    df_pre_sub = df_pre[df_pre["attack"] == attack]
    df_post_sub = df_post[df_post["attack"] == attack]
    if len(df_pre_sub) < 20 or len(df_post_sub) < 20:
        continue
    eer_pre_attack, _ = compute_eer(df_pre_sub["label"].values, df_pre_sub["score"].values)
    eer_post_attack, _ = compute_eer(df_post_sub["label"].values, df_post_sub["score"].values)
    results.append({
        "attack": attack,
        "EER_pre": eer_pre_attack,
        "EER_post": eer_post_attack,
        "diff": eer_post_attack - eer_pre_attack
    })

res_df = pd.DataFrame(results).sort_values("diff")
print(res_df.to_string(index=False))


===== [Overall EER 비교] =====
증강 전 EER : 0.1206 (Thr=0.0116)
증강 후 EER : 0.1211 (Thr=0.2866)

===== [공격별 EER 비교] =====
                  attack  EER_pre  EER_post      diff
      ASVspoof_PA_subset 0.238844  0.212841 -0.026003
      ASVspoof_LA_subset 0.017212  0.036450  0.019237
WaveFake_ljspeech_melgan      NaN       NaN       NaN


In [None]:
# ===== [Overall EER 비교] =====
# 증강 전 EER : 0.1821 (Thr=0.5123)
# 증강 후 EER : 0.1456 (Thr=0.4987)

# ===== [공격별 EER 비교] =====
#            attack  EER_pre  EER_post    diff
#  WaveFake_TTS_X    0.2200    0.1800  -0.0400
#  VC_model_Y        0.1900    0.1600  -0.0300
#  PhysicalAccess    0.2500    0.2300  -0.0200

###새로운 증강 버전

In [94]:
# ======================================
# 새로운 데이터 증강 클래스 (V2) - dict 반환 버전
# ======================================
import torchaudio
import random
import torch
import torch.nn.functional as F

class SpoofDatasetAugV2(SpoofDatasetAug):
    def __getitem__(self, idx):
        item = super().__getitem__(idx)   # dict 반환
        waveform = item["audio"]          # numpy array or list
        label = item["label"]
        meta = item["meta"]

        # numpy → torch 변환
        waveform = torch.tensor(waveform, dtype=torch.float32)

        # ---- 추가 증강 ----
        # 1) 랜덤 노이즈 추가
        if random.random() < 0.5:
            noise = torch.randn_like(waveform) * 0.005
            waveform = waveform + noise

        # 2) 랜덤 피치 시프트
        if random.random() < 0.3:
            n_steps = random.choice([-2, -1, 1, 2])
            waveform = torchaudio.functional.pitch_shift(
                waveform.unsqueeze(0), 16000, n_steps
            ).squeeze(0)

        # 3) 랜덤 타임 스트레치
        if random.random() < 0.3:
            rate = random.uniform(0.9, 1.1)
            waveform = torchaudio.functional.resample(
                waveform.unsqueeze(0), orig_freq=16000, new_freq=int(16000 * rate)
            ).squeeze(0)

            target_len = 16000 * 3  # 3초 길이 고정
            if waveform.size(-1) < target_len:
                waveform = F.pad(waveform, (0, target_len - waveform.size(-1)))
            else:
                waveform = waveform[:target_len]

        # torch → numpy 변환 후 dict 반환
        return {
            "audio": waveform.squeeze().cpu().numpy(),
            "label": label,
            "meta": meta
        }

In [95]:
# 기존 train_loader_aug 유지
train_loader_aug_v1 = DataLoader(
    SpoofDatasetAug(train_df),
    batch_size=8,
    shuffle=True,
    collate_fn=collate_fn,
    num_workers=4,
    pin_memory=True
)

# 새로운 V2 증강 데이터셋
train_loader_aug_v2 = DataLoader(
    SpoofDatasetAugV2(train_df),
    batch_size=8,
    shuffle=True,
    collate_fn=collate_fn,
    num_workers=4,
    pin_memory=True
)

In [96]:
# v1 증강
# train_loader = train_loader_aug_v1

# v2 증강
train_loader = train_loader_aug_v2

In [None]:
# ======================================
# 학습 루프 (AMP + tqdm + 체크포인트 저장)
# ======================================
from torch import amp
from tqdm import tqdm

EPOCHS_V2 = 10  # 원하는 epoch 수로 조정

for epoch in range(EPOCHS_V2):
    model.train()
    total_loss, num_batches = 0, 0

    progress_bar = tqdm(train_loader, desc=f"[V2] Epoch {epoch+1}/{EPOCHS_V2}", unit="batch")

    for batch in progress_bar:
        inputs, labels, metas = batch

        # dict → device
        inputs = {k: v.to(device, non_blocking=True) for k, v in inputs.items()}
        labels = labels.float().to(device, non_blocking=True)

        optimizer.zero_grad()

        # ✅ AMP 적용
        with amp.autocast("cuda"):
            logits = model(inputs)
            loss = criterion(logits, labels)

        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        num_batches += 1

        # tqdm에 loss 표시
        avg_loss = total_loss / num_batches
        progress_bar.set_postfix({"loss": f"{avg_loss:.4f}"})

    scheduler.step()

    print(f"[StageB+AugV2] Epoch {epoch+1}/{EPOCHS_V2} | Loss: {avg_loss:.4f}")

    # 체크포인트 저장
    checkpoint_path = f"checkpoint_v2_epoch{epoch+1}.pt"
    torch.save({
        "epoch": epoch+1,
        "model_state_dict": model.state_dict(),
        "optimizer_state_dict": optimizer.state_dict(),
        "scheduler_state_dict": scheduler.state_dict(),
    }, checkpoint_path)
    print(f"💾 V2 체크포인트 저장 완료: {checkpoint_path}")

[V2] Epoch 1/10:  11%|█▏        | 211/1839 [29:44<3:00:30,  6.65s/batch, loss=0.2096]Exception ignored in: <function _MultiProcessingDataLoaderIter.__del__ at 0x78738d9ba480>
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/torch/utils/data/dataloader.py", line 1664, in __del__
    self._shutdown_workers()
  File "/usr/local/lib/python3.12/dist-packages/torch/utils/data/dataloader.py", line 1647, in _shutdown_workers
    if w.is_alive():
       ^^^^^^^^^^^^
  File "/usr/lib/python3.12/multiprocessing/process.py", line 160, in is_alive
    assert self._parent_pid == os.getpid(), 'can only test a child process'
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AssertionError: can only test a child process
Exception ignored in: <function _MultiProcessingDataLoaderIter.__del__ at 0x78738d9ba480>
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/torch/utils/data/dataloader.py", line 1664, in __del__
    self._shutdown_workers()
  F

[StageB+AugV2] Epoch 1/10 | Loss: 0.1975
💾 V2 체크포인트 저장 완료: checkpoint_v2_epoch1.pt


[V2] Epoch 2/10: 100%|██████████| 1839/1839 [4:12:56<00:00,  8.25s/batch, loss=0.1977]


[StageB+AugV2] Epoch 2/10 | Loss: 0.1977
💾 V2 체크포인트 저장 완료: checkpoint_v2_epoch2.pt


[V2] Epoch 3/10:  86%|████████▌ | 1573/1839 [3:40:01<22:18,  5.03s/batch, loss=0.2106]

In [None]:
eer_v1, _, _ = evaluate_model(model_v1, test_loader, desc="Aug V1")
eer_v2, _, _ = evaluate_model(model_v2, test_loader, desc="Aug V2")
