In [1]:
# !pip install --upgrade pip
# !pip install mido
# !pip install git+https://github.com/KinWaiCheuk/AudioLoader.git
# !pip install torchaudio
# !pip install soundfile

In [2]:
from AudioLoader.speech import TIMIT
from torch.utils.data import DataLoader
import torchaudio.transforms as T
import torch

# AudioLoader helps you to set up supported datasets
dataset = TIMIT('./YourFolder',
                split='train',
                groups=[1,1],
                download=False)
train_loader = DataLoader(dataset,
                          batch_size=4)

Using all data at ./YourFolder\TIMIT\data\TRAIN


# Preprocess Audio

In [3]:
def preprocess_audio(waveform, sample_rate):
    frame_length_ms = 25
    hop_length_ms = 10
    num_filterbank = 26
    num_mfcc = 12
    n_fft = 256

    frame_length_samples = int(sample_rate * (frame_length_ms / 1000))
    hop_length_samples = int(sample_rate * (hop_length_ms / 1000))

    # Create Mel Filterbank transform
    mel_filterbank = T.MelSpectrogram(
        sample_rate=sample_rate,
        n_fft=n_fft,
        hop_length=hop_length_samples,
        n_mels=num_filterbank
    )

    # Create MFCC transform
    mfcc_transform = T.MFCC(
        sample_rate=sample_rate,
        n_mfcc=num_mfcc,
        melkwargs={"n_fft": n_fft, "hop_length": hop_length_samples, "n_mels": num_filterbank}
    )

    # Process the waveform in frames
    filterbank_features = []
    mfccs = []

    # Iterate through the waveform
    for start in range(0, waveform.size(1) - frame_length_samples + 1, hop_length_samples):
        frame = waveform[:, start:start + frame_length_samples]

        # Extract filterbank features
        mel_spectrogram = mel_filterbank(frame)
        filterbank_features.append(mel_spectrogram)
        
        # Extract MFCCs
        mfcc = mfcc_transform(frame)
        mfccs.append(mfcc)

    # Stack all features
    filterbank_features = torch.stack(filterbank_features, dim=0)
    mfccs = torch.stack(mfccs, dim=0)
    mfccs_reshaped = mfccs.squeeze(1)  # Remove batch dimension
    # Select only MFCCs and delta (first derivative)
    mfccs_final = torch.cat((mfccs_reshaped[:, :, 0], mfccs_reshaped[:, :, 1]), dim=1)
    return mfccs_final


In [4]:
# Check the features of the dataset
features = preprocess_audio(dataset[1]['waveform'], dataset[1]['sample_rate'])
print(features.shape)

torch.Size([290, 24])


# Create phonemes

In [5]:
all_phonemes = 'b d g p t k dx q jh ch s sh z zh f th v dh m n ng em en eng nx l r w y hh hv el iy ih eh ey ae aa aw ay ah ao oy ow uh uw ux er ax ix axr ax-h pau epi h# 1 2'.split(' ')
alphabet = dict()
for idx in range(len(all_phonemes)):
  ph = all_phonemes[idx]
  alphabet[ph] = idx + 1
alphabet[' '] = 0
closure_intervals = ['bcl', 'dcl', 'gcl', 'pcl', 'tcl', 'kcl', '']

# Create Dataset

In [6]:
from torch.nn.utils.rnn import pad_sequence

def make_dataset(size):
  input_tensor_list = []
  output_tensor_list = []
  for i in range(size):
    input_tensor = preprocess_audio(dataset[2*i+1]['waveform'], dataset[2*i+1]['sample_rate'])
    input_tensor_list.append(input_tensor)

    phonemes = dataset[2*i-1]['phonemics']
    phonemes_list = phonemes.split(' ')
    output = [alphabet[ph] for ph in phonemes_list if ph not in closure_intervals]
    output_tensor = torch.tensor(output)
    output_tensor_list.append(output_tensor)

  train_size = int(size*0.8)
  I_train = pad_sequence(input_tensor_list[:train_size], batch_first=True)
  O_train = pad_sequence(output_tensor_list[:train_size], batch_first=True)
  I_test = pad_sequence(input_tensor_list[train_size:], batch_first=True)
  O_test = pad_sequence(output_tensor_list[train_size:], batch_first=True)
  return I_train, O_train, I_test, O_test

I_train, O_train, I_test, O_test = make_dataset(5)
print("Train input shape: ", I_train.shape, "output shape: ", O_train.shape)

Train input shape:  torch.Size([4, 307, 24]) output shape:  torch.Size([4, 35])


# RNN

In [7]:
import torch
import torch.nn as nn
import torch.optim as optim

class RNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers=1):
        super(RNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True)
        # wights of hidden to output layer, output of this layer is logits
        self.Why = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        # Initialize hidden state
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device) 
        
        # Forward propagate the RNN
        out, _ = self.rnn(x, h0)  # out: (batch_size, seq_length, hidden_size)

        # Pass the RNN output through the last layer 
        logits = self.Why(out)  # logits: (batch_size, seq_length, output_size)
        return logits

# CTC Loss

In [63]:
import numpy as np
import torch

class CTCLoss(nn.Module):
  def __init__(self, logit_shape):
    super(CTCLoss, self).__init__()
    self.grads = torch.zeros(logit_shape)

  def l_to_prime(self, l_tensor):
    l_prime = l_tensor.repeat_interleave(2)
    l_prime = torch.cat((torch.tensor([0]), l_prime), dim=0)
    return l_prime
  
  def calc_ALPHA_BETA_Q(self, l, probs, b_idx = 0):
    l_prime = self.l_to_prime(l)
    len_l_prime = l_prime.shape[0] # 2 * |l| + 1
    len_T = probs.shape[0] # T
  
    ALPHA = torch.zeros(len_l_prime, len_T)
    BETA = torch.zeros(len_l_prime, len_T)
    Q = torch.ones(len_T)

    ALPHA[0, 0] = probs[0, b_idx]
    ALPHA[1, 0] = probs[0, l[0].item()]
    C_alpha_col = ALPHA[0, 0] + ALPHA[1, 0]
    

    BETA[-1, -1] = probs[len_T - 1, b_idx]
    BETA[-2, -1] = probs[len_T - 1, l[-1].item()]
    D_beta_col = BETA[-1, -1] + BETA[-2, -1]

    ALPHA[0, 0] = ALPHA[0, 0]/C_alpha_col
    ALPHA[1, 0] = ALPHA[1, 0]/C_alpha_col
    BETA[-1, -1] = BETA[-1, -1] / D_beta_col
    BETA[-2, -1] = BETA[-2, -1] / D_beta_col
    Q[-1] = D_beta_col
    for t in range(1, len_T):
      alpha_col = t
      beta_col = len_T - 1 - t
      D_beta_col, C_alpha_col = 0, 0
      start, end = max(0, len_l_prime - 2 * (len_T - t)), min(len_l_prime, 2 * (t + 1))

      for s in range(start, end):
        # Calculate ALPHA
        max_idx = 0
        if s - 1 >= 0 and l_prime[s] == b_idx:
          max_idx = 1
        elif (s - 2) >= 0 and l_prime[s] == l_prime[s - 2]:
          max_idx = 1
        elif (s - 2) >= 0:
          max_idx = 2

        alpha_y_val = probs[alpha_col, l_prime[s]]
        alpha_bar = 0
        for idx in range(max_idx + 1):
          alpha_bar += ALPHA[s - idx, alpha_col - 1]
        val = alpha_bar * alpha_y_val
        ALPHA[s, alpha_col] = val
        C_alpha_col += val

        # Calculate BETA
        max_idx = 0
        if (s + 1) < len_l_prime and l_prime[s] == b_idx:
          max_idx = 1
        elif (s + 2) < len_l_prime and l_prime[s] == l_prime[s + 2]:
          max_idx = 1
        elif (s + 2) < len_l_prime:
          max_idx = 2

        beta_y_val = probs[beta_col, l_prime[s]]
        beta_bar = 0
        for idx in range(0, max_idx + 1):
          beta_bar += BETA[s + idx, beta_col + 1]
        val = beta_bar * beta_y_val
        BETA[s, beta_col] = val
        D_beta_col += val

      Q[beta_col] *= D_beta_col
      Q[alpha_col - 1] /= C_alpha_col

      if C_alpha_col != 0:
        ALPHA[:, alpha_col] = ALPHA[:, alpha_col] / C_alpha_col
      if D_beta_col != 0:
        BETA[:, beta_col] = BETA[:, beta_col] / D_beta_col
    

    for t in range(len_T - 2, -1, -1):
      Q[t] *= Q[t + 1]
    
    return ALPHA, BETA, Q


  def ctc_loss_gradient(self, probs, ALPHA, BETA, Q, z):
    # for one target!
    len_T = probs.shape[0]
    len_alphabet = probs.shape[1]
    grads = torch.zeros((len_T, len_alphabet))
    for t in range(0, len_T):
      for k in range(0, len(alphabet)):

        k_in_z_indices = [i for i, c in enumerate(z) if c == k] # lab(z, k)
        mult = 1
        for s in k_in_z_indices:
          mult *= ALPHA[s, t] * BETA[s, t]

        y_t_k = probs[t, k]
        grads[t, k] = y_t_k - Q[t]/y_t_k * mult
    return grads
  
    
  def loss(self, targets, probs, b_idx = 0):
    grad_matrices = []
    len_target = len(targets)
    total_loss = 0
    for i in range(len_target):
      print("sequence number ", i)
      z = targets[i]
      ALPHA, BETA, Q = self.calc_ALPHA_BETA_Q(z, probs[i], b_idx)
      total_loss += ALPHA[-1, -1].item() + ALPHA[-2, -1].item()

      gradient = self.ctc_loss_gradient(probs[i], ALPHA, BETA, Q, z)
      grad_matrices.append(gradient)
    
    self.grads = torch.stack(grad_matrices, dim=0) #(len_targets, T, L) the same as logits
    return total_loss
  

In [64]:
import torch.nn.functional as F

input_size = I_train.shape[2]     # Number of input features per time step
batch_size = I_train.shape[1]     # Number of sequences in each batch
train_size = I_train.shape[0]
hidden_size = 256                 # Number of units in RNN's hidden layer
output_size = len(alphabet)       # Number of output classes (including blank token)
num_layers = 1                    # Number of RNN layers (can be increased)

model = RNN(input_size, hidden_size, output_size, num_layers)
optimizer = optim.SGD(model.parameters(), lr=0.01)  # Stochastic Gradient Descent
logits = model(I_train)           # logits: (batch_size, seq_length, output_size) = (|S|, T, len(alphabet) + 1)
probs = F.softmax(logits, dim=2)  # same size (batch_size, seq_length, num_classes) =  = (|S|, T, len(alphabet) + 1)
ctc = CTCLoss(logits.shape)
loss = ctc.loss(O_train, probs)
logits.grad = ctc.grads
logits.backward(gradient=ctc.grads)
optimizer.step()

True
sequence number  0
sequence number  1
sequence number  2
sequence number  3
