# Spiral classification

Given the x-y coordinates of 2D points, classify them into one of three possible spiral branches that they belong to.

Objectives:
 - Build and train a PyTorch MLP model from scratch to classify the points to a high accuracy
 - Visualize how the points are transformed akin to: https://youtu.be/EyKiYVwrdjE?si=JiAKShLumRxbFWXA&t=1002

In [None]:
import torch
from torch import nn, optim

from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torch.utils.data import random_split

import torch.functional as F

In [None]:
import fastbook
from fastbook import *

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [None]:
device

## Create data

In [None]:
seed = 12345
torch.manual_seed(seed)
N = 1000  # num_samples_per_class
K = 3     # num_classes

In [None]:
# Generate spirals

t = torch.linspace(0, 1, N)
a = 0.8 * t + 0.2  # amplitude 0.2 → 1.0
X = list()
y = list()
for k in range(K):
    θ = (2 * t + k) * 2 * torch.pi / K + 0.2 * torch.randn(N)
    X.append(torch.stack((a * θ.sin(), a * θ.cos()), dim=1))
    y.append(torch.zeros(N, dtype=torch.long).fill_(k))
X = torch.cat(X)
y = torch.cat(y)

In [None]:
X.shape, y.shape

In [None]:
X[:5]

In [None]:
y.unique()

In [None]:
plt.scatter(X[:,0], X[:,1], c=y)
plt.show()

## Dataset and DataLoader Splits and Batches Data

In [None]:
class ToyDataset(Dataset):
    def __init__(self, X, y, split=0.2):
        self.features = X
        self.labels = y
    
    def __getitem__(self, index):
        x_item = self.features[index]
        y_item = self.labels[index]
        return x_item, y_item
    
    def __len__(self):
        return self.labels.shape[0]

In [None]:
whole_ds = ToyDataset(X=X, y=y)
len(whole_ds)

In [None]:
def split_dataset(ds, train_percent=0.8):
    """
    ds - ToyDataset
    """

    train_size = int(len(ds) * train_percent)
    test_size = len(ds) - train_size

    train_subset, test_subset = random_split(ds, [train_size, test_size])

    X_train = ds.features[train_subset.indices]
    y_train = ds.labels[train_subset.indices]

    X_test = ds.features[test_subset.indices]
    y_test = ds.labels[test_subset.indices]

    train_ds = ToyDataset(X_train,y_train)
    test_ds = ToyDataset(X_test,y_test)
    
    return train_ds, test_ds

In [None]:
train_ds, test_ds = split_dataset(whole_ds)

In [None]:
train_ds.features.shape

In [None]:
type(train_ds)

In [None]:
test_ds.features.shape

In [None]:
torch.manual_seed(1337)

train_loader = DataLoader(
    dataset=train_ds,
    batch_size=32,
    shuffle=True,
    drop_last=True,
    num_workers=0
)

test_loader = DataLoader(
    dataset=test_ds,
    batch_size=32,
    shuffle=False,
    drop_last=True,
    num_workers=0
)

In [None]:
for idx, (x, y) in enumerate(train_loader):
    print(f"Batch {idx}: ", x.shape, y.shape)
    if idx > 0:
        break

## Model

In [None]:
class MLP(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, n_layers=3):
        super().__init__() # IMPORTANT
        # TODO: figure out later how to create a variable nn.Sequential
        # self.n_layers = n_layers
        self.h = hidden_size

        self.net = nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size,output_size)
        )
    
    def forward(self, xb):
        xb = self.net(xb)
        output = xb.sigmoid()
        return output


In [None]:
# initialize weights
model = MLP(input_size = 2, hidden_size = 16, output_size = 3)

In [None]:
X_batch, y_batch = next(iter(train_loader))
probs = model(X_batch[0])
probs

In [None]:
pred = torch.argmax(probs)
pred.item()

In [None]:
target = y_batch[0]
target.item()

In [None]:
target == pred

In [None]:
probs = model(X_batch)
probs.shape

In [None]:
preds = torch.argmax(probs, dim=-1) # now we need to specify dim
preds.shape

In [None]:
targets = y_batch
targets.shape

In [None]:
((preds-targets)==0).float()

In [None]:
((preds-targets)==0).float().mean().item()

## Training Loop

In [None]:
optim = torch.optim.AdamW(model.parameters(), lr=0.01)

In [None]:
def train_epoch(model, train_loader, optim):
    # set model to train
    model.train()

    for idx, (xb, yb) in enumerate(train_loader):
        # forward pass
        probs = model(xb)
        
        # cross-entropy loss 
        loss = F.cross_entropy(probs, yb)

        # accuracy
        preds = torch.argmax(probs, dim=-1)
        acc = ((preds-targets)==0).float().mean().item()

        # clear old gradient from previous backprop
        optim.zero_grad()

        # compute new gradient and backprop
        loss.backward()

        # update parameter step
        optim.step()

    # set model back to eval in case 
    # other code wants to do that by default
    model.eval()

    return loss.item(), acc
    

In [None]:
train_epoch(model=model, train_loader=train_loader, optim=optim)

## Testing Loop

In [None]:
def test_epoch(model, test_loader):
    
    # no gradient calculations
    with torch.no_grad():
        for idx, (xb, yb) in enumerate(test_loader):
            # model output
            probs = model(xb)

            # loss
            loss = F.cross_entropy(probs, yb)

            # accuracy
            preds = torch.argmax(probs, dim=-1)
            acc = ((preds-targets)==0).float().mean().item()
    
    return loss.item(), acc

In [None]:
test_epoch(model=model, test_loader=test_loader)

## Training and Testing Loop

In [None]:
def train_test(model, train_loader, test_loader, optim, epochs=100, verbose=False):
    for e in range(epochs):
        train_loss, train_acc = train_epoch(model, train_loader, optim)
        test_loss, test_acc = test_epoch(model, test_loader)

        if verbose:
            if e % 10 == 0:
                print(f"[Epoch {e}] Train/Test Loss: {train_loss:.2f} / {test_loss:.2f} | Train/Test Acc: {train_acc*100:.1f}% / {test_acc*100:.1f}%")


In [None]:
train_test(model, train_loader, test_loader, optim, verbose=True)