In [1]:
from src.utils import *

In [2]:
import torch
from transformers import AutoFeatureExtractor, ASTForAudioClassification

feature_extractor = AutoFeatureExtractor.from_pretrained("MIT/ast-finetuned-audioset-10-10-0.4593")
model = ASTForAudioClassification.from_pretrained("MIT/ast-finetuned-audioset-10-10-0.4593")

In [3]:
import torch
import torchaudio
from torch.utils.data import Dataset, DataLoader
import os

class CustomAudioDataset(Dataset):
    def __init__(self, data_dir, transform=None, fixed_length=None):
        self.data_dir = data_dir
        self.file_list, self.labels = self._get_file_list_and_labels()
        self.transform = transform
        self.fixed_length = fixed_length

    def _get_file_list_and_labels(self):
        file_list = []
        labels = []
        for root, dirs, files in os.walk(self.data_dir):
            for file in files:
                if file.endswith(".wav"):  # Adjust file extension if needed
                    file_list.append(root + "/" + file)
                    labels.append(os.path.basename(root))  # Extract label from directory name
        return file_list, labels

    def __len__(self):
        return len(self.file_list)

    def __getitem__(self, idx):
        file_path = self.file_list[idx]
        waveform, sample_rate = torchaudio.load(file_path)
        
        
        
        if self.fixed_length:
            waveform = self._pad_waveform(waveform, self.fixed_length)

        label = self.labels[idx]

        

        if self.transform:
            waveform = self.transform(waveform, sampling_rate=sample_rate)['input_values'][0]

        return waveform, sample_rate, label

    def _pad_waveform(self, waveform, target_length):
        length_diff = target_length - waveform.size(1)
        if length_diff > 0:
            padding = torch.zeros((1, length_diff))
            waveform = torch.cat([waveform, padding], dim=1)
        return waveform.squeeze(0)

# Example usage
data_dir = "data/path/train10/"
transform = feature_extractor  # You can define transformations if needed
fixed_length = 16000  # Assuming you want to fix the length to 16000 samples
sampling_rate = fixed_length  # Assuming you want to fix the sampling rate to 16000 Hz

# Create custom dataset
dataset = CustomAudioDataset(data_dir, transform=transform, fixed_length=fixed_length)

batch_size = 8


In [4]:
num_classes = 30
num_ftrs = 768

In [5]:
num_epochs = 10
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = torch.nn.CrossEntropyLoss()

In [6]:
device = torch.device('cuda')
device

device(type='cuda')

In [7]:
def count_trainable_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

# Assuming your model is named 'model'
trainable_params = count_trainable_parameters(model)
print("Number of trainable parameters:", trainable_params)

Number of trainable parameters: 86594063


In [8]:
def freeze_layers_except_last_n(model, n):
    # Get all parameters
    parameters = list(model.parameters())
    total_layers = len(parameters)

    # Freeze all layers except the last n
    for i, param in enumerate(parameters):
        if i < total_layers - n:
            param.requires_grad = False

# Assuming your model is named ast_model
freeze_layers_except_last_n(model.audio_spectrogram_transformer.encoder.layer, 4)

model

ASTForAudioClassification(
  (audio_spectrogram_transformer): ASTModel(
    (embeddings): ASTEmbeddings(
      (patch_embeddings): ASTPatchEmbeddings(
        (projection): Conv2d(1, 768, kernel_size=(16, 16), stride=(10, 10))
      )
      (dropout): Dropout(p=0.0, inplace=False)
    )
    (encoder): ASTEncoder(
      (layer): ModuleList(
        (0-11): 12 x ASTLayer(
          (attention): ASTAttention(
            (attention): ASTSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.0, inplace=False)
            )
            (output): ASTSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.0, inplace=False)
            )
          )
          (intermediate): ASTIntermediate(
            (de

In [9]:
def count_trainable_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

# Assuming your model is named 'model'
trainable_params = count_trainable_parameters(model)
print("Number of trainable parameters:", trainable_params)


Number of trainable parameters: 1542671


In [10]:
train_dataset = CustomAudioDataset(data_dir, fixed_length=16000, transform=feature_extractor)

perc = 0.025
n_train = len(train_dataset)
n_val = int(perc * n_train)
n_train = n_train - n_val

train_dataset, val_dataset = torch.utils.data.random_split(train_dataset, [n_train, n_val])

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

In [11]:
labels = set(train_dataset.dataset.labels)
label_to_index = dict((label, i) for i, label in enumerate(sorted(labels)))
label_to_index


{'bed': 0,
 'bird': 1,
 'dog': 2,
 'no': 3,
 'off': 4,
 'one': 5,
 'seven': 6,
 'stop': 7,
 'two': 8,
 'wow': 9}

In [12]:
from tqdm.notebook import tqdm



model = model.to(device)
for i, epoch in enumerate(range(num_epochs)):
    for waveforms, sr, labels in tqdm(train_loader, total=len(train_loader), desc=f"Epoch {epoch+1}/{num_epochs}"):
        
        optimizer.zero_grad()
        waveforms = waveforms.to(device)
        outputs = model(waveforms.squeeze(1))

        # Convert string labels to integer indices
        target_indices = [label_to_index[label] for label in labels]

        # Convert the list of indices to a tensor
        target_tensor = torch.tensor(target_indices)

        # print(outputs['logits'])

        loss = criterion(outputs['logits'], target_tensor.to(device))
        loss.backward()
        optimizer.step()
    
    print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {loss.item()}")
    
    # predict on validation set
    val_losses = []
    val_accuracies = []
    for waveforms, sr, labels in tqdm(val_loader, total=len(val_loader), desc=f"Validation"):
        waveforms = waveforms.to(device)
        outputs = model(waveforms.squeeze(1))
        target_indices = [label_to_index[label] for label in labels]
        target_tensor = torch.tensor(target_indices)
        loss = criterion(outputs['logits'], target_tensor.to(device))
        val_losses.append(loss.item())
        val_accuracies.append((outputs['logits'].argmax(1) == target_tensor.to(device)).float().mean().item())
    
    print(f"Epoch {epoch+1}/{num_epochs}, Val Loss: {sum(val_losses)/len(val_losses)}, Val Accuracy: {sum(val_accuracies)/len(val_accuracies)}")
    

Epoch 1/10:   0%|          | 0/5160 [00:00<?, ?it/s]

Epoch 1/10, Train Loss: 0.022912606596946716


Validation:   0%|          | 0/133 [00:00<?, ?it/s]

Epoch 1/10, Val Loss: 0.3205191664123309, Val Accuracy: 0.9229323308270677


Epoch 2/10:   0%|          | 0/5160 [00:00<?, ?it/s]

KeyboardInterrupt: 