In [12]:
import numpy as np
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

In [4]:
class S4Model:
    """Small NumPy sequence classifier with a recurrent hidden state.

    The hidden state is updated as ``h_t = tanh(A @ h_{t-1} + B @ u_t)`` for
    every time step, and the logits are read out once, from the final step:
    ``y = C @ h_T + D @ u_T``.
    """

    def __init__(self, input_size, hidden_size, num_classes, seed=None):
        """
        Create the four parameter matrices with small Gaussian entries.

        input_size: Dimension of each time step's input vector.
        hidden_size: Dimension of the hidden state.
        num_classes: Number of output logits.
        seed: Optional seed. When given, the parameters are drawn from a
            dedicated ``np.random.RandomState(seed)`` so the initialisation is
            reproducible; when None, the global NumPy generator is used.
        """
        # Both objects expose `.randn`, so the draws below are source-agnostic.
        rng = np.random if seed is None else np.random.RandomState(seed)

        self.hidden_size = hidden_size
        # Draw order (A, B, C, D) is kept fixed so seeded runs are stable.
        self.A = rng.randn(hidden_size, hidden_size) * 0.1  # state -> state
        self.B = rng.randn(hidden_size, input_size) * 0.1   # input -> state
        self.C = rng.randn(num_classes, hidden_size) * 0.1  # state -> logits
        self.D = rng.randn(num_classes, input_size) * 0.1   # input -> logits

    def forward(self, x):
        """
        Forward pass through the model.

        x: Input sequence of shape (seq_len, input_size), seq_len >= 1.
        Returns: Output logits of shape (num_classes,).
        Raises: ValueError if the sequence has no time steps.
        """
        seq_len, _ = x.shape
        if seq_len == 0:
            # The read-out needs the last input and the last hidden state.
            raise ValueError("S4Model.forward requires at least one time step")

        h = np.zeros((self.hidden_size,))  # Initial hidden state
        for t in range(seq_len):
            h = np.tanh(np.dot(self.A, h) + np.dot(self.B, x[t]))

        # Logits depend on the final hidden state and the final input only.
        return np.dot(self.C, h) + np.dot(self.D, x[-1])

    def predict(self, x):
        """Return the index of the largest logit for the sequence ``x``."""
        logits = self.forward(x)
        return np.argmax(logits)


In [2]:
def cross_entropy_loss(logits, label):
    """
    Softmax cross-entropy for a single example.

    logits: 1-D array of unnormalised class scores.
    label: Index of the correct class.
    Returns: (loss, probs) where probs is the softmax of logits and loss is
        the negative log-probability of `label`.
    """
    shifted = logits - np.max(logits)  # Largest entry becomes 0: exp cannot overflow
    exp_shifted = np.exp(shifted)
    normalizer = np.sum(exp_shifted)   # >= 1 because exp(0) is one of the terms
    probs = exp_shifted / normalizer

    # Log-sum-exp form of -log(probs[label]). Taking the log of the probability
    # itself yields inf once that probability underflows to 0; this form stays
    # finite because normalizer >= 1 and shifted[label] is finite.
    loss = np.log(normalizer) - shifted[label]
    return loss, probs

In [1]:
def update_params(model, grads, lr):
    """
    Take one gradient-descent step on `model`.

    For every key in `grads`, the attribute of the same name in the model's
    instance dictionary is decreased by `lr` times the matching gradient.
    """
    state = vars(model)  # The instance __dict__, modified directly
    for name in grads:
        state[name] -= lr * grads[name]

In [5]:
def compute_gradients(model, x, label):
    """
    Loss and parameter gradients for one sequence (backpropagation through time).

    model: Object exposing `hidden_size` and the matrices `A`, `B`, `C`, `D`.
    x: Input sequence of shape (seq_len, input_size), seq_len >= 1.
    label: Index of the correct class.
    Returns: (loss, grads) where grads maps 'A', 'B', 'C', 'D' to arrays with
        the same shapes as the corresponding model matrices.
    Raises: ValueError if the sequence has no time steps.
    """
    seq_len, _ = x.shape
    if seq_len == 0:
        # The read-out needs the last input and the last hidden state.
        raise ValueError("compute_gradients requires at least one time step")

    # Forward pass, keeping every hidden state for the backward sweep.
    h = np.zeros((model.hidden_size,))
    hs = []
    for t in range(seq_len):
        h = np.tanh(np.dot(model.A, h) + np.dot(model.B, x[t]))
        hs.append(h)
    logits = np.dot(model.C, hs[-1]) + np.dot(model.D, x[-1])

    # Compute loss and probabilities
    loss, probs = cross_entropy_loss(logits, label)

    # Gradient of softmax cross-entropy wrt logits is probs - one_hot(label).
    # Work on a copy so the array returned by the loss helper is not modified.
    d_logits = probs.copy()
    d_logits[label] -= 1

    # The read-out touches only the final hidden state and the final input.
    d_C = np.outer(d_logits, hs[-1])
    d_D = np.outer(d_logits, x[-1])

    # Gradient flowing into the last hidden state.
    d_h = np.dot(model.C.T, d_logits)

    d_A = np.zeros_like(model.A)
    d_B = np.zeros_like(model.B)
    h_init = np.zeros_like(hs[0])  # State before the first step (all zeros)
    for t in reversed(range(seq_len)):
        h_prev = hs[t - 1] if t > 0 else h_init
        d_pre = (1 - hs[t] ** 2) * d_h     # Back through tanh: derivative is 1 - tanh^2
        d_A += np.outer(d_pre, h_prev)
        d_B += np.outer(d_pre, x[t])
        d_h = np.dot(model.A.T, d_pre)     # Pass the gradient to the previous state

    grads = {
        'A': d_A,
        'B': d_B,
        'C': d_C,
        'D': d_D,
    }
    return loss, grads


In [13]:
# MNIST preprocessing: convert to tensor, then rescale to [-1, 1]
transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))]
)

# Training split first, then the held-out test split (downloaded on demand)
train_dataset, test_dataset = (
    datasets.MNIST(root='./data', train=is_train, transform=transform, download=True)
    for is_train in (True, False)
)

# Batched loaders; only the training loader shuffles
train_loader = DataLoader(train_dataset, batch_size=256, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=256, shuffle=False)

# Function to convert DataLoader batches to NumPy
def dataloader_to_numpy(data_loader):
    """
    Drain an iterable of (images, labels) tensor batches into two NumPy arrays.

    data_loader: Iterable yielding (batch_images, batch_labels) tensor pairs.
    Returns: (data, labels), each the concatenation of all batches along axis 0.
    """
    data = []
    labels = []
    for batch_images, batch_labels in data_loader:
        # Detach from autograd and move to CPU before converting; a bare
        # .numpy() raises on tensors that require grad or live on another device.
        data.append(batch_images.detach().cpu().numpy())
        labels.append(batch_labels.detach().cpu().numpy())
    # Concatenate batches
    data = np.concatenate(data, axis=0)
    labels = np.concatenate(labels, axis=0)
    return data, labels

# Materialise both splits as NumPy arrays
train_images, train_labels = dataloader_to_numpy(train_loader)
test_images, test_labels = dataloader_to_numpy(test_loader)

# Treat every image as a sequence: seq_len=28 rows, input_size=28 values per row
train_images, test_images = (
    split.reshape(-1, 28, 28) for split in (train_images, test_images)
)


In [15]:
# Model and optimisation settings
model = S4Model(input_size=28, hidden_size=128, num_classes=10)
learning_rate = 0.0005
epochs = 5

# Per-sample SGD: the parameters are updated after every training image
for epoch in range(epochs):
    total_loss = 0
    for i, x in enumerate(train_images):  # x has shape (28, 28)
        label = train_labels[i]

        # Loss and gradients for this single sequence
        loss, grads = compute_gradients(model, x, label)

        # Step the parameters, then add this sample's loss to the epoch total
        update_params(model, grads, learning_rate)
        total_loss += loss

    print(f"Epoch {epoch + 1}/{epochs}, Loss: {total_loss / len(train_images)}")



Epoch 1/5, Loss: 0.5155643616332608
Epoch 2/5, Loss: 0.21744363777599113
Epoch 3/5, Loss: 0.172894513521342
Epoch 4/5, Loss: 0.1398888802354398
Epoch 5/5, Loss: 0.12673826988745282


In [16]:

# Evaluate on the held-out split: count exact matches between prediction and label
correct = 0
for i, x in enumerate(test_images):
    label = test_labels[i]
    pred = model.predict(x)
    correct += (pred == label)

print(f"Test Accuracy: {correct / len(test_images) * 100:.2f}%")

Test Accuracy: 96.23%
