## LSQ 
- Симметричная квантизация
- один параметр квантизации для активаций другой для весов
- симулируем эффект квантизации при точности флотов

In [1]:
import os
import random

import numpy as np
import torch

# from torchvision.io import decode_image
# from torchvision.models import ResNet18_Weights, resnet18
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

# Set the global seed for NumPy's random number generator
seed_value = 42
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


def set_seed(seed: int = 42) -> None:
    np.random.seed(seed)
    random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    # When running on the CuDNN backend, two further options must be set
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    # Set a fixed value for the hash seed
    os.environ["PYTHONHASHSEED"] = str(seed)
    print(f"Random seed set as {seed}")


set_seed(42)

Random seed set as 42


In [None]:
from lsq_quantization import QConvImg2Col
from classify_single import evaluate_model, classify_single_image

In [4]:
class LeNetIMCol(nn.Module):
    def __init__(self, num_classes=10):
        super(LeNetIMCol, self).__init__()
        self.conv1 = QConvImg2Col(8, 3, 6, 7)
        self.conv2 = QConvImg2Col(8, 6, 16, 7)

        self.pool = nn.MaxPool2d(2, 2)

        # final feature map after conv/pool:
        # input 32×32 → conv5 → 28×28 → pool → 14×14
        # → conv5 → 10×10 → pool → 5×5 → 16 channels
        self.fc1 = nn.Linear(16 * 3 * 3, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, num_classes)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)


def weights_init(m):
    if isinstance(m, nn.Conv2d) or isinstance(m, nn.Linear):
        nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu")
        if m.bias is not None:
            nn.init.constant_(m.bias, 0)


model = LeNetIMCol(10)
model.apply(weights_init)
model.to(device)


LeNetIMCol(
  (conv1): QConvImg2Col(
    (q_act): Quantizer()
    (q_w): Quantizer()
  )
  (conv2): QConvImg2Col(
    (q_act): Quantizer()
    (q_w): Quantizer()
  )
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (fc1): Linear(in_features=144, out_features=120, bias=True)
  (fc2): Linear(in_features=120, out_features=84, bias=True)
  (fc3): Linear(in_features=84, out_features=10, bias=True)
)

In [5]:
import ssl

ssl._create_default_https_context = ssl._create_unverified_context

transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
)


train_dataset = datasets.CIFAR10(
    root="./data",
    train=True,
    download=True,
    transform=transform,
)
test_dataset = datasets.CIFAR10(
    root="./data",
    train=False,
    download=True,
    transform=transform,
)

class_names = [
    "airplane",
    "automobile",
    "bird",
    "cat",
    "deer",
    "dog",
    "frog",
    "horse",
    "ship",
    "truck",
]

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

100%|██████████| 170M/170M [00:03<00:00, 47.7MB/s]


In [7]:
criterion = nn.CrossEntropyLoss()
# optimizer = optim.Adam(model.parameters(), lr=0.01)
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)
num_epochs = 10

In [None]:
import time

model.train()

for epoch in range(num_epochs):
    running_loss = 0.0
    correct = 0
    total = 0

    total_forward_time = 0.0
    total_backward_time = 0.0
    total_forward_memory = 0.0
    total_backward_memory = 0.0

    for batch_idx, (inputs, labels) in enumerate(train_loader):
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()

        torch.cuda.reset_peak_memory_stats()

        # ------- FORWARD -------
        torch.cuda.synchronize()
        mem_before_forward = torch.cuda.memory_allocated()

        forward_start = time.perf_counter()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        torch.cuda.synchronize()
        forward_end = time.perf_counter()

        forward_time = forward_end - forward_start
        total_forward_time += forward_time

        mem_after_forward = torch.cuda.memory_allocated()
        forward_memory = (mem_after_forward - mem_before_forward) / 1024**2
        total_forward_memory += forward_memory

        # ------- BACKWARD -------
        torch.cuda.synchronize()

        # reset peak memory stats for isolated backward measurement
        torch.cuda.reset_peak_memory_stats()

        backward_start = time.perf_counter()
        loss.backward()
        torch.cuda.synchronize()
        backward_end = time.perf_counter()

        backward_time = backward_end - backward_start
        total_backward_time += backward_time

        # peak memory used inside backward
        backward_peak = torch.cuda.max_memory_allocated() / 1024**2
        total_backward_memory += backward_peak

        optimizer.step()

        # stats
        running_loss += loss.item()
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()

    # epoch metrics
    avg_forward = total_forward_time / len(train_loader)
    avg_backward = total_backward_time / len(train_loader)
    avg_forward_mem = total_forward_memory / len(train_loader)
    avg_backward_mem = total_backward_memory / len(train_loader)
    peak_memory = torch.cuda.max_memory_allocated() / 1024**2

    print(
        f"Epoch [{epoch + 1}/{num_epochs}] "
        f"Loss: {running_loss / len(train_loader):.4f}, "
        f"Acc: {100 * correct / total:.2f}%, "
        f"Fwd: {avg_forward * 1000:.2f} ms ({avg_forward_mem:.2f} MB), "
        f"Bwd: {avg_backward * 1000:.2f} ms ({avg_backward_mem:.2f} MB), "
        f"Peak: {peak_memory:.2f} MB"
    )


Epoch [1/10] Loss: 2.0774, Acc: 22.69%, Fwd: 4.94 ms (31.90 MB), Bwd: 3.39 ms (69.82 MB), Peak: 30.89 MB
Epoch [2/10] Loss: 1.8085, Acc: 32.07%, Fwd: 4.44 ms (31.89 MB), Bwd: 3.01 ms (69.82 MB), Peak: 30.89 MB
Epoch [3/10] Loss: 1.7183, Acc: 35.60%, Fwd: 4.45 ms (31.89 MB), Bwd: 3.03 ms (69.82 MB), Peak: 30.89 MB
Epoch [4/10] Loss: 1.6711, Acc: 37.58%, Fwd: 4.44 ms (31.89 MB), Bwd: 3.01 ms (69.82 MB), Peak: 30.89 MB
Epoch [5/10] Loss: 1.6276, Acc: 39.49%, Fwd: 4.46 ms (31.89 MB), Bwd: 3.04 ms (69.82 MB), Peak: 30.89 MB
Epoch [6/10] Loss: 1.5985, Acc: 40.89%, Fwd: 4.44 ms (31.89 MB), Bwd: 3.03 ms (69.82 MB), Peak: 30.89 MB
Epoch [7/10] Loss: 1.5705, Acc: 41.93%, Fwd: 4.45 ms (31.89 MB), Bwd: 3.04 ms (69.82 MB), Peak: 30.89 MB
Epoch [8/10] Loss: 1.5541, Acc: 42.81%, Fwd: 4.43 ms (31.89 MB), Bwd: 3.01 ms (69.82 MB), Peak: 30.89 MB
Epoch [9/10] Loss: 1.5311, Acc: 43.77%, Fwd: 4.45 ms (31.89 MB), Bwd: 3.06 ms (69.82 MB), Peak: 30.89 MB
Epoch [10/10] Loss: 1.5578, Acc: 43.30%, Fwd: 4.43 ms (

In [None]:
import json

acc = evaluate_model(model, test_loader, device)
report = {
    "name": "lenet_quantized",
    "batch_size": 64,
    "avg_forward": avg_forward,
    "avg_backward": avg_backward,
    "avg_forward_mem": avg_forward_mem,
    "avg_backward_mem": avg_backward_mem,
    "kernel_size": 5,
    "acc": acc,
}

file_path = "lenet_quantized_kernel5_bs64.json"

with open(file_path, "w") as f:
    json.dump(report, f, indent=4)  # indent=4 makes the JSON file human-readable

Test Accuracy: 43.03%


In [None]:
from PIL import Image

transform = transforms.Compose(
    [
        transforms.Resize((32, 32)),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    ]
)

image = Image.open("/content/cat_watermelon.jpg").convert("RGB")
sample_image = transform(image).unsqueeze(0).to(device)


In [11]:
from torch.profiler import ProfilerActivity, profile

with profile(activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA]) as prof:
    # batch = preprocess(img).unsqueeze(0)

    # # Step 4: Use the model and print the predicted category
    # prediction = model(batch).squeeze(0).softmax(0)
    # class_id = prediction.argmax().item()
    # score = prediction[class_id].item()
    # category_name = weights.meta["categories"][class_id]
    classify_single_image(model, sample_image, device, class_names)

In [12]:
print(prof.key_averages().table(sort_by="cuda_time_total", row_limit=10))


-------------------------------------------------------  ------------  ------------  ------------  ------------  ------------  ------------  ------------  ------------  ------------  ------------  
                                                   Name    Self CPU %      Self CPU   CPU total %     CPU total  CPU time avg     Self CUDA   Self CUDA %    CUDA total  CUDA time avg    # of Calls  
-------------------------------------------------------  ------------  ------------  ------------  ------------  ------------  ------------  ------------  ------------  ------------  ------------  
                                           aten::im2col         5.14%       1.471ms         5.61%       1.604ms     802.249us      54.558us        17.84%      59.902us      29.951us             2  
void at::native::im2col_kernel<float>(long, float co...         0.00%       0.000us         0.00%       0.000us       0.000us      54.558us        17.84%      54.558us      27.279us             2  
void at::