In [1]:
!pip install aihwkit
!wget https://aihwkit-gpu-demo.s3.us-east.cloud-object-storage.appdomain.cloud/aihwkit-0.9.1+cuda117-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
!pip install aihwkit-0.9.1+cuda117-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl

Collecting aihwkit
  Downloading aihwkit-0.9.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (11 kB)
Collecting torch==2.4.1 (from aihwkit)
  Downloading torch-2.4.1-cp310-cp310-manylinux1_x86_64.whl.metadata (26 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch==2.4.1->aihwkit)
  Downloading nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.1.105 (from torch==2.4.1->aihwkit)
  Downloading nvidia_cuda_runtime_cu12-12.1.105-py3-none-manylinux1_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.1.105 (from torch==2.4.1->aihwkit)
  Downloading nvidia_cuda_cupti_cu12-12.1.105-py3-none-manylinux1_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch==2.4.1->aihwkit)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.1.3.1 (from torch==2.4.1->aihwkit)
  Downloading nvidia_cu

MTL Model Adapted from: https://medium.com/@aminul.huq11/multi-task-learning-a-beginners-guide-a1fc17808688

In [None]:
!git clone https://github.com/aminul-huq/medium.git

In [None]:
%cd medium/mammogram
import random
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from torch.utils.data import DataLoader,random_split,Dataset
import torch.optim as optim
from tqdm import tqdm
from training_utils import *

In [None]:
# Fix the seed of every RNG this notebook draws from (Python, NumPy, PyTorch)
# so repeated runs start from the same random state.
seed = 43

# Ask cuDNN to stick to deterministic algorithms.
torch.backends.cudnn.deterministic = True

torch.manual_seed(seed)
np.random.seed(seed)
random.seed(seed)

In [None]:
# CIFAR-10 train/test splits; images are converted to tensors by ToTensor().
trainset = datasets.CIFAR10(root='./data/', train=True, download=True, transform=transforms.ToTensor())
testset = datasets.CIFAR10(root='./data/', train=False, download=True, transform=transforms.ToTensor())

# Class names, listed so that a label index looks up its name.
labels_list = ['airplane','automobile','bird','cat','deer','dog','frog','horse','ship','truck']
# Indices into labels_list of the classes that are not animals
# (airplane, automobile, ship, truck); used to derive the second task's label.
non_animal = [0,1,8,9]
# Use the GPU when one is visible, otherwise fall back to the CPU instead of
# failing on the first `.to(device)` call.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [None]:
class NewDataset(Dataset):
    """Wrap a labelled image dataset so every item carries two targets.

    Each item is ``(image, label1, label2)``: ``label1`` is the wrapped
    dataset's own label, and ``label2`` is 0 when that label is listed in the
    module-level ``non_animal`` collection and 1 otherwise.
    """

    def __init__(self,data,transform=None):
        """
        Args:
            data: indexable dataset whose items start with ``(image, label)``.
            transform: accepted for signature compatibility but not used;
                items are returned exactly as ``data`` yields them.
        """
        self.data = data

    def __len__(self):
        """Return the number of items in the wrapped dataset."""
        return len(self.data)

    def __getitem__(self,idx):
        """Return ``(image, original_label, animal_flag)`` for item ``idx``."""
        # Index the wrapped dataset exactly once: every ``self.data[idx]``
        # access re-runs whatever loading/transform work the dataset does.
        sample = self.data[idx]
        image, label1 = sample[0], sample[1]                        # original label
        label2 = 0 if label1 in non_animal else 1                   # animal or non-animal
        return image, label1, label2

In [None]:
# Wrap both CIFAR-10 splits so each item also carries the animal/non-animal
# label. NewDataset reads `non_animal` from module scope; its second
# parameter is `transform` (unused), so `non_animal` is not passed here.
new_trainset = NewDataset(trainset)
new_testset = NewDataset(testset)

# 90/10 train/validation split. The validation size is the remainder, so the
# two lengths always add up to the dataset size regardless of float rounding.
num_train = int(len(new_trainset) * 0.9)
num_valid = len(new_trainset) - num_train
train_set, valid_set = random_split(new_trainset, [num_train, num_valid],
                                    generator=torch.Generator().manual_seed(0))

# Only the training loader needs shuffling; the evaluation metrics are
# aggregated over the whole split and do not depend on batch order.
train_loader = DataLoader(train_set, batch_size=100, shuffle=True)
valid_loader = DataLoader(valid_set, batch_size=100, shuffle=False)
test_loader = DataLoader(new_testset, batch_size=100, shuffle=False)

In [None]:
from aihwkit.simulator.configs import InferenceRPUConfig
from aihwkit.nn import AnalogLinear
from aihwkit.optim import AnalogSGD
from aihwkit.simulator.configs import (
    InferenceRPUConfig,
    WeightNoiseType,
    WeightClipType,
    WeightModifierType,
    WeightRemapType,
)
from aihwkit.nn import AnalogConv2d, AnalogLinear, AnalogSequential
from aihwkit.inference import PCMLikeNoiseModel, GlobalDriftCompensation
from aihwkit.simulator.parameters import IOParameters

In [4]:
def create_rpu_config(g_max=25, tile_size=512, modifier_std=0.07):
    """Build the ``InferenceRPUConfig`` used for the notebook's analog layers.

    Args:
        g_max: forwarded to ``PCMLikeNoiseModel(g_max=...)``.
        tile_size: used for both ``mapping.max_input_size`` and
            ``mapping.max_output_size``.
        modifier_std: assigned to ``modifier.std_dev``.

    Returns:
        The configured ``InferenceRPUConfig`` instance.
    """
    config = InferenceRPUConfig()

    # Weight-to-tile mapping options.
    mapping = config.mapping
    mapping.digital_bias = True
    mapping.weight_scaling_omega = 1.0
    mapping.weight_scaling_columnwise = True
    mapping.learn_out_scaling = True
    mapping.out_scaling_columnwise = True
    mapping.max_input_size = tile_size
    mapping.max_output_size = tile_size

    # Weight remapping and clipping.
    config.remap.type = WeightRemapType.CHANNELWISE_SYMMETRIC
    config.clip.type = WeightClipType.FIXED_VALUE
    config.clip.fixed_value = 1.0

    # Weight modifier (multiplicative normal, std taken from the argument).
    modifier = config.modifier
    modifier.type = WeightModifierType.MULT_NORMAL
    modifier.rel_to_actual_wmax = True
    modifier.std_dev = modifier_std

    # Forward-pass I/O: output noise plus input/output resolution of
    # 1 / (2**8 - 2).
    resolution = 1 / (2 ** 8 - 2)
    io_params = IOParameters()
    io_params.out_noise = 0.05
    io_params.inp_res = resolution
    io_params.out_res = resolution
    config.forward = io_params

    # Noise model and drift compensation.
    config.noise_model = PCMLikeNoiseModel(g_max=g_max)
    config.drift_compensation = GlobalDriftCompensation()
    return config

In [5]:
def compute_hardware_metrics(model, outputs):
    """Sample the three hardware-related quantities used by the hardware loss.

    The RPU config is taken from ``model.rpu_config`` when present. Otherwise
    the first sub-module (via ``model.modules()``) exposing an ``rpu_config``
    attribute is used, so a plain container model that only holds configured
    layers still works.

    Args:
        model: object providing ``rpu_config`` directly or through one of its
            sub-modules.
        outputs: tensor handed to ``rpu_config.drift_compensation.readout``.

    Returns:
        ``(temperature, noise, drift)`` where
        * ``temperature`` is a uniform sample in [0.8, 1.2) scaled by
          ``forward.inp_res * forward.out_res``,
        * ``noise`` is a normal sample with mean 0 and std ``forward.out_noise``,
        * ``drift`` is whatever ``drift_compensation.readout(outputs)`` returns.

    Raises:
        AttributeError: if no ``rpu_config`` can be found.
    """
    rpu_config = getattr(model, 'rpu_config', None)
    if rpu_config is None and hasattr(model, 'modules'):
        for module in model.modules():
            rpu_config = getattr(module, 'rpu_config', None)
            if rpu_config is not None:
                break
    if rpu_config is None:
        raise AttributeError(
            "compute_hardware_metrics: neither the model nor any of its "
            "sub-modules exposes an 'rpu_config' attribute")

    forward = rpu_config.forward
    # temperature and noise are plain NumPy samples, so autograd treats them
    # as constants.
    temperature = np.random.uniform(0.8, 1.2) * forward.inp_res * forward.out_res
    noise = np.random.normal(0, forward.out_noise)
    drift = rpu_config.drift_compensation.readout(outputs)
    return temperature, noise, drift

In [6]:
def compute_hardware_loss(model, outputs):
    """Combine the hardware metrics into one scalar penalty.

    Calls ``compute_hardware_metrics(model, outputs)`` and returns
    ``0.2 * temperature + 0.3 * noise + 0.2 * drift``.
    """
    temperature, noise, drift = compute_hardware_metrics(model, outputs)
    temperature_term = 0.2 * temperature
    noise_term = 0.3 * noise
    drift_term = 0.2 * drift
    return temperature_term + noise_term + drift_term

In [7]:
def update_rpu_config(rpu_config, perturbation_scale=0.01):
    """Grow the non-ideality settings of ``rpu_config`` in place.

    Every quantity below is increased by ``perturbation_scale`` times its OWN
    current value:

    * ``forward.inp_res``, ``forward.out_res``, ``forward.out_noise``
    * ``noise_model.g_max`` (a new ``PCMLikeNoiseModel`` is installed)
    * ``modifier.std_dev``

    Args:
        rpu_config: config object mutated in place.
        perturbation_scale: relative increment; anything ``float()`` accepts
            (including a one-element tensor) is allowed and is converted to a
            plain float so no tensor ends up stored in the config.

    Returns:
        None.
    """
    scale = float(perturbation_scale)

    forward = rpu_config.forward
    forward.inp_res += forward.inp_res * scale
    forward.out_res += forward.out_res * scale
    forward.out_noise += forward.out_noise * scale

    g_max = rpu_config.noise_model.g_max + rpu_config.noise_model.g_max * scale
    # Passed by keyword, as create_rpu_config does.
    # NOTE(review): g_max does not appear to be the first positional
    # parameter of PCMLikeNoiseModel -- confirm against the aihwkit docs.
    rpu_config.noise_model = PCMLikeNoiseModel(g_max=g_max)

    modifier = rpu_config.modifier
    modifier.std_dev += modifier.std_dev * scale

In [None]:
class DynamicLayer(nn.Module):
    """A layer that holds an analog and a digital twin and runs one of them.

    Both an aihwkit analog layer and the matching ``torch.nn`` layer are built
    from the same constructor arguments. ``forward`` dispatches to whichever
    one is active, and ``toggle`` switches the active twin, copying the
    weights (and bias) of the previously active twin into the new one.
    """

    def __init__(self, layer_type, *args, **kwargs):
        """
        Args:
            layer_type: ``'conv'`` (AnalogConv2d / nn.Conv2d) or ``'linear'``
                (AnalogLinear / nn.Linear).
            *args, **kwargs: forwarded unchanged to both layer constructors.

        Raises:
            ValueError: for any other ``layer_type``.
        """
        super(DynamicLayer, self).__init__()

        self.layer_type = layer_type
        self.args = args
        self.kwargs = kwargs

        rpu_config = create_rpu_config()
        # Kept so callers can inspect the config the analog twin was built with.
        self.rpu_config = rpu_config

        if layer_type == 'conv':
            self.analog_layer = AnalogConv2d(*args, **kwargs, rpu_config=rpu_config)
            self.digital_layer = nn.Conv2d(*args, **kwargs)
        elif layer_type == 'linear':
            self.analog_layer = AnalogLinear(*args, **kwargs, rpu_config=rpu_config)
            self.digital_layer = nn.Linear(*args, **kwargs)
        else:
            raise ValueError("Invalid layer_type. Use 'conv' or 'linear'.")

        # The digital twin is active until toggle(True) is called.
        self.is_analog = False

    def forward(self, x):
        """Run ``x`` through the currently active twin."""
        active_layer = self.analog_layer if self.is_analog else self.digital_layer
        return active_layer(x)

    def toggle(self, is_analog):
        """Select the analog (True) or digital (False) twin.

        Weights are synchronized only when the state actually changes.
        Re-applying the current state is a no-op; otherwise the weights
        trained in the active twin would be overwritten by the stale copy
        held in the inactive one.
        """
        is_analog = bool(is_analog)
        if is_analog == self.is_analog:
            return

        self.is_analog = is_analog
        if is_analog:
            self._copy_digital_to_analog()
        else:
            self._copy_analog_to_digital()

    def _copy_digital_to_analog(self):
        """Load the digital twin's weight and bias into the analog twin."""
        weight = self.digital_layer.weight.detach().cpu()
        bias = self.digital_layer.bias
        if bias is not None:
            bias = bias.detach().cpu()
        # Hand over tensors, with the weight flattened to
        # (out_features, everything else) for conv and linear alike.
        # NOTE(review): aihwkit documents set_weights as taking torch tensors
        # -- confirm for the installed version.
        self.analog_layer.set_weights(weight.reshape(weight.shape[0], -1), bias)

    def _copy_analog_to_digital(self):
        """Load the analog twin's weight and bias into the digital twin."""
        weight, bias = self.analog_layer.get_weights()
        with torch.no_grad():
            target = self.digital_layer.weight
            self.digital_layer.weight.copy_(torch.as_tensor(weight).reshape(target.shape))
            if bias is not None and self.digital_layer.bias is not None:
                self.digital_layer.bias.copy_(torch.as_tensor(bias))

In [None]:
class MTL_Net_DynamicToggle(nn.Module):
    """Two-headed (multi-task) CNN whose shared trunk can run analog or digital.

    The trunk (``conv1``, ``conv2``, ``fc1``, ``fc2``) is made of
    ``DynamicLayer`` instances; the two task heads (``fc3``, ``fc4``) are
    plain ``nn.Linear`` layers and therefore always digital.
    """

    def __init__(self, input_channel, num_class):
        """
        Args:
            input_channel: number of channels of the input images.
            num_class: two-element sequence; ``num_class[0]`` is the output
                size of head ``fc3`` and ``num_class[1]`` that of head ``fc4``.
        """
        super(MTL_Net_DynamicToggle, self).__init__()
        self.classes = num_class

        # Shared trunk: layers that can be toggled between analog and digital.
        self.conv1 = DynamicLayer('conv', in_channels=input_channel, out_channels=8, kernel_size=3, stride=1)
        self.conv2 = DynamicLayer('conv', in_channels=8, out_channels=16, kernel_size=3, stride=1)
        # fc1 expects 64 flattened features (e.g. 16 channels x 2 x 2 for a
        # 32x32 input after the two conv/pool stages in forward()).
        self.fc1 = DynamicLayer('linear', 64, 256)
        self.dropout1 = nn.Dropout(0.3)
        self.fc2 = DynamicLayer('linear', 256, 128)
        self.dropout2 = nn.Dropout(0.3)

        # Task-specific layers remain digital
        self.fc3 = nn.Linear(128, self.classes[0])
        self.fc4 = nn.Linear(128, self.classes[1])

    def forward(self, x):
        """Return ``(task1_logits, task2_logits)`` for a batch of images."""
        x = F.max_pool2d(F.relu(self.conv1(x)), kernel_size=3)
        x = F.max_pool2d(F.relu(self.conv2(x)), kernel_size=3)
        x = torch.flatten(x, start_dim=1)  # (batch, channels * height * width)
        x = self.dropout1(F.relu(self.fc1(x)))
        x = self.dropout2(F.relu(self.fc2(x)))
        x1 = self.fc3(x)  # Task 1 output
        x2 = self.fc4(x)  # Task 2 output

        return x1, x2

    def toggle_layer(self, layer_name, is_analog):
        """
        Toggle a specific layer between analog and digital.

        Layers without a ``toggle`` method (the digital-only heads ``fc3`` and
        ``fc4``) are already digital: asking for digital is a no-op, asking
        for analog raises ``ValueError``. An unknown ``layer_name`` raises
        ``AttributeError`` from the attribute lookup.
        """
        layer = getattr(self, layer_name)
        if hasattr(layer, 'toggle'):
            layer.toggle(is_analog)
            return
        if is_analog:
            raise ValueError(
                "Layer '{}' is digital-only and cannot be switched to analog.".format(layer_name))

In [None]:
def train_model_dynamic(model, trainloader, optim, criterion, epoch, device, rpu_config, toggle_config=None):
    """
    Train function with dynamic analog/digital toggling.

    Runs one pass over ``trainloader``. The loss of each batch is the sum of
    both task losses plus ``compute_hardware_loss`` evaluated on each output.
    Whenever a batch loss is lower than the previous batch's loss,
    ``update_rpu_config`` is called with ``loss * 1e-6`` as the scale.

    Parameters:
    - model: module returning ``(task1_logits, task2_logits)`` and providing
             ``toggle_layer(name, is_analog)``.
    - trainloader: iterable of ``(inputs, task1_targets, task2_targets)``.
    - optim: optimizer (``zero_grad`` / ``step``).
    - criterion: loss applied to each task's logits and targets.
    - epoch: zero-based epoch index, used for logging only.
    - device: device the batches are moved to.
    - rpu_config: config perturbed by ``update_rpu_config``.
    - toggle_config: dict, specifies which layers to toggle and when during training.
                     Example: {'conv1': True, 'fc1': False}.

    Returns:
    ``(mean batch loss, task-1 accuracy in %, task-2 accuracy in %)``.

    Raises:
    ValueError if ``trainloader`` yields no batches.
    """
    model.train()  # Ensure the model is in training mode
    train_loss, total, total_correct1, total_correct2 = 0.0, 0, 0, 0
    num_batches = 0
    prev_loss = None  # loss of the previous batch as a float; None before the first batch

    # Apply toggling at the start of training (if toggle_config is provided)
    if toggle_config:
        for layer_name, is_analog in toggle_config.items():
            model.toggle_layer(layer_name, is_analog)

    # compute_hardware_loss takes its settings from the model, so a model that
    # carries no config of its own is given the one passed to this function.
    if getattr(model, 'rpu_config', None) is None and rpu_config is not None:
        model.rpu_config = rpu_config

    for inputs, tg1, tg2 in tqdm(trainloader):
        inputs, tg1, tg2 = inputs.to(device), tg1.to(device), tg2.to(device)
        optim.zero_grad()

        # Forward pass through the model
        op1, op2 = model(inputs)

        # Task losses plus the hardware penalty for each output
        loss1 = criterion(op1, tg1)
        loss2 = criterion(op2, tg2)
        hardware_loss = compute_hardware_loss(model, op1) + compute_hardware_loss(model, op2)
        total_loss = loss1 + loss2 + hardware_loss

        # Backpropagation and weight update
        total_loss.backward()
        optim.step()

        # Accumulate metrics from a plain float so no autograd graph is kept
        # alive across iterations.
        loss_value = total_loss.item()
        train_loss += loss_value
        pd1 = op1.detach().argmax(dim=1)
        pd2 = op2.detach().argmax(dim=1)

        total_correct1 += (pd1 == tg1).sum().item()
        total_correct2 += (pd2 == tg2).sum().item()
        total += tg1.size(0)
        num_batches += 1

        # Perturb the RPU config when the loss improved on the previous batch
        # (there is nothing to compare against on the first batch).
        if prev_loss is not None and loss_value < prev_loss:
            update_rpu_config(rpu_config, loss_value * 0.000001)
        prev_loss = loss_value

    if num_batches == 0:
        raise ValueError("train_model_dynamic: trainloader yielded no batches")

    mean_loss = train_loss / num_batches
    acc1 = total_correct1 * 100 / total
    acc2 = total_correct2 * 100 / total

    print("Epoch: [{}]  loss: [{:.2f}] Original_task_acc [{:.2f}] animal_vs_non_animal_acc [{:.2f}]".format(
        epoch + 1, mean_loss, acc1, acc2
    ))

    return mean_loss, acc1, acc2

In [None]:
def test_model_dynamic(model, testloader, criterion, epoch, device, toggle_config=None):
    """
    Test function with dynamic analog/digital toggling.

    Parameters:
    - toggle_config: dict, specifies which layers to toggle during testing.
                     Example: {'conv1': True, 'fc1': False}.
    """
    model.eval()  # Ensure the model is in evaluation mode
    test_loss, total, total_correct1, total_correct2 = 0, 0, 0, 0

    # Apply toggling at the start of testing (if toggle_config is provided)
    if toggle_config:
        for layer_name, is_analog in toggle_config.items():
            model.toggle_layer(layer_name, is_analog)

    with torch.no_grad():
        for i, (inputs, tg1, tg2) in enumerate(tqdm(testloader)):
            inputs, tg1, tg2 = inputs.to(device), tg1.to(device), tg2.to(device)

            # Forward pass through the model
            op1, op2 = model(inputs)

            # Compute losses for both tasks
            loss1 = criterion(op1, tg1)
            loss2 = criterion(op2, tg2)

            # Accumulate metrics
            test_loss += loss1.item() + loss2.item()
            _, pd1 = torch.max(op1.data, 1)
            _, pd2 = torch.max(op2.data, 1)

            total_correct1 += (pd1 == tg1).sum().item()
            total_correct2 += (pd2 == tg2).sum().item()
            total += tg1.size(0)

    # Compute accuracies
    acc1 = 100. * total_correct1 / total
    acc2 = 100. * total_correct2 / total

    # Log metrics
    print("Test Epoch: [{}]  loss: [{:.2f}] Original_task_Acc [{:.2f}] animal_vs_non_animal_acc [{:.2f}]".format(
        epoch + 1, test_loss / (i + 1), acc1, acc2
    ))

    return test_loss / (i + 1), acc1, acc2

In [None]:
def create_sgd_optimizer(model, learning_rate):
    """Create an ``AnalogSGD`` optimizer over all parameters of ``model``.

    After construction, ``regroup_param_groups(model)`` is called on the
    optimizer before it is returned.

    Args:
        model: module whose ``parameters()`` are optimized.
        learning_rate: value passed as ``lr``.

    Returns:
        The ``AnalogSGD`` instance.
    """
    analog_sgd = AnalogSGD(model.parameters(), lr=learning_rate)
    analog_sgd.regroup_param_groups(model)
    return analog_sgd

In [None]:
# --- Experiment settings -----------------------------------------------------
NUM_EPOCHS = 50
# NOTE(review): the original call passed no learning rate at all; 0.01 is a
# placeholder -- set it to the value intended for this experiment.
LEARNING_RATE = 0.01
# A training loss above this value triggers the next analog -> digital fallback.
FALLBACK_LOSS_THRESHOLD = 0.8

model = MTL_Net_DynamicToggle(input_channel=3, num_class=[10, 2]).to(device)

# Start with every toggleable layer analog. The task heads fc3/fc4 are plain
# nn.Linear layers (always digital), so they are not toggled.
for layer_name in ('conv1', 'conv2', 'fc1', 'fc2'):
    model.toggle_layer(layer_name, is_analog=True)

optimizer = create_sgd_optimizer(model, LEARNING_RATE)
criterion = nn.CrossEntropyLoss()
rpu_config = create_rpu_config()

# Layers (or groups of layers) to switch back to digital, consumed from the
# END of the list: first fc1+fc2 together, then conv2, then conv1.
# NOTE(review): the original list named 'conv1' twice and never 'conv2';
# 'conv2' is assumed to be what was meant -- confirm.
order = ['conv1', 'conv2', ('fc1', 'fc2')]

# No layer is switched before the first epoch.
toggle_config = None

for epoch in range(NUM_EPOCHS):

    # Train with the toggles decided at the end of the previous epoch
    train_loss, l1, l2 = train_model_dynamic(model, train_loader, optimizer, criterion, epoch,
                                             device, rpu_config, toggle_config)

    # If the loss is still high, fall back to digital for the next group
    if train_loss > FALLBACK_LOSS_THRESHOLD and order:
        val = order.pop()
        group = val if isinstance(val, tuple) else (val,)
        toggle_config = {layer_name: False for layer_name in group}
    else:
        toggle_config = None

    # Evaluate with the newly decided toggles applied
    test_loss, acc1, acc2 = test_model_dynamic(model, test_loader, criterion, epoch,
                                               device, toggle_config)