In [None]:
import os
from argparse import Namespace
from collections import Counter
import json
import re
import string
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.nn import functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm_notebook

In [None]:
class SurnameDataset(Dataset):
  """Dataset that serves surnames as character-level sequence examples.

  Every item holds an input index vector, the matching target vector (as
  returned by the vectorizer's `vectorize`) and the integer index of the
  row's nationality.
  """

  def __init__(self, surname_df, vectorizer):
    """
    Args:
      surname_df (pandas.DataFrame): frame with `surname` and `nationality`
        columns
      vectorizer: object exposing `vectorize(surname, vector_length)` and a
        `nationality_vocab` with `lookup_token`
    """
    self.surname_df = surname_df
    self._vectorizer = vectorizer

    # +2 leaves room for the begin- and end-of-sequence markers added
    # around each surname; `default=0` keeps an empty frame constructible.
    self._max_seq_length = max(map(len, surname_df.surname), default=0) + 2

    # NOTE(review): no train/val/test split handling is visible in this
    # notebook, so every row of the frame is served. Confirm whether a
    # `split` column should be honoured here.
    self._target_df = surname_df

  @classmethod
  def load_dataset_and_make_vectorize(cls, surname_csv):
    """Read the CSV and build a dataset with a freshly fitted vectorizer.

    Args:
      surname_csv (str): path of the surnames CSV file
    Returns:
      SurnameDataset: dataset backed by the loaded frame
    """
    surname_df = pd.read_csv(surname_csv)
    return cls(surname_df, SurnameVectorizer.from_dataframe(surname_df))

  def __len__(self):
    """Number of rows currently served by the dataset."""
    return len(self._target_df)

  def __getitem__(self, index):
    """Build the training example stored at position `index`.

    Args:
      index (int): positional row index
    Returns:
      dict: `x_data` (input vector), `y_target` (target vector) and
        `class_index` (nationality index)
    """
    row = self._target_df.iloc[index]

    from_vector, to_vector = \
      self._vectorizer.vectorize(row.surname, self._max_seq_length)

    # The class label comes from the nationality vocabulary, not from the
    # surname itself.
    nationality_index = \
      self._vectorizer.nationality_vocab.lookup_token(row.nationality)

    return {'x_data': from_vector,
            'y_target': to_vector,
            'class_index': nationality_index}

In [None]:
class SurnameVectorizer(object):
  """Holds the vocabularies and converts surnames into index vectors."""

  def __init__(self, char_vocab, nationality_vocab):
    """
    Args:
      char_vocab: vocabulary mapping characters to integers; must expose
        `begin_seq_index`, `end_seq_index`, `mask_index` and `lookup_token`
      nationality_vocab: vocabulary mapping nationalities to integers
    """
    self.char_vocab = char_vocab
    self.nationality_vocab = nationality_vocab

  def vectorize(self, surname, vector_length=-1):
    """Turn a surname into an (input, target) pair of index vectors.

    The surname is wrapped in begin/end markers. The input vector is that
    sequence without its last element and the target vector is the same
    sequence without its first element, so the target at position t is the
    input at position t + 1. Unused trailing positions hold the mask index.

    Args:
      surname (str): the surname to vectorize
      vector_length (int): length of both outputs; a negative value means
        "exactly as long as this surname needs"
    Returns:
      tuple: (from_vector, to_vector), both `np.int64` arrays of length
        `vector_length`
    Raises:
      ValueError: if `vector_length` is too short to hold the surname
    """
    indices = [self.char_vocab.begin_seq_index]
    indices.extend(self.char_vocab.lookup_token(token) for token in surname)
    indices.append(self.char_vocab.end_seq_index)

    if vector_length < 0:
      vector_length = len(indices) - 1

    from_indices = indices[:-1]
    to_indices = indices[1:]

    if vector_length < len(from_indices):
      raise ValueError(
          "vector_length={} is too short for a sequence of {} indices".format(
              vector_length, len(from_indices)))

    # Start from an all-mask vector so the tail needs no separate fill.
    from_vector = np.full(vector_length, self.char_vocab.mask_index,
                          dtype=np.int64)
    from_vector[:len(from_indices)] = from_indices

    to_vector = np.full(vector_length, self.char_vocab.mask_index,
                        dtype=np.int64)
    to_vector[:len(to_indices)] = to_indices

    return from_vector, to_vector

  @classmethod
  def from_dataframe(cls, surname_df):
    """Build a vectorizer whose vocabularies cover every row of the frame.

    Args:
      surname_df (pandas.DataFrame): frame with `surname` and `nationality`
        columns
    Returns:
      SurnameVectorizer: vectorizer with populated vocabularies
    """
    char_vocab = SequenceVocabulary()
    nationality_vocab = Vocabulary()

    for surname, nationality in zip(surname_df.surname,
                                    surname_df.nationality):
      for char in surname:
        char_vocab.add_token(char)
      nationality_vocab.add_token(nationality)

    # Return only after all rows have been consumed.
    return cls(char_vocab, nationality_vocab)

In [None]:
class SurnameGenerationMoodel(nn.Module):
  """Character-level recurrent model that predicts the next character.

  Pipeline: character embedding -> GRU -> dropout -> linear layer producing
  one score per vocabulary entry at every time step.
  """

  def __init__(self, char_embedding_size, char_vocab_size, rnn_hidden_size,
               batch_first=True, padding_idx=0, dropout_p=0.5):
    """
    Args:
      char_embedding_size (int): size of the character embeddings
      char_vocab_size (int): number of characters to embed and to predict
      rnn_hidden_size (int): size of the GRU hidden state
      batch_first (bool): whether dimension 0 of the input is the batch
      padding_idx (int): index passed to the embedding as its padding index
      dropout_p (float): dropout probability applied to the GRU outputs
    """
    # Zero-argument super() does not depend on how the class is spelled.
    super().__init__()

    self.char_emb = nn.Embedding(num_embeddings=char_vocab_size,
                                 embedding_dim=char_embedding_size,
                                 padding_idx=padding_idx)

    # A GRU returns `(output, hidden)` and carries its state as a single
    # tensor, which is what `forward` and the samplers in this notebook
    # rely on.
    self.rnn = nn.GRU(input_size=char_embedding_size,
                      hidden_size=rnn_hidden_size,
                      batch_first=batch_first)

    self.fc = nn.Linear(in_features=rnn_hidden_size,
                        out_features=char_vocab_size)

    self._dropout_p = dropout_p

  def forward(self, x_in, apply_softmax=False):
    """Run the model over a batch of index sequences.

    Args:
      x_in (torch.Tensor): integer tensor of character indices, 2-D
      apply_softmax (bool): if True, return probabilities over the
        vocabulary instead of raw scores; keep False when the output feeds
        a cross-entropy loss
    Returns:
      torch.Tensor: tensor with the two leading dimensions of `x_in` and a
        last dimension of `char_vocab_size`
    """
    x_embedded = self.char_emb(x_in)

    y_out, _ = self.rnn(x_embedded)

    # Merge the two leading dimensions so the linear layer sees a 2-D batch.
    dim0, dim1, feat_size = y_out.shape
    y_out = y_out.contiguous().view(dim0 * dim1, feat_size)

    # `training=self.training` switches dropout off under `model.eval()`.
    y_out = self.fc(F.dropout(y_out, p=self._dropout_p,
                              training=self.training))

    if apply_softmax:
      y_out = F.softmax(y_out, dim=1)

    new_feat_size = y_out.shape[-1]
    y_out = y_out.view(dim0, dim1, new_feat_size)

    return y_out


# Correctly spelled alias; the original (misspelled) name is kept so existing
# references keep working.
SurnameGenerationModel = SurnameGenerationMoodel

In [None]:
def normalize_sizes(y_pred, y_true):
  """Flatten predictions and targets for a per-element loss.

  Args:
    y_pred (torch.Tensor): predictions; a 3-D tensor is reshaped to 2-D by
      merging its first two dimensions, anything else is passed through
    y_true (torch.Tensor): targets; a 2-D tensor is flattened to 1-D,
      anything else is passed through
  Returns:
    tuple: (y_pred, y_true) after the reshaping described above
  """
  pred_needs_flattening = y_pred.dim() == 3
  target_needs_flattening = y_true.dim() == 2

  if pred_needs_flattening:
    last_dim = y_pred.size(2)
    y_pred = y_pred.contiguous().view(-1, last_dim)

  if target_needs_flattening:
    y_true = y_true.contiguous().view(-1)

  return y_pred, y_true

def sequence_loss(y_pred, y_true, mask_index):
  """Cross-entropy loss over sequences, skipping masked target positions.

  Args:
    y_pred (torch.Tensor): predicted scores
    y_true (torch.Tensor): target indices
    mask_index (int): target value that is ignored by the loss
  Returns:
    torch.Tensor: the value returned by `F.cross_entropy`
  """
  flat_pred, flat_true = normalize_sizes(y_pred, y_true)
  loss = F.cross_entropy(input=flat_pred, target=flat_true,
                         ignore_index=mask_index)
  return loss

In [None]:
# Experiment configuration: everything a reader might want to tune.
args = Namespace(**{
    # Data and path information
    "surname_csv": 'data/surnames/surnames_with_splits.csv',
    "vectorizer_file": "vectorizer.json",
    "model_state_file": "model.pth",
    "save_dir": "model_storage/ch7/model1_unconditioned_surname_generation",
    # Model hyperparameters
    "char_embedding_size": 32,
    "rnn_hidden_size": 32,
    # Training hyperparameters
    "seed": 1337,
    "learning_rate": 0.001,
    "batch_size": 128,
    "num_epochs": 100,
    "early_stopping_criteria": 5,
})

In [None]:
def sample_from_model(model, vectorizer, num_samples=1, sample_size=20,
                      temperature=1.0):
  """Sample index sequences from an unconditioned generation model.

  NOTE: a later cell of this notebook defines another `sample_from_model`
  (conditioned on nationalities) which replaces this one once it is run.

  Args:
    model: module exposing `char_emb`, `rnn` and `fc`; `rnn` is called as
      `rnn(input, hidden)` on inputs laid out as (batch, 1, features)
    vectorizer: object whose `char_vocab.begin_seq_index` seeds each sample
    num_samples (int): number of sequences to sample
    sample_size (int): number of sampling steps per sequence
    temperature (float): divides the scores before the softmax; values
      below 1 sharpen the distribution, values above 1 flatten it
  Returns:
    torch.Tensor: int64 tensor of shape (num_samples, sample_size + 1);
      column 0 holds the begin-of-sequence index
  """
  begin_seq_index = [vectorizer.char_vocab.begin_seq_index
                     for _ in range(num_samples)]
  begin_seq_index = torch.tensor(begin_seq_index,
                                 dtype=torch.int64).unsqueeze(dim=1)
  indices = [begin_seq_index]
  h_t = None

  # Sampling needs no gradients; skip building the autograd graph.
  with torch.no_grad():
    for time_step in range(sample_size):
      x_t = indices[time_step]
      x_emb_t = model.char_emb(x_t)
      rnn_out_t, h_t = model.rnn(x_emb_t, h_t)
      prediction_vector = model.fc(rnn_out_t.squeeze(dim=1))
      probability_vector = F.softmax(prediction_vector / temperature, dim=1)
      indices.append(torch.multinomial(probability_vector, num_samples=1))

  # Every element is (num_samples, 1); joining along dim 1 gives the final
  # layout directly and, unlike a bare `.squeeze()`, keeps the batch
  # dimension when num_samples == 1.
  return torch.cat(indices, dim=1)

In [None]:
def decode_samples(sampled_indices, vectorizer):
  """Translate sampled index sequences back into surname strings.

  Args:
    sampled_indices: 2-D tensor of indices, one sequence per row
    vectorizer: object whose `char_vocab` provides `begin_seq_index`,
      `end_seq_index` and `lookup_index`
  Returns:
    list: one decoded string per row; begin markers are skipped and
      decoding of a row stops at its first end marker
  """
  char_vocab = vectorizer.char_vocab
  surnames = []

  for row in range(sampled_indices.shape[0]):
    pieces = []
    for col in range(sampled_indices.shape[1]):
      token_id = sampled_indices[row, col].item()
      if token_id == char_vocab.begin_seq_index:
        continue
      if token_id == char_vocab.end_seq_index:
        break
      pieces.append(char_vocab.lookup_index(token_id))
    surnames.append("".join(pieces))

  return surnames

In [None]:
def sample_from_model(model, vectorizer, nationalities, sample_size=20, temperature=1.0):
    """Sample index sequences from a nationality-conditioned model.

    NOTE: this definition shares its name with the unconditioned sampler
    defined in an earlier cell and replaces it once this cell is run.

    Args:
        model: module exposing `nation_emb`, `char_emb`, `rnn` and `fc`;
            `rnn` is called as `rnn(input, hidden)` on inputs laid out as
            (batch, 1, features)
        vectorizer: object whose `char_vocab.begin_seq_index` seeds each
            sample
        nationalities (list): integer nationality indices, one per sample
        sample_size (int): number of sampling steps per sequence
        temperature (float): divides the scores before the softmax; values
            below 1 sharpen the distribution, values above 1 flatten it
    Returns:
        torch.Tensor: int64 tensor of shape
            (len(nationalities), sample_size + 1); column 0 holds the
            begin-of-sequence index
    """
    num_samples = len(nationalities)
    begin_seq_index = [vectorizer.char_vocab.begin_seq_index for _ in range(num_samples)]
    begin_seq_index = torch.tensor(begin_seq_index, dtype=torch.int64).unsqueeze(dim=1)
    indices = [begin_seq_index]

    # Shape (1, num_samples): the leading dimension lets the embedded
    # nationalities be passed to the RNN as its initial hidden state.
    nationality_indices = torch.tensor(nationalities, dtype=torch.int64).unsqueeze(dim=0)

    # Sampling needs no gradients; skip building the autograd graph.
    with torch.no_grad():
        h_t = model.nation_emb(nationality_indices)

        for time_step in range(sample_size):
            x_t = indices[time_step]
            x_emb_t = model.char_emb(x_t)
            rnn_out_t, h_t = model.rnn(x_emb_t, h_t)
            prediction_vector = model.fc(rnn_out_t.squeeze(dim=1))
            probability_vector = F.softmax(prediction_vector / temperature, dim=1)
            indices.append(torch.multinomial(probability_vector, num_samples=1))

    # Every element is (num_samples, 1); joining along dim 1 gives the final
    # layout directly and, unlike a bare `.squeeze()`, keeps the batch
    # dimension when only one nationality is given.
    return torch.cat(indices, dim=1)