# Imports

In [4]:
import librosa

import numpy as np
import pandas as pd
import random

import torch
import torchmetrics
import os
import torch.nn.functional as F
from torch import nn
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from tqdm import tqdm
from joblib import Parallel, delayed

import tensorflow as tf



In [5]:
# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

### Config

In [6]:
class Config:
    """Central collection of the paths and hyper-parameters used by this notebook.

    Every setting is a class-level attribute and is read as ``CONFIG.NAME``.
    """
    SR = 32000  # sampling rate handed to load_audio() and the feature extractors
    N_MFCC = 40  # number of MFCC coefficients requested in extract_features()
    
    # Dataset
    ROOT_DIR = 'C:/HongBeomsun/Dataset_SSD/FakeVoice'  # base folder joined with CSV names and audio paths
    
    # Training
    N_CLASSES = 2  # length of the label vector (index 0 = fake, index 1 = real)
    BATCH_SIZE = 64  # batch size of every DataLoader
    N_EPOCHS = 70  # number of passes over the training loader
    LEARNING_RATE = 3e-4  # initial learning rate given to Adam
    
    # Others
    SEED = 42  # used by seed_everything() and train_test_split()
    
# Single shared instance that the rest of the notebook reads from.
CONFIG = Config()

In [7]:
def seed_everything(seed):
    """Seed every random number generator this notebook relies on.

    Covers Python's ``random``, the hash seed environment variable, NumPy and
    PyTorch (CPU and all CUDA devices), and puts cuDNN into deterministic mode.

    Args:
        seed: Integer seed shared by all generators.
    """
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Seed every visible GPU, not only the current one (no-op without CUDA).
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # Benchmark mode auto-tunes kernels and may pick a different algorithm on
    # each run, which defeats the deterministic setting above.
    torch.backends.cudnn.benchmark = False

In [8]:
# Fix all random number generators before any data is split or augmented.
seed_everything(CONFIG.SEED)

### Data

In [9]:
# Training metadata: one row per clip with its id, relative audio path and label.
df = pd.read_csv(os.path.join(CONFIG.ROOT_DIR,'train.csv'))

In [10]:
# Show the number of rows and preview the first few records.
print(len(df))
df.head()

55438


Unnamed: 0,id,path,label
0,RUNQPNJF,./train/RUNQPNJF.ogg,real
1,JFAWUOGJ,./train/JFAWUOGJ.ogg,fake
2,RDKEKEVX,./train/RDKEKEVX.ogg,real
3,QYHJDOFK,./train/QYHJDOFK.ogg,real
4,RSPQNHAO,./train/RSPQNHAO.ogg,real


In [11]:
# Class balance of the full training table.
df['label'].value_counts()

label
fake    27818
real    27620
Name: count, dtype: int64

### Data Augmentation

### Train test split

In [12]:
# Stratified 80/20 hold-out split of the metadata table; the label column is
# only needed for stratification, so it is not passed as a second array.
train, val = train_test_split(
    df,
    test_size=0.2,
    random_state=CONFIG.SEED,
    stratify=df['label'],
)

In [13]:
# Only the last expression of a notebook cell is displayed, so the training
# counts are printed explicitly; otherwise they would be silently discarded.
print(train['label'].value_counts())
val['label'].value_counts()

label
fake    5564
real    5524
Name: count, dtype: int64

### Feature Extraction

In [14]:
def noise(data):
    """Return a copy of ``data`` with random Gaussian noise added.

    The noise amplitude is a random fraction (at most 1%) of ``np.amax(data)``.
    One uniform draw is taken first, then one normal draw per sample.
    """
    peak = np.amax(data)
    scale = 0.01 * np.random.uniform() * peak
    jitter = np.random.normal(size=data.shape[0])
    return data + scale * jitter

def stretch(data, rate=0.8):
    """Time-stretch ``data`` by ``rate`` using librosa and return the result."""
    return librosa.effects.time_stretch(data, rate=rate)

def pitch(data, sampling_rate, pitch_factor=0.7):
    """Pitch-shift ``data`` by ``pitch_factor`` steps using librosa and return the result."""
    return librosa.effects.pitch_shift(
        y=data,
        sr=sampling_rate,
        n_steps=pitch_factor,
    )

In [18]:
def normalize_volume(y, target_dB=-20):
    """Scale a waveform so that its RMS level equals ``target_dB``.

    Args:
        y: 1-D NumPy array of audio samples.
        target_dB: Desired RMS level in decibels relative to an RMS of 1.0.

    Returns:
        A new array with the gain applied. Empty or completely silent input
        is returned as an unmodified copy, because its level is undefined
        (log10(0)) and applying a gain would turn every sample into NaN.
    """
    if y.size == 0:
        return y.copy()

    rms = np.sqrt(np.mean(y**2))
    if not np.isfinite(rms) or rms <= np.finfo(float).tiny:
        # Silent (or already invalid) signal: there is nothing to normalise.
        return y.copy()

    loudness = 20 * np.log10(rms)
    loudness_change_dB = target_dB - loudness
    return y * (10 ** (loudness_change_dB / 20))

def load_audio(file_path, sr):
    """Load an audio file with librosa at rate ``sr`` and volume-normalise it.

    Returns:
        ``(samples, sample_rate)`` where ``samples`` has passed through
        ``normalize_volume`` and ``sample_rate`` is the rate librosa reports.
    """
    samples, loaded_sr = librosa.load(file_path, sr=sr)
    return normalize_volume(samples), loaded_sr

def extract_features(y, sr):
    """Summarise a waveform as one flat feature vector.

    Each librosa descriptor is transposed and averaged over axis 0 (presumably
    the frame axis), then the per-descriptor averages are concatenated in this
    order: MFCC, chroma, mel spectrogram, RMS, zero-crossing rate, spectral
    contrast, spectral bandwidth, tonnetz.
    """
    n_fft = 512

    def time_average(matrix):
        # Same reduction for every descriptor: transpose, then mean over axis 0.
        return np.mean(matrix.T, axis=0)

    # The STFT magnitude is shared by the chroma and spectral-contrast features.
    magnitude = np.abs(librosa.stft(y, n_fft=n_fft))

    blocks = [
        time_average(librosa.feature.mfcc(y=y, sr=sr, n_mfcc=CONFIG.N_MFCC, n_fft=n_fft)),
        time_average(librosa.feature.chroma_stft(S=magnitude, sr=sr, n_fft=n_fft)),
        time_average(librosa.feature.melspectrogram(y=y, sr=sr, n_fft=n_fft)),
        time_average(librosa.feature.rms(y=y)),
        time_average(librosa.feature.zero_crossing_rate(y)),
        time_average(librosa.feature.spectral_contrast(S=magnitude, sr=sr, n_fft=n_fft)),
        time_average(librosa.feature.spectral_bandwidth(y=y, sr=sr, n_fft=n_fft)),
        time_average(librosa.feature.tonnetz(y=librosa.effects.harmonic(y), sr=sr)),
    ]
    return np.concatenate(blocks)

def augment_data(y, sr):
    """Return three augmented variants of ``y``: noisy, time-stretched, pitch-shifted."""
    return [
        noise(y),
        stretch(y),
        pitch(y, sr),
    ]

def create_void_data(data, sr):
    """Create a "no speech" clip: low-level Gaussian noise as long as ``data``.

    The previous implementation passed an all-zero buffer to ``noise()``, which
    scales its noise by ``np.amax`` of the input. For zeros that amplitude is 0,
    so the result was always exact silence. The noise is now generated here
    with an amplitude relative to 1.0 instead.

    Args:
        data: 1-D array whose length determines the length of the output.
        sr: Sampling rate; unused, kept for a signature parallel to the other helpers.

    Returns:
        A new float array of ``data.shape[0]`` samples with a random amplitude
        factor of at most 0.01. ``data`` itself is not modified.
    """
    noise_amp = 0.01 * np.random.uniform()
    return noise_amp * np.random.normal(size=data.shape[0])

def create_duo_data(data1, data2, sr):
    """Overlay two waveforms into a single clip.

    The shorter input is zero-padded at the end to the length of the longer
    one, the two are summed, and the sum is divided by its peak absolute value
    only when that peak exceeds 1. ``sr`` is accepted but not used.
    """
    gap = len(data1) - len(data2)
    if gap > 0:
        data2 = np.pad(data2, (0, gap), 'constant')
    else:
        data1 = np.pad(data1, (0, -gap), 'constant')

    mixed = data1 + data2
    peak = np.max(np.abs(mixed))
    return mixed / peak if peak > 1 else mixed

def mix_two_random_data(df, sr):
    """Overlay two randomly chosen clips from ``df`` and build their joint label.

    Two distinct row positions are drawn with ``random.sample``; both clips are
    loaded and mixed with ``create_duo_data``. The label vector has a 1 at
    index 0 if either clip is 'fake' and a 1 at index 1 if either clip is not,
    so a fake+real mix is labelled with both classes.

    Returns:
        ``(mixed_waveform, label_vector)``.
    """
    first, second = random.sample(range(len(df)), 2)
    row_a = df.iloc[first]
    row_b = df.iloc[second]

    y1, _ = load_audio(os.path.join(CONFIG.ROOT_DIR, row_a['path']), sr)
    y2, _ = load_audio(os.path.join(CONFIG.ROOT_DIR, row_b['path']), sr)
    y_duo = create_duo_data(y1, y2, sr)

    target = np.zeros(CONFIG.N_CLASSES, dtype=float)
    for tag in (row_a['label'], row_b['label']):
        target[0 if tag == 'fake' else 1] = 1

    return y_duo, target

In [16]:
def get_features(df, train_mode=True, augment=False):
    """Load every clip listed in ``df`` and turn it into a feature vector.

    Args:
        df: DataFrame with a ``path`` column (relative to ``CONFIG.ROOT_DIR``)
            and, when ``train_mode`` is true, a ``label`` column.
        train_mode: When true, also build one label vector per clip
            (index 0 = fake, index 1 = real) and return it.
        augment: When true (and ``train_mode`` is true), append extra samples
            after the originals: ``int(0.1 * len(df))`` random two-clip mixes,
            followed by noise / stretch / pitch variants of the first
            ``int(0.2 * len(df))`` clips. Augmentation needs labels, so it is
            skipped when ``train_mode`` is false.

    Returns:
        ``(features, labels)`` as NumPy arrays when ``train_mode`` is true,
        otherwise only ``features``.
    """
    features = []
    labels = []
    total = len(df)
    do_augment = augment and train_mode

    # The waveform augmentations must run on the raw audio, not on the
    # extracted feature vector, so they are applied while the waveform is still
    # in memory. The results are kept apart so the output order stays
    # originals -> mixes -> augmented variants.
    num_augmented_samples = int(total * 0.2) if do_augment else 0
    variant_features = []
    variant_labels = []

    for position, (_, row) in enumerate(tqdm(df.iterrows(), total=total)):
        y, sr = load_audio(os.path.join(CONFIG.ROOT_DIR, row['path']), CONFIG.SR)

        label_vector = None
        if train_mode:
            label_vector = np.zeros(CONFIG.N_CLASSES, dtype=float)
            label_vector[0 if row['label'] == 'fake' else 1] = 1
            labels.append(label_vector)

        features.append(extract_features(y, sr))

        if position < num_augmented_samples:
            # Best effort: one failing clip must not abort the whole extraction run.
            try:
                for aug_y in augment_data(y, sr):
                    variant_features.append(extract_features(aug_y, sr))
                    variant_labels.append(label_vector)
            except Exception as e:
                print(f'Error during augmentation: {e}')

    if do_augment:
        mixed_features = []
        mixed_labels = []

        num_mixed_samples = int(total * 0.1)
        for _ in range(num_mixed_samples):
            # Best effort, same as above.
            try:
                y_duo, y_duo_label = mix_two_random_data(df, CONFIG.SR)
                mixed_features.append(extract_features(y_duo, CONFIG.SR))
                mixed_labels.append(y_duo_label)
            except Exception as e:
                print(f'Error during data augmentation: {e}')

        features.extend(mixed_features)
        labels.extend(mixed_labels)
        features.extend(variant_features)
        labels.extend(variant_labels)

    if train_mode:
        return np.array(features), np.array(labels)
    return np.array(features)

In [19]:
# Extract features for both splits; only the training split is augmented.
train_features, train_labels = get_features(train, train_mode=True, augment=True)
val_features, val_labels = get_features(val, train_mode=True, augment=False)

  0%|          | 17/44350 [00:06<4:57:47,  2.48it/s]


KeyboardInterrupt: 

In [None]:
def save_np():
    """Write the four global feature/label arrays to ``<ROOT_DIR>/npy`` as .npy files."""
    os.makedirs(os.path.join(CONFIG.ROOT_DIR, 'npy'), exist_ok=True)

    named_arrays = (
        ('train_features', train_features),
        ('train_labels', train_labels),
        ('val_features', val_features),
        ('val_labels', val_labels),
    )
    for stem, array in named_arrays:
        target = os.path.join(CONFIG.ROOT_DIR, f'npy/{stem}_VariousFeatures_1000.npy')
        np.save(target, array)

In [None]:
def load_np():
    """Read back the arrays written by ``save_np``.

    Returns:
        ``(train_features, train_labels, val_features, val_labels)``.
    """
    stems = ('train_features', 'train_labels', 'val_features', 'val_labels')
    return tuple(
        np.load(os.path.join(CONFIG.ROOT_DIR, f'npy/{stem}_VariousFeatures_1000.npy'))
        for stem in stems
    )

In [None]:
# save_np()
# train_features, train_labels, val_features, val_labels = load_np()

In [None]:
# Make sure all four containers are NumPy arrays before wrapping them in datasets.
train_features, train_labels, val_features, val_labels = (
    np.array(block)
    for block in (train_features, train_labels, val_features, val_labels)
)

In [None]:
# Check that the feature matrices and label arrays line up in length.
print(train_features.shape, len(train_labels))
print(val_features.shape, len(val_labels))

### Dataset

In [None]:
class CustomDataset(Dataset):
    """Index-based dataset over a feature container and an optional label container.

    With labels, each item is ``(features[index], labels[index])``; when
    ``label`` is ``None`` (inference), each item is just ``features[index]``.
    """

    def __init__(self, mfcc, label):
        self.mfcc, self.label = mfcc, label

    def __len__(self):
        return len(self.mfcc)

    def __getitem__(self, index):
        sample = self.mfcc[index]
        if self.label is None:
            return sample
        return sample, self.label[index]

In [None]:
# Wrap the extracted arrays so a DataLoader can batch them.
train_dataset = CustomDataset(train_features, train_labels)
val_dataset = CustomDataset(val_features, val_labels)

In [None]:
# Training batches are reshuffled every epoch.
train_loader = DataLoader(
    train_dataset,
    batch_size=CONFIG.BATCH_SIZE,
    shuffle=True
)
# Validation batches keep the dataset order.
val_loader = DataLoader(
    val_dataset,
    batch_size=CONFIG.BATCH_SIZE,
    shuffle=False
)

### Define Model

In [None]:
class MLP(nn.Module):
    """Fully connected classifier: input -> 256 -> 128 -> 64 -> output.

    Each hidden stage is Linear -> BatchNorm1d -> ReLU -> Dropout(0.3). The
    output layer is followed by a sigmoid, so ``forward`` returns one
    independent probability per class.
    """

    def __init__(self, input_dim=181, output_dim=CONFIG.N_CLASSES):
        super().__init__()
        # Hidden stage 1
        self.fc1 = nn.Linear(input_dim, 256)
        self.bn1 = nn.BatchNorm1d(256)
        self.dropout1 = nn.Dropout(0.3)

        # Hidden stage 2
        self.fc2 = nn.Linear(256, 128)
        self.bn2 = nn.BatchNorm1d(128)
        self.dropout2 = nn.Dropout(0.3)

        # Hidden stage 3
        self.fc3 = nn.Linear(128, 64)
        self.bn3 = nn.BatchNorm1d(64)
        self.dropout3 = nn.Dropout(0.3)

        # Output head and shared activations
        self.fc4 = nn.Linear(64, output_dim)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        hidden_stages = (
            (self.fc1, self.bn1, self.dropout1),
            (self.fc2, self.bn2, self.dropout2),
            (self.fc3, self.bn3, self.dropout3),
        )
        for linear, norm, drop in hidden_stages:
            x = drop(self.relu(norm(linear(x))))
        return self.sigmoid(self.fc4(x))

### MLFlow

In [None]:
import mlflow
import mlflow.pytorch

# Make 'FakeVoice' the active MLflow experiment for the runs started below.
mlflow.set_experiment('FakeVoice')

def mlflow_run_decorator(run_name=None):
    """Build a decorator that runs the wrapped function inside an MLflow run.

    The run is started before the call and always ended afterwards. On success
    the run is tagged ``Status=SUCCESS``; on an exception the exception is
    logged as the ``Exception`` parameter, the run is tagged ``Status=FAIL``
    and the exception is re-raised unchanged.

    Args:
        run_name: Optional name passed to ``mlflow.start_run``.

    Returns:
        A decorator; the decorated function returns whatever the original returns.
    """
    import functools

    def decorator(func):
        # Keep the wrapped function's name and docstring for logs and introspection.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            mlflow.start_run(run_name=run_name)
            try:
                result = func(*args, **kwargs)
                mlflow.set_tag("Status", "SUCCESS")
            except Exception as e:
                mlflow.log_param("Exception", e)
                mlflow.set_tag("Status", "FAIL")
                # A bare raise keeps the original traceback intact.
                raise
            finally:
                mlflow.end_run()
            return result
        return wrapper
    return decorator

### Train & Validation

In [None]:
from sklearn.metrics import roc_auc_score
import torch.optim as optim
import torch.optim.lr_scheduler as lr_scheduler

In [None]:
@mlflow_run_decorator(run_name='MLP_VariousFeatures')
def training(model, scheduler, optimizer, train_loader, val_loader, device):
    """Train ``model`` for ``CONFIG.N_EPOCHS`` epochs and return it with its best weights.

    After every epoch the model is validated, the losses and AUC are printed
    and logged to MLflow, and the scheduler is stepped on the validation loss.
    The weights of the epoch with the highest validation AUC are kept and
    loaded back into ``model`` before returning.

    Args:
        model: Network whose outputs are probabilities (BCELoss is applied).
        scheduler: LR scheduler whose ``step`` accepts the validation loss.
        optimizer: Optimizer bound to ``model``'s parameters.
        train_loader: Iterable yielding ``(features, labels)`` training batches.
        val_loader: Iterable yielding ``(features, labels)`` validation batches.
        device: Device the model and batches are moved to.

    Returns:
        ``model`` itself, carrying the best-scoring weights (or the final
        weights if the validation score never exceeded 0).
    """
    import copy

    # The settings live on the Config class, so vars(CONFIG) on the instance is
    # empty; read the public class attributes (plus any instance overrides).
    config_params = {
        name: value
        for name, value in {**vars(type(CONFIG)), **vars(CONFIG)}.items()
        if not name.startswith('_')
    }
    mlflow.log_params(config_params)

    model.to(device)
    criterion = nn.BCELoss().to(device)

    best_val_score = 0
    best_state = None

    for epoch in range(1, CONFIG.N_EPOCHS+1):
        model.train()
        train_loss = []
        for features, labels in tqdm(iter(train_loader)):
            features = features.float().to(device)
            labels = labels.float().to(device)

            optimizer.zero_grad()

            output = model(features)
            loss = criterion(output, labels)

            loss.backward()
            optimizer.step()

            train_loss.append(loss.item())

        _val_loss, _val_score = validation(model, criterion, val_loader, device)
        _train_loss = np.mean(train_loss)
        print(f'Epoch [{epoch}], Train Loss : [{_train_loss:.5f}] Val Loss : [{_val_loss:.5f}] Val AUC : [{_val_score:.5f}] LEARNING RATE : [{optimizer.param_groups[0]["lr"]:.5f}]')

        mlflow.log_metrics({'train_loss': _train_loss, 'val_loss': _val_loss, 'val_auc': _val_score}, step=epoch)
        scheduler.step(_val_loss)

        if best_val_score < _val_score:
            best_val_score = _val_score
            # Snapshot the weights: keeping a reference to `model` would just
            # alias the object that later epochs keep updating.
            best_state = copy.deepcopy(model.state_dict())

    if best_state is not None:
        model.load_state_dict(best_state)

    return model

In [None]:
def multiLabel_AUC(y_true, y_scores):
    """Return the mean of the per-column ROC AUC scores.

    Column ``i`` of ``y_true`` is scored against column ``i`` of ``y_scores``
    with ``roc_auc_score``; the scores are then averaged.
    """
    per_label = [
        roc_auc_score(y_true[:, col], y_scores[:, col])
        for col in range(y_true.shape[1])
    ]
    return np.mean(per_label)

In [None]:
def validation(model, criterion, val_loader, device):
    """Evaluate ``model`` on ``val_loader`` without gradient tracking.

    Returns:
        ``(mean_batch_loss, auc)`` where ``auc`` is ``multiLabel_AUC`` computed
        over the labels and predicted probabilities of all batches.
    """
    model.eval()
    batch_losses = []
    label_chunks = []
    prob_chunks = []

    with torch.no_grad():
        for features, labels in tqdm(iter(val_loader)):
            features = features.float().to(device)
            labels = labels.float().to(device)

            probs = model(features)
            batch_losses.append(criterion(probs, labels).item())

            # Move the tensors to the CPU so they can be scored with NumPy.
            label_chunks.append(labels.cpu().numpy())
            prob_chunks.append(probs.cpu().numpy())

    mean_loss = np.mean(batch_losses)
    stacked_labels = np.concatenate(label_chunks, axis=0)
    stacked_probs = np.concatenate(prob_chunks, axis=0)
    auc_score = multiLabel_AUC(stacked_labels, stacked_probs)

    return mean_loss, auc_score

### Run

In [None]:
# Size the input layer from the extracted feature matrix instead of relying on
# MLP's default of 181. With librosa's defaults, extract_features() appears to
# concatenate 40 MFCC + 12 chroma + 128 mel + 1 rms + 1 zcr + 7 contrast +
# 1 bandwidth + 6 tonnetz = 196 values, so the default would not match.
model = MLP(input_dim=train_features.shape[1])
optimizer = torch.optim.Adam(params=model.parameters(), lr=CONFIG.LEARNING_RATE)
# `verbose` is not passed: recent PyTorch releases deprecate it, and the
# training loop already prints the current learning rate after every epoch.
scheduler = lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=5)

infer_model = training(model, scheduler, optimizer, train_loader, val_loader, device)

### Inference

In [None]:
# Extract features for the test clips; no labels are built and nothing is augmented.
test = pd.read_csv(os.path.join(CONFIG.ROOT_DIR, 'test.csv'))
test_mfcc = get_features(test, train_mode=False, augment=False)

In [None]:
# Label-free dataset; order is preserved so predictions line up with the test rows.
test_dataset = CustomDataset(test_mfcc, None)
test_loader = DataLoader(
    test_dataset,
    batch_size=CONFIG.BATCH_SIZE,
    shuffle=False
)

In [None]:
# np.save(os.path.join(CONFIG.ROOT_DIR, 'npy/test_VariousFeatures_1000.npy'), test_mfcc)
# test_mfcc = np.load(os.path.join(CONFIG.ROOT_DIR, 'npy/test_VariousFeatures_1000.npy'))

In [None]:
def inference(model, test_loader, device):
    """Run ``model`` over ``test_loader`` and collect its outputs.

    Returns:
        A list with one entry per test sample, each a list of the model's
        output values for that sample.
    """
    model.to(device)
    model.eval()

    collected = []
    with torch.no_grad():
        for batch in tqdm(iter(test_loader)):
            outputs = model(batch.float().to(device))
            collected.extend(outputs.cpu().detach().numpy().tolist())
    return collected

In [None]:
# Predict class probabilities for the test set with the model returned by training().
preds = inference(infer_model, test_loader, device)

### Submission

In [None]:
# Load the submission template and overwrite every column after the first with the predictions.
submit = pd.read_csv(os.path.join(CONFIG.ROOT_DIR,'./sample_submission.csv'))
submit.iloc[:, 1:] = preds
submit.head()

In [None]:
# Create the target folder first; to_csv() does not create missing directories.
os.makedirs('./output', exist_ok=True)
submit.to_csv('./output/submit_MLP_VariousFeatures.csv', index=False)

### AfterTest

In [None]:
# Sanity check: run the whole training matrix through the model in one pass and
# print the first ten outputs next to the first ten label vectors.
print(model(torch.tensor(train_features).float().to(device)).cpu().detach().numpy()[:10])
print(train_labels[:10])

In [None]:
# Row indices whose label vector has both classes set (e.g. fake+real mixes from mix_two_random_data).
np.where((train_labels[:, 0] == 1) & (train_labels[:, 1] == 1))[0]