In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

In [None]:
from IPython.display import Image, display, Markdown

# Paths of the architecture diagrams, relative to this notebook's directory.
rnn_m2m_image_path = "../../images/rnn_m2m.png"
rnn_image_path = "../../images/simplernn.png"
lstm_image_path = "../../images/lstm.png"
rnn_lstm_gru_image_path = "../../images/rnn_lstm_gru.png"

# (heading, image path) pairs in display order; a heading of None means the
# image is shown without a title above it.
figures = (
    (None, rnn_m2m_image_path),
    ("SimpleRNN / RNN Cell", rnn_image_path),
    ("LSTM Cell", lstm_image_path),
    ("RNN / LSTM / GRU Cell", rnn_lstm_gru_image_path),
)

for heading, image_path in figures:
    if heading is not None:
        # The underline element must be closed with </ins>; the original
        # strings opened a second <ins> instead of closing the first one.
        display(Markdown(f"# <ins>{heading}</ins>"))
    display(Image(filename=image_path))

## <ins>SimpleRNN</ins>:
$$ h_t = \sigma(W_{ih} \cdot x_t + b_{ih} + W_{hh} \cdot h_{t-1} + b_{hh}) $$

- $h_t$: Hidden state at time $t$.
- $x_t$: Input at time $t$.
- $W_{ih}$: Weight matrix for the input.
- $b_{ih}$: Bias vector for the input.
- $W_{hh}$: Weight matrix for the hidden state.
- $b_{hh}$: Bias vector for the hidden state.
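- $\sigma$: Activation function (here a generic nonlinearity, usually $\tanh$; PyTorch's `nn.RNN` uses $\tanh$ by default, with ReLU as the alternative).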

## <ins>LSTM</ins>:
$$
\begin{align*}
i_t &= \sigma(W_{ii} \cdot x_t + b_{ii} + W_{hi} \cdot h_{t-1} + b_{hi}) \\
f_t &= \sigma(W_{if} \cdot x_t + b_{if} + W_{hf} \cdot h_{t-1} + b_{hf}) \\
g_t &= \tanh(W_{ig} \cdot x_t + b_{ig} + W_{hg} \cdot h_{t-1} + b_{hg}) \\
o_t &= \sigma(W_{io} \cdot x_t + b_{io} + W_{ho} \cdot h_{t-1} + b_{ho}) \\
c_t &= f_t \cdot c_{t-1} + i_t \cdot g_t \\
h_t &= o_t \cdot \tanh(c_t)
\end{align*}
$$

- $i_t$: Input gate.
- $f_t$: Forget gate.
- $g_t$: Cell gate.
- $o_t$: Output gate.
- $c_t$: Cell state.
- $h_t$: Hidden state.
- $x_t$: Input at time $t$.
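- $W_{ii}, W_{if}, W_{ig}, W_{io}$: Input-to-hidden weight matrices of the input, forget, cell, and output gates.
- $b_{ii}, b_{if}, b_{ig}, b_{io}$: Input-to-hidden bias vectors of the input, forget, cell, and output gates.
- $W_{hi}, W_{hf}, W_{hg}, W_{ho}$: Hidden-to-hidden weight matrices of the input, forget, cell, and output gates.
- $b_{hi}, b_{hf}, b_{hg}, b_{ho}$: Hidden-to-hidden bias vectors of the input, forget, cell, and output gates.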
- $\sigma$: Sigmoid activation function.
- $\tanh$: Hyperbolic tangent activation function.

## <ins>GRU</ins>:
$$
\begin{align*}
z_t &= \sigma(W_{iz} \cdot x_t + b_{iz} + W_{hz} \cdot h_{t-1} + b_{hz}) \\
r_t &= \sigma(W_{ir} \cdot x_t + b_{ir} + W_{hr} \cdot h_{t-1} + b_{hr}) \\
\tilde{h}_t &= \tanh(W_{ih} \cdot x_t + b_{ih} + r_t \cdot (W_{hh} \cdot h_{t-1} + b_{hh})) \\
h_t &= (1 - z_t) \cdot \tilde{h}_t + z_t \cdot h_{t-1}
\end{align*}
$$

- $z_t$: Update gate.
- $r_t$: Reset gate.
- $\tilde{h}_t$: Candidate hidden state.
- $h_t$: Hidden state.
- $x_t$: Input at time $t$.
- $W_{iz}, W_{ir}, W_{ih}$: Input-to-hidden weight matrices for the update gate, reset gate, and candidate hidden state.
- $b_{iz}, b_{ir}, b_{ih}$: Input-to-hidden bias vectors for the update gate, reset gate, and candidate hidden state.
- $W_{hz}, W_{hr}, W_{hh}$: Hidden-to-hidden weight matrices for the update gate, reset gate, and candidate hidden state.
- $b_{hz}, b_{hr}, b_{hh}$: Hidden-to-hidden bias vectors for the update gate, reset gate, and candidate hidden state.
- $\sigma$: Sigmoid activation function.
- $\tanh$: Hyperbolic tangent activation function.

In [None]:
class FlexibleRNN(nn.Module):
    """Sequence regressor whose recurrent cell type is picked at construction.

    The input is run through a stacked ``nn.RNN``, ``nn.LSTM`` or ``nn.GRU``
    built with ``batch_first=True``. The features of the last time step are
    layer-normalised and mapped to ``output_size`` values by a linear layer.
    """

    def __init__(
        self, input_size, hidden_size, output_size, num_layers, rnn_type="rnn"
    ):
        """Build the recurrent stack, the layer norm and the output projection.

        Args:
            input_size: Number of expected features in the input ``x``.
            hidden_size: Number of features in the hidden state ``h``.
            output_size: Number of features produced by the final linear layer.
            num_layers: Number of stacked recurrent layers; with ``2`` the
                second layer consumes the outputs of the first. (Keras
                recurrent layers have no equivalent argument; there, layers
                are stacked explicitly.)
            rnn_type: ``"rnn"``, ``"lstm"`` or ``"gru"``, case-insensitive.

        Raises:
            ValueError: If ``rnn_type`` is not one of the supported names.
        """
        super().__init__()

        # Map each accepted name to the PyTorch module implementing it.
        recurrent_layers = {"rnn": nn.RNN, "lstm": nn.LSTM, "gru": nn.GRU}
        layer_cls = recurrent_layers.get(rnn_type.lower())
        if layer_cls is None:
            raise ValueError("Invalid rnn_type. Choose 'rnn', 'lstm', or 'gru'.")

        # batch_first=True: input and output tensors are laid out as
        # (batch, seq, feature) instead of (seq, batch, feature). This does
        # not apply to the hidden or cell states.
        self.rnn = layer_cls(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )

        self.layernorm = nn.LayerNorm(hidden_size)

        # Projects the normalised recurrent features to the output size.
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return the prediction computed from the last time step of ``x``."""
        # The second return value (final hidden / cell state) is not needed.
        sequence_out, _ = self.rnn(x)

        # Keep only the features emitted at the final time step.
        last_step = sequence_out[:, -1, :]

        return self.fc(self.layernorm(last_step))

    def train_model(
        self, train_data, train_labels, num_epochs=300, learning_rate=0.001
    ):
        """Fit the model with full-batch Adam on a mean-squared-error loss.

        Every epoch performs one optimisation step on the whole of
        ``train_data``. The loss is printed on every 10th epoch.

        Args:
            train_data: Input tensor passed to ``forward``.
            train_labels: Target tensor compared against the model output.
            num_epochs: Number of optimisation steps.
            learning_rate: Learning rate handed to ``optim.Adam``.
        """
        criterion = nn.MSELoss()
        optimizer = optim.Adam(self.parameters(), lr=learning_rate)

        for epoch in range(num_epochs):
            # Clear gradients left over from the previous step.
            optimizer.zero_grad()

            loss = criterion(self(train_data), train_labels)
            loss.backward()
            optimizer.step()

            is_report_epoch = (epoch + 1) % 10 == 0
            if is_report_epoch:
                print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")

    def predict(self, input_data):
        """Run a forward pass on ``input_data`` with gradient tracking disabled.

        The module's train/eval mode is left untouched.
        """
        with torch.no_grad():
            return self(input_data)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim


class TransformerModel(nn.Module):
    """Single self-attention layer followed by a small feed-forward head.

    Inputs are expected as ``(batch, seq, feature)`` tensors with
    ``feature == input_size``. Each sequence attends over its own time steps,
    the attended features are averaged over the sequence dimension, and a
    two-layer MLP maps the result to ``output_size`` values.
    """

    def __init__(self, input_size, hidden_size, output_size, num_heads):
        """Build the attention layer and the feed-forward head.

        Args:
            input_size: Feature dimension of the input, used as the
                attention ``embed_dim``.
            hidden_size: Width of the hidden layer of the feed-forward head.
            output_size: Number of features in the model output.
            num_heads: Number of parallel attention heads.
        """
        super(TransformerModel, self).__init__()

        # MultiheadAttention layer.
        # batch_first=True is required: the module's default layout is
        # (seq, batch, feature), which would make a (batch, seq, feature)
        # input attend across the *samples* of the batch rather than across
        # the time steps of each sequence.
        self.attention = nn.MultiheadAttention(
            embed_dim=input_size, num_heads=num_heads, batch_first=True
        )

        # Feedforward layer
        self.feedforward = nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, output_size),
        )

    def forward(self, x):
        """Return one ``output_size`` vector per sequence in ``x``.

        Args:
            x: Tensor of shape ``(batch, seq, input_size)``.
        """
        # Self-attention: query, key and value are all the input sequence.
        # The second return value (attention weights) is not needed.
        attention_out, _ = self.attention(x, x, x)

        # Feedforward layer
        output = self.feedforward(
            attention_out.mean(dim=1)
        )  # Aggregate over the sequence

        return output

    def train_model(
        self, train_data, train_labels, num_epochs=300, learning_rate=0.001
    ):
        """Fit the model with full-batch Adam on a mean-squared-error loss.

        Every epoch performs one optimisation step on the whole of
        ``train_data``. The loss is printed on every 10th epoch.

        Args:
            train_data: Input tensor passed to ``forward``.
            train_labels: Target tensor compared against the model output.
            num_epochs: Number of optimisation steps.
            learning_rate: Learning rate handed to ``optim.Adam``.
        """
        # Define loss function and optimizer
        criterion = nn.MSELoss()
        optimizer = optim.Adam(self.parameters(), lr=learning_rate)

        for epoch in range(num_epochs):
            # Forward pass
            outputs = self(train_data)
            loss = criterion(outputs, train_labels)

            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if (epoch + 1) % 10 == 0:
                print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")

    def predict(self, input_data):
        """Run a forward pass on ``input_data`` with gradient tracking disabled.

        The module's train/eval mode is left untouched.
        """
        with torch.no_grad():
            predictions = self(input_data)
        return predictions

In [None]:
# Example usage (RNNs for sequence processing):
# Seed the global RNG so that weight initialisation, the synthetic data and
# therefore the printed losses / predictions are identical on every
# Restart Kernel -> Run All.
torch.manual_seed(0)

# Define input size, hidden size, and output size
input_size = 10  # Number of expected features in the input
hidden_size = 20  # Number of features in the hidden state
output_size = 5  # Size of each output sample
num_layers = 2  # Number of recurrent layers

# Create an instance of the FlexibleRNN model (rnn_type defaults to "rnn")
model = FlexibleRNN(input_size, hidden_size, output_size, num_layers)

# Create an instance of the FlexibleRNN model with LSTM
model_lstm = FlexibleRNN(
    input_size, hidden_size, output_size, num_layers, rnn_type="lstm"
)

# Create an instance of the FlexibleRNN model with GRU
model_gru = FlexibleRNN(
    input_size, hidden_size, output_size, num_layers, rnn_type="gru"
)

# Generate synthetic training data: 100 sequences of 8 time steps each,
# with one random target vector per sequence
train_data = torch.randn(100, 8, input_size)
train_labels = torch.randn(100, output_size)

# Train the three models on the same data; the header line makes it possible
# to tell their otherwise identical-looking epoch logs apart
for cell_name, net in (("RNN", model), ("LSTM", model_lstm), ("GRU", model_gru)):
    print(f"Training {cell_name} model")
    net.train_model(train_data, train_labels)

# Generate a test sequence (a batch containing a single sequence)
test_data = torch.randn(1, 8, input_size)

# Make predictions
predictions = model.predict(test_data)
print("Predictions:", predictions)

# Make predictions with LSTM model
predictions_lstm = model_lstm.predict(test_data)
print("Predictions (LSTM):", predictions_lstm)

# Make predictions with GRU model
predictions_gru = model_gru.predict(test_data)
print("Predictions (GRU):", predictions_gru)

In [None]:
# Example usage (transformer for sequence processing):
# Seed the global RNG so that weight initialisation, the synthetic data and
# therefore the printed losses / predictions are identical on every
# Restart Kernel -> Run All.
torch.manual_seed(0)

# Define input size, hidden size, and output size
input_size = 10  # Total dimension of the model (embed_dim)
hidden_size = 20  # Number of features in the hidden layers of feed-forward network
output_size = 5  # Size of each output sample
num_heads = 2  # Number of parallel attention heads

# Create an instance of the TransformerModel
model_transformer = TransformerModel(input_size, hidden_size, output_size, num_heads)

# Generate synthetic training data: 100 sequences of 8 time steps each,
# with one random target vector per sequence
train_data = torch.randn(100, 8, input_size)
train_labels = torch.randn(100, output_size)

# Train the Transformer model
model_transformer.train_model(train_data, train_labels)

# Generate a test sequence (a batch containing a single sequence)
test_data = torch.randn(1, 8, input_size)

# Make predictions with Transformer model
predictions_transformer = model_transformer.predict(test_data)
print("Predictions (Transformer):", predictions_transformer)