In [None]:
%pip install numpy scipy scikit-learn plotly tqdm
%pip install --no-cache-dir --force-reinstall https://dm.cs.tu-dortmund.de/nats/nats25_08_01_diy_word_embeddings-0.1-py3-none-any.whl
import nats25_08_01_diy_word_embeddings

# DIY Word Embeddings with a simple Model

In [None]:
import numpy as np
from sklearn.datasets import make_moons
import plotly.graph_objects as go
from tqdm.auto import tqdm
from abc import ABC, abstractmethod

## Hello Backpropagation

In this assignment, you will be asked to train your own neural network. Therefore, you will be asked to write your own backpropagation implementation.
In this notebook, we will try to keep things close to `PyTorch`. To help you write a similar API, you are given the following abstract classes for loss functions, network modules (activation functions and utility) and trainable modules (layers).
In derived classes (designated by the class name in brackets), you will then in most cases only need to override the functions annotated with `@abstractmethod`.

Read the code closely and get familiar with these concepts:
- `forward` is the feed-forward functionality of the network. Layers and activation functions should modify the inputs and return them. All inputs are cached in a local variable for use in gradient descent steps.
- `backward` computes gradients derived from a given `delta` (target direction of change in output) and returns the `next_delta` (target direction of change in input). For loss functions, it should return the target direction of change in input for a given prediction and target value. The computed gradients (if necessary) are again stored in a local variable.
- `step_gradient` is used to add the last gradient computed with `backward` multiplied by the learning rate to the parameters of this module. For a single gradient descent step, you will need to executed `forward` -> `backward` -> `step_gradient`.
- Functions preceded with an underscore are the module-specific functionality that remains to be implemented in derived classes.

Cross reference example code of `PyTorch` to see similarities and differences of these approaches.

In [None]:
class Loss(ABC):
    @abstractmethod
    def forward(self, prediction, target): pass
    @abstractmethod
    def backward(self, prediction, target): pass
class NetworkModule(ABC):
    @abstractmethod
    def _forward(self, X):
        return None
    def forward(self, X):
        self.last_input = X.copy()
        return self._forward(X)
    @abstractmethod
    def backward(self, delta): pass
    def step_gradient(self, learning_rate): pass
class TrainableModule(NetworkModule):
    def __init__(self):
        self.last_input = None
        self.grad = None
    def backward(self, delta):
        if self.last_input is None:
            raise AssertionError("Tried to execute backpropagation without forward feeding data.")
        next_delta = self._next_delta(delta)
        self.grad = self._gradient_for_last_input(delta)
        return next_delta
    @abstractmethod
    def _next_delta(self, delta):
        pass
    @abstractmethod
    def _gradient_for_last_input(self, delta):
        pass
    @abstractmethod
    def step_gradient(self, learning_rate):
        pass

When training networks with multiple layers, you will want to aggregate the functionality (e.g. `forward`) into a single call.
The simplest way to do this, is to just have a chain of modules that are processed one after the other.
Implement the missing functions in the `ModuleChain` class below.

**Take care:** `numpy` arrays are stored as references! Inplace math operations like `+=` or `*=` do not create a copy but override the original matrix. You *do not* want to override your inputs but *may want* to override your attributes!

In [None]:
class ModuleChain(NetworkModule):
    def __init__(self, modules):
        super().__init__()
        self.modules = modules
    def _forward(self, X):
        pass # Your solution here
    def backward(self, delta):
        pass # Your solution here
    def step_gradient(self, learning_rate):
        pass # Your solution here

Next, you will at least need fully connected linear layers consisting of a weight matrix $W$ and a bias $b$ that implement the function
$$ f(x) = x^TW + b $$
All weights and biases should be initialized at random. Make sure, that each column sums to 1.
This way, the norm of latent vectors will not explode and training is a bit easier.

Aside from the initialization and forward step, you will need to implement your first parts of backpropagation here.
For the backpropagation, you need to decompose the gradient for the entire network into gradients per module.
To do so, we can start with the derivation
$$
\frac{\partial E}{\partial x_0}
= \frac{\partial x_1}{\partial x_0} \cdot \frac{\partial E}{\partial x_1}
= \frac{\partial x_1}{\partial x_0} \cdot \ldots \cdot \frac{\partial E}{\partial x_n}
$$
where $x_0$ is the input vector and the other $x_i$ denote the latent vectors after forwarding through $i$ modules.
This allows to simplify for the $k$-th layer as
$$
\delta_k
= \frac{\partial E}{\partial x_k}
= \frac{\partial x_{k+1}}{\partial x_k} \cdot \frac{\partial E}{\partial x_{k+1}}
= \frac{\partial x_{k+1}}{\partial x_k} \cdot \delta_{k+1}
$$
That is, the `next_delta` (in input direction) is the product of the derivation of the module function evaluated at the `last_input` multiplied by the `delta` of the layer afterwards, where we consider "next" to move from last to first layer. (The "multiplied by" does a lot of heavy lifting here, when `delta` is vector-valued. This is *not* a scalar or Hadamard product!)

Similarly, we can decompose the gradient for the weights and the bias of this module
$$
\frac{\partial E}{\partial W}
= \frac{\partial x_{k+1}}{\partial W} \cdot \delta_{k+1}
\quad\quad\quad
\frac{\partial E}{\partial b}
= \frac{\partial x_{k+1}}{\partial b} \cdot \delta_{k+1}
$$
These equations are *hand-waving* a lot of the details! Strictly speaking, we can not pull the products apart for entire vectors (at least its not a scalar product anymore). It is an easy way of remembering the right steps, though, and you only need to add a bunch of indices to each of the equations (or change the product types) to make them right. The indices just make it unreadable. Try to figure out where to add which index and add code to the functions below as you see fit. These equations here, however, can be translated to vector and matrix operations, so that you do not need to compute the gradient for each coefficient individually.

*Hint 1:* The $\delta_i$ are vectors of the same shape as the output of each module (i.e. shape of the input of the next module in forward direction). All gradients must have the same shape as the parameters. That allows for only very specific vector and matrix operations.

*Hint 2:* Libraries like `PyTorch` allow to either pass a single input vector or a vector of input vectors through the models. Ensuring that functionality can be a bit tricky, so it is okay to always require single input vectors for simplicity. Just keep that constraint in mind when training your model.

In [None]:
class Linear(TrainableModule):
    def __init__(self, n_in, n_out, use_bias=True):
        '''
        Creates a fully connected linear layer translating from vectors of length `n_in` to vectors of length `n_out`.
        `use_bias` controls whether or not this layer should use a bias (`f(x) = x^T W+b`) or not (`f(x) = x^T W`).
        '''
        super().__init__()
        # self.weights = ...
        # self.bias = ...
        # self.use_bias = ...
        pass # Your solution here
    def _forward(self, X):
        pass # Your solution here
    def _next_delta(self, delta):
        pass # Your solution here
    def _gradient_for_last_input(self, delta):
        pass # Your solution here
    def step_gradient(self, learning_rate):
        pass # Your solution here

In [None]:
nats25_08_01_diy_word_embeddings.hidden_tests_8_0(Linear)

The next step is to define some activation functions and a loss. You will later need the Softmax activation and the Cross Entropy loss, but these are a bit tricky to implement. Instead, start with [ReLU](https://en.wikipedia.org/wiki/Rectified_linear_unit), [Sigmoid](https://en.wikipedia.org/wiki/Sigmoid_function) and L2 loss (half of the squared Euclidean distance between desired output and observed output).

*Hint:* Here again, you need to return the `delta` for the next layer in input direction, which follows the same rule (derivative evaluated at last input times `delta` of next layer in output direction) for activation functions. For loss functions, you receive predicted and ground truth values and must return just the derivative in the input. Beware of the sign of the L2 loss derivative.

In [None]:
class ReLU(NetworkModule):
    def _forward(self, X):
        pass # Your solution here
    def backward(self, delta):
        pass # Your solution here
class Sigmoid(NetworkModule):
    def _forward(self, X):
        pass # Your solution here
    def backward(self, delta):
        pass # Your solution here
class L2Loss(Loss):
    def forward(self, prediction, target):
        pass # Your solution here
    def backward(self, prediction, target):
        pass # Your solution here

In [None]:
nats25_08_01_diy_word_embeddings.hidden_tests_11_0(ReLU, L2Loss, Sigmoid)

Now that you have the fundamental building blocks of a neural network, you can debug your code using a simple example, that some simple architecture should be able to solve and that can be visualized easily. Complete the code below to train the prescribed architecture. Depending on your implementation, each gradient step can be performed on a single input-output pair or on multiples (which is quite a bit trickier). In each iteration choose inputs and outputs for a gradient step, compute the gradient via `forward` and `backward` and call the `step_gradient` function.

In [None]:
# Create dataset.
X,y = make_moons(1000, noise=.2)
# Initializing a module chain for a simple neural network with multiple hidden layers.
# To debug only some of the module classes, create different architectures,
# but beware that you might need to change the learning rate then.
model = ModuleChain([Linear(2, 50), ReLU(), Linear(50, 20), ReLU(), Linear(20, 1), Sigmoid()])
loss = L2Loss()
learning_rate = 1e-1
n_iterations = 50000
with tqdm(range(n_iterations), total=n_iterations) as bar:
    for _ in bar:
        # Insert code for a single training iteration.
        # You can track some information, by overriding `bar.desc`
        # which is the description string displayed on the tqdm
        # progress bar.
        pass # Your solution here
# Compute the predicted label for all outputs and the accuracy of the model.
# y_pred = ...
# accuracy = ...
pass # Your solution here

In [None]:
# Visualization of your label prediction.
# Change the number of digits for rounding to go from a
# continuous visualization to class prediction.
marker_label = np.round(y_pred, 3)
go.Figure(
    go.Scatter(
        x=X[:,0],
        y=X[:,1],
        mode="markers",
        text=marker_label,
        marker_color=marker_label,
    ),
    layout_yaxis_scaleanchor="x"
).show()

In [None]:
nats25_08_01_diy_word_embeddings.hidden_tests_15_0(accuracy, y, y_pred)

## A simple word embedding model

We can now add the missing functions to train a very simple HMM-style word embedding model.
The model is supposed to predict the next word after receiving the $k$ previous words.
The entire model should compute the function

$$
Softmax((ReLU((x^TE)^TW_1+b_1)^TW_2+b_2)^TE^T)
$$

that is, we have the architecture `Linear (1)` -> `Linear (2)` -> `ReLU` -> `Linear(3)` -> `Linear (1) Transposed` -> `Softmax`, where `Linear (1)` does not use a bias.
After training, we expect the weights $E$ of `Linear (1)` to contain our word embedding vectors.
Linking the weights of the first and last layer theoretically makes the function bilinear and the derivatives a lot harder to compute.
We, therefore, act like these are independent matrices during `backward` and add both gradients during `step_gradient`.
This is technically not correct, but gradient descent with a sufficiently small learning rate typically "survives" such stunts.
This is why we designed `backward` and `step_gradient` to be different steps, though, since otherwise the first layer would use a corrupted weight matrix for gradient computation.

Complete the classes below to create new modules that "borrow" the weights (but not bias) of another linear module.
To borrow, you can replace the weight matrix created in the `Linear` constructor with a (transposed) view by referencing `other_linear.weights` or `other_linear.weights.T`.
The transpose of a matrix is a view, that references the original matrix and applies all inplace operations to that part of memory.

*But beware:* To keep these layers synchronized, all changes to the weights must be made in place, i.e. using operators like `+=`. You may need to adapt your `Linear` implementation to satisfy that constraint.

In [None]:
class LinkedLinear(Linear):
    def __init__(self, other_linear, use_bias=True):
        # You probably want to start by invoking the `Linear` constructor.
        # super().__init__(...)
        pass # Your solution here
class LinkedTransposedLinear(Linear):
    def __init__(self, other_linear, use_bias=True):
        # You probably want to start by invoking the `Linear` constructor.
        # super().__init__(...)
        pass # Your solution here

In [None]:
nats25_08_01_diy_word_embeddings.hidden_tests_18_0(LinkedTransposedLinear, Linear, LinkedLinear)

Next we will need the [Softmax](https://en.wikipedia.org/wiki/Softmax_function) activation function and the Cross Entropy loss.
Implementing these can be a bit tricky, especially when computing the derivative.
The Softmax function and its derivative are
$$
Softmax(x) = \left(\frac{e^{x_1}}{\sum_{j} e^{x_j}}, \ldots, \frac{e^{x_d}}{\sum_{j} e^{x_j}}\right)
\\
\nabla Softmax(x) = \left(
    \left\{\begin{array}{rcl}
        Softmax(x)_i - Softmax(x)_i \cdot Softmax(x)_j &\text{if}& i = j\\
        - Softmax(x)_i \cdot Softmax(x)_j &\text{if}& i \neq j\\
    \end{array}\right.
\right)_{i,j}
= diag(Softmax(x)) - Softmax(x) \otimes Softmax(x)
$$
where $diag(v)$ is the diagonal matrix with $v$ on the diagonal and $\otimes$ is the outer product.

The Cross Entropy loss of a prediction $y$ and the ground_truth $y^{\ast}$ and its derivative are
$$
CE(y, y^{\ast}) = -\sum_{i} y^{\ast}_i \log(y_i) = -(y^{\ast})^T\log^\circ(y)
\\
\nabla CE(y, y^{\ast}) = \left(-\frac{y^{\ast}_1}{y_1},\ldots,-\frac{y^{\ast}_d}{y_d}\right)
$$
where $\log^\circ$ is the componentwise logarithm.

We could just implement both of these functions and start training **but** when implementing $\nabla Softmax(x)$ naively, we would compute a $d_{out} \times d_{out}$ matrix! Since the last layer produces a value for each word in our dictionary, we would have a space complexity of the dictionary size squared, which is way too much (probably billions and beyond)!
But $y^{\ast}$ will be a one-hot encoding. The resulting $\delta$ will, therefore, only have one non-zero value.
We also only need the derivative of Softmax multiplied with $\delta$, i.e. $\nabla Softmax(x) \cdot \delta$.
Since $\delta$ can only have one non-zero value, we can replace that matrix-vector-product with a vector-scalar product, using only the relevant row of $\nabla Softmax(x)$.

To implement that simplified behavior, we can make use of sparse matrices like [`scipy.sparse.csr_array`](https://docs.scipy.org/doc/scipy/reference/generated/scipy.sparse.csr_array.html) which implements the [Compressed sparse row](https://en.wikipedia.org/wiki/Sparse_matrix#Compressed_sparse_row_(CSR,_CRS_or_Yale_format)) format.
By iterating over `zip(x.indptr, x.indices, x.data)`, you can receive the row, column and value of all non-zero values stored in a `csr_array` `x`.
You can also perform `numpy`-style operations like `x.dot(...)` using that sparse array.
`csr_array`s are always two dimensional, so you have to store vectors as $d \times 1$ or $1 \times d$ matrices.
You can create a `csr_array` using the constructor `csr_array((values, (row_indices, col_indices)), shape=(n, m))` where `values`, `row_indices` and `col_indices` must be vectors of equal size denoting in which row and column each value lies.

Now use all the information in this text block to implement the Softmax activation and the Cross Entropy loss classes below.

In [None]:
from scipy.sparse import csr_array
class Softmax(NetworkModule):
    def _forward(self, X):
        pass # Your solution here
    def backward(self, delta):
        assert type(delta) == csr_array
        pass # Your solution here
class CELoss(Loss):
    def forward(self, prediction, target):
        pass # Your solution here
    def backward(self, prediction, target):
        assert type(target) == csr_array
        pass # Your solution here

In [None]:
nats25_08_01_diy_word_embeddings.hidden_tests_21_0(Softmax, CELoss)

You will also need a way to translate words to one-hot encoded sparse vectors and back.
To do so, complete the class below.

In [None]:
class OneHotDict():
    def __init__(self, vocabulary):
        # Ensure appropriate types for ordered access
        # self.vocabulary = ...
        pass # Your solution here
    def word_to_one_hot(self, word):
        pass # Your solution here
    def one_hot_to_word(self, one_hot):
        pass # Your solution here

In [None]:
nats25_08_01_diy_word_embeddings.hidden_tests_24_0(OneHotDict)

Again, verify your implementation by training on the simple example below.
You will need to adapt your previous code to translate from the one-hot encodings to class labels.

In [None]:
# Create dataset.
X,y = make_moons(1000, noise=.2)
ohd = OneHotDict(np.unique(y))
# Initializing a module chain for a simple neural network with multiple hidden layers.
# To debug only some of the module classes, create different architectures,
# but beware that you might need to change the learning rate then.
model = ModuleChain([Linear(2, 50), ReLU(), Linear(50, 20), ReLU(), Linear(20, 2), Softmax()])
loss = CELoss()
learning_rate = 1e-1
n_iterations = 50000
with tqdm(range(n_iterations), total=n_iterations) as bar:
    for _ in bar:
        # Insert code for a single training iteration.
        # You can track some information, by overriding `bar.desc`
        # which is the description string displayed on the tqdm
        # progress bar.
        pass # Your solution here
# Compute the predicted label for all outputs and the accuracy of the model.
# y_pred = ...
# accuracy = ...
pass # Your solution here

In [None]:
# Visualization of your label prediction.
# This time, rounding is not necessary, since predictions are categorical.
marker_label = y_pred
go.Figure(
    go.Scatter(
        x=X[:,0],
        y=X[:,1],
        mode="markers",
        text=marker_label,
        marker_color=marker_label,
    ),
    layout_yaxis_scaleanchor="x"
).show()

In [None]:
nats25_08_01_diy_word_embeddings.hidden_tests_28_0(accuracy, y, y_pred)

We finally arrive at the HMM-style part of our model. We will use a very simple approach, where the next word is predicted from the embedding vectors of the previous $k$ words - no recurrency, no transformers.
For your first model we will keep it as simple as the few hundred lines above.
But to make the next word dependent on the previous $k$ words, we will need to combine the latent representations of these $k$ words.
One simple way to do so, is to concatenate the words, for which we need one last module.
Add code to the class below to create a module that distributes a set of inputs to component modules and outputs a concatenation of their outputs.
During the `backward` step, it needs to split the given `delta` into parts and feed them through the respective child modules.
You can assume, that this module is always used at the start of a model, thus does not need to return a `delta` and that the outputs of all nested modules are of equal size (relevant for splitting).

In [None]:
class ModuleConcat(NetworkModule):
    def __init__(self, modules):
        super().__init__()
        self.modules = modules
    def _forward(self, X):
        pass # Your solution here
    def backward(self, delta):
        pass # Your solution here
    def step_gradient(self, learning_rate):
        pass # Your solution here

Before training the model, we now need to load the dataset and prepare our `OneHotDict`.
Complete the code below to load the plain text of Hamlet, tokenize each sentence, pad the tokenized sentences with $k$ times a padding token in front and back and initialize the `OneHotDict`.
The padding will later be used to start sentences and to recognize the end of a generated sentence.
Also remove all "sentences" with two or less tokens, since these are mostly directing instructions and we do not want to generate those.

In [None]:
import nbbootstrap
file_path = await nbbootstrap.ensure_resource("https://dm.cs.tu-dortmund.de/nats/data/hamlet.txt")
with open(file_path, "rt") as file:
    full = file.read()
import nbbootstrap, re
sentence_regex = re.compile(r"[.?!]|\n\n+")
words_regex = re.compile(r"[\w']+", re.U)
special_chars = ".:,;?!-_\"'()„“”‚‘’…"
padding_token = "_" # This character can not occur in any of the words.
k_tokens = 5 # How many tokens to base the next word on.
tokenized_sentences = [] # Store your output in this list.
# First split Hamlet into sentences, then tokenize each sentence.
for sentence in sentence_regex.split(full):
    pass # Your solution here
ohd = OneHotDict(np.unique([token for sentence in tokenized_sentences for token in sentence]))

In [None]:
nats25_08_01_diy_word_embeddings.hidden_tests_33_0(ohd, padding_token, special_chars, tokenized_sentences)

You can now train your model.
The cell below contains an example model that can consume one-hot vectors and produce a probability distribution, which can be compared to another one-hot vector using the `CELoss`.
The train loop comes with a floating loss computation to keep you updated on the training progress.
You will still have to add the random selection of inputs and outputs and translate from words to one-hot vectors.
Keep in mind that the `csr_array`s are always two-dimensional and matrix operations including them will typically return `csr_array`s again.
To have it simple, you can call `toarray()` on a `csr_array` to turn it into a `numpy` array.
For the input vectors, that is a bit slower but may be necessary unless you adapt your `Linear.backward` implementation to handle `csr_array` inputs.

We can help the gradient descent with an additional constraint: Enforcing all embeddings vectors to be normalized.
Simply normalize all row vectors of `enc_layers[0].weights` **in-place** after each `step_gradient` call.
Normalizing the vectors will prevent explosion or vanishing of the vector norms, which may otherwise occur and lead to a bunch of `nan` values.

In the browser, the training process will probably be horrendously slow, due to the limitation to one thread.

In [None]:
# Setting the embedding dimensionality
word_vec_dim = 200
# Creating the embedding encoder layers for each input token
enc_layers = [Linear(len(ohd.vocabulary), word_vec_dim, use_bias=False)]
for _ in range(k_tokens-1): enc_layers.append(LinkedLinear(enc_layers[0], use_bias=False))
# Creating a linked decoder layer
dec_layer = LinkedTransposedLinear(enc_layers[0], use_bias=False)
# Creating the rest of the model
model = ModuleChain([
    ModuleConcat(enc_layers),
    Linear(k_tokens*word_vec_dim, 2000),
    ReLU(),
    Linear(2000, word_vec_dim),
    dec_layer,
    Softmax()
])
loss = CELoss()
floating_loss = None

In [None]:
# Setting training parameters
n_iterations = 5000
learning_rate = 1e-1
floating_factor = 0.999
# Doing the actual training. You can run this cell multiple times to continue training
with tqdm(range(n_iterations), total=n_iterations) as bar:
    for _ in bar:
        # Insert code for a single training iteration.
        # Select a random sentence and a random range of k+1 tokens.
        # The first k of these tokens are the input and the (k+1)-th token is the output.
        # Translate all tokens into one-hot vectors and run forward.
        # Calculate the loss_value for the floating loss visualization.
        # Backpropagate the loss using backward and step_gradient.
        pass # Your solution here
        # loss_value = ...
        floating_loss = loss_value if floating_loss is None else floating_factor * floating_loss + (1-floating_factor) * loss_value
        bar.desc = f"Floating loss: {floating_loss:>7.4f}"

It is now time to generate text using your model.
Start with $k$ times the `padding_token` to signify the sentence start.
Then create the next word by computing the word probabilities using the $k$ last tokens in your sentence.
Sample a word according to the word probabilities.
You can do so by sampling a random float in $[0,1]$ and finding the $i$ most likeliest word for which the cumulative probability of the $i$ most likely words exceeds this float (nucleus sampling).
Using the `OneHotDictionary` you can then add that word to the sentence.
Print all generated words.

Try different sampling techniques. Which works best from your point of view?

In [None]:
# Initialization
generated_words = []
pass # Your solution here
while (len(generated_words) == 0 or generated_words[-1] != padding_token) and len(generated_words) < 50:
	# Process:
	# - feed forward the last k words as one-hot vectors
	# - sample word according to predicted probabilities
	# - append word to sentence print the word
	pass # Your solution here
	generated_words.append(next_word)
	print(next_word, end=" ")
print()

Analyze the resulting word embeddings a bit.
What word vectors are most similar in total or relative to specific words?
Are there relevant clusters?

In [None]:
# Do some stuff
pass # Your solution here