In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from termcolor import colored

In [None]:
# Report the installed PyTorch build so the environment can be reproduced.
print('Using Pytorch Version - {}'.format(torch.__version__))

Using Pytorch Version - 1.10.0+cu111


In [None]:
# Vocabulary: the ten digits plus the two operators. Every character becomes
# one position of a one-hot vector, so the feature count equals the vocabulary size.
all_chars = '0123456789+-'
num_features = len(all_chars)
# Build the index -> character table first, then invert it for the reverse lookup.
index_to_char = dict(enumerate(all_chars))
char_to_index = {char: idx for idx, char in index_to_char.items()}
print(f'Number of features : {len(all_chars)}')

Number of features : 12


In [None]:
def generate_data():
    """Create one random arithmetic problem and its answer, both as text.

    Two operands are drawn from 0..99. A third draw from 0..99 picks the
    operator: a value above 50 means addition, anything else subtraction.

    Returns:
        tuple[str, str]: ``(example, label)`` such as ``('74+10', '84')``.
        Subtraction can produce a negative label, e.g. ``'-33'``.
    """
    left = np.random.randint(low=0, high=100)
    right = np.random.randint(low=0, high=100)
    use_addition = np.random.randint(low=0, high=100) > 50.
    operator, result = ('+', left + right) if use_addition else ('-', left - right)
    return f'{left}{operator}{right}', str(result)

# Smoke test: draw one sample; the notebook displays the returned (example, label) tuple.
generate_data()

('74+10', '84')

In [None]:
# Every sample is a (time step x feature) grid. Five steps hold the longest
# possible example: two digits, an operator, two digits (e.g. '74+10').
max_time_steps = 5
x = np.zeros((max_time_steps, num_features))
y = np.zeros_like(x)
# Show both blank templates, separated by an empty line.
print(x, y, sep='\n\n')

[[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]]

[[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]]


In [None]:
def encode(example, label):
    """One-hot encode a text problem and its answer into fixed-size grids.

    Both strings are right-aligned in ``max_time_steps`` rows; the unused
    leading rows are filled with the one-hot vector for ``'0'``.

    Args:
        example: Problem text such as ``'25+66'``.
        label: Answer text such as ``'91'``.

    Returns:
        tuple[np.ndarray, np.ndarray]: ``(x, y)``, each of shape
        ``(max_time_steps, num_features)`` with exactly one 1 per row.

    Raises:
        ValueError: If either string is longer than ``max_time_steps``.
        KeyError: If a character is missing from ``char_to_index``.
    """
    return _one_hot_padded(example), _one_hot_padded(label)


def _one_hot_padded(text):
    """Right-align ``text`` in a one-hot grid, filling the leading rows with '0'."""
    pad = max_time_steps - len(text)
    if pad < 0:
        # A negative pad would make the row index wrap around and silently
        # overwrite the wrong rows, so reject over-long input explicitly.
        raise ValueError(
            f'{text!r} has {len(text)} characters; at most {max_time_steps} are supported'
        )

    grid = np.zeros((max_time_steps, num_features))
    grid[:pad, char_to_index['0']] = 1
    for offset, char in enumerate(text):
        grid[pad + offset, char_to_index[char]] = 1
    return grid

In [None]:
# Round-trip demo: draw one problem, encode it, and inspect the result.
e, l = generate_data()
x, y = encode(e, l)
print(f'Text Example and Label : {e, l}')
# end='\n\n' leaves one blank line before the shapes are printed.
print(f'Vectorized Example and Label : {x, y}', end='\n\n')
print(f'Shapes of Vectorized Example : {x.shape, y.shape}')

Text Example and Label : ('25+66', '91')
Vectorized Example and Label : (array([[0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0.],
       [0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0.]]), array([[1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0.],
       [0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]]))

Shapes of Vectorized Example : ((5, 12), (5, 12))


In [None]:
def decode(example):
  """Turn a sequence of one-hot (or score) rows back into text.

  Each row is mapped to the character whose index holds the row's largest
  value; the characters are joined in row order.
  """
  chars = []
  for row in example:
    chars.append(index_to_char[np.argmax(row)])
  return ''.join(chars)

def strip_zeros(example):
    """Remove leading zeros from every number in a decoded string.

    Zeros that open the string or directly follow ``'+'``/``'-'`` are dropped
    until the first non-zero digit of that number. A number made only of
    zeros at the end of the string is kept as a single ``'0'`` (for example
    ``'00000' -> '0'`` and ``'005-0' -> '5-0'``) instead of vanishing.

    Zeros in front of an operator are still removed, because they cannot be
    told apart from the ``'0'`` padding that ``encode`` puts before a
    negative label (``'00-33' -> '-33'``).

    Args:
        example: Decoded text made of digits, ``'+'`` and ``'-'``.

    Returns:
        str: The text without padding zeros.
    """
    output = ''
    in_number = False     # a non-zero digit of the current number was emitted
    pending_zero = False  # the current number has consisted of zeros only
    for c in example:
        if c == '0' and not in_number:
            pending_zero = True
            continue
        if c in ('+', '-'):
            # A new number starts after the operator; zeros seen so far were padding.
            in_number = False
        else:
            in_number = True
        pending_zero = False
        output += c
    if pending_zero:
        # The string ended inside an all-zero number: its value is zero, not "nothing".
        output += '0'
    return output

In [None]:
# Decode the one-hot label from the encode demo and drop its zero padding.
print(strip_zeros(decode(y)))

91


In [None]:
# Same label without stripping, which shows the '0' padding added by encode().
print(decode(y))

00091


In [None]:
def create_dataset(num_examples=200000):
    """Generate ``num_examples`` random problems as stacked one-hot arrays.

    Each sample comes from ``generate_data`` and is vectorised by ``encode``.

    Args:
        num_examples: Number of (example, label) pairs to create.

    Returns:
        tuple[np.ndarray, np.ndarray]: ``(x_train, y_train)``, each of shape
        ``(num_examples, max_time_steps, num_features)``.
    """
    shape = (num_examples, max_time_steps, num_features)
    x_train, y_train = np.zeros(shape), np.zeros(shape)

    for row in range(num_examples):
        x_train[row], y_train[row] = encode(*generate_data())

    return x_train, y_train

# Build the full training set (200,000 samples) and confirm the array shapes.
x_train, y_train = create_dataset(num_examples=200000)
print(x_train.shape, y_train.shape)

(200000, 5, 12) (200000, 5, 12)


In [None]:
# Shapes again; this repeats the check printed when the dataset was created.
print(x_train.shape, y_train.shape)

(200000, 5, 12) (200000, 5, 12)


In [None]:
# First training example in encoded form: one one-hot row per time step.
print(x_train[0])

[[0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1.]
 [0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0.]]


In [None]:
# The same example decoded back to text.
print(decode(x_train[0]))

46-79


In [None]:
# Its label decoded back to text, including the leading '0' padding from encode().
print(decode(y_train[0]))

00-33


In [None]:
# Convert the datasets to float32 tensors in a single step.
# torch.as_tensor accepts NumPy arrays as well as tensors, so this cell can be
# re-run safely, and it avoids the UserWarning that torch.tensor() emits when
# it is handed an existing tensor.
x_train = torch.as_tensor(x_train, dtype=torch.float32)
y_train = torch.as_tensor(y_train, dtype=torch.float32)

  This is separate from the ipykernel package so we can avoid doing imports until
  after removing the cwd from sys.path.


In [None]:
# Prefer the GPU when one is visible to PyTorch; otherwise stay on the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f'Using Device : {device}')

Using Device : cuda


In [None]:
class SimpleRNN(nn.Module):
  """Stacked vanilla RNN followed by a two-layer head applied at every time step.

  Args:
    input_size: Number of features per time step of the input.
    output_size: Number of scores produced per time step.
    hidden_dim: Width of the recurrent hidden state.
    n_layers: Number of stacked RNN layers.
  """

  def __init__(self, input_size, output_size, hidden_dim,n_layers):
    super(SimpleRNN, self).__init__()
    self.hidden_dim = hidden_dim
    self.n_layers = n_layers
    # batch_first=True: inputs and outputs are laid out as (batch, time, features).
    self.rnn = nn.RNN(input_size, hidden_dim, n_layers, batch_first = True)
    self.fc1 = nn.Linear(hidden_dim,hidden_dim * 2)
    self.fc2 = nn.Linear(hidden_dim * 2, output_size)
    self.relu = nn.ReLU()

  def forward(self, x):
    """Run the network on a batch.

    Args:
      x: Tensor of shape (batch, time, input_size).

    Returns:
      tuple: ``(out, hidden)`` where ``out`` has shape
      (batch, time, output_size) and ``hidden`` has shape
      (n_layers, batch, hidden_dim).
    """
    batch_size = x.size(0)
    # Create the initial state next to the input instead of forcing CUDA,
    # so the same model also runs on CPU-only machines.
    hidden = self.init_hidden(batch_size, device=x.device, dtype=x.dtype)
    out, hidden = self.rnn(x, hidden)
    # The linear layers act on the last dimension, so every time step gets its own scores.
    out = self.fc1(out)
    out = self.fc2(self.relu(out))

    return out, hidden

  def init_hidden(self, batch_size, device=None, dtype=None):
    """Return an all-zero initial hidden state of shape (n_layers, batch_size, hidden_dim).

    ``device`` and ``dtype`` are optional; when omitted, PyTorch's defaults are used.
    """
    hidden = torch.zeros(self.n_layers, batch_size, self.hidden_dim, device=device, dtype=dtype)
    return hidden

In [None]:
# Build the network and place it on the device selected earlier in the notebook
# (rather than hard-coding CUDA), so the cell also works without a GPU.
model = SimpleRNN(input_size = num_features, output_size = num_features , hidden_dim = 12, n_layers = 10)
model = model.to(device)

# Training hyperparameters.
n_epochs = 1000
lr = 0.01

# The optimizer is created after the move so it tracks the on-device parameters.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr = lr)

In [None]:
# Full-batch training. The data is moved to the target device once, outside the loop.
inputs = x_train.to(device)
# CrossEntropyLoss treats dim 1 of a 3-D input as the class axis. Feeding it the
# (batch, time, feature) logits directly would therefore apply the softmax across
# time steps. Flatten to (batch * time, feature) and use class indices so every
# time step is scored as one classification over the vocabulary.
targets = y_train.to(device).argmax(dim=-1).reshape(-1)

model.train()
for epoch in range(1, n_epochs + 1):
  optimizer.zero_grad()
  output, hidden = model(inputs)
  loss = criterion(output.reshape(-1, output.size(-1)), targets)
  loss.backward()
  optimizer.step()

  # Report progress every tenth epoch.
  if epoch % 10 == 0:
    print('Epoch: {}/{}.............'.format(epoch, n_epochs), end=' ')
    print("Loss: {:.4f}".format(loss.item()))

Epoch: 10/1000............. Loss: 0.5646
Epoch: 20/1000............. Loss: 0.4723
Epoch: 30/1000............. Loss: 0.4440
Epoch: 40/1000............. Loss: 0.4381
Epoch: 50/1000............. Loss: 0.4360
Epoch: 60/1000............. Loss: 0.4347
Epoch: 70/1000............. Loss: 0.4335
Epoch: 80/1000............. Loss: 0.4317
Epoch: 90/1000............. Loss: 0.4340
Epoch: 100/1000............. Loss: 0.4319
Epoch: 110/1000............. Loss: 0.4300
Epoch: 120/1000............. Loss: 0.4278
Epoch: 130/1000............. Loss: 0.4283
Epoch: 140/1000............. Loss: 0.4252
Epoch: 150/1000............. Loss: 0.4234
Epoch: 160/1000............. Loss: 0.4207
Epoch: 170/1000............. Loss: 0.4204
Epoch: 180/1000............. Loss: 0.4193
Epoch: 190/1000............. Loss: 0.4177
Epoch: 200/1000............. Loss: 0.4167
Epoch: 210/1000............. Loss: 0.4164
Epoch: 220/1000............. Loss: 0.4132
Epoch: 230/1000............. Loss: 0.4162
Epoch: 240/1000............. Loss: 0.4113
E

In [None]:
# Build a small fresh batch and run it through the trained model.
x_test, y_test = create_dataset(num_examples=20)
# torch.as_tensor converts in one step and avoids the torch.tensor(tensor) UserWarning.
x_test = torch.as_tensor(x_test, dtype=torch.float32)
y_test = torch.as_tensor(y_test, dtype=torch.float32)
# Inference only: switch to eval mode and skip autograd bookkeeping.
model.eval()
with torch.no_grad():
    preds, _ = model(x_test.to(device))

  after removing the cwd from sys.path.
  """


In [None]:
# decode() works on NumPy arrays, so convert both test tensors back.
y_test, x_test = (t.cpu().numpy() for t in (y_test, x_test))

In [None]:
# Bring the predictions back to host memory before converting them to NumPy.
preds = preds.cpu()

In [None]:
# Detach from the autograd graph and convert to a NumPy array so decode() can use it.
preds = preds.detach().numpy()

In [None]:
# Compare predictions and labels as stripped strings; a sample only counts as
# correct when the whole sequence matches.
full_seq_acc = 0

for i, pred in enumerate(preds):
    x_test_str, y_test_str, pred_str = (
        strip_zeros(decode(seq)) for seq in (x_test[i], y_test[i], pred)
    )
    correct = pred_str == y_test_str
    col = 'green' if correct else 'red'
    if correct:
        # Each exact match contributes an equal share of the total.
        full_seq_acc += 1/len(preds)
    outstring = 'Input: {}, Out: {}, Pred: {}'.format(x_test_str, y_test_str, pred_str)
    print(colored(outstring, col))
print('\nFull sequence accuracy: {:.3f} %'.format(100 * full_seq_acc))

In [None]:
def predict(model, character):
    """Predict the character that follows ``character`` according to ``model``.

    The input is one-hot encoded with this notebook's ``char_to_index`` table,
    run through the model as a batch of one, and the scores of the last time
    step decide the result.

    Args:
        model: Callable that takes a float tensor of shape
            (1, len(character), num_features) and returns ``(out, hidden)``.
            The last dimension of ``out`` is assumed to hold one score per
            vocabulary entry -- NOTE(review): confirm for models other than
            SimpleRNN.
        character: Non-empty string or list of single characters, each a key
            of ``char_to_index``.

    Returns:
        tuple: ``(next_char, hidden)``: the most likely next character and the
        hidden state returned by the model.

    Raises:
        ValueError: If ``character`` is empty.
        KeyError: If a character is not part of the vocabulary.
    """
    if len(character) == 0:
        raise ValueError('predict() needs at least one input character')

    # One-hot encoding our input to fit into the model
    one_hot = torch.zeros(1, len(character), num_features, dtype=torch.float32)
    for step, c in enumerate(character):
        one_hot[0, step, char_to_index[c]] = 1.0
    # Tensor.to() returns a new tensor; the result has to be kept.
    one_hot = one_hot.to(device)

    with torch.no_grad():
        out, hidden = model(one_hot)

    # Flatten every leading dimension so the last row is the final time step,
    # whether the model returns (batch, time, classes) or (time, classes).
    last_step = out.reshape(-1, out.size(-1))[-1]
    prob = nn.functional.softmax(last_step, dim=0)
    # Taking the class with the highest probability score from the output
    char_ind = torch.argmax(prob).item()

    return index_to_char[char_ind], hidden

In [None]:
def sample(model, out_len, start='hey'):
    """Extend ``start`` one predicted character at a time up to ``out_len`` characters.

    The model is switched to eval mode, ``start`` is lower-cased, and
    ``predict`` is called with all characters produced so far until the text
    is ``out_len`` long. If ``start`` is already that long (or longer) it is
    returned unchanged apart from the lower-casing.

    NOTE(review): the default ``start='hey'`` consists of letters; confirm that
    ``predict`` accepts them before relying on the default.

    Args:
        model: Model passed through to ``predict``; must provide ``eval()``.
        out_len: Target length of the returned string.
        start: Seed text.

    Returns:
        str: The seed followed by the predicted characters.
    """
    model.eval() # eval mode
    # First off, run through the starting characters
    chars = list(start.lower())
    remaining = out_len - len(chars)
    # Now pass in the previous characters and get a new one
    for _ in range(remaining):
        next_char, _hidden = predict(model, chars)
        chars.append(next_char)

    return ''.join(chars)