In [2]:
!pip install transformers
!pip install torchaudio



In [3]:
import os
import numpy as np
import pandas as pd
import librosa
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
from transformers import Wav2Vec2ForSequenceClassification, Wav2Vec2Processor
import torch
import torchaudio
from torch import nn
from torch.utils.data import Dataset, DataLoader

In [4]:
class Config:
    """Static settings shared by the notebook cells.

    All values are class-level constants; ``CONFIG`` below is the instance the
    other cells read from.
    """

    # --- Filesystem ---
    ROOT_FOLDER = r"C:\Users\KimDongyoung\Downloads\SW중심대학"  # Update this to your dataset path

    # --- Audio ---
    SR = 16000      # sampling rate handed to the resampler and the processor
    N_MFCC = 13     # not referenced by the cells visible in this notebook

    # --- Model ---
    N_CLASSES = 2   # number of output labels of the classification head
    HIDDEN_DIM = 256     # not referenced by the cells visible in this notebook
    DROPOUT_RATE = 0.3   # not referenced by the cells visible in this notebook

    # --- Optimisation ---
    BATCH_SIZE = 16
    N_EPOCHS = 10
    LR = 1e-4

    # --- Reproducibility ---
    SEED = 42


CONFIG = Config()

In [5]:
def seed_everything(seed):
    """Seed every random number generator the notebook relies on.

    Seeds Python's ``random`` module, NumPy and PyTorch (CPU and all CUDA
    devices), and configures cuDNN for deterministic algorithm selection.

    Args:
        seed: Integer seed applied to all generators.
    """
    import random  # local import: `random` is not part of the notebook's import cell

    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Seeds every CUDA device, not just the current one; silently ignored without CUDA.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # benchmark=True lets cuDNN auto-tune and pick different kernels between
    # runs, which defeats the `deterministic` flag above.
    torch.backends.cudnn.benchmark = False

seed_everything(CONFIG.SEED)

# Load CSV files
train_df, test_df, sample_submission_df = (
    pd.read_csv(os.path.join(CONFIG.ROOT_FOLDER, csv_name))
    for csv_name in ('train.csv', 'test.csv', 'sample_submission.csv')
)

# Update paths in DataFrames
# The first two characters of every stored path are dropped before joining it
# to the dataset root (presumably a leading "./" — verify against the CSVs).
train_df['path'] = [os.path.join(CONFIG.ROOT_FOLDER, rel[2:]) for rel in train_df['path']]
test_df['path'] = [os.path.join(CONFIG.ROOT_FOLDER, rel[2:]) for rel in test_df['path']]

# Split the training data (80% train / 20% validation, seeded for repeatability)
train_data, val_data = train_test_split(
    train_df,
    test_size=0.2,
    random_state=CONFIG.SEED,
)

In [6]:
# Load the pre-trained Wav2Vec2 model and processor
# The processor prepares raw waveforms as model inputs; the model gets a
# sequence-classification head with CONFIG.N_CLASSES outputs.
processor = Wav2Vec2Processor.from_pretrained(
    "facebook/wav2vec2-base-960h"
)
model = Wav2Vec2ForSequenceClassification.from_pretrained(
    "facebook/wav2vec2-base-960h",
    num_labels=CONFIG.N_CLASSES,
)

Some weights of Wav2Vec2ForSequenceClassification were not initialized from the model checkpoint at facebook/wav2vec2-base-960h and are newly initialized: ['classifier.bias', 'classifier.weight', 'projector.bias', 'projector.weight', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original0', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original1', 'wav2vec2.masked_spec_embed']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [7]:
def pad_or_truncate(waveform, target_length):
    """Force ``waveform`` to exactly ``target_length`` entries along its first axis.

    Longer inputs are cut from the end, shorter inputs are right-padded with
    zeros via ``np.pad``, and inputs that already match are returned as-is.

    Args:
        waveform: Sequence/array whose ``len()`` is compared to the target.
        target_length: Desired number of entries.

    Returns:
        The truncated slice, the zero-padded array, or the input itself.
    """
    current_length = len(waveform)
    if current_length == target_length:
        return waveform
    if current_length > target_length:
        return waveform[:target_length]
    missing = target_length - current_length
    return np.pad(waveform, (0, missing), mode='constant')

def preprocess_audio(file_path, target_sample_rate=16000, target_length=16000*5):  # Assume 5 seconds as the target length
    """Load an audio file as a fixed-length, mono, 1-D NumPy waveform.

    The file is read with ``torchaudio.load``, averaged across channels,
    resampled to ``target_sample_rate`` if needed, and finally truncated or
    zero-padded to ``target_length`` samples.

    Args:
        file_path: Path of the audio file to read.
        target_sample_rate: Sampling rate the waveform is converted to.
        target_length: Number of samples in the returned waveform.

    Returns:
        1-D ``numpy.ndarray`` with ``target_length`` samples.
    """
    waveform, sample_rate = torchaudio.load(file_path)
    # Collapse the channel axis explicitly. The previous `squeeze()` left
    # multi-channel audio 2-D, so the length check measured the number of
    # channels and np.pad padded both axes.
    if waveform.dim() > 1:
        waveform = waveform.mean(dim=0)
    if sample_rate != target_sample_rate:
        resampler = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=target_sample_rate)
        waveform = resampler(waveform)
    return pad_or_truncate(waveform.numpy(), target_length)

def get_wav2vec_features(df, train_mode=True, target_length=16000*5):
    """Turn every audio file listed in ``df`` into processor input values.

    Each row's ``'path'`` is loaded through ``preprocess_audio`` and passed to
    the module-level ``processor``; the resulting ``input_values`` are squeezed
    and stored as NumPy arrays. Rows whose file does not exist are reported
    with a message and skipped.

    NOTE(review): because missing files are skipped, the returned lists can be
    shorter than ``df`` and are then no longer row-aligned with it — confirm
    this is acceptable for the test/submission path.

    Args:
        df: DataFrame with a ``'path'`` column (and ``'label'`` when
            ``train_mode`` is true).
        train_mode: When true, labels are collected as well.
        target_length: Number of samples each clip is padded/truncated to.

    Returns:
        ``(features, labels)`` when ``train_mode`` is true, otherwise just
        ``features``. Labels are ``0`` for ``'fake'`` and ``1`` for anything else.
    """
    extracted = []
    targets = []
    for _, record in tqdm(df.iterrows(), total=len(df)):
        wav_path = record['path']
        if os.path.exists(wav_path):
            clip = preprocess_audio(wav_path, target_sample_rate=CONFIG.SR, target_length=target_length)
            encoded = processor(clip, return_tensors="pt", sampling_rate=CONFIG.SR)
            extracted.append(encoded.input_values.squeeze().numpy())
            if train_mode:
                targets.append(0 if record['label'] == 'fake' else 1)
        else:
            print(f"File not found: {wav_path}")
    return (extracted, targets) if train_mode else extracted

In [None]:
# Pre-compute processor inputs (and integer labels) for both splits.
train_features, train_labels = get_wav2vec_features(train_data, train_mode=True)
val_features, val_labels = get_wav2vec_features(val_data, train_mode=True)

100%|███████████████████████████████████████████████████████████████████████████| 44350/44350 [06:10<00:00, 119.59it/s]
 93%|██████████████████████████████████████████████████████████████████████     | 10354/11088 [01:25<00:06, 107.79it/s]

In [None]:
class CustomDataset(Dataset):
    """Map-style dataset over pre-extracted features with optional labels.

    Items are converted to tensors on access. With labels, an item is a
    ``(feature_tensor, label_tensor)`` pair; without, just the feature tensor.
    """

    def __init__(self, features, labels=None):
        """Store the indexable ``features`` and, optionally, parallel ``labels``."""
        self.features = features
        self.labels = labels

    def __len__(self):
        """Return the number of stored feature entries."""
        return len(self.features)

    def __getitem__(self, index):
        """Return the tensor(s) for position ``index``."""
        sample = torch.tensor(self.features[index])
        if self.labels is None:
            return sample
        return sample, torch.tensor(self.labels[index])

In [None]:
# Wrap the extracted features in datasets.
train_dataset = CustomDataset(features=train_features, labels=train_labels)
val_dataset = CustomDataset(features=val_features, labels=val_labels)

# Only the training loader shuffles; validation keeps a fixed order.
train_loader = DataLoader(
    train_dataset,
    batch_size=CONFIG.BATCH_SIZE,
    shuffle=True,
)
val_loader = DataLoader(
    val_dataset,
    batch_size=CONFIG.BATCH_SIZE,
    shuffle=False,
)

In [None]:
def train(model, optimizer, train_loader, val_loader, device, patience=5):
    """Train ``model`` with early stopping and return it with its best weights.

    Runs up to ``CONFIG.N_EPOCHS`` epochs of cross-entropy training. After each
    epoch the model is scored with ``validation``; whenever the validation AUC
    improves, a CPU copy of the weights is saved. Training stops early after
    ``patience`` consecutive epochs without improvement.

    Args:
        model: Module whose forward output exposes ``.logits``.
        optimizer: Optimizer already bound to ``model``'s parameters.
        train_loader: Iterable of ``(features, labels)`` training batches.
        val_loader: Iterable of ``(features, labels)`` validation batches.
        device: Device the model and batches are moved to.
        patience: Allowed number of consecutive non-improving epochs.

    Returns:
        ``model`` with the weights of the best-scoring epoch loaded. If no
        epoch improved on the initial score, the model is returned with its
        final weights.
    """
    model.to(device)
    criterion = nn.CrossEntropyLoss().to(device)

    best_val_score = 0
    best_state = None
    early_stop_count = 0

    for epoch in range(1, CONFIG.N_EPOCHS + 1):
        model.train()
        train_loss = []
        # Iterate the loader directly so tqdm can show the total batch count.
        for features, labels in tqdm(train_loader):
            features = features.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            output = model(features).logits
            loss = criterion(output, labels)
            loss.backward()
            optimizer.step()
            train_loss.append(loss.item())

        _val_loss, _val_score, accuracy, precision, recall, f1 = validation(model, criterion, val_loader, device)
        _train_loss = np.mean(train_loss)
        print(f'Epoch [{epoch}], Train Loss : [{_train_loss:.5f}] Val Loss : [{_val_loss:.5f}] Val AUC : [{_val_score:.5f}]')
        print(f'Val Accuracy : [{accuracy:.5f}], Precision : [{precision:.5f}], Recall : [{recall:.5f}], F1 : [{f1:.5f}]')

        if best_val_score < _val_score:
            best_val_score = _val_score
            # Snapshot the weights. Keeping a reference to `model` would keep
            # changing with every later optimizer step, so the "best" model
            # would really be the last one.
            best_state = {
                name: tensor.detach().cpu().clone()
                for name, tensor in model.state_dict().items()
            }
            early_stop_count = 0
        else:
            early_stop_count += 1

        if early_stop_count >= patience:
            print("Early stopping")
            break

    # Restore the best epoch's weights before handing the model back.
    if best_state is not None:
        model.load_state_dict(best_state)
    return model

In [None]:
def validation(model, criterion, val_loader, device):
    """Evaluate ``model`` on ``val_loader`` and compute loss and metrics.

    The loss is computed on the raw logits, while AUC and the argmax-based
    metrics use softmax probabilities.

    Args:
        model: Module whose forward output exposes ``.logits``.
        criterion: Loss function called as ``criterion(logits, labels)``.
        val_loader: Iterable of ``(features, labels)`` batches.
        device: Device the batches are moved to.

    Returns:
        Tuple ``(mean_loss, auc, accuracy, precision, recall, f1)``.
    """
    model.eval()
    val_loss, all_labels, all_probs = [], [], []

    with torch.no_grad():
        for features, labels in tqdm(val_loader):
            features = features.to(device)
            labels = labels.to(device)
            logits = model(features).logits
            val_loss.append(criterion(logits, labels).item())
            all_labels.append(labels.cpu().numpy())
            # Score with class probabilities instead of unnormalised logits.
            all_probs.append(torch.softmax(logits, dim=1).cpu().numpy())

    _val_loss = np.mean(val_loss)
    all_labels = np.concatenate(all_labels, axis=0)
    all_probs = np.concatenate(all_probs, axis=0)

    # multiLabel_AUC walks over label columns, so 1-D class-index labels are
    # expanded to one-hot rows first (1-D labels have no `shape[1]`).
    if all_labels.ndim == 1:
        auc_labels = np.eye(all_probs.shape[1], dtype=int)[all_labels]
    else:
        auc_labels = all_labels
    auc_score = multiLabel_AUC(auc_labels, all_probs)

    all_preds = np.argmax(all_probs, axis=1)
    all_true = all_labels

    accuracy = accuracy_score(all_true, all_preds)
    precision = precision_score(all_true, all_preds)
    recall = recall_score(all_true, all_preds)
    f1 = f1_score(all_true, all_preds)

    return _val_loss, auc_score, accuracy, precision, recall, f1

def multiLabel_AUC(y_true, y_scores):
    """Return the mean of the per-column ROC AUC scores.

    Args:
        y_true: Either a 2-D indicator array ``(n_samples, n_classes)`` or a
            1-D array of integer class indices.
        y_scores: Scores with one column per class, or a 1-D array of
            positive-class scores when ``y_true`` is 1-D.

    Returns:
        Mean ROC AUC over the label columns, or the single AUC when both
        inputs are 1-D.
    """
    y_true = np.asarray(y_true)
    y_scores = np.asarray(y_scores)

    if y_true.ndim == 1:
        if y_scores.ndim == 1:
            # Plain binary case: a single score per sample.
            return roc_auc_score(y_true, y_scores)
        # Class indices -> one-hot indicator matrix, so the column-wise loop
        # below applies (1-D labels have no `shape[1]`).
        class_ids = np.arange(y_scores.shape[1])
        y_true = (y_true[:, None] == class_ids[None, :]).astype(int)

    auc_scores = [
        roc_auc_score(y_true[:, col], y_scores[:, col])
        for col in range(y_true.shape[1])
    ]
    return np.mean(auc_scores)


In [None]:
# Adam over all model parameters, then train on GPU when one is available.
optimizer = torch.optim.Adam(model.parameters(), lr=CONFIG.LR)
infer_model = train(
    model,
    optimizer,
    train_loader,
    val_loader,
    torch.device('cuda' if torch.cuda.is_available() else 'cpu'),
)

In [None]:
# Test split: features only (no labels), iterated in a fixed order.
test_features = get_wav2vec_features(test_df, train_mode=False)
test_dataset = CustomDataset(features=test_features)
test_loader = DataLoader(
    test_dataset,
    batch_size=CONFIG.BATCH_SIZE,
    shuffle=False,
)

In [None]:
def inference(model, test_loader, device):
    """Run ``model`` over ``test_loader`` and collect class probabilities.

    Args:
        model: Module whose forward output exposes ``.logits``.
        test_loader: Iterable yielding feature batches (no labels).
        device: Device the model and batches are moved to.

    Returns:
        List with one entry per sample; each entry is a list of softmax
        probabilities, one per class.
    """
    model.to(device)
    model.eval()
    predictions = []
    with torch.no_grad():
        for features in tqdm(test_loader):
            features = features.to(device)
            logits = model(features).logits
            # Convert logits to probabilities: the results are consumed as
            # scores and written directly into the submission file.
            probs = torch.softmax(logits, dim=1)
            predictions.extend(probs.cpu().numpy().tolist())
    return predictions

preds = inference(infer_model, test_loader, torch.device('cuda' if torch.cuda.is_available() else 'cpu'))

# Metrics need ground-truth labels. Guard on the column so a label-less
# test CSV does not abort the notebook with a KeyError before the
# submission file is written.
if 'label' in test_df.columns:
    test_labels = test_df['label'].apply(lambda x: 0 if x == 'fake' else 1).values
    test_preds = np.argmax(preds, axis=1)
    accuracy = accuracy_score(test_labels, test_preds)
    precision = precision_score(test_labels, test_preds)
    recall = recall_score(test_labels, test_preds)
    f1 = f1_score(test_labels, test_preds)
    # Rank by the class-1 minus class-0 margin: it orders samples the same way
    # as the class-1 softmax probability, whether `preds` holds logits or
    # probabilities.
    auc = roc_auc_score(test_labels, [p[1] - p[0] for p in preds])

    print(f'Test Accuracy : {accuracy:.5f}')
    print(f'Test Precision : {precision:.5f}')
    print(f'Test Recall : {recall:.5f}')
    print(f'Test F1 Score : {f1:.5f}')
    print(f'Test AUC : {auc:.5f}')
else:
    print("test_df has no 'label' column; skipping test metrics.")

In [None]:
# Generate submission file
# Work on a copy of the template and overwrite every column after the first
# with the model outputs.
submit = sample_submission_df.copy()
submit.iloc[:, 1:] = preds
submit.to_csv(
    os.path.join(CONFIG.ROOT_FOLDER, '0702wav2vec_submit.csv'),
    index=False,
)