In [86]:
import os

In [87]:
data_path = '/Users/advait.d/Documents/G2P/data/cmudict'

In [88]:
cmudict_dict = 'cmudict.dict'
cmudict_symbols = 'cmudict.symbols'
cmudict_phones = 'cmudict.phones'

In [89]:
dict_path = os.path.join(data_path, cmudict_dict)
symbols_path = os.path.join(data_path, cmudict_symbols)
# phones_path = os.path.join(data_path, cmudict_phones)

idx2chr = "a b c d e f g h i j k l m n o p q r s t u v w x y z ' <pad>".split(' ')
chr2idx = {ch: idx for idx, ch in enumerate(idx2chr)}

def parse_phones_file():
    with open(symbols_path, 'r') as f:
        lines = f.read().strip().split('\n')

    ph2idx = {phone: idx for idx, phone in enumerate(lines)}
    sos_token = "<sos>"
    eos_token = "<eos>"
    pad_token = "<pad>"
    ph2idx[sos_token] = len(ph2idx)
    ph2idx[eos_token] = len(ph2idx)
    ph2idx[pad_token] = len(ph2idx)
    idx2ph = {idx: ph for ph, idx in ph2idx.items()}
    return ph2idx, idx2ph, sos_token, eos_token, pad_token

ph2idx, idx2ph, sos_token, eos_token, pad_token = parse_phones_file()

def tokenize(words, phoness):
    words_tokens = []
    phones_tokens = []
    for word, phones in zip(words, phoness):
        words_tokens.append([chr2idx[c] for c in word if c in chr2idx])
        phones_tokens.append([ph2idx[ph] for ph in phones])

    return words_tokens, phones_tokens

def parse_cmudict():
    with open(dict_path, 'r') as f:
        data = f.read().strip().split('\n')

    words, phones = [], []
    for point in data:
        point = point.split('#')[0].strip()
        word, ph = point.split(' ')[0], point.split(' ')[1:]
        ph = [sos_token] + ph + [eos_token]
        word = [c for c in word]
        
        words.append(word)
        phones.append(ph)

    return words, phones

words, phones = parse_cmudict()
word_tokens, phone_tokens = tokenize(words, phones)


In [90]:
output_size = len(ph2idx)
input_size = len(chr2idx)
hidden_size = 256

In [91]:
output_size

87

In [92]:
word_tokens[:5]

[[26, 1, 14, 20, 19],
 [26, 2, 0, 20, 18, 4],
 [26, 2, 14, 20, 17, 18, 4],
 [26, 2, 20, 18, 4],
 [26, 4, 12]]

In [93]:
phone_tokens[:5]

[[84, 24, 18, 69, 85],
 [84, 52, 9, 82, 85],
 [84, 52, 14, 66, 67, 85],
 [84, 52, 81, 77, 82, 85],
 [84, 9, 54, 85]]

In [94]:
len(word_tokens), len(phone_tokens)

(135167, 135167)

In [95]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import random
import heapq
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from tqdm import tqdm
import math

# BiLSTM

In [96]:
class Encoder(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers=1):
        super(Encoder, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.lstm = nn.LSTM(hidden_size, hidden_size, num_layers, bidirectional=True, batch_first=True)

    def forward(self, input_seq):
        embedded = self.embedding(input_seq)
        outputs, (hidden, cell) = self.lstm(embedded)
        return outputs, hidden


class Decoder(nn.Module):
    def __init__(self, output_size, hidden_size, num_layers=1):
        super(Decoder, self).__init__()
        self.output_size = output_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.embedding = nn.Embedding(output_size, hidden_size)
        self.lstm = nn.LSTM(hidden_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, input_step, hidden, cell):
        embedded = self.embedding(input_step)
        output, (hidden, cell) = self.lstm(embedded, (hidden, cell))
        prediction = self.fc(output)
        return prediction, hidden, cell
    

class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, src, trg, teacher_forcing_ratio=0.5, device='cpu'):
        batch_size = src.shape[0]  # Adjusted for batch_first=True
        max_len = trg.shape[1]     # Adjusted for batch_first=True
        trg_vocab_size = self.decoder.output_size

        outputs = torch.zeros(batch_size, max_len, trg_vocab_size).to(device)

        encoder_outputs, hidden = self.encoder(src)

        hidden_forward = hidden[0::2,:,:]
        hidden_backward = hidden[1::2,:,:]
        hidden = hidden_forward + hidden_backward

        cell = torch.zeros(self.decoder.num_layers, batch_size, self.decoder.hidden_size).to(device)

        input_step = trg[:, 0:1]  # Adjusted for batch_first=True
        for t in range(1, max_len):
            output, hidden, cell = self.decoder(input_step, hidden, cell)
            outputs[:, t:t+1, :] = output  # Adjusted for batch_first=True
            top1 = output.argmax(-1)
            input_step = trg[:, t:t+1] if random.random() < teacher_forcing_ratio else top1

        return outputs
    

def beam_search(model, src, max_len, beam_width=3, device='cpu'):
    model.eval()
    with torch.no_grad():
        encoder_outputs, hidden = model.encoder(src)

        hidden_forward = hidden[0::2,:,:]
        hidden_backward = hidden[1::2,:,:]
        hidden = hidden_forward + hidden_backward

        cell = torch.zeros(model.decoder.num_layers, 1, model.decoder.hidden_size).to(device)


        input_step = torch.tensor([ph2idx[sos_token]], device=device)

        beam = [(0, torch.tensor([input_step]), hidden, cell)]  # (score, sequence, hidden, cell)

        for _ in range(max_len):
            new_beam = []
            for score, seq, hidden, cell in beam:
                output, hidden, cell = model.decoder(seq[-1:].unsqueeze(0), hidden, cell)
                log_probs = F.log_softmax(output, dim=-1)
                topk_log_probs, topk_indices = log_probs.topk(beam_width)

                for i in range(beam_width):
                    new_score = score + topk_log_probs[0][0][i].item()
                    new_seq = torch.concat([seq, torch.tensor([topk_indices[0][0][i]])])
                    new_beam.append((new_score, new_seq, hidden, cell))

            beam = heapq.nlargest(beam_width, new_beam, key=lambda x: x[0])

            if any(seq[-1].item() == ph2idx[eos_token] for _, seq, _, _ in beam):
                break

        best_score, best_seq, _, _ = max(beam, key=lambda x: x[0])
        return [token.item() for token in best_seq if token.item() != ph2idx[sos_token]]


In [97]:
def collate_fn(batch):
    word_batch, phone_batch = zip(*batch)
    word_batch = pad_sequence(word_batch, batch_first=True, padding_value=chr2idx['<pad>'])
    phone_batch = pad_sequence(phone_batch, batch_first=True, padding_value=ph2idx['<pad>'])
    return word_batch, phone_batch

class CMUDictDataset(Dataset):
    def __init__(self, word_tokens, phone_tokens):
        self.word_tokens = word_tokens
        self.phone_tokens = phone_tokens

    def __len__(self):
        return len(self.word_tokens)
    
    def __getitem__(self, idx):
        return torch.tensor(self.word_tokens[idx]), torch.tensor(self.phone_tokens[idx])

dataset = CMUDictDataset(word_tokens, phone_tokens)
batch_size = 32
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)

In [98]:
encoder = Encoder(input_size, hidden_size, num_layers=3)
decoder = Decoder(output_size, hidden_size, num_layers=3)
model = Seq2Seq(encoder, decoder)

In [99]:
criterion = nn.CrossEntropyLoss(ignore_index=ph2idx[pad_token])
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
num_epochs = 1
device = 'cpu'

pbar =  tqdm(total=num_epochs * len(dataloader), desc="Epochs", leave=False, position=0, ascii=True)

for epoch in range(num_epochs):
    for word_batch, phone_batch in dataloader:
        word_batch = word_batch.to(device)
        phone_batch = phone_batch.to(device)
        outputs = model(word_batch, phone_batch)

        outputs = outputs.view(-1, outputs.shape[-1])
        phone_batch = phone_batch.view(-1)

        loss = criterion(outputs, phone_batch)

        pbar.set_description(f"Loss: {loss.item():.4f}")
        pbar.update(1)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()


Loss: 1.1764: 100%|#########################| 4224/4224 [04:44<00:00, 14.21it/s]

In [101]:
word = "president"

word_tokens = torch.tensor([chr2idx[c] for c in word]).reshape(1, -1)
phone_tokens = beam_search(model, word_tokens, 10)
phones = [idx2ph[tok] for tok in phone_tokens][:-1]
print(phones)

['P', 'R', 'IY0', 'S', 'AY1', 'D', 'AH0', 'N', 'T']


# Transformer

In [79]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000, dropout=0.1):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))

        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:, :x.size(1)]
        return self.dropout(x)

class TransformerEncoder(nn.Module):
    def __init__(self, input_size, hidden_size, nhead=4, num_layers=2, dropout=0.1):
        super(TransformerEncoder, self).__init__()
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.positional_encoding = PositionalEncoding(hidden_size, dropout=dropout)
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=hidden_size,
            nhead=nhead,
            dim_feedforward=hidden_size * 4,
            dropout=dropout,
            batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)
        
    def forward(self, src, src_mask=None):
        embedded = self.embedding(src)
        embedded = self.positional_encoding(embedded)
        output =  self.transformer(embedded, src_mask)
        return output
    

class TransformerDecoder(nn.Module):
    def __init__(self, output_size, hidden_size, nhead=4, num_layers=2, dropout=0.1):
        super(TransformerDecoder, self).__init__()
        self.embedding = nn.Embedding(output_size, hidden_size)
        self.positional_encoding = PositionalEncoding(hidden_size, dropout=dropout)
        decoder_layer = nn.TransformerDecoderLayer(
            d_model=hidden_size,
            nhead=nhead,
            dim_feedforward=hidden_size * 4,
            dropout=dropout,
            batch_first=True
        )
        self.transformer = nn.TransformerDecoder(decoder_layer, num_layers)
        self.fc = nn.Linear(hidden_size, output_size)
        
    def forward(self, tgt, memory, tgt_mask=None, memory_mask=None):
        embedded = self.embedding(tgt)
        embedded = self.positional_encoding(embedded)
        output = self.transformer(embedded, memory, tgt_mask, memory_mask)
        output = self.fc(output)
        return output
    
class TransformerSeq2Seq(nn.Module):
    def __init__(self, encoder, decoder):
        super(TransformerSeq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        
    def generate_square_subsequent_mask(self, sz, device):
        mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
        mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
        return mask.to(device)

    def forward(self, src, tgt, teacher_forcing_ratio=1.0, device='cpu'):
        tgt_mask = self.generate_square_subsequent_mask(tgt.shape[1], device)
        
        encoder_output = self.encoder(src)
        output = self.decoder(tgt, encoder_output, tgt_mask)

        return output


In [80]:
encoder = TransformerEncoder(input_size, hidden_size=128)
decoder = TransformerDecoder(output_size, hidden_size=128)
model = TransformerSeq2Seq(encoder, decoder)

In [81]:
criterion = nn.CrossEntropyLoss(ignore_index=ph2idx[pad_token])
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
num_epochs = 1
device = 'cpu'

pbar =  tqdm(total=num_epochs * len(dataloader), desc="Epochs", leave=False, position=0, ascii=True)

for epoch in range(num_epochs):
    for word_batch, phone_batch in dataloader:
        word_batch = word_batch.to(device)
        phone_batch = phone_batch.to(device)
        outputs = model(word_batch, phone_batch)

        outputs = outputs[:, :-1, :].contiguous()
        phone_batch = phone_batch[:, 1:].contiguous()

        outputs = outputs.view(-1, outputs.shape[-1])
        phone_batch = phone_batch.view(-1)

        loss = criterion(outputs, phone_batch)

        pbar.set_description(f"Loss: {loss.item():.4f}")
        pbar.update(1)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

Loss: 0.5525: 100%|########################9| 4223/4224 [02:09<00:00, 32.22it/s]

In [82]:
def beam_search_decode(model, src, beam_width=5, max_len=50, device='cpu'):
    model.eval()
    with torch.no_grad():
        src = src.to(device)
        encoder_output = model.encoder(src)
        
        beams = [(0, torch.tensor([[ph2idx[sos_token]]], device=device))]
        
        for _ in range(max_len - 1):
            new_beams = []
            
            for score, sequence in beams:
                if sequence[0, -1].item() == ph2idx[eos_token]:
                    new_beams.append((score, sequence))
                    continue
                
                tgt_mask = model.generate_square_subsequent_mask(sequence.size(1), device)
                
                output = model.decoder(sequence, encoder_output, tgt_mask)
                
                log_probs = F.log_softmax(output[:, -1:], dim=-1)

                values, indices = log_probs.squeeze().topk(beam_width)
                
                for value, idx in zip(values, indices):
                    new_score = score + value.item()
                    new_sequence = torch.cat([sequence, 
                                           idx.unsqueeze(0).unsqueeze(0)], dim=1)
                    new_beams.append((new_score, new_sequence))
            
            beams = sorted(new_beams, key=lambda x: x[0], reverse=True)[:beam_width]
            
            if all(beam[1][0, -1].item() == ph2idx[eos_token] for beam in beams):
                break
        
        return beams[0][1].squeeze(0).cpu().numpy()

def predict_pronunciation(model, word, beam_width=5, device='cpu'):
    word_tokens = torch.tensor([[chr2idx[c] for c in word.lower() if c in chr2idx]], 
                             device=device)
    
    phone_tokens = beam_search_decode(model, word_tokens, beam_width=beam_width, device=device)
    phones = [idx2ph[idx] for idx in phone_tokens]
    
    phones = [p for p in phones if p not in [sos_token, eos_token, pad_token]]
    
    return phones

In [84]:
predict_pronunciation(model, "president", beam_width=5)

['P', 'R', 'IY0', 'Z', 'AY1', 'D', 'AH0', 'N', 'T']