In [1]:
# Description: This script trains an AST whose input has been modified to take audio instead of image patches
# Original code is based on a tutorial by Brian Pulfer
# https://medium.com/@brianpulfer/vision-transformers-from-scratch-pytorch-a-step-by-step-guide-96c3313c2e0c
# Andrei Cartera -- Mar 2025


import numpy as np
import CustomSpeechCommands_R as SpeechCommands
from tqdm.notebook import tqdm, trange
from pathlib import Path
import torch
import torch.nn as nn
from torch.optim import Adam, lr_scheduler
from torch.nn import CrossEntropyLoss
from torch.utils.data import DataLoader

# Fix the NumPy and PyTorch RNG seeds so weight initialisation is repeatable.
torch.manual_seed(0)
np.random.seed(0)

# Use the first CUDA device when one is available, otherwise run on the CPU.
device = torch.device("cuda", 0) if torch.cuda.is_available() else torch.device("cpu")

# Record the library version and chosen device in the notebook output.
print(torch.__version__)
print(device)

2.6.0+cu126
cuda:0


In [2]:
# Label names: the ten spoken digits.
classes = "zero one two three four five six seven eight nine".split()
NUM_CLASSES = len(classes)  # 10; passed to AudioTransformer as out_d

# Hyperparameters
# -- input layout
N_SEGMENTS = 32      # tokens per sample (AudioTransformer n_segments)
REPC_VEC_SIZE = 40   # feature-vector length per token (AudioTransformer repc_vec_size, dataset vec_size)

# -- model size
N_HEADS = 8          # attention heads per encoder block
N_ENCODERS = 4       # number of stacked encoder blocks (AudioTransformer n_blocks)
HIDDEN_DIM = 16      # token embedding width; must be divisible by N_HEADS
DROPOUT = 0.05       # NOTE(review): not referenced by the code visible in this notebook
ACTIVATION = "gelu"  # NOTE(review): not referenced by the code visible here; MyViTBlock hard-codes nn.GELU

# -- optimisation
EPOCHS = 30
BATCH_SIZE = 32
LR = 0.001           # initial Adam learning rate


In [3]:
def get_positional_embeddings(sequence_length, d):
  """Build a fixed sinusoidal position table.

  Entry (pos, col) is sin(pos / 10000**(col / d)) for even `col` and
  cos(pos / 10000**((col - 1) / d)) for odd `col`, so each even/odd column
  pair shares one frequency.

  Args:
    sequence_length: number of positions (rows).
    d: embedding width (columns).

  Returns:
    Tensor of shape (sequence_length, d) in torch's default dtype.
  """
  table = torch.ones(sequence_length, d)
  for pos in range(sequence_length):
    for col in range(d):
      # Odd columns reuse the frequency of the even column just before them.
      paired_even_col = col - 1 if col % 2 else col
      angle = pos / (10000 ** (paired_even_col / d))
      table[pos, col] = np.cos(angle) if col % 2 else np.sin(angle)
  return table

In [4]:
class MyMSA(nn.Module):
  """Multi-head self-attention in which every head works on its own slice of the token vector.

  The token dimension `d` is cut into `n_heads` contiguous chunks of width
  `d // n_heads`. Head h applies its own query/key/value Linear maps to chunk h
  only, runs scaled dot-product attention over the sequence, and the per-head
  results are concatenated back to width `d`.
  """
  def __init__(self, d, n_heads=2):
    """
    Args:
      d: token dimension; must be divisible by `n_heads`.
      n_heads: number of attention heads.

    Raises:
      AssertionError: if `d` is not a multiple of `n_heads`.
    """
    super(MyMSA, self).__init__()
    self.d = d
    self.n_heads = n_heads

    assert d % n_heads == 0, f"Can't divide dimension {d} into {n_heads} heads"

    d_head = d // n_heads
    # One (d_head -> d_head) projection per head; names and creation order are
    # kept so existing state_dicts and seeded initialisation stay compatible.
    self.q_mappings = nn.ModuleList([nn.Linear(d_head, d_head) for _ in range(self.n_heads)])
    self.k_mappings = nn.ModuleList([nn.Linear(d_head, d_head) for _ in range(self.n_heads)])
    self.v_mappings = nn.ModuleList([nn.Linear(d_head, d_head) for _ in range(self.n_heads)])
    self.d_head = d_head
    self.softmax = nn.Softmax(dim=-1)

  def forward(self, sequences):
    """Apply self-attention to a batch of token sequences.

    Args:
      sequences: tensor of shape (N, seq_length, d).

    Returns:
      Tensor of shape (N, seq_length, d): the head outputs concatenated along
      the last dimension.
    """
    # Each head is evaluated for the whole batch at once (batched matmul)
    # instead of looping over samples in Python; this also makes an empty
    # batch return an empty result rather than failing in torch.cat([]).
    scale = self.d_head ** 0.5
    head_outputs = []
    for head in range(self.n_heads):
      # (N, seq_length, d_head) slice of the token vector owned by this head.
      chunk = sequences[..., head * self.d_head: (head + 1) * self.d_head]
      q = self.q_mappings[head](chunk)
      k = self.k_mappings[head](chunk)
      v = self.v_mappings[head](chunk)

      # (N, seq_length, seq_length) attention weights; each row sums to 1.
      attention = self.softmax(q @ k.transpose(-2, -1) / scale)
      head_outputs.append(attention @ v)
    return torch.cat(head_outputs, dim=-1)

In [5]:
class MyViTBlock(nn.Module):
  """One pre-norm transformer encoder block.

  Applies LayerNorm -> multi-head self-attention with a residual connection,
  then LayerNorm -> two-layer GELU MLP with a second residual connection.
  Input and output both have last dimension `hidden_d`.
  """
  def __init__(self, hidden_d, n_heads, mlp_ratio=4):
    """
    Args:
      hidden_d: token embedding width.
      n_heads: number of attention heads handed to MyMSA.
      mlp_ratio: the MLP's inner width is `mlp_ratio * hidden_d`.
    """
    super().__init__()
    self.hidden_d = hidden_d
    self.n_heads = n_heads

    # Attention sub-layer
    self.norm1 = nn.LayerNorm(hidden_d)
    self.mhsa = MyMSA(hidden_d, n_heads)

    # Feed-forward sub-layer
    self.norm2 = nn.LayerNorm(hidden_d)
    inner_d = mlp_ratio * hidden_d
    self.mlp = nn.Sequential(
      nn.Linear(hidden_d, inner_d),
      nn.GELU(),
      nn.Linear(inner_d, hidden_d),
    )

  def forward(self, x):
    """Return `x` after the attention and MLP residual updates (same shape as `x`)."""
    after_attention = x + self.mhsa(self.norm1(x))
    return after_attention + self.mlp(self.norm2(after_attention))

In [6]:
class AudioTransformer(nn.Module):
  """Transformer-encoder classifier over a sequence of per-segment audio feature vectors.

  Each input vector is linearly embedded to `hidden_d`, a learnable
  classification token is prepended, fixed sinusoidal positional embeddings
  are added, the sequence runs through `n_blocks` encoder blocks, and the
  classification token's final state is mapped to `out_d` class probabilities.
  """
  def __init__(self, n_segments, repc_vec_size , n_blocks=2, hidden_d=8, n_heads=2, out_d=10):
    """
    Args:
      n_segments: number of input vectors per sample; the positional table
        holds `n_segments + 1` rows (one extra for the class token).
      repc_vec_size: length of each input vector.
      n_blocks: number of stacked encoder blocks.
      hidden_d: token embedding width.
      n_heads: attention heads per block.
      out_d: number of output classes.
    """
    super().__init__()

    # Keep the configuration on the instance
    self.n_segments = n_segments
    self.repc_vec_size = repc_vec_size
    self.n_blocks = n_blocks
    self.n_heads = n_heads
    self.hidden_d = hidden_d

    # 1) Linear embedding of each input vector
    self.input_d = repc_vec_size
    self.linear_mapper = nn.Linear(self.input_d, self.hidden_d)

    # 2) Learnable classification token
    self.class_token = nn.Parameter(torch.rand(1, self.hidden_d))

    # 3) Fixed positional embedding; a non-persistent buffer follows the module
    #    across devices but is left out of the state_dict
    position_table = get_positional_embeddings(n_segments + 1, hidden_d)
    self.register_buffer('positional_embeddings', position_table, persistent=False)

    # 4) Transformer encoder blocks
    encoder_blocks = [MyViTBlock(hidden_d, n_heads) for _ in range(n_blocks)]
    self.blocks = nn.ModuleList(encoder_blocks)

    # 5) Classification MLP
    self.mlp = nn.Sequential(nn.Linear(self.hidden_d, out_d), nn.Softmax(dim=-1))

  def forward(self, audio):
    """Classify a batch of audio feature sequences.

    Args:
      audio: tensor of shape (N, n_segments, repc_vec_size).

    Returns:
      Tensor of shape (N, out_d) holding class probabilities (each row sums
      to 1 because the head ends in Softmax).
    """
    n_samples = audio.shape[0]

    # Embed every input vector into the hidden dimension
    embedded = self.linear_mapper(audio)

    # Prepend one copy of the classification token to every sample
    cls_tokens = self.class_token.expand(n_samples, 1, -1)
    tokens = torch.cat((cls_tokens, embedded), dim=1)

    # Add the positional embedding; the (n_segments + 1, hidden_d) table
    # broadcasts over the batch dimension
    out = tokens + self.positional_embeddings

    # Transformer blocks
    for block in self.blocks:
      out = block(out)

    # Classify from the classification token only
    cls_state = out[:, 0]
    return self.mlp(cls_state)

In [None]:
def main():
  """Train the AudioTransformer, save its weights, and report the test loss.

  Uses the module-level hyperparameters (N_SEGMENTS, REPC_VEC_SIZE, N_ENCODERS,
  HIDDEN_DIM, N_HEADS, NUM_CLASSES, BATCH_SIZE, LR, EPOCHS) and `device`.

  Side effects: prints progress, and writes the trained state_dict to
  'models/ATmodel_32SEG_40VEC_E30_8_4_B32_H16.pth' (creating 'models/' if it
  does not exist yet).
  """
  print("Using device: ", device, f"({torch.cuda.get_device_name(device)})" if torch.cuda.is_available() else "")

  # Defining model
  model = AudioTransformer(N_SEGMENTS, REPC_VEC_SIZE, N_ENCODERS, HIDDEN_DIM, N_HEADS, NUM_CLASSES).to(device)
  #model.load_state_dict(torch.load('my_model3_2_Transfer.pth',map_location=torch.device('cpu')))

  # Loading data
  #https://pytorch.org/tutorials/beginner/basics/data_tutorial.html
  # NOTE(review): `divisor` is forwarded to the project dataset class; its
  # meaning is not visible from this notebook.
  train_set = SpeechCommands.CustomSpeechCommandsDataset_R("../custom_speech_commands", shuffle=True, vec_size=REPC_VEC_SIZE, divisor=BATCH_SIZE,)
  test_set = SpeechCommands.CustomSpeechCommandsDataset_R("../custom_speech_commands", subset="testing", shuffle=True, vec_size=REPC_VEC_SIZE,divisor=BATCH_SIZE,)

  train_loader = DataLoader(train_set, shuffle=True, batch_size=BATCH_SIZE)
  test_loader = DataLoader(test_set, shuffle=False, batch_size=BATCH_SIZE)

  # Training options
  optimizer = Adam(model.parameters(), lr=LR)
  criterion = CrossEntropyLoss()
  #scheduler = lr_scheduler.LinearLR(optimizer)
  # `verbose` is deprecated on PyTorch schedulers; the learning rate is
  # already printed once per epoch below.
  scheduler = lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=5)

  # The model ends in Softmax, so it returns probabilities, while
  # CrossEntropyLoss expects unnormalised logits. Feeding log-probabilities
  # turns the loss's internal log_softmax into an identity, which gives the
  # true cross-entropy instead of a softmax-of-softmax. The clamp keeps
  # log() finite if a probability underflows to 0.
  min_prob = 1e-12

  # Training loop
  model.train()  # Set the model to training mode
  for epoch in trange(EPOCHS, desc="Training"):
    train_loss = 0.0

    for batch in tqdm(train_loader, desc=f"Epoch {epoch + 1} in training", leave=False):
      x, y = batch
      x, y = x.to(device), y.to(device)
      y_hat = model(x)
      loss = criterion(torch.log(y_hat.clamp_min(min_prob)), y)

      optimizer.zero_grad()
      loss.backward()
      optimizer.step()

      # criterion returns the batch mean; weight by batch size so the epoch
      # figure is a per-sample average even when the last batch is smaller.
      train_loss += loss.item() * x.size(0)

    train_loss /= len(train_loader.dataset)
    scheduler.step(train_loss)
    #torch.cuda.empty_cache()

    current_lr = optimizer.param_groups[0]['lr']
    print(f"Epoch {epoch+1}/{EPOCHS} loss: {train_loss:.2f}, LR: {current_lr}")

  # Make sure the output directory exists so a finished training run is not
  # lost to a missing folder.
  save_path = Path('models/ATmodel_32SEG_40VEC_E30_8_4_B32_H16.pth')
  save_path.parent.mkdir(parents=True, exist_ok=True)
  torch.save(model.state_dict(), save_path)

  # Test loop
  model.eval()
  total_test_loss, total_samples = 0.0, 0
  with torch.no_grad():
    for batch in tqdm(test_loader, desc="Testing"):
      x, y = batch
      x, y = x.to(device), y.to(device)
      y_hat = model(x)
      loss = criterion(torch.log(y_hat.clamp_min(min_prob)), y)

      batch_size = x.size(0)
      total_test_loss += loss.item() * batch_size
      total_samples += batch_size

  if total_samples == 0:
    print("Test set is empty; no test loss to report.")
    return

  average_test_loss = total_test_loss / total_samples
  print(f"Test loss: {average_test_loss:.2f}")

### Training Loop

In [8]:
# Start training and evaluation only when this code runs as the main module
# (a notebook cell or a script), not when it is imported.
if __name__ == "__main__":
    main()

Using device:  cuda:0 (NVIDIA GeForce RTX 4060)
Balanced dataset to 3296 samples per label.
Balanced dataset to 384 samples per label.




Training:   0%|          | 0/30 [00:00<?, ?it/s]

Epoch 1 in training:   0%|          | 0/1030 [00:00<?, ?it/s]

KeyboardInterrupt: 