## Set up Environment

In [None]:
#packages needed
%pip install torch
%pip install wget
%pip install timm==0.4.5
%pip install librosa
%pip install numpy

## Setting a Fixed Random Seed for Reproducibility

In [129]:
import torch
import random
import numpy as np

def set_random_seed(seed):
    # Set the seed for Python's built-in random module
    random.seed(seed)

    # Set the seed for NumPy
    np.random.seed(seed)

    # Set the seed for PyTorch CPU and GPU (if available)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)  # For GPU (if available)
    torch.cuda.manual_seed_all(seed)  # For multi-GPU setups

    # Ensure deterministic behavior (important for reproducibility)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False  # Disable auto-tuning of algorithms

# Set a fixed random seed
set_random_seed(42)  # You can replace 42 with any integer


In [143]:
NUM_CLASSES = 20

## About AST Model

In [130]:
import os 
import torch
from ast_models import ASTModel 
# download pretrained model in this directory
os.environ['TORCH_HOME'] = '../pretrained_models'  
# assume each input spectrogram has 100 time frames
input_tdim = 100
label_dim = NUM_CLASSES
# create a pseudo input: a batch of 10 spectrogram, each with 100 time frames and 128 frequency bins 
test_input = torch.rand([10, input_tdim, 128]) 
# create an AST model
ast_mdl = ASTModel(label_dim=label_dim, input_tdim=input_tdim, imagenet_pretrain=True)
test_output = ast_mdl(test_input) 
# output should be in shape [10, 20], i.e., 10 samples, each with prediction of 20 classes. 
print(test_output.shape)  

---------------AST Model Summary---------------
ImageNet pretraining: True, AudioSet pretraining: False
frequncey stride=10, time stride=10
number of patches=108
torch.Size([10, 20])


## Account for longest audio in dataset

In [131]:
import json
import librosa
import numpy as np

def find_max_time_step(json_file, sr=16000):
    with open(json_file, 'r') as f:
        data = json.load(f)["data"]
    
    max_time_steps = 0
    for item in data:
        audio_path = item['wav']
        
        # Load audio file and compute the mel spectrogram
        y, _ = librosa.load(audio_path, sr=sr)
        mel_spectrogram = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=128)
        
        # Get the number of time frames (time steps)
        time_steps = mel_spectrogram.shape[1]
        
        # Update max_time_steps if current audio has more time steps
        max_time_steps = max(max_time_steps, time_steps)
    
    return max_time_steps

# Example usage
json_file = "data_colab.json" 
max_time_steps = find_max_time_step(json_file)
print(f"Max time steps: {max_time_steps}")


Max time steps: 688


## Dataset Class

In [132]:
import json
import torch
from torch.utils.data import Dataset
import librosa
import numpy as np

class ViolinAudioDataset(Dataset):
    def __init__(self, json_file, num_classes=NUM_CLASSES, sr=16000, max_time_steps=688):
        with open(json_file, 'r') as f:
            self.data = json.load(f)["data"]
        self.num_classes = num_classes
        self.sr = sr
        self.max_time_steps = max_time_steps  # The fixed time steps for all spectrograms

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        audio_path = self.data[idx]['wav']
        labels = self.data[idx]['labels']
        
        # Convert the comma-separated labels string into a list of integers
        label_list = list(map(lambda x: int(x) - 1, labels.split(','))) if isinstance(labels, str) else [label - 1 for label in labels]
        
        # Convert labels to a multi-hot encoding vector
        label_vector = np.zeros(self.num_classes, dtype=np.float32)
        label_vector[label_list] = 1
        
        # Load and preprocess the audio
        y, _ = librosa.load(audio_path, sr=self.sr)
        mel_spectrogram = librosa.feature.melspectrogram(y=y, sr=self.sr, n_mels=128)
        mel_spectrogram_db = librosa.power_to_db(mel_spectrogram, ref=np.max)
        
        # Pad or truncate to max_time_steps
        if mel_spectrogram_db.shape[1] < self.max_time_steps:
            padding = self.max_time_steps - mel_spectrogram_db.shape[1]
            mel_spectrogram_db = np.pad(mel_spectrogram_db, ((0, 0), (0, padding)), mode='constant')
        else:
            mel_spectrogram_db = mel_spectrogram_db[:, :self.max_time_steps]
        
        return torch.tensor(mel_spectrogram_db, dtype=torch.float32), torch.tensor(label_vector)
    
    def forward(self, x):
        # Debug: print the shape of the input tensor
        print(f"Input shape: {x.shape}")

        # Ensure positional embeddings align with the input
        if x.shape[1] != self.pos_embed.size(1):
            print(f"Resizing positional embeddings: current {self.pos_embed.size(1)}, expected {x.shape[1]}")
            self.pos_embed = torch.nn.Parameter(
                torch.zeros(1, x.shape[1], self.pos_embed.size(-1))  # Dynamically adjust
            )

        # Debug: print the shape of positional embeddings
        print(f"Positional embeddings shape: {self.pos_embed.shape}")

        # Add positional embeddings
        x = x + self.pos_embed

        # Debug: print the shape after adding positional embeddings
        print(f"Shape after adding positional embeddings: {x.shape}")

        # Pass through the transformer or other layers
        x = self.transformer(x)

        # Debug: print the final output shape
        print(f"Output shape: {x.shape}")

        return x
    

## Debug Model

In [4]:
#Suspect the model is adding its own dimensions and messing shit up, 
#so explicitly defined forward function to print debug statements

import torch
import torch.nn as nn  # This imports the neural network module

class ASTModel(nn.Module):
    def __init__(self, num_classes=20):
        super(ASTModel, self).__init__()
        # Model layers and components go here
        # e.g., self.conv1 = nn.Conv2d(...)

    def forward(self, x):
        print(f"Input shape: {x.shape}")
        
        # Example of a layer passing through
        x = self.conv1(x)  # Example layer
        print(f"After conv1: {x.shape}")
        
        # Continue with the rest of the layers
        x = self.fc1(x)  # Example fully connected layer
        print(f"After fc1: {x.shape}")
        
        return x


## Define Training Parameters

In [14]:
import torch
import torch.optim as optim
from torch.utils.data import DataLoader
from ast_models import ASTModel 

# Hyperparameters
batch_size = 1
num_epochs = 5
device = torch.device('cuda:1' if torch.cuda.is_available() else 'cpu')

# Load dataset
train_dataset = ViolinAudioDataset(json_file="data_colab.json", num_classes=NUM_CLASSES)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Define model and optimizer (make sure your AST model is properly defined)
model = ASTModel(label_dim=num_classes, fstride=10, tstride=10, input_fdim=128, input_tdim=max_time_steps, 
                 audioset_pretrain=False)
model.to(device)

optimizer = optim.Adam(model.parameters(), lr=0.00005) 
criterion = torch.nn.BCEWithLogitsLoss()


---------------AST Model Summary---------------
ImageNet pretraining: True, AudioSet pretraining: False
frequncey stride=10, time stride=10
number of patches=816


## Define training Parameters (with weighted loss)

In [138]:
import torch
import torch.optim as optim
from torch.utils.data import DataLoader
from ast_models import ASTModel 

device = torch.device('cuda:1' if torch.cuda.is_available() else 'cpu')

# Calculate pos_weight for each class
num_samples_per_class = [1106 for i in range(NUM_CLASSES)]     #number of items in train_data.json
num_positives_per_class = [175, 30, 31, 30, 26, 92, 184, 285, 113, 113, 134, 41, 31, 32, 30, 28, 61, 216, 95, 24] #taken from Code(database)
scaling_factor = 2
# Compute pos_weight
pos_weight = torch.tensor(
    [num_samples / num_positives / scaling_factor for num_samples, num_positives in zip(num_samples_per_class, num_positives_per_class)],
    device=device
)

# Hyperparameters
batch_size = 1
num_epochs = 5

# Load dataset
train_dataset = ViolinAudioDataset(json_file="train_data.json", num_classes=num_classes)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Define model and optimizer (make sure your AST model is properly defined)
model = ASTModel(label_dim=num_classes, fstride=10, tstride=10, input_fdim=128, input_tdim=max_time_steps, 
                 audioset_pretrain=False)
model.to(device)

optimizer = optim.Adam(model.parameters(), lr=0.00001) 
criterion = torch.nn.BCEWithLogitsLoss(pos_weight=pos_weight)


---------------AST Model Summary---------------
ImageNet pretraining: True, AudioSet pretraining: False
frequncey stride=10, time stride=10
number of patches=816


## Start Training

In [139]:
# Training loop
for epoch in range(num_epochs):
    model.train()
    print(f"Beginning Epoch {epoch + 1}/{num_epochs}")
    total_loss = 0
    for mel_spectrogram, labels in train_loader:
        mel_spectrogram, labels = mel_spectrogram.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(mel_spectrogram)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    
    print(f"Finished Epoch {epoch + 1}/{num_epochs}, Loss: {total_loss / len(train_loader)}")

Beginning Epoch 1/5
Finished Epoch 1/5, Loss: 0.7160080351056717
Beginning Epoch 2/5
Finished Epoch 2/5, Loss: 0.5517363650943229
Beginning Epoch 3/5
Finished Epoch 3/5, Loss: 0.46779788982722686
Beginning Epoch 4/5
Finished Epoch 4/5, Loss: 0.3912559452838859
Beginning Epoch 5/5
Finished Epoch 5/5, Loss: 0.30551777846410627


## Start Training (with Gradient Accumulation)

In [134]:
# Training loop
accumulation_steps = 2  # Number of steps to accumulate gradients before updating weights

for epoch in range(num_epochs):
    model.train()
    print(f"Beginning Epoch {epoch + 1}/{num_epochs}")
    total_loss = 0
    optimizer.zero_grad() # Clear gradients at the start of each epoch
    
    for batch_idx, (mel_spectrogram, labels) in enumerate(train_loader):
        mel_spectrogram, labels = mel_spectrogram.to(device), labels.to(device)
        
        outputs = model(mel_spectrogram)  # Explicit call to forward with debugging
        loss = criterion(outputs, labels)
        loss = loss / accumulation_steps  # Scale loss for accumulation
        loss.backward()  # Backward pass (accumulate gradients)

        # optimizer updates weights only after 4 steps
        if (batch_idx + 1) % accumulation_steps == 0:
            optimizer.step()
            optimizer.zero_grad()  # Clear gradients after the step
            
        total_loss += loss.item() * accumulation_steps  # Undo scaling for correct total loss

     # Handle leftover gradients if the dataset size is not divisible by accumulation_steps
    if (batch_idx + 1) % accumulation_steps != 0:
        optimizer.step()
        optimizer.zero_grad()
    
    print(f"Finished Epoch {epoch + 1}/{num_epochs}, Loss: {total_loss / len(train_loader)}")

Beginning Epoch 1/5
Finished Epoch 1/5, Loss: 0.7457695109314988
Beginning Epoch 2/5
Finished Epoch 2/5, Loss: 0.5753985324615165
Beginning Epoch 3/5
Finished Epoch 3/5, Loss: 0.4952367063456806
Beginning Epoch 4/5
Finished Epoch 4/5, Loss: 0.4231891400815673
Beginning Epoch 5/5
Finished Epoch 5/5, Loss: 0.3416443977801375


## Prodigy Optimizer

In [13]:
import torch
from torch.utils.data import DataLoader
from prodigyopt import Prodigy
from ast_models import ASTModel

# Hyperparameters
num_classes = 20
batch_size = 1
num_epochs = 5
device = torch.device('cuda:1' if torch.cuda.is_available() else 'cpu')

# Load dataset
train_dataset = ViolinAudioDataset(json_file="data_colab.json", num_classes=num_classes)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Define model and optimizer (make sure your AST model is properly defined)
model = ASTModel(label_dim=num_classes, fstride=10, tstride=10, input_fdim=128, input_tdim=688, 
                 audioset_pretrain=False)
model.to(device)

optimizer = Prodigy(model.parameters(), lr=0.00005)
criterion = torch.nn.BCEWithLogitsLoss()


---------------AST Model Summary---------------
ImageNet pretraining: True, AudioSet pretraining: False
frequncey stride=10, time stride=10
number of patches=816


In [14]:
# Training loop
model.train()
for epoch in range(num_epochs):
    print(f"Beginning Epoch {epoch + 1}/{num_epochs}")
    total_loss = 0
    for mel_spectrogram, labels in train_loader:
        mel_spectrogram, labels = mel_spectrogram.to(device), labels.to(device)
        optimizer.zero_grad()
        #print(f"Batch shape: {mel_spectrogram.shape}")
        #outputs = model.forward(mel_spectrogram)
        outputs = model(mel_spectrogram)  # Explicit call to forward with debugging
        #print(f"Output shape: {outputs.shape}")
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    
    print(f"Finished Epoch {epoch + 1}/{num_epochs}, Loss: {total_loss / len(train_loader)}")

Beginning Epoch 1/5
Finished Epoch 1/5, Loss: 0.6333688658299174
Beginning Epoch 2/5
Finished Epoch 2/5, Loss: 0.6333136197801784
Beginning Epoch 3/5
Finished Epoch 3/5, Loss: 0.6332827596154147
Beginning Epoch 4/5
Finished Epoch 4/5, Loss: 0.6332504991940286
Beginning Epoch 5/5
Finished Epoch 5/5, Loss: 0.633215740204547


## Check Metrics of Model

In [140]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

def evaluate_model(model, data_loader, device):
    model.eval()  # Set model to evaluation mode
    all_predictions = []
    all_labels = []

    with torch.no_grad():  # Disable gradient computation
        for mel_spectrogram, labels in data_loader:
            mel_spectrogram = mel_spectrogram.to(device)
            labels = labels.to(device)

            # Forward pass
            outputs = model(mel_spectrogram)
            predictions = torch.sigmoid(outputs)  # Apply sigmoid to get probabilities

            # Threshold probabilities to get binary predictions
            predictions = (predictions > 0.5).float()

            # Collect predictions and true labels
            all_predictions.append(predictions.cpu())
            all_labels.append(labels.cpu())

    # Concatenate all batches
    all_predictions = torch.cat(all_predictions, dim=0)
    all_labels = torch.cat(all_labels, dim=0)

    # Compute metrics
    accuracy = accuracy_score(all_labels, all_predictions)
    precision = precision_score(all_labels, all_predictions, average='samples')
    recall = recall_score(all_labels, all_predictions, average='samples')
    f1 = f1_score(all_labels, all_predictions, average='samples')

    return accuracy, precision, recall, f1


# Load validation/test dataset
test_dataset = ViolinAudioDataset(json_file="test_data.json", num_classes=num_classes)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Evaluate the model
accuracy, precision, recall, f1 = evaluate_model(model, test_loader, device)
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1-Score: {f1:.4f}")


Accuracy: 0.3610
Precision: 0.6237
Recall: 0.8712
F1-Score: 0.7021


  _warn_prf(average, modifier, msg_start, len(result))


## Save the Model

In [141]:
model_save_path = "ast_model_new.pth"  # Specify the path where you want to save the model
torch.save(model.state_dict(), model_save_path)
print(f"Model saved to {model_save_path}")


Model saved to ast_model_new.pth


### Test Single Sample

In [142]:
def test_single_sample(model, sample, device, threshold=0.5):
    model.eval()  # Set the model to evaluation mode

    with torch.no_grad():  # Disable gradient computation
        # Move sample to device
        sample = sample.to(device)

        # Forward pass
        output = model(sample.unsqueeze(0))  # Add batch dimension
        probabilities = torch.sigmoid(output)  # Convert logits to probabilities

        # Threshold probabilities to get binary predictions
        binary_predictions = (probabilities > threshold).float()

    return binary_predictions.squeeze(0).cpu().numpy()  # Remove batch dimension and move to CPU


# Get a single sample from the DataLoader
for mel_spectrogram, labels in train_loader:
    sample_mel_spectrogram = mel_spectrogram[0]  # Get the first sample
    sample_label = labels[0]  # Get the corresponding label
    break
    
# Test the model on a single sample
predictions = test_single_sample(model, sample_mel_spectrogram, device)

# Print results
print(sample_mel_spectrogram.shape)
print("Predicted Labels:", predictions)
print("True Labels:     ", sample_label.numpy())


torch.Size([128, 688])
Predicted Labels: [0. 0. 0. 0. 0. 1. 0. 0. 1. 1. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0.]
True Labels:      [0. 0. 0. 0. 0. 1. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
