In [1]:
!curl -O https://download.pytorch.org/tutorial/data.zip; unzip data.zip

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 2814k  100 2814k    0     0  7792k      0 --:--:-- --:--:-- --:--:-- 7796k
Archive:  data.zip
   creating: data/
  inflating: data/eng-fra.txt        
   creating: data/names/
  inflating: data/names/Arabic.txt   
  inflating: data/names/Chinese.txt  
  inflating: data/names/Czech.txt    
  inflating: data/names/Dutch.txt    
  inflating: data/names/English.txt  
  inflating: data/names/French.txt   
  inflating: data/names/German.txt   
  inflating: data/names/Greek.txt    
  inflating: data/names/Irish.txt    
  inflating: data/names/Italian.txt  
  inflating: data/names/Japanese.txt  
  inflating: data/names/Korean.txt   
  inflating: data/names/Polish.txt   
  inflating: data/names/Portuguese.txt  
  inflating: data/names/Russian.txt  
  inflating: data/names/Scottish.txt  
  inflating: data/names/Spanish.txt  
  inflating

In [1]:
import os
import random
from string import ascii_letters
import torch
from torch import nn
import torch.nn.functional as F
from unidecode import unidecode
import numpy as np

In [2]:
_ = torch.manual_seed(69)
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
#device = "cpu"

In [5]:
data_dir = "rnn_data/names"

# construct a dictionary that maps a language to a numerical label
lang2label = {
    file_name.split(".")[0]: torch.tensor([i], dtype=torch.long) # torch.long is the same as torch.int64
    for i, file_name in enumerate(os.listdir(data_dir))
}
lang2label

{'Czech': tensor([0]),
 'German': tensor([1]),
 'Arabic': tensor([2]),
 'Japanese': tensor([3]),
 'Chinese': tensor([4]),
 'Vietnamese': tensor([5]),
 'Russian': tensor([6]),
 'French': tensor([7]),
 'Irish': tensor([8]),
 'English': tensor([9]),
 'Spanish': tensor([10]),
 'Greek': tensor([11]),
 'Italian': tensor([12]),
 'Portuguese': tensor([13]),
 'Scottish': tensor([14]),
 'Dutch': tensor([15]),
 'Korean': tensor([16]),
 'Polish': tensor([17])}

In [6]:
num_langs = len(lang2label)
num_langs

18

In [7]:
# We first want to use unidecode to standardize all names and remove any acute symbols or the likes
char2idx = {letter: i for i, letter in enumerate(ascii_letters + " .,:;-'")}
num_letters = len(char2idx)
num_letters

59

In [8]:
# This means that each name will now be expressed as a tensor of size (num_char, 59)
# in other words, each character will be a tensor of size (59,)`
def name2tensor(name):
    tensor = torch.zeros(len(name), 1, num_letters) # batch size of 1
    for i, char in enumerate(name):
        tensor[i][0][char2idx[char]] = 1
    return tensor

RNN layers expect the input tensor to be of size (seq_len, batch_size, input_size)
Since every name is going to have a different length, we don’t batch the inputs for simplicity purposes and simply use each input as a single batch

In [9]:
name2tensor("abc")

tensor([[[1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
          0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
          0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
          0., 0., 0., 0., 0., 0., 0., 0.]],

        [[0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
          0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
          0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
          0., 0., 0., 0., 0., 0., 0., 0.]],

        [[0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
          0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
          0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
          0., 0., 0., 0., 0., 0., 0., 0.]]])

Now we need to build a our dataset with all the preprocessing steps. Let’s collect all the decoded and converted tensors in a list, with accompanying labels. The labels can be obtained easily from the file name

In [10]:
tensor_names = []
target_langs = []

for file in os.listdir(data_dir):
    with open(os.path.join(data_dir, file)) as f:
        lang = file.split(".")[0]
        names = [unidecode(line.rstrip()) for line in f] # rstrip() removes extra spaces at end of a word, but leaves one
        for name in names:
            try:
                tensor_names.append(name2tensor(name))
                target_langs.append(lang2label[lang])
            except KeyError:
                pass
            
# We could wrap this in a PyTorch Dataset class, but for simplicity sake let’s just use a good old for loop to feed this data into our model.

In [11]:
# Since we are dealing with normal lists, we can easily use sklearn’s train_test_split() 
from sklearn.model_selection import train_test_split

train_idx, test_idx = train_test_split(
    range(len(target_langs)),
    test_size=0.1,
    shuffle=True,
    stratify=np.array([tensor.item() for tensor in target_langs])
)

train_dataset = [(tensor_names[i], target_langs[i]) for i in train_idx]
test_dataset = [(tensor_names[i], target_langs[i]) for i in test_idx]

In [12]:
print(f"Train: {len(train_dataset)}")
print(f"Test: {len(test_dataset)}")

Train: 18063
Test: 2007


We will be building two models: a simple RNN, which is going to be built from scratch, and a GRU-based model using PyTorch’s layers.

This is a very simple RNN that takes a single character tensor representation as input and produces some prediction and a hidden state, which can be used in the next iteration. Notice that it is just some fully connected layers with a sigmoid non-linearity applied during the hidden state computation

In [13]:
class MyRNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(MyRNN, self).__init__()
        self.hidden_size = hidden_size
        self.in2hidden = nn.Linear(input_size + hidden_size, hidden_size)
        self.in2output = nn.Linear(input_size + hidden_size, output_size)
        
    def forward(self, x, hidden_state):
        combined = torch.cat((x, hidden_state), 1)
        hidden = torch.sigmoid(self.in2hidden(combined))
        output = self.in2output(combined)
        return output, hidden
    
    def init_hidden(self):
        return nn.init.kaiming_uniform_(torch.empty(1, self.hidden_size))

We call init_hidden() at the start of every new batch. For easier training and learning, I decided to use kaiming_uniform_() to initialize these hidden states.

We can now build our model and start training it.

In [14]:
hidden_size = 256
learning_rate = 0.001

model = MyRNN(num_letters, hidden_size, num_langs)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [15]:
num_epochs = 2
print_interval = 3000

for epoch in range(num_epochs):
    random.shuffle(train_dataset)
    for i, (name, label) in enumerate(train_dataset):
        hidden_state = model.init_hidden()
        for char in name:
            output, hidden_state = model(char, hidden_state)
        loss = criterion(output, label)

        optimizer.zero_grad()
        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), 1)
        optimizer.step()
        
        if (i + 1) % print_interval == 0:
            print(
                f"Epoch [{epoch + 1}/{num_epochs}], "
                f"Step [{i + 1}/{len(train_dataset)}], "
                f"Loss: {loss.item():.4f}"
            )

Epoch [1/2], Step [3000/18063], Loss: 0.1325
Epoch [1/2], Step [6000/18063], Loss: 3.4649
Epoch [1/2], Step [9000/18063], Loss: 0.6217
Epoch [1/2], Step [12000/18063], Loss: 0.0161
Epoch [1/2], Step [15000/18063], Loss: 1.2954
Epoch [1/2], Step [18000/18063], Loss: 0.6237
Epoch [2/2], Step [3000/18063], Loss: 0.0003
Epoch [2/2], Step [6000/18063], Loss: 0.0030
Epoch [2/2], Step [9000/18063], Loss: 4.1942
Epoch [2/2], Step [12000/18063], Loss: 1.5010
Epoch [2/2], Step [15000/18063], Loss: 1.5929
Epoch [2/2], Step [18000/18063], Loss: 3.4665


In [16]:
num_correct = 0
num_samples = len(test_dataset)

model.eval()

with torch.no_grad():
    for name, label in test_dataset:
        hidden_state = model.init_hidden()
        for char in name:
            output, hidden_state = model(char, hidden_state)
        _, pred = torch.max(output, dim=1)
        num_correct += bool(pred == label)

print(f"Accuracy: {num_correct / num_samples * 100:.4f}%")

Accuracy: 71.8984%


concrete examples

I don’t know if any of these names were actually in the training or testing set; these are just some random names I came up with that I thought would be pretty reasonable

In [17]:
label2lang = {label.item(): lang for lang, label in lang2label.items()}

def myrnn_predict(name):
    model.eval()
    tensor_name = name2tensor(name)
    with torch.no_grad():
        hidden_state = model.init_hidden()
        for char in tensor_name:
            output, hidden_state = model(char, hidden_state)
        _, pred = torch.max(output, dim=1)
    model.train()    
    return label2lang[pred.item()]

In [18]:
myrnn_predict("Randy"), myrnn_predict("Qin"), myrnn_predict("Vladamir")

('English', 'Chinese', 'Russian')

This is cool and all, and I could probably stop here, but I wanted to see how this custom model fares in comparison to, say, a model using PyTorch layers. Gated Recurrent Unit is probably not fair game for our simple RNN, but let’s see how well it does

A Gated Recurrent Unit (GRU) is a type of recurrent neural network (RNN) architecture introduced by Kyunghyun Cho et al. in 2014. It aims to solve the vanishing gradient problem inherent in traditional RNNs, making it more effective for learning from long sequences. 

### Mathematical Formulation

Given an input sequence \( X = \{x_1, x_2, \ldots, x_T\} \), the GRU updates its hidden state \( h_t \) at each time step \( t \) using the following equations:

1. **Update Gate $z_t$**
\[
z_t = \sigma(W_z \cdot [h_{t-1}, x_t] + b_z)
\]

2. **Reset Gate \( r_t \)**
\[
r_t = \sigma(W_r \cdot [h_{t-1}, x_t] + b_r)
\]

3. **Candidate Hidden State \( \tilde{h}_t \)**
\[
\tilde{h}_t = \tanh(W \cdot [r_t \odot h_{t-1}, x_t] + b)
\]

4. **Hidden State \( h_t \)**
\[
h_t = (1 - z_t) \odot h_{t-1} + z_t \odot \tilde{h}_t
\]

Here:

- \( \sigma \) is the sigmoid activation function.
- \( \odot \) denotes element-wise multiplication.
- \( W_z, W_r, W \) and \( b_z, b_r, b \) are trainable parameters.
- \( [h_{t-1}, x_t] \) denotes the concatenation of \( h_{t-1} \) and \( x_t \).

### Key Features

1. **Gating Mechanism**: Update and reset gates regulate the flow of information, allowing the model to learn long-term dependencies.
2. **Efficiency**: Fewer parameters than its LSTM counterpart, which makes it computationally more efficient.
3. **Vanishing Gradient**: Mitigates but does not completely eliminate the vanishing gradient problem.

GRUs are widely used in natural language processing, time-series analysis, and other sequence modeling tasks.

In [13]:
class GRUModel(nn.Module):
    def __init__(self, num_layers, hidden_size):
        super(GRUModel, self).__init__()
        self.num_layers = num_layers
        self.hidden_size = hidden_size
        self.gru = nn.GRU(
            input_size=num_letters, 
            hidden_size=hidden_size, 
            num_layers=num_layers,
        )
        self.fc = nn.Linear(hidden_size, num_langs)
    
    def forward(self, x):
        hidden_state = self.init_hidden()
        output, hidden_state = self.gru(x, hidden_state)
        output = self.fc(output[-1])
        return output
    
    def init_hidden(self):
        return torch.zeros(self.num_layers, 1, self.hidden_size).to(device)

In [14]:
model = GRUModel(num_layers=2, hidden_size=hidden_size)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [15]:
num_epochs = 2
print_interval = 3000

for epoch in range(num_epochs):
    random.shuffle(train_dataset)
    for i, (name, label) in enumerate(train_dataset):
        output = model(name)
        loss = criterion(output, label)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
         
        if (i + 1) % print_interval == 0:
            print(
                f"Epoch [{epoch + 1}/{num_epochs}], "
                f"Step [{i + 1}/{len(train_dataset)}], "
                f"Loss: {loss.item():.4f}"
            )

Epoch [1/2], Step [3000/18063], Loss: 0.0886
Epoch [1/2], Step [6000/18063], Loss: 2.4803
Epoch [1/2], Step [9000/18063], Loss: 0.0501
Epoch [1/2], Step [12000/18063], Loss: 0.2171
Epoch [1/2], Step [15000/18063], Loss: 0.1890
Epoch [1/2], Step [18000/18063], Loss: 0.0100
Epoch [2/2], Step [3000/18063], Loss: 1.3591
Epoch [2/2], Step [6000/18063], Loss: 0.0076
Epoch [2/2], Step [9000/18063], Loss: 0.4325
Epoch [2/2], Step [12000/18063], Loss: 0.2355
Epoch [2/2], Step [15000/18063], Loss: 3.4074
Epoch [2/2], Step [18000/18063], Loss: 0.0001


The training appeared somewhat more stable at first, but we do see a weird jump near the end of the second epoch. This is partially because I didn’t use gradient clipping for this GRU model, and we might see better results with clipping applied.

In [17]:
num_correct = 0
num_samples = len(test_dataset)
model.eval()

with torch.no_grad():
    for name, label in test_dataset:
        output = model(name)
        _, pred = torch.max(output, dim=1)
        num_correct += bool(pred == label)

print(f"Accuracy: {num_correct / num_samples * 100:.4f}%")

Accuracy: 81.4150%


In [18]:
def pytorch_predict(name):
    model.eval()
    tensor_name = name2tensor(name)
    with torch.no_grad():
        output = model(tensor_name)
        _, pred = torch.max(output, dim=1)
    model.train()
    return label2lang[pred.item()]

In [19]:
pytorch_predict("Randy"), pytorch_predict("Qin"), pytorch_predict("Vladamir")

NameError: name 'label2lang' is not defined