# SMAI Assignment-5

## 4 RNNs ------------------------------------------------

In [2]:
import random
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, random_split

from typing import Dict, Tuple, List

### 4.1 Counting Bits

#### 4.1.1 Task 1: Dataset

In [3]:
class BitCountData(Dataset):
	def __init__(self, n: int, minlen: int=None, maxlen: int=None, fixlen: int=None):
		self.lens = []
		self.data = []
		self.labels = []
		self.fit(n, minlen, maxlen, fixlen)

	def __len__(self):
		return len(self.data)
	
	def __getitem__(self, i):
		return self.data[i], self.labels[i], self.lens[i]
	
	def fit(self, n, minlen=None, maxlen=None, fixlen=None):
		if fixlen is not None:
			for i in range(n):
				self.lens.append(fixlen)
				
				seq = [random.randint(0,1) for p in range(fixlen)]
				self.data.append(seq)
				
				count = sum(seq)
				self.labels.append(fixlen)
		
		else: 
			smpls_perlen = n // (maxlen-minlen+1)
			smpls_remain = n % (maxlen-minlen+1)
			
			for l in range(minlen, maxlen+1):
				curr_smpl = smpls_perlen + (1 if smpls_remain>0 else 0)
				smpls_remain -= 1
				
				for _ in range(curr_smpl):
					self.lens.append(l)
				
					seq = [random.randint(0,1) for p in range(l)]
					self.data.append(seq)
					
					count = sum(seq)
					self.labels.append(l)

		self.data = [torch.tensor(seq, dtype=torch.float32).unsqueeze(-1) for seq in self.data]
		self.labels = torch.tensor(self.labels, dtype=torch.float32)
		self.lengths = torch.tensor(self.lens, dtype=torch.long)


#### 4.1.2 Task 2: Architecture

In [4]:
class BitCountingRNN(nn.Module):
	def __init__(self, input_size=1, hidden_size=32, num_layers=1, dropout = 0.0, use_norm = False):
		super().__init__()
		self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True, dropout=dropout)
		self.use_norm = use_norm
		if use_norm:
			self.norm = nn.LayerNorm(hidden_size)
		self.fc = nn.Linear(hidden_size, 1)

	def forward(self, x, lengths):
		packed_x = nn.utils.rnn.pack_padded_sequence(x, lengths, batch_first=True, enforce_sorted=False)
		packed_output, hidden = self.rnn(packed_x)
		if self.use_norm:
			hidden = self.norm(hidden[-1])
		else:
			hidden = hidden[-1]
		out = self.fc(hidden)
		return out.squeeze()


#### 4.1.3 Task 3: Training

In [7]:
def train_epoch(model, dataloader, criterion, optimizer, device):
    model.train()
    total_loss = 0
    for sequences, labels, lengths in dataloader:
        sequences, labels = sequences.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(sequences.unsqueeze(-1), lengths)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(dataloader)

def evaluate(model, dataloader, criterion, device):
    model.eval()
    total_loss = 0
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for sequences, labels, lengths in dataloader:
            sequences, labels = sequences.to(device), labels.to(device)
            outputs = model(sequences.unsqueeze(-1), lengths)
            loss = criterion(outputs, labels)
            total_loss += loss.item()
            all_preds.extend(outputs.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    mae = np.mean(np.abs(np.array(all_preds) - np.array(all_labels)))
    return total_loss / len(dataloader), mae

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
dataset = BitCountData(n=50000, minlen=1, maxlen=16)
criterion = nn.MSELoss()
bcr = BitCountingRNN()
optimizer = torch.optim.Adam(bcr.parameters(), lr=0.001)
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

def collate_fn(batch):
    data = [item[0] for item in batch]
    labels = [item[1] for item in batch]
    lengths = [item[2] for item in batch]
    return data, torch.stack(labels), torch.stack(lengths)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_size=32, collate_fn=collate_fn)
    

num_epochs = 10
for epoch in range(num_epochs):
    train_loss = train_epoch(bcr, train_loader, criterion, optimizer, device)
    val_loss, val_mae = evaluate(bcr, val_loader, criterion, device)
    print(f"Epoch {epoch + 1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val MAE: {val_mae:.4f}")



TypeError: expected Tensor as element 0 in argument 0, but got int

#### Metrics

#### 4.1.4 Task 4: Generalization

In [None]:
def evaluate_sequence_lengths(model, maxlen, device):
    lengths = range(1, maxlen + 1)
    maes = []
    for len in lengths:
        bcd = BitCountData(n=1000, minlen=len, maxlen=len)
        dataloader = DataLoader(bcd, batch_size=64, shuffle=False, collate_fn=collate_fn)
        _, mae = evaluate(model, dataloader, criterion, device)
        maes.append(mae)
    return lengths, maes


lengths, maes = evaluate_sequence_lengths(dataset, maxlen=32, device=device)
plt.plot(lengths, maes, label="MAE")
plt.xlabel("Sequence Length")
plt.ylabel("MAE")
plt.title("Generalization Performance")
plt.legend()
plt.show()


### 4.2 Optical Character Recognition

#### 4.2.1 Task 1: Dataset

#### 4.1.2 Task 2: Architecture

#### 4.1.3 Task 3: Training

#### Metrics