In [1]:
import numpy as np
import math 
import matplotlib.pyplot as plt


In [2]:

# Seed NumPy's global RNG so the generated dataset is the same on every run
np.random.seed(70)
# Ordered symbols the toy sequences are built from
alphabet = list('abcd')
def generate_dataset(num_sequences=100, num_tokens=4, letters=None):
    """
    Generates a number of sequences as our dataset.

    Every sequence repeats one pair of neighbouring letters `num_tokens`
    times and ends with the 'EOS' marker, e.g.
    ['c', 'd', 'c', 'd', 'c', 'd', 'c', 'd', 'EOS'].

    Args:
     `num_sequences`: the number of sequences to be generated.
     `num_tokens`: how many times the letter pair is repeated per sequence.
     `letters`: ordered symbols to draw the pair from; defaults to the
                module-level `alphabet`.

    Returns a list of sequences (each one a list of strings).

    Draws from NumPy's global RNG, so seed it beforehand for reproducibility.
    """
    if letters is None:
        letters = alphabet

    samples = []
    for _ in range(num_sequences):
        # The upper bound of randint is exclusive, so start_letter + 1 is
        # always a valid index into `letters`
        start_letter = np.random.randint(0, len(letters) - 1)
        pair = [letters[start_letter], letters[start_letter + 1]]
        samples.append(pair * num_tokens + ['EOS'])
    return samples


# Build the dataset with the default number of sequences
sequences = generate_dataset()

# Show one example so the sequence structure is visible
print('A single sample from the generated dataset:', sequences[0], sep='\n')

A single sample from the generated dataset:
['c', 'd', 'c', 'd', 'c', 'd', 'c', 'd', 'EOS']


In [3]:
from collections import defaultdict

def sequences_to_dicts(sequences):
    """
    Creates word_to_idx and idx_to_word dictionaries for a list of sequences.

    Words are indexed by descending frequency (ties keep first-seen order);
    the special 'UNK' token always receives the last index.

    Args:
     `sequences`: a list of sequences, each a list of words.

    Returns a tuple (word_to_idx, idx_to_word, num_sentences, vocab_size).
    Both dictionaries are defaultdicts: an unknown word maps to the 'UNK'
    index and an unknown index maps to 'UNK'.
    """
    # Count number of word occurrences in a single pass over the dataset
    word_count = defaultdict(int)
    for sequence in sequences:
        for word in sequence:
            word_count[word] += 1

    # Sort by frequency, most common first (sorted() is stable, so ties keep
    # the order in which the words were first seen)
    ranked = sorted(word_count.items(), key=lambda item: -item[1])

    # Unique words followed by UNK; a literal 'UNK' in the data is dropped
    # first so the token is never listed twice
    unique_words = [word for word, _ in ranked if word != 'UNK']
    unique_words.append('UNK')

    # Count number of sequences and number of unique words
    num_sentences, vocab_size = len(sequences), len(unique_words)

    # Create dictionaries so that we can go from word to index and back
    # If a word is not in our vocabulary, we assign it to token 'UNK'
    # NOTE: a defaultdict stores the default on a failed lookup, so querying
    # an unknown key grows the dictionary
    word_to_idx = defaultdict(lambda: vocab_size-1)
    idx_to_word = defaultdict(lambda: 'UNK')

    # Fill dictionaries
    for idx, word in enumerate(unique_words):
        word_to_idx[word] = idx
        idx_to_word[idx] = word

    return word_to_idx, idx_to_word, num_sentences, vocab_size


# Build the vocabulary lookups from the generated dataset
word_to_idx, idx_to_word, num_sequences, vocab_size = sequences_to_dicts(sequences)

# Report the dataset size and spot-check both lookup directions
print(f"We have {num_sequences} sentences and {len(word_to_idx)} unique tokens in our dataset (including UNK).\n")
print("The index of 'b' is", word_to_idx['b'])
print(f"The word corresponding to index 1 is '{idx_to_word[1]}'")

We have 100 sentences and 6 unique tokens in our dataset (including UNK).

The index of 'b' is 0
The word corresponding to index 1 is 'c'


In [4]:
# Show the vocabulary index that was assigned to the token 'b'
word_to_idx['b']

0

In [5]:
from torch.utils import data

class Dataset(data.Dataset):
    """Pairs every input sequence with its target sequence for index-based access."""

    def __init__(self, inputs, targets):
        # Parallel collections: inputs[i] belongs with targets[i]
        self.inputs, self.targets = inputs, targets

    def __len__(self):
        """Number of samples, measured on the targets collection."""
        return len(self.targets)

    def __getitem__(self, index):
        """Return the (input, target) pair stored at `index`."""
        return self.inputs[index], self.targets[index]

    
def create_datasets(sequences, dataset_class, p_train=0.8, p_val=0.1, p_test=0.1):
    """
    Splits `sequences` into training, validation and test datasets.

    Args:
     `sequences`: a list of sequences (each a list of words)
     `dataset_class`: callable invoked as dataset_class(inputs, targets)
     `p_train`, `p_val`, `p_test`: fraction of the sequences per partition;
       each partition size is truncated to an integer

    The training partition is taken from the front, the validation partition
    directly after it, and the test partition from the end of `sequences`.

    Returns a tuple (training_set, validation_set, test_set).
    """
    # Define partition sizes
    total = len(sequences)
    num_train = int(total*p_train)
    num_val = int(total*p_val)
    num_test = int(total*p_test)

    # Split sequences into partitions
    sequences_train = sequences[:num_train]
    sequences_val = sequences[num_train:num_train+num_val]
    # Use an explicit start index: `sequences[-num_test:]` would return the
    # WHOLE list when num_test is 0, because -0 == 0
    sequences_test = sequences[max(total-num_test, 0):]

    def get_inputs_targets_from_sequences(sequences):
        # Inputs and targets both hold L-1 words of a sentence of length L;
        # targets are shifted by one position so each input word is paired
        # with the word that follows it
        inputs = [sequence[:-1] for sequence in sequences]
        targets = [sequence[1:] for sequence in sequences]
        return inputs, targets

    # Get inputs and targets for each partition
    inputs_train, targets_train = get_inputs_targets_from_sequences(sequences_train)
    inputs_val, targets_val = get_inputs_targets_from_sequences(sequences_val)
    inputs_test, targets_test = get_inputs_targets_from_sequences(sequences_test)

    # Create datasets
    training_set = dataset_class(inputs_train, targets_train)
    validation_set = dataset_class(inputs_val, targets_val)
    test_set = dataset_class(inputs_test, targets_test)

    return training_set, validation_set, test_set
    

# Split the sequences using the default 80/10/10 proportions
training_set, validation_set, test_set = create_datasets(sequences, Dataset)

# Report the size of every partition
for split_name, split in (('training', training_set),
                          ('validation', validation_set),
                          ('test', test_set)):
    print(f'We have {len(split)} samples in the {split_name} set.')

We have 80 samples in the training set.
We have 10 samples in the validation set.
We have 10 samples in the test set.


In [27]:
def one_hot_encode(idx, vocab_size):
    """
    Builds the one-hot vector for a single vocabulary index.

    Args:
     `idx`: position that should be set to one
     `vocab_size`: length of the resulting vector

    Returns a 1-D float numpy array of length `vocab_size` that is zero
    everywhere except at `idx`.
    """
    # Start from all zeros, then switch on the requested position
    encoded = np.zeros(vocab_size)
    encoded[idx] = 1.0
    return encoded


def one_hot_encode_sequence(sequence, vocab_size, word_index=None):
    """
    One-hot encodes a sequence of words given a fixed vocabulary size.

    Args:
     `sequence`: a list of words to encode
     `vocab_size`: the size of the vocabulary
     `word_index`: optional mapping from word to vocabulary index; defaults
                   to the module-level `word_to_idx`

    Returns a 3-D float numpy array of shape (num words, vocab size, 1).
    An empty `sequence` yields an array of shape (0, vocab size, 1).
    """
    lookup = word_to_idx if word_index is None else word_index

    # Explicit integer dtype keeps the index array usable when it is empty
    indices = np.array([lookup[word] for word in sequence], dtype=np.intp)

    # Allocate the final shape up front, then set one entry per word
    encoding = np.zeros((len(indices), vocab_size, 1))
    encoding[np.arange(len(indices)), indices, 0] = 1.0

    return encoding


#test_word = one_hot_encode(word_to_idx['a'], vocab_size)
#print(f'Our one-hot encoding of \'a\' has shape {test_word.shape}.')

# Encode a two-word example and report the shape of the result
test_sentence = one_hot_encode_sequence(['a', 'b'], vocab_size)
print(f"Our one-hot encoding of 'a b' has shape {test_sentence.shape}.")

Our one-hot encoding of 'a b' has shape (2, 6, 1).


In [7]:
# Display the full one-hot array produced for the ['a', 'b'] example
test_sentence

array([[[0.],
        [0.],
        [1.],
        [0.],
        [0.],
        [0.]],

       [[1.],
        [0.],
        [0.],
        [0.],
        [0.],
        [0.]]])

In [28]:
import torch
import torch.nn as nn
import torch.nn.functional as F


class encoder(nn.Module):
    """
    Embedding layer followed by a GRU.

    Args:
     `input_dim`: number of rows in the embedding table
     `hidden_dim`: GRU hidden size
     `embed_dim`: width of each embedding vector
     `num_layers`: number of stacked GRU layers
    """

    def __init__(self,input_dim,hidden_dim,embed_dim,num_layers):
        super().__init__()
        self.input_dim, self.hidden_dim = input_dim, hidden_dim
        self.embed_dim, self.num_layers = embed_dim, num_layers

        # Lookup table turning integer indices into dense vectors
        self.embed = nn.Embedding(self.input_dim, self.embed_dim)
        # Recurrent layer that consumes the embedded vectors
        self.gru = nn.GRU(input_size=self.embed_dim,
                          hidden_size=self.hidden_dim,
                          num_layers=self.num_layers)

    def forward(self, x):
        """
        Embeds the 3-D index tensor `x` and runs the result through the GRU.

        The first two dimensions of `x` are merged into one, and the third
        dimension is kept as the middle axis handed to the GRU.

        Returns the tuple (output, hidden) produced by the GRU.
        """
        merged = x.shape[0] * x.shape[1]
        vectors = self.embed(x).view(merged, x.shape[2], self.embed_dim)
        return self.gru(vectors)

class decoder(nn.Module):
    """
    Embedding -> ReLU -> GRU -> linear layer -> log-softmax.

    Args:
     `output_dim`: size of the embedding table and of the output layer
     `hidden_dim`: GRU hidden size
     `embed_dim`: width of each embedding vector
     `num_layers`: number of stacked GRU layers
    """

    def __init__(self,output_dim,hidden_dim,embed_dim,num_layers):
        super().__init__()
        self.output_dim, self.hidden_dim = output_dim, hidden_dim
        self.embed_dim, self.num_layers = embed_dim, num_layers

        # Lookup table turning integer indices into dense vectors
        self.embed = nn.Embedding(self.output_dim, self.embed_dim)
        # Recurrent layer
        self.gru = nn.GRU(input_size=self.embed_dim,
                          hidden_size=self.hidden_dim,
                          num_layers=self.num_layers)
        # Projects a GRU output vector onto `output_dim` scores
        self.l_out = nn.Linear(self.hidden_dim, self.output_dim)
        # Normalises the scores to log-probabilities along dim 1
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, x,hidden):
        """
        Runs the index tensor `x` through the decoder, starting the GRU from
        `hidden`.

        Only the first entry along dim 0 of the GRU output is projected and
        normalised.

        Returns (log-softmax scores, updated hidden state).
        """
        activated = F.relu(self.embed(x))
        output, hidden = self.gru(activated, hidden)
        log_probs = self.softmax(self.l_out(output[0]))
        return log_probs, hidden
    
    
class seq2seq(nn.Module):
    """
    Wraps an encoder and a decoder into a single module.

    Args:
     `encoder`: module called as encoder(x); must return (output, hidden)
     `decoder`: module called as decoder(step_input, hidden); must return
                (step_output, hidden)
    """

    def __init__(self,encoder,decoder):
        super(seq2seq, self).__init__()
        self.enc=encoder
        self.dec=decoder

    def forward(self, x,input_seq):
        """
        Encodes `x`, then feeds `input_seq` to the decoder one step at a time
        (teacher forcing), starting from the encoder's final hidden state.

        Args:
         `x`: source tensor, passed to the encoder unchanged
         `input_seq`: tensor of decoder input indices whose first dimension
                      is the time step; each step is reshaped to (1, -1)
                      before it is handed to the decoder

        Returns the per-step decoder outputs stacked along a new leading
        (time) dimension.

        Raises ValueError if `input_seq` has no time steps.
        """
        if len(input_seq) == 0:
            raise ValueError('input_seq must contain at least one time step')

        # Only the final hidden state of the encoder conditions the decoder
        _, hidden = self.enc(x)

        step_outputs = []
        for step_input in input_seq:
            # reshape(1, -1) adds the leading step axis and also lifts a
            # 0-d token (1-D `input_seq`) to a batch of size one
            step_output, hidden = self.dec(step_input.reshape(1, -1), hidden)
            step_outputs.append(step_output)

        return torch.stack(step_outputs)
    

    
    
# Size both networks from the vocabulary instead of a hard-coded 6:
# (vocabulary size, hidden size, embedding size, number of GRU layers)
net1 = encoder(vocab_size, 512, 256, 1)
net2 = decoder(vocab_size, 512, 256, 1)
# Print the encoder just built; `net` is only defined in a later cell, so
# printing it here fails on a fresh kernel
print(net1)
# TODO(review): consider giving this variable a descriptive name
shit2shit = seq2seq(net1, net2)

encoder(
  (embed): Embedding(6, 256)
  (gru): GRU(256, 512)
)


In [29]:
import torch.optim as optim
# Hyper-parameters
num_epochs = 200

# Initialize a new network, sized from the vocabulary
net = encoder(vocab_size, 512, 256, 1)


# Define a loss function and optimizer for this problem
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.2, momentum=0.9)

# Track loss
training_loss, validation_loss = [], []

# TODO(review): this loop only runs the encoder forward pass on the
# validation set; no loss is computed and the optimizer is never stepped.

# For each epoch
for i in range(num_epochs):

    # Track loss
    epoch_training_loss = 0
    epoch_validation_loss = 0

    net.eval()

    # No gradients are needed while evaluating
    with torch.no_grad():
        # For each sentence in validation set
        for inputs, targets in validation_set:

            # nn.Embedding expects integer token indices, not one-hot
            # vectors. encoder.forward merges the first two dimensions of
            # its input and keeps the third as the GRU batch axis, so
            # (num words, 1, 1) is one sequence with a batch size of one.
            inputs_idx = torch.LongTensor([word_to_idx[word] for word in inputs]).view(-1, 1, 1)

            # Convert target to tensor
            targets_idx = torch.LongTensor([word_to_idx[word] for word in targets])

            # Forward pass; call the module itself rather than .forward()
            # so that registered hooks run. `outputs` is (output, hidden).
            outputs = net(inputs_idx)
        

In [None]:
import torch.optim as optim
# Hyper-parameters
num_epochs = 200

# Initialize a new network; seq2seq requires its encoder and decoder
net = seq2seq(encoder(vocab_size, 512, 256, 1), decoder(vocab_size, 512, 256, 1))

# The decoder already ends in LogSoftmax, so the matching criterion is the
# negative log-likelihood loss
criterion = nn.NLLLoss()
optimizer = optim.SGD(net.parameters(), lr=0.2, momentum=0.9)


def words_to_indices(words):
    """Maps a list of words to a 1-D LongTensor of vocabulary indices."""
    return torch.LongTensor([word_to_idx[word] for word in words])


def decode_log_probs(model, inputs_idx):
    """
    Encodes `inputs_idx`, then decodes it one token at a time (teacher
    forcing), drawing the encoder and decoder explicitly from `model`.

    Args:
     `model`: object exposing the sub-networks as `.enc` and `.dec`
     `inputs_idx`: 1-D LongTensor of token indices

    Returns a tensor with one row of decoder log-softmax scores per input
    token.
    """
    # encoder.forward merges the first two dimensions of its input and keeps
    # the third as the GRU batch axis: (num words, 1, 1) is one sequence
    # with a batch size of one
    _, hidden = model.enc(inputs_idx.view(-1, 1, 1))

    step_log_probs = []
    for token in inputs_idx:
        # The decoder consumes a single step shaped (1, batch)
        log_probs, hidden = model.dec(token.view(1, 1), hidden)
        step_log_probs.append(log_probs)

    return torch.cat(step_log_probs, dim=0)


# Track loss
training_loss, validation_loss = [], []

# For each epoch
for i in range(num_epochs):

    # Track loss
    epoch_training_loss = 0
    epoch_validation_loss = 0

    net.eval()

    # For each sentence in validation set (no gradients needed)
    with torch.no_grad():
        for inputs, targets in validation_set:
            outputs = decode_log_probs(net, words_to_indices(inputs))
            loss = criterion(outputs, words_to_indices(targets))
            epoch_validation_loss += loss.item()

    net.train()

    # For each sentence in training set
    for inputs, targets in training_set:
        # Forward pass and loss
        outputs = decode_log_probs(net, words_to_indices(inputs))
        loss = criterion(outputs, words_to_indices(targets))

        # Backward pass: zero grad, backward, step
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Update loss
        epoch_training_loss += loss.item()

    # Save loss for plot
    training_loss.append(epoch_training_loss/len(training_set))
    validation_loss.append(epoch_validation_loss/len(validation_set))

    # Print loss every 5 epochs
    if i % 5 == 0:
        print(f'Epoch {i}, training loss: {training_loss[-1]}, validation loss: {validation_loss[-1]}')


# Get one sentence from the test set (index 1, i.e. the second sample)
inputs, targets = test_set[1]

# Forward pass in evaluation mode, converted to numpy for the argmax below
net.eval()
with torch.no_grad():
    outputs = decode_log_probs(net, words_to_indices(inputs)).numpy()

print('\nInput sequence:')
print(inputs)

print('\nTarget sequence:')
print(targets)

print('\nPredicted sequence:')
print([idx_to_word[np.argmax(output)] for output in outputs])

# Plot training and validation loss
epoch = np.arange(len(training_loss))
plt.figure()
plt.plot(epoch, training_loss, 'r', label='Training loss',)
plt.plot(epoch, validation_loss, 'b', label='Validation loss')
plt.legend()
plt.xlabel('Epoch')
plt.ylabel('NLL')
plt.show()

In [None]:
# Index of the highest-scoring vocabulary entry at every step of `outputs`
[np.argmax(step_scores) for step_scores in outputs]