In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
from torchvision import datasets, transforms, models
import numpy as np 
import os
import torch.nn.utils.prune as prune
import time
from utils import *
import matplotlib.pyplot as plt

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
transform = transforms.Compose([
    transforms.Resize(224),  # Resize to 224x224
    transforms.ToTensor(),   # Convert image to PyTorch Tensor
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Normalize using ImageNet stats
])

train_dataset = datasets.CIFAR10(root="../data", train=True, transform=transform, download=True)
test_dataset = datasets.CIFAR10(root="../data", train=False, transform=transform, download=True)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=128, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=128, shuffle=False)

Files already downloaded and verified
Files already downloaded and verified


In [2]:
from typing import Union, List, Optional, Type, Callable, Any
from torch import Tensor

## Resnet 기본구조 불러오기 (Taken from Online)

In [3]:
def conv3x3(in_planes, out_planes, stride=1, groups=1, dilation=1):
    """3x3 convolution with padding
    
    Args:
        in_planes: number of channels in input image
        out_planes: number of channels produced by convolution
        stride: stride of the convolution. Default: 1
        groups: Number of blocked connections from input channels to output channels. Default: 1
        dilation (int or tuple, optional): Spacing between kernel elements. Default: 1
        
    Returns:
        Convoluted layer of kernel size=3, with specified out_planes
    
    """
    return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride,
                     padding=dilation, groups=groups, bias=False, dilation=dilation)
    # 파라미터 갯수 => 3x3x in_planes x out_planes 


def conv1x1(in_planes, out_planes, stride=1):
    """1x1 convolution
    
    Args:
        in_planes: number of channels in input image
        out_planes: number of channels produced by convolution
        stride: stride of the convolution. Default: 1
        
    Returns:
        Convoluted layer of kernel size=1, with specified out_planes
        
    """
    return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)
   # 파라미터 갯수 => 1 x 1 x in_planes x out_planes

class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None, groups=1,
                 base_width=64, dilation=1, norm_layer=None):
        super(BasicBlock, self).__init__()
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        if groups != 1 or base_width != 64:
            raise ValueError('BasicBlock only supports groups=1 and base_width=64')
        if dilation > 1:
            raise NotImplementedError("Dilation > 1 not supported in BasicBlock")
        # Both self.conv1 and self.downsample layers downsample the input when stride != 1
        self.conv1 = conv3x3(inplanes, planes, stride) # 해상도유지 if stride = 1 
        self.bn1 = norm_layer(planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(planes, planes) # 해상도유지지
        self.bn2 = norm_layer(planes)
        self.downsample = downsample
        self.stride = stride
        # FloatFunction()
        self.skip_add = nn.quantized.FloatFunctional()
        

    def forward(self, x):
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        if self.downsample is not None:
            identity = self.downsample(x)

        # Notice the addition operation in both scenarios
    
        out = self.skip_add.add(out, identity)
        out = self.relu(out)

        return out

In [4]:
class Bottleneck(nn.Module):
    # Bottleneck in torchvision places the stride for downsampling at 3x3 convolution(self.conv2)
    # while original implementation places the stride at the first 1x1 convolution(self.conv1)
    # according to "Deep residual learning for image recognition"https://arxiv.org/abs/1512.03385.
    # This variant is also known as ResNet V1.5 and improves accuracy according to
    # https://ngc.nvidia.com/catalog/model-scripts/nvidia:resnet_50_v1_5_for_pytorch.

    expansion: int = 4

    def __init__(
            self,
            inplanes: int,
            planes: int,
            stride: int = 1,
            downsample: Optional[nn.Module] = None,
            groups: int = 1,
            base_width: int = 64,
            dilation: int = 1,
            norm_layer: Optional[Callable[..., nn.Module]] = None,
            quantize=False) -> None:
        super(Bottleneck, self).__init__()

        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        width = int(planes * (base_width / 64.)) * groups
        # Both self.conv2 and self.downsample layers downsample the input when stride != 1
        self.conv1 = conv1x1(inplanes, width)
        self.bn1 = norm_layer(width)

        self.conv2 = conv3x3(width, width, stride, groups, dilation)
        self.bn2 = norm_layer(width)

        self.conv3 = conv1x1(width, planes * self.expansion)
        self.bn3 = norm_layer(planes * self.expansion)
        # ReLU function
        self.relu1 = nn.ReLU(inplace=True)
        self.relu2 = nn.ReLU(inplace=True)
        # downsample
        self.downsample = downsample
        self.stride = stride
        # 
        self.skip_add = nn.quantized.FloatFunctional()
        

    def forward(self, x: Tensor) -> Tensor:
        identity = x
        # Fusing conv1,bn1,relu1
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu1(out)
        # Fusing conv2,bn2,relu2
        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu2(out)

        out = self.conv3(out)
        out = self.bn3(out)

        if self.downsample is not None:
            identity = self.downsample(x)
        ## self.quantize -> unncesscary flag we can get rid of it (chat gpt 가 말함 둘다 실험할껏껏)
    
        out = self.skip_add.add(identity, out)
        # out += identity
        # out = torch.add(identity, out)        
        out = self.relu2(out)

        return out


In [5]:

class ResNet(nn.Module):
    def __init__(
            self,
            block: Type[Union[BasicBlock, Bottleneck]],
            layers: List[int], # [2,2,2,2]
            num_classes: int = 1000,
            zero_init_residual: bool = False,
            groups: int = 1,
            width_per_group: int = 64,
            replace_stride_with_dilation: Optional[List[bool]] = None,
            norm_layer: Optional[Callable[..., nn.Module]] = None) -> None:
        super(ResNet, self).__init__()
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        self._norm_layer = norm_layer

        self.inplanes = 64
        self.dilation = 1
        if replace_stride_with_dilation is None:
            # each element in the tuple indicates if we should replace
            # the 2x2 stride with a dilated convolution instead
            replace_stride_with_dilation = [False, False, False]
        if len(replace_stride_with_dilation) != 3:
            raise ValueError("replace_stride_with_dilation should be None "
                             "or a 3-element tuple, got {}".format(
                                 replace_stride_with_dilation))
        self.groups = groups
        self.base_width = width_per_group
        self.conv1 = nn.Conv2d(3,
                               self.inplanes,
                               kernel_size=7,
                               stride=2,
                               padding=3,
                               bias=False)
        ## self. conv1 을통해 해상도 1/2 로감소
        self.bn1 = norm_layer(self.inplanes)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        # self.maxpool 을 통해 해상도 한번더 감소 1/4
        # self.inplanes =64 
        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block,
                                       128,
                                       layers[1],
                                       stride=2,
                                       dilate=replace_stride_with_dilation[0])
        self.layer3 = self._make_layer(block,
                                       256,
                                       layers[2],
                                       stride=2,
                                       dilate=replace_stride_with_dilation[1])
        self.layer4 = self._make_layer(block,
                                       512,
                                       layers[3],
                                       stride=2,
                                       dilate=replace_stride_with_dilation[2])
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight,
                                        mode='fan_out',
                                        nonlinearity='relu')
            elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

        # Zero-initialize the last BN in each residual branch,
        # so that the residual branch starts with zeros, and each residual block behaves like an identity.
        # This improves the model by 0.2~0.3% according to https://arxiv.org/abs/1706.02677
        if zero_init_residual:
            for m in self.modules():
                if isinstance(m, Bottleneck):
                    nn.init.constant_(m.bn3.weight,
                                      0)  # type: ignore[arg-type]
                elif isinstance(m, BasicBlock):
                    nn.init.constant_(m.bn2.weight,
                                      0)  # type: ignore[arg-type]

    def _make_layer(self,
                    block: Type[Union[BasicBlock, Bottleneck]],
                    planes: int,
                    blocks: int,
                    stride: int = 1,
                    dilate: bool = False) -> nn.Sequential:   # self._make_layer(block, 64, layers[0])
        #  return _resnet('resnet50', Bottleneck, [3, 4, 6, 3], pretrained, progress,
        #            **kwargs)
        norm_layer = self._norm_layer
        downsample = None
        previous_dilation = self.dilation

        if dilate:
            self.dilation *= stride
            stride = 1
        if stride != 1 or self.inplanes != planes * block.expansion: # 64 != 64*4
            downsample = nn.Sequential(
                conv1x1(self.inplanes, planes * block.expansion, stride), #(64,256) # stride =1
                norm_layer(planes * block.expansion),
            )

        layers = []
        layers.append(
            block(self.inplanes, planes, stride, downsample, self.groups, # self.groups = 1
                  self.base_width, previous_dilation, norm_layer)) # self.inplanes here =64, # planes = 64
        # conv1 을 통해 64 => 64
        # conv2 을 통해 64 => 64 
        # conv3 을 통해 64 => 256
        self.inplanes = planes * block.expansion # self.inplanes here => 64*4 =256
        # 첫 블록 64 => 256  나머지 블록 256=>256, 256=>256
        for _ in range(1, blocks):
            layers.append(
                block(self.inplanes,
                      planes,
                      groups=self.groups,
                      base_width=self.base_width,
                      dilation=self.dilation,
                      norm_layer=norm_layer))

        return nn.Sequential(*layers)
        # self.layer2 = self._make_layer(block,
        #                                128,
        #                                layers[1],
        #                                stride=2,
        #                                dilate=replace_stride_with_dilation[0])
        # self.inplanes= 256 planes =128 512
    def _forward_impl(self, x: Tensor) -> Tensor:
        # See note [TorchScript super()]
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)

        return x

    def forward(self, x: Tensor) -> Tensor:
        return self._forward_impl(x)

In [6]:
def _resnet(arch: str, block: Type[Union[BasicBlock,
                                         Bottleneck]], layers: List[int],
            pretrained: bool, progress: bool, **kwargs: Any) -> ResNet:
    model = ResNet(block, layers, **kwargs)
    if pretrained:
        state_dict = load_state_dict_from_url(model_urls[arch],
                                              progress=progress)
        model.load_state_dict(state_dict)
    return model


def resnet18(pretrained: bool = False,
             progress: bool = True,
             **kwargs: Any) -> ResNet:
    r"""ResNet-18 model from
    `"Deep Residual Learning for Image Recognition" <https://arxiv.org/pdf/1512.03385.pdf>`_.

    Args:
        pretrained (bool): If True, returns a model pre-trained on ImageNet
        progress (bool): If True, displays a progress bar of the download to stderr
    """
    return _resnet('resnet18', BasicBlock, [2, 2, 2, 2], pretrained, progress,
                   **kwargs)

In [7]:
class QuantizeResnet18(nn.Module):
    def __init__(self,model_fp32):
         super(QuantizeResnet18,self).__init__()
         self.quant=torch.ao.quantization.QuantStub()
         self.model=model_fp32
         self.dequant=torch.ao.quantization.DeQuantStub()

    def forward(self,x):
         x=self.quant(x)

         x=self.model(x)
         
         x=self.dequant(x)
         return x
         

## Pretrained 모델 부르기

In [8]:
model= resnet18(num_classes=10,pretrained=False)

In [9]:

def train_model(model,
                train_loader,
                test_loader,
                device,
                learning_rate=0.01,
                num_epochs=10):

    # The training configurations were not carefully selected.

    criterion = nn.CrossEntropyLoss()

    model.to(device)

    # It seems that SGD optimizer is better than Adam optimizer for ResNet18 training on CIFAR10.
    optimizer = optim.SGD(model.parameters(),
                          lr=learning_rate,
                          momentum=0.9,
                          weight_decay=1e-4)
    # scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=500)
    scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer,
                                                     milestones=[100, 150],
                                                     gamma=0.1,
                                                     last_epoch=-1)
    # optimizer = optim.Adam(model.parameters(), lr=learning_rate, betas=(0.9, 0.999), eps=1e-08, weight_decay=0, amsgrad=False)

    for epoch in range(num_epochs):

        # Training
        model.train()

        running_loss = 0
        running_corrects = 0

        for inputs, labels in tqdm(train_loader):

            inputs = inputs.to(device)
            labels = labels.to(device)

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # statistics
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)
        
        train_loss = running_loss / len(train_loader.dataset)
        train_accuracy = running_corrects / len(train_loader.dataset)
        print(
            "Epoch: {:03d} Train Loss: {:.3f} Train Acc: {:.3f}"
            .format(epoch + 1, train_loss, train_accuracy))
        test(model,test_loader,device)
        scheduler.step()
        
def test(model,test_loader,device):
    model.to(device)
    model.eval()
    s=time.time()
    with torch.no_grad():
        total=0
        correct=0
        num_samples=0
        for images,labels in tqdm(test_loader):
            images,labels= images.to(device),labels.to(device)
            num_samples+=images.size(0)
            output=model(images)
            _,predicted = torch.max(output,dim=1)
            total+=labels.size(0)
            correct+=(predicted==labels).sum().item()
            
        accuracy=100*correct/total
        e=time.time()
        print(f'Accuracy: {accuracy}%, Forward Time: {e - s :.2f}s')
def check_memory(model):
    
    parameter_size = sum(p.numel()*p.element_size() for p in model.parameters())
    buffer_size = sum(b.numel()*b.element_size() for b in model.buffers())

    total_size= parameter_size+buffer_size

    return total_size/1e6

In [10]:
import os
saved_path = '../saved'

state_dict_path = os.path.join(saved_path,'resnet18_weights.pth')
state_dict=torch.load(state_dict_path)


  state_dict=torch.load(state_dict_path)


In [11]:
model.load_state_dict(state_dict)

<All keys matched successfully>

## fused model

In [12]:
import copy
# cpu 로 모델 불러오기
cpu_device = torch.device("cpu:0")
model.to(cpu_device)
fused_model = copy.deepcopy(model)
# fusion 전 eval()써야함
model.eval()
fused_model.eval()


ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (skip_add): FloatFunctional(
        (activation_post_process): Identity()
      )
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05,

In [13]:
torch.ao.quantization.fuse_modules_qat(fused_model,[['conv1','bn1','relu']],inplace=True)

ResNet(
  (conv1): ConvBnReLU2d(
    (0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
  )
  (bn1): Identity()
  (relu): Identity()
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (skip_add): FloatFunctional(
        (activation_post_process): Identity()
      )
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), 

In [14]:
for name,modules in fused_model.named_children():
    if 'layer' in name:
        for block_name,block_module in modules.named_children():
            torch.ao.quantization.fuse_modules_qat(block_module,[['conv1','bn1','relu'],['conv2','bn2']],inplace=True)
            if block_module.downsample is not None:
                torch.ao.quantization.fuse_modules_qat(block_module.downsample,[['0','1']],inplace=True)

## QAT 훈련

In [15]:
model_quantized = QuantizeResnet18(fused_model)

In [16]:
qconfig = torch.ao.quantization.get_default_qat_qconfig('x86')
model_quantized.qconfig = qconfig

In [17]:
torch.ao.quantization.prepare_qat(model_quantized,inplace=True)



QuantizeResnet18(
  (quant): QuantStub(
    (activation_post_process): FusedMovingAvgObsFakeQuantize(
      fake_quant_enabled=tensor([1]), observer_enabled=tensor([1]), scale=tensor([1.]), zero_point=tensor([0], dtype=torch.int32), dtype=torch.quint8, quant_min=0, quant_max=127, qscheme=torch.per_tensor_affine, reduce_range=True
      (activation_post_process): MovingAverageMinMaxObserver(min_val=inf, max_val=-inf)
    )
  )
  (model): ResNet(
    (conv1): ConvBnReLU2d(
      3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False
      (bn): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (weight_fake_quant): FusedMovingAvgObsFakeQuantize(
        fake_quant_enabled=tensor([1]), observer_enabled=tensor([1]), scale=tensor([1.]), zero_point=tensor([0], dtype=torch.int32), dtype=torch.qint8, quant_min=-128, quant_max=127, qscheme=torch.per_channel_symmetric, reduce_range=False
        (activation_post_process): MovingAveragePerChannelMi

In [18]:
train_model(model_quantized,train_loader=train_loader,test_loader=test_loader,device=device,learning_rate=0.001,num_epochs=3)

100%|██████████| 391/391 [02:45<00:00,  2.36it/s]


Epoch: 001 Train Loss: 0.599 Train Acc: 0.805


100%|██████████| 79/79 [00:15<00:00,  5.27it/s]


Accuracy: 88.62%, Forward Time: 15.00s


100%|██████████| 391/391 [02:41<00:00,  2.43it/s]


Epoch: 002 Train Loss: 0.195 Train Acc: 0.933


100%|██████████| 79/79 [00:15<00:00,  5.11it/s]


Accuracy: 90.53%, Forward Time: 15.45s


100%|██████████| 391/391 [02:42<00:00,  2.41it/s]


Epoch: 003 Train Loss: 0.119 Train Acc: 0.960


100%|██████████| 79/79 [00:15<00:00,  5.13it/s]


Accuracy: 91.04%, Forward Time: 15.40s


100%|██████████| 391/391 [02:40<00:00,  2.43it/s]


Epoch: 004 Train Loss: 0.075 Train Acc: 0.976


100%|██████████| 79/79 [00:14<00:00,  5.28it/s]


Accuracy: 91.92%, Forward Time: 14.95s


100%|██████████| 391/391 [02:39<00:00,  2.44it/s]


Epoch: 005 Train Loss: 0.050 Train Acc: 0.985


100%|██████████| 79/79 [00:14<00:00,  5.27it/s]

Accuracy: 91.24%, Forward Time: 14.99s





In [19]:
model_quantized = model_quantized.to('cpu')
quantized_model = torch.ao.quantization.convert(model_quantized.eval(), inplace=False)

In [20]:
test(quantized_model,test_loader,device='cpu')


100%|██████████| 79/79 [00:27<00:00,  2.86it/s]

Accuracy: 91.28%, Forward Time: 27.65s





In [21]:
test(model,test_loader,device='cpu')

100%|██████████| 79/79 [01:44<00:00,  1.32s/it]

Accuracy: 95.76%, Forward Time: 104.48s





### jit 으로 저장하는 이유는 배포를 위함 (Deployment : CPU, Edge, Mobile)
조금 더 빠른듯?


In [34]:
model_filepath='../saved/resnet18_quantized.pt'
torch.jit.save(torch.jit.script(quantized_model), model_filepath)

In [35]:
model = torch.jit.load(model_filepath, map_location='cpu')

In [36]:
test(model,test_loader,device='cpu')

100%|██████████| 79/79 [00:26<00:00,  2.93it/s]

Accuracy: 91.28%, Forward Time: 26.94s



