In [None]:
import os
import sys

# Make the parent of the current working directory importable; the local
# packages imported below (generators, model_managers, models) are presumably
# located there -- TODO confirm the notebook is always started from its own folder.
current_dir = os.getcwd()
parent_dir = os.path.abspath(os.path.join(current_dir, ".."))

# Only add the entry once: re-running this cell must not keep growing sys.path
if parent_dir not in sys.path:
    sys.path.append(parent_dir)

In [None]:
import torch
import numpy as np
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset, TensorDataset, random_split
from torch.nn.utils import rnn as rnn_utils
from generators.data_generation import generate_sequences
from model_managers.DeepLearningManager import DeepLearningManager
# Release cached, currently unused GPU memory (e.g. left over from an earlier run in the same kernel)
torch.cuda.empty_cache()

In [None]:
import matplotlib.pyplot as plt

def plot_sequence(points, direction):
    """Plot one sequence of 2-D points and an arrow hinting at its direction.

    Args:
        points: Sequence of (x, y) pairs. It is converted with ``np.array``
            and indexed as ``[:, 0]`` (x) and ``[:, 1]`` (y).
        direction: Label of the sequence. ``1`` (clockwise) draws the arrow
            from the first point to the last one; ``0`` (counterclockwise)
            draws it from the last point to the first one. Any other value
            draws no arrow.

    Returns:
        None. The figure is shown with ``plt.show()``.
    """
    fig, ax = plt.subplots(figsize=(3, 2))

    # Plot the sequence of points in the order given
    point_array = np.array(points)
    ax.plot(point_array[:, 0], point_array[:, 1], marker='o', linestyle='-')

    # Both labels draw the same kind of arrow; only its orientation differs
    if direction == 1:  # Clockwise
        arrow_ends = (point_array[0], point_array[-1])
    elif direction == 0:  # Counterclockwise
        arrow_ends = (point_array[-1], point_array[0])
    else:
        arrow_ends = None

    if arrow_ends is not None:
        start_point, end_point = arrow_ends
        ax.arrow(start_point[0], start_point[1],
                 end_point[0] - start_point[0], end_point[1] - start_point[1],
                 head_width=0.1, head_length=0.1, fc='k', ec='k')

    # Labels, title and grid are set on this figure's own axes
    ax.set_xlabel('X')
    ax.set_ylabel('Y')
    ax.set_title('Sequence of Points with Direction')
    ax.grid()

    plt.show()


In [None]:
# Generate the sequences and direction labels that serve as the training set further below.
# NOTE(review): `n` is presumably the number of sequences and `seed` fixes the generator's
# randomness -- confirm in generators.data_generation
points, directions = generate_sequences(n=128, seed=13)

In [None]:

# Visual sanity check: draw the first three sequences together with their labels
for sequence, label in zip(points[:3], directions[:3]):
    plot_sequence(sequence, label)

# Build a Recurrent Neural Network

In [None]:
# Dimensions of the toy problem: every point has 2 features (x, y) and the
# hidden state is kept 2-dimensional as well
n_features = 2
n_hidden_dim = 2

# Seed the global RNG so the cell's randomly initialised parameters are reproducible
torch.manual_seed(101)
rnn_cell = nn.RNNCell(input_size=n_features, hidden_size=n_hidden_dim)
# The state dict exposes the cell's parameters (weight_ih, bias_ih, weight_hh, bias_hh)
rnn_state = rnn_cell.state_dict()
rnn_state

##### To understand the RNN architecture, we reuse the parameters (weights and biases) generated by `nn.RNNCell` and rebuild the cell from scratch, starting with two linear layers.

In [None]:
# Rebuild the RNN cell by hand from two linear layers: one transforms the input
# (features -> hidden) and one transforms the previous hidden state (hidden -> hidden)
linear_input = nn.Linear(n_features, n_hidden_dim)
linear_hidden = nn.Linear(n_hidden_dim, n_hidden_dim)

# Copy the RNNCell's parameter values into the linear layers. copy_ writes into
# the layers' own Parameters, so they hold the same numbers without sharing
# storage with the tensors returned by rnn_cell.state_dict().
with torch.no_grad():
    linear_input.weight.copy_(rnn_state['weight_ih'])
    linear_input.bias.copy_(rnn_state['bias_ih'])
    linear_hidden.weight.copy_(rnn_state['weight_hh'])
    linear_hidden.bias.copy_(rnn_state['bias_hh'])

In [None]:
# Initial hidden state: all zeros, shape (1, n_hidden_dim) = (1, 2)
initial_hidden = torch.zeros(1, n_hidden_dim)
initial_hidden

In [None]:
# Transform the (zero) initial hidden state with the hidden-to-hidden linear layer.
# This is only the linear part (t_h); no activation function has been applied yet,
# so it is not the updated hidden state itself.
th = linear_hidden(initial_hidden)
th

In [None]:
# Take the first sequence as a float tensor: 4 points with 2 features each, i.e. shape (4, 2)
X = torch.as_tensor(points[0]).float()
X, X.shape

In [None]:
# Transform the first point of the sequence with the input-to-hidden linear layer (t_x).
# Slicing with 0:1 (instead of indexing with 0) keeps the leading dimension.
tx = linear_input(X[0:1])
tx

In [None]:
# Sum the two linear transformations (hidden part + input part) to replicate the RNN cell
adds = th + tx
# Applying the tanh activation to the sum yields the updated hidden state
torch.tanh(adds)

# What we get is the updated hidden state

In [None]:
# Cross-check with the real cell on the same point. No hidden state is passed,
# so nn.RNNCell starts from zeros, just like the manual computation did.
rnn_cell(X[0:1])

In [None]:
# The input used in the cells above: the first point of the sequence, kept 2-D by the 0:1 slice
X[0:1]

## RNN Layer

In [None]:
# Single layer RNN
n_features = 2
n_hidden_dim = 2

torch.manual_seed(101)
# Use dedicated names here: `rnn_cell` / `rnn_state` keep referring to the
# nn.RNNCell of the previous section, so re-running those earlier cells
# still works after this one has been executed.
rnn_layer = nn.RNN(input_size=n_features, hidden_size=n_hidden_dim)
rnn_layer_state = rnn_layer.state_dict()

# The parameter names now carry an `_l0` suffix, which identifies layer 0
rnn_layer_state

### RNN Input Dimension
In PyTorch, setting the `batch_first` argument of `nn.RNN` to `True` changes the expected input layout so that the batch dimension comes first: the input tensor must then have dimensions (batch_size, sequence_length, input_size). This is convenient when the data is already organized batch-first, as is the case for batches produced by a `DataLoader`. Note that `batch_first` only affects the input and output tensors; the hidden state always has dimensions (num_layers * num_directions, batch_size, hidden_size).

However, by default, PyTorch's nn.RNN class assumes the sequence dimension comes first. So, if batch_first is not specified or set to False, the input tensor should have dimensions (sequence_length, batch_size, input_size).

In [None]:
# Stack the first three sequences into one float tensor laid out as (batch, sequence, features)
batch = torch.as_tensor(points[:3]).float()
batch.shape

In [None]:
# Swap the first two axes: (batch, sequence, features) -> (sequence, batch, features)
permuted_batch = batch.permute(1,0,2)

# Sequence-first layout, which nn.RNN expects when batch_first is left at its default
permuted_batch.shape

In [None]:
# Default layout (batch_first=False): the batch is the second dimension of the input
torch.manual_seed(101)
rnn = nn.RNN(input_size=n_features, hidden_size=n_hidden_dim)
# `out` holds the hidden state of every step, `final_hidden` only the last one
out, final_hidden = rnn(permuted_batch)
out.shape, final_hidden.shape

In [None]:
# Alternative: leave the batch dimension first and tell the layer so via batch_first=True
torch.manual_seed(101)
rnn = nn.RNN(
    input_size=n_features,
    hidden_size=n_hidden_dim,
    batch_first=True,
)
out, final_hidden = rnn(batch)
out.shape, final_hidden.shape

#### Remember that batches produced by a `DataLoader` have the batch dimension first, so use `batch_first=True` when feeding them to an RNN!

In [None]:
# Stacked RNN: num_layers=2 puts a second recurrent layer on top of the first one
torch.manual_seed(101)
rnn_stacked = nn.RNN(input_size=2, hidden_size=2, batch_first=True, num_layers=2)
# Display the state dict to inspect the parameters of both layers
rnn_stacked_state = rnn_stacked.state_dict()
rnn_stacked_state

In [None]:
# Bidirectional RNN: one layer that reads the sequence in both directions
torch.manual_seed(101)
rnn_bidirect = nn.RNN(input_size=2, hidden_size=2, batch_first=True, bidirectional=True)
# The state dict lists the forward parameters first, followed by a second set
# whose keys end in `_reverse` (the cells below rely on this ordering)
state = rnn_bidirect.state_dict()
state

In [None]:
# Create two plain (unidirectional) RNNs that stand in for the forward and the
# backward direction of `rnn_bidirect`; their randomly initialised parameters
# are overwritten with the bidirectional layer's parameters in the next cells
torch.manual_seed(19)
forward_rnn = nn.RNN(input_size=2, hidden_size=2, batch_first=True)
backward_rnn = nn.RNN(input_size=2, hidden_size=2, batch_first=True)
# Show the bidirectional layer's parameters again for reference
state

In [None]:
# Preview the reverse-direction parameters (entries after the first four) with
# the `_reverse` suffix stripped from every key
reverse_suffix_len = len('_reverse')
[(name[:-reverse_suffix_len], tensor) for name, tensor in list(state.items())[4:]]

In [None]:
# The first four entries are the forward-direction parameters: load them unchanged
all_params = list(state.items())
forward_rnn.load_state_dict(dict(all_params[:4]))
# The remaining entries belong to the reverse direction; drop the 8-character
# `_reverse` suffix so the keys match those of a plain nn.RNN
backward_rnn.load_state_dict({name[:-8]: tensor for name, tensor in all_params[4:]})

In [None]:
# Step-by-step version of the previous cell, for the backward RNN only.

# The reverse-direction parameters are the entries after the first four
state_items = list(state.items())[4:]

# Keys of the reverse direction end in this suffix; a plain nn.RNN expects the
# same keys without it
reverse_suffix = '_reverse'

# Strip the suffix from every key, keeping the parameter tensors untouched
modified_state_dict = {
    key[:-len(reverse_suffix)]: value
    for key, value in state_items
}

# Load the renamed parameters into the backward RNN model
backward_rnn.load_state_dict(modified_state_dict)


In [None]:
# Add a leading batch dimension: (sequence, features) -> (1, sequence, features).
# The sizes are taken from X itself instead of being hard-coded, and re-running
# the cell on the already reshaped tensor leaves it unchanged.
X = X.reshape(1, -1, X.shape[-1])

In [None]:
# Reverse the sequence along dim 1 (the sequence axis); this is the input for backward_rnn
x_rev = torch.flip(X, dims=[1])
# Show both versions side by side
x_rev, X

In [None]:
# Forward direction: feed the sequence in its original order
out, h = forward_rnn(X)
out, h

In [None]:
# Backward direction: feed the reversed sequence, so the per-step outputs in
# `out_rev` follow the reversed order of the points
out_rev, h_rev = backward_rnn(x_rev)
out_rev, h_rev

In [None]:
# The backward RNN consumed the reversed sequence, so its outputs are in
# reversed time order. Flip them back along the sequence axis so that step t of
# the backward pass lines up with step t of the forward pass; only then does the
# concatenation reproduce the output of the bidirectional layer.
out_rev_back = torch.flip(out_rev, dims=[1])
# Final hidden states need no flipping: each is already that direction's last state
torch.cat([out, out_rev_back], dim=2), torch.cat([h, h_rev])

In [None]:
# Reference result from the real bidirectional layer, to compare with the manual reconstruction
rnn_bidirect(X)

# Sequence Training

In [None]:
# Generate a second set of sequences with a different seed; it is used as test/validation data below.
# NOTE(review): `n` is not passed here, so the generator's default applies -- confirm that size is intended
test_points, test_directions = generate_sequences(seed=101)

In [None]:
# Prepare data: features as float tensors, labels as float column vectors (one label per row)

# Training set
train_features = torch.as_tensor(points).float()
train_labels = torch.as_tensor(directions).view(-1, 1).float()
train_data = TensorDataset(train_features, train_labels)

# Test set
test_features = torch.as_tensor(test_points).float()
test_labels = torch.as_tensor(test_directions).view(-1, 1).float()
test_data = TensorDataset(test_features, test_labels)

In [None]:
# Build the data loaders; both use the same batch size, only the training loader is shuffled
batch_size = 16
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_data, batch_size=batch_size)

In [None]:
# Report whether the tensors of the first batch track gradients,
# first for train_loader and then for test_loader
for loader in (train_loader, test_loader):
    for x_batch, y_batch in loader:
        print("x_batch requires_grad:", x_batch.requires_grad)
        print("y_batch requires_grad:", y_batch.requires_grad)
        break  # The first batch is enough


In [None]:
# Peek at the first (features, label) pair of the dataset behind the training loader
train_loader.dataset[0]

In [None]:
from models.SimpleRNN import SquareModel
from model_managers.GodoyStepByStep import StepByStep

In [None]:
# Seed the global RNG right before building the model, so that the weight
# initialisation (and the shuffling performed while training) is reproducible
# even when this cell is re-run on its own
torch.manual_seed(21)
model = SquareModel(n_features=2, hidden_dim=2, n_outputs=1)
# Binary classification on raw logits: a single output against the 0/1 direction label
loss = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

In [None]:
# Hand model, loss function and optimizer to the project's training helper
model_manager = DeepLearningManager(model, loss, optimizer)

In [None]:
# Register the loaders (the test set doubles as the validation set), then train
n_epochs = 1000
model_manager.set_data_loaders(
    train_loader=train_loader,
    val_loader=test_loader,
)
model_manager.train(n_epochs=n_epochs)

In [None]:
# Plot the losses recorded by the manager during training.
# NOTE(review): presumably training and validation curves -- confirm in DeepLearningManager.plot_losses
fig = model_manager.plot_losses()

In [None]:
# Apply the manager's `correct` function across the test loader to evaluate the trained model.
# NOTE(review): presumably yields counts of correct predictions -- confirm the return format in DeepLearningManager
model_manager.loader_apply(test_loader, model_manager.correct)