# SW중심대학 디지털 경진대회_SW와 생성AI의 만남 : AI부문
 - 이 AI 경진대회에서는 5초 분량의 오디오 샘플에서 진짜 사람 목소리와 AI가 생성한 가짜 목소리를 정확하게 구분할 수 있는 모델을 개발하는 것이 목표입니다.
 - 이 작업은 보안, 사기 감지 및 오디오 처리 기술 향상 등 다양한 분야에서 매우 중요합니다.

## Imports
모델 학습 및 추론에 사용할 라이브러리들을 불러옵니다.

In [1]:
import os
import random

import torch
import torchaudio

import numpy as np
import pandas as pd

from torch import nn
import torch.nn.functional as F
import torchaudio.transforms as T
from torch.utils.data import Dataset, DataLoader

from tqdm.notebook import tqdm

### Check GPU Availability

In [2]:
!nvidia-smi

Sat Jul 13 17:04:34 2024       
+---------------------------------------------------------------------------------------+
| NVIDIA-SMI 546.56                 Driver Version: 546.56       CUDA Version: 12.3     |
|-----------------------------------------+----------------------+----------------------+
| GPU  Name                     TCC/WDDM  | Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |         Memory-Usage | GPU-Util  Compute M. |
|                                         |                      |               MIG M. |
|   0  NVIDIA GeForce RTX 4050 ...  WDDM  | 00000000:01:00.0 Off |                  N/A |
| N/A   47C    P8               1W /  78W |      0MiB /  6141MiB |      0%      Default |
|                                         |                      |                  N/A |
+-----------------------------------------+----------------------+----------------------+
                                                                    

In [3]:
# Set CUDA Device Number 0~7
DEVICE_NUM = 0

device = torch.device("cpu")
if torch.cuda.is_available():
    torch.cuda.set_device(DEVICE_NUM)
    device = torch.device("cuda")
print("INFO: Using device -", device)

INFO: Using device - cuda


## Config
- 딥러닝 모델을 학습하기 전에 설정해야하는 다양한 매개변수를 정의하는 설정 클래스입니다.
- 클래스를 사용하여 학습에 필요한 설정 값을 미리 지정합니다.

##### 오디오 신호
- 우리가 듣는 소리는 공기의 압력 변화로, 이것을 디지털 신호로 변환한 것이 오디오 신호입니다.
- 이 신호는 시간에 따라 변하는 진폭 값을 가지고 있습니다.

In [4]:
class Config:
    """ Configuration Class """
    SEED = 20240719  # 재현성을 위해 랜덤 시드 고정
    NB_NAME = "wespeaker_augmented"  # ipython 노트북 이름 지정
    
    """ SR(Sample Rate)
    - 오디오 데이터의 샘플링 레이트를 설정합니다.
    - 높은 샘플링 레이트는 더 높은 주파수의 소리를 캡처할 수 있지만, 처리에 더 많은 계산 자원이 필요합니다.
    - 오디오 데이터의 초당 샘플 수를 정의합니다.
    """
    SR = 32000

    """ ROOT_FOLDER
    - 데이터셋의 루트 폴더 경로를 설정합니다.
    """
    ROOT_FOLDER = os.path.join(".", "data")
    
    """ BATCH_SIZE
    - 학습 시 한 번에 처리할 데이터 샘플의 수를 정의합니다
    - 큰 배치 크기는 메모리 사용량을 증가시키지만, 학습 속도를 높입니다.
    """
    BATCH_SIZE = 100
    
    """ N_EPOCHS
    - 전체 데이터셋을 학습할 횟수를 정의합니다.
    - 에폭 수가 너무 적으면 과소적합이 발생할 수 있고, 너무 많으면 과적합이 발생할 수 있습니다.
    """
    N_EPOCHS = 15
    
    """ LR (Learning Rate)
    - 모델의 가중치를 업데이트할 때 사용되는 학습 속도를 정의합니다.
    - 학습률이 너무 크면 학습이 불안정해질 수 있고, 너무 작으면 학습 속도가 느려집니다.
    """
    LR = 1e-5

In [5]:
def seed_everything(seed):
    """ Fixed RandomSeed
    아래의 코드는 머신러닝이나 딥러닝 모델을 훈련할 때, 결과의 재현성을 보장하기 위해 사용되는 함수입니다.
    이 함수는 다양한 랜덤 시드를 고정하여, 실행할 때마다 동일한 결과를 얻기 위해 사용됩니다.
    """
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

seed_everything(Config.SEED)  # Seed 고정

## Dataset

In [6]:
from torchvision.datasets import utils
from sklearn.model_selection import train_test_split as split

utils.tqdm = tqdm


class VoiceDataset(Dataset):
    download_url = "https://drive.usercontent.google.com/download?id=1hi1dibkHyFbaxAteLlZJw6r3g9ddd4Lf&export=download&authuser=0&confirm=t&uuid=c40c278b-d74b-4b75-bc79-09e8a3ccffa4&at=APZUnTUvIVFVM9gjGNUCmDb4YZCy%3A1719807236671"

    @classmethod
    def download(cls, root='./data', filename="download.zip", md5=None):
        cls.download_root = root
        filepath = os.path.join(root, filename)
        if not os.path.exists(filepath):
            utils.download_and_extract_archive(cls.download_url, root, root, filename, md5)
            print("Extraction completed.")
        else:
            print(f"File already exists in {filepath}")

    @property
    def get_dataset_path(self):
        filename = "train.csv" if self.is_train else "test.csv"
        if self.custom_csv:
            filename = self.custom_csv + ".csv"
        return os.path.join(self.download_root, filename)

    @property
    def submission_form_path(cls):
        return os.path.join(cls.download_root, "sample_submission.csv")

    def __init__(self, root="./data", train=True, split_ratio=1, transform=None, custom_csv=None):
        """
        Voice Dataset for Contrastive Learning
        
        :param root: The path to the data directory
        :param train: is train or test
        :param split_ratio: split ratio for train(can be 0.5 or above) and valid(can be lower than 0.5) set
        :param transform: data transformer
        :param target_transform: label transformer
        """
        super().__init__()
        self.download(root)
        self.download_root = root
        self.is_train = train
        self.custom_csv = custom_csv
        self.dataset_name = ("train" if train else "test") if not custom_csv else custom_csv

        raw_data = self._load_data(self.get_dataset_path, split_ratio if split_ratio >= 0.5 else 1-split_ratio)
        if not self.is_train or split_ratio >= 0.5:
            self.raw_data, _ = raw_data
        else:
            _, self.raw_data = raw_data
            if "train" not in self.dataset_name:
                print(f"Warning: The name of dataset should start with 'train' for training set. (current - {self.dataset_name})")
            self.dataset_name = self.dataset_name.replace("train", "valid")

        self.data = self.raw_data['path'].tolist()

        if 'label' in self.raw_data.columns:
            self.label = [(0, 1) if lb == 'real' else (1, 0) for lb in self.raw_data['label'].tolist()]
        else:
            if 'real' in self.raw_data.columns and 'fake' in self.raw_data.columns:
                f_label = self.raw_data['fake'].tolist()
                r_label = self.raw_data['real'].tolist()
                self.label = list(zip(f_label, r_label))
            else:
                self.label = None

        self.transforms(transform)

    @staticmethod
    def _load_data(dataset_path, split_ratio=1):
        random_state = 1  # fixed random_state

        df = pd.read_csv(dataset_path)

        if split_ratio == 1 or split_ratio == 0:
            return (df, None) if split_ratio == 1 else (None, df)

        if 'label' in df.columns:
            df1, df2, _, _ = split(df, df['label'], test_size=1-split_ratio, random_state=random_state)
        else:
            df1, df2 = split(df, test_size=1-split_ratio, random_state=random_state)
        return df1, df2

    def transforms(self, transform=None):
        if transform is not None:
            if not isinstance(transform, list) and not isinstance(transform, tuple):
                transform = [transform]
            for t in transform:
                self.data, self.label = t(self.data, self.label)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        if self.label is not None:
            return self.data[index], self.label[index]
        return self.data[index]

In [7]:
split_ratio = 0.8

# Train Set
train_dataset = VoiceDataset(root=Config.ROOT_FOLDER, train=True, split_ratio=split_ratio)
train_augmented = [
    VoiceDataset(root=Config.ROOT_FOLDER, train=True, split_ratio=split_ratio, custom_csv="train_noise_type1"),
    VoiceDataset(root=Config.ROOT_FOLDER, train=True, split_ratio=split_ratio, custom_csv="train_noise_type2"),
    VoiceDataset(root=Config.ROOT_FOLDER, train=True, split_ratio=split_ratio, custom_csv="train_noise_type3"),
    VoiceDataset(root=Config.ROOT_FOLDER, train=True, split_ratio=split_ratio, custom_csv="train_augmented")
]

# Valid Set
valid_dataset = VoiceDataset(root=Config.ROOT_FOLDER, train=True, split_ratio=1-split_ratio)
valid_augmented = [
    VoiceDataset(root=Config.ROOT_FOLDER, train=True, split_ratio=1-split_ratio, custom_csv="train_noise_type1"),
    VoiceDataset(root=Config.ROOT_FOLDER, train=True, split_ratio=1-split_ratio, custom_csv="train_noise_type2"),
    VoiceDataset(root=Config.ROOT_FOLDER, train=True, split_ratio=1-split_ratio, custom_csv="train_noise_type3"),
    VoiceDataset(root=Config.ROOT_FOLDER, train=True, split_ratio=1-split_ratio, custom_csv="train_augmented")
]

# Test Set
test_dataset = VoiceDataset(root=Config.ROOT_FOLDER, train=False)

print(f"Loaded Dataset - train({len(train_dataset)}), valid({len(valid_dataset)}), test({len(test_dataset)})")
print("Query Dataset for checking:", train_dataset[0])
train_dataset.raw_data

File already exists in .\data\download.zip
File already exists in .\data\download.zip
File already exists in .\data\download.zip
Query Dataset for checking: ('./train/NQJUDUMG.ogg', (1, 0))


Unnamed: 0,id,path,label
19535,NQJUDUMG,./train/NQJUDUMG.ogg,fake
37414,SGACBBDI,./train/SGACBBDI.ogg,fake
40645,SIBSFMAP,./train/SIBSFMAP.ogg,fake
16487,LLBQPFAD,./train/LLBQPFAD.ogg,real
954,ZWYRTAOF,./train/ZWYRTAOF.ogg,real
...,...,...,...
50057,BDFFJCBX,./train/BDFFJCBX.ogg,fake
32511,NEFSVUCS,./train/NEFSVUCS.ogg,real
5192,MJFGSHIR,./train/MJFGSHIR.ogg,fake
12172,USIDOXOR,./train/USIDOXOR.ogg,real


#### Data Transformation

In [None]:
from huggingface_hub import hf_hub_download
import wespeaker


def get_resnet152():
    model_id = "Wespeaker/wespeaker-voxceleb-resnet152-LM"
    model_name = model_id.replace("Wespeaker/wespeaker-", "").replace("-", "_")

    root_dir = hf_hub_download(model_id, filename=model_name+".onnx").replace(model_name+".onnx", "")

    import os
    if not os.path.isfile(root_dir+"avg_model.pt"):
        os.rename(hf_hub_download(model_id, filename=model_name+".pt"), root_dir+"avg_model.pt")
    if not os.path.isfile(root_dir+"config.yaml"):
        os.rename(hf_hub_download(model_id, filename=model_name+".yaml"), root_dir+"config.yaml")

    resnet = wespeaker.load_model_local(root_dir)
    resnet.set_gpu(-1 if device == torch.device('cpu') else DEVICE_NUM)

    def resnet152(pcm, sample_rate=None):
        if isinstance(pcm, str):
            return resnet.extract_embedding(pcm)
        else:
            pass  # TODO: 메모리에 로드된 상태의 오디오 처리 코드 필요
            #return extract_embedding(resnet, pcm, sample_rate)

    print(f"ResNet152 Model Loaded on {resnet.device}")
    return resnet152

In [None]:
audio_cache_dir = "audio_cache"  # audio cache directory

if not os.path.isdir(audio_cache_dir):
    os.mkdir(audio_cache_dir)

In [None]:
def to_embedding(dataset_name, pretrained=get_resnet152, sample_rate=Config.SR):
    convert_path = lambda path: os.path.join(Config.ROOT_FOLDER, *path.replace("./", "").split("/"))
    embedding_path = os.path.join(audio_cache_dir, Config.NB_NAME, dataset_name+"_embedding.pt")

    def get_pretrained_embedding(dataset, label):
        if not os.path.isfile(embedding_path):
            new_dataset = []
            _pretrained = pretrained()

            for d in tqdm(dataset):
                new_dataset.append(_pretrained(convert_path(d), sample_rate))

            torch.save(new_dataset, embedding_path)
            print("INFO: Voice Embedding saved.")
        else:
            new_dataset = torch.load(embedding_path)
            print("INFO: Pretrained Voice Embedding loaded.", id(new_dataset))
        
        return new_dataset, label
    
    return get_pretrained_embedding

In [None]:
to_tensor = lambda d, l: (d, list(map(torch.tensor, l)))  # label to tensor

train_dataset.transforms(transform=[to_embedding(train_dataset.dataset_name), to_tensor])
for dataset in train_augmented:
    dataset.transforms(transform=[to_embedding(dataset.dataset_name), to_tensor])

valid_dataset.transforms(transform=[to_embedding(valid_dataset.dataset_name), to_tensor])
for dataset in valid_augmented:
    dataset.transforms(transform=[to_embedding(dataset.dataset_name), to_tensor])

test_dataset.transforms(transform=[to_embedding(test_dataset.dataset_name)])

In [None]:
for (data, label), i in zip(train_dataset, range(5)):
    print(f"Dataset {i}: {label}", data)

In [None]:
for (data, label), i in zip(valid_dataset, range(5)):
    print(f"Dataset {i}: {label}", data)

In [None]:
for dataset, i in zip(test_dataset, range(5)):
    print(f"Dataset {i}:", dataset)

In [None]:
# Combine Augmented
for dataset in train_augmented:
    train_dataset.data.extend(dataset.data)
    train_dataset.label.extend(dataset.label)

for dataset in valid_augmented:
    valid_dataset.data.extend(dataset.data)
    valid_dataset.label.extend(dataset.label)

print(f"Augmented Dataset - train({len(train_dataset)}), valid({len(valid_dataset)}), test({len(test_dataset)})")

## DataLoader
    - DataLoader는 구축된 데이터셋에서 배치크기(batch_size)에 맞게 데이터를 추출하고, 필요에 따라 섞거나(shuffle=True) 순서대로 반환(shuffle=False)하는 역할을 합니다.
    - 훈련 데이터(train_loader)는 일반적으로 섞어서 모델이 데이터에 덜 편향되게 학습하도록하며,
      검증 데이터(val_loader)는 모델 성능 평가를 위해 순서대로 사용하고,
      테스트 데이터(test_loader)는 최종적인 추론을 위해 사용합니다.

    이렇게 DataLoader를 사용함으로써, 효율적인 데이터 처리와 모델 학습 및 평가가 가능해집니다.

In [None]:
BATCH_SIZE = Config.BATCH_SIZE

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

## Define Model

In [None]:
class VoiceEncoder(nn.Module):
    """ Voice Encoder Model """
    
    def __init__(self, embedding_dim, hidden_dim, output_size):
        super().__init__()
        self.fc1 = nn.Linear(embedding_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, output_size)
        self.relu = nn.ReLU()

    def forward(self, x):
        h1 = self.relu(self.fc1(x))
        out = self.fc2(h1)
        return out

In [None]:
class VoiceDiscriminator(nn.Module):
    """ Voice Discriminator Model """
    
    def __init__(self, embedding_dim, hidden_size, latent_size):
        super().__init__()
        self.encoder = VoiceEncoder(embedding_dim, hidden_size, latent_size)
        self.classifier = nn.Linear(latent_size, 2)

    def forward(self, x):
        encoded = self.encoder(x)
        out = self.classifier(encoded)
        return F.sigmoid(out)

In [None]:
# 모델 파라미터 지정
model_params = dict(
    embedding_dim=len(train_dataset[0][0]),
    hidden_size=512,
    latent_size=128
)
model_params

In [None]:
# 모델 생성
voice_discrimination_model = VoiceDiscriminator(**model_params)
discriminator = voice_discrimination_model
discriminator.to(device)

In [None]:
# BinaryCrossEntropy
criterion = nn.BCELoss().to(device)

In [None]:
# Adam optimizer
optimizer = torch.optim.Adam(params=discriminator.parameters(), lr=Config.LR)

## Train & Validation

In [None]:
def multi_label_auc(y_true, y_scores):
    auc_scores = []
    for i in range(y_true.shape[1]):
        auc = roc_auc_score(y_true[:, i], y_scores[:, i])
        auc_scores.append(auc)
    mean_auc_score = np.mean(auc_scores)
    return mean_auc_score

In [None]:
def validation(model, loader):
    model.eval()
    val_loss, all_labels, all_probs = [], [], []

    with torch.no_grad():
        for features, labels in loader:
            features = features.float().to(device)
            labels = labels.float().to(device)

            probs = model(features)

            loss = criterion(probs, labels)

            val_loss.append(loss.item())

            all_labels.append(labels.cpu().numpy())
            all_probs.append(probs.cpu().numpy())

        _val_loss = np.mean(val_loss)

        all_labels = np.concatenate(all_labels, axis=0)
        all_probs = np.concatenate(all_probs, axis=0)

        # Calculate AUC score
        auc_score = multi_label_auc(all_labels, all_probs)

    return _val_loss, auc_score

In [None]:
from sklearn.metrics import roc_auc_score

def train(model):
    best_val_score = 0
    best_model = None
    
    train_length = len(train_loader)
    valid_length = len(valid_loader)
    
    epochs = tqdm(range(1, Config.N_EPOCHS+1), desc="Running Epochs")
    train_progressor = tqdm(train_loader, desc="Training")
    valid_progressor = tqdm(valid_loader, desc="Validation")
    
    for epoch in epochs:
        train_loader.reset(total=train_length)
        valid_loader.reset(total=valid_length)
        
        model.train()
        train_loss = []
        for features, labels in train_progressor:
            features = features.float().to(device)
            labels = labels.float().to(device)

            optimizer.zero_grad()

            output = model(features)
            loss = criterion(output, labels)

            loss.backward()
            optimizer.step()

            train_loss.append(loss.item())

        _val_loss, _val_score = validation(model, valid_progressor)
        _train_loss = np.mean(train_loss)
        print(f"\rEpoch [{epoch}], Train Loss : [{_train_loss:.6f}] Val Loss : [{_val_loss:.6f}] Val AUC : [{_val_score:.6%}]", end="\n" if epoch % 10 == 0 or epoch == Config.N_EPOCHS else "")

        if best_val_score < _val_score:
            best_val_score = _val_score
            best_model = model
    
    return best_model, best_val_score

In [None]:
infer_model, infer_score = train(discriminator)

In [None]:
if not os.path.isdir(os.path.join(".", "models")):
    os.mkdir(os.path.join(".", "models"))

# Model Save
save_path = os.path.join(".", "models", f"{Config.NB_NAME}_acc_{infer_score:6%}.pt")
torch.save(discriminator.state_dict(), save_path)
print(f"Model saved to {save_path}")

### Inference
테스트 데이터셋에 대한 추론은 다음 순서로 진행됩니다.

1. 모델 및 디바이스 설정
    - 모델을 주어진 device(GPU 또는 CPU)로 이동시키고, 평가모드로 전환합니다.
2. 예측 수행
    - 예측 결과를 저장한 빈 리스트를 초기화하고 test_loader에서 배치별로 데이터를 불러와 예측을 수행합니다.
    - 각 배치에 대해 스펙트로그램 데이터를 device로 이동시킵니다.
    - 모델 예측 확률(probs)을 계산합니다.
    - 예측 확률을 predictions리스트에 추가합니다.

In [None]:
def inference(model):
    predictions = []
    
    model.to(device)
    model.eval()
    with torch.no_grad():
        for features in tqdm(test_loader):
            probs = model(features)

            probs = probs.cpu().detach().numpy()
            predictions += probs.tolist()
    return predictions

In [None]:
predicted_labels = inference(discriminator)

### Submission
추론 결과를 제출 양식에 덮어 씌워 CSV 파일로 생성하는 과정은 다음과 같습니다.

1. 제출 양식 로드
    - pd.read_csv('./sample_submission.csv')를 사용하여 제출을 위한 샘플 형식 파일을 로드합니다.
    - 이 파일은 일반적으로 각 테스트 샘플에 대한 ID와 예측해야 하는 필드가 포함된 템플릿 형태를 가지고 있습니다.
2. 예측 결과 할당
    - submit.iloc[:,1:] = preds 추론함수(inference)에서 반환된 예측결과(preds)를 샘플 제출 파일에 2번째 열부터 할당합니다.
3. 제출 파일 저장
    - 수정된 제출 파일을 baseline_submit 이란 이름의 CSV 파일로 저장합니다.
    - index=False는 파일 저장시 추가적인 index가 발생하지 않도록 설정하여, 제작한 제출 파일과 동일한 형태의 파일을 저장합니다.

In [None]:
submit = pd.read_csv(test_dataset.submission_form_path)
submit.iloc[:, 1:] = predicted_labels
submit.head()

In [None]:
submit.to_csv(f"{Config.NB_NAME}_acc_{infer_score:6%}_submit.csv", index=False)