In [1]:
%load_ext autoreload
%autoreload 2

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torchvision.datasets import MNIST
from torch.utils.data import DataLoader
import KANConv

from tqdm import tqdm
from sklearn.metrics import precision_score, recall_score, f1_score


In [2]:
def data_to_binary(mnist_data, binary=False):
    """
    Optionally restrict a dataset to the samples labelled 0 or 1.

    Args:
        mnist_data: Object exposing indexable `data` and `targets` attributes.
        binary: When true, drop every sample whose target is neither 0 nor 1.

    Returns:
        The same `mnist_data` object; it is filtered in place when `binary` is true
        and left untouched otherwise.
    """
    if not binary:
        return mnist_data

    # Build the selection mask once from the unfiltered labels and reuse it
    labels = mnist_data.targets
    keep = (labels == 0) | (labels == 1)
    mnist_data.data = mnist_data.data[keep]
    mnist_data.targets = labels[keep]
    return mnist_data



# Preprocessing: convert each image to a tensor, then normalise with mean 0.5 and std 0.5
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,)),
])

# Load the MNIST training split (downloaded to ./data if missing).
# Pass binary=True to data_to_binary to keep only the digits 0 and 1.
all_mnist_train = MNIST(root='./data', train=True, download=True, transform=transform)
mnist_train = data_to_binary(all_mnist_train, binary=False)

# Load the MNIST test split with the same preprocessing and the same class selection
all_mnist_test = MNIST(root='./data', train=False, download=True, transform=transform)
mnist_test = data_to_binary(all_mnist_test, binary=False)

# Batch loaders: the training data is reshuffled, the test data keeps its order
train_loader = DataLoader(dataset=mnist_train, batch_size=64, shuffle=True)
test_loader = DataLoader(dataset=mnist_test, batch_size=64, shuffle=False)

In [3]:
def train(model, device, train_loader, optimizer, epoch, criterion):
    """
    Run one training epoch and return the mean of the per-batch losses.

    Args:
        model: Network to optimise; it is moved to `device` and set to training mode.
        device: Device that the model and every batch are moved to.
        train_loader: Iterable yielding `(data, target)` batches.
        optimizer: Optimizer whose gradients are reset and which is stepped once per batch.
        epoch: Epoch number, used only in the progress message.
        criterion: Callable `criterion(output, target)` returning a scalar loss tensor.

    Returns:
        The sum of the per-batch loss values divided by the number of batches.

    Raises:
        ValueError: If `train_loader` yields no batches.
    """
    # Set the model to training mode
    model.to(device)
    model.train()
    train_loss = 0
    # Count batches explicitly instead of relying on the loop index surviving the
    # loop, which would be undefined for an empty loader.
    num_batches = 0
    print("Epoch:", epoch)
    # Process the images in batches
    for data, target in tqdm(train_loader):
        # Move the batch to the same device as the model
        data, target = data.to(device), target.to(device)

        # Reset the gradients accumulated by the previous step
        optimizer.zero_grad()

        # Push the data forward through the model layers
        output = model(data)

        # Get the loss
        loss = criterion(output, target)

        # Keep a running total
        train_loss += loss.item()
        num_batches += 1

        # Backpropagate
        loss.backward()
        optimizer.step()

    if num_batches == 0:
        raise ValueError("train_loader yielded no batches")

    # Return average loss for the epoch
    avg_loss = train_loss / num_batches
    print('Training set: Average loss: {:.6f}'.format(avg_loss))
    return avg_loss

def test(model, device, test_loader, criterion):
    """
    Evaluate `model` on `test_loader` and report loss and classification metrics.

    The loss is averaged per sample. A criterion whose `reduction` attribute is
    `'sum'` is accumulated as-is; any other criterion is assumed to return the
    mean over the batch, so each batch loss is weighted by its batch size before
    the final division. Dividing a sum of batch means by the dataset size would
    under-report the loss by roughly the batch size.

    Args:
        model: Network to evaluate; it is switched to evaluation mode.
        device: Device every batch is moved to (the model is not moved here).
        test_loader: Iterable yielding `(data, target)` batches.
        criterion: Callable `criterion(output, target)` returning a scalar loss tensor.

    Returns:
        Tuple `(test_loss, accuracy, precision, recall, f1)` where `test_loss` is
        the per-sample average loss, `accuracy` is a percentage, and the remaining
        metrics are macro-averaged.

    Raises:
        ValueError: If `test_loader` yields no samples.
    """
    # Switch the model to evaluation mode
    model.eval()
    sums_over_batch = getattr(criterion, "reduction", "mean") == "sum"
    test_loss = 0
    correct = 0
    num_samples = 0
    all_targets = []
    all_predictions = []

    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            batch_size = target.size(0)

            # Get the model outputs for this batch
            output = model(data)

            # Accumulate the summed loss of the batch
            batch_loss = criterion(output, target).item()
            test_loss += batch_loss if sums_over_batch else batch_loss * batch_size

            # Predicted class = index of the largest output along dim 1
            _, predicted = torch.max(output, 1)
            correct += (target == predicted).sum().item()
            num_samples += batch_size

            # Collect all targets and predictions for metric calculations
            all_targets.extend(target.view_as(predicted).cpu().numpy())
            all_predictions.extend(predicted.cpu().numpy())

    if num_samples == 0:
        raise ValueError("test_loader yielded no samples")

    # Calculate overall metrics
    precision = precision_score(all_targets, all_predictions, average='macro')
    recall = recall_score(all_targets, all_predictions, average='macro')
    f1 = f1_score(all_targets, all_predictions, average='macro')

    # Normalize by the number of samples actually evaluated
    test_loss /= num_samples
    accuracy = 100. * correct / num_samples

    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%), Precision: {:.2f}, Recall: {:.2f}, F1 Score: {:.2f}\n'.format(
        test_loss, correct, num_samples, accuracy, precision, recall, f1))

    return test_loss, accuracy, precision, recall, f1

In [4]:
# Select the GPU when one is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Build the network and place it on the selected device
model = KANConv.KAN_Convolutional_Network()
model.to(device)

# AdamW optimizer with an exponentially decaying learning rate
# (the scheduler is stepped once per epoch in the loop below)
optimizer = optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-4)
scheduler = optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.8)

# Classification loss
criterion = nn.CrossEntropyLoss()

# Per-epoch history: epoch index, training loss and validation loss
epoch_nums, training_loss, validation_loss = [], [], []

# NOTE(review): the saved output of this cell ends in
# "RuntimeError: CUDA error: device-side assert triggered". Possibly a target label
# falls outside the model's output range; TODO confirm that the output size of
# KAN_Convolutional_Network matches the number of classes served by the loaders.

# Number of epochs is kept at 10 to bound the run time
epochs = 10
print('Training on', device)
for epoch in range(1, epochs + 1):
    # One pass over the training data followed by an evaluation pass
    train_loss = train(model, device, train_loader, optimizer, epoch, criterion)
    test_loss, accuracy, precision, recall, f1 = test(model, device, test_loader, criterion)

    # Record the history for this epoch
    epoch_nums.append(epoch)
    training_loss.append(train_loss)
    validation_loss.append(test_loss)

    # Decay the learning rate; the value printed below is read after this step
    scheduler.step()

    print('')
    print("lr: ", optimizer.param_groups[0]['lr'])
    print("test loss: ", test_loss)
    print("accuracy: ", accuracy)
    print("precision: ", precision)
    print("recall: ", recall)
    print("f1: ", f1)
    print('')


dim 2
in feat 4
shape Flatten(start_dim=1, end_dim=-1)
dim 2
in feat 512
Training on cuda
Epoch: 1


  0%|          | 0/938 [00:00<?, ?it/s]

imagen torch.Size([64, 1, 28, 28])
matrix torch.Size([64, 28, 28])
sub tensor([-1., -1., -1., -1.], device='cuda:0')
tensor([[-1., -1., -1., -1.]], device='cuda:0') tensor([[-0.2689, -0.2689, -0.2689, -0.2689]], device='cuda:0') Parameter containing:
tensor([[-0.3974, -0.0058, -0.3487, -0.1954]], device='cuda:0',
       requires_grad=True)
base out tensor([[0.2547]], device='cuda:0', grad_fn=<MmBackward0>)
spline out tensor([[0.0016]], device='cuda:0', grad_fn=<MmBackward0>)
matrix out 0.25637078285217285
No gradient for base_weight
No gradient for spline_weight
No gradient for spline_scaler
sub tensor([-1., -1., -1., -1.], device='cuda:0')
tensor([[-1., -1., -1., -1.]], device='cuda:0') tensor([[-0.2689, -0.2689, -0.2689, -0.2689]], device='cuda:0') Parameter containing:
tensor([[-0.3974, -0.0058, -0.3487, -0.1954]], device='cuda:0',
       requires_grad=True)
base out tensor([[0.2547]], device='cuda:0', grad_fn=<MmBackward0>)
spline out tensor([[0.0016]], device='cuda:0', grad_fn=<Mm

  0%|          | 0/938 [00:04<?, ?it/s]


RuntimeError: CUDA error: device-side assert triggered
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1.
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.
