In [1]:
# Mount Google Drive at /content/drive; the dataset path used later in this
# notebook points below /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

MessageError: Error: credential propagation was unsuccessful

In [None]:
!pip install timm==0.4.12
!git clone https://github.com/Chungchih/ViT_Training.git
%cd ViT_Training

In [None]:
from transformers import AutoFeatureExtractor, AutoModelForImageClassification, BitsAndBytesConfig
import torch
import torch.nn as nn
import bitsandbytes as bnb
import time
import utils


In [None]:
# Checkpoint shared by the classifier and its preprocessing configuration.
MODEL_NAME = "facebook/deit-tiny-patch16-224"

# Request 8-bit weight loading.
quantization_config = BitsAndBytesConfig(load_in_8bit=True)

# Classifier loaded with the quantization settings above.
model = AutoModelForImageClassification.from_pretrained(
    MODEL_NAME,
    quantization_config=quantization_config,
)

# Preprocessing config; its image_mean / image_std feed the eval transform.
feature_extractor = AutoFeatureExtractor.from_pretrained(MODEL_NAME)


In [None]:
class NoiseQuantizedLinear4Bit(nn.Module):
    """Wrap a linear layer and perturb its input and weight with Gaussian noise.

    On every forward pass the wrapped layer's weight is dequantized when it
    carries a bitsandbytes ``quant_state``; Gaussian noise with standard
    deviation ``noise_level * std(tensor)`` is then added to both the input
    and the weight, and the linear transform is evaluated on float32 operands.

    Args:
        original_linear: Layer to wrap; must expose ``weight`` and ``bias``.
        noise_level: Noise standard deviation relative to each tensor's std.
        quantization_bits: Bit width stored on the wrapper. ``forward`` does
            not currently apply ``quantize_tensor`` / ``quantizeACT``, so this
            value is informational only.
    """

    def __init__(self, original_linear, noise_level=0.01, quantization_bits=4):
        super().__init__()
        self.original_linear = original_linear
        self.noise_level = noise_level
        self.quantization_bits = quantization_bits

    def quantize_tensor(self, x, bits=8):
        """
        Simulate hardware-like unsigned integer quantization.

        The tensor is min-max normalised to [0, 1], scaled to
        ``[0, 2**bits - 1]`` and rounded. The result holds integer *values*
        but keeps the input's floating dtype; it is not dequantized.

        Args:
        - x: Input tensor
        - bits: Number of quantization bits (e.g., 8 for uint8)

        Returns:
        - Quantized tensor (all zeros when ``x`` is constant)
        """
        max_val = 2 ** bits - 1
        min_val = 0

        if x.numel() == 0:
            # min()/max() are undefined on empty tensors.
            return x.clone()

        x_min = x.min()
        value_range = x.max() - x_min
        if value_range == 0:
            # A constant tensor would otherwise become 0/0 = NaN.
            return torch.zeros_like(x)

        x_norm = (x - x_min) / value_range
        x_quantized = torch.round(x_norm * max_val)
        return torch.clamp(x_quantized, min_val, max_val)

    def quantizeACT(self, x, bits=16):
        """
        Fake-quantize an activation with a symmetric signed grid.

        Args:
        - x: Input tensor
        - bits: Signed bit width; must be at least 2

        Returns:
        - Tensor of the same shape, rounded to the grid and scaled back

        Raises:
        - ValueError: if ``bits`` is smaller than 2 (the positive range would
          be empty and the scale undefined)
        """
        if bits < 2:
            raise ValueError(f"bits must be >= 2, got {bits}")

        # Use the `bits` argument; the class has no `self.bits` attribute.
        q_max = 2 ** (bits - 1) - 1
        q_min = -2 ** (bits - 1)

        if x.numel() == 0:
            return x.clone()

        max_abs = x.abs().max()
        if max_abs == 0:
            # All-zero input: nothing to quantize, and the scale would be 0.
            return x.clone()

        scale = max_abs / q_max
        quantized = torch.clamp(torch.round(x / scale), q_min, q_max)
        return quantized * scale

    def add_quantization_noise(self, x, noise_level=0.01):
        """
        Add zero-mean Gaussian noise scaled by the tensor's own std.

        Args:
        - x: Input tensor (floating point)
        - noise_level: Noise std as a fraction of ``torch.std(x)``

        Returns:
        - Noisy tensor; ``x`` itself when ``noise_level`` is 0 or ``x`` has
          fewer than two elements (its std would be NaN)
        """
        if noise_level == 0 or x.numel() < 2:
            return x

        noise = torch.randn_like(x) * noise_level * torch.std(x)
        return x + noise

    def _dequantized_weight(self):
        """Return the wrapped layer's weight as a dense tensor."""
        weight = self.original_linear.weight
        quant_state = getattr(weight, 'quant_state', None)
        if quant_state is None:
            # Plain (unquantized) weight: use it directly.
            return weight.data
        return bnb.functional.dequantize_4bit(weight.data, quant_state)

    def forward(self, x):
        """Apply the wrapped linear layer with noisy input and noisy weight."""
        weights = self._dequantized_weight()

        # Fake quantization of input / weight / output via quantize_tensor is
        # currently disabled; both tensors are passed through unchanged.
        x_noisy = self.add_quantization_noise(x, self.noise_level)
        weights_noisy = self.add_quantization_noise(weights, self.noise_level)

        # Cast the bias together with the other operands so a half-precision
        # bias does not clash with the float32 input and weight.
        bias = self.original_linear.bias
        if bias is not None:
            bias = bias.float()

        return torch.nn.functional.linear(
            x_noisy.float(),
            weights_noisy.float(),
            bias
        )


def add_noise_to_4bit_model(model, noise_level=0.01, quantization_bits=4):
    """Replace quantized linear layers in ``model`` with noisy wrappers, in place.

    A child module is wrapped in ``NoiseQuantizedLinear4Bit`` when it has a
    ``weight`` whose ``quant_state`` attribute exists; all other children are
    searched recursively. Children that are already wrappers only get their
    settings refreshed, so calling this function again (e.g. re-running a
    notebook cell) does not stack a second layer of noise.

    Args:
        model: Root module to modify.
        noise_level: Relative noise magnitude passed to every wrapper.
        quantization_bits: Bit width passed to every wrapper.

    Returns:
        The same ``model`` object, for call chaining.
    """
    for name, module in model.named_children():
        if isinstance(module, NoiseQuantizedLinear4Bit):
            # Already wrapped: update the settings instead of wrapping again.
            module.noise_level = noise_level
            module.quantization_bits = quantization_bits
        elif hasattr(module, 'weight') and hasattr(module.weight, 'quant_state'):
            # Forward the caller's bit width rather than a hard-coded 4.
            setattr(
                model,
                name,
                NoiseQuantizedLinear4Bit(
                    module,
                    noise_level=noise_level,
                    quantization_bits=quantization_bits,
                ),
            )
        else:
            # Recursively search through nested modules
            add_noise_to_4bit_model(module, noise_level, quantization_bits)
    return model


#model = add_noise_to_4bit_model(model, 0.1, 4)


In [None]:
import os

from torchvision import datasets, transforms

#from timm.data.constants import IMAGENET_DEFAULT_MEAN, IMAGENET_DEFAULT_STD
from timm.data import create_transform


def build_transform(is_train=False):
    """Build the image preprocessing pipeline for 224x224 inputs.

    Args:
        is_train: Truthy to get the augmenting training pipeline from timm's
            ``create_transform``; falsy to get the deterministic
            resize / center-crop / normalize evaluation pipeline.

    Returns:
        The transform to pass to the dataset.
    """
    input_size = 224
    needs_resize = input_size > 32

    if not is_train:
        # Resize to 224 / 0.875 first so the center crop keeps the usual
        # crop ratio; interpolation code 3 is presumably PIL's bicubic.
        resized = int(input_size / 0.875)
        eval_steps = [
            transforms.Resize(resized, interpolation=3),
            transforms.CenterCrop(input_size),
            transforms.ToTensor(),
            # Normalize with the statistics stored on the feature extractor.
            transforms.Normalize(feature_extractor.image_mean, feature_extractor.image_std),
        ]
        return transforms.Compose(eval_steps)

    train_transform = create_transform(
        input_size=input_size,
        is_training=True,
        color_jitter=0.3,
        auto_augment='rand-m9-mstd0.5-inc1',
        interpolation='bicubic',
        re_prob=0.25,
        re_mode='pixel',
        re_count=1,
    )
    if not needs_resize:
        # Small inputs: swap the first transform for a padded random crop.
        train_transform.transforms[0] = transforms.RandomCrop(input_size, padding=4)
    return train_transform

def build_dataset(is_train, path):
    """Create an ImageFolder dataset for the requested split.

    Args:
        is_train: Truthy selects ``<path>/train``; falsy selects ``<path>/val``.
        path: Directory that contains the ``train`` and ``val`` folders.

    Returns:
        Tuple ``(dataset, nb_classes)`` where ``nb_classes`` is fixed at 1000.
    """
    split_dir = 'train' if is_train else 'val'
    split_transform = build_transform(is_train)
    dataset = datasets.ImageFolder(
        os.path.join(path, split_dir),
        transform=split_transform,
    )
    return dataset, 1000

# Dataset root on the mounted Drive; build_dataset reads its `train/` and
# `val/` sub-folders.
path = '/content/drive/MyDrive/mini_imagenet'

dataset_train, nb_classes = build_dataset(is_train=True, path=path)
dataset_val, _ = build_dataset(is_train=False, path=path)

# Walk the validation set in a fixed order.
sampler_val = torch.utils.data.SequentialSampler(dataset_val)

In [None]:
def evaluate(data_loader_val, model, device):
    """Evaluate ``model`` on a validation loader and report loss and accuracy.

    Args:
        data_loader_val: Iterable yielding ``(images, target)`` batches.
        model: Model whose call result exposes a ``.logits`` attribute.
        device: Device the batches are moved to before the forward pass.

    Returns:
        Dict mapping every meter name of the metric logger (``loss``,
        ``acc1``, ``acc5``) to its ``global_avg``.
    """
    # `accuracy` is not brought into scope by the notebook's import cells;
    # import the timm helper here so the function is self-contained.
    from timm.utils import accuracy

    criterion = torch.nn.CrossEntropyLoss()

    metric_logger = utils.MetricLogger(delimiter="  ")
    header = 'Test:'

    # switch to evaluation mode
    model.eval()

    for images, target in metric_logger.log_every(data_loader_val, 10, header):
        images = images.to(device, non_blocking=True)
        target = target.to(device, non_blocking=True)

        # compute output without gradients, under CUDA autocast
        with torch.amp.autocast('cuda'):
            with torch.no_grad():
                output = model(images).logits
                loss = criterion(output, target)

        acc1, acc5 = accuracy(output, target, topk=(1, 5))

        # Weight the accuracy meters by batch size so the averages stay
        # correct when the final batch is smaller.
        batch_size = images.shape[0]
        metric_logger.update(loss=loss.item())
        metric_logger.meters['acc1'].update(acc1.item(), n=batch_size)
        metric_logger.meters['acc5'].update(acc5.item(), n=batch_size)

    # gather the stats from all processes
    metric_logger.synchronize_between_processes()
    print('* Acc@1 {top1.global_avg:.3f} Acc@5 {top5.global_avg:.3f} loss {losses.global_avg:.3f}'
          .format(top1=metric_logger.acc1, top5=metric_logger.acc5, losses=metric_logger.loss))

    return {k: meter.global_avg for k, meter in metric_logger.meters.items()}

from samplers import RASampler

num_tasks = utils.get_world_size()
global_rank = utils.get_rank()

# Training-split sampler; it is not consumed by the evaluation run below.
sampler_train = RASampler(
    dataset_train, num_replicas=num_tasks, rank=global_rank, shuffle=True
)

data_loader_val = torch.utils.data.DataLoader(
    dataset_val,
    sampler=sampler_val,
    batch_size=int(1.5 * 32),
    num_workers=8,
    pin_memory=True,
    drop_last=False,
)
device = 'cuda'

# The helper defined in this notebook is `add_noise_to_4bit_model`; the
# previously referenced `add_noise_to_8bit_model` does not exist.
# NOTE(review): the helper only wraps layers whose weight exposes
# `quant_state` -- confirm the 8-bit layers loaded above do, otherwise this
# call leaves the model unchanged.
model = add_noise_to_4bit_model(model, 0.0, 8)

evaluate(data_loader_val, model, device=device)

In [None]:
# Ask PyTorch's CUDA caching allocator to release its unused cached memory,
# then print the current GPU status via nvidia-smi.
torch.cuda.empty_cache()
!nvidia-smi