#### 1. 라이브러리 불러오기

In [1]:
import numpy as np

import torch
from torch.utils.data import DataLoader

import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR100
 
import torch.nn as nn
import torch.optim as optim


from torchvision.models import mobilenet_v2
from torch.quantization import FakeQuantize, QuantStub, DeQuantStub

from tqdm.auto import tqdm

In [2]:
print(torch.cuda.is_available())

True


#### 2. CIFAR100 데이터셋 불러오기

In [2]:
# 데이터 준비
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

train_dataset = CIFAR100(root='./data', train=True, transform=transform, download=True)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4)

test_dataset = CIFAR100(root='./data', train=False, transform=transform, download=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=4)

Files already downloaded and verified
Files already downloaded and verified


In [5]:
from torch.utils.data import Subset

# 데이터 준비
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

train_dataset = CIFAR100(root='./data', train=True, transform=transform, download=True)
test_dataset = CIFAR100(root='./data', train=False, transform=transform, download=True)

# 처음 100개의 샘플만 선택
train_subset = Subset(train_dataset, indices=range(100))
test_subset = Subset(test_dataset, indices=range(100))

train_loader = DataLoader(train_subset, batch_size=32, shuffle=True, num_workers=4)
test_loader = DataLoader(test_subset, batch_size=32, shuffle=False, num_workers=4)

Files already downloaded and verified
Files already downloaded and verified


#### 3. 모델 정의 및 함수 정의하기

In [6]:
# EMA(Exponential Moving Averages) 클래스 정의
class EMA:
    def __init__(self, alpha=0.99):
        self.alpha = alpha
        self.ema_value = None

    def update(self, value):
        if self.ema_value is None:
            self.ema_value = value
        else:
            self.ema_value = self.alpha * self.ema_value + (1 - self.alpha) * value
        return self.ema_value

In [8]:
class QuantizedMobileNet(nn.Module):
    def __init__(self, num_classes=100, pretrained=True):
        super().__init__()
        self.model = mobilenet_v2(pretrained=pretrained)
        
        # Classifier 출력 조정 (CIFAR100: num_classes=100)
        self.model.classifier[1] = nn.Linear(self.model.last_channel, num_classes)
        
        # FBN 적용: BatchNorm을 Fold하여 가중치 및 편향을 업데이트
        self._fold_bn(self.model)
        
        # FakeQuantize 설정
        self.weight_fake_quant = FakeQuantize(observer=torch.quantization.MinMaxObserver, quant_min=-127, quant_max=127, dtype=torch.qint8)
        self.activation_fake_quant = FakeQuantize(observer=torch.quantization.MovingAverageMinMaxObserver, quant_min=0, quant_max=255, dtype=torch.quint8)
        
        # EMA 활성화 범위 추정을 위한 객체
        self.activation_ema = EMA(alpha=0.99)

        # 활성화 및 비활성화 도우미
        self.quant = QuantStub()
        self.dequant = DeQuantStub()

    def _fold_bn(self, model):
        """Batch Normalization Folding"""
        # 모든 BatchNorm2d 모듈 미리 수집
        bn_modules = [(name, module) for name, module in model.named_modules() 
                      if isinstance(module, nn.BatchNorm2d)]
        
        for name, bn_module in bn_modules:
            gamma = bn_module.weight
            beta = bn_module.bias
            mean = bn_module.running_mean
            var = bn_module.running_var
            epsilon = bn_module.eps

            # 부모 모듈에서 컨볼루션 레이어 찾기
            parent_name = name.rsplit('.', 1)[0]
            conv_layer = None

            for n, m in model.named_modules():
                if n == parent_name:
                    for sub_name, sub_module in m.named_modules():
                        if isinstance(sub_module, nn.Conv2d):
                            conv_layer = sub_module
                            break
                    break
            
            if conv_layer is not None:
                gamma_factor = (gamma / torch.sqrt(var + epsilon))
                expansion_factor = 1

                if conv_layer.weight.size(0) % gamma_factor.size(0) == 0:
                    expansion_factor = conv_layer.weight.size(0) // gamma_factor.size(0)

                    gamma_factor = gamma_factor.view(-1, 1, 1, 1).repeat_interleave(expansion_factor, dim=0)
                    # gamma_factor를 conv_layer의 weight 차원과 맞게 확장
                    gamma_factor = gamma_factor.expand_as(conv_layer.weight)
                
                else:
                    gamma_factor = nn.functional.interpolate(
                        gamma_factor.view(1, 1, -1, 1),
                        size=(conv_layer.weight.size(0), conv_layer.weight.size(1)),
                        mode='nearest',
                    )
                    gamma_factor = gamma_factor.permute(2, 3, 0, 1).expand_as(conv_layer.weight)
                 
                w_folded = conv_layer.weight * gamma_factor

                # 편향이 없다면 새로 생성
                if conv_layer.bias is None:
                    conv_layer.bias = nn.Parameter(torch.zeros(conv_layer.weight.size(0), device=conv_layer.weight.device))
                    
                # 기존의 편향이 있다면 weight 첫 번째 차원의 크기에 맞게 변경
                else:
                    if conv_layer.weight.size(0) % conv_layer.bias.size(0) == 0:
                        conv_layer.bias = nn.Parameter(
                            conv_layer.bias.repeat_interleave(conv_layer.weight.size(0) // conv_layer.bias.size(0), dim=0)
                        )
                    else:
                        conv_layer.bias = nn.Parameter(
                            nn.functional.interpolate(conv_layer.bias, size=conv_layer.weight.size(0), mode='nearest')
                        )

                bias_factor = gamma * mean / torch.sqrt(var + epsilon)
                
                if conv_layer.bias.size(0) % bias_factor.size(0) == 0:
                    bias_factor = bias_factor.repeat_interleave(conv_layer.bias.size(0) // bias_factor.size(0), dim=0)

                    b_folded = conv_layer.bias - bias_factor
                    
                else:
                    bias_factor = bias_factor.repeat_interleave(conv_layer.bias.size(0) // bias_factor.size(0) + 1, dim=0)
                    
                    new_bias = torch.zeros(conv_layer.bias.size(0))
                    window_size = bias_factor.size(0) - conv_layer.bias.size(0) + 1

                    for i in range(conv_layer.bias.size(0)):
                        new_bias[i] = bias_factor[i:i + window_size].mean()
                    
                    b_folded = conv_layer.bias - new_bias
                
                conv_layer.weight = nn.Parameter(w_folded)
                conv_layer.bias = nn.Parameter(b_folded)

                # Remove BatchNorm layer
                parent_module = model.get_submodule(parent_name)
                for sub_name, sub_module in parent_module.named_children():
                    if isinstance(sub_module, nn.BatchNorm2d):
                        setattr(parent_module, sub_name, nn.Identity())

    def forward(self, x):
        # Fake Quantization
        x = self.quant(x)
        x = self.model.features(x)
        x = nn.functional.adaptive_avg_pool2d(x, 1)
        x = torch.flatten(x, 1)
        x = self.activation_fake_quant(x)
        x = self.model.classifier(x)
        x = self.weight_fake_quant(x)
        x = self.dequant(x)

        # Update EMA activation range
        current_max = x.max().item()
        self.activation_ema.update(current_max)
        
        return x

In [9]:
# 모델 학습 함수
def train_model(model, train_loader, criterion, optimizer, num_epochs=5):
    model.train()
    for epoch in range(num_epochs):
        running_loss = 0.0
        tepoch = tqdm(train_loader, desc=f"Epoch {epoch + 1}/{num_epochs}", unit="batch")
        
        for inputs, labels in tepoch:
            inputs, labels = inputs.cuda(), labels.cuda()
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            tepoch.set_postfix(loss=running_loss / (tepoch.n or 1))
        print(f"Epoch {epoch + 1}, Loss: {running_loss / len(train_loader)}")

# 모델 평가 함수
def evaluate_model(model, test_loader):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        ttest = tqdm(test_loader, desc="Evaluating", unit="batch")

        for inputs, labels in ttest:
            inputs, labels = inputs.cuda(), labels.cuda()
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    accuracy = correct / total
    print(f"Accuracy: {accuracy * 100:.2f}%")
    return accuracy

#### 4. 성능 평가하기

In [10]:
print("Training and evaluating the quantized model...")
quantized_model = QuantizedMobileNet(num_classes=100, pretrained=True)
quantized_model = quantized_model.cuda()

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(quantized_model.parameters(), lr=0.01, momentum=0.9)
train_model(quantized_model, train_loader, criterion, optimizer, num_epochs=5)
quantized_accuracy = evaluate_model(quantized_model, test_loader)

Training and evaluating the quantized model...




Epoch 1/5:   0%|          | 0/4 [00:21<?, ?batch/s]

Epoch 1, Loss: 4.849008679389954


Epoch 2/5:   0%|          | 0/4 [00:22<?, ?batch/s]

Epoch 2, Loss: 4.632986307144165


Epoch 3/5:   0%|          | 0/4 [00:22<?, ?batch/s]

Epoch 3, Loss: 4.584195137023926


Epoch 4/5:   0%|          | 0/4 [00:22<?, ?batch/s]

Epoch 4, Loss: 4.454532146453857


Epoch 5/5:   0%|          | 0/4 [00:22<?, ?batch/s]

Epoch 5, Loss: 4.636066198348999


Evaluating:   0%|          | 0/4 [00:19<?, ?batch/s]

Accuracy: 0.00%


In [11]:
print("Training and evaluating the original model...")
original_model = mobilenet_v2(pretrained=True)
original_model.classifier[1] = nn.Linear(original_model.last_channel, 100)  # CIFAR100에 맞게 수정
original_model = original_model.cuda()

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(original_model.parameters(), lr=0.01, momentum=0.9)
train_model(original_model, train_loader, criterion, optimizer, num_epochs=5)
original_accuracy = evaluate_model(original_model, test_loader)

Training and evaluating the original model...


Epoch 1/5:   0%|          | 0/4 [00:21<?, ?batch/s]

Epoch 1, Loss: 4.764323711395264


Epoch 2/5:   0%|          | 0/4 [00:22<?, ?batch/s]

Epoch 2, Loss: 4.356222748756409


Epoch 3/5:   0%|          | 0/4 [00:22<?, ?batch/s]

Epoch 3, Loss: 3.773183524608612


Epoch 4/5:   0%|          | 0/4 [00:22<?, ?batch/s]

Epoch 4, Loss: 3.105807900428772


Epoch 5/5:   0%|          | 0/4 [00:20<?, ?batch/s]

Epoch 5, Loss: 2.3831313252449036


Evaluating:   0%|          | 0/4 [00:37<?, ?batch/s]

Accuracy: 2.00%


In [12]:
# 결과 출력
print(f"Original Model Accuracy: {original_accuracy * 100:.2f}%")
print(f"Quantized Model Accuracy: {quantized_accuracy * 100:.2f}%")

Original Model Accuracy: 2.00%
Quantized Model Accuracy: 0.00%
