In [5]:
# Block 1: Importing Libraries
import tarfile
import resampy
import pandas as pd
import librosa
import numpy as np
import os
import shutil
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.preprocessing import StandardScaler, LabelEncoder
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, BatchNormalization
from tensorflow.keras.optimizers import Adam
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from concurrent.futures import ThreadPoolExecutor
from torch.optim.lr_scheduler import CosineAnnealingLR
import albumentations as A
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

# Report up front which device PyTorch will be able to train on.
if not torch.cuda.is_available():
    print("No GPU available. Training will run on CPU.")
else:
    print(f"GPU: {torch.cuda.get_device_name(0)} is available.")

GPU: NVIDIA GeForce RTX 4080 Laptop GPU is available.


In [6]:
# Block 2: Extracting Data
def extract_tar(tar_file, target_dir, skip_existing=None):
    """Extract ``tar_file`` into ``target_dir`` and delete "._" residue files.

    Args:
        tar_file: Path of the tar archive to unpack.
        target_dir: Directory the archive is extracted into.
        skip_existing: What to do when ``target_dir`` already exists.
            ``None`` (default) asks the user interactively, ``True`` keeps the
            existing directory and returns, ``False`` deletes the directory
            and extracts again. Passing an explicit value lets the notebook
            run unattended (Restart & Run All).
    """
    if os.path.exists(target_dir):
        if skip_existing is None:
            user_input = input(f"The directory '{target_dir}' already exists. Do you want to skip extraction? (y/n): ")
            skip_existing = user_input.strip().lower() == 'y'
        if skip_existing:
            print(f"Skipping extraction of {tar_file}.")
            return
        print(f"Overwriting the existing directory '{target_dir}'.")
        shutil.rmtree(target_dir)

    # Use the "data" extraction filter when this Python provides it: it refuses
    # members that would be written outside target_dir. Older interpreters
    # do not accept the keyword, so it is only passed when available.
    extract_kwargs = {'filter': 'data'} if hasattr(tarfile, 'data_filter') else {}
    with tarfile.open(tar_file, 'r') as tar:
        tar.extractall(target_dir, **extract_kwargs)

    # Remove residue "._" hidden files. Walk the whole extraction directory
    # rather than only "<target_dir>/<archive name>": residue files can sit
    # next to that inner folder, and the inner folder may not exist at all.
    for root, _dirs, files in os.walk(target_dir):
        for file in files:
            if file.startswith("._"):
                os.remove(os.path.join(root, file))

# Unpack both archives (each into a folder named after the archive), then
# read the training labels from 'train_label.txt' as integers.
for archive_stem in ('train_mp3s', 'test_mp3s'):
    extract_tar(f'{archive_stem}.tar', archive_stem)
train_labels = np.loadtxt('train_label.txt', dtype=int)

Skipping extraction of train_mp3s.tar.
Skipping extraction of test_mp3s.tar.


In [7]:
# Block 3: Preprocessing Functions

import pickle

def save_preprocessed_data(data, labels, folder_path):
    """Pickle the features and labels into ``folder_path``.

    ``data`` is written to 'train_features.pkl' and ``labels`` to
    'train_labels.pkl'. The folder is created first when it does not exist.
    """
    if not os.path.exists(folder_path):
        os.makedirs(folder_path)

    # Features are written before labels.
    payloads = (('train_features.pkl', data), ('train_labels.pkl', labels))
    for file_name, payload in payloads:
        with open(os.path.join(folder_path, file_name), 'wb') as handle:
            pickle.dump(payload, handle)


def load_preprocessed_data(folder_path):
    """Read back the pickled features and labels stored in ``folder_path``.

    Returns:
        A ``(data, labels)`` tuple loaded from 'train_features.pkl' and
        'train_labels.pkl'.

    Raises:
        FileNotFoundError: if either pickle file is missing (raised by ``open``).
    """
    def _unpickle(file_name):
        # NOTE: pickle.load can execute arbitrary code; only point this at
        # cache folders that this notebook wrote itself.
        with open(os.path.join(folder_path, file_name), 'rb') as handle:
            return pickle.load(handle)

    # Features are read first, then labels.
    return _unpickle('train_features.pkl'), _unpickle('train_labels.pkl')
def preprocess_audio(file_path, augment=False):
    """Load one audio file and reduce it to a single 1-D feature vector.

    The clip is loaded with librosa, optionally passed through
    ``apply_audio_augmentation``, and described by MFCCs (n_mfcc=40), spectral
    contrast, tonnetz and three chroma variants. The feature matrices are
    stacked along axis 0 and then averaged over the remaining axis.

    Args:
        file_path: Path of the audio file to load.
        augment: When true, augment the waveform before feature extraction.

    Returns:
        The averaged feature vector, or ``None`` if any step raised (the error
        is printed instead of propagated).
    """
    try:
        signal, sr = librosa.load(file_path, res_type='kaiser_fast')
        print(f"Loaded audio file: {file_path}")

        if augment:
            signal = apply_audio_augmentation(signal, sr)

        # Same extractors, in the same order, as rows of one stacked matrix.
        feature_blocks = (
            librosa.feature.mfcc(y=signal, sr=sr, n_mfcc=40),
            librosa.feature.spectral_contrast(y=signal, sr=sr),
            librosa.feature.tonnetz(y=signal, sr=sr),
            librosa.feature.chroma_stft(y=signal, sr=sr),
            librosa.feature.chroma_cqt(y=signal, sr=sr),
            librosa.feature.chroma_cens(y=signal, sr=sr),
        )
        stacked = np.concatenate(feature_blocks, axis=0)

        # One value per feature row: the mean across the other axis.
        pooled = np.mean(stacked.T, axis=0)
        print(f"Extracted features: {pooled.shape}")
        return pooled
    except Exception as err:
        print(f"Error processing file: {file_path}")
        print(f"Error message: {str(err)}")
        return None

def apply_audio_augmentation(audio, sample_rate, max_pitch_steps=2,
                             stretch_range=(0.8, 1.2), max_noise_amp=0.005):
    """Return a randomly pitch-shifted, time-stretched and noised copy of ``audio``.

    Args:
        audio: 1-D waveform array.
        sample_rate: Sampling rate passed to the pitch shifter.
        max_pitch_steps: Largest pitch shift in steps; the shift is drawn
            uniformly from the integers ``-max_pitch_steps..max_pitch_steps``
            (both ends included).
        stretch_range: ``(low, high)`` bounds for the uniform time-stretch rate.
        max_noise_amp: Upper bound of the random noise amplitude.

    Returns:
        The augmented waveform.
    """
    # Apply pitch shifting. np.random.randint excludes its upper bound, so the
    # "+ 1" is required for the range to be symmetric; the previous
    # randint(-2, 2) could never produce an upward shift of 2.
    pitch_shift = np.random.randint(-max_pitch_steps, max_pitch_steps + 1)
    audio = librosa.effects.pitch_shift(audio, sr=sample_rate, n_steps=pitch_shift)

    # Apply time stretching
    stretch_low, stretch_high = stretch_range
    stretch_factor = np.random.uniform(stretch_low, stretch_high)
    audio = librosa.effects.time_stretch(audio, rate=stretch_factor)

    # Apply noise injection: Gaussian noise scaled by a random amplitude in
    # [0, max_noise_amp).
    noise_amp = max_noise_amp * np.random.uniform(0, 1)
    noise = noise_amp * np.random.normal(size=audio.shape[0])
    audio = audio + noise

    return audio

def process_file(file_path, augment=False):
    """Announce the file being handled, then return ``preprocess_audio``'s result for it."""
    print(f"Processing file: {file_path}")
    return preprocess_audio(file_path, augment=augment)

def prepare_data(directory, augment=False):
    """Extract features for every "<number>.mp3" file in ``directory``.

    Files are processed in ascending numeric order on a thread pool via
    ``process_file``.

    Args:
        directory: Folder containing the numbered mp3 files.
        augment: Forwarded to ``process_file`` for every file.

    Returns:
        ``np.array`` of the feature vectors of all files that were processed
        successfully. Files whose processing returned ``None`` are left out,
        and a warning naming them is printed because every omitted file
        shifts the row order relative to any per-file labels.
    """
    # Only consider entries that really are numbered mp3 files. Counting every
    # directory entry (as before) made stray files such as hidden "._"
    # residue produce requests for non-existent "<n>.mp3" paths.
    mp3_names = [
        name for name in os.listdir(directory)
        if name.endswith(".mp3") and name[:-4].isascii() and name[:-4].isdigit()
    ]
    mp3_names.sort(key=lambda name: int(name[:-4]))
    file_paths = [os.path.join(directory, name) for name in mp3_names]

    with ThreadPoolExecutor() as executor:
        # executor.map preserves input order, so results line up with file_paths.
        results = list(executor.map(lambda x: process_file(x, augment), file_paths))

    failed_paths = [path for path, feat in zip(file_paths, results) if feat is None]
    features = [feat for feat in results if feat is not None]
    if failed_paths:
        print(f"Warning: {len(failed_paths)} file(s) could not be processed and were dropped "
              f"(first entries: {failed_paths[:10]}). Feature rows no longer line up with per-file labels.")
    print(f"Processed {len(features)} audio files")
    return np.array(features)

In [8]:
# Block 4: Preparing Data (modified)
# Builds train_features / train_labels / test_features / label_encoder, using
# the pickle cache in `folder_path` when it exists. All four names are defined
# on both the cache-hit and the cache-miss path, because Block 5 needs them all.
folder_path = 'V5'
test_cache_path = os.path.join(folder_path, 'test_features.pkl')

# Re-read the raw labels from disk instead of reusing `train_labels`: this cell
# rebinds `train_labels` to the encoded, doubled array, so reusing it would
# double the labels again every time the cell is re-run.
raw_train_labels = np.loadtxt('train_label.txt', dtype=int)

# The encoder is always fitted, since predictions are mapped back through it
# even when the features come from the cache.
label_encoder = LabelEncoder()
train_labels_original = label_encoder.fit_transform(raw_train_labels)

try:
    train_features, train_labels = load_preprocessed_data(folder_path)
    print("Loaded preprocessed data from the 'V5' folder.")
except FileNotFoundError:
    train_features_original = prepare_data('train_mp3s/train_mp3s')
    print(f"Original train features shape: {train_features_original.shape}")

    train_features_augmented = prepare_data('train_mp3s/train_mp3s', augment=True)
    print(f"Augmented train features shape: {train_features_augmented.shape}")

    # prepare_data leaves out files it could not process. A row count that
    # differs from the label count means rows and labels no longer correspond,
    # so stop before a misaligned dataset is written to the cache.
    for subset_name, subset in (("original", train_features_original),
                                ("augmented", train_features_augmented)):
        if len(subset) != len(train_labels_original):
            raise ValueError(
                f"Got {len(subset)} {subset_name} feature rows for "
                f"{len(train_labels_original)} labels; some audio files failed to process."
            )

    train_features = np.concatenate((train_features_original, train_features_augmented), axis=0)
    print(f"Combined train features shape: {train_features.shape}")

    # Augmented rows follow the original rows in the same order, so the label
    # vector is simply repeated once.
    train_labels_augmented = train_labels_original.copy()
    train_labels = np.concatenate((train_labels_original, train_labels_augmented), axis=0)
    print(f"Train labels shape: {train_labels.shape}")

    print(f"Number of training features: {len(train_features)}")
    print(f"Number of training labels: {len(train_labels)}")

    save_preprocessed_data(train_features, train_labels, folder_path)
    print("Saved preprocessed data to the 'V5' folder.")

# Test features have their own cache file so that a cache hit on the training
# data still leaves `test_features` defined.
try:
    with open(test_cache_path, 'rb') as f:
        test_features = pickle.load(f)
    print("Loaded preprocessed test features from the 'V5' folder.")
except FileNotFoundError:
    test_features = prepare_data('test_mp3s/test_mp3s')
    os.makedirs(folder_path, exist_ok=True)
    with open(test_cache_path, 'wb') as f:
        pickle.dump(test_features, f)
    print("Saved preprocessed test features to the 'V5' folder.")

print(f"Test features shape: {test_features.shape}")
print(f"Number of test features: {len(test_features)}")

if len(train_features) == 0:
    print("No training features available. Please check the data.")

Processing file: train_mp3s/train_mp3s\0.mp3
Processing file: train_mp3s/train_mp3s\1.mp3
Processing file: train_mp3s/train_mp3s\2.mp3
Processing file: train_mp3s/train_mp3s\3.mp3
Processing file: train_mp3s/train_mp3s\4.mp3
Processing file: train_mp3s/train_mp3s\5.mp3
Processing file: train_mp3s/train_mp3s\6.mp3
Processing file: train_mp3s/train_mp3s\7.mp3
Processing file: train_mp3s/train_mp3s\8.mp3
Processing file: train_mp3s/train_mp3s\9.mp3
Processing file: train_mp3s/train_mp3s\10.mp3
Processing file: train_mp3s/train_mp3s\11.mp3
Processing file: train_mp3s/train_mp3s\12.mp3
Processing file: train_mp3s/train_mp3s\13.mp3
Processing file: train_mp3s/train_mp3s\14.mp3
Processing file: train_mp3s/train_mp3s\15.mp3
Processing file: train_mp3s/train_mp3s\16.mp3
Loaded audio file: train_mp3s/train_mp3s\0.mp3
Loaded audio file: train_mp3s/train_mp3s\1.mp3
Processing file: train_mp3s/train_mp3s\17.mp3
Processing file: train_mp3s/train_mp3s\18.mp3
Loaded audio file: train_mp3s/train_mp3s\3

KeyboardInterrupt: 

In [None]:
# Block 5: Model Training and Prediction
# Trains one MLP per cross-validation fold, keeps the fold model with the best
# final-epoch weighted F1, and writes its test-set predictions to
# 'submission.csv'.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


class AudioDataset(Dataset):
    """In-memory dataset serving (float32 feature row, int64 label) pairs."""

    def __init__(self, features, labels):
        self.features = torch.tensor(features, dtype=torch.float32)
        self.labels = torch.tensor(labels, dtype=torch.long)

    def __len__(self):
        return len(self.features)

    def __getitem__(self, index):
        return self.features[index], self.labels[index]


def _build_classifier(input_dim, num_classes):
    """Return a fresh, untrained MLP with ``num_classes`` output logits."""
    return nn.Sequential(
        nn.Linear(input_dim, 1024),
        nn.ReLU(),
        nn.Dropout(0.5),
        nn.BatchNorm1d(1024),
        nn.Linear(1024, 512),
        nn.ReLU(),
        nn.Dropout(0.5),
        nn.BatchNorm1d(512),
        nn.Linear(512, 256),
        nn.ReLU(),
        nn.Dropout(0.5),
        nn.BatchNorm1d(256),
        nn.Linear(256, num_classes)
    )


if len(train_features) > 0:
    if len(train_features) != len(train_labels):
        raise ValueError("Number of train features and labels do not match.")

    # Seed PyTorch so weight initialisation, dropout and batch shuffling start
    # from the same state on every run of this cell.
    torch.manual_seed(42)

    all_labels = np.asarray(train_labels)

    # Standardize the features. The results get new names so that
    # `train_features` / `test_features` stay untouched and this cell can be
    # re-run; previously `test_features` ended up as a device tensor and a
    # second run failed inside scaler.transform.
    scaler = StandardScaler()
    train_features_scaled = scaler.fit_transform(train_features)
    test_features_scaled = scaler.transform(test_features)

    # Size the output layer from the labels instead of hard-coding 4 classes.
    num_classes = int(all_labels.max()) + 1

    # Block 4 stores the data as [original rows; augmented rows] with the
    # label vector repeated. When that layout is detected, folds are drawn
    # over the original clips only and each clip's augmented copy joins the
    # training side of the same fold. Otherwise a clip and its augmented copy
    # can land on opposite sides of the split and inflate validation scores.
    half = len(all_labels) // 2
    paired_layout = (
        len(all_labels) % 2 == 0
        and np.array_equal(all_labels[:half], all_labels[half:])
    )
    if paired_layout:
        split_features = train_features_scaled[:half]
        split_labels = all_labels[:half]
    else:
        print("Warning: training data is not laid out as [original; augmented]; "
              "falling back to a plain stratified split over all rows.")
        split_features = train_features_scaled
        split_labels = all_labels

    # Set up cross-validation
    num_folds = 10
    kfold = StratifiedKFold(n_splits=num_folds, shuffle=True, random_state=42)
    batch_size = 64
    num_epochs = 200

    # Initialize variables to store the best model and its metrics
    best_model = None
    best_accuracy = 0.0
    best_f1 = 0.0
    best_precision = 0.0
    best_recall = 0.0

    # Perform cross-validation
    for fold, (train_indices, val_indices) in enumerate(kfold.split(split_features, split_labels), 1):
        print(f"Fold {fold}/{num_folds}")

        if paired_layout:
            # Row i + half is the augmented copy of original row i.
            train_indices = np.concatenate((train_indices, train_indices + half))

        # Create PyTorch datasets for the current fold
        train_dataset = AudioDataset(train_features_scaled[train_indices], all_labels[train_indices])
        val_dataset = AudioDataset(train_features_scaled[val_indices], all_labels[val_indices])

        # Create data loaders. BatchNorm1d cannot run in training mode on a
        # batch of one sample, so a trailing single-sample batch is dropped.
        train_loader = DataLoader(
            train_dataset,
            batch_size=batch_size,
            shuffle=True,
            drop_last=(len(train_dataset) % batch_size == 1),
        )
        val_loader = DataLoader(val_dataset, batch_size=batch_size)

        # Define the model architecture
        model = _build_classifier(train_features_scaled.shape[1], num_classes).to(device)

        # Define the loss function, optimizer, and scheduler
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=0.001)
        # NOTE(review): T_max=50 with 200 epochs makes the learning rate rise
        # again after epoch 50 — confirm that this cyclic schedule is intended.
        scheduler = CosineAnnealingLR(optimizer, T_max=50, eta_min=1e-6)

        # Train the model
        for epoch in range(num_epochs):
            model.train()
            for batch_data, batch_labels in train_loader:
                batch_data = batch_data.to(device)
                batch_labels = batch_labels.to(device)

                optimizer.zero_grad()
                outputs = model(batch_data)
                loss = criterion(outputs, batch_labels)
                loss.backward()
                optimizer.step()

            model.eval()
            with torch.no_grad():
                predictions = []
                true_labels = []
                for batch_data, batch_labels in val_loader:
                    batch_data = batch_data.to(device)
                    batch_labels = batch_labels.to(device)

                    outputs = model(batch_data)
                    _, predicted = torch.max(outputs, 1)
                    predictions.extend(predicted.cpu().numpy())
                    true_labels.extend(batch_labels.cpu().numpy())

                accuracy = accuracy_score(true_labels, predictions)
                f1 = f1_score(true_labels, predictions, average='weighted')
                precision = precision_score(true_labels, predictions, average='weighted')
                recall = recall_score(true_labels, predictions, average='weighted')
                print(f"Epoch [{epoch+1}/{num_epochs}], Validation Accuracy: {accuracy:.4f}, F1 Score: {f1:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}")

            scheduler.step()

        # Keep this fold's model if its final-epoch F1 beats the best so far.
        # The first fold is always kept so that `best_model` is never None.
        if best_model is None or f1 > best_f1:
            best_model = model
            best_accuracy = accuracy
            best_f1 = f1
            best_precision = precision
            best_recall = recall

    # Predict the test set with the best model
    test_tensor = torch.tensor(test_features_scaled, dtype=torch.float32).to(device)
    best_model.eval()
    with torch.no_grad():
        outputs = best_model(test_tensor)
        _, predicted_labels = torch.max(outputs, 1)
        predicted_labels = label_encoder.inverse_transform(predicted_labels.cpu().numpy())

    submission = pd.DataFrame({'id': range(len(predicted_labels)), 'category': predicted_labels})
    submission.to_csv('submission.csv', index=False)

    print("Best model metrics:")
    print(f"Accuracy: {best_accuracy:.4f}")
    print(f"F1 Score: {best_f1:.4f}")
    print(f"Precision: {best_precision:.4f}")
    print(f"Recall: {best_recall:.4f}")

else:
    print("No training features available. Please check the data.")