# RNN stock prices example

### "One to One" RNN architecture

- In a "One to One" RNN architecture, the model takes one input and produces one output.

- In this case, we use the stock prices of the last few consecutive days (a sliding window of `time_frame_window_size` days, set to 3 in the code below) as input to predict the stock price for the next day (tomorrow).

- This is still considered a "One to One" architecture because for each input sequence (consisting of the prices in one time window), the RNN produces a single output (the price for the next day).

In [None]:
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
import numpy as np

# Data preparation
# Number of samples (days) in the synthetic dataset
dataset_len = 100

# Day indices 1..dataset_len inclusive, so that len(x) == dataset_len.
# (range(1, dataset_len) stops one short and yields only dataset_len - 1 days.)
x = np.arange(1, dataset_len + 1)  # Days

# Synthetic "stock price": a sine wave with a period of 36 days
y = np.sin(x * np.pi / 18)

print("Days array", x)
print("Stock prices array", y)

def ploting_the_data(x, y, title=None):
    """Plot prices against day indices as a line chart with point markers.

    Args:
        x: Sequence of day indices (horizontal axis).
        y: Sequence of prices, one per entry of ``x`` (vertical axis).
        title: Optional figure title. When omitted, a title reporting the
            actual number of plotted days is generated.
    """
    if title is None:
        # Derive the day count from the data instead of hard-coding it, so the
        # title stays correct when the series grows (e.g. after predictions
        # are appended).
        title = f'Stock Price Trend Over {len(x)} Days'

    plt.figure(figsize=(10, 6))
    plt.plot(x, y, marker='o')
    plt.title(title)
    plt.xlabel('Day')
    plt.ylabel('Stock Price (sin function)')
    plt.grid(True)
    plt.show()
    
ploting_the_data(x, y)

# Build sliding windows over the price series: every sample holds
# `time_frame_window_size` consecutive prices and its target is the price of
# the day that follows the window.

# Hyperparameter - number of past days in each input window
time_frame_window_size = 3

# Only start a window where a next-day target exists. Deriving the count from
# len(y) keeps X and Y the same length no matter how many points the series has.
num_samples = len(y) - time_frame_window_size

# Stack the windows into one NumPy array before converting to a tensor
X_np = np.array([y[start:start + time_frame_window_size] for start in range(num_samples)])
X = torch.tensor(X_np, dtype=torch.float).unsqueeze(-1)  # (num_samples, window, 1)

# Targets are shaped (num_samples, 1) to match the (num_samples, output_size)
# model output; a flat (num_samples,) target would be silently broadcast
# against it by nn.MSELoss and produce a meaningless loss.
Y = torch.tensor(y[time_frame_window_size:], dtype=torch.float).unsqueeze(-1)


print("X.shape", X.shape)
print("Y.shape", Y.shape)

print("X[0]", X[0])
print("X[1]", X[1])
print("Y[0]", Y[0])



# Model parameters
input_size = time_frame_window_size  # Each RNN input vector holds one full window of prices
hidden_size = 2  # Size of the RNN's hidden state
output_size = 1  # We want to output one price

# Define the RNN model
class StockRNN(nn.Module):
    """Single-layer vanilla RNN followed by a linear read-out layer.

    Args:
        input_size: Number of features in each RNN input vector.
        hidden_size: Width of the RNN hidden state.
        output_size: Number of values produced by the linear head.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        # Recurrent layer; batch_first=True puts the batch dimension first
        # whenever the input is batched (3-D).
        self.rnn = nn.RNN(input_size, hidden_size, batch_first=True)
        # Linear head that maps every hidden state to the output values.
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Run ``x`` through the RNN and project each hidden state.

        The last dimension of ``x`` is dropped first when it has size 1;
        otherwise ``x`` is passed to the RNN unchanged.

        Returns:
            The linear head applied to every RNN output vector.
        """
        features = x.squeeze(-1)
        hidden_states, _final_hidden = self.rnn(features)
        return self.fc(hidden_states)

# Instantiate the model
model = StockRNN(input_size, hidden_size, output_size)

# Loss and optimizer
criterion = nn.MSELoss()  # Mean Squared Error Loss
optimizer = torch.optim.Adam(model.parameters(), lr=0.1)

# Training the model: full-batch updates, the whole of X on every epoch.
# A single constant drives both the loop and the progress message so the
# reported total can no longer drift from the real epoch count.
num_epochs = 220
for epoch in range(num_epochs):
    optimizer.zero_grad()
    output = model(X)  # Forward pass
    # NOTE(review): `output` has shape (N, output_size). nn.MSELoss broadcasts
    # a differently-shaped target (with only a warning) instead of failing, so
    # confirm that Y has exactly the same shape as `output`.
    loss = criterion(output, Y)  # Compute loss
    loss.backward()  # Backpropagation
    optimizer.step()  # Update weights

    # Print loss every 20 epochs
    if (epoch + 1) % 20 == 0:
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')



# Predict the next days' prices autoregressively: every prediction is appended
# to the price series and becomes part of the next input window.

# Seed the window with the most recent prices, shaped (1, window) as one
# input vector for the model.
last_working_week = torch.tensor(y[-time_frame_window_size:], dtype=torch.float)
last_working_week = last_working_week.reshape(1, time_frame_window_size)
print("last week shape => ", last_working_week.shape)

# Shift the window one step per iteration and add the predicted price for the NEXT day.
for _ in range(time_frame_window_size):
    print("last_working_week => ", last_working_week)
    with torch.no_grad():  # inference only, no gradient bookkeeping needed
        predicted_price = model(last_working_week)
    print(f"Predicted price for tomorrow: {predicted_price.item():.2f}")

    x = np.append(x, x[-1] + 1)
    y = np.append(y, predicted_price.item())

    print("Updated X => ", x)
    print("Updated Y => ", y)

    # The next window must come from the prices (y), not from the day
    # indices (x): the model was trained on windows of prices.
    last_working_week = torch.tensor(y[-time_frame_window_size:], dtype=torch.float)
    last_working_week = last_working_week.reshape(1, time_frame_window_size)

ploting_the_data(x, y)

### "One to Many" RNN architecture

- In a "One to Many" RNN architecture, the model takes one input and produces a sequence of outputs.

- For this case, let's say the model takes the stock price of one day and predicts the prices for the next few days.

# LSTM from scratch implementation

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam

import pytorch_lightning as L
from torch.utils.data import TensorDataset, DataLoader

class LSTMFromScrutch(L.LightningModule):
    """Hand-written single-unit LSTM with scalar weights, trained via Lightning.

    Every gate owns two scalar weights (one applied to the short-term memory,
    one to the current input) and one scalar bias. Weights are drawn from a
    standard normal distribution; biases start at zero.
    """

    def __init__(self):
        super().__init__()
        mean = torch.tensor(0.0)
        std = torch.tensor(1.0)

        # Forget gate: share of the long-term memory that is kept
        self.wlr1 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.wlr2 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.blr1 = nn.Parameter(torch.tensor(0.), requires_grad=True)

        # Input gate: share of the candidate memory that is stored
        self.wpr1 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.wpr2 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.bpr1 = nn.Parameter(torch.tensor(0.), requires_grad=True)

        # Candidate memory (tanh branch)
        self.wp1 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.wp2 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.bp1 = nn.Parameter(torch.tensor(0.), requires_grad=True)

        # Output gate: share of the squashed long-term memory that is emitted
        self.wo1 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.wo2 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.bo1 = nn.Parameter(torch.tensor(0.), requires_grad=True)

    def lstm_unit(self, input_value, long_memory, short_memory):
        """Advance the LSTM cell by one time step.

        Every intermediate gate value is printed for inspection, so each call
        writes five lines to stdout.

        Args:
            input_value: Input for the current step.
            long_memory: Long-term memory (cell state) from the previous step.
            short_memory: Short-term memory (hidden state) from the previous step.

        Returns:
            A two-element list ``[updated_long_memory, updated_short_memory]``.
        """
        # Forget gate activation
        long_remember_percent = torch.sigmoid((short_memory * self.wlr1) +
                                              (input_value * self.wlr2) +
                                              self.blr1)
        print("long_remember_percent => ", long_remember_percent)

        # Input gate activation
        potential_remember_percent = torch.sigmoid((short_memory * self.wpr1) +
                                                   (input_value * self.wpr2) +
                                                   self.bpr1)
        print("potential_remember_percent => ", potential_remember_percent)

        # New candidate value for the long-term memory
        potential_memory = torch.tanh((short_memory * self.wp1) +
                                      (input_value * self.wp2) +
                                      self.bp1)
        # Labeled with its own name so it can be told apart from the input
        # gate activation printed just above.
        print("potential_memory => ", potential_memory)

        # Long-term memory: kept share of the old value plus gated candidate
        updated_long_memory = ((long_memory * long_remember_percent) +
                               (potential_remember_percent * potential_memory))
        print("updated_long_memory => ", updated_long_memory)

        # Output gate activation
        output_percent = torch.sigmoid((short_memory * self.wo1) +
                                       (input_value * self.wo2) +
                                       self.bo1)
        print("output_percent => ", output_percent)

        # Short-term memory: gated, squashed long-term memory
        updated_short_memory = torch.tanh(updated_long_memory) * output_percent

        return ([updated_long_memory, updated_short_memory])

    def forward(self, input):
        """Feed a sequence through the cell and return the last short-term memory.

        Args:
            input: Iterable of per-step input values, processed in order.

        Returns:
            The short-term memory after the final step (``0`` if ``input`` is
            empty, since both memories start at zero).
        """
        long_memory = 0
        short_memory = 0
        for day in input:
            long_memory, short_memory = self.lstm_unit(day, long_memory, short_memory)

        return short_memory

    def configure_optimizers(self):
        """Return an Adam optimizer over all parameters with default settings."""
        return Adam(self.parameters())

    def training_step(self, batch, batch_idx):
        """Compute the squared error for one batch and log it.

        Only the first sequence of the batch (``input_i[0]``) is evaluated, so
        this presumably expects a batch size of 1 — confirm against the
        DataLoader in use.

        Args:
            batch: Pair ``(inputs, labels)`` produced by the dataloader.
            batch_idx: Index of the batch (unused).

        Returns:
            The squared difference between the prediction and the label.
        """
        input_i, label_i = batch
        output_i = self.forward(input_i[0])
        loss = (output_i - label_i)**2
        # Lightning runs backward() and the optimizer step itself.

        # Log the loss, and the prediction under a key that depends on the label
        self.log("training_loss", loss)
        if (label_i == 0):
            self.log("out_0", output_i)
        else:
            self.log("out_1", output_i)

        return loss


model = LSTMFromScrutch()

# Predict without training
print("Company A: Observed = 0, Predicted =", model(torch.tensor([0., 0.5, 0.25, 1.])).detach())
print("Company B: Observed = 1, Predicted =", model(torch.tensor([1., 0.5, 0.25, 1.])).detach())


# Vanishing and Exploding Gradient solving:
# drive the cell with 100 inputs ramping up from -5 in steps of 0.02 and let
# lstm_unit print the gate values at every step.
long_memory = 0
short_memory = 0

x0 = -5
x_step = 0.02
for i in range(100):
    step = x0 + i * x_step
    # Carry BOTH memories into the next step; keeping the old short-term
    # memory would pin it at its initial value of 0 for the whole run.
    long_memory, short_memory = model.lstm_unit(step, long_memory, short_memory)


# Prepare data for training
inputs = torch.tensor([
    [0., 0.5, 0.25, 1.],
    [1., 0.5, 0.25, 1.]
])
labels = torch.tensor([0., 1.])

dataset = TensorDataset(inputs, labels)
dataloader = DataLoader(dataset, num_workers=4)

# Train
trainer = L.Trainer(max_epochs=20)
trainer.fit(model, train_dataloaders=dataloader)

print("Company A: Observed = 0, Predicted =", model(torch.tensor([0., 0.5, 0.25, 1.])).detach())
print("Company B: Observed = 1, Predicted =", model(torch.tensor([1., 0.5, 0.25, 1.])).detach())


# Optional: continue training from the best checkpoint for more epochs.
# path_to_best_checkpoint = trainer.checkpoint_callback.best_model_path
# trainer = L.Trainer(max_epochs=3000)
# trainer.fit(model, train_dataloaders = dataloader, ckpt_path = path_to_best_checkpoint)

# print("Company A: Observed = 0, Predicted =", model(torch.tensor([0., 0.5, 0.25, 1.])).detach())
# print("Company B: Observed = 1, Predicted =", model(torch.tensor([1., 0.5, 0.25, 1.])).detach())

# path_to_best_checkpoint = trainer.checkpoint_callback.best_model_path
# trainer = L.Trainer(max_epochs=5000)
# trainer.fit(model, train_dataloaders = dataloader, ckpt_path = path_to_best_checkpoint)

# print("Company A: Observed = 0, Predicted =", model(torch.tensor([0., 0.5, 0.25, 1.])).detach())
# print("Company B: Observed = 1, Predicted =", model(torch.tensor([1., 0.5, 0.25, 1.])).detach())


GPU available: True (mps), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
/Users/imax/Documents/github/neural_n00b/env_nn_n00b/lib/python3.11/site-packages/pytorch_lightning/trainer/connectors/logger_connector/logger_connector.py:67: Starting from v1.9.0, `tensorboardX` has been removed as a dependency of the `pytorch_lightning` package, due to potential conflicts with other packages in the ML ecosystem. For this reason, `logger=True` will use `CSVLogger` as the default logger, unless the `tensorboard` or `tensorboardX` packages are found. Please `pip install lightning[extra]` or one of them to enable TensorBoard support by default

  | Name         | Type | Params
--------------------------------------
  | other params | n/a  | 12    
--------------------------------------
12        Trainable params
0         Non-trainable params
12        Total params
0.000     Total estimated model params size (MB)


long_remember_percent =>  tensor(0.5000, grad_fn=<SigmoidBackward0>)
potential_remember_percent =>  tensor(0.5000, grad_fn=<SigmoidBackward0>)
potential_remember_percent =>  tensor(0., grad_fn=<TanhBackward0>)
updated_long_memory =>  tensor(0., grad_fn=<AddBackward0>)
output_percent =>  tensor(0.5000, grad_fn=<SigmoidBackward0>)
long_remember_percent =>  tensor(0.3898, grad_fn=<SigmoidBackward0>)
potential_remember_percent =>  tensor(0.4296, grad_fn=<SigmoidBackward0>)
potential_remember_percent =>  tensor(0.4396, grad_fn=<TanhBackward0>)
updated_long_memory =>  tensor(0.1888, grad_fn=<AddBackward0>)
output_percent =>  tensor(0.4461, grad_fn=<SigmoidBackward0>)
long_remember_percent =>  tensor(0.4389, grad_fn=<SigmoidBackward0>)
potential_remember_percent =>  tensor(0.4736, grad_fn=<SigmoidBackward0>)
potential_remember_percent =>  tensor(0.3225, grad_fn=<TanhBackward0>)
updated_long_memory =>  tensor(0.2356, grad_fn=<AddBackward0>)
output_percent =>  tensor(0.4620, grad_fn=<SigmoidBac

/Users/imax/Documents/github/neural_n00b/env_nn_n00b/lib/python3.11/site-packages/pytorch_lightning/trainer/connectors/data_connector.py:436: Consider setting `persistent_workers=True` in 'train_dataloader' to speed up the dataloader worker initialization.
/Users/imax/Documents/github/neural_n00b/env_nn_n00b/lib/python3.11/site-packages/pytorch_lightning/loops/fit_loop.py:293: The number of training batches (2) is smaller than the logging interval Trainer(log_every_n_steps=50). Set a lower value for log_every_n_steps if you want to see logs for the training epoch.


Training: |          | 0/? [00:00<?, ?it/s]

long_remember_percent =>  tensor(0.5000, device='mps:0', grad_fn=<SigmoidBackward0>)
potential_remember_percent =>  tensor(0.5000, device='mps:0', grad_fn=<SigmoidBackward0>)
potential_remember_percent =>  tensor(0., device='mps:0', grad_fn=<TanhBackward0>)
updated_long_memory =>  tensor(0., device='mps:0', grad_fn=<AddBackward0>)
output_percent =>  tensor(0.5000, device='mps:0', grad_fn=<SigmoidBackward0>)
long_remember_percent =>  tensor(0.3898, device='mps:0', grad_fn=<SigmoidBackward0>)
potential_remember_percent =>  tensor(0.4296, device='mps:0', grad_fn=<SigmoidBackward0>)
potential_remember_percent =>  tensor(0.4396, device='mps:0', grad_fn=<TanhBackward0>)
updated_long_memory =>  tensor(0.1888, device='mps:0', grad_fn=<AddBackward0>)
output_percent =>  tensor(0.4461, device='mps:0', grad_fn=<SigmoidBackward0>)
long_remember_percent =>  tensor(0.4389, device='mps:0', grad_fn=<SigmoidBackward0>)
potential_remember_percent =>  tensor(0.4736, device='mps:0', grad_fn=<SigmoidBackwar

`Trainer.fit` stopped: `max_epochs=20` reached.


long_remember_percent =>  tensor(0.5056, device='mps:0', grad_fn=<SigmoidBackward0>)
potential_remember_percent =>  tensor(0.5052, device='mps:0', grad_fn=<SigmoidBackward0>)
potential_remember_percent =>  tensor(0.0190, device='mps:0', grad_fn=<TanhBackward0>)
updated_long_memory =>  tensor(0.0096, device='mps:0', grad_fn=<AddBackward0>)
output_percent =>  tensor(0.5053, device='mps:0', grad_fn=<SigmoidBackward0>)
long_remember_percent =>  tensor(0.3976, device='mps:0', grad_fn=<SigmoidBackward0>)
potential_remember_percent =>  tensor(0.4378, device='mps:0', grad_fn=<SigmoidBackward0>)
potential_remember_percent =>  tensor(0.4673, device='mps:0', grad_fn=<TanhBackward0>)
updated_long_memory =>  tensor(0.2084, device='mps:0', grad_fn=<AddBackward0>)
output_percent =>  tensor(0.4534, device='mps:0', grad_fn=<SigmoidBackward0>)
long_remember_percent =>  tensor(0.4458, device='mps:0', grad_fn=<SigmoidBackward0>)
potential_remember_percent =>  tensor(0.4817, device='mps:0', grad_fn=<Sigmoi

# LSTM from torch implementation

In [None]:
#LSTM torch implementation

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam
import pytorch_lightning as pl
from torch.utils.data import DataLoader, TensorDataset
import pandas as pd

class LightningLSTM(pl.LightningModule):
    """Sequence-to-value model built on ``nn.LSTM``.

    An LSTM with one input feature and a hidden state of size 3 reads the
    sequence; the hidden state of the final time step goes through a linear
    layer and a sigmoid to give one prediction per sequence.
    """

    def __init__(self):
        super().__init__()

        # input_size=1: a single feature per time step.
        # batch_first=True: inputs are laid out as (batch, time, feature).
        self.lstm = nn.LSTM(input_size=1, hidden_size=3, batch_first=True)
        self.linear = nn.Linear(3, 1)
        self.sigmoid = nn.Sigmoid()
        self.loss_function = nn.MSELoss()

    def forward(self, input_seq):
        """Return the sigmoid-squashed prediction for every sequence in the batch."""
        hidden_per_step, _state = self.lstm(input_seq)
        # Keep only the hidden state produced at the last time step.
        final_hidden = hidden_per_step[:, -1, :]
        return self.sigmoid(self.linear(final_hidden))

    def configure_optimizers(self):
        """Return an Adam optimizer over all parameters with a learning rate of 0.1."""
        return Adam(self.parameters(), lr=0.1)

    def training_step(self, batch, batch_idx):
        """Compute and log the MSE between predictions and labels for one batch."""
        sequences, targets = batch
        predictions = self(sequences)
        # Labels gain a trailing dimension so they line up with the
        # (batch, 1) predictions.
        loss = self.loss_function(predictions, targets.unsqueeze(1))
        # Lightning runs backward() and the optimizer step itself.
        self.log("training_loss", loss)
        return loss

# Example usage (dummy data for demonstration)
# Two sequences of four time steps; the trailing unsqueeze adds the
# single-feature dimension that the LSTM expects.
inputs = torch.tensor([
    [0., 0.5, 0.25, 1.],
    [1., 0.5, 0.25, 1.]
]).unsqueeze(-1)

labels = torch.tensor([0., 1.])

dataset = TensorDataset(inputs, labels)
dataloader = DataLoader(dataset, batch_size=1, num_workers=4)

model = LightningLSTM()
trainer = pl.Trainer(max_epochs=2, log_every_n_steps=2)
trainer.fit(model, train_dataloaders=dataloader)

# Test predictions
with torch.no_grad():
    for company, observed, prices in (
        ("A", 0, [0., 0.5, 0.25, 1.]),
        ("B", 1, [1., 0.5, 0.25, 1.]),
    ):
        # The module may still live on the accelerator after fit(), so the
        # sample is moved to the model's device before the forward pass.
        sample = torch.tensor([prices]).unsqueeze(-1).to(model.device)
        print(f"Company {company}: Observed = {observed}, Predicted =", model(sample))


# Gated Recurrent Units (GRUs)

Gated Recurrent Units (GRUs) are a type of recurrent neural network (RNN) architecture that was introduced to solve some of the limitations of traditional RNNs, particularly the problem of long-term dependencies.
