In [1]:
import torch as th
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import numpy as np
import math
from matplotlib import pyplot as plt

In [10]:
class RNN(th.nn.Module):
    """Simple tanh recurrent network over sequences of integer tokens.

    Each token is embedded, fed through the recurrence
    ``h_t = tanh(i2h(e_t) + h2h(h_{t-1}))`` and projected by ``h2o`` to logits
    over the ``num_embeddings`` token classes at every step.

    Args:
        hidden_dim: Size of the recurrent hidden state.
        num_embeddings: Vocabulary size; also used as the number of output
            classes. Must be given (the ``None`` default is not usable).
        embedding_dim: Size of each token embedding. Must be given.
    """

    def __init__(self, hidden_dim, num_embeddings=None, embedding_dim=None):
        super().__init__()
        self.num_embeddings = num_embeddings
        self.embedding_dim = embedding_dim
        # Index 0 is the padding token: its embedding is fixed to zeros.
        self.in_embedding = th.nn.Embedding(self.num_embeddings, self.embedding_dim, padding_idx=0)

        self.in_dim = self.embedding_dim
        self.hidden_dim = hidden_dim
        self.i2h = th.nn.Linear(self.in_dim, self.hidden_dim)
        self.h2h = th.nn.Linear(self.hidden_dim, self.hidden_dim)
        self.out_dim = self.num_embeddings
        self.h2o = th.nn.Linear(self.hidden_dim, self.out_dim)

    def forward(self, x):
        """Run the recurrence over a batch of token sequences.

        Args:
            x: Integer token indices of shape ``(batch_size, seq_len)``.

        Returns:
            Tuple ``(hidden_seq, out_seq)`` with shapes
            ``(batch_size, seq_len, hidden_dim)`` and
            ``(batch_size, seq_len, out_dim)``; ``out_seq`` holds unnormalised logits.
        """
        # After embedding: (batch_size, seq_len, in_dim)
        in_seq = self.in_embedding(x)

        batch_size, seq_len = in_seq.shape[0], in_seq.shape[1]
        # new_zeros keeps the buffers on the same device/dtype as the embeddings,
        # so the model also works after .to(device) or .double().
        hidden = in_seq.new_zeros(batch_size, self.hidden_dim)
        hidden_seq = in_seq.new_zeros(batch_size, seq_len, self.hidden_dim)
        out_seq = in_seq.new_zeros(batch_size, seq_len, self.out_dim)

        for seq_step in range(seq_len):
            hidden = th.tanh(self.i2h(in_seq[:, seq_step, :]) + self.h2h(hidden))
            hidden_seq[:, seq_step, :] = hidden
            out_seq[:, seq_step, :] = self.h2o(hidden)

        return hidden_seq, out_seq

    @th.no_grad()
    def predict(self, x):
        """Return ``(hidden_seq, class_probabilities)`` for ``x`` without tracking gradients.

        Switches the module to eval mode; the mode is not restored afterwards.
        """
        self.eval()
        hidden_seq, out_seq = self(x)
        return hidden_seq, F.softmax(out_seq, dim=-1)

    def setup(self, learning_rate=1.e-3, batch_size=1, epochs=10):
        """Store training hyper-parameters and create the loss and Adam optimiser."""
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.epochs = epochs
        self.criterion = th.nn.CrossEntropyLoss()
        self.optimizer = th.optim.Adam(self.parameters(), lr=self.learning_rate)

    def train_step(self, dataset):
        """Train for ``self.epochs`` epochs on ``dataset``; ``setup`` must be called first.

        Args:
            dataset: Map-style dataset yielding ``(input_tokens, target_tokens)``
                pairs of equal-length integer sequences.

        Returns:
            1-D tensor of length ``self.epochs`` holding, for each epoch, the loss
            of the last mini-batch processed. The tensor is detached from the
            autograd graph.
        """
        # Use the value configured in setup(), not a global that happens to exist.
        history = th.zeros(self.epochs)
        data_loader = DataLoader(dataset, batch_size=self.batch_size, shuffle=True)

        self.train()
        for epoch in range(self.epochs):
            for in_train, out_train in data_loader:
                _, pred_seq = self(in_train)
                # Flatten (batch, seq) so every position is one classification example.
                logits = pred_seq.reshape(-1, self.num_embeddings)
                targets = out_train.reshape(-1)

                self.optimizer.zero_grad()
                loss = self.criterion(logits, targets)
                loss.backward()
                self.optimizer.step()

                # .item() stores a plain float so the history does not keep the graph alive.
                history[epoch] = loss.item()

        return history

In [6]:
class LSTM(th.nn.Module):
    """Hand-written single-layer LSTM operating on real-valued sequences.

    The cell state has the same size as the hidden state. Each gate combines a
    bias-free input projection with a biased hidden-state projection.

    Args:
        in_dim: Number of features per time step.
        hidden_dim: Size of the hidden state (and of the cell state).
    """

    def __init__(self, in_dim, hidden_dim):
        super().__init__()
        self.in_dim = in_dim
        self.hidden_dim = hidden_dim
        self.cell_dim = hidden_dim

        self.forget_gate_i2c = th.nn.Linear(self.in_dim, self.cell_dim, bias=False)
        self.forget_gate_h2c = th.nn.Linear(self.hidden_dim, self.cell_dim)

        self.input_gate_i2c = th.nn.Linear(self.in_dim, self.cell_dim, bias=False)
        self.input_gate_h2c = th.nn.Linear(self.hidden_dim, self.cell_dim)

        self.cell_state_i2c = th.nn.Linear(self.in_dim, self.cell_dim, bias=False)
        self.cell_state_h2c = th.nn.Linear(self.hidden_dim, self.cell_dim)

        self.out_gate_i2h = th.nn.Linear(self.in_dim, self.hidden_dim, bias=False)
        self.out_gate_h2h = th.nn.Linear(self.hidden_dim, self.hidden_dim)

    def forget_gate(self, x, hidden):
        """Sigmoid gate deciding how much of the previous cell state is kept."""
        return th.sigmoid(self.forget_gate_i2c(x) + self.forget_gate_h2c(hidden))

    def input_gate(self, x, hidden):
        """Sigmoid gate deciding how much of the candidate cell state is written."""
        return th.sigmoid(self.input_gate_i2c(x) + self.input_gate_h2c(hidden))

    def output_gate(self, x, hidden):
        """Sigmoid gate deciding how much of the squashed cell state is exposed."""
        return th.sigmoid(self.out_gate_i2h(x) + self.out_gate_h2h(hidden))

    def lstm_step(self, x, hidden, cell_state):
        """Advance the LSTM by one time step.

        Args:
            x: Input for this step, shape ``(batch_size, in_dim)``.
            hidden: Previous hidden state, shape ``(batch_size, hidden_dim)``.
            cell_state: Previous cell state, shape ``(batch_size, cell_dim)``.

        Returns:
            Tuple ``(updated_hidden, updated_cell_state)`` -- hidden state first.
        """
        candidate_cell_state = th.tanh(self.cell_state_i2c(x) + self.cell_state_h2c(hidden))
        updated_cell_state = cell_state*self.forget_gate(x, hidden) + candidate_cell_state*self.input_gate(x, hidden)
        updated_hidden = th.tanh(updated_cell_state)*self.output_gate(x, hidden)

        return updated_hidden, updated_cell_state

    def forward(self, x):
        """Run the LSTM over a whole batch of sequences, starting from zero states.

        Args:
            x: Input of shape ``(batch_size, seq_len, in_dim)``.

        Returns:
            Tuple ``(hidden_seq, last_hidden)`` with shapes
            ``(batch_size, seq_len, hidden_dim)`` and ``(batch_size, hidden_dim)``.
        """
        batch_size = x.shape[0]
        seq_len = x.shape[1]
        # new_zeros keeps the state buffers on the same device/dtype as the input.
        cell_state = x.new_zeros(batch_size, self.cell_dim)
        hidden = x.new_zeros(batch_size, self.hidden_dim)
        hidden_seq = x.new_zeros(batch_size, seq_len, self.hidden_dim)

        for seq_step in range(seq_len):
            # lstm_step returns (hidden, cell_state); unpack in that order so the
            # two states are not swapped between steps.
            hidden, cell_state = self.lstm_step(x[:, seq_step, :], hidden, cell_state)
            hidden_seq[:, seq_step, :] = hidden

        return hidden_seq, hidden_seq[:, -1, :]

    @th.no_grad()
    def predict(self, x):
        """Evaluate ``forward`` in eval mode without tracking gradients."""
        self.eval()
        return self(x)

    def setup(self, lr=1.e-3, momentum=0.9, epochs=10):
        """Store training hyper-parameters and create the MSE loss and SGD optimiser."""
        self.epochs = epochs
        self.lr = lr
        self.momentum = momentum

        self.criterion = th.nn.MSELoss(reduction="mean")
        self.optimizer = th.optim.SGD(self.parameters(), lr=lr, momentum=momentum)

    def train_step(self, x_train, target, hidden=None):
        """Full-batch training for ``self.epochs`` epochs; ``setup`` must be called first.

        Args:
            x_train: Input batch of shape ``(batch_size, seq_len, in_dim)``.
            target: Regression target compared against the full hidden-state
                sequence, so it must match ``(batch_size, seq_len, hidden_dim)``.
            hidden: Unused; kept so existing callers keep working.

        Returns:
            1-D tensor with the loss of every epoch, detached from the autograd graph.
        """
        history = th.zeros(self.epochs)

        self.train()
        for epoch in range(self.epochs):
            pred_seq, _ = self(x_train)
            self.optimizer.zero_grad()
            loss = self.criterion(pred_seq, target)
            loss.backward()
            self.optimizer.step()
            # .item() stores a plain float so the history does not keep the graph alive.
            history[epoch] = loss.item()
        return history

In [11]:
class copyingTaskDataset(Dataset):
    """Randomly generated sequences for a "remember and reproduce" copying task.

    Token encoding: ``0`` is blank, ``1`` is the recall marker, and
    ``2 .. num_letters - 1`` are the symbols to remember.

    Every sample is drawn afresh on access, so indexing the same ``idx`` twice
    generally yields different sequences.

    Args:
        seq_len: Length of the input and target sequences.
        num_fields2rem: Number of consecutive symbols to remember.
        num_samples: Value reported by ``len()``.
        num_letters: Exclusive upper bound of the symbol ids.

    Raises:
        ValueError: If the configuration cannot produce a valid sample.
    """

    def __init__(self, seq_len, num_fields2rem, num_samples, num_letters=10):
        # Fail early with a clear message instead of an opaque error at sample time.
        if num_fields2rem < 1:
            raise ValueError("num_fields2rem must be at least 1")
        if seq_len < 2 * num_fields2rem:
            raise ValueError("seq_len must be at least 2 * num_fields2rem")
        if num_letters < 3:
            raise ValueError("num_letters must be at least 3 (ids 0 and 1 are reserved)")

        self.seq_len = seq_len
        self.num_letters = num_letters
        self.num_fields2rem = num_fields2rem
        self.num_samples = num_samples
        # Not read anywhere in this class; kept so the attribute stays available.
        self.fields2rem = th.zeros(self.num_fields2rem)

    def __len__(self):
        return self.num_samples

    def __getitem__(self, idx):
        """Generate one random ``(in_seq, out_seq)`` pair; ``idx`` is ignored.

        ``in_seq`` holds the symbols at a random position and a ``1`` marker at a
        later position; ``out_seq`` holds the same symbols starting at the marker
        position. Both are ``int64`` tensors of length ``seq_len``.
        """
        n_fields = self.num_fields2rem
        in_seq = th.zeros(self.seq_len, dtype=th.long)
        out_seq = th.zeros(self.seq_len, dtype=th.long)
        # Use the instance's alphabet size rather than a same-named global.
        fields2rem = th.randint(2, self.num_letters, size=(n_fields,))

        # Leave room after the symbols for an equally long output window.
        in_pos_fields = th.randint(0, self.seq_len - 2*n_fields + 1, size=()).item()
        in_seq[in_pos_fields:in_pos_fields + n_fields] = fields2rem

        # The output window starts at or after the end of the input symbols.
        out_pos_fields = th.randint(in_pos_fields + n_fields, self.seq_len - n_fields + 1, size=()).item()
        out_seq[out_pos_fields:out_pos_fields + n_fields] = fields2rem
        # Marker telling the model where reproduction has to start.
        in_seq[out_pos_fields] = 1

        return in_seq, out_seq

In [12]:
# Hyper-parameters of the RNN copying-task experiment.
seq_len = 15          # length of every generated input/target sequence
num_fields2rem = 1    # number of consecutive symbols the model has to remember
num_samples = 5000    # dataset length reported to the DataLoader
batch_size = 400      # mini-batch size later passed to rnn.setup
epochs = 50           # number of training epochs later passed to rnn.setup
num_letters = 10      # exclusive upper bound for the random symbol ids

# Training set; samples are generated randomly each time they are accessed.
dataset = copyingTaskDataset(seq_len, num_fields2rem, num_samples, num_letters=num_letters)

In [13]:
# Model sizes for the RNN experiment.
embedding_dim = 50
hidden_dim = 50

# num_embeddings is both the vocabulary size and the number of output classes.
# NOTE(review): the dataset emits ids in [0, num_letters), so the +1 leaves one
# class unused -- confirm whether that is intended.
rnn = RNN(hidden_dim, num_embeddings=num_letters+1, embedding_dim=embedding_dim)

In [14]:
# Learning rate handed to the Adam optimiser created inside rnn.setup.
learning_rate = 1.e-3

# Stores the hyper-parameters on the model and builds the loss and optimiser.
rnn.setup(learning_rate=learning_rate, batch_size=batch_size, epochs=epochs)

In [238]:
# Train the RNN; the displayed value is the returned history, which holds the
# loss of the last mini-batch of each epoch.
rnn.train_step(dataset)

tensor([1.6576, 0.7244, 0.5019, 0.3995, 0.3329, 0.2958, 0.2704, 0.2518, 0.2364,
        0.2264, 0.2170, 0.2118, 0.2026, 0.1963, 0.1916, 0.1870, 0.1821, 0.1788,
        0.1747, 0.1727, 0.1689, 0.1656, 0.1627, 0.1607, 0.1588, 0.1571, 0.1556,
        0.1537, 0.1531, 0.1512, 0.1497, 0.1491, 0.1480, 0.1466, 0.1454, 0.1442,
        0.1427, 0.1424, 0.1414, 0.1400, 0.1397, 0.1367, 0.1372, 0.1359, 0.1345,
        0.1315, 0.1280, 0.1203, 0.1123, 0.1076], grad_fn=<CopySlices>)

In [239]:
# Small held-out set built with the same generator settings as the training data;
# its samples are also drawn randomly on access.
num_test_samples = 10
test_dataset = copyingTaskDataset(seq_len, num_fields2rem, num_test_samples, num_letters=num_letters)

In [240]:
# Two sequences per batch so the printed tensors stay readable.
test_data_loader = DataLoader(test_dataset, batch_size=2)

In [247]:
# Draw a single batch from the test loader.
test_in, test_out = next(iter(test_data_loader))

# predict() returns (hidden states, class probabilities); keep the probabilities.
test_pred = rnn.predict(test_in)[1]

# Show inputs, targets and the most probable class at every position.
print("test_in", test_in, sep="\n")
print("test_out", test_out, sep="\n")
print("test_pred", test_pred.argmax(dim=2), sep="\n")

test_in
tensor([[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 5, 1, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 1, 0, 0]])
test_out
tensor([[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 5, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0]])
test_pred
tensor([[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 5, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0]])


In [7]:
# Sizes for the LSTM experiment. Note that this rebinds the notebook-level
# `hidden_dim` that was set to 50 for the RNN.
in_dim = 5
hidden_dim = 10

lstm = LSTM(in_dim, hidden_dim)

In [8]:
# Training settings for the LSTM. These rebind the notebook-level `learning_rate`
# and `epochs` used earlier for the RNN run, so any code reading those globals
# sees the new values from here on.
learning_rate = 1.e-3
epochs = 100

# Builds the MSE loss and an SGD optimiser (momentum keeps its default of 0.9).
lstm.setup(lr=learning_rate, epochs=epochs)