# Class 4: Neural Networks and Deep Learning

**Topics**

* RNNs
* LSTMs

**Libraries**: Pytorch


**Reading**

* [Goodfellow, Bengio, & Courville: Introduction](https://www.deeplearningbook.org/contents/intro.html)
* [Jurafsky & Martin Chapter 6: Neural Networks](https://web.stanford.edu/~jurafsky/slp3/6.pdf)
* [Jurafsky & Martin Chapter 13: RNNs and LSTMs](https://web.stanford.edu/~jurafsky/slp3/13.pdf)



## 1. RNNs

Adapted from [this](https://docs.pytorch.org/tutorials/intermediate/char_rnn_classification_tutorial.html) Pytorch tutorial


### Classifying Names with a Character-Level RNN


We will be building and training a basic character-level Recurrent
Neural Network (RNN) to classify words.

A character-level RNN reads words as a series of characters -outputting
a prediction and \"hidden state\" at each step, feeding its previous
hidden state into each next step. We take the final prediction to be the
output, i.e. which class the word belongs to.

Specifically, we\'ll train on a few thousand surnames from 18 languages
of origin, and predict which language a name is from based on the
spelling.


#### Preparing Torch


Set up torch to default to the right device use GPU acceleration
depending on your hardware (CPU or CUDA).


In [None]:
import torch

# Check if CUDA is available
device = torch.device('cpu')
if torch.cuda.is_available():
    device = torch.device('cuda')

torch.set_default_device(device)
print(f"Using device = {torch.get_default_device()}")

#### Preparing the Data

Download the data from
[here](https://download.pytorch.org/tutorial/data.zip) and extract it to
the current directory.

Included in the `data/names` directory are 18 text files named as
`[Language].txt`. Each file contains a bunch of names, one name per
line, mostly romanized (but we still need to convert from Unicode to
ASCII).

The first step is to define and clean our data. Initially, we need to
convert Unicode to plain ASCII to limit the RNN input layers. This is
accomplished by converting Unicode strings to ASCII and allowing only a
small set of allowed characters.


In [None]:
!unzip data.zip

In [None]:
import string
import unicodedata

# We can use "_" to represent an out-of-vocabulary character, that is, any character we are not handling in our model
allowed_characters = string.ascii_letters + " .,;'" + "_"
n_letters = len(allowed_characters)

# Turn a Unicode string to plain ASCII, thanks to https://stackoverflow.com/a/518232/2809427
def unicodeToAscii(s):
    return ''.join(
        c for c in unicodedata.normalize('NFD', s)
        if unicodedata.category(c) != 'Mn'
        and c in allowed_characters
    )

Here\'s an example of converting a unicode alphabet name to plain ASCII.
This simplifies the input layer


In [None]:
print (f"converting 'Ślusàrski' to {unicodeToAscii('Ślusàrski')}")

#### Turning Names into Tensors


Now that we have all the names organized, we need to turn them into
Tensors to make use of them.

To represent a single letter, we use a \"one-hot vector\" of size
`<1 x n_letters>`. A one-hot vector is filled with 0s except for a 1 at
index of the current letter, e.g. `"b" = <0 1 0 0 0 ...>`.

To make a word we join a bunch of those into a 2D matrix
`<line_length x 1 x n_letters>`.

That extra 1 dimension is because PyTorch assumes everything is in
batches - we\'re just using a batch size of 1 here.


In [None]:
# Find letter index from all_letters, e.g. "a" = 0
def letterToIndex(letter):
    # return our out-of-vocabulary character if we encounter a letter unknown to our model
    if letter not in allowed_characters:
        return allowed_characters.find("_")
    else:
        return allowed_characters.find(letter)

# Turn a line into a <line_length x 1 x n_letters>,
# or an array of one-hot letter vectors
def lineToTensor(line):
    tensor = torch.zeros(len(line), 1, n_letters)
    for li, letter in enumerate(line):
        tensor[li][0][letterToIndex(letter)] = 1
    return tensor

Here are some examples of how to use `lineToTensor()` for a single and
multiple character string.


In [None]:
print (f"The letter 'a' becomes {lineToTensor('a')}") #notice that the first position in the tensor = 1
print (f"The name 'Ahn' becomes {lineToTensor('Ahn')}") #notice 'A' sets the 27th index to 1

Next, we need to combine all our examples into a dataset so we can
train, test and validate our models. For this, we will use the [Dataset
and
DataLoader](https://pytorch.org/tutorials/beginner/basics/data_tutorial.html)
classes to hold our dataset. Each Dataset needs to implement three
functions: `__init__`, `__len__`, and `__getitem__`.


In [None]:
from io import open
import glob
import os
import time

import torch
from torch.utils.data import Dataset

class NamesDataset(Dataset):

    def __init__(self, data_dir):
        self.data_dir = data_dir #for provenance of the dataset
        self.load_time = time.localtime #for provenance of the dataset
        labels_set = set() #set of all classes

        self.data = []
        self.data_tensors = []
        self.labels = []
        self.labels_tensors = []

        #read all the ``.txt`` files in the specified directory
        text_files = glob.glob(os.path.join(data_dir, '*.txt'))
        for filename in text_files:
            label = os.path.splitext(os.path.basename(filename))[0]
            labels_set.add(label)
            lines = open(filename, encoding='utf-8').read().strip().split('\n')
            for name in lines:
                self.data.append(name)
                self.data_tensors.append(lineToTensor(name))
                self.labels.append(label)

        #Cache the tensor representation of the labels
        self.labels_uniq = list(labels_set)
        for idx in range(len(self.labels)):
            temp_tensor = torch.tensor([self.labels_uniq.index(self.labels[idx])], dtype=torch.long)
            self.labels_tensors.append(temp_tensor)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        data_item = self.data[idx]
        data_label = self.labels[idx]
        data_tensor = self.data_tensors[idx]
        label_tensor = self.labels_tensors[idx]

        return label_tensor, data_tensor, data_label, data_item

Here we can load our example data into the `NamesDataset`


In [None]:
alldata = NamesDataset("data/names")
print(f"loaded {len(alldata)} items of data")
print(f"example = {alldata[0]}")

Using the dataset object allows us to easily split the data into train and test sets. Here we create a 80/20 split but the `torch.utils.data` has more useful utilities. Here we specify a generator since we need to use the same device as PyTorch defaults to above.


In [None]:
train_set, test_set = torch.utils.data.random_split(alldata, [.85, .15], generator=torch.Generator(device=device).manual_seed(2024))

print(f"train examples = {len(train_set)}, validation examples = {len(test_set)}")

Now we have a basic dataset containing **20074** examples where each
example is a pairing of label and name. We have also split the dataset
into training and testing so we can validate the model that we build.


#### Creating the Network


##### Autograd

The layers containing hidden states and the gradients used for uodates  handled by the "autograd" graph. Autograd in PyTorch is the automatic differentiation engine that powers neural network training. It enables the computation of gradients for any computational graph, which is crucial for optimizing model parameters using algorithms like backpropagation.

##### The CharRNN class

Our CharRNN class implements an RNN with three components. First, we
use the [nn.RNN
implementation](https://pytorch.org/docs/stable/generated/torch.nn.RNN.html).
Next, we define a layer that maps the RNN hidden layers to our output.
And finally, we apply a `softmax` function. Using `nn.RNN` leads to a
significant improvement in performance, such as cuDNN-accelerated
kernels, versus implementing each layer as a `nn.Linear`. It also
simplifies the implementation in `forward()`.


In [None]:
import torch.nn as nn
import torch.nn.functional as F

class CharRNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(CharRNN, self).__init__()

        self.rnn = nn.RNN(input_size, hidden_size)
        self.h2o = nn.Linear(hidden_size, output_size)
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, line_tensor):
        rnn_out, hidden = self.rnn(line_tensor)
        output = self.h2o(hidden[0])
        output = self.softmax(output)

        return output

We can then create an RNN with 58 input nodes, 128 hidden nodes, and 18
outputs:


In [None]:
n_hidden = 128
rnn = CharRNN(n_letters, n_hidden, len(alldata.labels_uniq))
print(rnn)

After that we can pass our Tensor to the RNN to obtain a predicted
output. Subsequently, we use a helper function, `label_from_output`, to
derive a text label for the class.


In [None]:
def label_from_output(output, output_labels):
    top_n, top_i = output.topk(1)
    label_i = top_i[0].item()
    return output_labels[label_i], label_i

input = lineToTensor('Albert')
output = rnn(input) #this is equivalent to ``output = rnn.forward(input)``
print(output)
print(label_from_output(output, alldata.labels_uniq))

#### Training the Network


Now all it takes to train this network is show it a bunch of examples,
have it make guesses, and tell it if it\'s wrong.

We do this by defining a `train()` function which trains the model on a
given dataset using minibatches. RNNs RNNs are trained similarly to
other networks; therefore, for completeness, we include a batched
training method here. The loop (`for i in batch`) computes the losses
for each of the items in the batch before adjusting the weights. This
operation is repeated until the number of epochs is reached.


In [None]:
import random
import numpy as np

def train(rnn, training_data, n_epoch = 10, n_batch_size = 64, report_every = 50, learning_rate = 0.2, criterion = nn.NLLLoss()):
    """
    Learn on a batch of training_data for a specified number of iterations and reporting thresholds
    """
    # Keep track of losses for plotting
    current_loss = 0
    all_losses = []
    rnn.train()
    optimizer = torch.optim.SGD(rnn.parameters(), lr=learning_rate)

    start = time.time()
    print(f"training on data set with n = {len(training_data)}")

    for iter in range(1, n_epoch + 1):
        rnn.zero_grad() # clear the gradients

        # create some minibatches
        # we cannot use dataloaders because each of our names is a different length
        batches = list(range(len(training_data)))
        random.shuffle(batches)
        batches = np.array_split(batches, len(batches) //n_batch_size )

        for idx, batch in enumerate(batches):
            batch_loss = 0
            for i in batch: #for each example in this batch
                (label_tensor, text_tensor, label, text) = training_data[i]
                output = rnn.forward(text_tensor)
                loss = criterion(output, label_tensor)
                batch_loss += loss

            # optimize parameters
            batch_loss.backward()
            nn.utils.clip_grad_norm_(rnn.parameters(), 3)
            optimizer.step()
            optimizer.zero_grad()

            current_loss += batch_loss.item() / len(batch)

        all_losses.append(current_loss / len(batches) )
        if iter % report_every == 0:
            print(f"{iter} ({iter / n_epoch:.0%}): \t average batch loss = {all_losses[-1]}")
        current_loss = 0

    return all_losses

We can now train a dataset with minibatches for a specified number of
epochs. The number of epochs for this example is reduced to speed up the
build. You can get better results with different parameters.


In [None]:
start = time.time()
all_losses = train(rnn, train_set, n_epoch=27, learning_rate=0.15, report_every=5)
end = time.time()
print(f"training took {end-start}s")

#### Plotting the Results


Plotting the historical loss from `all_losses` shows the network
learning:


In [None]:
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker

plt.figure()
plt.plot(all_losses)
plt.show()

#### Evaluating the Results


To see how well the network performs on different categories, we will
create a confusion matrix, indicating for every actual language (rows)
which language the network guesses (columns). To calculate the confusion
matrix a bunch of samples are run through the network with `evaluate()`,
which is the same as `train()` minus the backprop.


In [None]:
def evaluate(rnn, testing_data, classes):
    confusion = torch.zeros(len(classes), len(classes))

    rnn.eval() #set to eval mode
    with torch.no_grad(): # do not record the gradients during eval phase
        for i in range(len(testing_data)):
            (label_tensor, text_tensor, label, text) = testing_data[i]
            output = rnn(text_tensor)
            guess, guess_i = label_from_output(output, classes)
            label_i = classes.index(label)
            confusion[label_i][guess_i] += 1

    # Normalize by dividing every row by its sum
    for i in range(len(classes)):
        denom = confusion[i].sum()
        if denom > 0:
            confusion[i] = confusion[i] / denom

    # Set up plot
    fig = plt.figure()
    ax = fig.add_subplot(111)
    cax = ax.matshow(confusion.cpu().numpy()) #numpy uses cpu here so we need to use a cpu version
    fig.colorbar(cax)

    # Set up axes
    ax.set_xticks(np.arange(len(classes)), labels=classes, rotation=90)
    ax.set_yticks(np.arange(len(classes)), labels=classes)

    # Force label at every tick
    ax.xaxis.set_major_locator(ticker.MultipleLocator(1))
    ax.yaxis.set_major_locator(ticker.MultipleLocator(1))

    # sphinx_gallery_thumbnail_number = 2
    plt.show()



evaluate(rnn, test_set, classes=alldata.labels_uniq)

You can pick out bright spots off the main axis that show which
languages it guesses incorrectly, e.g. Chinese for Korean, and Spanish
for Italian. It seems to do very well with Greek, and very poorly with
English (perhaps because of overlap with other languages).


## 2. LSTMs


Adapted from [this](https://docs.pytorch.org/tutorials/beginner/nlp/sequence_models_tutorial.html) Pytorch tutorial


As described in the slides and in Jurafsky & Martin Chapter 13,  and LSTM maintains a *hidden state* $h_t$ for each element in a sequence, which in principle can
contain information from arbitrary points earlier in the sequence. We
can use the hidden state to predict words in a language model,
part-of-speech tags, and a myriad of other things.

### LSTMs in Pytorch


Before getting to the example, note a few things. Pytorch\'s LSTM
expects all of its inputs to be 3D tensors. The semantics of the axes of
these tensors is important. The first axis is the sequence itself, the
second indexes instances in the mini-batch, and the third indexes
elements of the input. We haven\'t discussed mini-batching, so let\'s
just ignore that and assume we will always have just 1 dimension on the
second axis. If we want to run the sequence model over the sentence
\"The cow jumped\", our input should look like

$$\begin{aligned}
\begin{bmatrix}
\overbrace{q_\text{The}}^\text{row vector} \\
q_\text{cow} \\
q_\text{jumped}
\end{bmatrix}
\end{aligned}$$

Except remember there is an additional 2nd dimension with size 1.

In addition, you could go through the sequence one at a time, in which
case the 1st axis will have size 1 also.

Let\'s see a quick example.


In [None]:

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

torch.manual_seed(1)

In [None]:
lstm = nn.LSTM(3, 3)  # Input dim is 3, output dim is 3
inputs = [torch.randn(1, 3) for _ in range(5)]  # make a sequence of length 5

# initialize the hidden state.
hidden = (torch.randn(1, 1, 3),
          torch.randn(1, 1, 3))
for i in inputs:
    # Step through the sequence one element at a time.
    # after each step, hidden contains the hidden state.
    out, hidden = lstm(i.view(1, 1, -1), hidden)

# alternatively, we can do the entire sequence all at once.
# the first value returned by LSTM is all of the hidden states throughout
# the sequence. the second is just the most recent hidden state
# (compare the last slice of "out" with "hidden" below, they are the same)
# The reason for this is that:
# "out" will give you access to all hidden states in the sequence
# "hidden" will allow you to continue the sequence and backpropagate,
# by passing it as an argument  to the lstm at a later time
# Add the extra 2nd dimension
inputs = torch.cat(inputs).view(len(inputs), 1, -1)
hidden = (torch.randn(1, 1, 3), torch.randn(1, 1, 3))  # clean out hidden state
out, hidden = lstm(inputs, hidden)
print(out)
print(hidden)

#### An LSTM for Part-of-Speech Tagging

In this example, we make use of embeddings. If you are unfamiliar with embeddings, you can read
up about them
[here](https://pytorch.org/tutorials/beginner/nlp/word_embeddings_tutorial.html).

We'll be illustrating the power of LSTMs on a classic sequence tagging task, part-of-speech tagging. This task involves taking a sequence of words and assigning each word a part of speech such as NOUN, VERB, or DET. The lattermost tag might be unfamiliar -- DET is the tag for Determiner, i.e., articles such as *the* and *a*


The model is structured as follows: let our input sentence be $w_1, \dots, w_M$,
where $w_i \in V$, our vocab. Also, let $T$ be our tag set, and $y_i$
the tag of word $w_i$. Denote our prediction of the tag of word $w_i$ by
$\hat{y}_i$.

This is a structure prediction, model, where our output is a sequence
$\hat{y}_1, \dots, \hat{y}_M$, where $\hat{y}_i \in T$.

To do the prediction, pass an LSTM over the sentence. Denote the hidden
state at timestep $i$ as $h_i$. Also, assign each tag a unique index
(like how we had word\_to\_ix in the word embeddings section). Then our
prediction rule for $\hat{y}_i$ is

$$\hat{y}_i = \text{argmax}_j \  (\log \text{Softmax}(Ah_i + b))_j$$

That is, take the log softmax of the affine map of the hidden state, and
the predicted tag is the tag that has the maximum value in this vector.
Note this implies immediately that the dimensionality of the target
space of $A$ is $|T|$.

Prepare data:


In [None]:
def prepare_sequence(seq, to_ix):
    idxs = [to_ix[w] for w in seq]
    return torch.tensor(idxs, dtype=torch.long)


training_data = [
    # Tags are: DET - determiner; NN - noun; V - verb
    # For example, the word "The" is a determiner
    ("The dog ate the apple".split(), ["DET", "NN", "V", "DET", "NN"]),
    ("Everybody read that book".split(), ["NN", "V", "DET", "NN"])
]
word_to_ix = {}
# For each words-list (sentence) and tags-list in each tuple of training_data
for sent, tags in training_data:
    for word in sent:
        if word not in word_to_ix:  # word has not been assigned an index yet
            word_to_ix[word] = len(word_to_ix)  # Assign each word with a unique index
print(word_to_ix)
tag_to_ix = {"DET": 0, "NN": 1, "V": 2}  # Assign each tag with a unique index

# These will usually be more like 32 or 64 dimensional.
# We will keep them small, so we can see how the weights change as we train.
EMBEDDING_DIM = 6
HIDDEN_DIM = 6

Create class LSTMTAgger

In [None]:
class LSTMTagger(nn.Module):

    def __init__(self, embedding_dim, hidden_dim, vocab_size, tagset_size):
        super(LSTMTagger, self).__init__()
        self.hidden_dim = hidden_dim

        self.word_embeddings = nn.Embedding(vocab_size, embedding_dim)

        # The LSTM takes word embeddings as inputs, and outputs hidden states
        # with dimensionality hidden_dim.
        self.lstm = nn.LSTM(embedding_dim, hidden_dim)

        # The linear layer that maps from hidden state space to tag space
        self.hidden2tag = nn.Linear(hidden_dim, tagset_size)

    def forward(self, sentence):
        embeds = self.word_embeddings(sentence)
        lstm_out, _ = self.lstm(embeds.view(len(sentence), 1, -1))
        tag_space = self.hidden2tag(lstm_out.view(len(sentence), -1))
        tag_scores = F.log_softmax(tag_space, dim=1)
        return tag_scores

Train the tagger

In [None]:
model = LSTMTagger(EMBEDDING_DIM, HIDDEN_DIM, len(word_to_ix), len(tag_to_ix))
loss_function = nn.NLLLoss()
optimizer = optim.SGD(model.parameters(), lr=0.1)

# See what the scores are before training
# Note that element i,j of the output is the score for tag j for word i.
# Here we don't need to train, so the code is wrapped in torch.no_grad()
with torch.no_grad():
    inputs = prepare_sequence(training_data[0][0], word_to_ix)
    tag_scores = model(inputs)
    print(tag_scores)

for epoch in range(300):  # again, normally you would NOT do 300 epochs, it is toy data
    for sentence, tags in training_data:
        # Step 1. Remember that Pytorch accumulates gradients.
        # We need to clear them out before each instance
        model.zero_grad()

        # Step 2. Get our inputs ready for the network, that is, turn them into
        # Tensors of word indices.
        sentence_in = prepare_sequence(sentence, word_to_ix)
        targets = prepare_sequence(tags, tag_to_ix)

        # Step 3. Run our forward pass.
        tag_scores = model(sentence_in)

        # Step 4. Compute the loss, gradients, and update the parameters by
        #  calling optimizer.step()
        loss = loss_function(tag_scores, targets)
        loss.backward()
        optimizer.step()



#### Examine model predictions for "The dog ate the apple"

In [None]:
test_example = training_data[0][0]
test_example

Let's remind ourselves of the indices for each POS tag

In [None]:
tag_to_ix

Run the model over the sequence

In [None]:
scores = []
with torch.no_grad():
    inputs = prepare_sequence(training_data[0][0], word_to_ix)
    tag_scores = model(inputs)
    scores = tag_scores

In the output prediction, i,j corresponds to score for tag j for word i.

In [None]:
scores

Let's get the index of the max of each prediction and get the POS tag corresponding to the index.

In [None]:
# Get the index of the max score for each word (row)
predicted_indices = torch.argmax(scores, dim=1)

# Create a mapping from index to tag for easy lookup
ix_to_tag = {v: k for k, v in tag_to_ix.items()}

# Get the predicted tags
predicted_tags = [ix_to_tag[idx.item()] for idx in predicted_indices]

print("Predicted tags:", list(zip(test_example,predicted_tags)))

Correct!