Paper: https://arxiv.org/abs/1506.00196

### setup

In [1]:
import argparse
import os

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import torch.optim as optim
from torch.nn.utils import clip_grad_norm
import torchtext.data as data

In [3]:
from beam_search import Beam

In [4]:
# Hyper-parameters and paths for this notebook, kept in a plain dict and
# exposed below as an attribute-style namespace (`args`).
# NOTE(review): G2P.generate reads `args.beam_size`, but no 'beam_size' key is
# defined here -- confirm it is set before generation is attempted.
# NOTE(review): 'n_layers', 'optim' and 'clip' are not read by any code visible
# in this file (the optimizer is constructed directly as Adagrad).
parser = {
    'data_path': '../data/cmudict/',
    'epochs': 1,  # default: 50
    'batch_size': 100,
    'd_embed': 500,
    'd_hidden': 500,
    'n_layers': 1,  # default: 2, to be implemented
    'log_every': 100,
    'lr': 0.007,
    'lr_decay_by': 0.5,
    'lr_decay_every': 5,  # iterations
    'optim': 'adagrad',  # sgd
    'clip': 2.3,  # torch.nn.utils.clip_grad_norm(parameters, clip, 'inf')
    'val_every': 100,
    'cuda': True,
    'seed': 5,
    'intermediate_path': '../intermediate/g2p/',
}
args = argparse.Namespace(**parser)

# Use CUDA only when it was requested *and* a device is actually available.
args.cuda = args.cuda and torch.cuda.is_available()
args.gpu = None if args.cuda else -1  # None is current gpu

# Create the intermediate directory if it does not exist yet.
if not os.path.isdir(args.intermediate_path):
    os.makedirs(args.intermediate_path)
# Fetch and unpack CMUdict when the data directory is missing. The `!` lines
# are IPython shell escapes, so this part only works inside a notebook/IPython.
if not os.path.isdir(args.data_path):
    URL = "https://github.com/cmusphinx/cmudict/archive/master.zip"
    !wget $URL -O ../data/cmudict.zip
    !unzip ../data/cmudict.zip -d ../data/
    !mv ../data/cmudict-master $args.data_path

# Seed the torch RNG (and the CUDA RNG when CUDA is in use).
torch.manual_seed(args.seed)
if args.cuda:
    torch.cuda.manual_seed(args.seed)

### model

In [5]:
class Encoder(nn.Module):
    """Embed an index sequence and fold it through a single LSTM cell.

    Only the hidden and cell states after the last time step are returned.
    """

    def __init__(self, vocab_size, d_embed, d_hidden):
        """Build the embedding table and the LSTM cell.

        vocab_size: number of rows in the embedding table.
        d_embed: embedding width (input size of the LSTM cell).
        d_hidden: width of the LSTM hidden and cell states.
        """
        super(Encoder, self).__init__()
        self.embedding = nn.Embedding(vocab_size, d_embed)
        self.lstm = nn.LSTMCell(d_embed, d_hidden)
        self.d_hidden = d_hidden

    def forward(self, x_seq):
        """Encode `x_seq` and return the final `(h, c)` pair.

        x_seq: index tensor laid out as len_seq x batch_size.
        Returns two tensors of size batch_size x d_hidden. For a zero-length
        sequence the all-zero initial state is returned unchanged.
        """
        e_seq = self.embedding(x_seq)  # len_seq x batch_size x d_embed
        batch_size = e_seq.size(1)
        # Allocate the initial state from the embeddings' own tensor type so it
        # lives on the same device as the model (CPU or GPU).
        h = Variable(e_seq.data.new(batch_size, self.d_hidden).zero_())
        c = Variable(e_seq.data.new(batch_size, self.d_hidden).zero_())
        # Step along the time axis (dim 0); each slice is batch_size x d_embed,
        # which is the input shape the LSTM cell expects.
        for t in range(e_seq.size(0)):
            h, c = self.lstm(e_seq[t], (h, c))
        return h, c

In [6]:
class Decoder(nn.Module):
    """LSTM-cell decoder that emits log-probabilities over the output vocabulary."""

    def __init__(self, vocab_size, d_embed, d_hidden):
        """Create the embedding table, the LSTM cell and the output projection."""
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_embed)
        self.lstm = nn.LSTMCell(d_embed, d_hidden)
        self.linear = nn.Linear(d_hidden, vocab_size)

    def forward(self, x_seq, h, c):
        """Run the cell over every step of `x_seq`, starting from `(h, c)`.

        Returns a triple: the log-softmax scores reshaped to
        x_seq.size(0) x (inferred) x vocab_size, followed by the hidden and
        cell states after the last step.
        """
        embedded = self.embedding(x_seq)
        hidden_states = []
        # One LSTM-cell update per slice along dim 0.
        for step in range(embedded.size(0)):
            h, c = self.lstm(embedded[step], (h, c))
            hidden_states.append(h)
        stacked = torch.stack(hidden_states, 0)
        # Flatten the leading dims so the projection sees a 2-D matrix.
        logits = self.linear(stacked.view(-1, h.size(1)))
        log_probs = F.log_softmax(logits)
        return log_probs.view(x_seq.size(0), -1, logits.size(1)), h, c

In [7]:
class G2P(nn.Module):
    """Encoder-decoder pair mapping a grapheme sequence to a phoneme sequence."""

    def __init__(self, g_size, p_size, d_embed, d_hidden):
        """Build an Encoder over `g_size` symbols and a Decoder over `p_size`."""
        super().__init__()
        self.encoder = Encoder(g_size, d_embed, d_hidden)
        self.decoder = Decoder(p_size, d_embed, d_hidden)

    def forward(self, g_seq, p_seq=None, max_len=0):
        """Score a known target, or generate one when `p_seq` is None.

        With `p_seq` given, the decoder is run on `p_seq[:-1]` from the
        encoder's final state and its result is returned as-is. Without it,
        `g_seq` must have size 1 along dim 1 and the result of `generate`
        is returned.
        """
        h, c = self.encoder(g_seq)
        if p_seq is None:
            # Generation handles exactly one input sequence at a time.
            assert g_seq.size(1) == 1
            return self.generate(h, c, max_len=max_len)
        return self.decoder(p_seq[:-1], h, c)

    def generate(self, h, c, max_len):
        """Beam-search up to `max_len` decoder steps and return `beam.get_hyp(0)`.

        Reads `args.beam_size` and `args.cuda` from the module-level `args`.
        NOTE(review): 'beam_size' is not among the keys of the `parser` dict
        visible in this file -- confirm it is set before calling this.
        """
        beam = Beam(args.beam_size, cuda=args.cuda)
        # Replicate the encoder state once per beam entry.
        h = h.expand(beam.size, h.size(1))
        c = c.expand(beam.size, c.size(1))
        for _ in range(max_len):
            tokens = beam.get_current_state()
            scores, h, c = self.decoder(Variable(tokens.unsqueeze(0)), h, c)
            # presumably `advance` returns a truthy value once the search is
            # finished -- confirm against beam_search.Beam
            finished = beam.advance(scores.data.squeeze(0))
            if finished:
                break
            # Re-order the recurrent state rows by the indices the beam reports.
            h.data.copy_(h.data.index_select(0, beam.get_current_origin()))
            c.data.copy_(c.data.index_select(0, beam.get_current_origin()))
        return beam.get_hyp(0)

### utils

In [8]:
class CMUDict(data.Dataset):
    """Dataset of ('grapheme', 'phoneme') examples built from dictionary lines."""

    def __init__(self, data_lines, g_field, p_field):
        """Turn each line into an Example with a grapheme and a phoneme part.

        Every line is split once on whitespace: the first token is the
        grapheme string and the remainder is the phoneme string.
        """
        fields = [('grapheme', g_field), ('phoneme', p_field)]

        def to_example(line):
            grapheme, phoneme = line.split(maxsplit=1)
            return data.Example.fromlist([grapheme, phoneme], fields)

        examples = [to_example(line) for line in data_lines]
        super().__init__(examples, fields)

    @classmethod
    def splits(cls, path, g_field, p_field, seed=None):
        """Read `path`, shuffle its lines and return (train, val, test) datasets.

        After shuffling, positions with index % 20 == 0 go to validation,
        index % 20 in {1, 2} go to test, and all others go to training.
        When `seed` is not None the global `random` module is seeded with it
        before shuffling.
        """
        import random

        if seed is not None:
            random.seed(seed)
        with open(path) as f:
            lines = f.readlines()
        random.shuffle(lines)

        val_lines = lines[0::20]
        test_lines = [ln for pos, ln in enumerate(lines) if pos % 20 in (1, 2)]
        train_lines = [ln for pos, ln in enumerate(lines) if pos % 20 >= 3]

        return (cls(train_lines, g_field, p_field),
                cls(val_lines, g_field, p_field),
                cls(test_lines, g_field, p_field))

### train

In [9]:
def train(args, epoch):
    """Placeholder for one training pass; currently does nothing, returns None."""
    return None

### prepare

In [10]:
def _tokenize_graphemes(entry):
    """Drop anything from the first '(' on, then return the characters reversed."""
    head = entry.split('(')[0]
    return list(head[::-1])


def _tokenize_phonemes(entry):
    """Drop anything from the first '#' on, then split on whitespace."""
    head = entry.split('#')[0]
    return head.split()


# Grapheme side: character tokens in reversed order, prefixed with '<s>'.
g_field = data.Field(init_token='<s>', tokenize=_tokenize_graphemes)
# Phoneme side: whitespace tokens wrapped in '<os>' ... '</os>'.
p_field = data.Field(init_token='<os>', eos_token='</os>',
                     tokenize=_tokenize_phonemes)

In [11]:
# Load the dictionary and partition it into train/val/test. The configured
# seed is passed through so the shuffle -- and therefore the partition -- is
# the same on every run instead of changing each time the cell is executed.
filepath = os.path.join(args.data_path, 'cmudict.dict')
train_data, val_data, test_data = CMUDict.splits(
    filepath, g_field, p_field, seed=args.seed)

In [12]:
# Build each field's vocabulary from all three splits (graphemes first).
for _field in (g_field, p_field):
    _field.build_vocab(train_data, val_data, test_data)

In [13]:
# One bucketing iterator per split, all sharing batch size and device setting.
datasets = (train_data, val_data, test_data)
train_iter, val_iter, test_iter = data.BucketIterator.splits(
    datasets,
    batch_size=args.batch_size,
    device=args.gpu,
)

In [14]:
# Vocabulary sizes determine the embedding/output dimensions of the model.
g_size = len(g_field.vocab)
p_size = len(p_field.vocab)
model = G2P(g_size, p_size, args.d_embed, args.d_hidden)
criterion = nn.NLLLoss()
# Move the model to the GPU *before* creating the optimizer: PyTorch requires
# this ordering, and Adagrad allocates its per-parameter state at construction
# time, so building it first would leave that state on the CPU.
if args.cuda:
    model.cuda()
    criterion.cuda()
optimizer = optim.Adagrad(model.parameters(), lr=args.lr)

### run

In [15]:
# Calls train() for epoch indices 0 through args.epochs inclusive,
# i.e. args.epochs + 1 calls in total.
for epoch in range(0, args.epochs + 1):
    train(args, epoch)

### test