In [1]:
import torch
import torch.nn as nn
from torch.autograd import Variable
import numpy as np

# RNN

Recurrent Neural Networks (RNNs) are a type of neural network used to model sequential data such as time series, audio, natural-language text, and many others. The fundamental difference between RNNs and other neural networks is that they maintain a state, or "memory", of the inputs they have already seen. This makes them particularly well-suited to tasks that require processing sequential information.

An RNN processes a sequence one step at a time using a single "cell" whose weights are shared across all time steps; unrolled over time, it looks like a chain of identical cells that pass information from one to the next. At each step the cell takes the current input and the hidden state from the previous step, and produces an output and a new hidden state. The output is typically fed into a classifier or another neural network to produce a final prediction.

![rnn](https://pluralsight2.imgix.net/guides/f6c9b982-4be1-48d5-9f44-bcf7f772eccd_4.JPG)

The simplest RNN architecture is the Elman network, which consists of a single hidden layer. The input at each time step is fed into the input layer, and then processed by the hidden layer, which maintains a state that is passed on to the next time step. The output is produced by a linear or softmax layer that takes the hidden state as input.

A more advanced type of RNN architecture is the Long Short-Term Memory (LSTM) network, which was designed to address the problem of vanishing gradients in Elman networks. LSTMs use a more complex cell structure that consists of several "gates" that control the flow of information through the cell. This allows LSTMs to better maintain long-term dependencies in sequential data.

Here is the mathematical formulation of a basic RNN with one hidden layer:

$$
\begin{aligned}
h_t &= \sigma(W_{xh} x_t + W_{hh} h_{t-1} + b_h) \\
y_t &= \mathrm{softmax}(W_{hy} h_t + b_y) \\
\end{aligned}
$$

where:
- $h_t$ is the hidden state at time $t$
- $x_t$ is the input at time $t$
- $y_t$ is the output at time $t$
- $W_{xh}$ is the weight matrix for input-to-hidden connections
- $W_{hh}$ is the weight matrix for hidden-to-hidden connections
- $W_{hy}$ is the weight matrix for hidden-to-output connections
- $b_h$ is the bias term for the hidden layer
- $b_y$ is the bias term for the output layer
- $\sigma$ is the hidden-layer activation function, typically tanh (sigmoid and ReLU are also used)
- $\mathrm{softmax}$ is the output activation function, used for classification problems to produce a probability distribution over classes.

Note that this formula assumes a simple RNN with only one hidden layer. More complex architectures, such as stacked or bidirectional RNNs, may have additional layers or connections.

In [27]:
class RNNCell(nn.Module):
    """A single Elman RNN cell.

    Computes ``hy = act(x2h(input) + h2h(hx))`` where ``x2h`` and ``h2h`` are
    linear layers and ``act`` is tanh or ReLU.

    Args:
        input_size: Number of features in each input vector.
        hidden_size: Number of features in the hidden state.
        bias: If ``False``, both linear layers are created without a bias term.
        nonlinearity: Activation applied to the summed projections,
            either ``'tanh'`` (default) or ``'relu'``.

    Raises:
        ValueError: If ``nonlinearity`` is neither ``'tanh'`` nor ``'relu'``.
    """

    def __init__(self, input_size, hidden_size, bias=True,nonlinearity='tanh'):
        super(RNNCell, self).__init__()

        # Fail fast, before any layer is allocated.
        if nonlinearity not in ('relu', 'tanh'):
            raise ValueError("Invalid nonlinearity selected for RNN.")

        self.input_size = input_size
        self.hidden_size = hidden_size
        self.bias = bias
        self.nonlinearity = nonlinearity

        # Input-to-hidden and hidden-to-hidden projections.
        self.x2h = nn.Linear(input_size, hidden_size, bias=bias)
        self.h2h = nn.Linear(hidden_size, hidden_size, bias=bias)

        self.reset_parameters()

    def reset_parameters(self):
        """Re-initialise all weights and biases from U(-1/sqrt(hidden_size), 1/sqrt(hidden_size))."""
        bound = 1.0 / np.sqrt(self.hidden_size)
        # In-place initialisation must not be tracked by autograd; no_grad()
        # is the supported replacement for mutating `.data`.
        with torch.no_grad():
            for param in self.parameters():
                param.uniform_(-bound, bound)

    def forward(self, input, hx=None):
        """Advance the cell by one time step.

        Args:
            input: Tensor of shape (batch_size, input_size).
            hx: Previous hidden state of shape (batch_size, hidden_size).
                Defaults to zeros with the dtype and device of ``input``.

        Returns:
            The new hidden state ``hy`` of shape (batch_size, hidden_size).
        """
        if hx is None:
            # new_zeros inherits dtype/device from `input`; the deprecated
            # Variable wrapper is not needed on tensors.
            hx = input.new_zeros(input.size(0), self.hidden_size)

        pre_activation = self.x2h(input) + self.h2h(hx)

        if self.nonlinearity == 'tanh':
            return torch.tanh(pre_activation)
        return torch.relu(pre_activation)

In [28]:
class RNN(nn.Module):
    """Multi-layer Elman RNN followed by a linear read-out of the last time step.

    Args:
        input_size: Number of features in each input vector.
        hidden_size: Number of features in every layer's hidden state.
        num_layers: Number of stacked recurrent layers (must be >= 1).
        bias: Passed to every ``RNNCell``; whether the cells use bias terms.
        output_size: Number of features produced by the final linear layer.
        activation: Cell nonlinearity, ``'tanh'`` (default) or ``'relu'``.

    Raises:
        ValueError: If ``activation`` is not ``'tanh'``/``'relu'`` or
            ``num_layers`` is smaller than 1.
    """

    def __init__(self, input_size, hidden_size, num_layers, bias, output_size, activation='tanh'):
        super(RNN, self).__init__()

        if activation not in ("tanh", "relu"):
            raise ValueError("Invalid Activation")
        if num_layers < 1:
            raise ValueError("num_layers must be at least 1")

        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.bias = bias
        self.output_size = output_size

        # The first layer consumes the raw input; every further layer consumes
        # the hidden state of the layer below it.
        layer_input_sizes = [self.input_size] + [self.hidden_size] * (self.num_layers - 1)
        self.rnn_cell_list = nn.ModuleList([
            RNNCell(layer_input_size, self.hidden_size, self.bias, activation)
            for layer_input_size in layer_input_sizes
        ])

        self.fc = nn.Linear(
            self.hidden_size,
            self.output_size
        )

    def forward(self, input, hx=None):
        """Run the network over a whole sequence.

        Args:
            input: Tensor of shape (batch_size, sequence_length, input_size).
            hx: Optional initial hidden states of shape
                (num_layers, batch_size, hidden_size). Defaults to zeros with
                the dtype and device of ``input``.

        Returns:
            Tensor of shape (batch_size, output_size) computed from the top
            layer's hidden state at the last time step.

        Raises:
            ValueError: If the sequence dimension of ``input`` is empty.
        """
        if input.size(1) == 0:
            raise ValueError("input must contain at least one time step")

        if hx is None:
            # Allocate next to the input instead of guessing from
            # torch.cuda.is_available(), which put the state on the GPU even
            # when the input and model live on the CPU.
            h0 = input.new_zeros(self.num_layers, input.size(0), self.hidden_size)
        else:
            h0 = hx

        # Current hidden state of each layer, updated in place step by step.
        hidden = [h0[layer] for layer in range(self.num_layers)]

        for t in range(input.size(1)):
            layer_input = input[:, t, :]
            for layer, cell in enumerate(self.rnn_cell_list):
                hidden[layer] = cell(layer_input, hidden[layer])
                # The freshly computed state feeds the next layer up.
                layer_input = hidden[layer]

        # Take only the last time step (modify for seq-to-seq). No squeeze():
        # the state is already (batch_size, hidden_size), and squeezing would
        # drop the batch dimension when batch_size == 1.
        out = self.fc(hidden[-1])

        return out

        

In [29]:
# Seed PyTorch's RNG so the random input and the randomly initialised weights
# (and therefore the printed output) are the same on every run.
torch.manual_seed(42)

# Define input tensor of shape (batch_size, sequence_length, input_size)
input_tensor = torch.rand((2, 5, 3))

# Define RNN module with input size=3, hidden size=4, 2 layers, bias=True, output size=2
rnn = RNN(input_size=3, hidden_size=4,
            num_layers=2, bias=True, output_size=2)

# Forward pass through the RNN module
output_tensor = rnn(input_tensor)

# Sanity check: one row of output_size=2 values per batch element
assert output_tensor.shape == (2, 2)

# Print the shape of the output tensor
print(output_tensor.shape)
print(output_tensor)


torch.Size([2, 2])
tensor([[-0.4781, -0.2210],
        [-0.4721, -0.1940]], grad_fn=<AddmmBackward0>)


In [21]:
# Scratch experiment, intentionally left disabled:
#   input_tensor = torch.rand((2, 5, 3))
#   print(input_tensor)
#   Variable(input_tensor.new_zeros(input_tensor.size(0), 3))

# The bare string below is only a note; it is evaluated and discarded.
"""
nn.Linear() is a PyTorch module that applies a linear transformation to the input tensor, i.e., it performs a matrix multiplication followed by a bias addition.

The output tensor has the shape (batch_size, out_features) where out_features is the number of output features specified during module initialization. The input tensor should have the shape (batch_size, in_features) where in_features is the number of input features.
"""
# Example sizes for the disabled nn.Linear call that follows.
input_size, hidden_size, bias = 100, 5, True
#   nn.Linear(input_size, hidden_size, bias=bias)