<a href="https://colab.research.google.com/github/archit436/Birds_Classifier/blob/main/Models/Main_Models/Main_Model_Audio_Main_File.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


0. Setup


In [1]:
# Start by importing the relevant libraries.
# Copied from Archit's Lab 3 Submission and then some more.
import os
import glob
import pandas as pd
import numpy as np
import seaborn as sns
import pickle
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
from torch.utils.data.sampler import SubsetRandomSampler
from torch.utils.data import TensorDataset, DataLoader, Dataset
import torchvision.transforms as transforms
from sklearn.model_selection import StratifiedShuffleSplit
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.preprocessing import LabelEncoder
from transformers import Wav2Vec2Model, Wav2Vec2FeatureExtractor

  from .autonotebook import tqdm as notebook_tqdm


1. Data Processing - WAV Tensors


In [2]:
# Read the per-class statistics table; it lists the bird classes to classify.
birds_df = pd.read_csv(
    '../../Data Processing/chosen_classes_80_data_stats.csv'
)
# Preview the first few rows.
birds_df.head()

Unnamed: 0,Index,Class ID,Images Count,XC Recordings Count,Species Name,XC Species Name
0,0,315,116,169,Gadwall (Breeding male),Gadwall
1,1,317,120,243,Mallard (Breeding male),Mallard
2,2,333,105,112,Common Goldeneye (Breeding male),Common Goldeneye
3,3,352,120,283,Black-crowned Night-Heron (Adult),Black-crowned Night Heron
4,4,366,101,127,Common Gallinule (Adult),Common Gallinule


In [3]:
# Import the waveform tensors stored in the .pt files, one file per class.
# Each file is named "<Class ID>.pt"; its first dimension indexes the recordings.
wav_tensors_dir = '../../Data/Xeno_Canto_WAV_Tensors'
# Lists collecting one features tensor and one labels tensor per class.
all_features = []
all_labels = []

# Class IDs to keep; built once so the membership test inside the loop is cheap.
wanted_class_ids = set(birds_df['Class ID'].tolist())

# Iterate in sorted order: os.listdir returns entries in arbitrary order, and the
# sample order decides which recordings the seeded splits select later on.
for file in sorted(os.listdir(wav_tensors_dir)):
    # Only .pt files hold tensors.
    if not file.endswith('.pt'):
        continue
    # The file stem is the class ID; skip (and report) anything that is not an integer.
    try:
        label_value = int(os.path.splitext(file)[0])
    except ValueError:
        print(f"Skipping {file}: file name is not a class ID.")
        continue
    # Filter on the class ID BEFORE loading, so unwanted files are never read.
    if label_value not in wanted_class_ids:
        continue
    # weights_only=True restricts unpickling to tensors and other plain data.
    features_tensor = torch.load(os.path.join(wav_tensors_dir, file), weights_only=True)
    # One label per recording in this file.
    labels_tensor = torch.full((features_tensor.shape[0],), label_value, dtype=torch.long)
    # Add these tensors to their respective lists.
    all_features.append(features_tensor)
    all_labels.append(labels_tensor)

  features_tensor = torch.load(pt_file[0])


In [4]:
# Count the classes that were loaded: the loading loop appends exactly one
# labels tensor per class file, so the list length is the class count.
num_classes = len(all_labels)
print(f"Number of classes for which data has been extracted: {num_classes}")

Number of classes for which data has been extracted: 80


In [5]:
# Sanity check: show the element dtype of the first loaded features tensor.
all_features[0].dtype

torch.float32

In [6]:
# Concatenate the per-class tensors into one features tensor and one labels tensor.
features_tensor = torch.cat(all_features, dim=0)
labels_tensor = torch.cat(all_labels, dim=0)

print("Concatenation complete.")

# Encode the raw class IDs as contiguous indices (0 .. number of classes - 1).
# scikit-learn operates on NumPy arrays, so convert explicitly.
label_encoder = LabelEncoder()
encoded_labels = label_encoder.fit_transform(labels_tensor.numpy())
# Build the mappings from the encoder's sorted unique classes, using plain ints.
# Zipping the tensor itself would key the dict by 0-dim tensors, which hash by
# identity: one entry per sample, and lookups by integer class ID would fail.
id_to_index = {int(class_id): index for index, class_id in enumerate(label_encoder.classes_)}
index_to_id = {index: class_id for class_id, index in id_to_index.items()}

print("Encoding complete.")

# Pickle dump these mappings for use later (plain {int: int} dictionaries).
with open('label_mappings.pkl', 'wb') as f:
    pickle.dump({'id_to_index': id_to_index, 'index_to_id': index_to_id}, f)

print("Mappings pickled.")

# Replace the labels tensor with the encoded labels.
# NOTE: after this line labels_tensor holds indices, not raw class IDs.
labels_tensor = torch.from_numpy(encoded_labels)

# Print out stats.
print(f"Shape of features tensor: {features_tensor.shape}")
print(f"Shape of labels tensor: {labels_tensor.shape}")

Concatenation complete.
Encoding complete.
Mappings pickled.
Shape of features tensor: torch.Size([17455, 320000])
Shape of labels tensor: torch.Size([17455])


In [7]:
# Split the data into training, validation, and test sets.
# Stratified splitting keeps the class proportions the same in every subset.

# Labels as a NumPy array, plus one positional index per sample.
labels_np = labels_tensor.numpy()
indices = np.arange(labels_np.shape[0])

# First split: (train + val) vs test at 80:20.
sss1 = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=0)
temp_idx, test_idx = next(sss1.split(indices, labels_np))

# Labels and positions of the temporary (train + val) set.
temp_labels_np = labels_np[temp_idx]
temp_indices = np.arange(temp_idx.shape[0])

# Second split: train vs val at 80:20 of the temporary set.
sss2 = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=0)
train_temp_idx, val_temp_idx = next(sss2.split(temp_indices, temp_labels_np))

# Map positions within the temporary set back to positions in the full dataset.
train_idx, val_idx = temp_idx[train_temp_idx], temp_idx[val_temp_idx]

In [8]:
# Pair every recording with its encoded label in a single dataset.
full_dataset = TensorDataset(features_tensor, labels_tensor)

# Carve out the train / validation / test views from the split indices.
train_dataset, val_dataset, test_dataset = (
    torch.utils.data.Subset(full_dataset, split_idx)
    for split_idx in (train_idx, val_idx, test_idx)
)

2. Wav2Vec as Feature Extractor & CNN Implementation


In [9]:
"""FOR MACBOOK LOCAL SETUP USERS ONLY """
use_mps = True
# Use the Apple MPS backend when it is enabled and available; otherwise use the CPU.
if use_mps and torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")

In [10]:
# Set up wav2vec to extract features from the audio data.
# Custom dataset wrapper that performs wav2vec feature extraction on access.
class Wav2VecFeatureDataset(Dataset):
    """Wrap an (audio, label) dataset and return (wav2vec features, label) pairs.

    Each item of `original_dataset` is run through the pretrained
    "facebook/wav2vec2-base" model on the module-level `device`. The resulting
    hidden states are moved to the CPU before being cached or returned, so the
    cache does not pin accelerator memory and items can be batched by a
    DataLoader; the training loop moves each batch to `device` itself.

    Args:
        original_dataset: Indexable dataset yielding (audio, label) pairs, where
            audio is a 1-D waveform (tensor or array) sampled at 16 kHz.
        cache_features: If True, keep the extracted features of every visited
            index in memory so they are computed only once per process.
    """

    def __init__(self, original_dataset, cache_features=True):
        self.original_dataset = original_dataset
        self.cache_features = cache_features
        # idx -> CPU feature tensor; None when caching is disabled.
        self.cached_features = {} if cache_features else None

        # Initialize wav2vec model and feature extractor
        self.feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained("facebook/wav2vec2-base")
        self.wav2vec_model = Wav2Vec2Model.from_pretrained("facebook/wav2vec2-base")
        self.wav2vec_model.to(device)  # Shift to the accelerator if available.
        self.wav2vec_model.eval()  # Evaluation mode: the model is only used for extraction.

    def __len__(self):
        """Return the number of items in the wrapped dataset."""
        return len(self.original_dataset)

    def __getitem__(self, idx):
        """Return (features, label) for item `idx`, extracting features on a cache miss."""
        # Get original data
        audio, label = self.original_dataset[idx]

        # Serve from the cache when possible.
        if self.cache_features and idx in self.cached_features:
            return self.cached_features[idx], label

        # Process audio through wav2vec without tracking gradients.
        with torch.no_grad():
            # Prepare inputs for wav2vec (the extractor expects array-like audio).
            inputs = self.feature_extractor(
                audio.cpu().numpy() if isinstance(audio, torch.Tensor) else audio,
                sampling_rate=16000,
                return_tensors="pt"
            )

            # Move the inputs to the same device as the model.
            inputs = {key: value.to(device) for key, value in inputs.items()}

            # Extract features, drop the batch dimension, and bring the result
            # back to the CPU so cached items do not accumulate on the accelerator.
            outputs = self.wav2vec_model(**inputs)
            features = outputs.last_hidden_state.squeeze(0).cpu()

        # Cache if enabled
        if self.cache_features:
            self.cached_features[idx] = features

        return features, label

In [11]:
# Create DataLoaders with wav2vec feature extraction
def create_feature_dataloaders(train_dataset, val_dataset, test_dataset, batch_size, num_workers=0):
    """Wrap the three splits in Wav2VecFeatureDataset and build their DataLoaders.

    Args:
        train_dataset: Dataset of (audio, label) pairs used for training.
        val_dataset: Dataset of (audio, label) pairs used for validation.
        test_dataset: Dataset of (audio, label) pairs used for testing.
        batch_size: Number of items per batch for all three loaders.
        num_workers: Number of DataLoader worker processes. Defaults to 0
            (load in the main process). Worker processes operate on copies of
            the dataset, so features cached inside a worker are not visible to
            the main process, and the wav2vec model and raw audio would have to
            be copied into every worker.

    Returns:
        (train_loader, val_loader, test_loader); only the training loader shuffles.
    """
    # (dataset, shuffle) per split, in train / val / test order.
    split_specs = (
        (train_dataset, True),
        (val_dataset, False),
        (test_dataset, False),
    )

    loaders = []
    for split_dataset, shuffle in split_specs:
        # Wrap the original dataset with wav2vec feature extraction.
        feature_dataset = Wav2VecFeatureDataset(split_dataset)
        loaders.append(
            DataLoader(
                feature_dataset,
                batch_size=batch_size,
                shuffle=shuffle,
                num_workers=num_workers
            )
        )

    train_loader, val_loader, test_loader = loaders
    return train_loader, val_loader, test_loader

In [12]:
# CNN classifier that consumes wav2vec feature maps.
class Wav2VecCNN(nn.Module):
    """Two conv blocks, adaptive average pooling, and a two-layer classifier head.

    Args:
        num_classes: Number of output logits (defaults to the notebook-level
            `num_classes` at the time the class is defined).
        output_size: (height, width) that the adaptive pooling forces the
            feature maps to before the fully connected layers.
    """

    def __init__(self, num_classes = num_classes, output_size=(32, 32)):
        super().__init__()
        self.name = "Wav2VecCNN_AB_1"

        # The network sees its input as [batch, 1, T, 768] once a channel axis is added.
        # First convolutional block: 1 -> 16 channels.
        self.conv1 = nn.Conv2d(1, 16, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(16)
        # Second convolutional block: 16 -> 32 channels.
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(32)

        # 2x2 max pooling, shared by both convolutional blocks.
        self.pool = nn.MaxPool2d(2, 2)

        # Adaptive pooling yields a fixed spatial size regardless of the input length.
        self.adapt_pool = nn.AdaptiveAvgPool2d(output_size)

        # Flattened size entering the classifier: 32 channels times the pooled grid.
        pooled_height = output_size[0]
        pooled_width = output_size[1]
        fc_input_size = 32 * pooled_height * pooled_width

        # Classifier head.
        self.fc1 = nn.Linear(fc_input_size, 256)
        self.dropout = nn.Dropout(0.3)
        self.fc2 = nn.Linear(256, num_classes)

    def forward(self, x):
        """Map a batch of features [batch, T, 768] to class logits [batch, num_classes]."""
        # Insert the channel axis: [batch, T, 768] -> [batch, 1, T, 768].
        feature_maps = x.unsqueeze(1)

        # conv -> batch norm -> ReLU -> max pool, once per block.
        for conv, norm in ((self.conv1, self.bn1), (self.conv2, self.bn2)):
            feature_maps = self.pool(F.relu(norm(conv(feature_maps))))

        # Fixed-size spatial grid regardless of T.
        pooled = self.adapt_pool(feature_maps)

        # Flatten everything but the batch axis, then classify.
        flat = pooled.reshape(pooled.size(0), -1)
        hidden = self.dropout(F.relu(self.fc1(flat)))
        return self.fc2(hidden)

In [13]:
# Helper that derives a model's file-name stem from its hyperparameters.
def get_model_name(name, batch_size, learning_rate, epoch):
    """Build a name for the model from its hyperparameter values.

    Args:
        name: Name of the model.
        batch_size: Batch size used for training.
        learning_rate: Learning rate used for training.
        epoch: Epoch number to embed in the name.
    Returns:
        A string of the form "model_<name>_bs<batch_size>_lr<learning_rate>_epoch<epoch>".
    """
    return f"model_{name}_bs{batch_size}_lr{learning_rate}_epoch{epoch}"

In [14]:
def _evaluate_model(model, loader, criterion=None):
    """Return (accuracy, mean per-batch loss) of `model` over `loader`.

    Runs without gradient tracking; the caller is responsible for putting the
    model in eval mode. The loss is only computed when `criterion` is given,
    otherwise 0.0 is returned for it. An empty loader yields (0.0, 0.0) instead
    of dividing by zero.
    """
    correct, total, loss_sum = 0, 0, 0.0
    with torch.no_grad():
        for recordings, labels in loader:
            recordings = recordings.to(device)
            labels = labels.to(device)
            output = model(recordings)
            if criterion is not None:
                loss_sum += criterion(output, labels).item()
            # Index of the largest logit is the predicted class.
            pred = output.max(1, keepdim=True)[1]
            correct += pred.eq(labels.view_as(pred)).sum().item()
            total += recordings.shape[0]

    accuracy = correct / total if total else 0.0
    num_batches = len(loader)
    mean_loss = loss_sum / num_batches if (criterion is not None and num_batches) else 0.0
    return accuracy, mean_loss


def train_net(model, batch_size, train_loader, val_loader, learning_rate=0.001, num_epochs=20,
              patience=5, use_scheduler=False):
    """Train `model` with Adam and cross-entropy, with early stopping on validation accuracy.

    After every epoch the training and validation accuracies are measured in
    eval mode. Training stops early once the validation accuracy has failed to
    improve for more than `patience` consecutive epochs. The per-epoch
    accuracies are written to CSV files in the "Audio_Model_Performance" folder.

    Args:
        model: Network to train; must expose a `name` attribute and already be
            on the module-level `device`.
        batch_size: Batch size of the loaders (used only in the output file name).
        train_loader: DataLoader yielding (recordings, labels) training batches.
        val_loader: DataLoader yielding (recordings, labels) validation batches.
        learning_rate: Learning rate for the Adam optimizer.
        num_epochs: Maximum number of epochs to run.
        patience: Number of non-improving epochs tolerated before early stopping.
        use_scheduler: If True, step a ReduceLROnPlateau scheduler on the
            validation loss after each epoch. Disabled by default, which keeps
            the learning rate constant.

    Returns:
        (train_acc, val_acc): NumPy arrays with one accuracy per completed epoch.
    """
    print(f"Now training model with spec: {model.name}")

    # Define loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    # Scheduler that lowers the LR when the validation loss stops improving.
    # `verbose` is omitted: it is deprecated in recent PyTorch releases and only
    # affects logging.
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=5)
    print("Loss Function, Optimizer, and Scheduler set up.")

    # Arrays to store accuracy metrics
    train_acc = np.zeros(num_epochs)
    val_acc = np.zeros(num_epochs)

    # Create an output folder for performance files
    output_folder = "Audio_Model_Performance"
    os.makedirs(output_folder, exist_ok=True)

    # Early-stopping state
    best_val_acc = 0.0
    patience_counter = 0
    # Number of epochs actually completed; stays 0 when num_epochs is 0.
    epochs_run = 0

    start_time = time.time()
    print("Training Started.")

    # Iterate for number of epochs.
    for epoch in range(num_epochs):
        # --- Training Phase ---
        model.train()
        # Forward and backward pass for each batch
        for recordings, labels in train_loader:
            recordings = recordings.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            outputs = model(recordings)
            loss = criterion(outputs, labels)
            loss.backward()
            # Clip gradients to guard against exploding gradients.
            nn.utils.clip_grad_norm_(model.parameters(), max_norm=5.0)
            optimizer.step()

        print(f"Finished adjusting parameters for epoch {epoch + 1}")

        # --- Evaluation Phase ---
        model.eval()
        train_acc[epoch], _ = _evaluate_model(model, train_loader)
        val_acc[epoch], val_loss = _evaluate_model(model, val_loader, criterion)
        epochs_run = epoch + 1

        # Optionally update the learning rate based on validation loss.
        if use_scheduler:
            scheduler.step(val_loss)

        # Print epoch results including validation loss.
        print(f"Epoch {epoch + 1}: Train acc: {train_acc[epoch]:.4f}, "
              f"Validation acc: {val_acc[epoch]:.4f}, "
              f"Validation loss: {val_loss:.4f}")

        # Early Stopping
        if val_acc[epoch] > best_val_acc:
            # Validation accuracy improved: record it and reset the counter.
            best_val_acc = val_acc[epoch]
            patience_counter = 0
        else:
            # No improvement this epoch.
            patience_counter += 1
        # Check if patience has been exceeded.
        if patience_counter > patience:
            print(f"Early stopping at epoch {epoch + 1}")
            break

    print('Finished Training')
    elapsed_time = time.time() - start_time
    print(f"Total time elapsed: {elapsed_time:.2f} seconds")

    # Save metrics to CSV. The file name keeps the configured epoch count
    # (num_epochs - 1) even after an early stop, so existing file names stay valid.
    model_filename = get_model_name(model.name, batch_size, learning_rate, num_epochs - 1)
    model_path = os.path.join(output_folder, model_filename)
    train_acc_path = f"{model_path}_train_acc.csv"
    val_acc_path = f"{model_path}_val_acc.csv"
    np.savetxt(train_acc_path, train_acc[:epochs_run])
    np.savetxt(val_acc_path, val_acc[:epochs_run])

    return train_acc[:epochs_run], val_acc[:epochs_run]

In [15]:
# Build the train / validation / test loaders on top of the wav2vec feature wrapper.
batch_size = 32
train_loader, val_loader, test_loader = create_feature_dataloaders(
    train_dataset=train_dataset,
    val_dataset=val_dataset,
    test_dataset=test_dataset,
    batch_size=batch_size,
)



In [16]:
# Create an instance of the model and move it to the selected device.
model = Wav2VecCNN()
model.to(device)

# Train the model
train_acc, val_acc = train_net(model, batch_size, train_loader, val_loader,
                                   learning_rate = 0.001, num_epochs = 30)

# Plot the accuracy curves; each point is one completed epoch.
n = len(train_acc)
fig, ax = plt.subplots()
ax.set_title("Training Curve")
ax.plot(range(1, n + 1), train_acc, label="Train")
ax.plot(range(1, n + 1), val_acc, label="Validation")
# The x-axis counts epochs, not iterations.
ax.set_xlabel("Epochs")
ax.set_ylabel("Accuracy")
ax.legend(loc='best')
plt.show()

Now training model with spec: Wav2VecCNN_AB_1
Loss Function, Optimizer, and Scheduler set up.
Training Started.


python(2276) MallocStackLogging: can't turn off malloc stack logging because it was not enabled.
python(2277) MallocStackLogging: can't turn off malloc stack logging because it was not enabled.


: 