# Learn to Write Like Shakespeare

In this assignment we will implement a simple recurrent network with one hidden layer.
We train this network on a medium-size poem "The Sonnet" written by William Shakespeare and use it for auto-completing sentences/phrases.

For training the network, we will need to transform the text into something machine-processable.
Basically, for each of the characters in the text, we provide a $D$-element one-hot encoding vector, where $D$ is the total number of unique characters in the dataset.
Character sequences of length $S$ will, hence, be turned into matrices of size $\mathbf X = \{\vec x^{\{s\}}, 1 \leq s\leq S\} \in \mathbb R^{S\times D}$.
For each input, we provide the target values $\mathbf T$ of the same size, where the target for each sample is the next character: $\vec t^{\{s\}} = \vec x ^{\{s+1\}}$.

To speed up processing, these sequences will be put into batches, i.e., $\mathcal X, \mathcal T \in \mathbb R^{B\times S\times D}$.
This will automatically be achieved using the default PyTorch `DataLoader`.

The data that we will use is originally provided here: http://raw.githubusercontent.com/brunoklein99/deep-learning-notes/master/shakespeare.txt

## Data and Targets Preprocessing

First, we need to load the whole dataset $\vec c \in \mathbb R^N$, a vector of characters, and turn the data sequence into one-hot encodings.
For this purpose, we need to know the number $D$ of unique characters in our text.
For simplicity, we only consider lower-case characters and special symbols such as punctuation marks.
Also, the newline characters `'\n'` need to be handled -- you can also leave them inside and see what happens.

Then, for each of the characters, we need to assign a one-hot encoding, and build sequences of encodings.
For a given index $n$ into our data and a given sequence length $S$, we provide the input $\mathbf X ^{[n]}$ and the target $\mathbf T^{[n]}$ as follows:


  $$\mathbf X^{[n]} = \{\mathrm{enc}(n-S+s-1) | 1 \leq s \leq S\}$$
  $$\mathbf T^{[n]} = \{\mathrm{enc}(n-S+s) | 1 \leq s \leq S\}$$

where $\mathrm{enc}$ is a function that returns the one-hot encoding for the character at the specified location in the original dataset $\vec c$.
If the computed position ($n-S+s-1$ or $n-S+s$) is negative, $\vec 0$ should be used instead of an encoding.

For example, for the original text `abcde`, sequence length $S=7$ and index $n=4$, we would have the representations for $x = $ `000abcd` and $t=$ `00abcde`.
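
The index arithmetic can be checked on plain characters before any one-hot encoding is involved. The following is a minimal sketch, not the assignment's implementation: the helper name `window` and the pad symbol `"0"` are made up for illustration, and padding positions past the end of the text is an assumption, since the formulas only cover negative positions.

```python
def window(text, n, S, pad="0"):
    """Return input and target strings for 0-based index n and sequence length S."""
    def char_at(i):
        # Negative positions are padded as in the formulas; padding positions
        # past the end of the text is an assumption of this sketch.
        return text[i] if 0 <= i < len(text) else pad

    # s runs from 1 to S, exactly as in the definitions of X^[n] and T^[n]
    x = "".join(char_at(n - S + s - 1) for s in range(1, S + 1))
    t = "".join(char_at(n - S + s) for s in range(1, S + 1))
    return x, t

x, t = window("abcde", n=4, S=7)
print(x)  # 000abcd
print(t)  # 00abcde
```

Each target is the input shifted by one position, so `x[1:] == t[:-1]` holds for any index and sequence length.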

Finally, we implement our own `Dataset` that returns the input and target encodings for any element of our input text.

### Download the data file

Please run the code block below to download the data file.

In [None]:
import os
import random
import urllib.request
import torch

# download the data file into the data/ directory (created if missing)
filename = "shakespeare.txt"
filepath = os.path.join("data", filename)
if not os.path.exists(filepath):
  url = "http://raw.githubusercontent.com/brunoklein99/deep-learning-notes/master/"
  os.makedirs("data", exist_ok=True)
  urllib.request.urlretrieve(url + filename, filepath)
  print("Downloaded datafile", filename)

### Set up the device to run everything

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

### Task 1: Data Characteristics

Implement a function that:
1. Loads all text data from the poem file `shakespeare.txt`, iterates through it, and collects all the lowercase data that we want to learn from.
2. Creates a list of the unique characters contained in our data. This will allow us to obtain the dimension $D$.

Note:

* Here, we consider only lowercase characters to reduce the alphabet size.
* Please make sure that you handle the newline character at the end of each line consistently.


In [None]:
def get_data(datafile='shakespeare.txt'):
    # 1. Read the entire file into a single string
    with open(datafile) as f:
        full_text = f.read()

    # 2. Process the text: convert to lowercase and replace newlines with asterisks
    processed_text = full_text.lower().replace('\n', '*')

    # 3. The 'data' list is simply a list of all characters in the processed text
    data = list(processed_text)

    # 4. The 'characters' list is the sorted set of unique characters
    #    This guarantees a consistent order every time.
    characters = sorted(list(set(processed_text)))

    return data, characters

In [None]:
data, characters = get_data(datafile='data/shakespeare.txt')

D = len(characters)
print (f"Collected a total of {len(data)} elements of {D} unique characters")

### Task 2: One-hot Encoding

Implement a dictionary that provides a unique one-hot encoding for each of the characters in the dataset.
In this dictionary:

1. each key is a character
2. each value is the one-hot vector representation of that character, of dimension $D$

Note:

* You can use your own one-hot encoding procedure for the task.

In [None]:
one_hot_matrix = torch.eye(D)

# row i of the identity matrix is the one-hot vector of the i-th character
one_hot = {c: one_hot_matrix[i] for i, c in enumerate(characters)}

# handling of 0 content
one_hot[0] = torch.zeros(D)

### Task 3: Sequence Coding

Write a function that provides the inputs and targets for a given sequence of the specified sequence length and index.
The last value of the target sequence should be the encoding of the character at the given index.
If a character is requested from outside the data range, prepend the inputs (and the targets) with 0.
Ensure that $\vec t^{\{s\}} = \vec x^{\{s+1\}}$ $\forall s<S$.

In [None]:
def sequence(index, S):
    # collect both input and target encodings
    inputs, targets = [], []
    # go through the sequence and turn characters into encodings

    pos = index-S

    for count in range(S):
        # position is still away from the target being the first value of the list
        if pos < 0:
            inputs.append(one_hot[0])
            targets.append(one_hot[0])

        # the first letter appears to be the first target. However the input is still 0
        elif pos == 0:
            inputs.append(one_hot[0])
            targets.append(one_hot[data[pos]])

        # the first letter is now input to the target of the second letter; normal case as long as within dataset
        elif pos < len(data):
            inputs.append(one_hot[data[pos-1]])
            targets.append(one_hot[data[pos]])

        # when reached the end of the data, there's an input but no target
        elif pos == len(data):
            inputs.append(one_hot[data[pos-1]])
            targets.append(one_hot[0])

        # over the end of the dataset, just zeros again
        else:
            inputs.append(one_hot[0])
            targets.append(one_hot[0])

        pos += 1

    return torch.stack(inputs), torch.stack(targets)

### Test 1: Sequences

Get a sequence of size 5 with index 2. This test ensures that the data and target vectors are as desired, i.e., the first elements are 0 vectors, followed by one-hot encoded data.

In [None]:
# get sequence
x,t = sequence(2,5)

# check prepended zeros
assert torch.all(x[:4] == 0)
assert torch.all(t[:3] == 0)

# check one-hot encoded inputs and targets
assert torch.all(torch.sum(x[4:], axis=1) == 1)
assert torch.all(torch.sum(t[3:], axis=1) == 1)

We use the standard data loader with a batch size of $B=256$. Theoretically, each training sample could have its own sequence length $S$. To enable batch processing, however, the sequence length must be the same for every element in the batch (otherwise the batch cannot be stacked into one large tensor). Thus, our dataset needs a fixed sequence length $S$, whose exact value you can choose yourself.

### Task 4: Dataset and Data Loader

Implement a `Dataset` class derived from `torch.utils.data.Dataset` that provides $\mathbf X^{[n]}$ and $\mathbf T^{[n]}$. Implement three functions:

1. The constructor `__init__(self, data, S)` that takes the dataset $\vec c$ and (initial) sequence length $S$.
2. The function `__len__(self)` that returns the number of samples in our dataset.
3. Finally the index function `__getitem__(self, index)` that returns the sequences $\mathbf X^{[n]}$ and $\mathbf T^{[n]}$ for a given `index`. The function from Task 3 can be used for this.

After implementing the `Dataset`, instantiate a `DataLoader` for the dataset with a batch size of $B=256$.


In [None]:
class Dataset(torch.utils.data.Dataset):
  def __init__(self, data, S):
    # keep a reference to the character data and the sequence length
    self.data = data
    self.S = S

  def __getitem__(self, index):
    # return input and target value for the given index
    # (sequence() reads the module-level `data`, which is the same list)
    return sequence(index, self.S)

  def __len__(self):
    # one sample per character of the dataset
    return len(self.data)

# instantiate dataset and data loader for a reasonable sequence length S
dataset = Dataset(data, 100)
data_loader = torch.utils.data.DataLoader(dataset, batch_size=256, shuffle=False)

### Test 2: Data Sizes

Here we check that the shape of the input and target from all batches are appropriate.

In [None]:
for x,t in data_loader:
  dataset.S = random.choice(range(1,20))
  assert x.shape[2] == D
  assert x.shape == t.shape
  assert torch.all(x[:, 1:, :] == t[:, :-1, :])

## Elman Network Implementation

While there are implementations of recurrent networks available in PyTorch, we here attempt our own implementation of the Elman network.
The input to our network is a batch of sequences of dimension $\mathcal X\in \mathbb R^{B\times S\times D}$.
Our network contains three fully-connected layers with weight matrices $\mathbf W^{(1)} \in \mathbb R^{K\times D}$, $\mathbf W^{(r)} \in \mathbb R^{K\times K}$ and $\mathbf W^{(2)} \in \mathbb R^{D\times K}$ (plus bias neurons as handled by PyTorch).
The network iterates through the sequence and processes all elements of the batch simultaneously.
First, the hidden activation $\mathbf H^{\{0\}} \in \mathbb R^{B\times K}$ is initialized with 0.
Then, we iterate over $1\leq s\leq S$ to process:

$\mathbf A^{\{s\}} = \mathbf W^{(1)} \mathbf X^{\{s\}} + \mathbf W^{(r)} \mathbf H^{\{s-1\}}$ $~~~~~~~~~$
$\mathbf H^{\{s\}}= g\bigl(\mathbf A^{\{s\}}\bigr)$ $~~~~~~~~~$
$\mathbf Z^{\{s\}} = \mathbf W^{(2)} \mathbf H^{\{s\}}$

where $g$ is the activation function, `PReLU` in our case, and $\mathbf X^{\{s\}}$ is the data matrix stored as $\mathcal X_{:,s,:}$. The final output of our network $\mathcal Z$ stacks all $\mathbf Z^{\{s\}}$ matrices along the sequence dimension and therefore has the same dimensions as our input: $\mathcal Z\in \mathbb R^{B\times S\times D}$.
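
To make the recurrence concrete, the first two steps can be unrolled by hand. This is only a worked illustration of the three update equations, with biases omitted as in those equations. Since $\mathbf H^{\{0\}} = 0$, the recurrent term vanishes in the first step:

$$\mathbf H^{\{1\}} = g\bigl(\mathbf W^{(1)} \mathbf X^{\{1\}}\bigr), \qquad \mathbf Z^{\{1\}} = \mathbf W^{(2)} \mathbf H^{\{1\}}$$

$$\mathbf Z^{\{2\}} = \mathbf W^{(2)}\, g\Bigl(\mathbf W^{(1)} \mathbf X^{\{2\}} + \mathbf W^{(r)}\, g\bigl(\mathbf W^{(1)} \mathbf X^{\{1\}}\bigr)\Bigr)$$

The second output thus depends on both $\mathbf X^{\{1\}}$ and $\mathbf X^{\{2\}}$, which is how earlier characters influence later predictions.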

For training, we need to compare the output $\mathcal Z$ of our network with our target batch $\mathcal T$. We will make use of the categorical cross-entropy loss as implemented in PyTorch's `torch.nn.CrossEntropyLoss`. In our case, we will implicitly compute:

$$\mathcal J^{CCE} = -\frac1{SB} \sum\limits_{b=1}^B \sum\limits_{s=1}^S \sum\limits_{d=1}^D \mathcal T_{b,s,d} \log \mathcal Y_{b,s,d}$$

where $\mathcal Y_{b,s,d}$ is the result of SoftMax over the dimension $D$, which is the last index of our tensor.
As the documentation of `torch.nn.CrossEntropyLoss` states, the SoftMax is always computed across the second dimension of the data tensor (which would be $s$ in our case).
Hence, we need to reorder the dimensions of the logits $\mathcal Z$ and the targets $\mathcal T$ such that the computation is correct.
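
To see what this loss computes, here is a small sketch in plain Python (no PyTorch) that applies SoftMax over the last index and averages the cross-entropy over $B$ and $S$, with the conventional leading minus sign so that the loss is non-negative. The helper names `softmax` and `cce` and the toy numbers are made up for illustration. Note that an all-zero target row, as produced by the zero padding, contributes nothing to the sum.

```python
import math

def softmax(logits):
    # subtract the maximum for numerical stability
    m = max(logits)
    exps = [math.exp(v - m) for v in logits]
    total = sum(exps)
    return [e / total for e in exps]

def cce(Z, T):
    """Z, T: nested lists of shape B x S x D (logits and one-hot targets)."""
    B, S = len(Z), len(Z[0])
    total = 0.0
    for z_seq, t_seq in zip(Z, T):
        for z, t in zip(z_seq, t_seq):
            y = softmax(z)  # SoftMax over the last index d
            total -= sum(t_d * math.log(y_d) for t_d, y_d in zip(t, y))
    return total / (S * B)

# toy batch with B=1, S=2, D=3; the second target is zero padding
Z = [[[2.0, 0.0, 0.0], [0.0, 0.0, 0.0]]]
T = [[[1.0, 0.0, 0.0], [0.0, 0.0, 0.0]]]
J = cce(Z, T)
print(J)
```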

### Task 5: Elman Network Implementation

Manually implement an Elman network derived from the `torch.nn.Module` class, using one fully-connected layer each for the hidden, recurrent and output units.

1. In the constructor, instantiate all required layers and activation functions for the given values of $D$ and $K$.
2. In the `forward` function, implement the processing of the input in the Elman network. Make sure that logit values are computed and returned for each element in the sequence. Try to use as much tensor processing as possible. Remember the shape of $\mathcal X$ is $B\times S\times D$, and when going through the sequence, we need to process $\vec x^{\{s\}}$ separately, while working on all batch elements simultaneously.


Note:

* You can also make use of the `torch.nn.RNN` PyTorch module to build the recurrent layer and use a fully connected layer on top to implement the Elman network. However, using this module might not be easy.

In [None]:
import torch

# Assume 'device' is defined, e.g.,
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class ElmanNetwork(torch.nn.Module):
  def __init__(self, D, K):
    super(ElmanNetwork, self).__init__()
    # Use nn.Linear for input-to-hidden, recurrent, and hidden-to-output layers
    self.fc_in = torch.nn.Linear(D, K)
    self.fc_rec = torch.nn.Linear(K, K)
    self.fc_out = torch.nn.Linear(K, D)
    self.activation = torch.nn.PReLU()

  def forward(self, x):
    B, S, D = x.shape
    h_s = torch.zeros(B, self.fc_rec.in_features, device=x.device)
    Z = torch.empty(x.shape, device=x.device)

    for s in range(S):
      x_s = x[:, s, :]
      # pre-activation: input contribution plus recurrent contribution
      a_s = self.fc_in(x_s) + self.fc_rec(h_s)
      h_s = self.activation(a_s)
      z = self.fc_out(h_s)
      Z[:, s, :] = z

    return Z

### Test 3: Network Output

We instantiate an Elman network with $K=1000$ hidden units.
We generate test samples in the given format, forward them through the network, and ensure that the results have the required dimensionality.

In [None]:
network = ElmanNetwork(D, 1000).to(device)

with torch.no_grad():
  test_input = torch.empty(7, 25, D, device=device)
  test_output = network(test_input)
  assert test_input.shape == test_output.shape

### Task 6: Training Loop

To train the Elman network, we will use categorical cross-entropy loss, averaged over all samples in the sequence.
For each batch, we can optionally use a different sequence size -- while the size inside a batch must stay the same.

According to the PyTorch documentation, the `CrossEntropyLoss` handles logits and targets in shape $B\times O\times\ldots$.
In our case, logits and targets are in dimension $B\times S\times O$.
Hence, we need to make sure that we re-order the indexes such that we fulfil the requirement; you might want to use the `permute` or `transpose` operator.

Instantiate the optimizer with an appropriate learning rate $\eta$ and the loss function.
Implement the training loop for 20 epochs -- more epochs will further improve the results.
Compute the average training loss per epoch.

Note that 20 epochs will train for about 2 minutes, if implemented in an optimized way, on the GPU. Non-optimized training will take considerably longer.

WARNING: `CrossEntropyLoss` will not complain when the index order of the logits $\mathcal Z$ and targets $\mathcal T$ is incorrect; the results will simply be wrong.

In [None]:
import torch.optim as optim
from tqdm.notebook import tqdm
D = len(characters)

# Instantiate the network and move it to the correct device
network = ElmanNetwork(D, 1000).to(device)

# Instantiate the Adam optimizer and the loss function
optimizer = optim.Adam(network.parameters(), lr=0.001)
loss = torch.nn.CrossEntropyLoss()

print("Starting training...")
# 50 epochs instead of the 20 the task asks for (more epochs further improve the results)
for epoch in tqdm(range(50), desc="Epochs"):
  # Reset average loss for the epoch
  train_loss = 0.

  for x, t in data_loader:
    # Move data to the same device as the network
    x, t = x.to(device), t.to(device)

    # 1. Reset gradients
    optimizer.zero_grad()

    # 2. Compute network output (forward pass)
    z = network(x)

    # 3. Compute loss
    # CrossEntropyLoss expects the class dimension second, so both logits and
    # one-hot targets are permuted from (B, S, D) to (B, D, S).
    # Passing the one-hot targets as class probabilities (PyTorch >= 1.10) lets
    # the all-zero padding targets contribute nothing to the loss, whereas
    # argmax would map them to class index 0.
    J = loss(z.permute(0, 2, 1), t.permute(0, 2, 1))


    # 4. Compute gradients for this batch (backward pass)
    J.backward()

    # 5. Update weights
    optimizer.step()

    # Accumulate the loss for the epoch
    train_loss += J.item()

  # print average loss for training
  print(f"\rEpoch {epoch+1}; train loss: {train_loss/len(data_loader):1.5f}")

print("Training finished.")

## Writing a Poem

We now turn the trained network into a poet.
Given some initial seed strings, we let the network predict the next character, which we append to our text. We repeat this process until we have produced a given string size.

For this purpose, we need to implement three functions.
The first function needs to turn a given text into something that the network understands as input.
The second function needs to interpret the network output, i.e., it needs to select a character from the predicted logits.
There, we can implement two ways:
1. We take the character with the highest predicted class:
$$o^* = \operatorname*{arg\,max}_o \vec y^{\{S\}}_o$$
2. We can also perform probability sampling, where each character is drawn according to the probability that SoftMax provides -- such a function is, for example, implemented in `random.choices`.
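
The difference between the two selection strategies can be sketched in plain Python, independent of the network. The function name `choose_next`, the toy alphabet and the logit values below are made up for illustration only.

```python
import math
import random

def choose_next(logits, alphabet, use_best=True, rng=random):
    """Pick one symbol from `alphabet` given one logit per symbol."""
    if use_best:
        # greedy: the largest logit also has the largest SoftMax probability
        best = max(range(len(logits)), key=lambda o: logits[o])
        return alphabet[best]
    # sampling: turn logits into probabilities and draw accordingly
    m = max(logits)
    exps = [math.exp(v - m) for v in logits]
    total = sum(exps)
    probs = [e / total for e in exps]
    return rng.choices(alphabet, weights=probs, k=1)[0]

alphabet = ["a", "b", "c"]   # hypothetical toy alphabet
logits = [0.1, 2.5, -1.0]    # made-up logits for the last sequence element
print(choose_next(logits, alphabet, use_best=True))   # b
print(choose_next(logits, alphabet, use_best=False))  # a, b or c; b is most likely
```

Greedy selection is deterministic, while sampling can produce a different continuation on every call.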

Finally, we need to implement a function to iteratively call the encoding and prediction functions.

### Task 7: Text Encoding

For a given text (a sequence of $S$ characters), provide the encoding $\mathcal X \in \mathbb R^{B\times S\times D}$.
Ensure that the batch dimension for $B=1$ is added to the encoding, so that the network is able to handle it.

In [None]:
def encode(text):
  """
  Converts a string of text into a one-hot encoded tensor.
  """
  # Create a list of one-hot vectors for each character in the text
  encoding_list = [one_hot[c] for c in text]

  # Stack the list of vectors into a single tensor of shape (S, D)
  stacked_encoding = torch.stack(encoding_list)

  # Add a batch dimension to create the final shape (1, S, D)
  encoding = stacked_encoding.unsqueeze(0)

  return encoding


### Task 8: Next Element Prediction

Write a function that predicts the next character of the sequence based on the logit values obtained from the network.
Implement both ways:
1. Using the maximum probability, i.e., selecting and returning the character with the highest SoftMax probability, $\arg\max_o y^{\{S\}}_o$. Since SoftMax preserves the order of the logits, this is the same as $\arg\max_o z^{\{S\}}_o$.
2. Using probability sampling, i.e., we randomly draw a character based on the SoftMax probability distribution $\vec y^{\{S\}}$. `random.choices` provides the possibility to pass a list of characters and a list of probabilities.

Use the Boolean parameter `use_best` of your function to distinguish the two cases.

Note:

* In our case, we are only interested in the logit for the last element of our sequences, i.e., $\vec z^{\{S\}}$.
* The logits are in dimension $\mathcal Z \in \mathbb R^{B\times S\times D}$ with $B=1$, and we are generally only interested in the prediction for the last sequence item.

In [None]:
import random
import torch.nn.functional as F

def predict(z, use_best=True):
  """
  Predicts the next character from a logit tensor.

  Args:
    z (torch.Tensor): The logit tensor of shape (1, S, D).
    use_best (bool): If True, returns the most likely character.
                     If False, samples from the probability distribution.

  Returns:
    str: The predicted next character.
  """
  # Select the logits for the last element in the sequence, shape: (D,)
  z_S = z[0, -1, :]

  if use_best:
    # Find the index of the logit with the highest value
    best_index = torch.argmax(z_S).item()
    # Look up the character corresponding to that index
    next_char = characters[best_index]
  else:
    # Convert logits to probabilities using the softmax function
    probabilities = F.softmax(z_S, dim=0)
    # Use random.choices to sample a character based on the probabilities
    # Move the probabilities to the CPU and convert them to a plain Python list
    next_char = random.choices(characters, weights=probabilities.cpu().tolist(), k=1)[0]

  return next_char

### Task 9: Sequence Completion


Write a function that takes a `seed` text which it will complete with the given number of characters.
Write a loop that turns the current `text` into an encoded sequence of its characters using the function from Task 7.
Forward the text through the network and take the prediction of the last sequence item $\vec z^{\{S\}}$ using the function from Task 8.
Append this to the current text (remember that Python strings are immutable).
Repeat this loop `count` times (80 in Task 10), and return the resulting `text`.

In [None]:

def sequence_completion(seed, count, use_best):
  """
  Completes a given seed text by generating new characters.

  Args:
    seed (str): The initial text to start with.
    count (int): The number of characters to generate.
    use_best (bool): The prediction method (True for best, False for sampling).

  Returns:
    str: The completed text.
  """
  # We start with the given seed
  text = seed
  # We don't need to track gradients during inference
  with torch.no_grad():
    # Set the network to evaluation mode
    network.eval()
    for i in range(count):
      # Turn current text to a one-hot encoded batch and move to device
      x = encode(text).to(device)

      # Get the logits from the network
      z = network(x)

      # Predict the next character using the logits
      next_char = predict(z, use_best=use_best)

      # Append the predicted character to the text
      text += next_char

  return text

### Task 10: Text Production

Select several seeds (such as `"thi", "ba", "wea", "tru", "sum", "q"`) and let the network complete each of them with 80 characters, once by always taking the most probable character and once using probability sampling.
Write the completed sentences to the console.

In [None]:
# A list of seed phrases to start the generation
seeds = ["thi", "ba", "wea", "tru", "sum", "q"]

print("--- Generating Shakespearean Text ---\n")
for seed in seeds:
  # Generate text by always picking the best character
  best = sequence_completion(seed, 80, use_best=True)
  # Print the seed and the generated text
  print(f"Best:    \"{seed}\" -> \"{best}\"")

  # Generate text by sampling from the probability distribution
  sampled = sequence_completion(seed, 80, use_best=False)
  # Print the seed and the generated text
  print(f"Sampled: \"{seed}\" -> \"{sampled}\"")

  # Print a blank line for readability
  print()

Output:

Best:    "thi" -> "thine eyes?*bet fretheisting oren,*fertif it fate thynoler tryiei who terr o'er to "

Sampled: "thi" -> "think on thee from falli on s pentt nitcinr ieti fortious your own love?*the rofe p"


Best:    "ba" -> "bastate,*bey see ses,*reditivant i was bus old,*'  sleeds**n scet nerrims frid to "

Sampled: "ba" -> "back as hiss to disthing my unot be sarred,*for where*that is,*moces descove birie"


Best:    "wea" -> "wear against the bay fftensing siailose faetrgie fr th i that snows that i cover he"

Sampled: "wea" -> "wear against the lamoninearagenit*enu nde tbe t'teeaiendsti  t wi mot crt wow  hovl"


Best:    "tru" -> "true sight,*whth no for thou ly comparse*cleat not, of hits,*wh swael my aitide not"

Sampled: "tru" -> "trust,*and descredied thy oftrimenor can me st hats and praitser meri's sdre i to d"


Best:    "sum" -> "summer's day,*agd 's the gart, forne tipe yl of antuitine s awfe my its are*ang to "

Sampled: "sum" -> "sumand out of dothapting thy break i spridg s afiitaset f enaet t a n hi r enr sofr"


Best:    "q" -> "quickly ste,*let int in need mui rain adletnn ei olh  hrrat wfodensttra g ond itn"

Sampled: "q" -> "quest and moring endore i  be thine minen wink is earthy selfuct on train blgld,*"
