Basic simple encoder decoder net.
Could be expanded to become a fully fledged translator.

This should be able to translate any set of token pairs. The first column should contain the input and the second column the target.
The input file has a sequence of digits mapped to the number spelled out in words. 
Improvement ideas:
* Use embeddings instead of one hot vector representation
* Do not pass the target to the seq2seq during evaluation
* Try using teacher forcing some % of the time
* Generates the max length sequence - maybe stop when hit <end>


In [310]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
import csv


In [311]:


filename = 'nums.txt'  # Replace with your file name

nums =[]
words=[]

with open(filename, 'r') as file:
    reader = csv.reader(file)
    for row in reader:
        nums.append(row[0])
        words.append(row[1])

zipped = zip(nums, words)

pairs=[]
for item in zipped:
    n,w = item    
    pairs.append(["<s> "+n+" <e>","<s> "+w+" <e>"])
    
print("Num examples:", len(pairs))


# This class creates a word -> index mapping (e.g,. "dad" -> 5) and vice-versa 
# (e.g., 5 -> "dad") for each language,
class LanguageIndex():
    def __init__(self, lang):
        self.lang = lang
        self.word2idx = {}
        self.idx2word = {}
        self.vocab = set()

        self.create_index()

    def create_index(self):
        for phrase in self.lang:
            self.vocab.update(phrase.split(' '))

        self.vocab = sorted(self.vocab)

        self.word2idx['<pad>'] = 0
        
        for index, word in enumerate(self.vocab):
            self.word2idx[word] = index + 1

        for word, index in self.word2idx.items():
            self.idx2word[index] = word
            
def max_length(tensor):
    return max(len(t) for t in tensor)


import torch
from torch.nn.utils.rnn import pad_sequence

def load_dataset(pairs, num_examples):
    # pairs => already created cleaned input, output pairs

    # index language using the class defined above
    inp_lang = LanguageIndex(n for n, w in pairs)
    targ_lang = LanguageIndex(w for n, w in pairs)

    # Vectorize the input and target languages

    # Input sentences
    input_tensor = [torch.tensor([inp_lang.word2idx[s] for s in en.split(' ')], dtype=torch.long) for en, ma in pairs]

    # Target sentences
    target_tensor = [torch.tensor([targ_lang.word2idx[s] for s in ma.split(' ')], dtype=torch.long) for en, ma in pairs]

    # Calculate max_length of input and output tensor
    # Here, we'll set those to the longest sentence in the dataset
    max_length_inp, max_length_tar = max_length(input_tensor), max_length(target_tensor)

    # Padding the input and output tensor to the maximum length
    input_tensor = pad_sequence(input_tensor, batch_first=True, padding_value=0)
    input_tensor = input_tensor[:, :max_length_inp]

    target_tensor = pad_sequence(target_tensor, batch_first=True, padding_value=0)
    target_tensor = target_tensor[:, :max_length_tar]

    return input_tensor, target_tensor, inp_lang, targ_lang, max_length_inp, max_length_tar


input_tensor, target_tensor, inp_lang, targ_lang, max_length_inp, max_length_targ = load_dataset(pairs, len(pairs))

# One hot tensors -> could move to an embedding layer
oh_input = F.one_hot(input_tensor)
oh_target = F.one_hot(target_tensor)






Num examples: 999


In [312]:
# PARAMETERS AND HYPERPARAMETERS 
INP_VOCAB_SIZE = len(inp_lang.word2idx)
TAR_VOCAB_SIZE = len(targ_lang.word2idx)
UNITS = 128 # #of units in the GRU - both encoder and decoder use this setting
INP_SEQ_LEN = max_length_inp
TAR_SEQ_LEN = max_length_targ
BATCH_SIZE = 64

print(f"INPUT VOCAB SIZE:{INP_VOCAB_SIZE}, TARGET VOCAB SIZE: {TAR_VOCAB_SIZE}, UNITS: {UNITS},\
        INPUT SEQLEN:{INP_SEQ_LEN}, TARGET SEQ LENGTH: {TAR_SEQ_LEN}")


INPUT VOCAB SIZE:14, TARGET VOCAB SIZE: 33, UNITS: 128,        INPUT SEQLEN:5, TARGET SEQ LENGTH: 6


In [313]:


# Wrap the tensors in a TensorDataset and create the loader
dataset = TensorDataset(oh_input, oh_target)

# Split the dataset into train and eval subsets
train_size = int(0.9 * len(dataset))  # 90% for training
eval_size = len(dataset) - train_size  # 10% for evaluation
train_dataset, eval_dataset = torch.utils.data.random_split(dataset, [train_size, eval_size])

# Create DataLoaders for training and evaluation
dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
evalloader = DataLoader(eval_dataset, batch_size=BATCH_SIZE, shuffle=False)


# Encoder class 
class Encoder(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(Encoder, self).__init__()
        self.hidden_size = hidden_size
        # Encoder GRU
        self.encoder_gru = nn.GRU(input_size, hidden_size, batch_first=True)    

    def forward(self, input_seq):
        # input_seq: [batch, seq, input_vocab]
        # outout : [batch, seq, units]
        # hidden: [1, seq, units] or [numlayers, seq, units]
        encoder_hidden = self.initialize_hidden(input_seq.shape[0])
        
        # Encoder
        encoder_output, encoder_hidden = self.encoder_gru(input_seq, encoder_hidden)
        return encoder_output, encoder_hidden
    
    def initialize_hidden(self, batchlen):
        return torch.zeros(1, batchlen, self.hidden_size)

# Decoder class
class Decoder(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(Decoder, self).__init__()
        self.hidden_size = hidden_size
        # Decoder GRU
        self.decoder_gru = nn.GRU(input_size, hidden_size, batch_first=True)
        # Linear layer to map hidden state to output
        self.fc = nn.Linear(hidden_size, output_size) 

    def forward(self, input_seq, hidden):
        # input_seq: [batch, seq (1), input_vocab]
        # outout : [batch, seq, units]
        # hidden: [1, seq, units] or [numlayers, seq, units]

        dec_output, dec_hidden = self.decoder_gru(input_seq, hidden)
        # dec_output shape: 1, seq_len, hidden_size 
        
        dec_output = self.fc(dec_output)
        # output shape: 1, seq_len, output_size
        return dec_output, dec_hidden

    
# The Seq2Seq model to connect the encoder and decoder
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    
    def forward(self, source, target, batchsize, training):
        # The sequence we produce will be stored in this tensor
        outputs = torch.zeros(batchsize, TAR_SEQ_LEN , TAR_VOCAB_SIZE)
        #print("outputs storage shape:", outputs.shape)
        
        # Call the encoder
        enc_output, enc_hidden = self.encoder(source)
        # Grab the last hidden layer for the decoder
        # enc_output shape: [batch, seq, units]
        # hidden shape: [1, seq, units] or [numlayers, seq, units] 
        
        # Grab the last time slice of the hidden units
        hidden = enc_hidden
        # hidden shape: [1, seq, units]
        
        # Grab the first input to the Decoder which will be the <start> token
        output = seed_oh = F.one_hot(torch.tensor(targ_lang.word2idx['<s>']), TAR_VOCAB_SIZE).unsqueeze(0).float()
        output = output.unsqueeze(0)
        ##print("output shape A:", output.shape)
        output = output.repeat(batchsize,1,1)
        ##print("output shape B:", output.shape)
        ##print("target shape:", target.shape)
        
        # Generate the full sequence -> loop could be improved to a while loop
        for t in range(TAR_SEQ_LEN):
            #if(training):
            #    #Teacher forcing only if during training
            #    x = target[:,t,:].unsqueeze(1)
            #    #print("Train x shape:", x.shape)
            #else:
            x = output
            ##print("x shape in loop:", x.shape)
            ##print("hidden shape in loop:", hidden.shape)

            # Use previous hidden, cell as context from encoder at start
            output, hidden = self.decoder(x, hidden)
            ##print("dec output shape:", output.shape)
            ##print("dec hidden shape:", hidden.shape)
            # output shape: 1, seq_len, output_size
            
            # Store next output prediction
            outputs[:,t,:] = output.squeeze()           
            # outputs shape: seq_len, output_size
            ##print("output[] shape:", outputs.shape)
            
        return outputs

    
# Build the model and print out some stats    
encoder = Encoder(input_size=INP_VOCAB_SIZE, hidden_size=UNITS)
decoder = Decoder(input_size=TAR_VOCAB_SIZE, hidden_size=UNITS, output_size=TAR_VOCAB_SIZE)
model = Seq2Seq(encoder, decoder)

print(sum(p.numel() for p in encoder.parameters())/1e6, 'M parameters')
print(sum(p.numel() for p in decoder.parameters())/1e6, 'M parameters')


optimizer = optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss()

0.055296 M parameters
0.066849 M parameters


In [314]:
# Training - clean

# for param_group in optimizer.param_groups:
#     param_group['lr'] = 0.01

bsize = 1

EPOCHS = 500
model.train()

for e in range(EPOCHS):    
    loss =0 
    for index, (inp, targ) in enumerate(dataloader):

        bsize = inp.shape[0]
        
        output = model(inp.float(), targ.float(), bsize, 1)
        
        #print("model output:", output.shape)
        output = output.squeeze(1)
        targ = torch.argmax(targ, dim=2).squeeze(0)
        
        outr = output.view(-1,TAR_VOCAB_SIZE)
        tarr = targ.view(-1)
        loss += criterion(outr,tarr)

    if(e%(EPOCHS/10)==0):
        print("Loss:", loss.item())

    # Training time ...
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    

print("Done!")

Loss: 52.58573532104492
Loss: 27.98406982421875
Loss: 22.918245315551758
Loss: 20.16581916809082
Loss: 18.225666046142578
Loss: 16.084938049316406
Loss: 15.08467960357666
Loss: 14.502851486206055
Loss: 13.335037231445312
Loss: 12.06436824798584
Done!


In [315]:

count = 0

for index, (inp, targ) in enumerate(evalloader):

    bsize = inp.shape[0]
    output = model(inp.float(), targ.float(), bsize, 0)

    #print("model output:", output.shape)
    output = output.squeeze(1)
    targ = torch.argmax(targ, dim=2).squeeze(0)

    outr = output.view(-1,TAR_VOCAB_SIZE)
    tarr = targ.view(-1)
    loss += criterion(outr,tarr)

    omax = torch.argmax(output, dim=2)
    imax = torch.argmax(inp, dim=2)

    outr = output.reshape(-1,TAR_VOCAB_SIZE)
    tarr = targ.view(-1)

    for b in range(bsize):

        print("Input: ", end="")
        for i in range(INP_SEQ_LEN):
            #print(f"b:{b} s:{s}")
            print(inp_lang.idx2word[imax[b][i].item()] + " ", end='')

        print("Pred: ", end="")
        for t in range(TAR_SEQ_LEN):
            #print(f"b:{b} t:{t}")
            #print("omax entry:", omax[b][t].item())
            print(targ_lang.idx2word[omax[b][t].item()] + " ", end='')

        print("Target: ", end="")
        for s in range(TAR_SEQ_LEN):
            #print(f"b:{b} s:{s}")
            print(targ_lang.idx2word[targ[b][s].item()] + " ", end='')

        print()


Input: <s> 9 6 4 <end> Pred: <start> nine hundred twenty seven <e> Target: <start> nine hundred sixty four <e> 
Input: <s> 2 1 7 <end> Pred: <start> nine hundred four <e> <pad> Target: <start> two hundred seventeen <e> <pad> 
Input: <s> 5 4 2 <end> Pred: <start> four hundred forty two <e> Target: <start> five hundred forty two <e> 
Input: <s> 9 1 9 <end> Pred: <start> nine hundred eleven <e> <pad> Target: <start> nine hundred nineteen <e> <pad> 
Input: <s> 4 5 4 <end> Pred: <start> four hundred forty six <e> Target: <start> four hundred fifty four <e> 
Input: <s> 5 3 <end> <pad> Pred: <start> eighty seven <e> <pad> <pad> Target: <start> fifty three <e> <pad> <pad> 
Input: <s> 7 3 4 <end> Pred: <start> six hundred twenty seven <e> Target: <start> seven hundred thirty four <e> 
Input: <s> 4 1 2 <end> Pred: <start> eight hundred six <e> <pad> Target: <start> four hundred twelve <e> <pad> 
Input: <s> 6 3 4 <end> Pred: <start> six hundred twenty seven <e> Target: <start> six hundred thirty 