## **RNN** - Recurrent Neural Network

A Recurrent Neural Network (RNN) is a type of deep learning model designed for processing sequential data, such as text or time series. Feedback loops give the network a form of memory, letting it retain and use information from earlier inputs in the sequence. Key applications include natural language processing tasks such as translation and text generation, although in many of these tasks RNNs have largely been replaced by transformer-based models, which parallelize better across the sequence during training.

---
\* Based on **Stanford University CS231n: Deep Learning for Computer Vision** course material

In [None]:
# Check the current Python version
# outputs: <version> (date & time) [C compiler]
import sys
print(sys.version)

### **Fully connected layer** (Linear Layer)

A fully connected (FC) layer, also called a dense layer, is a core component of neural networks in which every neuron connects to every neuron in the previous layer. These layers combine and transform the features learned by preceding layers, such as convolutional layers, to model global relationships and produce final predictions for tasks like classification or regression. Mathematically, an FC layer performs a linear (affine) transformation using weights and biases, usually followed by a nonlinear activation function. The functions below implement only the affine part, $out = xW + b$.
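As a quick shape check, the sketch below runs the affine forward and backward formulas on a toy minibatch. The sizes and values are assumptions chosen for illustration and are not taken from the course material; only NumPy is needed.

```python
import numpy

# Toy sizes chosen for illustration only (assumption, not from the course).
N, d_1, d_2, M = 2, 3, 4, 5
D = d_1 * d_2

x = numpy.arange(N * d_1 * d_2, dtype=numpy.float64).reshape(N, d_1, d_2)
w = numpy.ones((D, M))
b = numpy.arange(M, dtype=numpy.float64)

# Flatten every example to a D-vector, then apply the affine map.
x_flat = x.reshape(N, -1)             # (N, D)
out = x_flat.dot(w) + b               # b (M,) broadcasts across the N rows

# Backward pass for an upstream gradient of ones.
dout = numpy.ones((N, M))
dx = dout.dot(w.T).reshape(x.shape)   # (N, d_1, d_2), same shape as the input
dw = x_flat.T.dot(dout)               # (D, M)
db = dout.sum(axis=0)                 # (M,)

print(out.shape, dx.shape, dw.shape, db.shape)
```

Each gradient has the same shape as the quantity it belongs to, which is a cheap sanity check before attempting a numeric gradient check.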

In [None]:
import numpy
import typing

def affine_forward(x_input:numpy.ndarray, weight:numpy.ndarray, bias:numpy.ndarray) \
    -> typing.Tuple[numpy.ndarray, typing.Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray]]:

    """
    Computes the forward pass for an affine (fully connected) layer.

    The input x has shape (N, d_1, ..., d_k) and contains a minibatch of N
    examples, where each example x[i] has shape (d_1, ..., d_k). We will
    reshape each input into a vector of dimension D = d_1 * ... * d_k, and
    then transform it to an output vector of dimension M.

    Inputs:
    - x: A numpy array containing input data, of shape (N, d_1, ..., d_k)
    - w: A numpy array of weights, of shape (D, M)
    - b: A numpy array of biases, of shape (M,)

    Returns a tuple of:
    - out: output, of shape (N, M)
    - cache: (x, w, b)
    """

    out = x_input.reshape(x_input.shape[0], -1).dot(weight) + bias # bias (M,) is broadcast across the N rows of the (N, M) product
    cache = (x_input, weight, bias)

    return (out, cache)

def affine_backward(dout:numpy.ndarray, cache:typing.Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray]) \
    -> typing.Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray]:
    """
    Computes the backward pass for an affine (fully connected) layer.

    Inputs:
    - dout: Upstream derivative, of shape (N, M)
    - cache: Tuple of:
      - x: Input data, of shape (N, d_1, ... d_k)
      - w: Weights, of shape (D, M)
      - b: Biases, of shape (M,)

    Returns a tuple of:
    - dx: Gradient with respect to x, of shape (N, d1, ..., d_k)
    - dw: Gradient with respect to w, of shape (D, M)
    - db: Gradient with respect to b, of shape (M,)
    """

    x, w, b = cache
    dx:numpy.ndarray = dout.dot(w.T).reshape(x.shape)
    dw:numpy.ndarray = x.reshape(x.shape[0], -1).T.dot(dout)
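    db:numpy.ndarray = numpy.sum(dout, axis = 0)
    # Why db is a sum: out = x.dot(W) + ones.dot(b), where ones has shape (N, 1)
    # and b is treated as a (1, M) row, so db = ones.T.dot(dout),
    # i.e. dout summed over the batch axis.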
    return (dx, dw, db)

### **Vanilla RNN** - the basic one

A vanilla RNN is easy to construct, but it is prone to exploding and vanishing gradients when trained on long sequences.
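The vanishing-gradient effect can be seen in a small experiment. The sketch below is an illustration under stated assumptions, not part of the course code: it uses a hypothetical hidden-to-hidden matrix `Wh = 0.5 * I`, drops the input term to isolate the recurrence, and tracks the norm of a gradient as it is passed back through 20 timesteps.

```python
import numpy

# Hypothetical toy setup: hidden size 4, 20 timesteps, Wh = 0.5 * I.
H, T = 4, 20
Wh = 0.5 * numpy.eye(H)
h = numpy.full((1, H), 0.1)

# Forward: keep every hidden state (no input term, to isolate the recurrence).
states = []
for _ in range(T):
    h = numpy.tanh(h.dot(Wh))
    states.append(h)

# Backward: push a gradient of ones from the last step back to the first.
grad = numpy.ones((1, H))
norms = [numpy.linalg.norm(grad)]
for h_t in reversed(states):
    # tanh derivative first, then the hidden-to-hidden weights
    grad = (grad * (1 - h_t ** 2)).dot(Wh.T)
    norms.append(numpy.linalg.norm(grad))

print(norms[0], norms[-1])
```

Because the tanh derivative is at most 1 and this particular `Wh` halves every vector, the norm shrinks by at least a factor of two per step. A hidden-to-hidden matrix with large singular values can push the norm in the opposite direction, which is the exploding case.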

#### A single time-step
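For reference, a single step can be written out as follows, using the same names as the code ($h_{t-1}$ is `prev_h`, $h_t$ is `next_h`, and $\odot$ is element-wise multiplication). The symbol $a_t$ for the pre-activation is introduced here for convenience, and the gradients are derived from the tanh recurrence rather than quoted from the course notes.

$$
\begin{aligned}
a_t &= x_t W_x + h_{t-1} W_h + b, & h_t &= \tanh(a_t) \\
\frac{\partial L}{\partial a_t} &= \frac{\partial L}{\partial h_t} \odot \left(1 - h_t^2\right) \\
\frac{\partial L}{\partial x_t} &= \frac{\partial L}{\partial a_t} W_x^\top, & \frac{\partial L}{\partial h_{t-1}} &= \frac{\partial L}{\partial a_t} W_h^\top \\
\frac{\partial L}{\partial W_x} &= x_t^\top \frac{\partial L}{\partial a_t}, & \frac{\partial L}{\partial W_h} &= h_{t-1}^\top \frac{\partial L}{\partial a_t} \\
\frac{\partial L}{\partial b} &= \sum_{n=1}^{N} \left(\frac{\partial L}{\partial a_t}\right)_{n}
\end{aligned}
$$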

In [None]:
import numpy
import typing

def rnn_step_forward(x_input:numpy.ndarray, prev_h:numpy.ndarray, Wx:numpy.ndarray, Wh:numpy.ndarray, b:numpy.ndarray) \
    -> typing.Tuple[numpy.ndarray, typing.Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray]]:

    """
    Run the forward pass for a single timestep of a vanilla RNN using a tanh activation function.
    The input data has dimension D, the hidden state has dimension H,
    and the minibatch is of size N.
    Inputs:
    - x: Input data for this timestep, of shape (N, D)
    - prev_h: Hidden state from previous timestep, of shape (N, H)
    - Wx: Weight matrix for input-to-hidden connections, of shape (D, H)
    - Wh: Weight matrix for hidden-to-hidden connections, of shape (H, H)
    - b: Biases of shape (H,)
    Returns a tuple of:
    - next_h: Next hidden state, of shape (N, H)
    - cache: Tuple of values needed for the backward pass.
    """

    next_h:numpy.ndarray = numpy.tanh(x_input.dot(Wx) + prev_h.dot(Wh) + b)
    cache = (x_input, prev_h, Wx, Wh, next_h)

    return (next_h, cache)

def rnn_step_backward(dnext_h:numpy.ndarray, cache:typing.Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray]) \
    -> typing.Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray]:

    """
    Backward pass for a single timestep of a vanilla RNN.

    Inputs:
    - dnext_h: Gradient of loss with respect to next hidden state, of shape (N, H)
    - cache: Cache object from the forward pass

    Returns a tuple of:
    - dx: Gradients of input data, of shape (N, D)
    - dprev_h: Gradients of previous hidden state, of shape (N, H)
    - dWx: Gradients of input-to-hidden weights, of shape (D, H)
    - dWh: Gradients of hidden-to-hidden weights, of shape (H, H)
    - db: Gradients of bias vector, of shape (H,)
    """

    # unpack cache (types are inferred automatically)
    x_input, prev_h, Wx, Wh, next_h = cache
    # x_input (N, D)
    # prev_h (N, H)
    # Wx (D, H)
    # Wh (H, H)
    # next_h (N, H); the bias is not cached because its gradient does not need it

    # calculate derivatives
    # dnext_h (N, H)
    dnext_h = dnext_h * (1 - (next_h ** 2))  # backprop through tanh: tanh'(a) = 1 - tanh(a) ** 2, and next_h = tanh(a)
    dx = numpy.dot(a = dnext_h, b = Wx.T)
    dprev_h = numpy.dot(a = dnext_h, b = Wh.T)
    dWx = numpy.dot(a = x_input.T, b = dnext_h)
    dWh = numpy.dot(a = prev_h.T, b = dnext_h)
    db = numpy.sum(a = dnext_h, axis = 0)

    # return as tuple
    return (dx, dprev_h, dWx, dWh, db)

#### More time-steps

In [None]:
import numpy
import typing

def rnn_forward(x_input:numpy.ndarray, h0:numpy.ndarray, Wx:numpy.ndarray, Wh:numpy.ndarray, b:numpy.ndarray) \
    -> typing.Tuple[numpy.ndarray, typing.MutableSequence[typing.Tuple[numpy.ndarray, ...]]]:

    """
    Run a vanilla RNN forward on an entire sequence of data.

    We assume an input sequence composed of T vectors, each of dimension D. The RNN uses a hidden
    size of H, and we work over a minibatch containing N sequences. After running the RNN forward,
    we return the hidden states for all T timesteps.

    Inputs:
    - x: Input data for the entire timeseries, of shape (N, T, D)
    - h0: Initial hidden state, of shape (N, H)
    - Wx: Weight matrix for input-to-hidden connections, of shape (D, H)
    - Wh: Weight matrix for hidden-to-hidden connections, of shape (H, H)
    - b: Biases of shape (H,)

    Returns a tuple of:
    - h: Hidden states for the entire timeseries, of shape (N, T, H)
    - cache: Values needed in the backward pass
    """

    # retrieve shape info first
    N, T, D = x_input.shape
    _, H    = h0.shape

    prev_h = h0 # alias (not a copy) of h0, renamed for readability
    h = numpy.zeros(shape = (N, T, H)) # array to pack h (memories)
    cache = []

    # for every timestep
    for i in range(T):
        prev_h, cache_i = rnn_step_forward(x_input[:, i, :], prev_h, Wx, Wh, b)
        h[:,i,:] = prev_h # pack and save H (will not save h0)
        cache.append(cache_i)

    # return
    return (h, cache)

def rnn_backward(dh:numpy.ndarray, cache:typing.MutableSequence[typing.Tuple[numpy.ndarray, ...]]) \
    -> typing.Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray]:

    """
    Compute the backward pass for a vanilla RNN over an entire sequence of data.
    Inputs:
    - dh: Upstream gradients of all hidden states, of shape (N, T, H)

    NOTE: 'dh' contains the upstream gradients produced by the
    individual loss functions at each timestep, *not* the gradients
    being passed between timesteps (which you'll have to compute yourself
    by calling rnn_step_backward in a loop).

    Returns a tuple of:
    - dx: Gradient of inputs, of shape (N, T, D)
    - dh0: Gradient of initial hidden state, of shape (N, H)
    - dWx: Gradient of input-to-hidden weights, of shape (D, H)
    - dWh: Gradient of hidden-to-hidden weights, of shape (H, H)
    - db: Gradient of biases, of shape (H,)
    """

    # get shape info
    N, T, H = dh.shape
    x_input = cache[0][0]
    _, D = x_input.shape

    dx = numpy.zeros(shape = (N, T, D))
    dh0 = numpy.zeros(shape = (N, H))
    dWx = numpy.zeros(shape = (D, H))
    dWh = numpy.zeros(shape = (H, H))
    db = numpy.zeros(shape = (H,))

    # loop
    for i in reversed(range(T)):
        ddx, dh0, ddWx, ddWh, ddb = rnn_step_backward(dh[:, i, :] + dh0, cache[i])
        dx[:, i, :] += ddx
        dWx += ddWx
        dWh += ddWh
        db += ddb

    # return
    return (dx, dh0, dWx, dWh, db)

### **Word Embedding**

_A word embedding layer is a type of neural network layer that maps discrete word indices to dense, low-dimensional vectors of real numbers. These vectors, or "embeddings," capture the semantic meaning and relationships between words, enabling models to process and understand text by learning from large text corpora. Key applications include NLP tasks like text classification, machine translation, and sentiment analysis, where similar words are positioned close to each other in the embedding space._

**-> Basically, you convert your words into vectors. In other words, you place your words in a multi-dimensional space.**
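A tiny example makes the lookup and its gradient concrete. The vocabulary size, embedding values, and caption below are hypothetical and chosen only so that the numbers are easy to follow. The sketch also shows why the backward pass needs `numpy.add.at`: a word that occurs more than once must receive the sum of all its gradients.

```python
import numpy

# Hypothetical toy vocabulary of V = 4 words embedded in D = 2 dimensions.
W = numpy.array([[0.0, 0.0],
                 [1.0, 10.0],
                 [2.0, 20.0],
                 [3.0, 30.0]])

# One caption (N = 1) of length T = 3; word index 2 appears twice.
x = numpy.array([[2, 1, 2]])

# Forward: integer-array indexing gathers one row of W per word -> (N, T, D).
out = W[x]

# Backward: every occurrence of a word must add its gradient to that word's row.
dout = numpy.ones_like(out)

dW_buffered = numpy.zeros_like(W)
dW_buffered[x] += dout            # buffered: the repeated index 2 is counted once

dW = numpy.zeros_like(W)
numpy.add.at(dW, x, dout)         # unbuffered: the repeated index 2 is counted twice

print(dW_buffered[2], dW[2])
```

With plain fancy-index assignment the second occurrence of word 2 overwrites the first, so that row ends up with only half of the gradient it should receive.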

In [None]:
import numpy
import typing

def word_embedding_forward(x:numpy.ndarray, W:numpy.ndarray) \
    -> typing.Tuple[numpy.ndarray, typing.Tuple[numpy.ndarray, numpy.ndarray]]:

    """
    Forward pass for word embeddings.

    We operate on minibatches of size N where
    each sequence has length T. We assume a vocabulary of V words, assigning each
    word to a vector of dimension D.

    Inputs:
    - x: Integer array of shape (N, T) giving indices of words. Each element idx
      of x must be in the range 0 <= idx < V.
    - W: Weight matrix of shape (V, D) giving word vectors for all words.

    Returns a tuple of:
    - out: Array of shape (N, T, D) giving word vectors for all input words.
    - cache: Values needed for the backward pass
    """

    out = W[x]
    cache = (x, W)
    return (out, cache)

def word_embedding_backward(dout:numpy.ndarray, cache:typing.Tuple[numpy.ndarray, numpy.ndarray]) \
    -> numpy.ndarray:

    """
    Backward pass for word embeddings.

    We cannot back-propagate into the words
    since they are integers, so we only return gradient for the word embedding
    matrix.
    HINT: Look up the function numpy.add.at

    Inputs:
    - dout: Upstream gradients of shape (N, T, D)
    - cache: Values from the forward pass

    Returns:
    - dW: Gradient of word embedding matrix, of shape (V, D)
    """

    # unpack cache
    x, W = cache # shape of x: (N, T)
    # create zeroed return value
    dW = numpy.zeros_like(a = W) # shape (V, D)
    # put dout to correct places of word embedding matrix
    """
    numpy.add.at is a NumPy function that performs an "unbuffered" in-place addition
    operation on elements of an array a at specified indices.
    It is part of the numpy.ufunc.at family of functions,
    which provide a way to apply a universal function (ufunc) at specific locations within an array.
    """
    numpy.add.at(dW, x, dout) # dW[x] is in the shape of (N, T, D)
    # return
    return dW

### **Sigmoid** - squashing (activation) function

-> You map $\mathbb{R}$ to $(0, 1)$

-> Sigmoid function: $\sigma(z) = \frac{1}{1 + e^{-z}}$
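A naive `1 / (1 + exp(-z))` overflows in `exp` for large negative `z`. The sketch below is one possible stable formulation, equivalent in exact arithmetic to the piecewise definition used in this notebook; the name `stable_sigmoid` is hypothetical. It only ever exponentiates non-positive numbers.

```python
import numpy

def stable_sigmoid(x):
    """Logistic sigmoid that only ever exponentiates non-positive numbers."""
    x = numpy.asarray(x, dtype=numpy.float64)
    z = numpy.exp(-numpy.abs(x))      # between 0 and 1, so it cannot overflow
    # x >= 0: 1 / (1 + e^(-x));  x < 0: e^(x) / (1 + e^(x))
    return numpy.where(x >= 0, 1.0 / (1.0 + z), z / (1.0 + z))

values = numpy.array([-1000.0, -1.0, 0.0, 1.0, 1000.0])
s = stable_sigmoid(values)

# Two identities that follow from the definition:
# sigma(-z) = 1 - sigma(z), and sigma'(z) = sigma(z) * (1 - sigma(z)).
print(s)
print(s * (1 - s))
```

The second identity is the one the LSTM backward pass relies on when it multiplies a gate's gradient by `gate * (1 - gate)`.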

In [None]:
import numpy

def sigmoid(x:numpy.ndarray) -> numpy.ndarray:
    """A numerically stable version of the logistic sigmoid function."""
    # avoid overflow in exp(): only exponentiate non-positive values
    # x >= 0 : 1 / (1 + e ^ (-x))
    # x < 0 : e ^ (x) / (1 + e ^ (x))
    pos_mask = x >= 0
    neg_mask = x < 0
    z = numpy.zeros_like(a = x)
    z[pos_mask] = numpy.exp(-x[pos_mask])
    z[neg_mask] = numpy.exp(x[neg_mask])
    top = numpy.ones_like(x)
    top[neg_mask] = z[neg_mask]
    return top / (1 + z)

### **LSTM RNN** - Long Short-Term Memory RNN

LSTM (Long Short-Term Memory) is a type of recurrent neural network (RNN) designed to process sequential data by retaining important information over long spans and discarding irrelevant details, which mitigates the vanishing gradient problem common in standard RNNs. Its internal structure combines a memory cell with three gates (forget, input, and output) that selectively store and retrieve information, making it effective for tasks like speech recognition, language translation, and time-series prediction.

#### A single time-step
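Written as equations, one LSTM step looks as follows. The notation mirrors the code in this section: the pre-activation $a$ has shape $(N, 4H)$ and is split column-wise into four blocks of width $H$, in the order input, forget, output, candidate. The block names $a_i, a_f, a_o, a_g$ are introduced here for readability, and $\odot$ is element-wise multiplication.

$$
\begin{aligned}
a &= x_t W_x + h_{t-1} W_h + b = \left[\, a_i \;\; a_f \;\; a_o \;\; a_g \,\right] \\
i &= \sigma(a_i), \qquad f = \sigma(a_f), \qquad o = \sigma(a_o), \qquad g = \tanh(a_g) \\
c_t &= f \odot c_{t-1} + i \odot g \\
h_t &= o \odot \tanh(c_t)
\end{aligned}
$$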

In [None]:
import numpy
import typing

def lstm_step_forward(x_input:numpy.ndarray, prev_h:numpy.ndarray, prev_c:numpy.ndarray, Wx:numpy.ndarray, Wh:numpy.ndarray, b:numpy.ndarray) \
    -> typing.Tuple[numpy.ndarray, numpy.ndarray, typing.Tuple[numpy.ndarray, ...]]:

    """
    Forward pass for a single timestep of an LSTM.
    The input data has dimension D, the hidden state has dimension H, and we use
    a minibatch size of N.
    Note that a sigmoid() function has already been provided for you in this file.

    Inputs:
    - x: Input data, of shape (N, D)
    - prev_h: Previous hidden state, of shape (N, H)
    - prev_c: previous cell state, of shape (N, H)
    - Wx: Input-to-hidden weights, of shape (D, 4H)
    - Wh: Hidden-to-hidden weights, of shape (H, 4H)
    - b: Biases, of shape (4H,)

    Returns a tuple of:
    - next_h: Next hidden state, of shape (N, H)
    - next_c: Next cell state, of shape (N, H)
    - cache: Tuple of values needed for backward pass.
    """

    # retrieve shape of H
    H = prev_h.shape[1]

    # forward pass
    a:numpy.ndarray = x_input.dot(Wx) + prev_h.dot(Wh) + b

    # calculate gates and attributes
    i = sigmoid(a[ : , : H])                    # input intake ratio
    f = sigmoid(a[ : , H : 2 * H])              # forget ratio
    o = sigmoid(a[ : , 2 * H : 3 * H])          # output ratio of next_c
    g = numpy.tanh(a[ : , 3 * H : 4 * H])       # intermediate result (comprehending input)

    # calculate next long short term state
    next_c = f * prev_c + i * g
    next_h = o * numpy.tanh(next_c)

    # prepare cache
    cache = (x_input, prev_h, prev_c, next_c, Wx, Wh, i, f, g, o)

    # return
    return (next_h, next_c, cache)

def lstm_step_backward(dnext_h:numpy.ndarray, dnext_c:numpy.ndarray, cache:typing.Tuple[numpy.ndarray, ...]) \
    -> typing.Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray]:

    """
    Backward pass for a single timestep of an LSTM.

    Inputs:
    - dnext_h: Gradients of next hidden state, of shape (N, H)
    - dnext_c: Gradients of next cell state, of shape (N, H)
    - cache: Values from the forward pass

    Returns a tuple of:
    - dx: Gradient of input data, of shape (N, D)
    - dprev_h: Gradient of previous hidden state, of shape (N, H)
    - dprev_c: Gradient of previous cell state, of shape (N, H)
    - dWx: Gradient of input-to-hidden weights, of shape (D, 4H)
    - dWh: Gradient of hidden-to-hidden weights, of shape (H, 4H)
    - db: Gradient of biases, of shape (4H,)
    """

    # unpack cache
    x_input, prev_h, prev_c, next_c, Wx, Wh, i, f, g, o = cache

    # get shape
    N = prev_h.shape[0]
    H = prev_h.shape[1]

    # prepare da
    da = numpy.zeros((N, 4 * H))

    # forward pass recap:
    #   next_c = f * prev_c + i * g      (N, H)
    #   next_h = o * tanh(next_c)        (N, H)
    # tanh'(z) = 1 - tanh(z) ** 2 (equivalently sech(z) ** 2)
    # next_c receives gradient from two paths: directly (dnext_c) and through next_h.
    # Build a new array instead of using +=, so the caller's dnext_c is not modified.
    dnext_c = dnext_c + dnext_h * o * (1 - numpy.tanh(next_c) ** 2)
    dprev_c = dnext_c * f                                   # d(next_c) / d(prev_c) = f
    df = dnext_c * prev_c
    di = dnext_c * g
    dg = dnext_c * i
    do = dnext_h * numpy.tanh(next_c)

    # put separate parts into da
    # derivative of sigmoid = sigmoided * (1 - sigmoided)
    da[ : , : H] = di * i * ( 1 - i )
    da[ : , H : 2 * H] = df * f * ( 1 - f )
    da[ : , 2 * H : 3 * H] = do * o * ( 1 - o )
    da[ : , 3 * H : 4 * H] = dg * ( 1 - g ** 2)             # this one comes from tanh -> derivative = sech ^ 2

    # calculate the rest
    db = numpy.sum(da, axis = 0)                            # since everywhere same bias
    dx = numpy.dot(da, Wx.T)
    dWx = numpy.dot(x_input.T, da)
    dprev_h = numpy.dot(da, Wh.T)
    dWh = numpy.dot(prev_h.T, da)

    # return
    return (dx, dprev_h, dprev_c, dWx, dWh, db)

#### More time-steps

In [None]:
import numpy
import typing

def lstm_forward(x_input:numpy.ndarray, h0:numpy.ndarray, Wx:numpy.ndarray, Wh:numpy.ndarray, b:numpy.ndarray) \
    -> typing.Tuple[numpy.ndarray, typing.List[typing.Tuple[numpy.ndarray, ... ]]]:

    """
    Forward pass for an LSTM over an entire sequence of data.

    We assume an input sequence composed of T vectors, each of dimension D. The LSTM uses a hidden
    size of H, and we work over a minibatch containing N sequences. After running the LSTM forward,
    we return the hidden states for all timesteps.
    Note that the initial hidden state is passed as input, but the initial cell state is set to zero.
    Also note that the cell state is not returned; it is an internal variable to the LSTM and is not
    accessed from outside.

    Inputs:
    - x: Input data of shape (N, T, D)
    - h0: Initial hidden state of shape (N, H)
    - Wx: Weights for input-to-hidden connections, of shape (D, 4H)
    - Wh: Weights for hidden-to-hidden connections, of shape (H, 4H)
    - b: Biases of shape (4H,)

    Returns a tuple of:
    - h: Hidden states for all timesteps of all sequences, of shape (N, T, H)
    - cache: Values needed for the backward pass.
    """

    # get shape
    N, T, D = x_input.shape
    N, H = h0.shape

    h = numpy.zeros((N, T, H))
    cache = []

    # initially, h = h0, c = 0
    h_i = h0
    c_i = numpy.zeros((N, H))

    # for every time step
    for i in range(T):
        h_i, c_i, cache_i = lstm_step_forward(x_input[:, i, :], h_i, c_i, Wx, Wh, b)
        h[:, i, :] = h_i
        cache.append(cache_i)

    return h, cache

def lstm_backward(dh:numpy.ndarray, cache:typing.List[typing.Tuple[numpy.ndarray, ... ]]) \
    -> typing.Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray]:

    """
    Backward pass for an LSTM over an entire sequence of data.

    Inputs:
    - dh: Upstream gradients of hidden states, of shape (N, T, H)
    - cache: Values from the forward pass

    Returns a tuple of:
    - dx: Gradient of input data of shape (N, T, D)
    - dh0: Gradient of initial hidden state of shape (N, H)
    - dWx: Gradient of input-to-hidden weight matrix of shape (D, 4H)
    - dWh: Gradient of hidden-to-hidden weight matrix of shape (H, 4H)
    - db: Gradient of biases, of shape (4H,)
    """

    # get shape
    N, T, H = dh.shape
    D = cache[0][0].shape[1]

    dx = numpy.zeros(shape = (N, T, D))
    dWx = numpy.zeros(shape = (D, 4 * H))
    dWh = numpy.zeros(shape = (H, 4 * H))
    db = numpy.zeros(shape = 4 * H)
    dh_i = numpy.zeros(shape = (N, H))
    dc_i = numpy.zeros(shape = (N, H))

    # in reversed order
    for i in range(T - 1, -1, -1):
        dh_i += dh[:, i, :]
        dx_i, dh_i, dc_i, dWx_i, dWh_i, db_i = lstm_step_backward(dh_i, dc_i, cache[i])
        dx[:, i, :] = dx_i
        dWx += dWx_i
        dWh += dWh_i
        db += db_i

    dh0 = dh_i

    return dx, dh0, dWx, dWh, db

### **Fully connected layer** - with time-steps in mind
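The temporal version applies the same affine map independently at every timestep. One way to vectorise this is to fold the time axis into the batch axis, apply a single matrix product, and unfold again. The sketch below uses random toy data with a fixed seed (all sizes are assumptions) and compares that approach with an explicit per-timestep loop.

```python
import numpy

rng = numpy.random.default_rng(0)   # fixed seed, toy sizes (assumptions)
N, T, D, M = 2, 3, 4, 5
x = rng.standard_normal((N, T, D))
w = rng.standard_normal((D, M))
b = rng.standard_normal(M)

# Vectorised: fold time into the batch axis, apply one affine map, unfold.
out_fast = x.reshape(N * T, D).dot(w).reshape(N, T, M) + b

# Reference: apply the same affine map separately at every timestep.
out_loop = numpy.zeros((N, T, M))
for t in range(T):
    out_loop[:, t, :] = x[:, t, :].dot(w) + b

print(numpy.allclose(out_fast, out_loop))
```

Since the weights are shared across timesteps, the backward pass sums the bias gradient over both the batch and the time axes.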

In [None]:
import numpy
import typing

def temporal_affine_forward(x_input:numpy.ndarray, w:numpy.ndarray, b:numpy.ndarray) \
    -> typing.Tuple[numpy.ndarray, typing.Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray]]:

    """
    Forward pass for a temporal affine layer.

    The input is a set of D-dimensional vectors arranged into a minibatch of N
    timeseries, each of length T. We use an affine function to transform each
    of those vectors into a new vector of dimension M.

    Inputs:
    - x: Input data of shape (N, T, D)
    - w: Weights of shape (D, M)
    - b: Biases of shape (M,)

    Returns a tuple of:
    - out: Output data of shape (N, T, M)
    - cache: Values needed for the backward pass
    """

    # retrieve shape info
    N, T, D = x_input.shape
    M = b.shape[0]

    # performs transformation
    out:numpy.ndarray = x_input.reshape((N * T, D)).dot(b = w).reshape((N, T, M)) + b
    cache:typing.Tuple = (x_input, w, b, out)

    # return
    return (out, cache)

def temporal_affine_backward(dout:numpy.ndarray, cache:typing.Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray]) \
    -> typing.Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray]:

    """
    Backward pass for temporal affine layer.

    Input:
    - dout: Upstream gradients of shape (N, T, M)
    - cache: Values from forward pass

    Returns a tuple of:
    - dx: Gradient of input, of shape (N, T, D)
    - dw: Gradient of weights, of shape (D, M)
    - db: Gradient of biases, of shape (M,)
    """

    # unpack data
    x_input, w, b, out = cache

    # obtain shape info
    N, T, D = x_input.shape
    M = b.shape[0]

    # calculate derivatives
    dx:numpy.ndarray = dout.reshape((N * T, M)).dot(b = w.T).reshape((N, T, D))
    dw:numpy.ndarray = (x_input.reshape((N * T, D)).T).dot(dout.reshape((N * T, M)))
    db:numpy.ndarray = dout.sum(axis = (0, 1))

    # return
    return (dx, dw, db)

### **Softmax Loss function** - with time-steps in mind

The Softmax Loss function, more formally known as Categorical Cross-Entropy Loss, measures the difference between a model's predicted probability distribution and the true probability distribution for a multi-class classification problem. It is the standard loss function paired with the Softmax activation function, as the softmax function outputs probabilities that the loss function then compares to the actual labels. The goal is to minimize this loss, guiding the model to assign high probabilities to the correct classes and low probabilities to incorrect classes.
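A small hand-checkable case helps with the masking and averaging conventions. In the sketch below (toy sizes and targets are assumptions), all scores are zero, so softmax is uniform and every unmasked position contributes exactly $\log V$ to the loss. The total is summed over timesteps and divided by the minibatch size only.

```python
import numpy

# Toy sizes (assumptions): N = 2 sequences, T = 3 timesteps, V = 4 classes.
N, T, V = 2, 3, 4
scores = numpy.zeros((N, T, V))              # all-zero scores -> uniform softmax
targets = numpy.array([[1, 2, 0], [3, 0, 0]])
mask = numpy.array([[True, True, True],
                    [True, False, False]])   # last two steps of row 1 are padding

flat = scores.reshape(N * T, V)
probs = numpy.exp(flat - flat.max(axis=1, keepdims=True))   # shift for stability
probs /= probs.sum(axis=1, keepdims=True)

# Negative log-probability of the target at every (sequence, timestep) pair.
nll = -numpy.log(probs[numpy.arange(N * T), targets.reshape(-1)])

# Sum over unmasked positions, then average over the minibatch only.
loss = (nll * mask.reshape(-1)).sum() / N

print(loss, 4 * numpy.log(V) / N)
```

Four positions are unmasked, so the two printed numbers should agree: four times $\log 4$, divided by $N = 2$.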

In [None]:
import numpy
import typing

def temporal_softmax_loss(x:numpy.ndarray, y:numpy.ndarray, mask:numpy.ndarray, verbose:bool = False) \
    -> typing.Tuple[numpy.ndarray, numpy.ndarray]:

    """
    A temporal version of softmax loss for use in RNNs.

    We assume that we are making predictions over a vocabulary of size V for each timestep of a
    timeseries of length T, over a minibatch of size N. The input x gives scores for all vocabulary
    elements at all timesteps, and y gives the indices of the ground-truth element at each timestep.
    We use a cross-entropy loss at each timestep, summing the loss over all timesteps and averaging
    across the minibatch.
    As an additional complication, we may want to ignore the model output at some timesteps, since
    sequences of different length may have been combined into a minibatch and padded with NULL
    tokens. The optional mask argument tells us which elements should contribute to the loss.

    Inputs:
    - x: Input scores, of shape (N, T, V)
    - y: Ground-truth indices, of shape (N, T) where each element is in the range
    0 <= y[i, t] < V
    - mask: Boolean array of shape (N, T) where mask[i, t] tells whether or not
    the scores at x[i, t] should contribute to the loss.

    Returns a tuple of:
    - loss: Scalar giving loss
    - dx: Gradient of loss with respect to scores x.
    """

    # get shape info and flat N * T
    N, T, V = x.shape
    x_flat = x.reshape((N * T, V))
    y_flat = y.reshape((N * T))
    mask_flat = mask.reshape((N * T))

    # calculate loss
    probs = numpy.exp(x_flat - numpy.max(x_flat, axis = 1, keepdims = True))    # for every [V], utilizes broadcasting
    probs /= numpy.sum(probs, axis = 1, keepdims = True)
    loss:numpy.ndarray = -numpy.sum(mask_flat * numpy.log(probs[numpy.arange(N * T), y_flat])) / N  # only sums unmasked region

    # calculate derivative
    dx_flat = probs.copy()
    dx_flat[numpy.arange(N * T), y_flat] -= 1
    dx_flat /= N                            # dx_flat shape (N * T, V)
    dx_flat *= mask_flat[ : , None ]        # None adds an axis, (N * T,) -> (N * T, 1), making broadcasting compatible

    if verbose:
        print("dx_flat: ", dx_flat.shape)

    # reshape back
    dx:numpy.ndarray = dx_flat.reshape((N, T, V))

    # return
    return (loss, dx)


### **Classifier Class** - A class that has it all

-> Example from CS231n: an image-to-text (img2txt) captioning RNN
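During training the caption array is split into an input half and a target half that are offset by one token, and padded targets are masked out of the loss. The sketch below illustrates the split on two short captions. The token indices for `<NULL>`, `<START>`, and `<END>` are hypothetical; in the class they come from `word_to_idx`.

```python
import numpy

# Hypothetical vocabulary indices, for illustration only.
NULL, START, END = 0, 1, 2

# Two captions padded to length T + 1 = 5.
captions = numpy.array([[START, 5, 6, 7, END],
                        [START, 8, END, NULL, NULL]])

captions_in = captions[:, :-1]     # fed to the RNN: everything but the last token
captions_out = captions[:, 1:]     # prediction targets: everything but the first
mask = captions_out != NULL        # padded targets do not contribute to the loss

print(captions_in)
print(captions_out)
print(mask)
```

At timestep `t` the network sees `captions_in[:, t]` and is trained to predict `captions_out[:, t]`, which is the next token of the same caption.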

In [None]:
import numpy
import typing

class CaptioningRNN:
    """
    A CaptioningRNN produces captions from image features using a recurrent
    neural network.

    The RNN receives input vectors of size D, has a vocab size of V, works on
    sequences of length T, has an RNN hidden dimension of H, uses word vectors
    of dimension W, and operates on minibatches of size N.

    Note that we don't use any regularization for the CaptioningRNN.
    """	

    def __init__(
            self,
            word_to_idx:typing.Dict[str, int],
            input_dim:int = 512,
            wordvec_dim:int = 128,
            hidden_dim:int = 128,
            cell_type:str = "rnn",
            dtype:numpy.dtype = numpy.float32
        ):
        """
        Construct a new CaptioningRNN instance.

        Inputs:
        - word_to_idx: A dictionary giving the vocabulary. It contains V entries,
          and maps each string to a unique integer in the range [0, V).
        - input_dim: Dimension D of input image feature vectors.
        - wordvec_dim: Dimension W of word vectors.
        - hidden_dim: Dimension H for the hidden state of the RNN.
        - cell_type: What type of RNN to use; either 'rnn' or 'lstm'.
        - dtype: numpy datatype to use; use float32 for training and float64 for
          numeric gradient checking.
        """
        # check RNN type
        if cell_type not in {"rnn", "lstm"}:
            raise ValueError('Invalid cell_type "%s"' % cell_type)

        # assign member variables
        self.cell_type:str                          = cell_type
        self.dtype:numpy.dtype                      = dtype
        self.word_to_idx:typing.Dict[str, int]      = word_to_idx
        self.idx_to_word:typing.Dict[int, str]      = {i: w for w, i in word_to_idx.items()}
        self.params:typing.Dict[str, numpy.ndarray] = {}	
        # protected member variables
        self._null:int                              = word_to_idx["<NULL>"]
        self._start:typing.Optional[int]            = word_to_idx.get("<START>", None)  # None if the token is absent
        self._end:typing.Optional[int]              = word_to_idx.get("<END>", None)    # None if the token is absent
        # attribute
        vocab_size:int                              = len(word_to_idx)	
        # Initialize word vectors
        # numpy.random.randn(): The distribution has a mean of 0 and a variance of 1.
        self.params["W_embed"] = numpy.random.randn(vocab_size, wordvec_dim)
        self.params["W_embed"] /= 100	
        # Initialize CNN -> hidden state projection parameters
        # using fully connected layers
        self.params["W_proj"] = numpy.random.randn(input_dim, hidden_dim)
        self.params["W_proj"] /= numpy.sqrt(input_dim)
        self.params["b_proj"] = numpy.zeros(hidden_dim)	
        # Initialize parameters for the RNN
        dim_mul:int = {"lstm": 4, "rnn": 1}[cell_type]
        self.params["Wx"] = numpy.random.randn(wordvec_dim, dim_mul * hidden_dim)
        self.params["Wx"] /= numpy.sqrt(wordvec_dim)
        self.params["Wh"] = numpy.random.randn(hidden_dim, dim_mul * hidden_dim)
        self.params["Wh"] /= numpy.sqrt(hidden_dim)
        self.params["b"] = numpy.zeros(dim_mul * hidden_dim)	
        # Initialize output to vocab weights
        self.params["W_vocab"] = numpy.random.randn(hidden_dim, vocab_size)
        self.params["W_vocab"] /= numpy.sqrt(hidden_dim)
        self.params["b_vocab"] = numpy.zeros(vocab_size)

        # Cast parameters to correct dtype
        for k, v in self.params.items():
            self.params[k] = v.astype(self.dtype)

    def loss(self, features:numpy.ndarray, captions:numpy.ndarray):
        """
        Compute training-time loss for the RNN. We input image features and
        ground-truth captions for those images, and use an RNN (or LSTM) to compute
        loss and gradients on all parameters.

        Inputs:
        - features: Input image features, of shape (N, D)
        - captions: Ground-truth captions; an integer array of shape (N, T + 1) where
                    each element is in the range 0 <= y[i, t] < V

        Returns a tuple of:
        - loss: Scalar loss
        - grads: Dictionary of gradients parallel to self.params
        """	
        # Cut captions into two pieces: captions_in has everything but the last word
        # and will be input to the RNN; captions_out has everything but the first
        # word and this is what we will expect the RNN to generate. These are offset
        # by one relative to each other because the RNN should produce word (t+1)
        # after receiving word t. The first element of captions_in will be the START
        # token, and the first element of captions_out will be the first word.
        captions_in = captions[ : , : -1 ]
        captions_out = captions[ : , 1 : ]
        # You'll need this
        mask = (captions_out != self._null)
        # Weight and bias for the affine transform from image features to initial
        # hidden state
        W_proj, b_proj = self.params["W_proj"], self.params["b_proj"]
        # Word embedding matrix
        W_embed = self.params["W_embed"]
        # Input-to-hidden, hidden-to-hidden, and biases for the RNN
        Wx, Wh, b = self.params["Wx"], self.params["Wh"], self.params["b"]
        # Weight and bias for the hidden-to-vocab transformation.
        W_vocab, b_vocab = self.params["W_vocab"], self.params["b_vocab"]
        loss, grads = 0.0, {}
        ############################################################################
        # TODO: Implement the forward and backward passes for the CaptioningRNN.   #
        # In the forward pass you will need to do the following:                   #
        # (1) Use an affine transformation to compute the initial hidden state     #
        #     from the image features. This should produce an array of shape (N, H)#
        # (2) Use a word embedding layer to transform the words in captions_in     #
        #     from indices to vectors, giving an array of shape (N, T, W).         #
        # (3) Use either a vanilla RNN or LSTM (depending on self.cell_type) to    #
        #     process the sequence of input word vectors and produce hidden state  #
        #     vectors for all timesteps, producing an array of shape (N, T, H).    #
        # (4) Use a (temporal) affine transformation to compute scores over the    #
        #     vocabulary at every timestep using the hidden states, giving an      #
        #     array of shape (N, T, V).                                            #
        # (5) Use (temporal) softmax to compute loss using captions_out, ignoring  #
        #     the points where the output word is <NULL> using the mask above.     #
        #                                                                          #
        #                                                                          #
        # Do not worry about regularizing the weights or their gradients!          #
        #                                                                          #
        # In the backward pass you will need to compute the gradient of the loss   #
        # with respect to all model parameters. Use the loss and grads variables   #
        # defined above to store loss and gradients; grads[k] should give the      #
        # gradients for self.params[k].                                            #
        #                                                                          #
        # Note also that you are allowed to make use of functions from layers.py   #
        # in your implementation, if needed.                                       #
        ############################################################################
        # *****START OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****
        # forward pass
        initial_state, cache_affine = affine_forward(features, W_proj, b_proj)
        word_embed, cache_embed = word_embedding_forward(captions_in, W_embed)
        if self.cell_type == "rnn":
            h, cache_forward = rnn_forward(word_embed, initial_state, Wx, Wh, b)
        elif self.cell_type == "lstm":
            h, cache_forward = lstm_forward(word_embed, initial_state, Wx, Wh, b)
        out_vocab, cache_vocab = temporal_affine_forward(h, W_vocab, b_vocab)
        loss, dout = temporal_softmax_loss(out_vocab, captions_out, mask)
        # backward pass
        dh, dW_vocab, db_vocab = temporal_affine_backward(dout, cache_vocab)
        if self.cell_type == "rnn":
            dword_embed, dinitial_state, dWx, dWh, db = rnn_backward(dh, cache_forward)
        elif self.cell_type == "lstm":
            dword_embed, dinitial_state, dWx, dWh, db = lstm_backward(dh, cache_forward)
        dW_embed = word_embedding_backward(dword_embed, cache_embed)
        dfeatures, dW_proj, db_proj = affine_backward(dinitial_state, cache_affine)
        grads['W_proj'] = dW_proj
        grads['b_proj'] = db_proj
        grads['W_embed'] = dW_embed
        grads['Wx'] = dWx
        grads['Wh'] = dWh
        grads['b'] = db
        grads['W_vocab'] = dW_vocab
        grads['b_vocab'] = db_vocab
        # *****END OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****
        ############################################################################
        #                             END OF YOUR CODE                             #
        ############################################################################
        return loss, grads

    def sample(self, features:numpy.ndarray, max_length:int = 30):
        """
        Run a test-time forward pass for the model, sampling captions for input
        feature vectors.
        At each timestep, we embed the current word, pass it and the previous hidden
        state to the RNN to get the next hidden state, use the hidden state to get
        scores for all vocab words, and choose the word with the highest score as
        the next word. The initial hidden state is computed by applying an affine
        transform to the input image features, and the initial word is the <START>
        token.
        For LSTMs you will also have to keep track of the cell state; in that case
        the initial cell state should be zero.

        Inputs:
        - features: Array of input image features of shape (N, D).
        - max_length: Maximum length T of generated captions.

        Returns:
        - captions: Array of shape (N, max_length) giving sampled captions,
                    where each element is an integer in the range [0, V). The first element
                    of captions should be the first sampled word, not the <START> token.
        """

        N:int = features.shape[0]
        captions = self._null * numpy.ones((N, max_length), dtype = numpy.int32)
        # Unpack parameters
        W_proj, b_proj = self.params["W_proj"], self.params["b_proj"]
        W_embed = self.params["W_embed"]
        Wx, Wh, b = self.params["Wx"], self.params["Wh"], self.params["b"]
        W_vocab, b_vocab = self.params["W_vocab"], self.params["b_vocab"]
        ###########################################################################
        # TODO: Implement test-time sampling for the model. You will need to      #
        # initialize the hidden state of the RNN by applying the learned affine   #
        # transform to the input image features. The first word that you feed to  #
        # the RNN should be the <START> token; its value is stored in the         #
        # variable self._start. At each timestep you will need to:                #
        # (1) Embed the previous word using the learned word embeddings           #
        # (2) Make an RNN step using the previous hidden state and the embedded   #
        #     current word to get the next hidden state.                          #
        # (3) Apply the learned affine transformation to the next hidden state to #
        #     get scores for all words in the vocabulary                          #
        # (4) Select the word with the highest score as the next word, writing it #
        #     (the word index) to the appropriate slot in the captions variable   #
        #                                                                         #
        # For simplicity, you do not need to stop generating after an <END> token #
        # is sampled, but you can if you want to.                                 #
        #                                                                         #
        # HINT: You will not be able to use the rnn_forward or lstm_forward       #
        # functions; you'll need to call rnn_step_forward or lstm_step_forward in #
        # a loop.                                                                 #
        #                                                                         #
        # NOTE: we are still working over minibatches in this function. Also if   #
        # you are using an LSTM, initialize the first cell state to zeros.        #
        ###########################################################################
        # *****START OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****
        state, _ = affine_forward(features, W_proj, b_proj)
        # int32 matches the dtype of captions and avoids overflow for token indices above 127
        word = self._start * numpy.ones(N, dtype = numpy.int32)
        if self.cell_type == "lstm":
            c = numpy.zeros_like(state)
        for i in range(max_length):
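            # (1) embed the previous word: (N,) indices -> (N, W) vectors
            word_embed, _ = word_embedding_forward(word, W_embed)
            # (2) advance the recurrent cell by one timestep
            if self.cell_type == "rnn":
                state, _ = rnn_step_forward(word_embed, state, Wx, Wh, b)
            elif self.cell_type == "lstm":
                state, c, _ = lstm_step_forward(word_embed, state, c, Wx, Wh, b)
            # (3) score every vocabulary word, (4) keep the highest-scoring index
            word_scores, _ = affine_forward(state, W_vocab, b_vocab)
            word = word_scores.argmax(axis = 1)
            captions[:, i] = word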
        # *****END OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****
        ############################################################################
        #                             END OF YOUR CODE                             #
        ############################################################################
        return captions
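
The `sample` method above decodes greedily and, as its TODO comment notes, does not have to stop after an `<END>` token. As a minimal sketch of that optional behaviour, the standalone function below runs the same four steps with plain NumPy and pads each caption with `<NULL>` once `<END>` has been emitted. It is not part of `CaptioningRNN`: `greedy_decode`, the `finished` mask, and the toy parameter dictionary are hypothetical names introduced for illustration, and the inline `tanh` update assumes the standard vanilla RNN step instead of calling the notebook's `rnn_step_forward`.

```python
import numpy as np

def greedy_decode(features, params, start_idx, end_idx, null_idx, max_length=30):
    """Greedy decoding with a vanilla RNN (illustrative sketch, not the class method).

    Rows that have already produced <END> keep <NULL> in all later slots.
    """
    N = features.shape[0]
    captions = np.full((N, max_length), null_idx, dtype=np.int32)
    # Initial hidden state from the image features: (N, D) @ (D, H) + (H,)
    h = features @ params["W_proj"] + params["b_proj"]
    word = np.full(N, start_idx, dtype=np.int32)
    finished = np.zeros(N, dtype=bool)  # True once a row has emitted <END>
    for t in range(max_length):
        x = params["W_embed"][word]                                     # (N, W)
        h = np.tanh(x @ params["Wx"] + h @ params["Wh"] + params["b"])  # (N, H)
        scores = h @ params["W_vocab"] + params["b_vocab"]              # (N, V)
        word = scores.argmax(axis=1).astype(np.int32)
        # Record the word only for captions that are still being generated
        captions[~finished, t] = word[~finished]
        finished |= (word == end_idx)
        if finished.all():
            break
    return captions

# Toy usage with random weights (all sizes are arbitrary assumptions)
rng = np.random.default_rng(0)
N, D, H, W, V = 2, 5, 4, 3, 6
toy_params = {
    "W_proj": rng.standard_normal((D, H)), "b_proj": np.zeros(H),
    "W_embed": rng.standard_normal((V, W)),
    "Wx": rng.standard_normal((W, H)), "Wh": rng.standard_normal((H, H)),
    "b": np.zeros(H),
    "W_vocab": rng.standard_normal((H, V)), "b_vocab": np.zeros(V),
}
toy_captions = greedy_decode(rng.standard_normal((N, D)), toy_params,
                             start_idx=1, end_idx=2, null_idx=0, max_length=8)
print(toy_captions.shape)  # (2, 8)
```

Tracking a per-row `finished` mask keeps the minibatch loop vectorised: finished rows are still pushed through the RNN step, but their outputs are never written, so the returned array has the same `(N, max_length)` shape as the one produced by `sample`.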