The following code was modified from [this source](https://colab.research.google.com/drive/1IPpwx4rX32rqHKpLz7dc8sOKspUa-YKO?undefined#scrollTo=XZodve8PGKfS).

In [11]:
import os
import torch
import torch.nn as nn
import torch.utils.data as data
import torch.optim as optim
import torch.nn.functional as F
import torchaudio
import numpy as np
import editdistance

class TextTransform:
    """Two-way lookup between transcript characters and integer labels."""

    def __init__(self):
        # Label 0 is the empty string, label 1 is the space, and the
        # lowercase letters a-z occupy labels 2 through 27 in order.
        vocabulary = ['', ' '] + list('abcdefghijklmnopqrstuvwxyz')
        self.char_map = {symbol: position for position, symbol in enumerate(vocabulary)}
        self.index_map = {position: symbol for symbol, position in self.char_map.items()}

    def text_to_int(self, text):
        """Encode ``text`` as a list of integer labels.

        Characters that are not keys of ``char_map`` are encoded as 0.
        """
        encoded = []
        for character in text:
            encoded.append(self.char_map.get(character, 0))
        return encoded

    def int_to_text(self, labels):
        """Decode an iterable of integer labels into a single string.

        Raises KeyError for a label that has no entry in ``index_map``.
        """
        return ''.join(self.index_map[label] for label in labels)

# Training-time feature pipeline: a 128-bin mel spectrogram followed by
# frequency masking and time masking, applied in that order.
# NOTE(review): sample_rate=16000 is hard-coded here while main() loads
# torchaudio's LIBRITTS dataset and data_processing ignores the per-sample
# sample_rate -- LibriTTS is normally distributed at 24 kHz, so confirm the
# rate (or resample) before relying on the mel scale.
train_audio_transforms = nn.Sequential(
    torchaudio.transforms.MelSpectrogram(sample_rate=16000, n_mels=128),
    torchaudio.transforms.FrequencyMasking(freq_mask_param=30),
    torchaudio.transforms.TimeMasking(time_mask_param=100)
)

# Validation-time features: a mel spectrogram with the library's default
# arguments and no masking.
# NOTE(review): presumably the defaults match the explicit training settings
# above (sample_rate, n_mels) -- verify against the installed torchaudio.
valid_audio_transforms = torchaudio.transforms.MelSpectrogram()

# Shared character <-> label codec used by data_processing and GreedyDecoder.
text_transform = TextTransform()

def data_processing(data, data_type="train"):
  """Collate dataset samples into padded batch tensors plus length lists.

  Args:
    data: iterable of 7-tuples ``(waveform, sample_rate, original_text,
      normalized_text, speaker_id, chapter_id, utterance_id)``. Only
      ``waveform`` and ``normalized_text`` are used.
    data_type: ``'train'`` to featurize with ``train_audio_transforms`` or
      ``'valid'`` to featurize with ``valid_audio_transforms``.

  Returns:
    A tuple ``(spectrograms, labels, input_lengths, label_lengths)``:
    ``spectrograms`` and ``labels`` are zero-padded batch-first tensors, while
    ``input_lengths`` and ``label_lengths`` are plain Python lists holding one
    integer per sample.

  Raises:
    ValueError: if ``data_type`` is neither ``'train'`` nor ``'valid'``.
  """
  # Pick the transform once, before touching any audio, so that a bad
  # data_type is reported immediately (even for an empty batch) instead of
  # being re-checked for every sample.
  if data_type == 'train':
    audio_transform = train_audio_transforms
  elif data_type == 'valid':
    audio_transform = valid_audio_transforms
  else:
    raise ValueError('data_type should be train or valid')

  spectrograms = []
  labels = []
  input_lengths = []
  label_lengths = []
  for (waveform, _sample_rate, _original_text, normalized_text,
       _speaker_id, _chapter_id, _utterance_id) in data:
    # assumes waveform is (1, samples), giving spec of (time, n_mels) -- TODO confirm
    spec = audio_transform(waveform).squeeze(0).transpose(0, 1)
    spectrograms.append(spec)
    label = torch.Tensor(text_transform.text_to_int(normalized_text.lower()))
    labels.append(label)
    # Halved frame count; presumably mirrors the stride-2 convolution at the
    # front of SpeechRecognitionModel -- verify if the stride changes.
    input_lengths.append(spec.shape[0]//2)
    label_lengths.append(len(label))

  # Pad along the first (time) axis, add a channel axis, then swap the last
  # two axes so the padded time axis ends up last.
  spectrograms = nn.utils.rnn.pad_sequence(spectrograms, batch_first=True).unsqueeze(1).transpose(2, 3)
  labels = nn.utils.rnn.pad_sequence(labels, batch_first=True)

  return spectrograms, labels, input_lengths, label_lengths

def GreedyDecoder(output, labels, label_lengths, blank_label=28, collapse_repeated=True):
  """Greedy best-path decoding of per-frame class scores.

  Args:
    output: tensor whose last (third) axis holds class scores; the first axis
      is iterated as the batch and the second as successive frames.
    labels: padded reference label sequences, one row per batch item.
    label_lengths: number of valid entries in each row of ``labels``.
    blank_label: class index that is dropped from the decoded sequence.
    collapse_repeated: when true, a frame whose class equals the class of the
      frame immediately before it is skipped.

  Returns:
    ``(decodes, targets)``: two lists of strings, the decoded predictions and
    the reference transcripts, both rendered through ``text_transform``.
  """
  # One nested Python list of winning class indices per batch item.
  best_paths = torch.argmax(output, dim=2).tolist()
  decodes, targets = [], []
  for row, path in enumerate(best_paths):
    reference = labels[row][:label_lengths[row]].tolist()
    targets.append(text_transform.int_to_text(reference))
    kept = []
    for position, symbol in enumerate(path):
      if symbol == blank_label:
        continue
      repeats_previous = position != 0 and symbol == path[position - 1]
      if collapse_repeated and repeats_previous:
        continue
      kept.append(symbol)
    decodes.append(text_transform.int_to_text(kept))
  return decodes, targets



In [12]:
# class CNNLayerNorm(nn.Module):
#   """Layer normalization built for cnns input"""
#   def __init__(self, n_feats):
#     super(CNNLayerNorm, self).__init__()
#     self.layer_norm = nn.LayerNorm(n_feats)

#   def forward(self, x):
#     # x (batch, channel, feature, time)
#     x = x.transpose(2, 3).contiguous() # (batch, channel, time, feature)
#     x = self.layer_norm(x)
#     return x.transpose(2, 3).contiguous() # (batch, channel, feature, time) 

# class ResidualCNN(nn.Module):
#   """Residual CNN inspired by https://arxiv.org/pdf/1603.05027.pdf
#     except with layer norm instead of batch norm
#   """
#   def __init__(self, in_channels, out_channels, kernel, stride, dropout, n_feats):
#     super(ResidualCNN, self).__init__()

#     self.cnn1 = nn.Conv2d(in_channels, out_channels, kernel, stride, padding=kernel//2)
#     self.cnn2 = nn.Conv2d(out_channels, out_channels, kernel, stride, padding=kernel//2)
#     self.dropout1 = nn.Dropout(dropout)
#     self.dropout2 = nn.Dropout(dropout)
#     self.layer_norm1 = CNNLayerNorm(n_feats)
#     self.layer_norm2 = CNNLayerNorm(n_feats)

#   def forward(self, x):
#     residual = x  # (batch, channel, feature, time)
#     x = self.layer_norm1(x)
#     x = F.gelu(x)
#     x = self.dropout1(x)
#     x = self.cnn1(x)
#     x = self.layer_norm2(x)
#     x = F.gelu(x)
#     x = self.dropout2(x)
#     x = self.cnn2(x)
#     x += residual
#     return x # (batch, channel, feature, time)

# class BidirectionalGRU(nn.Module):
#   def __init__(self, rnn_dim, hidden_size, dropout, batch_first):
#     super(BidirectionalGRU, self).__init__()

#     self.BiGRU = nn.GRU(
#       input_size=rnn_dim, hidden_size=hidden_size,
#       num_layers=1, batch_first=batch_first, bidirectional=True
#     )
#     self.layer_norm = nn.LayerNorm(rnn_dim)
#     self.dropout = nn.Dropout(dropout)

#   def forward(self, x):
#     x = self.layer_norm(x)
#     x = F.gelu(x)
#     x, _ = self.BiGRU(x)
#     x = self.dropout(x)
#     return x

class BidirectionalLSTM(nn.Module):
  """Layer norm -> GELU -> single-layer bidirectional LSTM -> dropout.

  Args:
    rnn_dim: size of the input features; used for both the layer norm and the
      LSTM ``input_size``.
    hidden_size: hidden size per direction of the LSTM.
    dropout: dropout probability applied to the LSTM output.
    batch_first: forwarded unchanged to ``nn.LSTM``.
  """

  def __init__(self, rnn_dim, hidden_size, dropout, batch_first):
    super().__init__()

    # Submodule names and creation order are kept stable so that state_dict
    # keys and seeded parameter initialisation do not change.
    self.BiLSTM = nn.LSTM(
      input_size=rnn_dim,
      hidden_size=hidden_size,
      num_layers=1,
      batch_first=batch_first,
      bidirectional=True,
    )
    self.layer_norm = nn.LayerNorm(rnn_dim)
    self.dropout = nn.Dropout(dropout)

  def forward(self, x):
    """Return the LSTM output sequence for ``x``; the LSTM state is discarded."""
    activated = F.gelu(self.layer_norm(x))
    sequence_out, _state = self.BiLSTM(activated)
    return self.dropout(sequence_out)

class SpeechRecognitionModel(nn.Module):
  """Convolutional front end + stacked bidirectional LSTMs + classifier.

  Args:
    n_cnn_layers: twice this many 32-channel 3x3 convolutions follow the
      first (strided) convolution.
    n_rnn_layers: number of ``BidirectionalLSTM`` layers.
    rnn_dim: hidden size per LSTM direction and width of the linear layer that
      feeds the recurrent stack.
    n_class: number of scores produced per time step by the classifier.
    n_feats: size of the feature axis of the input spectrogram.
    stride: stride of the first convolution (an int, or a pair whose first
      element applies to the feature axis).
    dropout: dropout probability used in the LSTM layers and the classifier.
  """

  def __init__(self, n_cnn_layers, n_rnn_layers, rnn_dim, n_class, n_feats, stride=2, dropout=0.1):
    super(SpeechRecognitionModel, self).__init__()
    # Size of the feature axis after the first convolution (kernel 3,
    # padding 1): floor((n_feats - 1) / stride) + 1. For an even n_feats and
    # stride 2 this equals the previous n_feats // 2, but it also stays
    # correct for odd n_feats and for other strides, which used to give the
    # linear layer below the wrong input width.
    feature_stride = stride[0] if isinstance(stride, (tuple, list)) else stride
    n_feats = (n_feats - 1) // feature_stride + 1
    self.cnn = nn.Conv2d(1, 32, 3, stride=stride, padding=3//2)  # cnn for extracting hierarchical features

    # Plain 32-channel convolutions (the model this was adapted from used
    # residual CNN blocks here). Times two to stay comparable with that
    # model, in which each of the n_cnn_layers created two Conv2d layers
    # with skip connections.
    self.cnn_layers = nn.Sequential(*[
      nn.Conv2d(32, 32, kernel_size=3, stride=1, padding=3//2)
      for _ in range(n_cnn_layers * 2)
    ])
    self.fully_connected = nn.Linear(n_feats*32, rnn_dim)
    # Every recurrent layer receives (batch, time, feature) input, so every
    # layer must be batch_first. Previously only the first layer was, which
    # made the later layers run their recurrence across the batch axis and
    # mix information between unrelated utterances.
    self.birnn_layers = nn.Sequential(*[
      BidirectionalLSTM(rnn_dim=rnn_dim if i==0 else rnn_dim*2,
                        hidden_size=rnn_dim, dropout=dropout, batch_first=True)
      for i in range(n_rnn_layers)
    ])
    self.classifier = nn.Sequential(
      nn.Linear(rnn_dim*2, rnn_dim),  # birnn returns rnn_dim*2
      nn.GELU(),
      nn.Dropout(dropout),
      nn.Linear(rnn_dim, n_class)
    )

  def forward(self, x):
    """Map a (batch, 1, feature, time) input to (batch, time', n_class) scores.

    ``time'`` is the time axis after the strided first convolution.
    """
    x = self.cnn(x)
    x = self.cnn_layers(x)
    sizes = x.size()
    # Merge the channel and feature axes into one feature vector per frame.
    x = x.view(sizes[0], sizes[1] * sizes[2], sizes[3])  # (batch, feature, time)
    x = x.transpose(1, 2) # (batch, time, feature)
    x = self.fully_connected(x)
    x = self.birnn_layers(x)
    x = self.classifier(x)
    return x

In [13]:
def train(model, device, train_loader, criterion, optimizer, scheduler, epoch):
  """Run one training epoch, stepping the optimizer and scheduler per batch.

  Args:
    model: network called on the spectrogram batch; expected to return
      (batch, time, n_class) scores.
    device: device the spectrograms and labels are moved to.
    train_loader: iterable of ``(spectrograms, labels, input_lengths,
      label_lengths)`` batches that also supports ``len()`` and exposes a
      ``dataset`` attribute supporting ``len()``.
    criterion: loss called as ``criterion(log_probs, labels, input_lengths,
      label_lengths)`` with time-major log-probabilities.
    optimizer: optimizer stepped once per batch.
    scheduler: learning-rate scheduler stepped once per batch.
    epoch: epoch number, used only in the progress message.
  """
  model.train()
  data_len = len(train_loader.dataset)
  num_batches = len(train_loader)
  for batch_idx, _data in enumerate(train_loader):
    spectrograms, labels, input_lengths, label_lengths = _data
    spectrograms, labels = spectrograms.to(device), labels.to(device)

    optimizer.zero_grad()

    output = model(spectrograms)  # (batch, time, n_class)
    output = F.log_softmax(output, dim=2)
    output = output.transpose(0, 1) # (time, batch, n_class)

    loss = criterion(output, labels, input_lengths, label_lengths)
    loss.backward()

    optimizer.step()
    scheduler.step()
    # Log every 10th batch and the final batch. The final-batch check used to
    # compare the batch index with the number of samples, which never matched.
    if batch_idx % 10 == 0 or batch_idx == num_batches - 1:
      print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
        epoch, batch_idx * len(spectrograms), data_len,
        100. * batch_idx / num_batches, loss.item())
      )

def test(model, device, test_loader, criterion):
  """Evaluate the model and print loss and edit-distance averages.

  Args:
    model: network called on the spectrogram batch; expected to return
      (batch, time, n_class) scores.
    device: device the spectrograms and labels are moved to.
    test_loader: iterable of ``(spectrograms, labels, input_lengths,
      label_lengths)`` batches that also supports ``len()``.
    criterion: loss called as ``criterion(log_probs, labels, input_lengths,
      label_lengths)`` with time-major log-probabilities.

  Prints the summed per-batch loss divided by the number of batches, and the
  mean character-level and word-level edit distance between the greedy
  decodings and the reference transcripts. With no evaluated utterances the
  two edit-distance averages are reported as ``nan`` instead of raising
  ZeroDivisionError.
  """
  print('\nevaluating...')
  model.eval()
  test_loss = 0
  test_char_edit_dist = []
  test_word_edit_dist = []
  with torch.no_grad():
    for batch in test_loader:
      spectrograms, labels, input_lengths, label_lengths = batch
      spectrograms, labels = spectrograms.to(device), labels.to(device)

      output = model(spectrograms) # (batch, time, n_class)
      output = F.log_softmax(output, dim=2)
      output = output.transpose(0, 1) # (time, batch, n_class)

      loss = criterion(output, labels, input_lengths, label_lengths)
      test_loss += loss.item() / len(test_loader)

      # The decoder wants batch-major scores, so undo the transpose.
      decoded_preds, decoded_targets = GreedyDecoder(output.transpose(0, 1), labels, label_lengths)
      for target_text, predicted_text in zip(decoded_targets, decoded_preds):
        test_char_edit_dist.append(editdistance.eval(target_text, predicted_text))
        test_word_edit_dist.append(editdistance.eval(target_text.split(" "), predicted_text.split(" ")))

  if test_char_edit_dist:
    avg_char_edit_dist = sum(test_char_edit_dist)/len(test_char_edit_dist)
    avg_word_edit_dist = sum(test_word_edit_dist)/len(test_word_edit_dist)
  else:
    # Nothing was evaluated; report "nan" rather than dividing by zero.
    avg_char_edit_dist = avg_word_edit_dist = float('nan')

  print("Test set:")
  print("Average loss: {:.4f}".format(test_loss))
  # "{:.4f}" (four decimals) replaces the original "{:4f}", which set a
  # minimum field width of 4 rather than a precision.
  print("Average character edit distance: {:.4f}".format(avg_char_edit_dist))
  print("Average word edit distance: {:.4f}".format(avg_word_edit_dist))

def main(learning_rate=5e-4, batch_size=20, epochs=10, train_url="train-clean-100", test_url="test-clean"):
  """Download the data, build the model, then train and evaluate it.

  Args:
    learning_rate: learning rate given to AdamW and used as the peak rate of
      the one-cycle schedule.
    batch_size: batch size for both the training and the test loader.
    epochs: number of train-then-evaluate passes.
    train_url: LIBRITTS subset name used for training.
    test_url: LIBRITTS subset name used for evaluation.

  After every epoch the model and optimizer state are written to
  ``blstm.pt``, overwriting the previous checkpoint.
  """
  # Imported locally so the picklable collate functions below do not depend
  # on a file-level import.
  import functools

  hparams = {
    "n_cnn_layers": 3,
    "n_rnn_layers": 5,
    "rnn_dim": 512,
    "n_class": 29,
    "n_feats": 128,
    "stride": 2,
    "dropout": 0.1,
    "learning_rate": learning_rate,
    "batch_size": batch_size,
    "epochs": epochs
  }

  torch.manual_seed(7)

  # Get ideal device (CPU, GPU, or MPS for Apple Silicon)
  use_cuda = torch.cuda.is_available()
  device = torch.device("cuda" if use_cuda else "cpu")
  # if torch.backends.mps.is_available():
  #   if not torch.backends.mps.is_built():
  #     torch.backends.mps.build()
  #   device = torch.device("mps")

  # exist_ok avoids the check-then-create race of a separate isdir() test.
  os.makedirs("./data", exist_ok=True)

  train_dataset = torchaudio.datasets.LIBRITTS(root="data", url=train_url, download=True)
  test_dataset = torchaudio.datasets.LIBRITTS(root="data", url=test_url, download=True)

  kwargs = {'num_workers': 1, 'pin_memory': True} if use_cuda else {}
  # functools.partial objects can be pickled, unlike lambdas, so the loaders
  # keep working when worker processes are started with the "spawn" method.
  train_loader = data.DataLoader(dataset=train_dataset,
                              batch_size=hparams['batch_size'],
                              shuffle=True,
                              collate_fn=functools.partial(data_processing, data_type='train'),
                              **kwargs)
  test_loader = data.DataLoader(dataset=test_dataset,
                              batch_size=hparams['batch_size'],
                              shuffle=False,
                              collate_fn=functools.partial(data_processing, data_type='valid'),
                              **kwargs)

  model = SpeechRecognitionModel(
    hparams['n_cnn_layers'], hparams['n_rnn_layers'], hparams['rnn_dim'],
    hparams['n_class'], hparams['n_feats'], hparams['stride'], hparams['dropout']
  ).to(device)

  print(model)
  print('Num Model Parameters', sum(param.nelement() for param in model.parameters()))

  optimizer = optim.AdamW(model.parameters(), hparams['learning_rate'])
  # Class 28 is the CTC blank: the last of the n_class=29 outputs, and the
  # same index GreedyDecoder drops by default.
  criterion = nn.CTCLoss(blank=28).to(device)
  # The scheduler is stepped once per batch inside train(), hence
  # steps_per_epoch equals the number of training batches.
  scheduler = optim.lr_scheduler.OneCycleLR(optimizer, max_lr=hparams['learning_rate'],
                                          steps_per_epoch=int(len(train_loader)),
                                          epochs=hparams['epochs'],
                                          anneal_strategy='linear')

  for epoch in range(1, hparams['epochs'] + 1):
    train(model, device, train_loader, criterion, optimizer, scheduler, epoch)
    test(model, device, test_loader, criterion)

    torch.save({
      "epoch": epoch,
      "model_state_dict": model.state_dict(),
      "optimizer_state_dict": optimizer.state_dict(),
    }, "blstm.pt")

In [14]:
# On Apple Silicon, fall back to the CPU for operations that the MPS backend
# does not support (e.g. CTC loss).
# NOTE(review): this runs after `import torch` has already executed in an
# earlier cell -- confirm the variable is still honored when set this late.
# Also, main() currently picks only "cuda" or "cpu" (its MPS branch is
# commented out), so this setting has no effect on the run as written.
os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = "1"

In [15]:
# Dataset subsets and optimisation settings for this run.
libri_train_set, libri_test_set = "train-clean-100", "test-clean"
learning_rate, batch_size, epochs = 5e-4, 10, 10

# Start training; main() evaluates and checkpoints after every epoch.
main(
    learning_rate=learning_rate,
    batch_size=batch_size,
    epochs=epochs,
    train_url=libri_train_set,
    test_url=libri_test_set,
)

TypeError: Conv2d.__init__() got an unexpected keyword argument 'dropout'