# Frame to Phoneme Classifier

In [1]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [2]:
drivepath = '/content/gdrive/MyDrive/DL_Group_Project/Dataset/Preprocessed_Data'
word_path = '/content/gdrive/MyDrive/DL_Group_Project/Dataset/Preprocessed_Data/Word_Test'

In [3]:
!pip install tqdm



In [4]:
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import os
from tqdm import tqdm
import time
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
import pandas as pd

In [5]:
NUM_EPOCHS = 100
BATCH_SIZE = 64
HIDDEN_SIZE = 128
MODEL_VERSION = 1
LEARNING_RATE = 0.01
LOGISTIC_THRESHOLD = 0.5
OTHER_PHONEMES_PERCENT = 0.1

In [6]:
cuda = torch.cuda.is_available()
num_workers = 8 if cuda else 0
DEVICE = "cuda" if cuda else "cpu"
print("Cuda = "+str(cuda)+" with num_workers = "+str(num_workers))

Cuda = True with num_workers = 8


In [7]:
class PhonemesDataset(Dataset):
    
    def __init__(self, basepath, mode):
      phoneme_features = np.zeros((1, 40))  # eliminate this row
      phoneme_labels = np.zeros((1))  # eliminate this row

      with os.scandir(basepath) as entries:
        for entry in entries:
          if entry.is_file():
            if "features" in entry.name and mode in entry.name:
              phoneme_tag = entry.name.split("_")[0]

              features_filepath = entry.path
              other_phoneme_features = np.load(features_filepath, allow_pickle=True)
              
              # stack to phoneme features
              phoneme_features = np.concatenate((phoneme_features, other_phoneme_features))
              
      self.X = phoneme_features[1:]

    def __len__(self):
        return len(self.X)
 
    # get a row at an index
    def __getitem__(self, index):
        x = torch.Tensor(self.X[index]).float()
        return x

In [8]:
def make_dataloader(dataset, batch_size):
  shuffle = False
  drop_last = False
    
  loader = DataLoader(dataset=dataset, batch_size=batch_size,
                      drop_last=drop_last, shuffle=shuffle,
                      pin_memory=True, num_workers=8)
  
  return loader

In [9]:
class PhonemeShallowDetector(nn.Module):
  
  def __init__(self, hidden_size, activation):
    super(PhonemeShallowDetector, self).__init__()
    
    self.linear_layer = nn.Linear(in_features=40, out_features=hidden_size)
    self.bn_layer = nn.BatchNorm1d(num_features=hidden_size)
    self.activation = activation
    self.output_layer = nn.Linear(in_features=hidden_size, out_features=1)
    self.sigmoid = nn.Sigmoid()
    seq_params = [
      self.linear_layer,
      self.bn_layer,
      self.activation,
      self.output_layer,
      self.sigmoid
    ]

    self.network = nn.Sequential(*seq_params)
    
  def forward(self, x):
    return self.network(x)

In [10]:
class FramePhonemeClassifierModel(nn.Module):
  
  def __init__(self, phoneme_mapper):
    super(FramePhonemeClassifierModel, self).__init__()
    
    self.phoneme_mapper = phoneme_mapper

    shallow_detectors = []
    # generate the PhonemeShallowDetectors 
    for phoneme_index, phoneme_tag in phoneme_mapper.items():
      shallow_detector = PhonemeShallowDetector(hidden_size=HIDDEN_SIZE, 
                                                activation=nn.LeakyReLU())
      shallow_detectors.append(shallow_detector)
    self.shallow_detectors = nn.ModuleList(shallow_detectors)

    self.linear_layer = nn.Linear(in_features=len(phoneme_mapper), out_features=len(phoneme_mapper))

    self.initialize_shallow_detectors()

  def initialize_shallow_detectors(self):
    # load weights from shallow detectors pre-trained models
    for phoneme_index, phoneme_tag in self.phoneme_mapper.items():
      phoneme_shallow_detector = self.shallow_detectors[phoneme_index]

      phoneme_model_path = f"{drivepath}/shallow_detectors/model_{phoneme_tag}_{MODEL_VERSION}_99"
      temp = torch.load(phoneme_model_path)
      phoneme_shallow_detector.load_state_dict(temp['model_state_dict'])
  
  def forward(self, x):
    
    shallow_outputs = []
    for phoneme_index, phoneme_tag in self.phoneme_mapper.items():
      phoneme_shallow_detector = self.shallow_detectors[phoneme_index]

      # run frame through shallow detector
      output = phoneme_shallow_detector(x)
      shallow_outputs.append(output.reshape(-1))

    # convert to torch tensor
    shallow_outputs = torch.vstack(shallow_outputs).T
    outputs = self.linear_layer(shallow_outputs)

    return outputs

In [11]:
class FramePhonemeClassifier():

  def __init__(self, phoneme_mapper):

    word_data = PhonemesDataset(basepath=word_path, mode="word")
    self.word_loader = make_dataloader(dataset=word_data, batch_size=BATCH_SIZE)
    print(f"word_data.shape: {word_data.X.shape}")

    
    self.model = FramePhonemeClassifierModel(phoneme_mapper).to(DEVICE)

    self.criterion = nn.CrossEntropyLoss()
    self.optimizer = torch.optim.SGD(self.model.parameters(), lr=LEARNING_RATE, momentum=0.9)
    self.scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(self.optimizer, 'min')

  def load_model(self, epoch):

    model_epoch_path = "{}/final_classsifier/model_{}_{}".format(drivepath,
                                                                 MODEL_VERSION, 
                                                                 epoch)
    best_model = torch.load(model_epoch_path)
    self.model.load_state_dict(best_model['model_state_dict'], strict=False)
    self.optimizer.load_state_dict(best_model['optimizer_state_dict'])
    self.scheduler.load_state_dict(best_model['scheduler_state_dict'])

  def test_model(self):

    with torch.no_grad():
      self.model.eval()
      total_predictions = 0.0
      predictions = []

      example_ct = 0
      start_time = time.time()
      for batch_idx, features in enumerate(self.word_loader):
        features = features.to(DEVICE)
        example_ct += len(features)

        outputs = self.model(features)
        outputs = outputs.to(DEVICE)

        # check number of correct predictions
        output_classes = torch.argmax(outputs.log_softmax(1), dim=1)  # convert to class labels
        total_predictions += len(output_classes)
        predictions += list(output_classes.detach().cpu())
      
      end_time = time.time()  
      return predictions

# Test Word classifier

In [12]:
%cd /content/gdrive/MyDrive/DL_Group_Project/Dataset/Preprocessed_Data

/content/gdrive/.shortcut-targets-by-id/1qwJK2jyGMl2dPnVFe6JNZvrrG45HoonZ/DL_Group_Project/Dataset/Preprocessed_Data


In [13]:
from utilities import PHONEME_MAPPER

In [14]:
%cd /

/


In [15]:
print(PHONEME_MAPPER)

{0: 'SIL', 1: 'AE', 2: 'AH', 3: 'AW', 4: 'AY', 5: 'B', 6: 'EH', 7: 'D', 8: 'DH', 9: 'EE', 10: 'FF', 11: 'G', 12: 'HH', 13: 'IH', 14: 'II', 15: 'J', 16: 'K', 17: 'LL', 18: 'MM', 19: 'NN', 20: 'OH', 21: 'OO', 22: 'OW', 23: 'OY', 24: 'P', 25: 'RR', 26: 'SH', 27: 'SS', 28: 'T', 29: 'TH', 30: 'UE', 31: 'UH', 32: 'VV', 33: 'WW', 34: 'YY', 35: 'ZZ'}


In [16]:
classifier = FramePhonemeClassifier(PHONEME_MAPPER)
classifier.load_model(99)
predictions = classifier.test_model()
print(predictions)

  cpuset_checked))


word_data.shape: (1222, 40)
[tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), tensor(0), ten

In [17]:
print(predictions[0].item())

0


In [18]:
labels = list()
for prediction in predictions:
  labels.append(PHONEME_MAPPER[prediction.item()])
print(labels)

['SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'OW', 'OW', 'SIL', 'OW', 'OW', 'AW', 'SIL', 'SIL', 'B', 'SIL', 'OY', 'OY', 'OY', 'D', 'OY', 'OY', 'IH', 'IH', 'IH', 'IH', 'IH', 'IH', 'IH', 'IH', 'AE', 'J', 'SIL', 'T', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL', 'SIL