[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/ZiyangS/AI_5010_4010_Deep_Learning/blob/main/RNN.ipynb)

# Recurrent Neural Network (RNN)
Recurrent Neural Networks (RNNs) are designed to capture the sequential structure in data. Unlike feed-forward networks—which treat each input as independent—RNNs model dependencies across time. This is important for tasks like language modeling, where predicting the next word depends on the words that came before it.

## Vanilla RNN

The input $x$ will be a sequence of words, and each $x_t$ is a single word. Because matrix multiplication requires vector inputs, we cannot directly use a word index (such as 36) as the input. Instead, we represent each word as a one-hot vector whose length equals the vocabulary size. For example, a word with index 36 would be represented as a vector where the 36-th position is 1 and all other positions are 0. The toy examples below apply the same idea at the character level: the "vocabulary" is the set of distinct characters, and each $x_t$ is a single one-hot encoded character.


In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

torch.manual_seed(777)

<torch._C.Generator at 0x1075794d0>

In [2]:
# One-hot encodings for the four distinct characters in 'hello'.
# Row i of the 4x4 identity matrix has its single 1 at position i,
# so the rows are exactly the one-hot vectors for 'h', 'e', 'l' and 'o'.
h, e, l, o = np.eye(4, dtype=int).tolist()

In [3]:
# 'hello' has five characters, so the sequence length is 5.
seq_len = 5

# With batch_first=True, nn.RNN expects input of shape (batch, seq_len, feature_size).
# PyTorch needs the batch dimension even for a single sequence, so the (5, 4) stack
# of one-hot rows gets a leading axis of size 1, giving shape (1, 5, 4).
inputs = torch.tensor([h, e, l, l, o], dtype=torch.float32).unsqueeze(0)
print(inputs.shape)

torch.Size([1, 5, 4])


In [4]:
# A single-layer nn.RNN mapping input_dim (4) -> hidden_dim (2).
# Note: nn.RNN consumes a whole sequence; nn.RNNCell is the one-step-at-a-time variant.
rnn_cell = nn.RNN(input_size=4, hidden_size=2, batch_first=True)

# Initial hidden state. Shape: (num_layers, batch, hidden_size).
# batch_first=True only changes the layout of the input/output tensors;
# the hidden state always keeps the layer dimension first.
hidden = torch.zeros(1, 1, 2)

# Run the RNN: 'out' contains the hidden state at each time step,
# 'hidden' is the final hidden state of the sequence.
out, hidden = rnn_cell(inputs, hidden)
print('sequence input size', inputs.size())
print('out size', out.size())
print('final hidden state size', hidden.size())

# 'out' contains the hidden state at every time step, while 'hidden' stores only the last one.
# For a single-layer, unidirectional RNN the final step of 'out' must match 'hidden';
# assert it so the claim is checked on every run, not just eyeballed.
assert torch.allclose(out[:, -1, :], hidden[0])
print('\ncomparing rnn cell output:')
print(out[:, -1, :])
print(hidden[0])

sequence input size torch.Size([1, 5, 4])
out size torch.Size([1, 5, 2])
final hidden state size torch.Size([1, 1, 2])

comparing rnn cell output:
tensor([[-0.7762,  0.8319]], grad_fn=<SelectBackward0>)
tensor([[-0.7762,  0.8319]], grad_fn=<SelectBackward0>)


In [5]:
# Lookup table: character index -> character
idx2char = ['h', 'i', 'e', 'l', 'o']

# Task: teach the network "hihell" -> "ihello" (predict the next character).
x_data = [[0, 1, 0, 2, 3, 3]]    # h i h e l l
# Row k of the identity matrix is the one-hot vector for index k, so indexing the
# identity with the (1, 6) index array yields the (1, 6, 5) one-hot input.
x_one_hot = np.eye(len(idx2char), dtype=int)[np.array(x_data)]
y_data = np.array([1, 0, 2, 3, 3, 4])  # i h e l l o

# There is only one batch, so the tensors are built once, up front:
# float inputs for the RNN, integer class labels for the loss.
inputs = torch.tensor(x_one_hot, dtype=torch.float32)
labels = torch.tensor(y_data, dtype=torch.long)

# hyperparameters
seq_len = len(x_data[0])      # |hihell| == 6, i.e. the number of time steps
input_size = len(idx2char)    # one-hot size (5)
batch_size = len(x_data)      # one sentence per batch
num_layers = 1                # one-layer rnn
num_classes = len(idx2char)   # predicting one of 5 distinct characters
hidden_size = 4               # size of the RNN hidden state (its per-step output)

In [6]:
class RNN(nn.Module):
    """
    The RNN model will be a RNN followed by a linear layer,
    i.e. a fully-connected layer that is applied to the hidden state
    of every time step to produce per-step class scores.

    Args:
        seq_len: number of time steps in each input sequence.
        num_classes: number of output classes predicted at each time step.
        input_size: feature size of each time step (the one-hot length here).
        hidden_size: size of the RNN hidden state.
        num_layers: number of stacked RNN layers.
    """
    def __init__(self, seq_len, num_classes, input_size, hidden_size, num_layers):
        super().__init__()
        self.seq_len = seq_len
        self.num_layers = num_layers
        self.input_size = input_size
        self.num_classes = num_classes
        self.hidden_size = hidden_size
        # Pass num_layers through so the module really has as many layers as
        # the hidden state created in _init_hidden assumes.
        self.rnn = nn.RNN(input_size, hidden_size, num_layers=num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """
        Compute per-time-step class scores.

        Args:
            x: tensor reshapeable to [batch_size, seq_len, input_size] (batch first).

        Returns:
            Tensor of shape [batch_size * seq_len, num_classes], i.e. one row of
            scores per time step, ready for nn.CrossEntropyLoss.
        """
        # assuming batch_first = True for RNN cells
        batch_size = x.size(0)
        hidden = self._init_hidden(batch_size)
        x = x.view(batch_size, self.seq_len, self.input_size)

        # 'rnn_out' contains the hidden state for every time step. Shape: [batch_size, seq_len, hidden_size].
        rnn_out, _ = self.rnn(x, hidden)
        # Flatten batch and time so the linear layer scores each time step individually.
        # Use self.hidden_size (not a notebook global) and reshape (not view), because
        # the batch-first output is not guaranteed to be contiguous for batch_size > 1.
        linear_out = self.linear(rnn_out.reshape(-1, self.hidden_size))
        return linear_out

    def _init_hidden(self, batch_size):
        """
        Initialize the hidden state with zeros.

        The hidden state is always laid out as [num_layers, batch_size, hidden_size];
        batch_first=True only affects the input and output tensors of nn.RNN.
        The tensor is created on the same device/dtype as the model parameters.
        """
        weight = self.linear.weight
        return torch.zeros(self.num_layers, batch_size, self.hidden_size,
                           device=weight.device, dtype=weight.dtype)


In [7]:
# Build the RNN model from the hyperparameters defined above and show its layers.
rnn = RNN(
    seq_len=seq_len,
    num_classes=num_classes,
    input_size=input_size,
    hidden_size=hidden_size,
    num_layers=num_layers,
)
print('network architecture:\n', rnn)

network architecture:
 RNN(
  (rnn): RNN(5, 4, batch_first=True)
  (linear): Linear(in_features=4, out_features=5, bias=True)
)


In [8]:
# Training setup: cross-entropy over the character classes, optimized with Adam.
num_epochs = 15
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(rnn.parameters(), lr=0.1)

for epoch in range(1, num_epochs + 1):
    # Forward pass on the single training sequence.
    optimizer.zero_grad()
    outputs = rnn(inputs)
    loss = criterion(outputs, labels)

    # Backward pass and parameter update.
    loss.backward()
    optimizer.step()

    # Decode the current prediction: for every time step, the index of the
    # highest score along the class dimension is the predicted character.
    idx = outputs.detach().max(dim=1).indices.numpy()
    result_str = [idx2char[i] for i in idx]
    print('epoch: {}, loss: {:1.3f}'.format(epoch, loss.item()))
    print('Predicted string: ', ''.join(result_str))

epoch: 1, loss: 1.658
Predicted string:  llllol
epoch: 2, loss: 1.500
Predicted string:  llllll
epoch: 3, loss: 1.387
Predicted string:  llllll
epoch: 4, loss: 1.260
Predicted string:  ililll
epoch: 5, loss: 1.152
Predicted string:  ililll
epoch: 6, loss: 1.047
Predicted string:  ililll
epoch: 7, loss: 0.926
Predicted string:  ililll
epoch: 8, loss: 0.808
Predicted string:  ililll
epoch: 9, loss: 0.698
Predicted string:  ehello
epoch: 10, loss: 0.601
Predicted string:  ehello
epoch: 11, loss: 0.522
Predicted string:  ehello
epoch: 12, loss: 0.455
Predicted string:  ehello
epoch: 13, loss: 0.397
Predicted string:  ehello
epoch: 14, loss: 0.348
Predicted string:  ehello
epoch: 15, loss: 0.312
Predicted string:  ihello


## LSTM

The following example uses an LSTM to generate part-of-speech (POS) tags. Its structure is similar to the RNN from the previous section, but here we add an embedding layer before the LSTM. Instead of representing each word with a one-hot vector—which is high-dimensional and ignores relationships between words—the embedding layer maps each word index to a dense vector that captures semantic similarity within the corpus.

In [9]:
# Real models typically use 32- or 64-dimensional embeddings and hidden states;
# they are kept tiny here because this is a toy example.
EMBEDDING_SIZE = 6
HIDDEN_SIZE = 6

# Each training example pairs a tokenized sentence with its per-word POS tags.
training_data = [
    ("The dog ate the apple".split(), ["DET", "NN", "V", "DET", "NN"]),
    ("Everybody read that book".split(), ["NN", "V", "DET", "NN"])
]

# Tag vocabulary: the reverse mapping is derived from the list so the two
# cannot drift apart.
idx_to_tag = ['DET', 'NN', 'V']
tag_to_idx = {tag: position for position, tag in enumerate(idx_to_tag)}

# Word vocabulary: every distinct word gets the next free index,
# in order of first appearance (case-sensitive, so 'The' != 'the').
word_to_idx = {}
for sent, tags in training_data:
    for word in sent:
        word_to_idx.setdefault(word, len(word_to_idx))

print(word_to_idx)

{'The': 0, 'dog': 1, 'ate': 2, 'the': 3, 'apple': 4, 'Everybody': 5, 'read': 6, 'that': 7, 'book': 8}


In [10]:
def prepare_sequence(seq, to_idx):
    """Look up every item of ``seq`` in ``to_idx`` and return the indices as a 1-D LongTensor.

    Raises KeyError if an item is missing from ``to_idx``.
    """
    return torch.tensor([to_idx[item] for item in seq], dtype=torch.long)

# Try the helper on the first training sentence (only the words; its tags are unused here).
seq, _first_tags = training_data[0]
inputs = prepare_sequence(seq=seq, to_idx=word_to_idx)

print("Original sequence: ", seq)
print("Indexed tensor:    ", inputs.tolist())

Original sequence:  ['The', 'dog', 'ate', 'the', 'apple']
Indexed tensor:     [0, 1, 2, 3, 4]


In [11]:
class LSTMTagger(nn.Module):
    """
    Part-of-speech tagger: embedding -> LSTM -> linear layer.

    Processes one sentence at a time (batch size 1) and produces a row of
    tag scores for every word.

    Args:
        embedding_size: dimension of the word embeddings.
        hidden_size: dimension of the LSTM hidden and cell states.
        vocab_size: number of distinct words (rows in the embedding table).
        tagset_size: number of distinct tags (output classes).
    """
    def __init__(self, embedding_size, hidden_size, vocab_size, tagset_size):
        super().__init__()
        self.embedding_size = embedding_size
        self.hidden_size = hidden_size
        self.vocab_size = vocab_size
        self.tagset_size = tagset_size

        self.embedding = nn.Embedding(vocab_size, embedding_size)
        self.lstm = nn.LSTM(embedding_size, hidden_size)
        self.hidden2tag = nn.Linear(hidden_size, tagset_size)

    def forward(self, x):
        """
        Args:
            x: 1-D LongTensor of word indices, shape (seq_len,).

        Returns:
            Tensor of shape (seq_len, tagset_size) with unnormalized tag scores.
        """
        embed = self.embedding(x)
        hidden = self._init_hidden()

        # Reshape to (seq_len, batch_size=1, embedding_size) since this example uses batch size 1.
        lstm_out, lstm_hidden = self.lstm(embed.view(len(x), 1, -1), hidden)
        # Apply the linear layer to each time step to obtain tag scores.
        output = self.hidden2tag(lstm_out.view(len(x), -1))

        return output

    def _init_hidden(self):
        """
        Return the initial (hidden state, cell state) pair, both zeros.

        Zeros (rather than random values) make the forward pass deterministic:
        the same sentence always yields the same scores. The dimension
        semantics are [num_layers, batch_size, hidden_size], and the tensors
        live on the same device/dtype as the model parameters.
        """
        weight = self.embedding.weight
        return (weight.new_zeros(1, 1, self.hidden_size),
                weight.new_zeros(1, 1, self.hidden_size))


In [12]:
# Build the tagger; vocabulary and tag-set sizes come straight from the lookup tables.
lstm = LSTMTagger(
    embedding_size=EMBEDDING_SIZE,
    hidden_size=HIDDEN_SIZE,
    vocab_size=len(word_to_idx),
    tagset_size=len(tag_to_idx),
)
print('network architecture:\n', lstm)

network architecture:
 LSTMTagger(
  (embedding): Embedding(9, 6)
  (lstm): LSTM(6, 6)
  (hidden2tag): Linear(in_features=6, out_features=3, bias=True)
)


In [13]:
# Training setup: cross-entropy over the tag classes, optimized with plain SGD.
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(lstm.parameters(), lr=0.1)

# The index tensors never change between epochs, so build them once here
# instead of re-converting every sentence on each of the 300 epochs.
encoded_training_data = [
    (prepare_sequence(words, word_to_idx), prepare_sequence(tag_names, tag_to_idx))
    for words, tag_names in training_data
]

epochs = 300
for epoch in range(epochs):
    for sentence, target in encoded_training_data:
        # Gradients accumulate in PyTorch, so clear them before each step.
        lstm.zero_grad()

        # Forward pass: one row of tag scores per word, then the loss.
        output = lstm(sentence)
        loss = criterion(output, target)

        # Backward pass and parameter update.
        loss.backward()
        optimizer.step()

In [14]:
# Sanity check on a training sentence: "The dog ate the apple".
# The correct tags are DET NN V DET NN (determiner, noun, verb, determiner, noun).
inputs = prepare_sequence(training_data[0][0], word_to_idx)

# This is inference only, so skip building the autograd graph.
with torch.no_grad():
    tag_scores = lstm(inputs)

print('expected target: ', training_data[0][1])

# For every word, pick the tag with the highest score.
tag_scores = tag_scores.numpy()
tag = [idx_to_tag[idx] for idx in np.argmax(tag_scores, axis = 1)]
print('generated target: ', tag)

expected target:  ['DET', 'NN', 'V', 'DET', 'NN']
generated target:  ['DET', 'NN', 'V', 'DET', 'NN']
