In [None]:

import copy
import os
import random
import warnings

import librosa
import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
import torch.optim as optim
import torchmetrics
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split
from torch import nn
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
# Silence all Python warnings for the rest of the notebook.
warnings.filterwarnings('ignore')
# Run on the GPU when CUDA reports one as available, otherwise on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
class Config:
    """Central place for the hyperparameters and paths used by the notebook."""

    # --- Audio / feature extraction ---
    SR: int = 32000          # sample rate passed to librosa.load
    N_MFCC: int = 13         # number of MFCC coefficients requested from librosa

    # --- Dataset ---
    ROOT_FOLDER: str = './'

    # --- Training ---
    N_CLASSES: int = 2       # width of the one-hot label / model output
    BATCH_SIZE: int = 96
    N_EPOCHS: int = 20
    LR: float = 3e-4

    # --- Others ---
    SEED: int = 42


# Single shared instance; the rest of the notebook reads settings through it.
CONFIG = Config()

def seed_everything(seed):
    """Seed every random number generator the notebook relies on.

    Seeds Python's ``random``, the ``PYTHONHASHSEED`` environment variable,
    NumPy's legacy global generator and PyTorch (CPU and all CUDA devices),
    then configures cuDNN for reproducible execution.

    Args:
        seed: Integer seed applied to every generator.
    """
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    # Also cover multi-GPU setups; silently ignored when CUDA is unavailable.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # Benchmark mode lets cuDNN auto-tune and pick different kernels from run
    # to run, which defeats `deterministic = True`; it must be disabled.
    torch.backends.cudnn.benchmark = False

# Fix all random seeds before any stochastic step runs.
seed_everything(CONFIG.SEED)

# Training manifest; the feature extractor reads its 'path' and 'label' columns.
df = pd.read_csv('train.csv')

# Hold out 20% of the rows for validation. The split indices depend only on the
# number of rows and random_state, so no separate label array needs to be passed.
# NOTE: the name `train` is rebound to the training function defined later in
# the notebook; re-run this cell before re-running feature extraction.
train, val = train_test_split(df, test_size=0.2, random_state=CONFIG.SEED)

def add_noise(y, noise_factor=0.005):
    """Return ``y`` with scaled standard-normal noise added to every sample.

    Args:
        y: 1-D sequence/array of audio samples.
        noise_factor: Multiplier applied to the noise before it is added.

    Returns:
        A new array ``y + noise_factor * noise``; ``y`` itself is not modified.
    """
    # Drawn from NumPy's global generator, so it follows np.random.seed().
    return y + noise_factor * np.random.randn(len(y))

def pitch_shift(y, sr, n_steps):
    """Shift the pitch of ``y`` via ``librosa.effects.pitch_shift``.

    Args:
        y: Audio samples.
        sr: Sample rate of ``y``.
        n_steps: Shift amount, forwarded unchanged to librosa.

    Returns:
        The pitch-shifted signal produced by librosa.
    """
    shifted = librosa.effects.pitch_shift(y=y, sr=sr, n_steps=n_steps)
    return shifted

def time_stretch(y, rate):
    """Change the speed of ``y`` via ``librosa.effects.time_stretch``.

    Args:
        y: Audio samples.
        rate: Stretch factor, forwarded unchanged to librosa.

    Returns:
        The time-stretched signal produced by librosa.
    """
    stretched = librosa.effects.time_stretch(y=y, rate=rate)
    return stretched

def _augment_waveform(y, sr):
    """Randomly perturb a waveform; each augmentation fires with probability 0.5."""
    if random.random() < 0.5:
        y = add_noise(y)
    if random.random() < 0.5:
        y = pitch_shift(y, sr, random.uniform(-2, 2))
    if random.random() < 0.5:
        y = time_stretch(y, random.uniform(0.8, 1.2))
    return y


def _extract_feature_vector(y, sr):
    """Summarise a waveform as one fixed-length vector of time-averaged descriptors."""
    descriptors = (
        librosa.feature.mfcc(y=y, sr=sr, n_mfcc=CONFIG.N_MFCC),
        librosa.feature.chroma_stft(y=y, sr=sr),
        librosa.feature.spectral_contrast(y=y, sr=sr),
        librosa.feature.tonnetz(y=y, sr=sr),
        librosa.feature.rms(y=y),
    )
    # Average each descriptor over its frame axis so clips of any length map to
    # a vector of the same size.
    pooled = [np.mean(d.T, axis=0) for d in descriptors]
    # Augmented clips become float64 (the added noise is float64) while
    # untouched clips stay float32; cast so every vector shares one dtype.
    return np.concatenate(pooled).astype(np.float32)


def get_mfcc_feature(df, train_mode=True, augment=None):
    """Extract one feature vector (and optionally a one-hot label) per audio file.

    Each row's ``'path'`` is loaded at ``CONFIG.SR`` and reduced to the
    concatenation of time-averaged MFCC, chroma, spectral-contrast, tonnetz and
    RMS features (presumably 13 + 12 + 7 + 6 + 1 = 39 values with
    ``CONFIG.N_MFCC = 13`` and librosa's defaults - TODO confirm).

    Args:
        df: DataFrame with a ``'path'`` column, plus a ``'label'`` column when
            ``train_mode`` is true.
        train_mode: When true, labels are read and returned alongside features.
        augment: Whether to apply random noise / pitch / tempo augmentation.
            ``None`` (the default) follows ``train_mode``, which matches the
            previous behaviour. Pass ``False`` for validation data so that
            metrics are measured on unmodified audio.

    Returns:
        ``(features, labels)`` when ``train_mode`` is true, otherwise
        ``features``. ``features`` is a list of 1-D float32 arrays; ``labels``
        is a list of one-hot arrays of length ``CONFIG.N_CLASSES`` where index 0
        marks ``'fake'`` and index 1 marks any other label value.
    """
    if augment is None:
        augment = train_mode

    features = []
    labels = []
    for _, row in tqdm(df.iterrows(), total=df.shape[0]):
        y, sr = librosa.load(row['path'], sr=CONFIG.SR)

        if augment:
            y = _augment_waveform(y, sr)

        features.append(_extract_feature_vector(y, sr))

        if train_mode:
            label_vector = np.zeros(CONFIG.N_CLASSES, dtype=float)
            label_vector[0 if row['label'] == 'fake' else 1] = 1
            labels.append(label_vector)

    if train_mode:
        return features, labels
    return features


# Training data is augmented; validation data needs labels but must stay
# un-augmented so the validation loss/AUC reflect real audio.
train_mfcc, train_labels = get_mfcc_feature(train, True)
val_mfcc, val_labels = get_mfcc_feature(val, True, augment=False)

class CustomDataset(Dataset):
    """Map-style dataset over pre-computed feature vectors.

    Args:
        mfcc: Indexable collection of feature arrays (each must support
            ``reshape``).
        label: Indexable collection of labels aligned with ``mfcc``, or ``None``
            for unlabeled (inference) data.
        sequence_length: Number of rows each feature array is reshaped into.
    """

    def __init__(self, mfcc, label, sequence_length=1):
        self.mfcc, self.label, self.sequence_length = mfcc, label, sequence_length

    def __len__(self):
        """Number of samples, taken from the feature collection."""
        return len(self.mfcc)

    def __getitem__(self, index):
        """Return ``(features, label)``, or just ``features`` when unlabeled."""
        # Lay the flat vector out as (sequence_length, input_dim).
        sample = self.mfcc[index].reshape(self.sequence_length, -1)
        if self.label is None:
            return sample
        return sample, self.label[index]

# Wrap the extracted features; each sample is presented as a length-1 sequence.
train_dataset = CustomDataset(train_mfcc, train_labels, sequence_length=1)
val_dataset = CustomDataset(val_mfcc, val_labels, sequence_length=1)

# Only the training batches are shuffled; validation keeps a fixed order.
train_loader = DataLoader(train_dataset, batch_size=CONFIG.BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=CONFIG.BATCH_SIZE, shuffle=False)

In [None]:
import torch.optim as optim

In [None]:
# Define deeper MLP model
class DeeperMLP(nn.Module):
    """Five-layer fully connected network with batch norm, ReLU and dropout.

    Four hidden stages of ``Linear -> BatchNorm1d -> ReLU -> Dropout(0.3)`` are
    followed by a final ``Linear`` whose output is passed through a sigmoid.

    Args:
        input_dim: Size of each input feature vector.
        hidden_dim: Width of every hidden layer.
        output_dim: Number of output units.
    """

    def __init__(self, input_dim, hidden_dim=128, output_dim=CONFIG.N_CLASSES):
        super().__init__()
        # Hidden stages (attribute names are part of the state_dict layout).
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.bn1 = nn.BatchNorm1d(hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.bn2 = nn.BatchNorm1d(hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, hidden_dim)
        self.bn3 = nn.BatchNorm1d(hidden_dim)
        self.fc4 = nn.Linear(hidden_dim, hidden_dim)
        self.bn4 = nn.BatchNorm1d(hidden_dim)
        # Output projection.
        self.fc5 = nn.Linear(hidden_dim, output_dim)
        # Stateless layers shared by all hidden stages.
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=0.3)

    def forward(self, x):
        """Map a batch of feature vectors to per-class values in (0, 1)."""
        hidden_stages = (
            (self.fc1, self.bn1),
            (self.fc2, self.bn2),
            (self.fc3, self.bn3),
            (self.fc4, self.bn4),
        )
        for linear, norm in hidden_stages:
            x = self.dropout(self.relu(norm(linear(x))))
        return torch.sigmoid(self.fc5(x))

# Define LSTM model with additional layers
class EnhancedBiLSTM(nn.Module):
    """Two-layer bidirectional LSTM followed by a linear head.

    Args:
        input_dim: Feature size of each time step.
        hidden_dim: Hidden size per direction.
        output_dim: Number of output units.
    """

    def __init__(self, input_dim=39, hidden_dim=128, output_dim=CONFIG.N_CLASSES):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=2,
            batch_first=True,
            bidirectional=True,
        )
        # Forward and backward hidden states are concatenated, hence 2x.
        self.fc = nn.Linear(2 * hidden_dim, output_dim)

    def forward(self, x):
        """Return un-activated scores computed from the last time step."""
        sequence_out, _state = self.lstm(x)
        last_step = sequence_out[:, -1, :]
        return self.fc(last_step)

# Define Enhanced EnsembleModel
class EnhancedEnsembleModel(nn.Module):
    """Combine two sub-models by concatenating their outputs and re-projecting.

    Args:
        cnn_model: Sub-model fed the flattened ``(batch, features)`` input.
        lstm_model: Sub-model fed the input as ``(batch, 1, features)``.
        output_dim: Output width of each sub-model and of the ensemble.
    """

    def __init__(self, cnn_model, lstm_model, output_dim=CONFIG.N_CLASSES):
        super().__init__()
        self.cnn = cnn_model
        self.lstm = lstm_model
        # Both sub-model outputs are concatenated before the final projection.
        self.fc = nn.Linear(output_dim * 2, output_dim)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return per-class values in (0, 1) for a batch of inputs."""
        n_samples = x.size(0)
        flat = x.view(n_samples, -1)

        first_out = self.cnn(flat)
        # The recurrent branch sees every sample as a single-step sequence.
        second_out = self.lstm(flat.view(n_samples, 1, -1))

        fused = torch.cat((first_out, second_out), dim=1)
        return self.sigmoid(self.fc(fused))

# Training function with learning rate scheduling
def train(model, optimizer, train_loader, val_loader, device):
    """Train ``model`` and return it loaded with its best validation weights.

    Runs ``CONFIG.N_EPOCHS`` epochs with ``BCELoss``, halving the learning rate
    via ``ReduceLROnPlateau`` when the validation loss has not improved for
    more than 3 epochs. After every epoch the validation AUC is compared with
    the best so far, and a snapshot of the weights is kept whenever it improves.

    Args:
        model: Module whose outputs are probabilities (as ``BCELoss`` requires).
        optimizer: Optimizer already bound to ``model``'s parameters.
        train_loader: Iterable of ``(features, labels)`` training batches.
        val_loader: Iterable of ``(features, labels)`` validation batches.
        device: Device to train on.

    Returns:
        The same ``model`` object, with the weights from the epoch that had the
        highest validation AUC. If no epoch improved on the initial score of 0,
        the final-epoch weights are kept.
    """
    model.to(device)
    criterion = nn.BCELoss().to(device)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=3, factor=0.5)

    best_val_score = 0
    best_state = None

    for epoch in range(1, CONFIG.N_EPOCHS + 1):
        model.train()
        train_loss = []
        # Wrap the loader itself (not iter(loader)) so tqdm can show a total.
        for features, labels in tqdm(train_loader):
            features = features.float().to(device)
            labels = labels.float().to(device)

            optimizer.zero_grad()

            output = model(features)
            loss = criterion(output, labels)

            loss.backward()
            optimizer.step()

            train_loss.append(loss.item())

        _val_loss, _val_score = validation(model, criterion, val_loader, device)
        _train_loss = np.mean(train_loss)
        print(f'Epoch [{epoch}], Train Loss : [{_train_loss:.5f}] Val Loss : [{_val_loss:.5f}] Val AUC : [{_val_score:.5f}]')

        scheduler.step(_val_loss)

        if best_val_score < _val_score:
            best_val_score = _val_score
            # Keeping a reference to `model` would track later updates, so the
            # "best" model would always be the last epoch. Snapshot the weights.
            best_state = copy.deepcopy(model.state_dict())

    if best_state is not None:
        model.load_state_dict(best_state)
    return model

# Validation function
def validation(model, criterion, val_loader, device):
    """Evaluate ``model`` on ``val_loader``.

    Args:
        model: Module to evaluate; it is switched to eval mode.
        criterion: Loss function called as ``criterion(probs, labels)``.
        val_loader: Iterable of ``(features, labels)`` batches.
        device: Device the batches are moved to.

    Returns:
        ``(mean_batch_loss, auc)`` where ``auc`` is the value returned by
        ``multiLabel_AUC`` over all validation samples.
    """
    model.eval()
    batch_losses = []
    label_chunks = []
    prob_chunks = []

    with torch.no_grad():
        for batch_x, batch_y in tqdm(iter(val_loader)):
            batch_x = batch_x.float().to(device)
            batch_y = batch_y.float().to(device)

            batch_probs = model(batch_x)
            batch_losses.append(criterion(batch_probs, batch_y).item())

            # Collect on the CPU so the metric can be computed with NumPy.
            label_chunks.append(batch_y.cpu().numpy())
            prob_chunks.append(batch_probs.cpu().numpy())

    mean_loss = np.mean(batch_losses)
    stacked_labels = np.concatenate(label_chunks, axis=0)
    stacked_probs = np.concatenate(prob_chunks, axis=0)
    auc_score = multiLabel_AUC(stacked_labels, stacked_probs)

    return mean_loss, auc_score

# AUC calculation function
def multiLabel_AUC(y_true, y_scores):
    """Return the mean of the per-column ROC AUC scores.

    ``roc_auc_score`` comes from ``sklearn.metrics``; it was previously used
    without being imported, which raised ``NameError`` on the first call.

    Args:
        y_true: 2-D array of binary ground-truth labels, one column per class.
        y_scores: 2-D array of predicted scores with the same layout.

    Returns:
        The unweighted mean of the column-wise AUC values.
    """
    per_class_auc = [
        roc_auc_score(y_true[:, col], y_scores[:, col])
        for col in range(y_true.shape[1])
    ]
    return np.mean(per_class_auc)

# Inference function
def inference(model, test_loader, device):
    """Run ``model`` over ``test_loader`` and collect its outputs.

    Args:
        model: Module to run; it is moved to ``device`` and put in eval mode.
        test_loader: Iterable yielding feature batches (no labels).
        device: Device used for the forward passes.

    Returns:
        A list with one entry per sample, each a plain Python list of the
        model's outputs for that sample.
    """
    model.to(device)
    model.eval()
    collected = []
    with torch.no_grad():
        for batch in tqdm(iter(test_loader)):
            batch_probs = model(batch.float().to(device))
            collected.extend(batch_probs.cpu().detach().numpy().tolist())
    return collected

# Load the test manifest and extract features (no labels, no augmentation).
test = pd.read_csv('./test.csv')
test_mfcc = get_mfcc_feature(test, False)
test_dataset = CustomDataset(test_mfcc, None)
test_loader = DataLoader(test_dataset, batch_size=CONFIG.BATCH_SIZE, shuffle=False)

# Model initialization and training.
# Take the input width from the extracted features (mfcc + chroma + spectral
# contrast + tonnetz + rms) rather than hard-coding it, so that changing
# CONFIG.N_MFCC cannot leave the two sub-models with a stale dimension.
input_dim = len(train_mfcc[0])
cnn_model = DeeperMLP(input_dim=input_dim)
lstm_model = EnhancedBiLSTM(input_dim=input_dim)
ensemble_model = EnhancedEnsembleModel(cnn_model, lstm_model)
optimizer = optim.AdamW(params=ensemble_model.parameters(), lr=CONFIG.LR, weight_decay=1e-5)
infer_model = train(ensemble_model, optimizer, train_loader, val_loader, device)

# Model inference.
preds = inference(infer_model, test_loader, device)

# Save submission: every column after the first receives the predictions.
submit = pd.read_csv('./sample_submission.csv')
submit.iloc[:, 1:] = preds
submit.to_csv('./baseline_submit.csv', index=False)
# Last expression of the cell, so the notebook renders a preview.
submit.head()