In this notebook, we train the RNN model that predicts the pronounciation (as phonemes) given some input word.

In [0]:
import json
import matplotlib.ticker as ticker
import matplotlib.pyplot as plt
import numpy as np
import random

import torch
import torch.nn as nn
from torch import optim
from torch.nn.functional import relu

Use a CUDA device if available.

In [0]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Preparing the Data

We mount our Google Drive to access the data files from within this notebook.

In [35]:
from google.colab import drive

mounted_folder_fp = '/content/drive'
drive.mount(mounted_folder_fp, force_remount=True)

Mounted at /content/drive


We then load all the JSON files we need from the mounted drive.

In [0]:
with open('%s/My Drive/Colab Notebooks/hola/data/cmudict-char-ids.json' % mounted_folder_fp, 'r', encoding='ascii') as f:
    char_ids_dict = json.load(f)

with open('%s/My Drive/Colab Notebooks/hola/data/cmudict-phoneme-ids.json' % mounted_folder_fp, 'r', encoding='ascii') as f:
    phoneme_ids_dict = json.load(f)

with open('%s/My Drive/Colab Notebooks/hola/data/cmudict-processed.json' % mounted_folder_fp, 'r', encoding='ascii') as f:
    cmudict_pairs = json.load(f)

We find the length of the longest word/list of phonemes to define the length of our output PyTorch tensors later.

In [0]:
max_length = max([max(len(pair[0]), len(pair[1])) for pair in cmudict_pairs])

We split our pairs into training and testing sets. In order to use with PyTorch, we also convert our data to tensors. 

In [0]:
train_proportion = 0.9

cmudict_pairs = cmudict_pairs[:100]  # TODO: fix this problem, seem to always run OOM

random.shuffle(cmudict_pairs)
cmudict_train = cmudict_pairs[:int(train_proportion * len(cmudict_pairs))]
cmudict_test = cmudict_pairs[int(train_proportion * len(cmudict_pairs)):]

# TODO: maybe need dtype=torch.long within torch.tensor()
cmudict_train = [(torch.tensor(pair[0], device=device), torch.tensor(pair[1], device=device)) for pair in cmudict_train]
cmudict_test = [(torch.tensor(pair[0], device=device), torch.tensor(pair[1], device=device)) for pair in cmudict_test]

# Designing the Model

For model architecture, we use a vanilla encoder-decoder RNN model with GRUs.

In [0]:
class Encoder(nn.Module):
    def __init__(self, num_inputs, hidden_size):
        super(Encoder, self).__init__()

        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(num_inputs, hidden_size)
        self.gru = nn.GRU(hidden_size, hidden_size)

    def forward(self, input, hidden):
        output = torch.reshape(self.embedding(input), (1, 1, -1))
        output, hidden = self.gru(output, hidden)
        return output, hidden

    def init_hidden(self):
        return torch.zeros(1, 1, self.hidden_size, device=device)

In [0]:
class Decoder(nn.Module):
    def __init__(self, num_outputs, hidden_size):
        super(Decoder, self).__init__()

        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(num_outputs, hidden_size)
        self.gru = nn.GRU(hidden_size, hidden_size)
        self.out = nn.Linear(hidden_size, num_outputs)
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, input, hidden):
        output = torch.reshape(self.embedding(input), (1, 1, -1))
        output = relu(output)
        output, hidden = self.gru(output, hidden)
        output = self.softmax(self.out(output[0]))
        return output, hidden

    def init_hidden(self):
        return torch.zeros(1, 1, self.hidden_size, device=device)

# Training the Model

The `SOS` and `EOS` tokens will be important later during training, so we give them their special variables now.

In [0]:
SOS_token = char_ids_dict['SOS']
EOS_token = char_ids_dict['EOS']

For training, we invoke teacher forcing at random in order to speed up training.

In [0]:
teacher_forcing_ratio = 0.5

def train(input_tensor, target_tensor, encoder, decoder, encoder_optim, decoder_optim, loss_fn):
    loss = 0

    input_length = input_tensor.size(0)
    target_length = target_tensor.size(0)

    encoder_optim.zero_grad()
    decoder_optim.zero_grad()

    encoder_hidden = encoder.init_hidden()
    encoder_outputs = torch.zeros(max_length, encoder.hidden_size, device=device)

    for i in range(input_length):
        encoder_input = input_tensor[i]
        encoder_output, encoder_hidden = encoder(encoder_input, encoder_hidden)
        encoder_outputs[i] = encoder_output[0, 0]

    decoder_input = torch.tensor([[SOS_token]], device=device)
    decoder_hidden = encoder_hidden

    if random.random() < teacher_forcing_ratio:  # if True, invoke teacher forcing
        for i in range(target_length):
            decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
            decoder_input = target_tensor[i]
            loss += loss_fn(decoder_output, target_tensor[i])
    else:
        for i in range(target_length):
            decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
            _, top_index = decoder_output.topk(1)
            decoder_input = top_index.squeeze().detach()
            loss += loss_fn(decoder_output, target_tensor[i])

            if decoder_input.item() == EOS_token:
                break

    loss.backward()

    encoder_optim.step()
    decoder_optim.step()

    return loss.item() / target_length

We want to repeat this training process over our entire training set, simultaneously plotting loss over time.

In [0]:
plt.switch_backend('agg')

def plot_losses(losses_over_time):
    plt.figure()
    fig, ax = plt.subplots()
    loc = ticker.MultipleLocator(base=0.2)  # puts ticks at regular intervals
    ax.yaxis.set_major_locator(loc)
    plt.plot(losses_over_time)

In [0]:
def train_epoch(encoder, decoder, plot_every=1000, learning_rate=0.01):
    losses_over_time = []
    curr_loss = 0

    encoder_optimizer = optim.SGD(encoder.parameters(), lr=learning_rate)
    decoder_optimizer = optim.SGD(decoder.parameters(), lr=learning_rate)
    loss = nn.NLLLoss()

    for i in range(len(cmudict_train)):
        input_tensor, target_tensor = cmudict_train[i]

        loss = train(input_tensor, target_tensor, encoder, decoder, encoder_optimizer, decoder_optimizer, loss)
        curr_loss += loss

        if i % plot_every == 0:
            avg_curr_loss = curr_loss / plot_every
            losses_over_time.append(avg_curr_loss)
            curr_loss = 0
            print('ITER %d (%d%%) LOSS: %.4f' % (i, i / len(cmudict_train) * 100, avg_curr_loss))

    plot_losses(losses_over_time)

# Evaluating the Model

It will be important to have a `phoneme_id` to `phoneme` dict, so we set that up now.

In [0]:
phoneme_ids_inv_dict = {phoneme_id: phoneme for phoneme, phoneme_id in phoneme_ids_dict.items()}

We use accuracy as the metric to evaluate our model.

In [0]:
def predict(input_tensor, encoder, decoder):
    decoded_phonemes = []
    with torch.no_grad():
        input_length = input_tensor.size(0)

        encoder_hidden = encoder.init_hidden()
        encoder_outputs = torch.zeros(max_length, encoder.hidden_size, device=device)

        for i in range(input_length):
            encoder_output, encoder_hidden = encoder(input_tensor[i], encoder_hidden)
            encoder_outputs[i] = encoder_output[0, 0]  # or +=?

        decoder_input = torch.tensor([[SOS_token]], device=device)
        decoder_hidden = encoder_hidden

        for i in range(max_length):
            decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
            _, top_index = decoder_output.topk(1)  # or decoder_output.data.topk(1)?
            decoder_input = top_index.squeeze().detach()
            if decoder_input.item() == EOS_token:
                decoded_phonemes.append(phoneme_ids_inv_dict[EOS_token])
                break
            else:
                decoded_phonemes.append(phoneme_ids_inv_dict[decoder_input.item()])

    return decoded_phonemes

In [0]:
def evaluate(encoder, decoder):
    predictions = []
    with torch.no_grad():
        for i in range(len(cmudict_test)):
            input_tensor, target_tensor = cmudict_test[i]

            predicted_phonemes = predict(input_tensor, encoder, decoder)
            predictions.append(np.array(predicted_phonemes), target_tensor.numpy())

    # calculate accuracy
    accuracy = sum([np.array_equal(predicted, actual) for predicted, actual in predictions.items()]) / len(predictions)
    print('ACCURACY: %.3f' % accuracy)

# Putting it all Together!

In [52]:
hidden_size = 64

encoder = Encoder(len(cmudict_train), hidden_size).to(device)
decoder = Decoder(hidden_size, len(cmudict_train)).to(device)

train_epoch(encoder, decoder)

RuntimeError: ignored