In [None]:
# date: 04.29.24

In [None]:
import torch
import torchvision.models as models
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
from torchvision.transforms import transforms

# Nested-attribute helpers and the Jacobian-vector product function
def set_attr(obj, names, value):
    """
    Assign ``value`` to an attribute addressed by a path of names.

    Every entry of ``names`` except the last is resolved with ``getattr``,
    starting from ``obj``, to walk down to the owning object; the last entry
    is the attribute that gets set on that owner.
    """
    attribute = names[-1]
    owner = obj
    for step in names[:-1]:
        owner = getattr(owner, step)
    setattr(owner, attribute, value)

def del_attr(obj, names):
    """
    Remove an attribute addressed by a path of names.

    Every entry of ``names`` except the last is resolved with ``getattr``,
    starting from ``obj``, to walk down to the owning object; the last entry
    is the attribute that gets deleted from that owner.
    """
    attribute = names[-1]
    owner = obj
    for step in names[:-1]:
        owner = getattr(owner, step)
    delattr(owner, attribute)

def jacobian_vector_product(model, inputs, vector):
    """
    Compute the Jacobian-vector product of ``model(inputs)`` with respect to
    the model parameters, in the direction given by ``vector``.

    Each parameter is temporarily replaced by a forward-mode dual tensor whose
    tangent is the matching entry of ``vector`` (paired in
    ``model.named_parameters()`` order). The tangent carried by the model
    output is then the JVP.

    Args:
        model: ``nn.Module`` to differentiate. Its parameters are swapped out
            only for the duration of the call and are re-registered before
            returning, so the function can be called repeatedly.
        inputs: Batch forwarded through ``model``.
        vector: Iterable of tensors, one per parameter, each shaped like the
            parameter it is paired with.

    Returns:
        The output tangent (same shape as ``model(inputs)``). It is NOT
        detached, so a later ``backward()`` propagates gradients to the
        tensors in ``vector``. If the output carries no tangent, zeros shaped
        like the output are returned.

    Raises:
        ValueError: if ``vector`` does not contain exactly one tensor per
            model parameter.
    """
    fwad = torch.autograd.forward_ad

    named_params = list(model.named_parameters())
    tangents = list(vector)
    if len(tangents) != len(named_params):
        raise ValueError(
            f"vector has {len(tangents)} entries but the model has "
            f"{len(named_params)} parameters"
        )

    # Resolve, once, the submodule owning each parameter and its attribute name.
    slots = []
    for name, param in named_params:
        *parents, leaf = name.split(".")
        owner = model
        for parent in parents:
            owner = getattr(owner, parent)
        slots.append((owner, leaf, param))

    try:
        with fwad.dual_level():
            for (owner, leaf, param), tangent in zip(slots, tangents):
                # Build the dual first so a bad tangent fails before the
                # module is modified.
                dual = fwad.make_dual(param, tangent)
                # nn.Module refuses to assign a plain tensor over a registered
                # Parameter, so the Parameter is removed before the dual is set.
                delattr(owner, leaf)
                setattr(owner, leaf, dual)

            unpacked = fwad.unpack_dual(model(inputs))
            # The tangent of the output is the JVP; the primal is the ordinary
            # forward result and must not be returned in its place.
            jvp = unpacked.tangent
            if jvp is None:
                jvp = torch.zeros_like(unpacked.primal)
    finally:
        # Re-register the original Parameters (also on error) so that
        # named_parameters() is intact for the next call.
        for owner, leaf, param in slots:
            setattr(owner, leaf, param)

    return jvp


# Seed the RNG so the classifier-head init, the random tangents and the
# DataLoader shuffling order are reproducible across "Restart & Run All".
SEED = 0
torch.manual_seed(SEED)

# Load pre-trained ResNet18 model and replace its head with a 10-way
# classifier for CIFAR-10 (the new head is randomly initialised).
model = models.resnet18(pretrained=True)
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 10)

# Initialize vector: one trainable tangent per model parameter, same shape.
vector = [torch.randn_like(p, requires_grad=True) for p in model.parameters()]

# Define optimizer to only optimize the parameters in vector
# (the model weights themselves are never stepped).
optimizer = optim.SGD(vector, lr=0.001)

criterion = nn.CrossEntropyLoss()

# CIFAR10 data loading and preprocessing
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize to match ResNet input size
    transforms.ToTensor(),
    # The pretrained backbone was trained on inputs normalised with the
    # ImageNet channel statistics, so the same statistics are used here.
    transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
])

train_dataset = CIFAR10(root='./data', train=True, download=True, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)

# Training loop
epochs = 3
for epoch in range(epochs):
    for batch_idx, (inputs, targets) in enumerate(train_loader):
        optimizer.zero_grad()

        # Forward pass: plain model output plus the JVP term along `vector`.
        outputs = model(inputs)
        jvp = jacobian_vector_product(model, inputs, vector)
        combined_output = outputs + jvp

        # Compute the loss against the integer class labels
        loss = criterion(combined_output, targets)

        # Backward pass
        loss.backward()

        # Optimizer step (updates only the tensors in `vector`)
        optimizer.step()

        # Print loss every 100 batches
        if batch_idx % 100 == 0:
            print(f'Epoch [{epoch+1}/{epochs}], Batch [{batch_idx+1}/{len(train_loader)}], Loss: {loss.item()}')


In [None]:
import torch
import torchvision.models as models
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader, Subset
from torchvision.transforms import transforms
import numpy as np

# Nested-attribute helpers and the Jacobian-vector product function
def set_attr(obj, names, value):
    """
    Assign ``value`` to an attribute addressed by a path of names.

    Every entry of ``names`` except the last is resolved with ``getattr``,
    starting from ``obj``, to walk down to the owning object; the last entry
    is the attribute that gets set on that owner.
    """
    attribute = names[-1]
    owner = obj
    for step in names[:-1]:
        owner = getattr(owner, step)
    setattr(owner, attribute, value)

def del_attr(obj, names):
    """
    Remove an attribute addressed by a path of names.

    Every entry of ``names`` except the last is resolved with ``getattr``,
    starting from ``obj``, to walk down to the owning object; the last entry
    is the attribute that gets deleted from that owner.
    """
    attribute = names[-1]
    owner = obj
    for step in names[:-1]:
        owner = getattr(owner, step)
    delattr(owner, attribute)

def jacobian_vector_product(model, inputs, vector):
    """
    Compute the Jacobian-vector product of ``model(inputs)`` with respect to
    the model parameters, in the direction given by ``vector``.

    Each parameter is temporarily replaced by a forward-mode dual tensor whose
    tangent is the matching entry of ``vector`` (paired in
    ``model.named_parameters()`` order). The tangent carried by the model
    output is then the JVP.

    Args:
        model: ``nn.Module`` to differentiate. Its parameters are swapped out
            only for the duration of the call and are re-registered before
            returning, so the function can be called repeatedly.
        inputs: Batch forwarded through ``model``.
        vector: Iterable of tensors, one per parameter, each shaped like the
            parameter it is paired with.

    Returns:
        The output tangent (same shape as ``model(inputs)``). It is NOT
        detached, so a later ``backward()`` propagates gradients to the
        tensors in ``vector``. If the output carries no tangent, zeros shaped
        like the output are returned.

    Raises:
        ValueError: if ``vector`` does not contain exactly one tensor per
            model parameter.
    """
    fwad = torch.autograd.forward_ad

    named_params = list(model.named_parameters())
    tangents = list(vector)
    if len(tangents) != len(named_params):
        raise ValueError(
            f"vector has {len(tangents)} entries but the model has "
            f"{len(named_params)} parameters"
        )

    # Resolve, once, the submodule owning each parameter and its attribute name.
    slots = []
    for name, param in named_params:
        *parents, leaf = name.split(".")
        owner = model
        for parent in parents:
            owner = getattr(owner, parent)
        slots.append((owner, leaf, param))

    try:
        with fwad.dual_level():
            for (owner, leaf, param), tangent in zip(slots, tangents):
                # Build the dual first so a bad tangent fails before the
                # module is modified.
                dual = fwad.make_dual(param, tangent)
                # nn.Module refuses to assign a plain tensor over a registered
                # Parameter, so the Parameter is removed before the dual is set.
                delattr(owner, leaf)
                setattr(owner, leaf, dual)

            unpacked = fwad.unpack_dual(model(inputs))
            # The tangent of the output is the JVP; the primal is the ordinary
            # forward result and must not be returned in its place.
            jvp = unpacked.tangent
            if jvp is None:
                jvp = torch.zeros_like(unpacked.primal)
    finally:
        # Re-register the original Parameters (also on error) so that
        # named_parameters() is intact for the next call.
        for owner, leaf, param in slots:
            setattr(owner, leaf, param)

    return jvp


# Seed both RNGs so the subset draw, the classifier-head init, the random
# tangents and the DataLoader shuffling are reproducible.
SEED = 0
torch.manual_seed(SEED)
np.random.seed(SEED)

# Load pre-trained ResNet18 model and replace its head with a 10-way
# classifier for CIFAR-10 (the new head is randomly initialised).
model = models.resnet18(pretrained=True)
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 10)

# Initialize vector: one trainable tangent per model parameter, same shape.
vector = [torch.randn_like(p, requires_grad=True) for p in model.parameters()]

# Define optimizer to only optimize the parameters in vector
# (the model weights themselves are never stepped).
optimizer = optim.SGD(vector, lr=0.01)

criterion = nn.MSELoss()  # MSE Loss

# CIFAR10 data loading and preprocessing
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize to match ResNet input size
    transforms.ToTensor(),
    # The pretrained backbone was trained on inputs normalised with the
    # ImageNet channel statistics, so the same statistics are used here.
    transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
])

# Load CIFAR-10 dataset
train_dataset = CIFAR10(root='./data', train=True, download=True, transform=transform)

# Use only 20% of the data (sampled without replacement)
subset_indices = np.random.choice(len(train_dataset), size=int(0.2 * len(train_dataset)), replace=False)
train_subset = Subset(train_dataset, subset_indices)

train_loader = DataLoader(train_subset, batch_size=16, shuffle=True)

# Training loop
epochs = 3
for epoch in range(epochs):
    for batch_idx, (inputs, targets) in enumerate(train_loader):
        optimizer.zero_grad()

        # Forward pass: plain model output plus the JVP term along `vector`.
        outputs = model(inputs)
        jvp = jacobian_vector_product(model, inputs, vector)
        combined_output = outputs + jvp

        # One-hot encode the class indices so the MSE target has one entry per
        # class. Broadcasting the raw index across all logits would ask every
        # logit to regress to the label number (e.g. [7, 7, ..., 7]).
        targets_resized = F.one_hot(targets, num_classes=combined_output.size(1))

        # Compute the loss
        loss = criterion(combined_output, targets_resized.float())

        # Backward pass
        loss.backward()

        # Optimizer step (updates only the tensors in `vector`)
        optimizer.step()

        # Print loss every 100 batches
        if batch_idx % 100 == 0:
            print(f'Epoch [{epoch+1}/{epochs}], Batch [{batch_idx+1}/{len(train_loader)}], Loss: {loss.item()}')
