In [None]:
#building sound transformers



## Building sound transformers
### with Whisper
#### Part 1: preprocess the data

In [1]:
import pandas as pd

# Load the metadata.
# The CSV was saved together with its row index, which pandas would otherwise
# surface as a spurious "Unnamed: 0" column; read that first column back as
# the index instead.
metadata = pd.read_csv('./subsample/original_train_metadata.csv', index_col=0)

# Display the first few rows of the metadata
metadata.head(2)

Unnamed: 0,primary_label,secondary_labels,type,latitude,longitude,scientific_name,common_name,author,license,rating,url,filename
0,asbfly,[],['call'],39.2297,118.1987,Muscicapa dauurica,Asian Brown Flycatcher,Matt Slaymaker,Creative Commons Attribution-NonCommercial-Sha...,5.0,https://www.xeno-canto.org/134896,asbfly/XC134896.ogg
1,asbfly,[],['song'],51.403,104.6401,Muscicapa dauurica,Asian Brown Flycatcher,Magnus Hellström,Creative Commons Attribution-NonCommercial-Sha...,2.5,https://www.xeno-canto.org/164848,asbfly/XC164848.ogg


In [2]:
#preprocess the audio files to convert them into mel spectrograms.
import soundfile as sf
import torch
import torchaudio
import torchaudio.transforms as T

def preprocess_ogg(file_path, target_sample_rate=16000, target_length=3000,
                   n_mels=80, n_fft=400, hop_length=160):
    """
    Preprocesses an .ogg file into a Whisper-style log-mel spectrogram.

    The clip is mixed down to mono, resampled, padded with silence or truncated
    to exactly ``target_length * hop_length`` samples, and converted to a
    log-mel spectrogram using the same framing and scaling as Whisper's
    reference feature extraction (25 ms window, 10 ms hop at 16 kHz, log10
    compression, 8-decade dynamic range, ``(x + 4) / 4`` scaling).

    Args:
        file_path (str): Path to the .ogg file.
        target_sample_rate (int): Target sample rate for the waveform.
        target_length (int): Number of spectrogram frames to return.
        n_mels (int): Number of mel frequency bins.
        n_fft (int): FFT / window size in samples.
        hop_length (int): Step between successive frames in samples.

    Returns:
        torch.Tensor: Float tensor of shape ``(n_mels, target_length)``.
    """
    # Load .ogg file using soundfile: 1D for mono, (frames, channels) otherwise
    waveform, sample_rate = sf.read(file_path)
    waveform = torch.tensor(waveform).float()

    # Mix down to mono WITHOUT keeping the channel axis, so that time is the
    # last dimension -- T.Resample and T.MelSpectrogram both operate on it.
    if waveform.ndimension() > 1:
        waveform = waveform.mean(dim=1)

    # Resample to target sample rate if necessary
    if sample_rate != target_sample_rate:
        resampler = T.Resample(orig_freq=sample_rate, new_freq=target_sample_rate)
        waveform = resampler(waveform)

    # Pad or truncate the audio itself rather than the spectrogram, so that
    # padding is real silence and very short clips cannot break the STFT's
    # reflect padding.
    target_samples = target_length * hop_length
    num_samples = waveform.size(0)
    if num_samples < target_samples:
        waveform = torch.nn.functional.pad(waveform, (0, target_samples - num_samples))
    else:
        waveform = waveform[:target_samples]

    # Power mel spectrogram with Slaney-style filters (the librosa defaults
    # that Whisper's filter bank is built from).
    mel_transform = T.MelSpectrogram(
        sample_rate=target_sample_rate,
        n_fft=n_fft,
        hop_length=hop_length,
        n_mels=n_mels,
        norm="slaney",
        mel_scale="slaney",
    )
    # A centered STFT yields target_length + 1 frames; drop the trailing one.
    mel_spectrogram = mel_transform(waveform)[:, :target_length]

    # Log compression and scaling as in Whisper's reference implementation
    log_mel = torch.clamp(mel_spectrogram, min=1e-10).log10()
    log_mel = torch.maximum(log_mel, log_mel.max() - 8.0)
    return (log_mel + 4.0) / 4.0

In [3]:
# test the function
mel_spectrogram = preprocess_ogg('./subsample/train/comgre/XC507426.ogg')

# Expected shape: torch.Size([80, 3000]). Whisper expects a fixed number of
# mel channels (80 mel-frequency bins), and 30 seconds of audio at a 10 ms
# frame step (160 samples at 16,000 Hz) corresponds to 3000 frames.
print("mel spec shape is", mel_spectrogram.shape)
 

mel spec shape is torch.Size([80, 3000])


#### Create a custom dataset and dataloader to load the audio files and their corresponding labels

In [4]:
import os
from torch.utils.data import Dataset, DataLoader

class AudioDataset(Dataset):
    """Dataset of .ogg clips stored as ``folder_path/<label>/<file>.ogg``.

    Args:
        folder_path (str): Directory containing one sub-directory per label.
        label_dict (dict): Maps label (sub-directory name) to the value
            returned as the sample's label. Sub-directories whose name is not
            a key of this dict are skipped.
        num_files_per_label (int, optional): If truthy, keep at most this many
            files per label (the first ones in sorted filename order).
    """

    def __init__(self, folder_path, label_dict, num_files_per_label=None):
        self.folder_path = folder_path
        self.label_dict = label_dict
        self.num_files_per_label = num_files_per_label
        self.file_paths = []
        self.labels = []

        self._load_files()

    def _load_files(self):
        """Populate ``file_paths`` and ``labels`` by scanning ``folder_path``."""
        # os.listdir returns entries in arbitrary order; sort both levels so
        # the selected subset and the sample order are reproducible.
        for label in sorted(os.listdir(self.folder_path)):
            label_path = os.path.join(self.folder_path, label)
            if not (os.path.isdir(label_path) and label in self.label_dict):
                continue
            files = sorted(
                os.path.join(label_path, f)
                for f in os.listdir(label_path)
                if f.endswith('.ogg')
            )
            if self.num_files_per_label:
                files = files[:self.num_files_per_label]
            self.file_paths.extend(files)
            self.labels.extend([self.label_dict[label]] * len(files))

    def __len__(self):
        """Number of audio files found."""
        return len(self.file_paths)

    def __getitem__(self, idx):
        """Return ``(mel_spectrogram, label)`` for the file at position ``idx``."""
        file_path = self.file_paths[idx]
        label = self.labels[idx]
        # The audio is decoded and transformed lazily, on every access.
        mel_spectrogram = preprocess_ogg(file_path)
        return mel_spectrogram, label



In [5]:
# Create a dictionary mapping each label (one sub-folder per class) to an integer.
# os.listdir returns entries in arbitrary order and also lists stray files, so
# keep directories only and sort them: the label -> index mapping must be
# identical on every run, otherwise saved weights would not match their classes.
train_dir = './subsample/train'
label_dict = {
    label: i
    for i, label in enumerate(
        sorted(
            entry for entry in os.listdir(train_dir)
            if os.path.isdir(os.path.join(train_dir, entry))
        )
    )
}

print(label_dict)

#label_dict = {label: i for i, label in enumerate(metadata['primary_label'].unique())}



{'comgre': 0, 'commoo3': 1, 'comsan': 2, 'eucdov': 3, 'eurcoo': 4, 'graher1': 5, 'grnsan': 6, 'lirplo': 7, 'litgre1': 8, 'rorpar': 9}


In [None]:
# Example usage: build a small dataset (at most 5 clips per label) and batch it

folder_path = "./subsample/train/"
dataset = AudioDataset(folder_path, label_dict, num_files_per_label=5)
data_loader = DataLoader(dataset, batch_size=2, shuffle=True)

# Walk through the loader once and report the shape of every batch
for mel_spectrograms, labels in data_loader:
    print("Mel Spectrograms shape:", mel_spectrograms.shape)
    print("Labels shape:", labels.shape)

Mel Spectrograms shape: torch.Size([2, 80, 3000])
Labels shape: torch.Size([2])
Mel Spectrograms shape: torch.Size([2, 80, 3000])
Labels shape: torch.Size([2])


Mel Spectrograms shape: torch.Size([2, 80, 3000])
Labels shape: torch.Size([2])
Mel Spectrograms shape: torch.Size([2, 80, 3000])
Labels shape: torch.Size([2])
Mel Spectrograms shape: torch.Size([2, 80, 3000])
Labels shape: torch.Size([2])
Mel Spectrograms shape: torch.Size([2, 80, 3000])
Labels shape: torch.Size([2])
Mel Spectrograms shape: torch.Size([2, 80, 3000])
Labels shape: torch.Size([2])
Mel Spectrograms shape: torch.Size([2, 80, 3000])
Labels shape: torch.Size([2])
Mel Spectrograms shape: torch.Size([2, 80, 3000])
Labels shape: torch.Size([2])
Mel Spectrograms shape: torch.Size([2, 80, 3000])
Labels shape: torch.Size([2])


In [None]:
# NOTE(review): scratch cell with no effect on the analysis -- looks like a
# kernel smoke test; consider deleting it.
print("test")

In [None]:
!pip install transformers


Collecting transformers
  Downloading transformers-4.46.3-py3-none-any.whl.metadata (44 kB)
Collecting huggingface-hub<1.0,>=0.23.2 (from transformers)
  Downloading huggingface_hub-0.26.3-py3-none-any.whl.metadata (13 kB)
Collecting regex!=2019.12.17 (from transformers)
  Downloading regex-2024.11.6-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (40 kB)
Collecting tokenizers<0.21,>=0.20 (from transformers)
  Downloading tokenizers-0.20.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.7 kB)
Collecting safetensors>=0.4.1 (from transformers)
  Downloading safetensors-0.4.5-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.8 kB)
Collecting tqdm>=4.27 (from transformers)
  Downloading tqdm-4.67.1-py3-none-any.whl.metadata (57 kB)
Downloading transformers-4.46.3-py3-none-any.whl (10.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.0/10.0 MB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m00:01[0m0:01[0m
[?2

#### Let's create the magic: a Whisper encoder with a custom classification layer

In [None]:
import logging
import torch
from transformers import WhisperModel, WhisperProcessor

print("imports are done")

# Configure the root logger to emit DEBUG-level records and above
logging.basicConfig(level=logging.DEBUG)

# Use the GPU when CUDA is available, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

# Fetch the pretrained checkpoint; only the model is moved to the device
logging.info("Loading Whisper model...")
whisper_model = WhisperModel.from_pretrained("openai/whisper-small")
whisper_model = whisper_model.to(device)
processor = WhisperProcessor.from_pretrained("openai/whisper-small")
logging.info("Whisper model loaded successfully.")

In [None]:
import torch.nn as nn
import torch.nn.functional as F

from transformers import WhisperModel, WhisperProcessor
# Load pretrained Whisper model
# NOTE(review): `model` is rebound to a FullModel instance further down in this
# cell, and FullModel loads its own copy of the checkpoint, so this load looks
# redundant -- confirm nothing else relies on it and remove.
model = WhisperModel.from_pretrained("openai/whisper-small")
processor = WhisperProcessor.from_pretrained("openai/whisper-small")

class FullModel(nn.Module):
    """Pretrained Whisper encoder followed by a small classification head.

    Args:
        num_classes (int): Number of output classes.
        hidden_dim (int): Width of the classification head's hidden layer.
        model_name (str): Hugging Face checkpoint the encoder is loaded from.
    """

    def __init__(self, num_classes, hidden_dim=128, model_name="openai/whisper-small"):
        super().__init__()
        # Load the full pretrained model but keep only its encoder
        self.encoder = WhisperModel.from_pretrained(model_name).encoder
        self.classification_block = ClassificationBlock(
            input_dim=self.encoder.config.hidden_size,  # Use encoder's hidden size
            hidden_dim=hidden_dim,
            num_classes=num_classes,
        )

    def forward(self, x):
        """Return class logits for a batch of preprocessed spectrograms.

        ``x`` is passed to the encoder unchanged; in this notebook it is the
        (batch, 80, 3000) tensor produced by the DataLoader.
        """
        # Only the final layer's output is used, so the per-layer hidden states
        # are not requested (asking for them just keeps extra tensors alive).
        hidden_states = self.encoder(x).last_hidden_state

        # Pool over time and classify
        return self.classification_block(hidden_states)

class ClassificationBlock(nn.Module):
    """Average-pools hidden states over the sequence axis, then applies a two-layer MLP.

    Input is (batch, sequence, input_dim); output is (batch, num_classes) logits.
    """

    def __init__(self, input_dim, hidden_dim, num_classes):
        super().__init__()
        self.avg_pool = nn.AdaptiveAvgPool1d(1)
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, num_classes)

    def forward(self, x):
        # The pooling layer reduces the last axis, so move the sequence axis there
        features_first = x.permute(0, 2, 1)
        pooled = self.avg_pool(features_first).squeeze(-1)
        # Hidden layer with ReLU, then the output projection
        hidden = F.relu(self.fc1(pooled))
        logits = self.fc2(hidden)
        return logits



# One output class per entry of the label mapping
num_classes = len(label_dict)

# Prefer the GPU when CUDA is available, otherwise use the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Build the encoder + classification head and place it on the chosen device
model = FullModel(num_classes=num_classes)
model.to(device)


NameError: name 'WhisperModel' is not defined

#### Let's create the training loop

In [None]:
import torch.optim as optim
import wandb

# Start a WandB run for this training session
wandb.init(project="to list is all you need", name="after the fatal removal")

# Fresh model on the best available device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = FullModel(num_classes=len(label_dict)).to(device)

# Cross-entropy over the class logits, optimised with Adam
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Training loop
num_epochs = 10
for epoch in range(num_epochs):
    model.train()  # training mode for this epoch
    running_loss = 0.0

    for mel_spectrograms, labels in data_loader:
        # Inputs and integer (long) class targets go to the model's device
        mel_spectrograms = mel_spectrograms.to(device)
        labels = labels.long().to(device)

        # Clear gradients left over from the previous step
        optimizer.zero_grad()

        # Forward pass through encoder and classification block, then the loss
        logits = model(mel_spectrograms)
        loss = criterion(logits, labels)

        # Backpropagate and update the parameters
        loss.backward()
        optimizer.step()

        # Track the batch loss locally and in WandB
        batch_loss = loss.item()
        running_loss += batch_loss
        wandb.log({"Train Batch Loss": batch_loss})

    # Mean of the per-batch losses for this epoch
    epoch_loss = running_loss / len(data_loader)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss}")
    wandb.log({"Epoch Loss": epoch_loss})

Epoch [1/1], Loss: 0.2312929630279541


In [None]:
# torch.save(model.state_dict(), 'model_full.pth')

# model = FullModel(num_classes=num_classes)
# model.load_state_dict(torch.load('model_full.pth'))
# model.to(device)

In [None]:
#### Let's add inference
# this part is from saved model

# Place the model on the GPU when CUDA is available, otherwise on the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Invert label -> index into index -> label, for decoding predictions
index_to_class = dict((index, name) for name, index in label_dict.items())



def preprocess_ogg_for_inference(file_path, target_sample_rate=16000, target_length=3000):
    """Inference-time alias: forwards all arguments to `preprocess_ogg` and returns its result."""
    mel_spectrogram = preprocess_ogg(file_path, target_sample_rate, target_length)
    return mel_spectrogram

def predict(file_path):
    """Classify a single audio file with the notebook-level `model`.

    Args:
        file_path (str): Path to the .ogg file to classify.

    Returns:
        tuple: ``(predicted_class_index, predicted_class_name, probabilities)``
        where ``probabilities`` is a NumPy array of shape (1, num_classes).
    """
    # Preprocess the sample, add a batch dimension and move it to the device
    mel_spectrogram = preprocess_ogg_for_inference(file_path)
    mel_spectrogram = mel_spectrogram.unsqueeze(0).to(device)

    # Run in eval mode so training-only layers are inactive, then restore
    # whatever mode the model was in before the call.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            logits = model(mel_spectrogram)
    finally:
        model.train(was_training)

    # Convert logits to probabilities using softmax
    probabilities = F.softmax(logits, dim=-1)

    # Most probable class, as an index and as a label name
    predicted_class_index = torch.argmax(probabilities, dim=-1).item()
    predicted_class_name = index_to_class[predicted_class_index]

    return predicted_class_index, predicted_class_name, probabilities.detach().cpu().numpy()

# Example usage: classify a couple of training clips and show the results
file_paths = [
    "./subsample/train/comgre/XC507426.ogg",
    "./subsample/train/comsan/XC367395.ogg",
    # Add more file paths as needed
]
for file_path in file_paths:
    predicted_class_index, predicted_class_name, probabilities = predict(file_path)
    print(
        f"File: {file_path}",
        f"Predicted class index: {predicted_class_index}",
        f"Predicted class name: {predicted_class_name}",
        f"Probabilities: {probabilities}",
        sep="\n",
    )

  state_dict = torch.load('model1.pth')


KeyError: 'encoder'

In [None]:
# Save the encoder and the classification head as separate state dicts.
# Both live on `model` (a FullModel); no standalone `encoder` or
# `classification_block` variables are defined in this notebook.
torch.save({
    'encoder': model.encoder.state_dict(),
    'classification_block': model.classification_block.state_dict()
}, 'model1.pth')

In [18]:
# Example usage: run the classifier on a few known clips and report each result
file_paths = [
    "./subsample/train/comgre/XC507426.ogg",
    "./subsample/train/comsan/XC367395.ogg",
    # Add more file paths as needed
]
for file_path in file_paths:
    predicted_class_index, predicted_class_name, probabilities = predict(file_path)
    report_lines = [
        f"File: {file_path}",
        f"Predicted class index: {predicted_class_index}",
        f"Predicted class name: {predicted_class_name}",
        f"Probabilities: {probabilities}",
    ]
    print("\n".join(report_lines))

AttributeError: 'collections.OrderedDict' object has no attribute 'encoder'

In [None]:
# Define the test dataset and data loader.
# Every sample is a fixed-size spectrogram tensor plus an integer label, so the
# DataLoader's default collation batches them; no custom collate function is
# needed (and none is defined in this notebook).
test_folder_path = "./subsample/test/"
test_dataset = AudioDataset(test_folder_path, label_dict, num_files_per_label=10)
test_data_loader = DataLoader(test_dataset, batch_size=2, shuffle=False)

def validate(model, classification_block, data_loader, criterion, device):
    """Evaluate the encoder + classification head on a labelled data loader.

    Args:
        model: Module exposing an ``encoder`` whose output has a
            ``last_hidden_state`` attribute.
        classification_block: Module mapping the encoder's last hidden state
            to class logits.
        data_loader: Iterable of ``(mel_spectrograms, labels)`` batches; labels
            may be a tensor or a plain sequence of ints.
        criterion: Loss called as ``criterion(logits, labels)``.
        device: Device the batches are moved to.

    Returns:
        tuple: ``(avg_loss, accuracy)`` -- the mean of the per-batch losses and
        the fraction of correctly classified samples.

    Raises:
        ValueError: If ``data_loader`` yields no samples.
    """
    model.eval()
    classification_block.eval()
    running_loss = 0.0
    correct_predictions = 0
    total_predictions = 0
    num_batches = 0

    with torch.no_grad():
        for mel_spectrograms, labels in data_loader:
            mel_spectrograms = mel_spectrograms.to(device)
            # as_tensor accepts tensors and plain sequences alike, without the
            # copy and warning that torch.tensor() produces for an existing tensor
            labels = torch.as_tensor(labels, dtype=torch.long, device=device)

            # Forward pass
            encoder_outputs = model.encoder(mel_spectrograms)
            logits = classification_block(encoder_outputs.last_hidden_state)

            # Accumulate the batch loss
            running_loss += criterion(logits, labels).item()
            num_batches += 1

            # Count correct predictions
            predicted_classes = torch.argmax(logits, dim=1)
            correct_predictions += (predicted_classes == labels).sum().item()
            total_predictions += labels.size(0)

    # Fail with a clear message rather than a ZeroDivisionError
    if num_batches == 0 or total_predictions == 0:
        raise ValueError("data_loader yielded no samples to validate on")

    avg_loss = running_loss / num_batches
    accuracy = correct_predictions / total_predictions
    return avg_loss, accuracy

# Run validation. The classification head is an attribute of the FullModel
# instance; there is no standalone `classification_block` variable in this notebook.
val_loss, val_accuracy = validate(model, model.classification_block, test_data_loader, criterion, device)
print(f"Validation Loss: {val_loss}, Validation Accuracy: {val_accuracy}")