[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/NNDesignDeepLearning/NNDesignDeepLearning/blob/master/05.PythonChapter/Code/LabSolutions/PythonLab1_Solution.ipynb)

# Nonlinear Sequence Processing Lab 1 Objective

The objective of this lab is to help you experiment with training recurrent neural networks. You will begin with only Python and Numpy, in order to see the details of the operation, and then move on to PyTorch. The experiments will involve:

- Implementing a recurrent neural network using Python and Numpy.
- Implementing a backpropagation training algorithm in Python and Numpy.
- Testing your algorithms on a dynamic test problem.
- Repeating the steps with PyTorch implementations.

Some of the cells in this notebook are prefilled with working code. In addition, there will be cells with missing code (labeled `# TODO`), which you will need to complete. If you need additional cells, you can use the `Insert` menu at the top of the page.

## Loading Modules

We begin by loading some useful modules.

In [None]:
%matplotlib inline 
import matplotlib.pyplot as plt
import numpy as np

# RNN in Python/Numpy


Next we implement a recurrent neural network in Python/Numpy. We will first implement some components that we will need later. The RNN has the following structure.

![](RNN2.jpg)


## Test Problem - Counting Ones in a Window

We first define the problem that we are going to train the network to solve. The input to the network will be a binary sequence of ones and zeros, and at each time step the network must decide whether the number of ones in a sliding window of the most recent inputs is even or odd. If the number of ones in the window is even (including zero), the network should output a 1. If the number of ones in the window is odd, the network should output a 0. Near the start of a sequence, the window contains only the inputs seen so far. The following cell implements a function to generate a data set for training.

In [None]:
def generate_counting_ones_data(num_samples, seq_length, window_size):
    """
    Generate input and target data for the "Counting Ones in a Window" task.

    Args:
        num_samples (int): Number of training examples.
        seq_length (int): Length of each binary sequence.
        window_size (int): Number of past steps to consider for counting.

    Returns:
        X (np.ndarray): Input sequences of shape (num_samples, seq_length, 1).
        y (np.ndarray): Target sequences of shape (num_samples, seq_length, 1).
    """
    # Generate random binary sequences (0s and 1s) as integers, then convert to float32
    X = np.random.randint(0, 2, size=(num_samples, seq_length)).astype(np.float32)

    # Initialize targets
    y = np.zeros((num_samples, seq_length), dtype=np.float32)

    for i in range(num_samples):
        for t in range(seq_length):
            # Look back 'window_size' steps (or fewer if at the start)
            start = max(0, t - window_size + 1)
            window = X[i, start:t + 1]

            # Count number of 1s in the window
            count = np.sum(window)

            # Target is 1 if count is even, else 0
            y[i, t] = 1 if count % 2 == 0 else 0

    return X, y
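
As a quick hand check of the target rule, the sketch below applies the same windowed parity test to one short sequence. The helper name `parity_targets` is hypothetical and only mirrors the rule described above; it is not part of the lab code.

```python
import numpy as np

def parity_targets(seq, window_size):
    """Hypothetical helper: 1 if the trailing window holds an even number of ones, else 0."""
    seq = np.asarray(seq)
    targets = np.zeros(len(seq), dtype=np.float32)
    for t in range(len(seq)):
        start = max(0, t - window_size + 1)   # the window is shorter near the start
        count = int(seq[start:t + 1].sum())
        targets[t] = 1.0 if count % 2 == 0 else 0.0
    return targets

example_targets = parity_targets([1, 0, 1, 1, 0, 0], window_size=3)
print(example_targets)
```

Working through it by hand with a window of 3, the windows are `[1]`, `[1,0]`, `[1,0,1]`, `[0,1,1]`, `[1,1,0]`, `[1,0,0]`, with counts 1, 1, 2, 2, 2, 1, so the targets are 0, 0, 1, 1, 1, 0.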

## Sigmoid Activation Function
Now we build up the parts of the network. First, we create the log sigmoid activation function, which we use in the final layer of the network. The output represents the probability that there were an even number of ones in the previous window. For numerical stability, we handle positive values and negative values differently. Also, we clip the net input, if it gets larger than 500, so as not to overflow the exponential function.

In [None]:
def sigmoid(n):
    """
    Numerically stable sigmoid function.

    For numerical stability, we handle large positive and negative values differently:
    - For n >= 0: sigmoid(n) = 1 / (1 + exp(-n))
    - For n < 0: sigmoid(n) = exp(n) / (1 + exp(n))

    This prevents overflow in the exponential function.
    """
    # TODO: Implement a numerically stable sigmoid function
    # 1. Clip n to prevent overflow
    # 2. Handle positive and negative values differently for numerical stability
    # 3. Return the result
    
    return None
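
The two-branch idea in the docstring can be sketched as follows. This is one possible approach under stated assumptions (array input, symmetric clipping at ±500, the name `stable_sigmoid_sketch`), not necessarily the intended solution; try your own version first.

```python
import numpy as np

def stable_sigmoid_sketch(n):
    """Illustrative numerically stable sigmoid for array inputs."""
    n = np.clip(np.asarray(n, dtype=np.float64), -500, 500)
    out = np.empty_like(n)
    pos = n >= 0
    # For n >= 0, exp(-n) <= 1, so the denominator cannot overflow
    out[pos] = 1.0 / (1.0 + np.exp(-n[pos]))
    # For n < 0, exp(n) < 1, so neither numerator nor denominator overflows
    exp_n = np.exp(n[~pos])
    out[~pos] = exp_n / (1.0 + exp_n)
    return out

vals = stable_sigmoid_sketch(np.array([-1000.0, 0.0, 1000.0]))
print(vals)
```

The extreme inputs produce finite values very close to 0 and 1, without overflow warnings from `np.exp`.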

## Binary Cross-Entropy Loss

Next, we create the loss function. We will be using Binary Cross-Entropy as our loss function. This is implemented in the following cell. The equation for BCE loss is:
$$\mathcal{L} = -\frac{1}{Q} \sum_{i=1}^Q \left[ t(i) \log(a^{2}(i)) + (1 - t(i)) \log(1 - a^{2}(i)) \right]$$

where $t(i)$ is the true label and $a^{2}(i)$ is the predicted probability. (With the sigmoid activation function, $a^{2}(i)$ will be a value between 0 and 1.)

In [None]:
def binary_crossentropy_loss(a, t):
    """
    Compute binary cross-entropy loss.

    Formula: L = -[t * log(a) + (1 - t) * log(1 - a)], averaged over the batch

    Args:
        a: Predicted probabilities (output of sigmoid), shape (batch_size, 1) or (batch_size,)
        t: True binary labels (0 or 1), shape (batch_size, 1) or (batch_size,)

    Returns:
        loss: Scalar loss value
    """
    # TODO: Implement binary cross-entropy loss
    # 1. Convert inputs to numpy arrays
    # 2. Flatten inputs if needed
    # 3. Add epsilon for numerical stability to prevent log(0)
    # 4. Compute the binary cross-entropy loss
    # 5. Return the mean loss
    
    return None
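
To get a feel for the size of the loss, here is a minimal sketch of the mean BCE formula applied to a few hand-picked predictions. The function name `bce_sketch` and the clipping constant `eps=1e-12` are assumptions made for illustration.

```python
import numpy as np

def bce_sketch(a, t, eps=1e-12):
    """Illustrative mean binary cross-entropy; eps is an assumed clipping constant."""
    a = np.asarray(a, dtype=np.float64).ravel()
    t = np.asarray(t, dtype=np.float64).ravel()
    a = np.clip(a, eps, 1.0 - eps)          # keep log() away from 0
    return float(-np.mean(t * np.log(a) + (1.0 - t) * np.log(1.0 - a)))

confident_right = bce_sketch([0.9, 0.1], [1, 0])   # predictions close to the labels
confident_wrong = bce_sketch([0.1, 0.9], [1, 0])   # predictions far from the labels
saturated = bce_sketch([1.0, 0.0], [0, 1])         # would hit log(0) without clipping
print(confident_right, confident_wrong, saturated)
```

The first value equals $-\log(0.9) \approx 0.105$ and the second equals $-\log(0.1) \approx 2.303$, which shows how strongly the loss penalizes confident mistakes. The third stays finite only because of the clipping.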

## Derivative of the Loss
We will need to take the derivative of the loss function to begin the backpropagation process. It turns out that it is very efficient to take the direct derivative of the loss with respect to the net input $n^{2}$, rather than to take the derivative of the loss with respect to the network output $a^{2}$ and then multiply that by the derivative of the sigmoid activation function.

In [None]:
def binary_crossentropy_logit_gradient(n, t):
    """
    Compute gradient of binary cross-entropy loss with respect to the net input.

    For binary cross-entropy with sigmoid:
    L = -[t * log(sigmoid(n)) + (1 - t) * log(1 - sigmoid(n))]

    The gradient w.r.t. n is simply: dL/dn = sigmoid(n) - t = a - t

    Args:
        n: net input before the sigmoid
        t: True binary labels (0 or 1)

    Returns:
        gradient: Gradient of loss w.r.t. net input n
    """
    # TODO: Implement the gradient of binary cross-entropy loss w.r.t. the net input
    # 1. Compute sigmoid of net input
    # 2. Compute the gradient: sigmoid(n) - t
    # 3. Return the gradient
    
    return None
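
The shortcut follows from the chain rule: $\partial \mathcal{L}/\partial a = (a - t)/(a(1 - a))$ and $\partial a/\partial n = a(1 - a)$, so the product is $a - t$. The sketch below checks this numerically with a central finite difference. The helper names and test values are illustrative, and a plain (unclipped) sigmoid is used because the inputs are small.

```python
import numpy as np

def _sigmoid(n):
    # Plain sigmoid is adequate here because the test inputs are small
    return 1.0 / (1.0 + np.exp(-n))

def _bce_from_logit(n, t):
    # Per-sample loss written as a function of the net input n
    a = _sigmoid(n)
    return -(t * np.log(a) + (1.0 - t) * np.log(1.0 - a))

n = np.array([-2.0, -0.5, 0.0, 1.5])
t = np.array([0.0, 1.0, 1.0, 0.0])

analytic = _sigmoid(n) - t                      # claimed gradient a - t
h = 1e-6
numeric = (_bce_from_logit(n + h, t) - _bce_from_logit(n - h, t)) / (2 * h)
max_abs_diff = float(np.max(np.abs(analytic - numeric)))
print(max_abs_diff)
```

A difference near machine precision relative to `h` indicates that the analytic and numerical gradients agree. The same kind of check can be used later to validate a backpropagation implementation.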

## Recurrent Neural Network

Now we implement the SimpleRNN network class. The class has five methods:

- `__init__` initializes the network weights and biases, and creates arrays to store the gradients.
- `forward` takes a sequence of inputs, simulates the network, and then returns the outputs. It also saves intermediate variables in a cache for use during the backpropagation process.
- `backward` performs the backpropagation through time algorithm to compute the gradient.
- `clip_gradients` clips the gradients, if the norm becomes too large, to prevent exploding gradients.
- `update_parameters` updates the weights and biases using the steepest descent algorithm.
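
To make the array shapes concrete, the following sketch runs the recurrence that `forward` is described as implementing for three time steps, using standalone arrays instead of a class. The sizes, the random seed, and the plain sigmoid are assumptions chosen for brevity.

```python
import numpy as np

rng = np.random.default_rng(0)
R, S1, S2 = 1, 4, 1                           # illustrative sizes, not the lab's settings

IW_11 = rng.standard_normal((S1, R)) * 0.1    # input -> hidden
LW_11 = rng.standard_normal((S1, S1)) * 0.1   # hidden -> hidden
LW_21 = rng.standard_normal((S2, S1)) * 0.1   # hidden -> output
b_1 = np.zeros((S1, 1))
b_2 = np.zeros((S2, 1))

a1 = np.zeros((S1, 1))                        # initial hidden state
outputs = []
for p_value in [1.0, 0.0, 1.0]:
    p = np.array([[p_value]])                 # column vector of shape (R, 1)
    a1 = np.tanh(LW_11 @ a1 + IW_11 @ p + b_1)
    n2 = LW_21 @ a1 + b_2
    a2 = 1.0 / (1.0 + np.exp(-n2))            # plain sigmoid is fine for small n2
    outputs.append(a2)

print([o.shape for o in outputs], a1.shape)
```

Each output is an `(S2, 1)` column vector, and the hidden state keeps the shape `(S1, 1)` from one step to the next, which is what lets it be fed back through `LW_11`.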


In [None]:
class SimpleRNN:
    """
    Uses tanh activation for hidden state and sigmoid for output.
    """

    def __init__(self, R: int, S1: int, S2: int, learning_rate: float = 0.01):
        """
        Initialize the RNN with random weights.

        Args:
            R: Size of input vectors
            S1: Number of hidden units
            S2: Size of output vectors
            learning_rate: Learning rate for training
        """
        self.R = R
        self.S1 = S1
        self.S2 = S2
        self.learning_rate = learning_rate

        # Initialize weights with small random values
        # LW_11: hidden to hidden weights
        # IW_11: input to hidden weights
        # LW_21: hidden to output weights
        # b_1: hidden bias
        # b_2: output bias
        self.LW_11 = np.random.randn(S1, S1) * 0.1
        self.IW_11 = np.random.randn(S1, R) * 0.1
        self.LW_21 = np.random.randn(S2, S1) * 0.1
        self.b_1 = np.zeros((S1, 1))
        self.b_2 = np.zeros((S2, 1))

        # Store gradients
        self.dLW_11 = np.zeros_like(self.LW_11)
        self.dIW_11 = np.zeros_like(self.IW_11)
        self.dLW_21 = np.zeros_like(self.LW_21)
        self.db_1 = np.zeros_like(self.b_1)
        self.db_2 = np.zeros_like(self.b_2)

    def forward(self, inputs: list[np.ndarray], a1_prev: np.ndarray | None = None) -> tuple[
        list[np.ndarray], list[np.ndarray], dict]:
        """
        Forward pass through the RNN.

        Args:
            inputs: List of input vectors (each is column vector)
            a1_prev: Previous hidden state (if None, initialize to zeros)

        Returns:
            outputs: List of output vectors
            a1_states: List of hidden states (including initial state)
            cache: Dictionary containing intermediate values for backprop
        """
        # TODO: Implement the forward pass through the RNN
        # 1. Initialize a1_prev if None
        # 2. Initialize empty lists for outputs, a1_states, and n2_tot
        # 3. Loop through each input:
        #    a. Ensure input is a column vector
        #    b. Update hidden state: a1 = tanh(LW_11 @ a1 + IW_11 @ p + b_1)
        #    c. Compute output: output = sigmoid(LW_21 @ a1 + b_2)
        #    d. Store values in the appropriate lists
        # 4. Create a cache dictionary with inputs, a1_states, and n2_tot
        # 5. Return outputs, a1_states, and cache
        
        return None, None, None

    def backward(self, outputs: list[np.ndarray], targets: list[np.ndarray], cache: dict) -> float:
        """
        Backward pass (backpropagation through time).

        Args:
            outputs: List of output vectors from forward pass
            targets: List of targets
            cache: Cache from forward pass

        Returns:
            loss: Average loss over the sequence
        """
        # TODO: Implement backpropagation through time (BPTT)
        # 1. Extract values from cache
        # 2. Get sequence length
        # 3. Initialize gradients to zero
        # 4. Initialize gradient for next hidden state
        # 5. Initialize total_loss
        # 6. Loop through the sequence in reverse order:
        #    a. Get current values (p_t, a1_t, a1_prev, a2_t, n2_t, target_t)
        #    b. Compute loss for current time step
        #    c. Compute gradient of loss w.r.t. n2
        #    d. Compute gradients for output layer
        #    e. Compute gradient w.r.t. hidden state
        #    f. Compute gradient through tanh activation
        #    g. Compute gradients for hidden layer
        #    h. Update gradient for next iteration
        # 7. Clip gradients to prevent exploding gradients
        # 8. Return average loss
        
        return 0.0

    def clip_gradients(self, max_norm: float = 5.0):
        """Clip gradients to prevent exploding gradients."""
        # TODO: Implement gradient clipping
        # 1. Create a list of all gradients
        # 2. For each gradient:
        #    a. Compute the norm
        #    b. If norm > max_norm, scale the gradient
        
        pass

    def update_parameters(self):
        """Update parameters using computed gradients."""
        # TODO: Implement parameter updates using gradient descent
        # 1. Update LW_11, IW_11, LW_21, b_1, and b_2 using their gradients and the learning rate
        
        pass
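
The per-gradient clipping outlined in the `clip_gradients` comments can be sketched as below. The function name `clip_by_norm_sketch` and the example arrays are hypothetical; this is one reading of the outline, not necessarily the intended solution.

```python
import numpy as np

def clip_by_norm_sketch(grads, max_norm=5.0):
    """Rescale, in place, any array whose L2 norm exceeds max_norm (illustrative)."""
    for g in grads:
        norm = np.linalg.norm(g)
        if norm > max_norm:
            g *= max_norm / norm          # direction is preserved, length becomes max_norm
    return grads

big = np.array([[30.0, 40.0]])            # norm 50, will be scaled down
small = np.array([[0.3, 0.4]])            # norm 0.5, left untouched
clip_by_norm_sketch([big, small], max_norm=5.0)
print(big, small)
```

Note that this clips each gradient array separately. PyTorch's `torch.nn.utils.clip_grad_norm_` instead computes a single norm over all parameters together, so the two approaches can scale gradients differently.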

## Training the Network

Now we create the training function. Its arguments are an initialized network object, a batch of input sequences, a batch of target sequences and the number of epochs of training. For each epoch, the method does a forward pass using the `forward` method, runs the `backward` method to compute the loss and the gradients and then updates the weights and biases using the `update_parameters` method. It prints the loss every 5 epochs and returns a list of losses during training.

In [None]:
def train_rnn(rnn: SimpleRNN, sequences: list[list[np.ndarray]], targets: list[list[np.ndarray]], epochs: int = 100):
    """Train the RNN on the given dataset."""
    # TODO: Implement the training loop for the RNN
    # 1. Initialize an empty list for losses
    # 2. Loop through epochs:
    #    a. Initialize total_loss
    #    b. Loop through each sequence and target:
    #       i. Perform forward pass
    #       ii. Perform backward pass and get loss
    #       iii. Update parameters
    #       iv. Add loss to total_loss
    #    c. Compute average loss
    #    d. Append average loss to losses list
    #    e. Print loss every 5 epochs
    # 3. Return losses
    
    return []

## Test Run

Now let's test the RNN by creating a training dataset with `generate_counting_ones_data` and training the network. First create the dataset and the network. We will start by using a window size of 3, so the network will be looking back three time steps to see if there are an even number of ones. Create 1000 sequences of length 10.

In [None]:
num_samples = 1000  # Number of training examples
seq_length = 10     # Length of each sequence
window_size = 3     # Look back 3 steps

X, y = generate_counting_ones_data(num_samples, seq_length, window_size)

X = np.expand_dims(X, axis=2)
y = np.expand_dims(y, axis=2)

# Create RNN
rnn = SimpleRNN(R=1, S1=20, S2=1, learning_rate=0.1)

Now train the network for 10 epochs.

In [None]:
# Train the RNN
# TODO: Uncomment the following line after implementing the train_rnn function
# losses = train_rnn(rnn, X, y, epochs=10)

Now let's plot the loss function during training to see how the network has converged.

In [None]:
# Plot training curve
# TODO: Uncomment the following code after implementing and running the training
# plt.figure(figsize=(10, 5))
# plt.plot(losses)
# plt.title('RNN Training Loss')
# plt.xlabel('Epoch')
# plt.ylabel('Loss')
# plt.yscale('log')
# plt.grid(True)

## Test the Trained Network

The loss trajectory should show that the loss is still decreasing after 10 epochs. We'll check the network by giving it a binary input. Let's see if the network output is truly indicating at each time step whether there have been an even or an odd number of ones in the window. An even number should produce a 1, and an odd number should produce a 0. Compare the input and output sequences to verify that this is true.

In [None]:
# Make a numpy array test input of zeros and ones.
test_input = np.array([1,0,1,1,0,0,0,0,1,0,1,0,0,0,0,1,1,1,1,0,0,0])

# Each time step is an input vector of size R = 1, so we add a trailing dimension
test_input = np.expand_dims(test_input, axis=1)

# Pass the input through the network
# TODO: Uncomment the following line after implementing the forward method
# outputs, _, _ = rnn.forward(test_input)

# Print the inputs and outputs
print('Inputs:')
print(test_input.transpose(1,0))
# TODO: Uncomment the following lines after implementing the forward method
# print('Outputs:')
# print([f"{ii[0][0]:.4e}" for ii in outputs])
# print('Outputs rounded to nearest integer:')
# print([np.int64(np.round(ii[0][0])) for ii in outputs])

# PyTorch Implementation

Now we implement the same network using PyTorch. We first need to import the PyTorch libraries.

In [None]:
# Import the needed PyTorch libraries
import torch
import torch.nn as nn
import torch.optim as optim

## Implementing the RNN in PyTorch

We now create the network class. We don't need to define the sigmoid activation function, the binary cross-entropy loss or its derivative, because they are built into PyTorch. Also, we don't need to store layer outputs at each time step, or have a backward method, because PyTorch does this in the background. PyTorch will unfold the network and perform the backpropagation automatically. The network class has these methods:

- `__init__` defines the network layers, loss function and optimizer.
- `forward` takes a sequence of inputs, simulates the network, and then returns the outputs.
- `compute_loss` computes the loss for a batch of data.
- `train_step` performs the standard PyTorch training sequence: it zeros the gradient, passes forward through the network, computes the loss, passes backward to compute the gradient, clips the gradient and updates the weights.

In [None]:
class SimplePyTorchRNN(nn.Module):
    """
    PyTorch implementation.
    """
    def __init__(self, R: int, S1: int, S2: int, learning_rate: float = 0.01):
        super(SimplePyTorchRNN, self).__init__()

        self.R = R
        self.S1 = S1
        self.S2 = S2
        self.learning_rate = learning_rate

        # Define layers (the hidden bias b_1 is carried by IW_11, so LW_11 has no bias)
        self.IW_11 = nn.Linear(R, S1, bias=True)
        self.LW_11 = nn.Linear(S1, S1, bias=False)
        self.LW_21 = nn.Linear(S1, S2, bias=True)

        # Activation functions
        self.tanh = nn.Tanh()
        self.sigmoid = nn.Sigmoid()  # Note: we'll use BCE loss with sigmoid

        # Loss function - Binary Cross Entropy
        self.criterion = nn.BCELoss()

        # Optimizer - compare with manual parameter updates
        self.optimizer = optim.SGD(self.parameters(), lr=learning_rate)

    def forward(self, inputs: torch.Tensor, a1_prev: torch.Tensor | None = None) -> tuple[torch.Tensor, torch.Tensor]:
        """
        Forward pass through the RNN.

        Compare with numpy version:
        - No manual cache management needed
        - PyTorch tracks computation graph automatically
        - Batch processing is built-in

        Args:
            inputs: Tensor of shape (batch_size, seq_len, R)
            a1_prev: Previous hidden state (batch_size, S1)

        Returns:
            outputs: Output probabilities (batch_size, seq_len, S2)
            a1_tot: All hidden states (batch_size, seq_len+1, S1)
        """
        # TODO: Implement the forward pass for the PyTorch RNN
        # 1. Get batch_size and seq_len from inputs
        # 2. Initialize a1_prev if None
        # 3. Initialize empty lists for outputs and a1_tot
        # 4. Loop through each time step:
        #    a. Update hidden state using tanh activation
        #    b. Compute output using sigmoid activation
        #    c. Store values in the appropriate lists
        # 5. Stack outputs and hidden states
        # 6. Return outputs and a1_tot
        
        return None, None

    def compute_loss(self, outputs: torch.Tensor, targets: torch.Tensor) -> torch.Tensor:
        """
        Compute loss using PyTorch's built-in functions.

        Compare with numpy version:
        - No manual cross-entropy implementation
        - No numerical stability concerns
        - Automatic broadcasting
        """
        # TODO: Implement the loss computation for the PyTorch RNN
        # 1. Reshape outputs and targets for the loss function
        # 2. Return the computed loss
        
        return None

    def train_step(self, inputs: torch.Tensor, targets: torch.Tensor) -> float:
        """
        Single training step.

        Compare with numpy version:
        - No manual gradient computation (backward())
        - No manual parameter updates (optimizer.step())
        - Gradient clipping is optional and takes a single library call
        """
        # TODO: Implement a single training step for the PyTorch RNN
        # 1. Zero gradients
        # 2. Perform forward pass
        # 3. Compute loss
        # 4. Perform backward pass
        # 5. Clip gradients (optional)
        # 6. Update parameters
        # 7. Return loss value
        
        return 0.0
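
The training sequence listed for `train_step` follows the usual PyTorch pattern, sketched here on a deliberately small stand-in model rather than on the lab's RNN. The model, the random data, and the clipping threshold of 5.0 are all assumptions for illustration.

```python
import torch
import torch.nn as nn
import torch.optim as optim

torch.manual_seed(0)

# Stand-in model: a single linear layer followed by a sigmoid (not the lab's RNN)
model = nn.Sequential(nn.Linear(3, 1), nn.Sigmoid())
criterion = nn.BCELoss()
optimizer = optim.SGD(model.parameters(), lr=0.1)

inputs = torch.rand(8, 3)
targets = (inputs.sum(dim=1, keepdim=True) > 1.5).float()

losses = []
for _ in range(50):
    optimizer.zero_grad()                                    # 1. clear old gradients
    outputs = model(inputs)                                  # 2. forward pass
    loss = criterion(outputs, targets)                       # 3. compute the loss
    loss.backward()                                          # 4. autograd computes gradients
    torch.nn.utils.clip_grad_norm_(model.parameters(), 5.0)  # 5. clip the overall gradient norm
    optimizer.step()                                         # 6. update the weights
    losses.append(loss.item())                               # 7. record a Python float

print(losses[0], losses[-1])
```

Calling `loss.item()` returns a plain Python float, which avoids keeping the computation graph alive when losses are stored in a list.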

## Collating the Sequence

Because PyTorch can use a data loader and can train using minibatches, we create a function that collates sequences into a single tensor, padding where needed so that sequences of different lengths can share the same minibatch.

In [None]:
def collate_sequences(sequences: list[list[np.ndarray]], targets: list[list[int]]) -> tuple[torch.Tensor, torch.Tensor]:
    """
    Convert variable-length sequences to padded tensors for batch processing.

    This shows how PyTorch handles variable-length sequences compared to
    the loop-based approach in the numpy version.
    """
    # TODO: Implement sequence collation for PyTorch
    # 1. Find maximum sequence length
    # 2. Initialize empty lists for padded sequences and targets
    # 3. Loop through each sequence and target:
    #    a. Convert sequence to tensor
    #    b. Pad sequence if needed
    #    c. Convert target to tensor
    #    d. Pad target if needed
    #    e. Append to the appropriate lists
    # 4. Stack padded sequences and targets
    # 5. Return stacked tensors
    
    return None, None
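
The outline above pads each sequence by hand. As an alternative view of the same idea, the sketch below uses PyTorch's `pad_sequence` utility, which is a swapped-in technique rather than what the outline prescribes. The example sequences and the mask construction are assumptions for illustration.

```python
import torch
from torch.nn.utils.rnn import pad_sequence

# Three sequences of different lengths, each step a vector of size R = 1
seqs = [torch.ones(4, 1), torch.ones(2, 1), torch.ones(3, 1)]

# batch_first=True gives shape (batch, max_len, R); shorter sequences are zero-padded
padded = pad_sequence(seqs, batch_first=True, padding_value=0.0)

# A mask marking real (non-padded) steps, useful if padded steps should not affect the loss
lengths = torch.tensor([s.shape[0] for s in seqs])
mask = torch.arange(padded.shape[1]).unsqueeze(0) < lengths.unsqueeze(1)

print(padded.shape, mask.sum(dim=1))
```

Because 0 is also a valid input value in this problem, padding alone cannot be told apart from real data, which is why a mask can be helpful. In this lab all generated sequences have the same length, so no padding actually occurs.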

## Training Function

Now we create the training function. It takes as arguments an initialized network, the input sequences (a list of lists of numpy arrays), the targets (a list of lists of integers), and the number of epochs. It trains the network and then returns a list of losses at each epoch.

In [None]:
def train_pytorch_rnn(rnn: SimplePyTorchRNN, sequences: list[list[np.ndarray]], targets: list[list[int]], epochs: int = 100,
                      batch_size: int = 32):
    """
    Train the PyTorch RNN with batch processing.

    Compare with numpy version:
    - Batch processing built-in
    - Automatic gradient computation
    - No manual parameter updates
    - Much shorter code
    """
    # TODO: Implement the training loop for the PyTorch RNN
    # 1. Convert sequences and targets to tensors using collate_sequences
    # 2. Create a dataset and dataloader
    # 3. Initialize an empty list for losses
    # 4. Loop through epochs:
    #    a. Initialize epoch_loss and num_batches
    #    b. Loop through each batch:
    #       i. Perform a training step and get loss
    #       ii. Add loss to epoch_loss
    #       iii. Increment num_batches
    #    c. Compute average loss
    #    d. Append average loss to losses list
    #    e. Print loss every 10 epochs
    # 5. Return losses
    
    return []
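
Step 2 of the outline, creating a dataset and dataloader, might look like the sketch below. It uses placeholder tensors in the `(num_samples, seq_len, R)` layout; the sample count of 100 and the batch size are illustrative assumptions.

```python
import torch
from torch.utils.data import TensorDataset, DataLoader

# Placeholder data in the lab's tensor layout: (num_samples, seq_len, R)
X_t = torch.zeros(100, 10, 1)
y_t = torch.zeros(100, 10, 1)

dataset = TensorDataset(X_t, y_t)                       # pairs each input with its target
loader = DataLoader(dataset, batch_size=32, shuffle=True)

batch_shapes = [tuple(xb.shape) for xb, yb in loader]   # one pass = one epoch
print(batch_shapes)
```

With 100 samples and a batch size of 32, the last minibatch holds only 4 sequences, so averaging the per-batch losses weights that small batch the same as the full ones.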

## Test Run

Now let's test the RNN by creating a training dataset with `generate_counting_ones_data` and training the network. First create the dataset and the network. As in the Python/Numpy version, we will start by using a window size of 3, so the network will be looking back three time steps to see if there are an even number of ones. Generate 1000 sequences of length 10.

In [None]:
num_samples = 1000  # Number of training examples
seq_length = 10     # Length of each sequence
window_size = 3     # Look back 3 steps

X, y = generate_counting_ones_data(num_samples, seq_length, window_size)

X = np.expand_dims(X, axis=2)
y = np.expand_dims(y, axis=2)

rnn = SimplePyTorchRNN(R=1, S1=20, S2=1, learning_rate=0.1)

Now train the network for 10 epochs.

In [None]:
# TODO: Uncomment the following line after implementing the train_pytorch_rnn function
# losses = train_pytorch_rnn(rnn, X, y, epochs=10, batch_size=16)

Now plot the loss function during training to see how the network has converged.

In [None]:
# Plot training curve
# TODO: Uncomment the following code after implementing and running the training
# plt.figure(figsize=(10, 5))
# plt.plot(losses)
# plt.title('PyTorch RNN Training Loss')
# plt.xlabel('Epoch')
# plt.ylabel('Loss')
# plt.yscale('log')
# plt.grid(True)

## Test the Trained Network

We'll perform the same test that we used for the Python/Numpy version. The results should be similar, since both programs are performing gradient descent on the same RNN.

In [None]:
test_input = torch.tensor(np.array([1,0,1,1,0,0,0,0,1,0,1,0,0,0,0,1,1,1,1,0,0,0]))
test_input = test_input.to(torch.float32)
test_input = torch.unsqueeze(test_input, 0)
test_input = torch.unsqueeze(test_input, 2)

# TODO: Uncomment the following line after implementing the forward method
# outputs, _ = rnn.forward(test_input)

# Print the inputs and outputs
print('Inputs:')
print(test_input[0,:,0].long())
# TODO: Uncomment the following lines after implementing the forward method
# print('Outputs:')
# print(outputs[0,:,0])
# print('Outputs rounded to nearest integer:')
# print(torch.round(outputs[0,:,0]).long())

## Explore Further

Experiment with different window sizes in the training data. The longer the window, the more difficult it will be to train the network, because the network will have to increase the length of its memory. Some other things you could experiment with are:
- other optimizers, like Adam.
- learning rates.
- numbers of neurons in the hidden layer, $S^{1}$.
- different test sequences.
- numbers of sequences in the training set.
- length of training set sequences.
- modify the code to use an LSTM instead of an RNN, and compare how well each handles longer window sizes.
- consider whether this problem could be solved with an FTDNN. What is an advantage of using an RNN instead?
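
For the LSTM suggestion, one possible starting point is PyTorch's built-in `nn.LSTM` layer followed by a linear readout and a sigmoid. The class name `TinyLSTMSketch` and its layout are hypothetical; it only shows the tensor shapes involved and omits the loss, optimizer, and training loop.

```python
import torch
import torch.nn as nn

class TinyLSTMSketch(nn.Module):
    """Hypothetical LSTM counterpart; names and sizes are illustrative."""

    def __init__(self, R: int, S1: int, S2: int):
        super().__init__()
        self.lstm = nn.LSTM(input_size=R, hidden_size=S1, batch_first=True)
        self.readout = nn.Linear(S1, S2)

    def forward(self, inputs: torch.Tensor) -> torch.Tensor:
        hidden_seq, _ = self.lstm(inputs)                # (batch, seq_len, S1)
        return torch.sigmoid(self.readout(hidden_seq))   # (batch, seq_len, S2)

net = TinyLSTMSketch(R=1, S1=20, S2=1)
probs = net(torch.zeros(2, 10, 1))
print(probs.shape)
```

Here `nn.LSTM` processes the whole sequence in one call, so no explicit loop over time steps is needed.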