In [None]:
import torch
import torch.nn as nn
import numpy as np
from torch.autograd import Function
import math
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import os
import sys

def adder2d_function(X, W, stride=1, padding=0):
    """Apply an adder (negative L1 distance) filter bank to a 4-D input.

    Every sliding window of ``X`` is compared against every filter in ``W``
    through ``adder.apply``, which returns the negated sum of absolute
    differences between the two.

    Args:
        X: Input tensor, unpacked as ``(n_x, d_x, h_x, w_x)``.
        W: Filter tensor, unpacked as ``(n_filters, d_filter, h_filter, w_filter)``.
            Only ``h_filter`` is handed to ``unfold`` as the kernel size, so
            the extracted windows are square with side ``h_filter``.
        stride: Stride forwarded to ``unfold``.
        padding: Padding forwarded to ``unfold``.

    Returns:
        Contiguous tensor of shape ``(n_x, n_filters, h_out, w_out)``.
    """
    n_filters, _, h_filter, w_filter = W.size()
    n_x, _, h_x, w_x = X.size()

    def _out_extent(in_size, k_size):
        # Standard sliding-window output size, truncated to an int.
        return int((in_size - k_size + 2 * padding) / stride + 1)

    h_out = _out_extent(h_x, h_filter)
    w_out = _out_extent(w_x, w_filter)

    # Fold the batch into the channel axis so a single unfold call extracts
    # the windows of every sample, then split the batch back out.
    patches = torch.nn.functional.unfold(
        X.view(1, -1, h_x, w_x), h_filter,
        dilation=1, padding=padding, stride=stride,
    )
    patches = patches.view(n_x, -1, h_out * w_out)

    # Columns: one flattened window per (output position, sample) pair.
    window_len = patches.size(1)
    X_col = patches.permute(1, 2, 0).contiguous().view(window_len, -1)
    W_col = W.view(n_filters, -1)

    flat = adder.apply(W_col, X_col)

    # (n_filters, h_out, w_out, n_x) -> (n_x, n_filters, h_out, w_out)
    return flat.view(n_filters, h_out, w_out, n_x).permute(3, 0, 1, 2).contiguous()

class adder(Function):
    """Autograd function computing negative L1 distances with custom gradients.

    For ``W_col`` of shape ``(F, K)`` and ``X_col`` of shape ``(K, P)`` (the
    layout built by ``adder2d_function``), the forward result has shape
    ``(F, P)`` with entry ``[f, p] = -sum_k |W_col[f, k] - X_col[k, p]|``.
    """

    @staticmethod
    def forward(ctx, W_col, X_col):
        """Return the negated sum of absolute differences over axis 1."""
        ctx.save_for_backward(W_col, X_col)
        # (F, K, 1) - (1, K, P) broadcasts to (F, K, P).
        pairwise = W_col.unsqueeze(2) - X_col.unsqueeze(0)
        return -pairwise.abs().sum(1)

    @staticmethod
    def backward(ctx, grad_output):
        """Return ``(grad_W_col, grad_X_col)`` for the saved operands.

        The weight gradient uses the raw difference ``X - W`` and is then
        rescaled to an L2 norm of ``sqrt(numel(W_col)) / 5``; the input
        gradient uses the difference clamped to ``[-1, 1]``.
        """
        W_col, X_col = ctx.saved_tensors
        x_minus_w = X_col.unsqueeze(0) - W_col.unsqueeze(2)
        upstream = grad_output.unsqueeze(1)

        grad_W_col = (x_minus_w * upstream).sum(2)
        # The clamp on the norm guards against division by zero.
        target_scale = math.sqrt(W_col.size(1) * W_col.size(0))
        grad_W_col = grad_W_col / grad_W_col.norm(p=2).clamp(min=1e-12) * target_scale / 5

        grad_X_col = (-x_minus_w.clamp(-1, 1) * upstream).sum(0)

        return grad_W_col, grad_X_col
    
class adder2d(nn.Module):
    """Layer wrapper around ``adder2d_function`` with a learnable filter bank.

    Attributes:
        adder: Parameter of shape
            ``(output_channel, input_channel, kernel_size, kernel_size)``.
        bias: The boolean flag passed to the constructor (not a tensor).
        b: Per-output-channel offset parameter; only created when ``bias`` is true.
    """

    def __init__(self,input_channel,output_channel,kernel_size, stride=1, padding=0, bias = False):
        super().__init__()
        self.stride = stride
        self.padding = padding
        self.input_channel = input_channel
        self.output_channel = output_channel
        self.kernel_size = kernel_size

        # Filters are sampled with randn and then re-drawn in place by normal_.
        filter_shape = (output_channel, input_channel, kernel_size, kernel_size)
        self.adder = torch.nn.Parameter(nn.init.normal_(torch.randn(*filter_shape)))

        self.bias = bias
        if bias:
            self.b = torch.nn.Parameter(nn.init.uniform_(torch.zeros(output_channel)))

    def forward(self, x):
        """Run the adder filters over ``x`` and add the channel offset if enabled."""
        result = adder2d_function(x, self.adder, self.stride, self.padding)
        if not self.bias:
            return result

        # Reshape (C,) -> (1, C, 1, 1) so the offset broadcasts per channel.
        result += self.b[None, :, None, None]
        return result

In [9]:
import torch.nn as nn

def conv3x3(in_planes, out_planes, stride=1):
    """Build a bias-free 3x3 ``adder2d`` layer with padding 1."""
    return adder2d(
        input_channel=in_planes,
        output_channel=out_planes,
        kernel_size=3,
        stride=stride,
        padding=1,
        bias=False,
    )


class BasicBlock(nn.Module):
    """Residual block built from two 3x3 adder layers, each followed by BatchNorm.

    The shortcut is the block input itself, or ``downsample(input)`` when a
    ``downsample`` module is supplied.
    """
    expansion = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None):
        super().__init__()
        # Only the first adder layer applies the stride.
        self.conv1 = conv3x3(inplanes, planes, stride=stride)
        self.bn1 = nn.BatchNorm2d(planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(planes, planes)
        self.bn2 = nn.BatchNorm2d(planes)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))

        shortcut = x if self.downsample is None else self.downsample(x)

        out += shortcut
        return self.relu(out)

In [10]:
import torch.nn as nn

class ResNet(nn.Module):
    """Three-stage residual network whose residual stages use adder layers.

    Layout: a regular 3->16 3x3 convolution stem, three stages of ``block``
    with 16/32/64 planes (the last two with stride 2), an 8x8 average pool,
    a 1x1 convolution acting as the classifier, and a BatchNorm over the
    class scores.
    """

    def __init__(self, block, layers, num_classes=10):
        """
        Args:
            block: Block class; must expose ``expansion`` and accept
                ``inplanes``, ``planes``, ``stride`` and ``downsample``.
            layers: Sequence holding the number of blocks for each of the
                three stages.
            num_classes: Number of output scores.
        """
        super().__init__()
        self.inplanes = 16
        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(16)
        self.relu = nn.ReLU(inplace=True)
        self.layer1 = self._make_layer(block, 16, layers[0])
        self.layer2 = self._make_layer(block, 32, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 64, layers[2], stride=2)
        # NOTE(review): the fixed 8x8 window presumably targets 32x32 inputs - confirm.
        self.avgpool = nn.AvgPool2d(8, stride=1)
        self.fc = nn.Conv2d(64 * block.expansion, num_classes, 1, bias=False)
        self.bn2 = nn.BatchNorm2d(num_classes)

        # Reset every BatchNorm to scale 1 / shift 0.
        for module in self.modules():
            if not isinstance(module, nn.BatchNorm2d):
                continue
            module.weight.data.fill_(1)
            module.bias.data.zero_()

    def _make_layer(self, block, planes, blocks, stride=1):
        """Assemble one stage of ``blocks`` residual blocks.

        The first block receives ``stride`` and, when the stride is not 1 or
        the channel count changes, a 1x1 adder + BatchNorm projection for its
        shortcut. ``self.inplanes`` is updated to the stage's output width.
        """
        out_planes = planes * block.expansion
        needs_projection = stride != 1 or self.inplanes != out_planes

        downsample = None
        if needs_projection:
            downsample = nn.Sequential(
                adder2d(self.inplanes, out_planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_planes)
            )

        head = block(inplanes=self.inplanes, planes=planes, stride=stride, downsample=downsample)
        self.inplanes = out_planes
        tail = [block(inplanes=self.inplanes, planes=planes) for _ in range(1, blocks)]

        return nn.Sequential(head, *tail)

    def forward(self, x):
        """Return class scores flattened to ``(batch, -1)``."""
        x = self.relu(self.bn1(self.conv1(x)))

        for stage in (self.layer1, self.layer2, self.layer3):
            x = stage(x)

        x = self.bn2(self.fc(self.avgpool(x)))

        return x.view(x.size(0), -1)


def resnet20(**kwargs):
    """Build a ``ResNet`` of ``BasicBlock`` with three blocks in each of its three stages.

    Keyword arguments are forwarded unchanged to the ``ResNet`` constructor.
    """
    blocks_per_stage = [3, 3, 3]
    return ResNet(BasicBlock, blocks_per_stage, **kwargs)

In [11]:
import torch.nn as nn
import torch
from torch.nn import functional as F

class AdderNet(nn.Module):
    """
    ResNet20-based AdderNet wrapper used for quantization experiments.

    On top of the wrapped ``resnet20`` model it offers:
    - ``load_manual_weights``: copy externally prepared tensors into the model
    - ``forward(..., save_activations=True)``: keep a copy of the input batch
    - ``classification``: softmax over the model output

    Quantization preprocessing expected from the caller (as described by the
    original author; none of it is performed inside this class):
    - Adder weights → quantized integers: W_clip ∈ [-2^(q-1), 2^(q-1)-1]
    - BatchNorm running_mean → adjusted: μ' = round(μ/δ) + Σ|W_q - W_clip|
    - BatchNorm bias → quantized: β' = β/δ
    - Final FC weights → scaled: W_fc' = W_fc * δ

    During inference:
    - Adder: outputs -Σ|X - W_clip| (integer operations)
    - BatchNorm: Y = γ * (X - μ') / √(σ² + ε) + β'
    - The weight bias is implicitly handled via the adjusted running_mean
    """
    def __init__(self, num_classes=10, load_weights=None):
        """
        Args:
            num_classes: Forwarded to ``resnet20``.
            load_weights: Optional name -> tensor mapping; when given it is
                passed to ``load_manual_weights`` right after construction.
        """
        super().__init__()

        # ResNet20 is the underlying network.
        self.model = resnet20(num_classes=num_classes)

        if load_weights is not None:
            self.load_manual_weights(load_weights)

        self.activations = {}

    def load_manual_weights(self, weights_dict):
        """
        Copy matching entries of ``weights_dict`` into the wrapped model.

        Parameters are visited first, then buffers. An entry is copied only
        when its name exists in ``weights_dict`` and its shape equals the
        target's shape; everything else is skipped without notice.

        Expected preprocessing (performed by the caller):
        1. Adder weights: quantized integers W_clip
        2. BN running_mean: adjusted with weight bias (μ' = round(μ/δ) + bias_sum)
        3. BN bias: quantized by delta (β' = β/δ)
        4. BN weight: quantized by delta for bn1
        5. FC weights: scaled by delta (W_fc' = W_fc * δ)
        """
        with torch.no_grad():
            for named_tensors in (self.model.named_parameters(), self.model.named_buffers()):
                for name, target in named_tensors:
                    if name not in weights_dict:
                        continue
                    source = weights_dict[name]
                    if source.shape != target.shape:
                        continue
                    target.copy_(source.to(target.device))

    def forward(self, x, save_activations=False):
        """Run the wrapped model; optionally store a clone of ``x`` under
        ``activations['input_activation']``."""
        if save_activations:
            self.activations['input_activation'] = x.clone()

        return self.model(x)

    def classification(self, x):
        """Return softmax probabilities over dimension 1 of the forward output."""
        return F.softmax(self.forward(x), dim=1)

In [12]:
import pandas as pd
import numpy as np

class EvolutionAlgorithmBase:
    """
    Base class for Evolution Algorithms.

    Stores the shared configuration and creates the (initially empty) history
    containers. Subclasses are expected to override ``run``.
    """
    def __init__(self, func, n_dim, size_pop, max_iter, prob_mut):
        self.func = func
        self.n_dim = n_dim
        self.size_pop = size_pop
        self.max_iter = max_iter
        self.prob_mut = prob_mut

        # History containers
        self.generation_best_X = []
        self.generation_best_Y = []
        self.all_history_Y = []
        self.all_history_FitV = []

    def run(self):
        """Placeholder; the base class performs no optimization."""
        pass


class DE(EvolutionAlgorithmBase):
    """
    Differential Evolution (DE) Algorithm.

    This class implements the Differential Evolution algorithm for activation cutoff optimization.
    It uses a loop-based approach to ensure distinct candidate selection for mutation.
    This implementation MAXIMIZES the objective (here: accuracy): a trial
    vector replaces its target only when it scores strictly higher.

    All candidate vectors are rounded to two decimal places.

    Parameters:
    -----------
    func: callable
        The objective function to maximize. Called with a single vector of
        length ``n_dim``; if it exposes a truthy ``batch_mode`` attribute it
        is instead called with a ``(1, n_dim)`` array and element 0 of the
        result is used.
    F: float
        The mutation factor (differential weight).
    lb: array
        Lower bounds for the activation values (scalar or length ``n_dim``).
    ub: array
        Upper bounds for the activation values (scalar or length ``n_dim``).
    size_pop: int
        The size of the population. Must be at least 4, because mutation
        draws three distinct members other than the current target.
    n_dim: int
        The dimension of the search space (Layers*blocks*2 + 2) = 20.
    max_iter: int
        The maximum number of iterations/generations.
    prob_mut: float (optional, default 0.7)
        The crossover rate (CR).

    Raises:
    -------
    ValueError
        If ``size_pop`` is smaller than 4.
    """
    def __init__(self, func, F, lb, ub,
                 size_pop, n_dim, max_iter, prob_mut=0.7):
        # Note: 'prob_mut' corresponds to 'cr' (crossover rate) in sample code
        super().__init__(func, n_dim, size_pop, max_iter, prob_mut)

        # Fail fast, before the (potentially expensive) initial evaluation:
        # each mutation samples 3 distinct members excluding the target.
        if self.size_pop < 4:
            raise ValueError(
                f"size_pop must be at least 4 for DE mutation, got {self.size_pop}"
            )

        self.F = F
        self.lb = np.array(lb) * np.ones(self.n_dim)
        self.ub = np.array(ub) * np.ones(self.n_dim)

        # Initialize population
        self.crtbp()
        # Evaluate initial population. Force a float array so that later
        # float scores are never truncated when the first scores were ints.
        self.Y = np.array([self.func(x) for x in self.X], dtype=float)

    def crtbp(self):
        """Create the initial population: uniform in [lb, ub], rounded to 2 decimals."""
        self.X = np.random.uniform(self.lb, self.ub, (self.size_pop, self.n_dim))
        self.X = np.round(self.X, 2)

        return self.X

    def mutation_op(self, x, F):
        """
        Mutation operation: x[0] + F * (x[1] - x[2])
        x is a list/array of 3 vectors [a, b, c]
        """
        return x[0] + F * (x[1] - x[2])

    def check_bounds(self, mutated):
        """Boundary check operation using clip"""
        return np.clip(mutated, self.lb, self.ub)

    def crossover_op(self, mutated, target, cr):
        """
        Binomial crossover between ``mutated`` and ``target``.

        Each dimension takes the mutant value with probability ``cr``; one
        randomly chosen dimension always takes the mutant value. The result
        is rounded to two decimal places.
        """
        # generate a uniform random value for every dimension
        p = np.random.rand(self.n_dim)

        # ensure at least one parameter is from mutated vector
        j_rand = np.random.randint(0, self.n_dim)

        # Use mutant value if rand <= CR OR if it's the forced index
        take_mutant = (p <= cr) | (np.arange(self.n_dim) == j_rand)
        trial_vector = np.where(take_mutant, mutated, target)

        # Round to hundredths place (2 decimal places)
        return np.round(trial_vector, 2)

    def run(self, max_iter=None):
        """
        Run the optimization and return ``(best_x, best_y)``.

        Args:
            max_iter: Optional override for the number of generations. When
                given (including 0) it replaces ``self.max_iter``; ``None``
                keeps the configured value.

        Returns:
            Tuple of the best vector found and its objective value. With zero
            generations this is the best member of the current population.
        """
        # 'is not None' so that an explicit 0 is honoured instead of being
        # mistaken for "not provided".
        if max_iter is not None:
            self.max_iter = max_iter

        # Initial Best
        best_idx = np.argmax(self.Y)
        self.best_x = self.X[best_idx].copy()
        self.best_y = self.Y[best_idx]

        for i in range(self.max_iter):
            # Iterate over all candidate solutions
            for j in range(self.size_pop):
                # Choose three candidates a, b, c that are not the current one
                # to ensure distinct indices for mutation
                candidates = [idx for idx in range(self.size_pop) if idx != j]
                a_idx, b_idx, c_idx = np.random.choice(candidates, 3, replace=False)

                # Perform mutation, then clip back into the bounds
                mutated = self.mutation_op(
                    [self.X[a_idx], self.X[b_idx], self.X[c_idx]], self.F
                )
                mutated = self.check_bounds(mutated)

                # Perform crossover
                trial = self.crossover_op(mutated, self.X[j], self.prob_mut)

                # Compute objective function value for trial vector
                if getattr(self.func, 'batch_mode', False):
                    obj_trial = self.func(trial.reshape(1, -1))[0]
                else:
                    obj_trial = self.func(trial)

                print(f"Trial {j} accuracy: {obj_trial:.2f}%")

                # Perform selection: keep the trial only if strictly better
                if obj_trial > self.Y[j]:
                    self.X[j] = trial
                    self.Y[j] = obj_trial

            # Record the best individual of this generation
            generation_best_index = np.argmax(self.Y)
            current_best_y = self.Y[generation_best_index]

            self.generation_best_X.append(self.X[generation_best_index, :].copy())
            self.generation_best_Y.append(current_best_y)
            self.all_history_Y.append(self.Y.copy())

            print(f"Generation {i+1}: Best Accuracy = {current_best_y:.2f}%")

            # Update global best
            if current_best_y > self.best_y:
                self.best_y = current_best_y
                self.best_x = self.X[generation_best_index].copy()

        return self.best_x, self.best_y

In [16]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import os
import sys
import numpy as np

def load_data(dataset='cifar10', data_dir='./data', batch_size=100, workers=4):
    """
    Build the evaluation DataLoader for CIFAR10 or ImageNet.

    For CIFAR10 the test split is used (``train=False``) and downloaded into
    ``data_dir`` if needed. For ImageNet, ``data_dir`` is read as an
    ``ImageFolder`` and images are resized to 256 and center-cropped to 224.
    Both loaders are unshuffled and use pinned memory.

    Args:
        dataset: 'cifar10' or 'ImageNet' (matched case-insensitively)
        data_dir: path to dataset directory
        batch_size: batch size for data loader
        workers: number of data loading workers

    Returns:
        PyTorch DataLoader over the selected evaluation set

    Raises:
        ValueError: if ``dataset`` is neither of the supported names
    """
    print(f'Preparing {dataset} data from {data_dir}..')

    dataset_name = dataset.lower()
    if dataset_name not in ('cifar10', 'imagenet'):
        raise ValueError(f"Dataset {dataset} not supported. Choose 'cifar10' or 'ImageNet'")

    if dataset_name == 'cifar10':
        preprocess = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
        ])
        eval_set = datasets.CIFAR10(data_dir, train=False, download=True, transform=preprocess)
    else:
        preprocess = transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])
        ])
        eval_set = datasets.ImageFolder(data_dir, preprocess)

    return torch.utils.data.DataLoader(
        eval_set,
        batch_size=batch_size, shuffle=False,
        num_workers=workers, pin_memory=True)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import os
import sys
import numpy as np

def get_adder_layer_keys(state_dict):
    """Return, in iteration order, the keys that start with 'layer' and end with 'adder'."""
    return [
        key
        for key in state_dict.keys()
        if key.startswith('layer') and key.endswith('adder')
    ]

def quantization_objective(x, fixed_state_dict, adder_keys, bits, device, testloader):
    """
    Objective function for DE: quantize a copy of the weights with the given
    clipping values and return the resulting test accuracy.

    Args:
        x: sequence of max_activation_val scalars, one for each adder layer
            (paired with ``adder_keys`` in order). ``x[0]`` also scales the
            root ``bn1`` and ``x[-1]`` also scales ``fc.weight``.
        fixed_state_dict: name -> tensor mapping of the float model. It is
            cloned, never modified. BatchNorm entries are assumed to appear
            after the adder layer they follow.
        adder_keys: keys of the adder weight tensors, in layer order.
        bits: bit width; weights are rounded and clamped to ``[0, 2**bits - 1]``.
        device: device used for evaluation.
        testloader: iterable of ``(inputs, targets)`` batches.

    Returns:
        Top-1 accuracy in percent.

    Raises:
        ValueError: if ``x`` has fewer entries than ``adder_keys`` or if
            ``testloader`` yields no samples.
    """
    # zip() below would silently drop adder layers without a clipping value
    # and fail later with an unrelated KeyError; reject that up front.
    if len(x) < len(adder_keys):
        raise ValueError(
            f"Expected at least {len(adder_keys)} clipping values "
            f"(one per adder layer), got {len(x)}."
        )

    quantized_state_dict = {k: v.clone() for k, v in fixed_state_dict.items()}

    Max_A = 2**(bits) - 1
    Max_B = 0

    # Helper to calculate delta (the quantization step for a clipping value)
    def get_delta(max_val):
        return max_val / Max_A

    delta_first = get_delta(x[0])
    delta_last = get_delta(x[-1])

    # Quantize bn1
    quantized_state_dict['bn1.weight'] = quantized_state_dict['bn1.weight'] / delta_first
    quantized_state_dict['bn1.bias'] = quantized_state_dict['bn1.bias'] / delta_first

    # Pre-calculate deltas for all adder layers
    layer_deltas = {key: get_delta(val) for key, val in zip(adder_keys, x)}

    # Process layers sequentially; BN entries use the delta and bias sum of
    # the most recently seen adder layer.
    current_delta = delta_first
    current_bias_sum = 0

    for name in list(quantized_state_dict.keys()):
        if name in layer_deltas:
            # apply AOQ to weights
            w_tensor = quantized_state_dict[name]
            current_delta = layer_deltas[name]
            wq = torch.round(w_tensor / current_delta)
            wq_clamp = torch.clamp(wq, max=Max_A, min=Max_B)
            quantized_state_dict[name] = wq_clamp

            # Calculate bias sum for FBR: per-output-channel clipping error
            current_bias_sum = torch.sum((wq - wq_clamp).abs(), dim=(1, 2, 3))

        elif name.startswith('layer'):
            # Handle BN parameters for layers (layer1, layer2, etc.)
            # Assumes these come AFTER their corresponding adder layer
            if name.endswith('running_mean'):
                m_tensor = quantized_state_dict[name]
                mq = torch.round(m_tensor / current_delta)
                quantized_state_dict[name] = mq + current_bias_sum

            elif name.endswith('bias') and ('bn' in name or 'downsample' in name):
                # The BatchNorm after a downsample adder is stored as
                # 'downsample.1.*' and has no 'bn' in its key. Its
                # running_mean is rescaled above, so its bias must be divided
                # by the same delta; otherwise that BN output mixes scales.
                quantized_state_dict[name] = quantized_state_dict[name] / current_delta

    # Quantize FC
    quantized_state_dict['fc.weight'] = quantized_state_dict['fc.weight'] * delta_last

    # Evaluate
    quant_model = AdderNet(num_classes=10).to(device)
    quant_model.load_manual_weights(quantized_state_dict)
    quant_model.eval()

    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, targets in testloader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = quant_model(inputs)
            _, predicted = outputs.max(1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()

    if total == 0:
        raise ValueError("testloader yielded no samples; cannot compute accuracy.")

    accuracy = 100. * correct / total
    return accuracy

def run_optimization():
    """
    Search per-layer clipping values with Differential Evolution.

    Loads the CIFAR10 test loader and the checkpoint 'AdderNet_model.pth',
    strips 'module.' from the checkpoint keys, and for every bit width in
    ``bit_array`` runs ``DE`` with ``quantization_objective`` as the score to
    maximize. The search dimension equals the number of adder layers found in
    the checkpoint. Results are printed; nothing is returned.
    """
    print("-+" * 25)
    print("Starting DE Optimization for AdderNet2.0 Quantization")
    print("-+" * 25)

    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    print(f"Using device: {device}")

    # Load Data
    testloader = load_data(dataset='cifar10', data_dir='./data', batch_size=200, workers=4)

    # Load Pretrained Model Weights
    print('Loading pretrained weights...')
    #model_dir = '/kaggle/input/state-dictionary'
    # files = os.listdir(model_dir)
    #model_path = os.path.join(model_dir, 'AdderNet_model.pth')
    model_path = 'AdderNet_model.pth'

    # Load state dictionary
    # NOTE(review): torch.load unpickles the file, so only load checkpoints
    # from a trusted source (consider weights_only=True where the installed
    # PyTorch version supports it).
    checkpoint = torch.load(model_path, map_location=device)
    state_dict_raw = checkpoint['net'] if 'net' in checkpoint else checkpoint

    def remap_key(key):
        """Map original checkpoint keys to the ResNet20 naming.

        Only the 'module.' marker has to go; root-level entries (conv1, bn1,
        fc, bn2) and the residual layers already use the expected names.
        """
        return key.replace('module.', '')

    # Apply remapping
    fixed_state_dict = {remap_key(k): v for k, v in state_dict_raw.items()}

    # Prepare keys
    print("Debug: First 10 keys in fixed_state_dict:")
    for key in list(fixed_state_dict.keys())[:10]:
        print(key)

    adder_keys = get_adder_layer_keys(fixed_state_dict)
    print(f"Found {len(adder_keys)} adder layers to optimize.")
    if len(adder_keys) == 0:
        print("Error: No adder layers found. Check model keys or filtering logic.")
        return

    # DE Parameters
    # One clipping value per adder layer. Derived from the checkpoint instead
    # of a hard-coded 20 so the vector always lines up with adder_keys.
    n_dim = len(adder_keys)
    size_pop = 50
    max_iter = 50
    prob_mut = 0.85 # Also called CR
    F = 0.4

    lb = [2.0] * n_dim
    ub = [4.0] * n_dim

    # Activation value range: using best results from previous generations
    # lb = [2.4, 2.73, 2.63, 2.4, 2.61, 2.45, 2.53, 2.48, 3.09, 2.5, 2.13, 2.36, 2.0, 2.52, 2.18, 2.0, 2.29, 2.21, 2.0, 2.38]
    # ub = [2.4, 2.74, 2.67, 2.4, 2.61, 2.45, 2.68, 2.48, 3.10, 2.5, 2.13, 2.36, 2.0, 2.52, 2.18, 2.0, 2.29, 2.21, 2.0, 2.38]

    bit_array = [4] # Bits to be tested (unsigned integer 4)

    for bits in bit_array:
        print(f"\nOptimizing for {bits}-bit quantization...")

        # Define objective wrapper. 'bits' is bound as a default argument so
        # the wrapper keeps this iteration's value.
        def objective(x, bits=bits):
            return quantization_objective(
                x, fixed_state_dict, adder_keys, bits, device, testloader
            )  # maximizing accuracy

        de = DE(objective, F, lb, ub, size_pop, n_dim, max_iter, prob_mut)

        best_x, best_acc = de.run()

        print(f"Best Max Vals for {bits}-bit: {best_x}")
        print(f"Best Accuracy: {best_acc:.2f}%")
        print("-+" * 25)

if __name__ == "__main__":
    run_optimization()


-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
Starting DE Optimization for AdderNet2.0 Quantization
-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
Using device: cpu
Preparing cifar10 data from ./data..


5.2%


KeyboardInterrupt: 

In [17]:
# Evaluate one fixed set of activation clipping values
def test_specific_values():
    """
    Evaluate 4-bit quantization accuracy for a hard-coded list of clipping values.

    Loads the CIFAR10 test loader and the checkpoint 'AdderNet_model.pth',
    strips 'module.' from the checkpoint keys, runs ``quantization_objective``
    once with ``test_values`` and prints the accuracy. Nothing is returned.
    """
    banner = "-+" * 25
    print(banner)
    print("Testing Specific Activation Clipping Values")
    print(banner)

    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    print(f"Using device: {device}")

    # Load Data
    testloader = load_data(dataset='cifar10', data_dir='./data', batch_size=200, workers=4)

    # Load Pretrained Model Weights
    print('Loading pretrained weights...')
    model_path = 'AdderNet_model.pth'

    # The checkpoint either wraps the weights under 'net' or is the mapping itself.
    checkpoint = torch.load(model_path, map_location=device)
    state_dict_raw = checkpoint['net'] if 'net' in checkpoint else checkpoint

    # Map checkpoint keys to the ResNet20 naming: only 'module.' is removed.
    fixed_state_dict = {
        raw_key.replace('module.', ''): tensor
        for raw_key, tensor in state_dict_raw.items()
    }

    adder_keys = get_adder_layer_keys(fixed_state_dict)
    print(f"Found {len(adder_keys)} adder layers")

    # Specific activation clipping values to test (provided 18, adding 2 more based on pattern)
    test_values = [
        2.50, 1.97, 1.75, 0.75, 2.471, 1.821, 3.389, 1.586, 2.801, 1.26,
        3.49, 2.161, 2.009, 2.826, 2.757, 1.305, 2.255, 1.021, 2.0, 2.50,
    ]

    print(f"\nTesting activation values: {test_values}")
    print(f"Number of values: {len(test_values)}")

    bits = 4
    accuracy = quantization_objective(test_values, fixed_state_dict, adder_keys, bits, device, testloader)

    rule = '=' * 60
    print(f"\n{rule}")
    print("Results for 4-bit quantization with specified activation values:")
    print(f"Accuracy: {accuracy:.4f}%")
    print(f"{rule}\n")


# Run the test
if __name__ == "__main__":
    test_specific_values()


-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
Testing Specific Activation Clipping Values
-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
Using device: cpu
Preparing cifar10 data from ./data..


2.4%


KeyboardInterrupt: 