In [1]:
import os
import torch
import torch.nn as nn
import torch.utils.data as data
import torch.optim as optim
import torch.nn.functional as F
import torchaudio
import numpy as np
import re
import random
import string
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from keras.models import Model
from keras.layers import Input, LSTM, Dense
from keras.layers import Bidirectional, Concatenate, Permute, Dot, Multiply, Reshape, RepeatVector, Lambda, Flatten
from keras.activations import softmax
from keras.layers import Embedding, Dropout, Activation
from fast_ctc_decode import viterbi_search
from sklearn.model_selection import train_test_split
from keras.callbacks import ModelCheckpoint, EarlyStopping
from keras import backend as K
import librosa
import soundfile as sf
from pydub import AudioSegment

##Load LibriSpeech
# Always make sure the data directory exists and let torchaudio decide whether
# a download is needed: with download=True a split is only fetched when its
# extracted folder/archive is missing, so re-running this cell stays cheap.
# The previous logic passed download=False whenever ./data existed, which
# fails when the directory is present but a split has not been fetched yet.
os.makedirs("./data", exist_ok=True)
test_dataset = torchaudio.datasets.LIBRISPEECH("./data", url="test-clean", download=True)
train_dataset = torchaudio.datasets.LIBRISPEECH("./data", url="train-clean-100", download=True)

####################################################################################################################################

##Load CMU Pronunciation Dictionary
##Both paths are relative to the notebook's working directory
CMU_DICT_PATH = os.path.join(
    'GRAPHEME_TO_PHONEME/input/cmu-pronunciation-dictionary-unmodified-07b/cmudict-0.7b')
CMU_SYMBOLS_PATH = os.path.join(
    'GRAPHEME_TO_PHONEME/input/cmu-pronouncing-dictionary/cmudict.symbols')
##Matches any character other than A-Z, hyphen, apostrophe or period;
##words containing such a character (digits, other symbols) are skipped
ILLEGAL_CHAR_REGEX = "[^A-Z-'.]"
##Word-length limits (in characters) applied when filtering dictionary entries
MAX_DICT_WORD_LEN = 20
MIN_DICT_WORD_LEN = 2
def load_clean_phonetic_dictionary():
    """Parse the CMU pronouncing dictionary into ``{word: [pronunciation, ...]}``.

    Reads the file at ``CMU_DICT_PATH`` (ISO-8859-1 encoded), skips comment
    and blank lines, folds alternate pronunciations ("WORD(1)") into the base
    word, and drops words rejected by ``should_skip``.

    Returns:
        dict mapping each kept word to the list of its pronunciations, each
        pronunciation being the phoneme string exactly as it appears in the
        file.

    Raises:
        ValueError: if a non-blank, non-comment line has no double-space
            separator between the word and its phonemes.
    """
    def is_alternate_pho_spelling(word):
        ##No word has more than 9 pronunciations so it is safe to use .isdigit()
        ##Alternative pronunciations are formatted as "WORD(#)"
        ##The length guard keeps word[-3] in range and guarantees a non-empty base word
        return (len(word) > 3 and word[-1] == ')' and word[-3] == '('
                and word[-2].isdigit())
    ##Ignore words with symbols or numbers or that don't satisfy the set limits
    def should_skip(word):
        if not word or not word[0].isalpha():
            return True
        ##Ignore abbreviations
        if word[-1] == '.':
            return True
        if re.search(ILLEGAL_CHAR_REGEX, word):
            return True
        if len(word) > MAX_DICT_WORD_LEN:
            return True
        if len(word) < MIN_DICT_WORD_LEN:
            return True
        return False
    phonetic_dict = {}
    ##Cleaning up the Dataset
    with open(CMU_DICT_PATH, encoding="ISO-8859-1") as cmu_dict:
        for line in cmu_dict:
            ##Skip comments
            if line.startswith(';;;'):
                continue
            entry = line.strip()
            ##Skip blank lines instead of failing on the unpack below
            if not entry:
                continue
            ##Split only on the first double space so a stray double space
            ##inside the phoneme list cannot break the two-way unpack
            word, phonetic = entry.split('  ', 1)
            # Alternate pronunciations are formatted: "WORD(#)  PHONETICS"
            # Remove (#)
            if is_alternate_pho_spelling(word):
                word = word[:word.find('(')]
            if should_skip(word):
                continue
            phonetic_dict.setdefault(word, []).append(phonetic)
    #phonetic_dict = {key:phonetic_dict[key] for key in random.sample(list(phonetic_dict.keys()), 5000)} #Limiting Dataset for Testing
    return phonetic_dict
# Build the cleaned {word: [pronunciations]} mapping once for the whole notebook.
phonetic_dict = load_clean_phonetic_dictionary()
# Total number of (word, pronunciation) pairs across all words.
example_count = np.sum([len(prons) for _, prons in phonetic_dict.items()])

Using TensorFlow backend.
  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])


In [2]:
##LibriSpeech Model Data Preparation
class TextTransform:
    """Two-way lookup between transcript characters and integer labels.

    The vocabulary is the apostrophe, the space (stored under the key
    '<SPACE>' in ``char_map``) and the lowercase letters a-z, numbered 0-27.
    """
    def __init__(self):
        char_map_str = """
        ' 0
        <SPACE> 1
        a 2
        b 3
        c 4
        d 5
        e 6
        f 7
        g 8
        h 9
        i 10
        j 11
        k 12
        l 13
        m 14
        n 15
        o 16
        p 17
        q 18
        r 19
        s 20
        t 21
        u 22
        v 23
        w 24
        x 25
        y 26
        z 27
        """
        # Each non-empty row of the table is "<symbol> <index>".
        rows = [row.split() for row in char_map_str.strip().split('\n')]
        self.char_map = {symbol: int(idx) for symbol, idx in rows}
        self.index_map = {int(idx): symbol for symbol, idx in rows}
        # Decode index 1 straight to a real space rather than the '<SPACE>' tag.
        self.index_map[1] = ' '

    def text_to_int(self, text):
        """Encode ``text`` as a list of integer labels.

        Raises KeyError for any character that is not in the vocabulary.
        """
        space_id = self.char_map['<SPACE>']
        return [space_id if ch == ' ' else self.char_map[ch] for ch in text]

    def int_to_text(self, labels):
        """Decode an iterable of integer labels back into a string."""
        decoded = ''.join(self.index_map[label] for label in labels)
        return decoded.replace('<SPACE>', ' ')

# Training pipeline: mel spectrogram followed by frequency and time masking.
train_audio_transforms = nn.Sequential(
    torchaudio.transforms.MelSpectrogram(sample_rate=16000, n_mels=128),
    torchaudio.transforms.FrequencyMasking(freq_mask_param=30),
    torchaudio.transforms.TimeMasking(time_mask_param=100)
)
# Validation pipeline: mel spectrogram only, built with torchaudio's default arguments.
valid_audio_transforms = torchaudio.transforms.MelSpectrogram()
# Shared character <-> integer codec used by data_processing and GreedyDecoder.
text_transform = TextTransform()
def data_processing(data, data_type="train"):
    """Collate LibriSpeech samples into padded model inputs and CTC targets.

    Args:
        data: iterable of 6-tuples; only the first (waveform) and third
            (utterance) fields are used.
        data_type: 'train' applies ``train_audio_transforms``, 'valid' applies
            ``valid_audio_transforms``.

    Returns:
        (spectrograms, labels, input_lengths, label_lengths) where the
        spectrograms and labels are zero-padded along their batch, the
        input lengths are the per-sample frame counts halved, and the label
        lengths are the per-sample transcript lengths.
        # assumes each waveform is (1, n_samples) so spectrograms come out as
        # (batch, 1, n_mels, time) — TODO confirm

    Raises:
        Exception: if ``data_type`` is neither 'train' nor 'valid'.
    """
    specs, targets = [], []
    spec_lens, target_lens = [], []
    for waveform, _, utterance, _, _, _ in data:
        if data_type == 'train':
            transform = train_audio_transforms
        elif data_type == 'valid':
            transform = valid_audio_transforms
        else:
            raise Exception('data_type should be train or valid')
        mel = transform(waveform).squeeze(0).transpose(0, 1)
        specs.append(mel)
        encoded = torch.Tensor(text_transform.text_to_int(utterance.lower()))
        targets.append(encoded)
        # NOTE(review): the halving presumably mirrors the stride-2 front-end
        # convolution of the acoustic model — confirm if the stride changes.
        spec_lens.append(mel.shape[0] // 2)
        target_lens.append(len(encoded))

    padded_specs = nn.utils.rnn.pad_sequence(specs, batch_first=True)
    padded_specs = padded_specs.unsqueeze(1).transpose(2, 3)
    padded_targets = nn.utils.rnn.pad_sequence(targets, batch_first=True)
    return padded_specs, padded_targets, spec_lens, target_lens

#################################################################################################################################

##Data Preparation for CMU Phonetics Dictionary
##Sentinel tokens placed before and after every phoneme sequence
START_PHONE_SYM = '\t'
END_PHONE_SYM = '\n'
def char_list():
    """Return the grapheme vocabulary: '' first (id 0), then ". - '" and A-Z."""
    vocabulary = ['', ".", "-", "'"]
    vocabulary.extend(string.ascii_uppercase)
    return vocabulary
def phone_list():
    """Return the phoneme vocabulary.

    The list starts with '' (id 0), then the start and end sentinels, then one
    stripped entry per line of the file at ``CMU_SYMBOLS_PATH``.
    """
    with open(CMU_SYMBOLS_PATH) as file:
        symbols = [line.strip() for line in file]
    return ['', START_PHONE_SYM, END_PHONE_SYM] + symbols
def id_mappings_from_list(str_list):
    """Build ``(str_to_id, id_to_str)`` dictionaries from list positions.

    When a string occurs more than once, ``str_to_id`` keeps its last index.
    """
    str_to_id = {}
    for position, token in enumerate(str_list):
        str_to_id[token] = position
    id_to_str = dict(enumerate(str_list))
    return str_to_id, id_to_str
# Lookup tables in both directions for graphemes and phonemes; id 0 is the
# empty string '' in both vocabularies.
char_to_id, id_to_char = id_mappings_from_list(char_list())
phone_to_id, id_to_phone = id_mappings_from_list(phone_list())
# Vocabulary sizes, used as the one-hot width and the embedding input size.
CHAR_TOKEN_COUNT = len(char_to_id)
PHONE_TOKEN_COUNT = len(phone_to_id)
def char_to_1_hot(char):
    """Return a length-CHAR_TOKEN_COUNT vector with a 1 at the id of ``char``."""
    position = char_to_id[char]
    one_hot = np.zeros((CHAR_TOKEN_COUNT))
    one_hot[position] = 1.
    return one_hot
def phone_to_1_hot(phone):
    """Return a length-PHONE_TOKEN_COUNT vector with a 1 at the id of ``phone``."""
    position = phone_to_id[phone]
    one_hot = np.zeros((PHONE_TOKEN_COUNT))
    one_hot[position] = 1.
    return one_hot
# Longest word (in characters) among the dictionary keys.
MAX_CHAR_SEQ_LEN = max([len(word) for word, _ in phonetic_dict.items()])
# Longest pronunciation (in phonemes) over every word and every alternative.
MAX_PHONE_SEQ_LEN = max([max([len(pron.split()) for pron in pronuns]) 
                         for _, pronuns in phonetic_dict.items()]
                       ) + 2  ## "+ 2" leaves room for the start and end tokens
def dataset_to_1_hot_tensors():
    """One-hot encode every (word, pronunciation) pair of ``phonetic_dict``.

    Returns:
        (char_seqs, phone_seqs): arrays of shape
        (n_pairs, MAX_CHAR_SEQ_LEN, CHAR_TOKEN_COUNT) and
        (n_pairs, MAX_PHONE_SEQ_LEN, PHONE_TOKEN_COUNT). Rows past the end of
        a sequence stay all-zero; each pronunciation is wrapped in the start
        and end sentinels. A word with several pronunciations contributes one
        pair per pronunciation.
    """
    encoded_words = []
    encoded_prons = []
    for word, pronuns in phonetic_dict.items():
        word_rows = np.zeros((MAX_CHAR_SEQ_LEN, CHAR_TOKEN_COUNT))
        for pos, letter in enumerate(word):
            word_rows[pos, :] = char_to_1_hot(letter)
        for pronun in pronuns:
            pron_rows = np.zeros((MAX_PHONE_SEQ_LEN, PHONE_TOKEN_COUNT))
            tokens = [START_PHONE_SYM, *pronun.split(), END_PHONE_SYM]
            for pos, token in enumerate(tokens):
                pron_rows[pos, :] = phone_to_1_hot(token)

            encoded_words.append(word_rows)
            encoded_prons.append(pron_rows)

    return np.array(encoded_words), np.array(encoded_prons)
# One-hot encoded graphemes and phonemes for every (word, pronunciation) pair.
char_seq_matrix, phone_seq_matrix = dataset_to_1_hot_tensors()        

  "At least one mel filterbank has all zero values. "


In [3]:
##Test Helper Functions for the LibriSpeech Model
def avg_wer(wer_scores, combined_ref_len):
    """Return the summed error counts divided by the combined reference length."""
    total_errors = float(sum(wer_scores))
    total_length = float(combined_ref_len)
    return total_errors / total_length
##Levenshtein distance is defined as the minimum number of single-character edits (substitutions, insertions or deletions) required to change one word into the other.
##We can naturally extend the edits to word level when calculating the Levenshtein distance for two sentences.
def _levenshtein_distance(ref, hyp):
    m = len(ref)
    n = len(hyp)
    ##Special Scenarios
    if ref == hyp:
        return 0
    if m == 0:
        return n
    if n == 0:
        return m
    if m < n:
        ref, hyp = hyp, ref
        m, n = n, m
    distance = np.zeros((2, n + 1), dtype=np.int32)
    ##Set Initial Distances in Matrix
    for j in range(0,n + 1):
        distance[0][j] = j
    ##Levenshtein Distance Algorithm
    for i in range(1, m + 1):
        prev_row_idx = (i - 1) % 2
        cur_row_idx = i % 2
        distance[cur_row_idx][0] = i
        for j in range(1, n + 1):
            if ref[i - 1] == hyp[j - 1]:
                distance[cur_row_idx][j] = distance[prev_row_idx][j - 1]
            else:
                s_num = distance[prev_row_idx][j - 1] + 1 ##Substitutions
                i_num = distance[cur_row_idx][j - 1] + 1 ##Insertions
                d_num = distance[prev_row_idx][j] + 1 ##Deletions
                distance[cur_row_idx][j] = min(s_num, i_num, d_num)
    return distance[m % 2][n]
def word_errors(reference, hypothesis, ignore_case=False, delimiter=' '):
    """Compute the word-level edit distance between two sentences.

    Args:
        reference: ground-truth sentence.
        hypothesis: predicted sentence.
        ignore_case: lowercase both sentences before comparing.
        delimiter: string the sentences are split on.

    Returns:
        (edit_distance, reference_word_count) with the distance as a float.
    """
    if ignore_case == True:
        reference = reference.lower()
        hypothesis = hypothesis.lower()
    ##Drop the empty tokens produced by leading, trailing or repeated
    ##delimiters so they are not counted as words. This matches how
    ##char_errors filters its pieces, and lets an empty reference report
    ##zero words so the guard in wer() can actually trigger.
    ref_words = list(filter(None, reference.split(delimiter)))
    hyp_words = list(filter(None, hypothesis.split(delimiter)))
    edit_distance = _levenshtein_distance(ref_words, hyp_words)
    return float(edit_distance), len(ref_words)
def char_errors(reference, hypothesis, ignore_case=False, remove_space=False):
    """Compute the character-level edit distance between two sentences.

    Runs of spaces are collapsed first; with ``remove_space`` the spaces are
    dropped entirely instead of being kept as single separators.

    Returns:
        (edit_distance, reference_length) with the distance as a float and the
        length measured on the normalized reference.
    """
    if ignore_case == True:
        reference, hypothesis = reference.lower(), hypothesis.lower()
    separator = '' if remove_space == True else ' '
    ref_pieces = filter(None, reference.split(' '))
    hyp_pieces = filter(None, hypothesis.split(' '))
    reference = separator.join(ref_pieces)
    hypothesis = separator.join(hyp_pieces)
    distance = _levenshtein_distance(reference, hypothesis)
    return float(distance), len(reference)
##WER = (Sw + Dw + Iw) / Nw
##Sw is the number of words substituted
##Dw is the number of words deleted
##Iw is the number of words inserted
##Nw is the number of words in the reference
def wer(reference, hypothesis, ignore_case=False, delimiter=' '):
    """Return the word error rate: word edit distance / reference word count.

    Raises:
        ValueError: if the reference word count is 0.
    """
    errors, word_count = word_errors(reference, hypothesis, ignore_case, delimiter)
    if word_count == 0:
        raise ValueError("Reference's word number should be greater than 0.")
    return float(errors) / word_count
##CER = (Sc + Dc + Ic) / Nc
##Sc is the number of characters substituted
##Dc is the number of characters deleted
##Ic is the number of characters inserted
##Nc is the number of characters in the reference
def cer(reference, hypothesis, ignore_case=False, remove_space=False):
    """Return the character error rate: char edit distance / reference length.

    Raises:
        ValueError: if the normalized reference is empty.
    """
    errors, char_count = char_errors(reference, hypothesis, ignore_case, remove_space)
    if char_count == 0:
        raise ValueError("Length of reference should be greater than 0.")
    return float(errors) / char_count
##Decoder for Evaluation
def GreedyDecoder(output, labels, label_lengths, blank_label=28, collapse_repeated=True):
    """Decode model outputs by taking the arg-max class at every time step.

    Args:
        output: scores indexed as (batch, time, class); arg-max is taken over
            the last axis.
        labels: padded target label ids, one row per sample.
        label_lengths: number of valid entries in each row of ``labels``.
        blank_label: class id that is dropped from the decoded sequence.
        collapse_repeated: skip a label equal to the one at the previous step.

    Returns:
        (decodes, targets): lists of predicted and reference strings.
    """
    best_paths = torch.argmax(output, dim=2)
    decodes, targets = [], []
    for sample_idx, path in enumerate(best_paths):
        reference_ids = labels[sample_idx][:label_lengths[sample_idx]].tolist()
        targets.append(text_transform.int_to_text(reference_ids))
        kept = []
        for step, token in enumerate(path):
            if token == blank_label:
                continue
            if collapse_repeated and step != 0 and token == path[step - 1]:
                continue
            kept.append(token.item())
        decodes.append(text_transform.int_to_text(kept))
    return decodes, targets

In [4]:
##First Model Implementation
##First Module
##CNN Layer Normalization
class CNNLayerNorm(nn.Module):
    """Layer normalization over the feature axis of a 4-D CNN activation."""
    def __init__(self, n_feats):
        super().__init__()
        self.layer_norm = nn.LayerNorm(n_feats)
    def forward(self, x):
        ##Input is (batch, channel, feature, time); LayerNorm acts on the last
        ##axis, so move the features there and move them back afterwards
        feature_last = x.transpose(2, 3).contiguous() ##(batch, channel, time, feature)
        normalized = self.layer_norm(feature_last)
        return normalized.transpose(2, 3).contiguous() ##(batch, channel, feature, time)

##Residual CNN (The implementation is inspired from arXiv:1603.05027v3 [cs.CV] 25 Jul 2016)
##We Normalize Layer by Layer instead of Batch Normalization
class ResidualCNN(nn.Module):
    """Pre-activation residual block: two (norm -> GELU -> dropout -> conv) stages.

    The input is added back to the output of the second convolution, so the
    block only works when the convolutions preserve the input shape.
    """
    def __init__(self, in_channels, out_channels, kernel, stride, dropout, n_feats):
        super().__init__()
        same_padding = kernel//2
        self.cnn1 = nn.Conv2d(in_channels, out_channels, kernel, stride, padding=same_padding)
        self.cnn2 = nn.Conv2d(out_channels, out_channels, kernel, stride, padding=same_padding)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.layer_norm1 = CNNLayerNorm(n_feats)
        self.layer_norm2 = CNNLayerNorm(n_feats)
    def forward(self, x):
        skip = x  ##(batch, channel, feature, time)
        ##First stage
        out = self.dropout1(F.gelu(self.layer_norm1(x)))
        out = self.cnn1(out)
        ##Second stage
        out = self.dropout2(F.gelu(self.layer_norm2(out)))
        out = self.cnn2(out)
        out += skip
        return out ##(batch, channel, feature, time)

##Second Module
class BidirectionalGRU(nn.Module):
    """Layer norm -> GELU -> single-layer bidirectional GRU -> dropout.

    The output feature size is ``2 * hidden_size`` (both directions); the GRU's
    final hidden state is discarded.
    """
    def __init__(self, rnn_dim, hidden_size, dropout, batch_first):
        super().__init__()
        self.BiGRU = nn.GRU(
            input_size=rnn_dim,
            hidden_size=hidden_size,
            num_layers=1,
            batch_first=batch_first,
            bidirectional=True,
        )
        self.layer_norm = nn.LayerNorm(rnn_dim)
        self.dropout = nn.Dropout(dropout)
    def forward(self, x):
        activated = F.gelu(self.layer_norm(x))
        sequence_out, _ = self.BiGRU(activated)
        return self.dropout(sequence_out)
##Complete Model Structure. The number of CNN and GRU layers is variable; accuracy increases with more layers, with diminishing returns.
class SpeechRecognitionModel(nn.Module):
    """Acoustic model: conv front-end -> residual CNNs -> linear -> BiGRUs -> classifier.

    ``forward`` takes a 4-D tensor that is fed to a 1-input-channel Conv2d and
    returns per-frame class scores (no softmax applied).
    """
    def __init__(self, n_cnn_layers, n_rnn_layers, rnn_dim, n_class, n_feats, stride=2, dropout=0.1):
        super().__init__()
        ##Feature count after the front-end convolution
        ##NOTE(review): the hard-coded halving presumably assumes stride == 2 — confirm before using other strides
        n_feats = n_feats//2
        self.cnn = nn.Conv2d(1, 32, 3, stride=stride, padding=3//2)  ##CNN for extracting features of interest
        ##N Residual CNN Layers (Filter size = 32)
        residual_blocks = [
            ResidualCNN(32, 32, kernel=3, stride=1, dropout=dropout, n_feats=n_feats)
            for _ in range(n_cnn_layers)
        ]
        self.rescnn_layers = nn.Sequential(*residual_blocks)
        self.fully_connected = nn.Linear(n_feats*32, rnn_dim)
        ##Only the first GRU consumes rnn_dim features; later ones consume the
        ##2*rnn_dim output of the previous bidirectional layer.
        ##NOTE(review): batch_first is True for the first GRU only, so later GRUs
        ##treat dim 0 of their (batch, time, feature) input as the time axis —
        ##confirm this is intended; changing it alters results of saved checkpoints.
        recurrent_blocks = []
        for layer_idx in range(n_rnn_layers):
            is_first = layer_idx == 0
            recurrent_blocks.append(
                BidirectionalGRU(rnn_dim=rnn_dim if is_first else rnn_dim*2,
                                 hidden_size=rnn_dim, dropout=dropout, batch_first=is_first))
        self.birnn_layers = nn.Sequential(*recurrent_blocks)
        self.classifier = nn.Sequential(
            nn.Linear(rnn_dim*2, rnn_dim),  ##Bidirectional RNN returns rnn_dim*2
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(rnn_dim, n_class)
        )
    def forward(self, x):
        features = self.rescnn_layers(self.cnn(x))
        batch, channels, feats, frames = features.size()
        ##Fold channels into the feature axis
        features = features.view(batch, channels * feats, frames)  ##(batch, feature, time)
        features = features.transpose(1, 2) ##(batch, time, feature)
        hidden = self.fully_connected(features)
        hidden = self.birnn_layers(hidden)
        return self.classifier(hidden)
    
##Second Model

def attention_model(hidden_nodes = 256, emb_size = 256):
    """Build the attention-based grapheme-to-phoneme models.

    The layers are created once and shared by three Keras models:

    * a training model that unrolls the decoder for MAX_PHONE_SEQ_LEN steps,
      taking ``[char_inputs, h0, c0] + one phone input per step`` and
      returning one phoneme distribution per step;
    * an inference encoder mapping character ids to the encoder outputs;
    * an inference decoder that runs a single step from
      ``[previous phone, h, c, encoder outputs]`` and returns
      ``[phoneme distribution, h, c]``.

    Args:
        hidden_nodes: LSTM state size (the bidirectional encoder therefore
            outputs ``hidden_nodes * 2`` features per character).
        emb_size: embedding size for both characters and phonemes.

    Returns:
        (training_model, testing_encoder_model, testing_decoder_model)
    """
    ##Attention Mechanism Layers
    attn_repeat = RepeatVector(MAX_CHAR_SEQ_LEN)
    attn_concat = Concatenate(axis=-1)
    attn_dense1 = Dense(128, activation="tanh")
    attn_dense2 = Dense(1, activation="relu")
    ##Softmax over axis 1 (the character positions), not over the last axis
    attn_softmax = Lambda(lambda x: softmax(x,axis=1))
    attn_dot = Dot(axes = 1)
    def get_context(encoder_outputs, h_prev):
        """Return the attention-weighted sum of encoder outputs for state ``h_prev``."""
        ##Copy the decoder state to every character position
        h_prev = attn_repeat(h_prev)
        concat = attn_concat([encoder_outputs, h_prev])
        ##Score each position with a small two-layer network
        e = attn_dense1(concat)
        e = attn_dense2(e)
        attention_weights = attn_softmax(e)
        context = attn_dot([attention_weights, encoder_outputs])
        return context
    ##Encoder
    char_inputs = Input(shape=(None,))
    char_embedding_layer = Embedding(CHAR_TOKEN_COUNT, emb_size, input_length=MAX_CHAR_SEQ_LEN)
    encoder = Bidirectional(LSTM(hidden_nodes, return_sequences=True, recurrent_dropout=0.2))
    ##Decoder
    decoder = LSTM(hidden_nodes, return_state=True, recurrent_dropout=0.2)
    phone_embedding_layer = Embedding(PHONE_TOKEN_COUNT, emb_size)
    embedding_reshaper = Reshape((1,emb_size,))
    context_phone_concat = Concatenate(axis=-1)
    context_phone_dense = Dense(hidden_nodes*3, activation="relu")
    output_layer = Dense(PHONE_TOKEN_COUNT, activation='softmax')
    ##Training Encoder
    char_embeddings = char_embedding_layer(char_inputs)
    char_embeddings = Activation('relu')(char_embeddings)
    char_embeddings = Dropout(0.5)(char_embeddings)
    encoder_outputs = encoder(char_embeddings)
    ##Training Decoder
    h0 = Input(shape=(hidden_nodes,))
    c0 = Input(shape=(hidden_nodes,))
    h = h0 ##Hidden State
    c = c0 #Cell State
    phone_inputs = []
    phone_outputs = []
    ##Unroll the decoder: every time step has its own Input and its own output,
    ##while the layers (and therefore the weights) are shared across steps
    for t in range(MAX_PHONE_SEQ_LEN):
        phone_input = Input(shape=(None,))
        phone_embeddings = phone_embedding_layer(phone_input)
        phone_embeddings = Dropout(0.5)(phone_embeddings)
        phone_embeddings = embedding_reshaper(phone_embeddings)
        context = get_context(encoder_outputs, h)
        phone_and_context = context_phone_concat([context, phone_embeddings])
        phone_and_context = context_phone_dense(phone_and_context)
        ##The states returned here are carried into the next time step
        decoder_output, h, c = decoder(phone_and_context, initial_state = [h, c])
        decoder_output = Dropout(0.5)(decoder_output)
        phone_output = output_layer(decoder_output)
        phone_inputs.append(phone_input)
        phone_outputs.append(phone_output)
    training_model = Model(inputs=[char_inputs, h0, c0] + phone_inputs, outputs=phone_outputs)
    ##Testing Encoder
    testing_encoder_model = Model(char_inputs, encoder_outputs)
    ##Testing Decoder
    ##Single decoding step that reuses the trained layers; unlike the training
    ##graph, no Dropout layers are inserted here
    test_prev_phone_input = Input(shape=(None,))
    test_phone_embeddings = phone_embedding_layer(test_prev_phone_input)
    test_phone_embeddings = embedding_reshaper(test_phone_embeddings)
    test_h = Input(shape=(hidden_nodes,), name='test_h')
    test_c = Input(shape=(hidden_nodes,), name='test_c')
    test_encoding_input = Input(shape=(MAX_CHAR_SEQ_LEN, hidden_nodes*2,))
    test_context = get_context(test_encoding_input, test_h)
    test_phone_and_context = Concatenate(axis=-1)([test_context, test_phone_embeddings])
    test_phone_and_context = context_phone_dense(test_phone_and_context)
    test_seq, out_h, out_c = decoder(test_phone_and_context, initial_state = [test_h, test_c])
    test_out = output_layer(test_seq)
    testing_decoder_model = Model([test_prev_phone_input, test_h, test_c, test_encoding_input], [test_out,out_h,out_c])
    return training_model, testing_encoder_model, testing_decoder_model

In [None]:
# Decoder targets: the phoneme sequences shifted one step to the left (the
# first time step is dropped and an all-zero step is appended at the end), so
# the target at step t is the input token of step t + 1.
phone_seq_matrix_decoder_output = np.pad(phone_seq_matrix,((0,0),(0,1),(0,0)), mode='constant')[:,1:,:]
# Fraction of the examples held out for testing.
TEST_SIZE = 0.2
# Split the three aligned arrays together so rows stay matched; the fixed
# random_state makes the split reproducible.
(char_input_train, char_input_test, 
 phone_input_train, phone_input_test, 
 phone_output_train, phone_output_test) = train_test_split(
    char_seq_matrix, phone_seq_matrix, phone_seq_matrix_decoder_output, 
    test_size=TEST_SIZE, random_state=42)
TEST_EXAMPLE_COUNT = char_input_test.shape[0]
##Helper Function for retrieving word from vector.
def one_hot_matrix_to_word(char_seq):
    """Decode the first one-hot matrix in ``char_seq`` back into a word.

    Decoding stops at the first all-zero row (padding).
    """
    letters = []
    for one_hot in char_seq[0]:
        if not np.count_nonzero(one_hot):
            break
        letters.append(id_to_char[np.argmax(one_hot)])
    return ''.join(letters)
##Some words have multiple correct pronunciations.
##If a prediction matches any alternative, consider it correct.
def is_correct(word,test_pronunciation):
    """Return True if ``test_pronunciation`` equals any known pronunciation of ``word``."""
    return any(test_pronunciation == known for known in phonetic_dict[word])
def syllable_count(phonetic_sp): 
    """Count the phonemes in a space-separated pronunciation that end with a digit."""
    return sum(1 for phone in phonetic_sp.split() if phone[-1].isdigit())
def is_syllable_count_correct(word, test_pronunciation):
    """Return True if the prediction has the same syllable count as any known pronunciation."""
    return any(
        syllable_count(test_pronunciation) == syllable_count(known)
        for known in phonetic_dict[word]
    )
def bleu_score(word,test_pronunciation):
    """Return the smoothed sentence-level BLEU of a predicted pronunciation.

    Every known pronunciation of ``word`` is used as a reference, tokenized
    into phonemes.
    """
    reference_tokens = [known.split() for known in phonetic_dict[word]]
    candidate_tokens = test_pronunciation.split()
    smoothing = SmoothingFunction().method1
    return sentence_bleu(reference_tokens, candidate_tokens, smoothing_function=smoothing)
def evaluate(test_examples, encoder, decoder, word_decoder, predictor):
    """Score a pronunciation predictor on ``test_examples``.

    Args:
        test_examples: array of encoded words; examples are passed on one at a
            time as length-1 slices.
        encoder, decoder: models forwarded unchanged to ``predictor``.
        word_decoder: callable turning a length-1 slice back into its word.
        predictor: callable ``(example, encoder, decoder) -> pronunciation``.

    Returns:
        (syllable_acc, perfect_acc, avg_bleu_score): fraction of predictions
        with a correct syllable count, fraction matching a known
        pronunciation exactly, and the mean BLEU score.

    Raises:
        ValueError: if ``test_examples`` is empty.
    """
    ##Use the size of the data actually passed in rather than the global
    ##TEST_EXAMPLE_COUNT, so the loop and the averages stay correct for any
    ##subset or other split handed to this function
    example_count = len(test_examples)
    if example_count == 0:
        raise ValueError("test_examples must contain at least one example.")
    correct_syllable_counts = 0
    perfect_predictions = 0
    bleu_scores = []
    for example_idx in range(example_count):
        ##Slice (not index) so the example keeps its leading batch axis
        example_char_seq = test_examples[example_idx:example_idx+1]
        predicted_pronun = predictor(example_char_seq, encoder, decoder)
        example_word = word_decoder(example_char_seq)
        perfect_predictions += is_correct(example_word,predicted_pronun)
        correct_syllable_counts += is_syllable_count_correct(example_word,predicted_pronun)
        bleu_scores.append(bleu_score(example_word,predicted_pronun))
    syllable_acc = correct_syllable_counts / example_count
    perfect_acc = perfect_predictions / example_count
    avg_bleu_score = np.mean(bleu_scores)
    return syllable_acc, perfect_acc, avg_bleu_score
def print_results(model_name, syllable_acc, perfect_acc, avg_bleu_score):
    """Print a short report: accuracies as percentages (1 decimal), BLEU to 4 decimals."""
    print(model_name)
    print('-'*20)
    report_rows = (
        ('Syllable Accuracy: %s%%', syllable_acc*100, 1),
        ('Perfect Accuracy: %s%%', perfect_acc*100, 1),
        ('Bleu Score: %s', avg_bleu_score, 4),
    )
    for template, value, digits in report_rows:
        print(template % round(value, digits))
def dataset_for_embeddings():
    """Encode every (word, pronunciation) pair as sequences of token ids.

    Returns:
        (char_seqs, phone_seqs): arrays of shape (n_pairs, MAX_CHAR_SEQ_LEN)
        and (n_pairs, MAX_PHONE_SEQ_LEN), zero-padded on the right; each
        pronunciation is wrapped in the start and end sentinels.
    """
    encoded_words = []
    encoded_prons = []
    for word, pronuns in phonetic_dict.items():
        word_ids = np.zeros((MAX_CHAR_SEQ_LEN))
        for pos, letter in enumerate(word):
            word_ids[pos] = char_to_id[letter]
        for pronun in pronuns:
            pron_ids = np.zeros((MAX_PHONE_SEQ_LEN))
            tokens = [START_PHONE_SYM, *pronun.split(), END_PHONE_SYM]
            for pos, token in enumerate(tokens):
                pron_ids[pos] = phone_to_id[token]
            encoded_words.append(word_ids)
            encoded_prons.append(pron_ids)
    return np.array(encoded_words), np.array(encoded_prons)
# Integer-id versions of the dataset, for the models that use Embedding layers.
char_emb_matrix, phone_emb_matrix = dataset_for_embeddings()
# Same test_size and random_state as the one-hot split above.
# NOTE(review): presumably this keeps the rows aligned with phone_output_train,
# which train_attention pairs with emb_phone_input_train — confirm.
(emb_char_input_train, emb_char_input_test,emb_phone_input_train, emb_phone_input_test) = train_test_split(char_emb_matrix, phone_emb_matrix, test_size=TEST_SIZE, random_state=42)
def id_vec_to_word(emb_char_seq):
    """Decode the first id sequence in ``emb_char_seq`` back into a word."""
    decoded = ''.join(id_to_char[char_id] for char_id in emb_char_seq[0])
    return decoded.strip()

In [1]:
##Training First Model
def train(model, device, train_loader, criterion, optimizer, scheduler, epoch):
    """Run one training epoch of the CTC speech model.

    Args:
        model: network mapping spectrograms to (batch, time, n_class) scores.
        device: device the batch tensors are moved to.
        train_loader: iterable of (spectrograms, labels, input_lengths,
            label_lengths) batches; must expose ``.dataset`` and ``len()``.
        criterion: loss called as ``criterion(log_probs, labels,
            input_lengths, label_lengths)`` with time-major log-probabilities.
        optimizer: optimizer stepped once per batch.
        scheduler: learning-rate scheduler, also stepped once per batch.
        epoch: epoch number, used only in the progress message.
    """
    model.train()
    data_len = len(train_loader.dataset)
    num_batches = len(train_loader)
    for batch_idx, _data in enumerate(train_loader):
        spectrograms, labels, input_lengths, label_lengths = _data 
        spectrograms, labels = spectrograms.to(device), labels.to(device)
        optimizer.zero_grad()
        output = model(spectrograms)  # (batch, time, n_class)
        output = F.log_softmax(output, dim=2)
        output = output.transpose(0, 1) # (time, batch, n_class)
        loss = criterion(output, labels, input_lengths, label_lengths)
        loss.backward()
        optimizer.step()
        scheduler.step()
        # Log every 100th batch and the final batch of the epoch. The final
        # batch has index num_batches - 1; comparing the batch index against
        # the number of samples (as before) never matched.
        if batch_idx % 100 == 0 or batch_idx == num_batches - 1:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(epoch, batch_idx * len(spectrograms), data_len,100. * batch_idx / len(train_loader), loss.item()))

##Training
train_url="train-other-500"
learning_rate = 5e-4
batch_size = 10
hparams = {
        "n_cnn_layers": 3,
        "n_rnn_layers": 5,
        "rnn_dim": 512,
        "n_class": 29,
        "n_feats": 128,
        "stride":2,
        "dropout": 0.1,
        "learning_rate": learning_rate,
        "batch_size": batch_size,
        "epochs": 10
    }
torch.manual_seed(7)
device = torch.device("cpu")
# ./data already exists once the first cell has run, so keying the download
# flag on the directory would never fetch this split. With download=True
# torchaudio only downloads when the split is not already on disk.
os.makedirs("./data", exist_ok=True)
train_dataset = torchaudio.datasets.LIBRISPEECH("./data", url=train_url, download=True)
kwargs = {}
train_loader = data.DataLoader(dataset=train_dataset,
                                batch_size=hparams['batch_size'],
                                shuffle=True,
                                collate_fn=lambda x: data_processing(x, 'train'),
                                **kwargs)
model = SpeechRecognitionModel(
        hparams['n_cnn_layers'], hparams['n_rnn_layers'], hparams['rnn_dim'],
        hparams['n_class'], hparams['n_feats'], hparams['stride'], hparams['dropout']
        ).to(device)
# Checkpoints are written below; make sure their directory exists first.
os.makedirs("Model", exist_ok=True)
# Warm-start from a previous run when a final checkpoint is available.
if os.path.exists("Model/FinalModel.pt"):
    model.load_state_dict(torch.load("Model/FinalModel.pt", map_location=torch.device('cpu')))
else:
    print("No old Weights.")
print(model)
print('Num Model Parameters', sum([param.nelement() for param in model.parameters()]))
optimizer = optim.AdamW(model.parameters(), hparams['learning_rate'])
# Class 28 is the CTC blank (n_class = 28 characters + blank).
criterion = nn.CTCLoss(blank=28).to(device)
scheduler = optim.lr_scheduler.OneCycleLR(optimizer, max_lr=hparams['learning_rate'], 
                                            steps_per_epoch=int(len(train_loader)),
                                            epochs=hparams['epochs'],
                                            anneal_strategy='linear')
# The epoch count must come from hparams (the bare name `epochs` was never
# defined) and must match the value given to OneCycleLR above, which sizes its
# schedule as steps_per_epoch * epochs. train() takes exactly these seven
# arguments; the extra iter_meter/experiment arguments were undefined names.
for epoch in range(1, hparams['epochs'] + 1):
    train(model, device, train_loader, criterion, optimizer, scheduler, epoch)
    # torch.save overwrites an existing file, so no explicit removal is needed.
    print("Saving Model Weights.")
    torch.save(model.state_dict(), "Model/Model"+str(epoch)+".pt")
print("Saving Model Weights.")
torch.save(model.state_dict(), "Model/FinalModel.pt")

##Training Second Model
def train_attention(model, weights_path, validation_size=0.2, epochs=100):    
    """Compile and fit the attention training model on the embedding dataset.

    Args:
        model: Keras training model taking ``[chars, h0, c0] + one phone input
            per time step`` and producing one output per time step.
        weights_path: where weights are written.
        validation_size: fraction of the training data used for validation.
            When > 0 the best weights are checkpointed and training stops
            early on ``val_loss``; when exactly 0 the final weights are saved
            after fitting.
        epochs: maximum number of epochs.
    """
    train_count = emb_char_input_train.shape[0]
    ##Zero initial decoder states, one row per training example
    h0 = np.zeros((train_count, 256))
    c0 = np.zeros((train_count, 256))
    ##Time-major lists: one array per decoder time step
    step_inputs = list(emb_phone_input_train.swapaxes(0,1))
    step_targets = list(phone_output_train.swapaxes(0,1))
    callbacks = []
    if validation_size > 0:
        callbacks = [
            ModelCheckpoint(filepath=weights_path, verbose=1, save_best_only=True),
            EarlyStopping(monitor='val_loss',patience=3),
        ]
    model.compile(optimizer='adam', loss='categorical_crossentropy')
    model.fit([emb_char_input_train, h0, c0] + step_inputs, step_targets,
              batch_size=256,
              epochs=epochs,
              validation_split=validation_size,
              callbacks=callbacks)
    if validation_size == 0:
        model.save_weights(weights_path)
# Destination of the trained attention-model weights (relative path).
FINAL_ATTENTION_MODEL_WEIGHTS = os.path.join('GRAPHEME_TO_PHONEME/input', 'predicting-english-pronunciations-model-weights', 'final_attention_model_weights.hdf5')
# Build the training model and the two inference models that share its layers.
attn_training_model, attn_testing_encoder_model, attn_testing_decoder_model = attention_model()
# validation_size=0.0 trains on all training data for a fixed number of epochs
# (no checkpointing or early stopping) and saves the final weights.
train_attention(attn_training_model, FINAL_ATTENTION_MODEL_WEIGHTS, validation_size=0.0, epochs=29)

In [None]:
##Evaluating First Model with Greedy Decoder
def test(model, device, test_loader, criterion):
    """Evaluate the speech model with greedy decoding and print the metrics.

    Prints the mean batch loss and the per-utterance averages of the character
    and word error rates over ``test_loader``.

    Args:
        model: network mapping spectrograms to (batch, time, n_class) scores.
        device: device the batch tensors are moved to.
        test_loader: iterable of (spectrograms, labels, input_lengths,
            label_lengths) batches supporting ``len()``.
        criterion: loss called with time-major log-probabilities.
    """
    print('\nevaluating...')
    model.eval()
    test_loss = 0
    test_cer, test_wer = [], []
    with torch.no_grad():
        for i, _data in enumerate(test_loader):
            spectrograms, labels, input_lengths, label_lengths = _data 
            spectrograms, labels = spectrograms.to(device), labels.to(device)
            output = model(spectrograms)  ##(batch, time, n_class)
            output = F.log_softmax(output, dim=2)
            output = output.transpose(0, 1) ##(time, batch, n_class)
            loss = criterion(output, labels, input_lengths, label_lengths)
            ##Accumulate the mean over batches
            test_loss += loss.item() / len(test_loader)
            ##The decoder expects batch-major scores, so undo the transpose
            decoded_preds, decoded_targets = GreedyDecoder(output.transpose(0, 1), labels, label_lengths)
            for j in range(len(decoded_preds)):
                test_cer.append(cer(decoded_targets[j], decoded_preds[j]))
                test_wer.append(wer(decoded_targets[j], decoded_preds[j]))
    avg_cer = sum(test_cer)/len(test_cer)
    avg_wer = sum(test_wer)/len(test_wer)
    ##CER previously used '{:4f}' (minimum width 4, six decimals); it now uses
    ##the same four-decimal format as the loss and the WER
    print('Test set: Average loss: {:.4f}, Average CER: {:.4f} Average WER: {:.4f}\n'.format(test_loss, avg_cer, avg_wer))
##Testing
test_url="test-other"
learning_rate = 5e-4
batch_size = 10
hparams = {
        "n_cnn_layers": 3,
        "n_rnn_layers": 5,
        "rnn_dim": 512,
        "n_class": 29,
        "n_feats": 128,
        "stride":2,
        "dropout": 0.1,
        "learning_rate": learning_rate,
        "batch_size": batch_size,
        "epochs": 1
    }
torch.manual_seed(7)
device = torch.device("cpu")
# ./data already exists once the first cell has run, so keying the download
# flag on the directory would never fetch this split. With download=True
# torchaudio only downloads when the split is not already on disk.
os.makedirs("./data", exist_ok=True)
test_dataset = torchaudio.datasets.LIBRISPEECH("./data", url=test_url, download=True)
kwargs = {}
test_loader = data.DataLoader(dataset=test_dataset,
                                batch_size=hparams['batch_size'],
                                shuffle=False,
                                collate_fn=lambda x: data_processing(x, 'valid'),
                                **kwargs)
# Rebuild the architecture with the training hyper-parameters and load the
# final checkpoint written by the training cell.
model = SpeechRecognitionModel(
        hparams['n_cnn_layers'], hparams['n_rnn_layers'], hparams['rnn_dim'],
        hparams['n_class'], hparams['n_feats'], hparams['stride'], hparams['dropout']
        ).to(device)
model.load_state_dict(torch.load("Model/FinalModel.pt", map_location=torch.device('cpu')))
# Not used by test(); kept so the module-level name stays defined.
optimizer = optim.AdamW(model.parameters(), hparams['learning_rate'])
# Class 28 is the CTC blank, matching GreedyDecoder's default blank_label.
criterion = nn.CTCLoss(blank=28).to(device) 
test(model, device, test_loader, criterion)

##Evaluating Second Model
def predict_attention(input_char_seq, encoder, decoder):
    """Greedily decode a pronunciation for one encoded character sequence.

    The encoder is run once; the decoder is then called step by step, each
    time fed the previously chosen phone id, the recurrent states it returned
    on the previous step, and the encoder outputs. At every step the
    highest-scoring phone is taken.

    Args:
        input_char_seq: Input passed unchanged to ``encoder.predict``.
        encoder: Model exposing ``predict`` that produces the encoder outputs.
        decoder: Model exposing ``predict`` that takes
            ``[phone_ids, h, c, encoder_outputs]`` and returns
            ``(scores, h, c)``.

    Returns:
        The chosen phone symbols joined by single spaces, with leading and
        trailing whitespace stripped.
    """
    encoder_outputs = encoder.predict(input_char_seq)
    next_input = np.array([[phone_to_id[START_PHONE_SYM]]])

    ## Zero initial states, sized from the training array.
    ## NOTE(review): 256 is presumably the decoder state width -- confirm
    ## against attention_model().
    state_shape = (emb_char_input_train.shape[0], 256)
    hidden_state = np.zeros(state_shape)
    cell_state = np.zeros(state_shape)

    pronunciation = ''
    while True:
        step_scores, hidden_state, cell_state = decoder.predict(
            [next_input, hidden_state, cell_state, encoder_outputs])

        ## Greedy choice: take the single best phone for this step.
        best_phone_idx = np.argmax(step_scores[0, :])
        best_phone = id_to_phone[best_phone_idx]
        pronunciation += best_phone + ' '

        ## Length is measured on the whitespace-split string, so symbols that
        ## are themselves whitespace do not count towards the limit.
        if best_phone == END_PHONE_SYM or len(pronunciation.split()) > MAX_PHONE_SEQ_LEN:
            break

        ## Feed the chosen phone back in for the next time step.
        next_input = np.array([[best_phone_idx]])
    return pronunciation.strip()
##BeamSearch Decoder
def predict_beamsearch(input_char_seq, encoder, decoder, k=3):
    """Decode a pronunciation with beam search over the decoder's outputs.

    Every live hypothesis is extended with its ``k`` highest-probability next
    tokens. A hypothesis is finished when it emits ``END_PHONE_SYM`` or grows
    past ``MAX_PHONE_SEQ_LEN`` ids; otherwise it stays live. After each step
    only the ``k`` lowest-scoring live hypotheses are kept, where the score is
    the running sum of ``-log(probability)`` (lower is better).

    NOTE(review): a later cell in this notebook defines ``predict_beamsearch``
    again with the same signature; whichever definition runs last is the one
    in effect.

    Args:
        input_char_seq: Input passed unchanged to ``encoder.predict``.
        encoder: Model exposing ``predict`` that produces the encoder outputs.
        decoder: Model exposing ``predict`` that takes
            ``[last_token, state_1, state_2, encoder_outputs]`` and returns
            ``(token_probs, state_1, state_2)``.
        k: Beam width (tokens expanded per hypothesis and hypotheses kept).

    Returns:
        The phone symbols of the lowest-scoring finished hypothesis joined by
        single spaces, with leading and trailing whitespace stripped.
    """
    encoder_outputs = encoder.predict(input_char_seq)

    ## Zero initial states, sized from the training array.
    ## NOTE(review): 256 is presumably the decoder state width -- confirm
    ## against attention_model().
    state_shape = (emb_char_input_train.shape[0], 256)
    initial_states = [np.zeros(state_shape), np.zeros(state_shape)]

    ## finished: (token ids, score); beam: (token ids, score, decoder states)
    finished = []
    beam = [([phone_to_id[START_PHONE_SYM]], 0, initial_states)]

    while beam:
        candidates = []
        for seq, seq_score, states in beam:
            last_token = np.array([[seq[-1]]])
            token_probs, next_s, next_c = decoder.predict(
                [last_token] + states + [encoder_outputs])
            step_probs = token_probs[0, :]

            ## argsort is ascending, so the tail holds the k most likely tokens.
            for token_index in step_probs.argsort()[-k:]:
                extended = seq + [token_index]
                extended_score = seq_score - np.log(step_probs[token_index])
                if id_to_phone[token_index] == END_PHONE_SYM or len(extended) > MAX_PHONE_SEQ_LEN:
                    finished.append((extended, extended_score))
                else:
                    candidates.append((extended, extended_score, [next_s, next_c]))

        ## Prune by repeatedly dropping the worst (highest-score) candidate so
        ## the survivors keep their relative order.
        while len(candidates) > k:
            candidate_scores = np.array([cand[1] for cand in candidates])
            del candidates[candidate_scores.argsort()[-1]]
        beam = candidates

    best_idx = np.argmin([score for _, score in finished])
    best_seq = finished[best_idx][0]
    return ''.join(id_to_phone[i] + ' ' for i in best_seq).strip()
## Rebuild the attention model, restore its saved weights, score it on the
## test inputs with the beam-search decoder, then clear the Keras session.
FINAL_ATTENTION_MODEL_WEIGHTS = os.path.join(
    'GRAPHEME_TO_PHONEME/input',
    'predicting-english-pronunciations-model-weights',
    'final_attention_model_weights.hdf5',
)
(attn_training_model,
 attn_testing_encoder_model,
 attn_testing_decoder_model) = attention_model()
attn_training_model.load_weights(FINAL_ATTENTION_MODEL_WEIGHTS)
syllable_acc, perfect_acc, avg_bleu_score = evaluate(
    emb_char_input_test,
    attn_testing_encoder_model,
    attn_testing_decoder_model,
    id_vec_to_word,
    predict_beamsearch,
)
print_results(
    'Attention Model with BeamSearch Decoder',
    syllable_acc,
    perfect_acc,
    avg_bleu_score,
)
K.clear_session()

In [None]:
##Prediction for first model
def GreedyDecoderPredictor(output, blank_label=28, collapse_repeated=True):
    """Greedy decode: best class per frame, drop blanks, optionally merge repeats.

    Args:
        output: Tensor whose last (third) axis holds per-class scores; the
            argmax is taken over ``dim=2``.
        blank_label: Class index that is never emitted.
        collapse_repeated: When true, a frame whose class equals the class of
            the immediately preceding frame is skipped.

    Returns:
        One decoded string per row of the argmax result, produced by
        ``text_transform.int_to_text``.
    """
    best_paths = torch.argmax(output, dim=2)
    transcripts = []
    for path in best_paths:
        frame_ids = path.tolist()
        kept_ids = []
        for pos, class_id in enumerate(frame_ids):
            if class_id == blank_label:
                continue
            ## Compare against the raw previous frame (blank or not), so a
            ## blank between two equal classes keeps both.
            if collapse_repeated and pos != 0 and class_id == frame_ids[pos - 1]:
                continue
            kept_ids.append(class_id)
        transcripts.append(text_transform.int_to_text(kept_ids))
    return transcripts
def predict(model, device, audiofile):
    """Transcribe one audio file with the acoustic model and a Viterbi decoder.

    The file is loaded with torchaudio, converted by ``valid_audio_transforms``,
    wrapped into a single-item batch, run through ``model`` under
    ``torch.no_grad()``, log-softmaxed over the class axis, and decoded with
    ``viterbi_search``.

    Args:
        model: Network to evaluate; it is switched to eval mode here.
        device: Device the input batch is moved to before the forward pass.
        audiofile: Path handed to ``torchaudio.load``.

    Returns:
        The decoded sequence as a ``str``.
    """
    print('\nPredicting...')
    model.eval()
    with torch.no_grad():
        waveform, _sample_rate = torchaudio.load(audiofile)
        features = valid_audio_transforms(waveform).squeeze(0).transpose(0, 1)

        ## Single-item batch, reshaped the same way as the evaluation batches.
        batch = nn.utils.rnn.pad_sequence([features], batch_first=True)
        batch = batch.unsqueeze(1).transpose(2, 3).to(device)

        ## (batch, time, n_class) per the original author's note.
        log_probs = F.log_softmax(model(batch), dim=2)

        ## Alphabet for the decoder: every index_map character in index order,
        ## followed by one extra "!" symbol.
        ## NOTE(review): "!" presumably stands in for the extra (blank) class --
        ## confirm against the decoder's alphabet convention.
        alphabet = "".join(
            text_transform.index_map[idx]
            for idx in range(len(text_transform.index_map))
        ) + "!"

        decoded, _path = viterbi_search(log_probs[0].numpy(), alphabet)  ##Viterbi Decoder
        return str(decoded)
def FinalPredict1(Audiofile):
    """Convert, denoise and transcribe one assessment recording.

    Steps:
      1. Load ``TestAudio/Assessments/<Audiofile>.mp3``, down-mix to mono and
         export it as ``TestAudio/Noisy/Input.wav``.
      2. Reload it at 16 kHz and rewrite it as 16-bit PCM.
      3. Run ``python -m denoiser.enhance`` on the Noisy directory, writing to
         the Clean directory.
      4. Build the speech model, load ``Model/FinalModel.pt`` on CPU and
         transcribe ``TestAudio/Clean/Input_enhanced.wav`` with ``predict``.
      5. Post-process the transcript (see inline comments).

    All paths are relative to the current working directory. The relative
    reads and writes already required that, so the denoiser directories now
    use the same relative locations instead of a machine-specific absolute
    path.

    Args:
        Audiofile: File name (without extension) inside ``TestAudio/Assessments``.

    Returns:
        The post-processed transcript as a string (may be empty).

    Raises:
        subprocess.CalledProcessError: If the denoiser exits with a non-zero
            status, so a stale or missing enhanced file is never transcribed.
    """
    ## Local imports: these names are only needed here.
    import subprocess
    import sys

    noisy_dir = "TestAudio/Noisy"
    clean_dir = "TestAudio/Clean"
    noisy_wav = noisy_dir + "/Input.wav"

    sound = AudioSegment.from_mp3("TestAudio/Assessments/"+Audiofile+".mp3")
    sound = sound.set_channels(1)
    sound.export(noisy_wav, format="wav")

    ## Resample to 16 kHz and store as 16-bit PCM for the denoiser.
    x, sr = librosa.load(noisy_wav, sr=16000)
    sf.write(noisy_wav, x, samplerate=16000, subtype='PCM_16')

    ## Run the denoiser with the kernel's own interpreter; an argument list
    ## avoids shell quoting and works outside IPython as well.
    subprocess.run(
        [sys.executable, "-m", "denoiser.enhance", "--master64",
         "--noisy_dir", noisy_dir, "--out_dir", clean_dir],
        check=True,
    )

    hparams = {
        "n_cnn_layers": 3,
        "n_rnn_layers": 5,
        "rnn_dim": 512,
        "n_class": 29,
        "n_feats": 128,
        "stride": 2,
        "dropout": 0.1,
    }
    torch.manual_seed(7)
    device = torch.device("cpu")
    model = SpeechRecognitionModel(
        hparams['n_cnn_layers'], hparams['n_rnn_layers'], hparams['rnn_dim'],
        hparams['n_class'], hparams['n_feats'], hparams['stride'], hparams['dropout']
        )
    model.load_state_dict(torch.load("Model/FinalModel.pt", map_location=device))
    Answer = predict(model, device, clean_dir + "/Input_enhanced.wav")

    ## Drop a final "s" that directly follows "t" or "d". endswith() is safe
    ## for an empty transcript, unlike indexing the last characters.
    ## NOTE(review): this runs before " " and "!" are removed, so it does not
    ## fire when the raw transcript ends with either of them -- confirm intent.
    if Answer.endswith(("ts", "ds")):
        Answer = Answer[:-1]
    Answer = Answer.replace(" ", "")
    Answer = Answer.replace("!", "")
    return Answer

##Prediction for Second Model
def predict_beamsearch(input_char_seq, encoder, decoder, k=3):
    """Decode a pronunciation with beam search over the decoder's outputs.

    Every live hypothesis is extended with its ``k`` highest-probability next
    tokens. A hypothesis is finished when it emits ``END_PHONE_SYM`` or grows
    past ``MAX_PHONE_SEQ_LEN`` ids; otherwise it stays live. After each step
    only the ``k`` lowest-scoring live hypotheses are kept, where the score is
    the running sum of ``-log(probability)`` (lower is better).

    NOTE(review): an earlier cell in this notebook already defines
    ``predict_beamsearch`` with the same signature; this definition replaces
    it once this cell runs. Consider keeping a single definition.

    Args:
        input_char_seq: Input passed unchanged to ``encoder.predict``.
        encoder: Model exposing ``predict`` that produces the encoder outputs.
        decoder: Model exposing ``predict`` that takes
            ``[last_token, state_1, state_2, encoder_outputs]`` and returns
            ``(token_probs, state_1, state_2)``.
        k: Beam width (tokens expanded per hypothesis and hypotheses kept).

    Returns:
        The phone symbols of the lowest-scoring finished hypothesis joined by
        single spaces, with leading and trailing whitespace stripped.
    """
    encoder_outputs = encoder.predict(input_char_seq)

    ## Zero initial states, sized from the training array.
    ## NOTE(review): 256 is presumably the decoder state width -- confirm
    ## against attention_model().
    state_shape = (emb_char_input_train.shape[0], 256)
    initial_states = [np.zeros(state_shape), np.zeros(state_shape)]

    ## finished: (token ids, score); beam: (token ids, score, decoder states)
    finished = []
    beam = [([phone_to_id[START_PHONE_SYM]], 0, initial_states)]

    while beam:
        candidates = []
        for seq, seq_score, states in beam:
            last_token = np.array([[seq[-1]]])
            token_probs, next_s, next_c = decoder.predict(
                [last_token] + states + [encoder_outputs])
            step_probs = token_probs[0, :]

            ## argsort is ascending, so the tail holds the k most likely tokens.
            for token_index in step_probs.argsort()[-k:]:
                extended = seq + [token_index]
                extended_score = seq_score - np.log(step_probs[token_index])
                if id_to_phone[token_index] == END_PHONE_SYM or len(extended) > MAX_PHONE_SEQ_LEN:
                    finished.append((extended, extended_score))
                else:
                    candidates.append((extended, extended_score, [next_s, next_c]))

        ## Prune by repeatedly dropping the worst (highest-score) candidate so
        ## the survivors keep their relative order.
        while len(candidates) > k:
            candidate_scores = np.array([cand[1] for cand in candidates])
            del candidates[candidate_scores.argsort()[-1]]
        beam = candidates

    best_idx = np.argmin([score for _, score in finished])
    best_seq = finished[best_idx][0]
    return ''.join(id_to_phone[i] + ' ' for i in best_seq).strip()
def FinalPredict2(TEXT):
    """Predict the phoneme string for a spelled word with the attention model.

    The attention model is rebuilt and its saved weights are loaded on every
    call. ``TEXT`` is upper-cased character by character, mapped through
    ``char_to_id`` into a zero-initialised vector of length
    ``MAX_CHAR_SEQ_LEN``, wrapped into a one-row batch and decoded with
    ``predict_beamsearch``.

    Args:
        TEXT: The word to convert, one character per position.

    Returns:
        Whatever ``predict_beamsearch`` returns for the encoded word.
    """
    weights_path = os.path.join(
        'GRAPHEME_TO_PHONEME/input',
        'predicting-english-pronunciations-model-weights',
        'final_attention_model_weights.hdf5',
    )
    training_model, encoder_model, decoder_model = attention_model()
    training_model.load_weights(weights_path)

    ## Positions past the end of TEXT stay at zero.
    encoded_word = np.zeros(MAX_CHAR_SEQ_LEN)
    for position, letter in enumerate(TEXT):
        encoded_word[position] = char_to_id[letter.upper()]

    ## Add the leading batch axis.
    batch = np.array([encoded_word])
    return predict_beamsearch(batch, encoder_model, decoder_model)

##Final Function Using both Models to Retrieve Spoken Phonemes and Number of Syllables in an Audio File
def Final(AudioFile):
    """Run both models on one recording.

    Args:
        AudioFile: Name forwarded to ``FinalPredict1``.

    Returns:
        A three-item list: the transcript from ``FinalPredict1``, the phoneme
        string from ``FinalPredict2`` for that transcript, and the result of
        ``syllable_count`` on those phonemes.
    """
    grapheme = FinalPredict1(AudioFile)
    phonemes = FinalPredict2(grapheme)
    return [grapheme, phonemes, syllable_count(phonemes)]

## Run the full pipeline on the recording named "scissors"; as the last
## expression of the cell, the returned list is shown as the cell output.
Final("scissors")