In [39]:
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, ConcatDataset, Subset
from sklearn.model_selection import train_test_split
import numpy as np
import torch
from dense import dense
from torch import nn


def get_mnist_loaders(batch_size=64, train_ratio=0.8, seed=42, root="./data"):
    """Build MNIST train/test DataLoaders with a configurable train fraction.

    When ``train_ratio`` equals 1, the MNIST train and test parts are used
    as-is. Otherwise both parts are concatenated and re-split with a
    stratified split so that ``train_ratio`` of all samples go to the training
    subset and the remainder to the test subset.

    Args:
        batch_size: Batch size used for both loaders.
        train_ratio: Fraction of the merged dataset used for training; must
            satisfy ``0 < train_ratio <= 1``.
        seed: ``random_state`` forwarded to ``train_test_split``.
        root: Directory in which MNIST is stored (downloaded if missing).

    Returns:
        ``(train_loader, test_loader)``. The training loader shuffles, the
        test loader does not.

    Raises:
        ValueError: If ``train_ratio`` is outside ``(0, 1]``.
    """
    # Validate up front: an integer ratio such as 0 or -1 would make
    # `1 - train_ratio` an int, which train_test_split interprets as an
    # absolute number of test samples instead of a fraction.
    if not 0 < train_ratio <= 1:
        raise ValueError(f"train_ratio must be in (0, 1], got {train_ratio!r}")

    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))  # (mean,), (std,)
    ])

    # Both code paths need both MNIST parts, so load them once.
    train_part = datasets.MNIST(root=root, train=True, download=True, transform=transform)
    test_part = datasets.MNIST(root=root, train=False, download=True, transform=transform)

    if train_ratio == 1.0:
        train_dataset, test_dataset = train_part, test_part
    else:
        # Merge into one dataset and re-split it.
        full_dataset = ConcatDataset([train_part, test_part])

        # ConcatDataset doesn't expose `.targets`, so rebuild the label vector
        # in the same order as the concatenation.
        targets = np.concatenate([np.array(train_part.targets), np.array(test_part.targets)])

        # Stratified split over sample indices.
        train_indices, test_indices = train_test_split(
            np.arange(len(targets)),
            test_size=1 - train_ratio,
            stratify=targets,
            random_state=seed,
        )

        train_dataset = Subset(full_dataset, train_indices)
        test_dataset = Subset(full_dataset, test_indices)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    print(f"[Ratio:{train_ratio}] Train size: {len(train_dataset)}, Test size: {len(test_dataset)}")
    return train_loader, test_loader

def train_one_epoch(model, loader, optimizer, criterion, device):
    """Run one optimisation pass over ``loader`` and report loss and accuracy.

    Args:
        model: Module to train; switched to training mode.
        loader: Iterable yielding ``(inputs, targets)`` batches.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss callable invoked as ``criterion(outputs, targets)``;
            its result is treated as the mean loss over the batch.
        device: Device the batches are moved to before the forward pass.

    Returns:
        ``(avg_loss, accuracy)`` averaged over the samples actually processed.
        ``(0.0, 0.0)`` if the loader yields no batches.
    """
    model.train()
    total_loss = 0.0
    correct = 0
    seen = 0
    for inputs, targets in loader:
        inputs, targets = inputs.to(device), targets.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)  # average loss over the batch
        loss.backward()
        optimizer.step()

        batch_len = inputs.size(0)
        total_loss += loss.item() * batch_len  # summed loss over the batch
        correct += (outputs.argmax(dim=1) == targets).sum().item()
        seen += batch_len

    # Divide by the number of samples seen rather than len(loader.dataset):
    # the two differ when the loader skips samples (e.g. drop_last=True).
    if seen == 0:
        return 0.0, 0.0
    avg_loss = total_loss / seen
    accuracy = correct / seen
    return avg_loss, accuracy

def evaluate(model, loader, criterion, device):
    """Compute loss and accuracy of ``model`` over ``loader`` without training.

    Args:
        model: Module to evaluate; switched to evaluation mode.
        loader: Iterable yielding ``(inputs, targets)`` batches.
        criterion: Loss callable invoked as ``criterion(outputs, targets)``;
            its result is treated as the mean loss over the batch.
        device: Device the batches are moved to before the forward pass.

    Returns:
        ``(avg_loss, accuracy)`` averaged over the samples actually processed.
        ``(0.0, 0.0)`` if the loader yields no batches.
    """
    model.eval()
    total_loss = 0.0
    correct = 0
    seen = 0
    with torch.no_grad():  # no gradients are needed for evaluation
        for inputs, targets in loader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, targets)

            batch_len = inputs.size(0)
            total_loss += loss.item() * batch_len  # summed loss over the batch
            correct += (outputs.argmax(dim=1) == targets).sum().item()
            seen += batch_len

    # Divide by the number of samples seen rather than len(loader.dataset):
    # the two differ when the loader skips samples (e.g. drop_last=True).
    if seen == 0:
        return 0.0, 0.0
    avg_loss = total_loss / seen
    accuracy = correct / seen
    return avg_loss, accuracy

def run_mnist_experiment(scale_J, angle_K, kernel_size, model, train_ratio, image_shape=(1, 28), nb_class=10,
                         num_epochs=10, batch_size=64, lr=1e-3, device=None):
    """Train ``model`` on MNIST and return its final test accuracy.

    Args:
        scale_J, angle_K, kernel_size, image_shape, nb_class: Accepted for
            interface compatibility; not used by this function.
        model: Module to train. It is trained in place.
        train_ratio: Train fraction forwarded to ``get_mnist_loaders``.
        num_epochs: Number of train/evaluate rounds.
        batch_size: Batch size forwarded to ``get_mnist_loaders``.
        lr: Learning rate of the Adam optimizer.
        device: Device batches are moved to. When ``None``, the device of the
            model's first parameter is used (CPU if the model has no
            parameters), so the function no longer depends on a global
            ``device`` having been defined by another cell.

    Returns:
        Test accuracy (fraction in ``[0, 1]``) after the last epoch, or
        ``None`` when ``num_epochs`` is 0.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    train_loader, test_loader = get_mnist_loaders(batch_size=batch_size, train_ratio=train_ratio)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    test_acc = None  # stays None if no epoch runs
    for epoch in range(num_epochs):
        train_loss, train_acc = train_one_epoch(model, train_loader, optimizer, criterion, device)
        test_loss, test_acc = evaluate(model, test_loader, criterion, device)
        print(f"Epoch {epoch+1}: Train Acc={train_acc:.4f}, Test Acc={test_acc:.4f}")
    return test_acc



In [40]:
import sys
import os
from itertools import product

# Make ../src (relative to the current working directory) importable.
# NOTE(review): `from dense import dense` in the earlier cell presumably relies
# on this path; on a fresh kernel this setup would need to run first — confirm.
src_path = os.path.abspath(os.path.join(os.getcwd(), '..', 'src'))
if src_path not in sys.path:  # avoid stacking duplicates when the cell is re-run
    sys.path.append(src_path)


def _set_trainable(model, train_conv, train_linear):
    """Enable/disable gradients for the conv stack and the linear head of ``model``."""
    for conv in model.sequential_conv:
        for param in conv.parameters():
            param.requires_grad = train_conv
    for param in model.linear.parameters():
        param.requires_grad = train_linear


models = []
device = "cuda" if torch.cuda.is_available() else "cpu"
# Try different combinations
J_params = [2, 3]
K_params = [4, 5]
kernel_size_params = [3, 5, 7]
train_ratios = [0.6, 0.3, 0.1]
# product() varies the right-most list fastest, i.e. the same order as four
# nested loops over J, K, kernel size and train ratio.
param_grid = list(product(J_params, K_params, kernel_size_params, train_ratios))

##############################################
# Train linear layer only
results1 = []
for J, K, ks, train_ratio in param_grid:
    config = f"J={J}, K={K}, kernel={ks}, train_ratio={train_ratio}"
    print(f"\n[Train linear layer] Running: {config}")
    model = dense(J, K, (1, 28), ks, 10).to(device)
    _set_trainable(model, train_conv=False, train_linear=True)
    test_acc = run_mnist_experiment(J, K, ks, model, train_ratio)
    results1.append((config, test_acc))
    models.append(model)

##############################################
# Train conv layers only
# Each run continues from the model produced by the matching linear-only run
# above (same position in the grid), now with the linear layer frozen.
results2 = []
result_models = []
for model, (J, K, ks, train_ratio) in zip(models, param_grid):
    config = f"J={J}, K={K}, kernel={ks}, train_ratio={train_ratio}"
    print(f"\n[Train conv layer] Running: {config}")
    _set_trainable(model, train_conv=True, train_linear=False)
    test_acc = run_mnist_experiment(J, K, ks, model, train_ratio)
    results2.append((config, test_acc))
    result_models.append(model)
count = len(result_models)  # number of completed conv-phase runs

# Print results summary
print(model)
for (config, acc1), (_, acc2) in zip(results1, results2):
    # Accuracies are fractions in [0, 1]; the `%` format type scales them by
    # 100 so the printed percent sign is correct.
    print(f"{config} → Test Accuracy: {acc1:.2%} {acc2:.2%}")


[Train linear layer] Running: J=2, K=4, kernel=3, train_ratio=0.6
Creating filter bank with Sampling support width=8, Size=3, Angles=4 ...
Creating filter bank with Sampling support width=8, Size=5, Angles=4 ...
[Ratio:0.6] Train size: 42000, Test size: 28000
Epoch 1: Train Acc=0.7464, Test Acc=0.8155
Epoch 2: Train Acc=0.8360, Test Acc=0.8506
Epoch 3: Train Acc=0.8565, Test Acc=0.8625
Epoch 4: Train Acc=0.8660, Test Acc=0.8698
Epoch 5: Train Acc=0.8719, Test Acc=0.8749
Epoch 6: Train Acc=0.8759, Test Acc=0.8783
Epoch 7: Train Acc=0.8785, Test Acc=0.8800
Epoch 8: Train Acc=0.8811, Test Acc=0.8828
Epoch 9: Train Acc=0.8835, Test Acc=0.8848
Epoch 10: Train Acc=0.8849, Test Acc=0.8850

[Train linear layer] Running: J=2, K=4, kernel=3, train_ratio=0.3
Creating filter bank with Sampling support width=8, Size=3, Angles=4 ...
Creating filter bank with Sampling support width=8, Size=5, Angles=4 ...
[Ratio:0.3] Train size: 21000, Test size: 49000
Epoch 1: Train Acc=0.6738, Test Acc=0.7761
Epoc