In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch import optim
import torch.nn.functional as F
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from enum import Enum
import re
import random

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
# Reserved vocabulary indices; Lang pre-populates index2word with these three entries.
SOS_token = 0  # start-of-sequence marker
EOS_token = 1  # end-of-sequence marker
OOV_token = 2  # index returned for words that are missing from a vocabulary

## Dataloading

In [None]:
!git clone https://github.com/brendenlake/SCAN

fatal: destination path 'SCAN' already exists and is not an empty directory.


In [None]:
class ScanSplit(Enum):
    """Identifiers for the SCAN train/test splits that ScanDataset can load.

    Every member must have a distinct value: Enum treats members that share a
    value as aliases of the first one, which would make the two add-primitive
    splits compare equal and the turn-left branch in ScanDataset unreachable.
    """
    SIMPLE_SPLIT = 'simple_split'
    LENGTH_SPLIT = 'length_split'
    FEW_SHOT_SPLIT = 'few_shot_split'
    ADD_PRIM_JUMP_SPLIT = 'add_prim_jump_split'
    ADD_PRIM_TURNLEFT_SPLIT = 'add_prim_turn_left_split'

In [None]:
class Lang:
    """Vocabulary for one side of the data (input commands or output actions).

    Maps words to integer indices and back, counts word occurrences, and
    records the length, in tokens, of the longest sentence added so far.
    """

    def __init__(self):
        self.word2index = {}  # word -> integer index
        self.word2count = {}  # word -> number of times the word was added
        self.index2word = {SOS_token: "SOS", EOS_token: "EOS", OOV_token: 'OOV'}
        self.n_words = len(self.index2word)  # Count tokens (special tokens included)

        # Number of tokens in the longest sentence passed to add_sentence.
        self.max_length = 0

    @staticmethod
    def _tokenize(sentence: str):
        """Split a sentence on any run of whitespace.

        Using split() without an argument drops leading/trailing whitespace and
        newlines, so no empty-string or newline-suffixed tokens are produced.
        """
        return sentence.split()

    def add_sentence(self, sentence):
        """Add every word of the sentence to the vocabulary and update max_length."""
        words = self._tokenize(sentence)
        for word in words:
            self._add_word(word)
        # max_length is a sentence length in tokens, not the length of a word.
        self.max_length = max(len(words), self.max_length)

    def _add_word(self, word):
        """Register a new word with the next free index, or bump its count."""
        if word not in self.word2index:
            self.word2index[word] = self.n_words
            self.word2count[word] = 1
            self.index2word[self.n_words] = word
            self.n_words += 1
        else:
            self.word2count[word] += 1

    def indexes_from_sentence(self, sentence: str):
        """Return the list of word indices for the sentence; unknown words map to OOV_token."""
        return [self.word2index.get(word, OOV_token) for word in self._tokenize(sentence)]

    def tensor_from_sentence(self, sentence: str):
        """Convert the sentence to a long tensor of shape (n_tokens, 1).

        The tensor is created on the module-level `device`. No SOS or EOS
        index is added to the sequence.
        """
        indexes = self.indexes_from_sentence(sentence)
        return torch.tensor(indexes, dtype=torch.long, device=device).view(-1, 1)

In [None]:
class ScanDataset(Dataset):
    """Torch Dataset over one SCAN split.

    Holds the raw command sentences in `X` and the raw action sentences in `y`;
    indexing returns them converted to tensors by the two Lang vocabularies.
    When constructed with train=True, the training sentences are also added to
    `input_lang` and `output_lang`.
    """

    def __init__(self, split: 'ScanSplit', input_lang: 'Lang', output_lang: 'Lang', train: bool = True):
        self.input_lang = input_lang
        self.output_lang = output_lang

        self.X, self.y = self._get_data(split, train)

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        """Return (input_tensor, target_tensor) for the example at position idx."""
        input_tensor = self.input_lang.tensor_from_sentence(self.X[idx])
        target_tensor = self.output_lang.tensor_from_sentence(self.y[idx])
        return (input_tensor, target_tensor)

    def _get_data(self, split: 'ScanSplit', train: bool = True):
        """Load the sentences of the selected split.

        Only the file that is actually needed (train or test) is read. For the
        training portion, every sentence is added to the vocabularies.

        Raises:
            Exception: if the split has no file mapping below.
        """
        # An if/elif chain (first match wins) is kept on purpose so the lookup
        # does not depend on the enum members having distinct values.
        if split == ScanSplit.SIMPLE_SPLIT:
            train_file = 'SCAN/simple_split/tasks_train_simple.txt'
            test_file = 'SCAN/simple_split/tasks_test_simple.txt'
        elif split == ScanSplit.LENGTH_SPLIT:
            train_file = 'SCAN/length_split/tasks_train_length.txt'
            test_file = 'SCAN/length_split/tasks_test_length.txt'
        elif split == ScanSplit.ADD_PRIM_JUMP_SPLIT:
            train_file = 'SCAN/add_prim_split/tasks_train_addprim_jump.txt'
            test_file = 'SCAN/add_prim_split/tasks_test_addprim_jump.txt'
        elif split == ScanSplit.ADD_PRIM_TURNLEFT_SPLIT:
            train_file = 'SCAN/add_prim_split/tasks_train_addprim_turn_left.txt'
            test_file = 'SCAN/add_prim_split/tasks_test_addprim_turn_left.txt'
        else:
            raise Exception('Split not implemented')

        X, y = self._extract_data_from_file(train_file if train else test_file)

        if train:
            # Add words to vocabs
            for sen in X:
                self.input_lang.add_sentence(sen)

            for sen in y:
                self.output_lang.add_sentence(sen)

        return X, y

    def _extract_data_from_file(self, filepath: str):
        """Parse a SCAN file into (commands, actions), two parallel lists of strings.

        Each non-blank line has the form ``IN: <command> OUT: <actions>``. The
        markers and surrounding whitespace (including the trailing newline) are
        removed from the returned sentences. Blank lines are skipped.

        Raises:
            ValueError: if a non-blank line contains no 'OUT:' marker.
        """
        # Format is in IN: ... OUT: ...
        lead_token = 'IN:'
        split_token = 'OUT:'

        in_txt = []
        out_txt = []

        with open(filepath) as f:
            for line_number, raw_line in enumerate(f, start=1):
                line = raw_line.strip()
                if not line:
                    continue

                command, separator, actions = line.partition(split_token)
                if not separator:
                    raise ValueError(
                        "%s:%d: expected '%s' marker" % (filepath, line_number, split_token)
                    )

                # Remove the prefix as a whole. str.strip(lead_token) would
                # instead strip any of the characters 'I', 'N', ':' from both
                # ends of the line.
                command = command.strip()
                if command.startswith(lead_token):
                    command = command[len(lead_token):]

                in_txt.append(command.strip())
                out_txt.append(actions.strip())

        return in_txt, out_txt

In [None]:
# Choose the compute device once; other cells read the resulting `device` variable.
# torch.cuda.is_available() is True when a GPU can be used and False otherwise.
is_cuda = torch.cuda.is_available()

device = torch.device("cuda" if is_cuda else "cpu")
print("GPU is available" if is_cuda else "GPU not available, CPU used")

GPU not available, CPU used


In [None]:
# Training portion of the simple split. Constructing the dataset with train=True
# also fills the two freshly created Lang vocabularies from the training sentences.
train_dataset = ScanDataset(
    split=ScanSplit.SIMPLE_SPLIT,
    input_lang=Lang(),
    output_lang=Lang(),
    train=True
    )

# One example per batch, without shuffling.
train_dataloader = DataLoader(train_dataset, batch_size=1, shuffle=False)

# Larger of the two vocabularies' max_length values; train() uses it as the default for its max_length argument.
MAX_LENGTH = max(train_dataset.input_lang.max_length, train_dataset.output_lang.max_length)

In [None]:
# Reuse the vocabularies built from the training data: with train=False the
# dataset adds no words to its Lang objects, so fresh Lang() instances would
# map every test token to OOV_token.
test_dataset = ScanDataset(
    split=ScanSplit.SIMPLE_SPLIT,
    input_lang=train_dataset.input_lang,
    output_lang=train_dataset.output_lang,
    train=False
)

## Model

In [None]:
class EncoderRNN(nn.Module):
    """Embeds a sequence of token indices and feeds it through a single nn.RNN.

    The RNN's hidden size is the same as the embedding size.
    """

    def __init__(self, vocab_size, embedding_dim):
        super().__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.rnn = nn.RNN(embedding_dim, embedding_dim)

    def forward(self, x):
        """Return the RNN's final hidden state for x with all size-1 dimensions removed.

        The per-step outputs of the RNN are discarded.
        """
        final_hidden = self.rnn(self.embedding(x))[1]
        return final_hidden.squeeze()

In [None]:
class DecoderCell(nn.Module):
    """One decoding step: attention-style weighting of the encoder states, then one RNN step.

    NOTE(review): `vocab_size` is accepted by __init__ but not used anywhere in
    this class, and `forward` takes its prediction as the argmax over the
    concatenated hidden/context vector rather than over a vocabulary-sized
    projection — confirm this is intended.
    """

    def __init__(self, vocab_size, embedding_dim):
        super(DecoderCell, self).__init__()
        # Input size 1: DecoderRNN feeds the previous token index as a single float feature.
        self.rnn = nn.RNN(1, embedding_dim)
        # Randomly initialised parameters of the scoring function `e`.
        self.W = torch.nn.Parameter(torch.randn((embedding_dim, embedding_dim)))
        self.U = torch.nn.Parameter(torch.randn((embedding_dim, embedding_dim)))
        self.v = torch.nn.Parameter(torch.randn((embedding_dim, 1)))
        self.nonlinear = torch.nn.Tanh()

    def e(self, g, h):
        """Score g against h as v^T tanh(W * g + U * h).

        `*` here is element-wise (broadcast) multiplication, not a matrix product.
        """
        # NOTE(review): torch.tensor([h]) builds a new tensor from h's value, so
        # presumably no gradient flows back to h through this path, and it looks
        # like h must be a single-element value — confirm.
        h = torch.tensor([h]).unsqueeze(1)
        return self.v.T @ self.nonlinear(self.W * g + self.U * h)

    def alpha(self, encoder_hiddens, input_hidden, t):
        """Return exp(e(input_hidden, encoder_hiddens[t])) normalised by the sum over all positions."""
        T = len(encoder_hiddens)
        top = torch.exp(self.e(input_hidden, encoder_hiddens[t]))

        bottom = 0

        # The normaliser is recomputed from scratch on every call.
        for j in range(T):
            bottom += torch.exp(self.e(input_hidden, encoder_hiddens[j]))

        return top/bottom

    def forward(self, x, encoder_hiddens, input_hidden):
        """Run one step and return (prediction, hidden).

        `hidden` is the RNN's new hidden state concatenated with the context
        c_i along dim 1 and squeezed; `prediction` is the argmax index of the
        softmax over that vector.
        """
        # Context: sum over positions of alpha-weighted encoder entries.
        c_i = 0

        for t in range(len(encoder_hiddens)):
            alpha_it = self.alpha(encoder_hiddens, input_hidden, t)
            h_t = encoder_hiddens[t]
            c_i += alpha_it * h_t

        # The context is passed to the RNN as its initial hidden state.
        _, hidden = self.rnn(x, c_i)
        hidden = torch.concat((hidden, c_i), dim=1).squeeze()
        prediction = torch.argmax(F.softmax(hidden, dim=0))

        return prediction, hidden

class DecoderRNN(nn.Module):
    """Drives DecoderCell step by step, either greedily or with teacher forcing."""

    def __init__(self, vocab_size, embedding_dim, comm_to_ix, act_to_ix):
        super(DecoderRNN, self).__init__()
        self.decoder_cell = DecoderCell(vocab_size, embedding_dim)
        self.comm_to_ix = comm_to_ix
        self.act_to_ix = act_to_ix

    def forward(self, encoder_hiddens, use_teacher_forcing, targets, max_length=100):
        """Produce a list of predictions, one per decoding step.

        Args:
            encoder_hiddens: encoder result; its last entry seeds the first step.
            use_teacher_forcing: when true, one step is run per entry of
                `targets`, feeding that entry as the step input.
            targets: target token indices; only read with teacher forcing.
            max_length: upper bound on the number of predictions produced
                without teacher forcing. Without it the loop would never end
                if the "<EOS>" index is not predicted.

        Returns:
            list of the per-step predictions returned by the decoder cell.
        """
        preds = []

        if not use_teacher_forcing:
            eos_index = self.act_to_ix["<EOS>"]

            x = torch.tensor(self.comm_to_ix["<SOS>"], dtype=torch.long)
            pred, hiddens = self.decoder_cell(x.reshape(1, -1).float(), encoder_hiddens, encoder_hiddens[-1])
            preds.append(pred)

            # Feed each prediction back in until EOS is produced or the cap is reached.
            while preds[-1].item() != eos_index and len(preds) < max_length:
                pred, hiddens = self.decoder_cell(pred.reshape(1, -1).float(), encoder_hiddens, hiddens[-1])
                preds.append(pred)
        else:
            hiddens = encoder_hiddens

            for x in targets:
                pred, hiddens = self.decoder_cell(x.reshape(1, -1).float(), encoder_hiddens, hiddens[-1])
                preds.append(pred)

        return preds

In [None]:
class Model(nn.Module):
    """Sequence-to-sequence wrapper that chains EncoderRNN and DecoderRNN."""

    def __init__(self, vocab_size, embedding_dim, comm_to_ix, act_to_ix):
        super().__init__()
        self.encoder = EncoderRNN(vocab_size, embedding_dim)
        self.decoder = DecoderRNN(vocab_size, embedding_dim, comm_to_ix, act_to_ix)

    def forward(self, x, use_teacher_forcing=False, targets=None):
        """Encode x and decode it, returning the decoder's predictions.

        `targets` must be supplied exactly when `use_teacher_forcing` is true;
        any other combination fails the assertion.
        """
        targets_given = targets is not None
        assert bool(use_teacher_forcing) == targets_given

        return self.decoder(self.encoder(x), use_teacher_forcing, targets)

In [None]:
""" EXAMPLE ON HOW TO USE THE SEQ2SEQ-model """

# Toy input command and target action sequence, split into words.
command = "jump twice and walk".split()
action_sequence = "JUMP JUMP WALK".split()

# Each vocabulary starts with the two special markers. Every unseen word then
# receives the next free index, in order of first appearance.
comm_to_ix = {"<SOS>" : 0 , "<EOS>" : 1}
for word in command:
    comm_to_ix.setdefault(word, len(comm_to_ix))

act_to_ix = {"<SOS>" : 0 , "<EOS>" : 1}
for act in action_sequence:
    act_to_ix.setdefault(act, len(act_to_ix))

# Preparing the input command and target output action sequence
def prepare_sequence(seq, to_ix):
    """Wrap the word list `seq` in "<SOS>"/"<EOS>" and return its indices as a 1-D long tensor.

    Every word, and both markers, must be a key of `to_ix`.
    """
    wrapped = ["<SOS>"] + seq + ["<EOS>"]
    indices = [to_ix[token] for token in wrapped]
    return torch.tensor(indices, dtype=torch.long)

command_prep = prepare_sequence(command, comm_to_ix)
action_seq_prep = prepare_sequence(action_sequence, act_to_ix)

# Some parameters
# The embedding table needs one row per vocabulary entry, so the size comes
# from the vocabulary, not from the length of this one encoded command.
VOCAB_SIZE = len(comm_to_ix)
EMBEDDING_DIM = 6 # Can be changed. In the paper they use 256

# Defining the model and using it for predicting
model = Model(VOCAB_SIZE, EMBEDDING_DIM, comm_to_ix, act_to_ix)
pred = model(command_prep, use_teacher_forcing=True, targets=action_seq_prep)

In [None]:
# Probability with which a training step feeds the ground-truth token as the next decoder input.
teacher_forcing_ratio = 0.5


def train(input_tensor, target_tensor, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion, max_length=MAX_LENGTH):
    """Run one optimisation step on a single (input, target) pair.

    The encoder is called once per input position with the previous hidden
    state, and its outputs are collected into a (max_length, hidden_size)
    buffer. The decoder is then stepped over the target positions, starting
    from SOS_token and the encoder's last hidden state. With probability
    `teacher_forcing_ratio` the target token is fed as the next decoder input;
    otherwise the decoder's own top-1 prediction is fed, and decoding stops
    early once that prediction is EOS_token.

    Returns:
        The summed loss over the decoded steps divided by the target length.
    """
    encoder_hidden = encoder.initHidden()

    for optimizer in (encoder_optimizer, decoder_optimizer):
        optimizer.zero_grad()

    input_length = input_tensor.size(0)
    target_length = target_tensor.size(0)

    # Encode: one encoder call per input position.
    encoder_outputs = torch.zeros(max_length, encoder.hidden_size, device=device)
    for position in range(input_length):
        encoder_output, encoder_hidden = encoder(input_tensor[position], encoder_hidden)
        encoder_outputs[position] = encoder_output[0, 0]

    decoder_input = torch.tensor([[SOS_token]], device=device)
    decoder_hidden = encoder_hidden

    use_teacher_forcing = random.random() < teacher_forcing_ratio

    # Decode: both modes share the same step and differ only in the next input.
    loss = 0
    for position in range(target_length):
        decoder_output, decoder_hidden, _ = decoder(
            decoder_input, decoder_hidden, encoder_outputs)
        loss += criterion(decoder_output, target_tensor[position])

        if use_teacher_forcing:
            decoder_input = target_tensor[position]  # Teacher forcing
            continue

        # Own prediction as next input, detached from the graph history.
        decoder_input = decoder_output.topk(1)[1].squeeze().detach()
        if decoder_input.item() == EOS_token:
            break

    loss.backward()

    encoder_optimizer.step()
    decoder_optimizer.step()

    return loss.item() / target_length

In [None]:
import time
import math


def asMinutes(s):
    """Format a duration of `s` seconds as '<minutes>m <seconds>s' (fractions are truncated)."""
    minutes = math.floor(s / 60)
    remaining_seconds = s - minutes * 60
    return '%dm %ds' % (minutes, remaining_seconds)


def timeSince(since, percent):
    """Return '<elapsed> (- <remaining>)' for work started at `since`.

    `percent` is the fraction of the work completed so far; the remaining time
    is extrapolated as elapsed / percent - elapsed.
    """
    elapsed = time.time() - since
    estimated_total = elapsed / (percent)
    remaining = estimated_total - elapsed
    return '%s (- %s)' % (asMinutes(elapsed), asMinutes(remaining))

In [None]:
import matplotlib.pyplot as plt
plt.switch_backend('agg')
import matplotlib.ticker as ticker
import numpy as np


def showPlot(points):
    """Plot the values in `points` on a new figure.

    Major y-axis ticks are placed at multiples of 0.2.
    """
    # plt.subplots() already creates the figure. Calling plt.figure() first
    # would leave an additional, empty figure open on every call.
    _, ax = plt.subplots()
    # this locator puts ticks at regular intervals
    loc = ticker.MultipleLocator(base=0.2)
    ax.yaxis.set_major_locator(loc)
    ax.plot(points)

In [None]:
def trainIters(encoder, decoder, n_iters, print_every=1000, plot_every=100, learning_rate=0.01):
    """Train for `n_iters` steps on examples drawn at random from `train_dataset`.

    Both networks are optimised with SGD at `learning_rate` against an NLL
    loss. The average loss is printed every `print_every` steps, recorded
    every `plot_every` steps, and the recorded averages are plotted at the end.
    """
    start = time.time()

    encoder_optimizer = optim.SGD(encoder.parameters(), lr=learning_rate)
    decoder_optimizer = optim.SGD(decoder.parameters(), lr=learning_rate)
    criterion = nn.NLLLoss()

    plot_losses = []
    print_loss_total = 0  # Reset every print_every
    plot_loss_total = 0  # Reset every plot_every

    for step in range(1, n_iters + 1):
        sample_index = random.randrange(len(train_dataset))
        input_tensor, target_tensor = train_dataset[sample_index]

        loss = train(input_tensor, target_tensor, encoder, decoder,
                     encoder_optimizer, decoder_optimizer, criterion)
        print_loss_total += loss
        plot_loss_total += loss

        if step % print_every == 0:
            progress = step / n_iters
            print_loss_avg = print_loss_total / print_every
            print_loss_total = 0
            print('%s (%d %d%%) %.4f' % (timeSince(start, progress),
                                         step, progress * 100, print_loss_avg))

        if step % plot_every == 0:
            plot_losses.append(plot_loss_total / plot_every)
            plot_loss_total = 0

    showPlot(plot_losses)

In [None]:
def evaluate(encoder, decoder, sentence, max_length, input_lang=None, output_lang=None):
    """Greedily decode `sentence` and return (decoded_words, decoder_attentions).

    Args:
        encoder, decoder: the networks to run; gradients are disabled.
        sentence: input sentence as a string.
        max_length: size of the encoder-output buffer and maximum number of
            decoding steps.
        input_lang: vocabulary used to turn `sentence` into a tensor.
            Defaults to `train_dataset.input_lang`.
        output_lang: vocabulary used to turn predicted indices into words.
            Defaults to `train_dataset.output_lang`.

    Returns:
        decoded_words: predicted words; '<EOS>' is appended when EOS_token is predicted.
        decoder_attentions: the attention rows of the steps that were run.
    """
    # The vocabularies live on the dataset in this notebook; there are no
    # module-level `input_lang` / `output_lang` / `tensorFromSentence` names.
    if input_lang is None:
        input_lang = train_dataset.input_lang
    if output_lang is None:
        output_lang = train_dataset.output_lang

    with torch.no_grad():
        input_tensor = input_lang.tensor_from_sentence(sentence)
        input_length = input_tensor.size()[0]
        encoder_hidden = encoder.initHidden()

        encoder_outputs = torch.zeros(max_length, encoder.hidden_size, device=device)

        for ei in range(input_length):
            encoder_output, encoder_hidden = encoder(input_tensor[ei],
                                                     encoder_hidden)
            encoder_outputs[ei] += encoder_output[0, 0]

        decoder_input = torch.tensor([[SOS_token]], device=device)  # SOS

        decoder_hidden = encoder_hidden

        decoded_words = []
        decoder_attentions = torch.zeros(max_length, max_length)

        for di in range(max_length):
            decoder_output, decoder_hidden, decoder_attention = decoder(
                decoder_input, decoder_hidden, encoder_outputs)
            decoder_attentions[di] = decoder_attention.data
            topv, topi = decoder_output.data.topk(1)
            if topi.item() == EOS_token:
                decoded_words.append('<EOS>')
                break
            else:
                decoded_words.append(output_lang.index2word[topi.item()])

            # Feed the prediction back in as the next decoder input.
            decoder_input = topi.squeeze().detach()

        return decoded_words, decoder_attentions[:di + 1]

In [None]:
# Hidden size passed to both the encoder and the decoder.
hidden_size = 256
encoder1 = EncoderRNN(train_dataset.input_lang.n_words, hidden_size).to(device)
# NOTE(review): AttnDecoderRNN is not defined in any cell of this notebook, and
# train() calls encoder.initHidden() and reads encoder.hidden_size, neither of
# which the EncoderRNN class defined in this notebook provides — confirm which
# definitions were in the kernel when this cell was run.
decoder1 = AttnDecoderRNN(hidden_size, train_dataset.output_lang.n_words, train_dataset.output_lang.max_length).to(device)

# 75000 training steps, printing the average loss every 5000 steps.
trainIters(encoder1, decoder1, 75000, print_every=5000)

4m 28s (- 62m 35s) (5000 6%) 1.1104
9m 15s (- 60m 12s) (10000 13%) 0.3982
13m 53s (- 55m 34s) (15000 20%) 0.1921
18m 37s (- 51m 12s) (20000 26%) 0.1191
23m 37s (- 47m 15s) (25000 33%) 0.1643
28m 38s (- 42m 57s) (30000 40%) 0.1135
33m 32s (- 38m 19s) (35000 46%) 0.2540
38m 27s (- 33m 39s) (40000 53%) 2.0510
42m 55s (- 28m 36s) (45000 60%) 4.1028
47m 24s (- 23m 42s) (50000 66%) 5.9702
51m 26s (- 18m 42s) (55000 73%) 5.8151
55m 11s (- 13m 47s) (60000 80%) 5.4395
59m 8s (- 9m 5s) (65000 86%) 5.5666
63m 12s (- 4m 30s) (70000 93%) 5.6084
66m 55s (- 0m 0s) (75000 100%) 5.5129


### Experiment 1

The top-performing architecture was an LSTM with no attention, 2
layers of 200 hidden units, and no dropout. The best-overall
network achieved 99.7% correct.

SCAN tasks were randomly split into a training set (80%) and a test set (20%).

### Experiment 2

The best result (20.8% on average, again over 5 runs) is achieved
by a GRU with attention, one 50-dimensional hidden layer,
and dropout of 0.5.

### Experiment 3

The best performance is achieved by
a GRU network with attention, one layer with 100 hidden
units, and dropout of 0.1 (90.3% accuracy). 

<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=ec00d141-8917-4313-a10a-78395d2ec852' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>