<a href="https://colab.research.google.com/github/Reaper-ai/ML_AI/blob/main/03_RNN_from_scratch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F


import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
print("setup complete")

setup complete


In [2]:
# Run on the GPU when PyTorch can see one; otherwise stay on the CPU.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
device

device(type='cpu')

In [3]:
# Parameters
seed = 42            # Seed for PyTorch's global RNG
# Seed before any random op runs so the generated data, the train/test split,
# the weight initialisation and the loader shuffling repeat on a fresh Run All.
torch.manual_seed(seed)

vocab_size = 1000    # Number of distinct token ids (ids are drawn from [0, vocab_size))
embed_dim = 32       # Width of each token embedding vector
input_size = embed_dim # Per-step RNN input width; kept equal to embed_dim
hidden_size = 64     # Width of the recurrent hidden state
seq_length = 20      # Number of tokens in every input sequence
batch_size = 32      # Samples per DataLoader batch
num_classes = 2      # Binary classification
num_samples = 1000   # Total number of generated samples (train + test)

In [4]:
class RandomSequenceDataset(Dataset):
    """Synthetic dataset of random token-id sequences with a binary label.

    Each sample is a sequence of ``seq_length`` integer ids drawn uniformly
    from ``[0, vocab_size)``. A sample is labelled 1.0 when some position ``t``
    holds an id greater than ``vocab_size * high_frac`` and position ``t + 1``
    holds an id smaller than ``vocab_size * low_frac``; otherwise it is 0.0.

    Args:
        num_samples: Number of sequences to generate.
        seq_length: Number of token ids per sequence.
        vocab_size: Exclusive upper bound for the generated ids.
        high_frac: Fraction of ``vocab_size`` the first id of a pair must exceed.
        low_frac: Fraction of ``vocab_size`` the following id must stay below.

    NOTE(review): the default thresholds overlap (0.337 < 0.551), so a random
    pair matches fairly often and long sequences are probably almost always
    labelled 1 - check the class balance before trusting accuracy numbers.
    """

    def __init__(self, num_samples, seq_length, vocab_size,
                 high_frac=0.337, low_frac=0.551):
        self.num_samples = num_samples
        self.sequences = torch.randint(0, vocab_size, (num_samples, seq_length))

        # Vectorised form of "for each t: seq[t] is high and seq[t + 1] is low".
        # Both slices have seq_length - 1 columns, so column t of the first
        # lines up with column t + 1 of the original sequence in the second.
        current_is_high = self.sequences[:, :-1] > vocab_size * high_frac
        next_is_low = self.sequences[:, 1:] < vocab_size * low_frac

        # A sample is positive when at least one adjacent pair matches; with
        # fewer than two tokens there are no pairs and every label stays 0.
        self.labels = (current_is_high & next_is_low).any(dim=1).float()

    def __len__(self):
        """Return the number of samples in the dataset."""
        return self.num_samples

    def __getitem__(self, idx):
        """Return the ``(sequence, label)`` pair stored at ``idx``."""
        return self.sequences[idx], self.labels[idx]

In [5]:
# Build the full synthetic dataset once
dataset = RandomSequenceDataset(num_samples, seq_length, vocab_size)

# Hold out 20% of the samples for evaluation
train_size = int(0.8 * num_samples)
test_size = num_samples - train_size
train_dataset, test_dataset = torch.utils.data.random_split(
    dataset, [train_size, test_size]
)

# Both loaders share the batch size; only the training batches are shuffled
loader_kwargs = {"batch_size": batch_size}
train_loader = DataLoader(train_dataset, shuffle=True, **loader_kwargs)
test_loader = DataLoader(test_dataset, shuffle=False, **loader_kwargs)

In [6]:
# Model
class Simple_RNN(nn.Module):
    """Minimal recurrent binary classifier built from plain ``nn.Linear`` layers.

    Token ids are embedded and folded into a hidden state one time step at a
    time: ``h_t = tanh(i2h([x_t ; h_{t-1}]))``. After the last step, the last
    input embedding and the final hidden state are concatenated and mapped by
    ``i2o`` to a single sigmoid probability per sample.

    Args:
        input_size: Width of each per-step input vector. It is also used as
            the embedding dimension so the two cannot disagree.
        hidden_size: Width of the recurrent hidden state.
        num_embeddings: Number of rows in the embedding table. When omitted,
            the notebook-level ``vocab_size`` is used.
    """

    def __init__(self, input_size, hidden_size, num_embeddings=None):
        super().__init__()
        if num_embeddings is None:
            # Fall back to the notebook-wide vocabulary size.
            num_embeddings = vocab_size
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(num_embeddings, input_size)
        self.i2o = nn.Linear(input_size + hidden_size, 1)
        self.i2h = nn.Linear(input_size + hidden_size, hidden_size)

    def forward(self, input_, hidden=None):
        """Return one probability per sequence.

        Args:
            input_: 2-D integer tensor of token ids, ``(batch, seq_len)``.
            hidden: Optional initial hidden state ``(batch, hidden_size)``;
                zeros are used when it is ``None``.

        Returns:
            Tensor of shape ``(batch, 1)`` with values produced by a sigmoid.

        Raises:
            ValueError: If ``input_`` has no time steps.
        """
        batch_size, seq_len = input_.size()
        if seq_len == 0:
            # The output layer needs the last step's embedding, so an empty
            # sequence cannot be scored.
            raise ValueError("Simple_RNN.forward needs at least one time step")

        embedded = self.embedding(input_)

        if hidden is None:
            hidden = self.hidden_init(batch_size)

        for t in range(seq_len):
            input_t = embedded[:, t, :]
            combined = torch.cat((input_t, hidden), dim=1)
            hidden = torch.tanh(self.i2h(combined))

        # input_t still holds the embedding of the final time step here.
        output = self.i2o(torch.cat((input_t, hidden), dim=1))
        return torch.sigmoid(output)

    def hidden_init(self, batch_size):
        """Return an all-zero hidden state of shape ``(batch_size, hidden_size)``.

        The tensor is created on the same device and with the same dtype as
        the model's own weights, so it always matches wherever the model lives.
        """
        weight = self.i2h.weight
        return torch.zeros(
            batch_size, self.hidden_size, device=weight.device, dtype=weight.dtype
        )

# Build the network, then move its parameters to the selected device
my_model = Simple_RNN(input_size=input_size, hidden_size=hidden_size)
my_model = my_model.to(device)

In [7]:
# Binary cross-entropy on probabilities (the model's forward ends in a sigmoid)
loss_fn = nn.BCELoss()
# Adam over every parameter of the RNN
optimizer = torch.optim.Adam(params=my_model.parameters(), lr=0.001)

In [8]:
def train_loop(model, train):
    """Run one optimisation pass over the batches in ``train``.

    Args:
        model: Network to optimise. It is called as ``model(x)`` and its
            output is squeezed on dim 1 before being passed to the loss.
        train: Iterable of ``(inputs, targets)`` batches, e.g. a ``DataLoader``.

    Uses the notebook-level ``device``, ``loss_fn`` and ``optimizer``.
    """
    # test_loop() switches the model to eval mode, so restore training mode
    # at the start of every pass.
    model.train()

    # Iterate the loader that was passed in, not the global ``train_loader``.
    for x, y in train:
        x, y = x.to(device), y.to(device)

        out = model(x).squeeze(1)
        loss = loss_fn(out, y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()


In [9]:
def test_loop(model, test_loader):
  model.eval()
  correct, t_loss, total = 0, 0, 0
  with torch.inference_mode():
      for x, y in test_loader:
          x, y = x.to(device), y.to(device)
          out = model(x).squeeze(1)


          predicted = (out > 0.5).float()
          correct += (predicted.squeeze() == y).sum().item()
          total += y.size(0)

          t_loss += loss_fn(out, y).item()

  return correct/total*100, t_loss/len(test_loader)


In [10]:
# Train for a fixed number of epochs, scoring the held-out split after each one

epochs = 10
for t in range(epochs):
    train_loop(my_model, train_loader)
    metrics = test_loop(my_model, test_loader)
    acc, loss = metrics

    # Epochs are reported 1-based
    print(f'Epoch: {t+1} --------------------------------------------')
    print(f' Accuracy: {acc:.2f}%, Avg loss: {loss:.5f}')

Epoch: 1 --------------------------------------------
 Accuracy: 100.00%, Avg loss: 0.21381
Epoch: 2 --------------------------------------------
 Accuracy: 100.00%, Avg loss: 0.01732
Epoch: 3 --------------------------------------------
 Accuracy: 100.00%, Avg loss: 0.00815
Epoch: 4 --------------------------------------------
 Accuracy: 100.00%, Avg loss: 0.00546
Epoch: 5 --------------------------------------------
 Accuracy: 100.00%, Avg loss: 0.00407
Epoch: 6 --------------------------------------------
 Accuracy: 100.00%, Avg loss: 0.00321
Epoch: 7 --------------------------------------------
 Accuracy: 100.00%, Avg loss: 0.00261
Epoch: 8 --------------------------------------------
 Accuracy: 100.00%, Avg loss: 0.00218
Epoch: 9 --------------------------------------------
 Accuracy: 100.00%, Avg loss: 0.00185
Epoch: 10 --------------------------------------------
 Accuracy: 100.00%, Avg loss: 0.00160
