# CvT

In [None]:
#FashionMNIST 다운로드
from itertools import chain
from collections import defaultdict
from torch.utils.data import Subset
from torchvision import datasets


def subset_sampler(dataset, classes, max_len):
    """Return a class-balanced ``Subset`` of ``dataset``.

    For every class index in ``range(len(classes))`` the first ``max_len``
    sample indices carrying that label are kept (fewer when the class has
    fewer samples). Indices are grouped class by class in ascending label
    order.

    Args:
        dataset: Dataset exposing its integer labels as ``targets``; the
            legacy ``train_labels`` attribute is used as a fallback.
        classes: Sequence of class names; only its length is used.
        max_len: Maximum number of samples to keep per class.

    Returns:
        A ``torch.utils.data.Subset`` over the selected indices.
    """
    # torchvision's MNIST-family datasets expose labels as ``targets``;
    # ``train_labels`` is only a deprecated alias that emits a warning.
    labels = getattr(dataset, "targets", None)
    if labels is None:
        labels = dataset.train_labels

    target_idx = defaultdict(list)
    for idx, label in enumerate(labels):
        target_idx[int(label)].append(idx)

    indices = list(
        chain.from_iterable(
            target_idx[class_id][:max_len] for class_id in range(len(classes))
        )
    )
    return Subset(dataset, indices)


# Download (if needed) and load the FashionMNIST train and test splits.
train_dataset = datasets.FashionMNIST(root="/content/drive/MyDrive/Colab Notebooks", download=True, train=True)
test_dataset = datasets.FashionMNIST(root="/content/drive/MyDrive/Colab Notebooks", download=True, train=False)

classes = train_dataset.classes # class names contained in the dataset
class_to_idx = train_dataset.class_to_idx # mapping from class name to class ID

print(classes)
print(class_to_idx)
# Sub-sample both splits so each class contributes a bounded number of samples.
subset_train_dataset = subset_sampler(
    dataset=train_dataset, classes=train_dataset.classes, max_len=1000 # max_len is the maximum number of samples kept per class
)
subset_test_dataset = subset_sampler(
    dataset=test_dataset, classes=test_dataset.classes, max_len=100
)

['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat', 'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']
{'T-shirt/top': 0, 'Trouser': 1, 'Pullover': 2, 'Dress': 3, 'Coat': 4, 'Sandal': 5, 'Shirt': 6, 'Sneaker': 7, 'Bag': 8, 'Ankle boot': 9}




In [None]:
#cvt 모델 이미지 데이터 전처리
import torch
from torchvision import transforms
from transformers import AutoImageProcessor


# Load the preprocessing configuration published with the CvT-21 checkpoint.
image_processor = AutoImageProcessor.from_pretrained(
    pretrained_model_name_or_path="microsoft/cvt-21"
)

# Per-image pipeline: to tensor -> square resize -> channel replication -> normalization.
transform = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Resize(
            size=(
                image_processor.size["shortest_edge"], # processor's target length for the shorter image side
                image_processor.size["shortest_edge"]
            )
        ),
        # Concatenates the tensor with itself three times along dim 0
        # (assumes a single-channel input, giving 3 channels — TODO confirm).
        transforms.Lambda(lambda x: torch.cat([x, x, x], 0)),
        # Normalize with the statistics stored in the image processor.
        transforms.Normalize(
            mean=image_processor.image_mean,
            std=image_processor.image_std
        )
    ]
)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


preprocessor_config.json:   0%|          | 0.00/266 [00:00<?, ?B/s]

In [None]:
from torch.utils.data import DataLoader


def collator(data, transform):
    """Collate ``(image, label)`` samples into a batch dictionary.

    Args:
        data: Iterable of ``(image, label)`` pairs.
        transform: Callable applied to each image; must return tensors of
            equal shape so they can be stacked.

    Returns:
        dict with ``"pixel_values"`` (stacked transformed images) and
        ``"labels"`` (tensor built from the labels).
    """
    raw_images, raw_labels = zip(*data)
    batch_pixels = torch.stack(list(map(transform, raw_images)))
    batch_labels = torch.tensor(list(raw_labels))
    return dict(pixel_values=batch_pixels, labels=batch_labels)


# Training loader: shuffled batches of 32; the final incomplete batch is dropped.
train_dataloader = DataLoader(
    subset_train_dataset,
    batch_size=32,
    shuffle=True,
    # Bind the module-level transform so DataLoader can call the collator with one argument.
    collate_fn=lambda x: collator(x, transform),
    drop_last=True
)
# Validation loader: batches of 4, also shuffled and with the last incomplete batch dropped.
valid_dataloader = DataLoader(
    subset_test_dataset,
    batch_size=4,
    shuffle=True,
    collate_fn=lambda x: collator(x, transform),
    drop_last=True
)

In [None]:
#사전 학습된 cvt 모델
from transformers import CvtForImageClassification

# Load the pretrained CvT-21 checkpoint with a classification head sized for this dataset.
model = CvtForImageClassification.from_pretrained(
    pretrained_model_name_or_path="microsoft/cvt-21",
    num_labels=len(train_dataset.classes), # one output per dataset class
    id2label={idx: label for label, idx in train_dataset.class_to_idx.items()},
    label2id=train_dataset.class_to_idx,
    # The checkpoint's classifier shape differs from the requested one, so allow re-initialising it.
    ignore_mismatched_sizes=True
)

# Print the module tree four levels deep; the encoder is composed of three stages (0, 1, 2).
for main_name, main_module in model.named_children():
    print(main_name)
    for sub_name, sub_module in main_module.named_children():
        print("└", sub_name)
        for ssub_name, ssub_module in sub_module.named_children():
            print("   └", ssub_name)
            for sssub_name, sssub_module in ssub_module.named_children():
                print("     └", sssub_name)

config.json:   0%|          | 0.00/70.3k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/127M [00:00<?, ?B/s]

Some weights of CvtForImageClassification were not initialized from the model checkpoint at microsoft/cvt-21 and are newly initialized because the shapes did not match:
- classifier.weight: found shape torch.Size([1000, 384]) in the checkpoint and torch.Size([10, 384]) in the model instantiated
- classifier.bias: found shape torch.Size([1000]) in the checkpoint and torch.Size([10]) in the model instantiated
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


cvt
└ encoder
   └ stages
     └ 0
     └ 1
     └ 2
layernorm
classifier


In [None]:
#cvt 모델의 스테이지 구조
# Grab the encoder stages and print the structure of the first one.
stages = model.cvt.encoder.stages
print(stages[0])

CvtStage(
  (embedding): CvtEmbeddings(
    (convolution_embeddings): CvtConvEmbeddings(
      (projection): Conv2d(3, 64, kernel_size=(7, 7), stride=(4, 4), padding=(2, 2))
      (normalization): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
    )
    (dropout): Dropout(p=0.0, inplace=False)
  )
  (layers): Sequential(
    (0): CvtLayer(
      (attention): CvtAttention(
        (attention): CvtSelfAttention(
          (convolution_projection_query): CvtSelfAttentionProjection(
            (convolution_projection): CvtSelfAttentionConvProjection(
              (convolution): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=64, bias=False)
              (normalization): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
            )
            (linear_projection): CvtSelfAttentionLinearProjection()
          )
          (convolution_projection_key): CvtSelfAttentionProjection(
            (convolution_projection): CvtSelfAtte

In [None]:
#셀프 어텐션 적용
# Take one training batch and trace its shape through the first stage.
batch = next(iter(train_dataloader))
print("이미지 차원 :", batch["pixel_values"].shape)

# Convolutional patch embedding of the first stage.
patch_emb_output = stages[0].embedding(batch["pixel_values"])
print("패치 임베딩 차원 :", patch_emb_output.shape)

# Flatten the spatial grid into a token sequence: (B, C, H, W) -> (B, H*W, C).
batch_size, num_channels, height, width = patch_emb_output.shape
hidden_state = patch_emb_output.view(batch_size, num_channels, height * width).permute(0, 2, 1)
print("셀프 어텐션 입력 차원 :", hidden_state.shape)

# The self-attention module also receives height/width; its output has the same shape as its input.
attention_output = stages[0].layers[0].attention.attention(hidden_state, height, width)
print("셀프 어텐션 출력 차원 :", attention_output.shape)

이미지 차원 : torch.Size([32, 3, 224, 224])
패치 임베딩 차원 : torch.Size([32, 64, 56, 56])
셀프 어텐션 입력 차원 : torch.Size([32, 3136, 64])
셀프 어텐션 출력 차원 : torch.Size([32, 3136, 64])


In [None]:
pip install evaluate

Collecting evaluate
  Downloading evaluate-0.4.3-py3-none-any.whl.metadata (9.2 kB)
Collecting datasets>=2.0.0 (from evaluate)
  Downloading datasets-3.1.0-py3-none-any.whl.metadata (20 kB)
Collecting dill (from evaluate)
  Downloading dill-0.3.9-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from evaluate)
  Downloading xxhash-3.5.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess (from evaluate)
  Downloading multiprocess-0.70.17-py310-none-any.whl.metadata (7.2 kB)
Collecting dill (from evaluate)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting multiprocess (from evaluate)
  Downloading multiprocess-0.70.16-py310-none-any.whl.metadata (7.2 kB)
Collecting fsspec>=2021.05.0 (from fsspec[http]>=2021.05.0->evaluate)
  Downloading fsspec-2024.9.0-py3-none-any.whl.metadata (11 kB)
Downloading evaluate-0.4.3-py3-none-any.whl (84 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m84.0/84.0 kB[0m [

In [None]:
#cvt 모델 학습
import torch
import evaluate
import numpy as np
from itertools import chain
from collections import defaultdict
from torch.utils.data import Subset
from torchvision import datasets
from torchvision import transforms
from transformers import AutoImageProcessor
from transformers import CvtForImageClassification
from transformers import TrainingArguments, Trainer


def subset_sampler(dataset, classes, max_len):
    """Return a class-balanced ``Subset`` of ``dataset``.

    For every class index in ``range(len(classes))`` the first ``max_len``
    sample indices carrying that label are kept (fewer when the class has
    fewer samples). Indices are grouped class by class in ascending label
    order.

    Args:
        dataset: Dataset exposing its integer labels as ``targets``; the
            legacy ``train_labels`` attribute is used as a fallback.
        classes: Sequence of class names; only its length is used.
        max_len: Maximum number of samples to keep per class.

    Returns:
        A ``torch.utils.data.Subset`` over the selected indices.
    """
    # torchvision's MNIST-family datasets expose labels as ``targets``;
    # ``train_labels`` is only a deprecated alias that emits a warning.
    labels = getattr(dataset, "targets", None)
    if labels is None:
        labels = dataset.train_labels

    target_idx = defaultdict(list)
    for idx, label in enumerate(labels):
        target_idx[int(label)].append(idx)

    indices = list(
        chain.from_iterable(
            target_idx[class_id][:max_len] for class_id in range(len(classes))
        )
    )
    return Subset(dataset, indices)


def model_init(classes, class_to_idx):
    """Build a CvT-21 image classifier with a head sized for ``classes``.

    Args:
        classes: Sequence of class names; its length sets ``num_labels``.
        class_to_idx: Mapping from class name to integer class ID; used as
            ``label2id`` and inverted to form ``id2label``.

    Returns:
        The ``CvtForImageClassification`` instance loaded from the
        ``microsoft/cvt-21`` checkpoint.
    """
    label_count = len(classes)
    idx_to_class = {}
    for class_name, class_id in class_to_idx.items():
        idx_to_class[class_id] = class_name
    # Mismatched sizes are allowed so the checkpoint's classifier can be replaced.
    return CvtForImageClassification.from_pretrained(
        pretrained_model_name_or_path="microsoft/cvt-21",
        num_labels=label_count,
        id2label=idx_to_class,
        label2id=class_to_idx,
        ignore_mismatched_sizes=True,
    )


def collator(data, transform):
    """Collate ``(image, label)`` samples into a batch dictionary.

    Args:
        data: Iterable of ``(image, label)`` pairs.
        transform: Callable applied to each image; must return tensors of
            equal shape so they can be stacked.

    Returns:
        dict with ``"pixel_values"`` (stacked transformed images) and
        ``"labels"`` (tensor built from the labels).
    """
    raw_images, raw_labels = zip(*data)
    batch_pixels = torch.stack(list(map(transform, raw_images)))
    batch_labels = torch.tensor(list(raw_labels))
    return dict(pixel_values=batch_pixels, labels=batch_labels)


def compute_metrics(eval_pred):
    """Compute the macro-averaged F1 score for an evaluation pass.

    Args:
        eval_pred: Pair ``(predictions, labels)``; ``predictions`` holds
            per-class scores and is reduced with ``argmax`` over axis 1.

    Returns:
        The result of the ``f1`` metric's ``compute`` call with
        ``average="macro"``.
    """
    # The metric is (re)loaded on every call.
    f1_metric = evaluate.load("f1")
    scores, references = eval_pred
    predicted_ids = np.argmax(scores, axis=1)
    return f1_metric.compute(
        predictions=predicted_ids, references=references, average="macro"
    )


# Download (if needed) and load the FashionMNIST train and test splits.
# (These two statements had been spliced into each other by a bad paste,
# which left the cell syntactically invalid.)
train_dataset = datasets.FashionMNIST(root="/content/drive/MyDrive/Colab Notebooks", download=True, train=True)
test_dataset = datasets.FashionMNIST(root="/content/drive/MyDrive/Colab Notebooks", download=True, train=False)

classes = train_dataset.classes
class_to_idx = train_dataset.class_to_idx

# Keep at most 1000 training and 100 test samples per class.
subset_train_dataset = subset_sampler(
    dataset=train_dataset, classes=train_dataset.classes, max_len=1000
)
subset_test_dataset = subset_sampler(
    dataset=test_dataset, classes=test_dataset.classes, max_len=100
)

# Load the preprocessing configuration published with the CvT-21 checkpoint.
image_processor = AutoImageProcessor.from_pretrained(
    pretrained_model_name_or_path="microsoft/cvt-21"
)

# Per-image pipeline: to tensor -> square resize -> channel replication -> normalization.
transform = transforms.Compose(
    [
        transforms.ToTensor(),
        # Resize to a square whose side is the processor's "shortest_edge" value.
        transforms.Resize(
            size=(
                image_processor.size["shortest_edge"],
                image_processor.size["shortest_edge"]
            )
        ),
        # Concatenates the tensor with itself three times along dim 0
        # (assumes a single-channel input, giving 3 channels — TODO confirm).
        transforms.Lambda(
            lambda x: torch.cat([x, x, x], 0)
        ),
        # Normalize with the statistics stored in the image processor.
        transforms.Normalize(
            mean=image_processor.image_mean,
            std=image_processor.image_std
        )
    ]
)

# Training configuration: evaluate and checkpoint once per epoch and reload
# the checkpoint with the best macro-F1 at the end of training.
args = TrainingArguments(
    output_dir="/content/drive/MyDrive/Colab Notebooks/models/CvT-FashionMNIST",
    save_strategy="epoch",
    evaluation_strategy="epoch",
    learning_rate=1e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=3,
    weight_decay=0.001,
    load_best_model_at_end=True,
    metric_for_best_model="f1",
    logging_dir="logs",
    logging_steps=125,
    # The datasets yield (image, label) tuples handled by the custom collator,
    # so Trainer must not try to prune "unused" columns.
    remove_unused_columns=False,
    # Disable third-party experiment trackers. Without this, an installed
    # wandb package makes train() block on an interactive API-key prompt.
    report_to="none",
    seed=7
)

trainer = Trainer(
    # Trainer passes a single (trial) argument to a one-parameter model_init; it is ignored here.
    model_init=lambda x: model_init(classes, class_to_idx),
    args=args,
    train_dataset=subset_train_dataset,
    eval_dataset=subset_test_dataset,
    data_collator=lambda x: collator(x, transform),
    compute_metrics=compute_metrics,
    tokenizer=image_processor,
)
trainer.train()

  trainer = Trainer(
Some weights of CvtForImageClassification were not initialized from the model checkpoint at microsoft/cvt-21 and are newly initialized because the shapes did not match:
- classifier.weight: found shape torch.Size([1000, 384]) in the checkpoint and torch.Size([10, 384]) in the model instantiated
- classifier.bias: found shape torch.Size([1000]) in the checkpoint and torch.Size([10]) in the model instantiated
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Some weights of CvtForImageClassification were not initialized from the model checkpoint at microsoft/cvt-21 and are newly initialized because the shapes did not match:
- classifier.weight: found shape torch.Size([1000, 384]) in the checkpoint and torch.Size([10, 384]) in the model instantiated
- classifier.bias: found shape torch.Size([1000]) in the checkpoint and torch.Size([10]) in the model instantiated
You should probably TRAIN this model on a down

<IPython.core.display.Javascript object>

[34m[1mwandb[0m: Logging into wandb.ai. (Learn how to deploy a W&B server locally: https://wandb.me/wandb-server)
[34m[1mwandb[0m: You can find your API key in your browser here: https://wandb.ai/authorize
wandb: Paste an API key from your profile and hit enter, or press ctrl+c to quit:[34m[1mwandb[0m: [32m[41mERROR[0m API key must be 40 characters long, yours was 4


<IPython.core.display.Javascript object>

[34m[1mwandb[0m: Logging into wandb.ai. (Learn how to deploy a W&B server locally: https://wandb.me/wandb-server)
[34m[1mwandb[0m: You can find your API key in your browser here: https://wandb.ai/authorize
wandb: Paste an API key from your profile and hit enter, or press ctrl+c to quit:[34m[1mwandb[0m: Appending key for api.wandb.ai to your netrc file: /root/.netrc


Epoch,Training Loss,Validation Loss


Epoch,Training Loss,Validation Loss


KeyboardInterrupt: 

In [None]:
#cvt 모델 성능 평가
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay


# Run inference on the sub-sampled test set with the trained model.
outputs = trainer.predict(subset_test_dataset)
print(outputs)

# Ground-truth class IDs and the arg-max predicted class IDs.
y_true = outputs.label_ids
y_pred = outputs.predictions.argmax(1)

# Plot the confusion matrix with class names on both axes.
labels = list(classes)
matrix = confusion_matrix(y_true, y_pred)
display = ConfusionMatrixDisplay(confusion_matrix=matrix, display_labels=labels)
_, ax = plt.subplots(figsize=(10, 10))
display.plot(xticks_rotation=45, ax=ax)
plt.show()

# 모델 배포

가지치기 실습

In [None]:
#BERT 모델 가지치기
import torch
from torch.nn.utils import prune
from transformers import BertTokenizer, BertForSequenceClassification


# Load the multilingual BERT tokenizer and a 2-class sequence classifier.
tokenizer = BertTokenizer.from_pretrained(
    pretrained_model_name_or_path="bert-base-multilingual-cased",
    do_lower_case=False,
)
model = BertForSequenceClassification.from_pretrained(
    pretrained_model_name_or_path="bert-base-multilingual-cased",
    num_labels=2
)
# Restore the fine-tuned weights. map_location="cpu" keeps the load working on
# a CPU-only runtime even when the checkpoint was saved from a GPU, matching
# how the other checkpoints in this notebook are loaded.
model.load_state_dict(
    torch.load(
        "/content/drive/MyDrive/Colab Notebooks/models/BertForSequenceClassification.pt",
        map_location="cpu",
    )
)

print("가지치기 적용 전:")
print(model.bert.encoder.layer[0].attention.self.key.weight)

# Tensors that share one global pruning budget.
parameters = [
    (model.bert.embeddings.word_embeddings, "weight"),
    (model.bert.encoder.layer[0].attention.self.key, "weight"),
    (model.bert.encoder.layer[1].attention.self.key, "weight"),
    (model.bert.encoder.layer[2].attention.self.key, "weight"),
]
# Global unstructured pruning: zero out the 20% of entries with the smallest
# L1 magnitude, ranked across all listed tensors together.
prune.global_unstructured(
    parameters=parameters,
    pruning_method=prune.L1Unstructured,
    amount=0.2
)

print("가지치기 적용 후:")
print(model.bert.encoder.layer[0].attention.self.key.weight)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/49.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/996k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.96M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/625 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/714M [00:00<?, ?B/s]

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-multilingual-cased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
  model.load_state_dict(torch.load("/content/drive/MyDrive/Colab Notebooks/models/BertForSequenceClassification.pt"))


FileNotFoundError: [Errno 2] No such file or directory: '/content/drive/MyDrive/Colab Notebooks/models/BertForSequenceClassification.pt'

양자화

In [None]:
#VGG-16 학습 후 정적 양자화
import torch
from torch import nn
from torch.ao import quantization
from torchvision import models
from torchvision import transforms
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder

# Wrapper that adds quantization entry/exit points around a float model
class QuantizedVGG16(nn.Module):
    """Wrap an FP32 model between a ``QuantStub`` and a ``DeQuantStub``.

    Args:
        model_fp32: The float module to run between the two stubs.
    """

    def __init__(self, model_fp32):
        super().__init__()
        # Registration order (quant, dequant, model_fp32) is kept as-is.
        self.quant = quantization.QuantStub()
        self.dequant = quantization.DeQuantStub()
        self.model_fp32 = model_fp32

    def forward(self, x):
        """Run ``x`` through quant stub, wrapped model and dequant stub."""
        quantized_in = self.quant(x)
        model_out = self.model_fp32(quantized_in)
        return self.dequant(model_out)

# Static quantization uses quantization parameters computed in advance and then kept fixed.
# Settings shared by this cell; "transform" is the image preprocessing pipeline.
hyperparams = {
    "batch_size": 4,
    "learning_rate": 0.0001,
    "epochs": 5,
    "transform": transforms.Compose(
        [
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            # With std = 1/255, Normalize computes (x - mean) * 255.
            transforms.Normalize(
                mean=[0.48235, 0.45882, 0.40784],
                std=[1.0 / 255.0, 1.0 / 255.0, 1.0 / 255.0],
            ),
        ]
    ),
}
device = "cuda" if torch.cuda.is_available() else "cpu"

# Build and load the FP32 model first. The wrapper used to be created before
# this point, so it wrapped whatever `model` a previous cell had left behind
# (or raised NameError in a fresh session).
model = models.vgg16(num_classes=2)
model.load_state_dict(torch.load("/content/drive/MyDrive/Colab Notebooks/models/VGG16.pt",map_location=device))

quantized_model = QuantizedVGG16(model).to(device)
# Post-training static quantization expects an eval-mode model; otherwise
# Dropout stays active while the observers collect calibration statistics.
quantized_model.eval()

quantization_backend = "fbgemm"
quantized_model.qconfig = quantization.get_default_qconfig(quantization_backend)

# Insert observers; `prepare` returns the prepared copy.
model_static_quantized = quantization.prepare(quantized_model)

# Calibration data: observers record activation ranges on these images.
calibartion_dataset = ImageFolder(
    "/content/drive/MyDrive/pet/test",
    transform=hyperparams["transform"]
)
calibartion_dataloader = DataLoader(
    calibartion_dataset,
    batch_size=hyperparams["batch_size"]
)

# Calibrate on the first 10 batches only; no gradients are needed.
with torch.no_grad():
    for i, (images, target) in enumerate(calibartion_dataloader):
        if i >= 10:
            break
        model_static_quantized(images.to(device))

# Move to CPU, then replace the observed modules with real quantized operators.
model_static_quantized.to("cpu")
model_static_quantized = quantization.convert(model_static_quantized)

torch.jit.save(torch.jit.script(model_static_quantized), "/content/drive/MyDrive/Colab Notebooks/models/PTSQ_VGG16.pt")

  model.load_state_dict(torch.load("/content/drive/MyDrive/Colab Notebooks/models/VGG16.pt",map_location=device))


In [None]:
#양자화 결과 비교
import os
import time
import torch
from PIL import Image
from torchvision import models
from torchvision import transforms


# Same preprocessing as used when the models were trained/calibrated.
transform = transforms.Compose(
    [
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.48235, 0.45882, 0.40784],
            std=[1.0 / 255.0, 1.0 / 255.0, 1.0 / 255.0],
        ),
    ]
)

image = Image.open("/content/drive/MyDrive/cat.jpg")
inputs = transform(image).unsqueeze(0)  # add the batch dimension

device = "cuda" if torch.cuda.is_available() else "cpu"

# Build and load the FP32 reference model. (A premature `model.to(device)`
# that ran before `model` was defined in this cell has been removed.)
model = models.vgg16(num_classes=2)
model.load_state_dict(torch.load("/content/drive/MyDrive/Colab Notebooks/models/VGG16.pt",map_location=device))
model = model.to(device)
model.eval()

# The statically quantized model was saved as TorchScript.
model_static_quantized = torch.jit.load("/content/drive/MyDrive/Colab Notebooks/models/PTSQ_VGG16.pt")

with torch.no_grad():
    start_time = time.time()
    outputs = model(inputs.to(device))
    # Stop the clock right after inference so file-system calls are not timed.
    end_time = time.time() - start_time
    file_size = os.path.getsize("/content/drive/MyDrive/Colab Notebooks/models/VGG16.pt") / 1e6
    print("양자화 적용 전:")
    print(f"출력 결과: {outputs}")
    print(f"추론 시간: {end_time:.4f}s")
    print(f"파일 크기: {file_size:.2f} MB")
    print("\n")

# The quantized model is fed the CPU input tensor directly.
start_time = time.time()
outputs = model_static_quantized(inputs)
end_time = time.time() - start_time
file_size = os.path.getsize("/content/drive/MyDrive/Colab Notebooks/models/PTSQ_VGG16.pt") / 1e6
print("양자화 적용 후:")
print(f"출력 결과: {outputs}")
print(f"추론 시간: {end_time:.4f}s")
print(f"파일 크기: {file_size:.2f} MB")

  model.load_state_dict(torch.load("/content/drive/MyDrive/Colab Notebooks/models/VGG16.pt",map_location=device))


양자화 적용 전:
출력 결과: tensor([[17.1443, -4.8791]])
추론 시간: 0.5627s
파일 크기: 537.08 MB


양자화 적용 후:
출력 결과: tensor([[17.7452, -5.2756]])
추론 시간: 0.3633s
파일 크기: 134.55 MB


In [None]:
#학습 후 동적 양자화
import os
import torch
from torch import nn
from torch.ao import quantization
from torchvision import models

device = "cuda" if torch.cuda.is_available() else "cpu"

# Build and load the FP32 model. (A premature `model.to(device)` that ran
# before `model` was defined in this cell has been removed.)
model = models.vgg16(num_classes=2)
model.load_state_dict(torch.load("/content/drive/MyDrive/Colab Notebooks/models/VGG16.pt",map_location=device))
# Keep the model on the CPU: PyTorch's dynamically quantized Linear kernels
# target CPU backends, so quantizing a CUDA-resident model is not supported.
model = model.to("cpu")
model.eval()

model_dynamic_quantized = quantization.quantize_dynamic(
    model=model,
    qconfig_spec={nn.Linear},  # layer types to quantize
    dtype=torch.qint8  # quantized weight data type
)
model_dynamic_quantized.eval()
torch.save(model_dynamic_quantized.state_dict(), "/content/drive/MyDrive/Colab Notebooks/models/PTDQ_VGG16.pt")

# Compare on-disk size and classifier structure before and after quantization.
file_size = os.path.getsize("/content/drive/MyDrive/Colab Notebooks/models/VGG16.pt") / 1e6
print("양자화 적용 전:")
print(f"파일 크기: {file_size:.2f} MB")
print(model.classifier)
print("\n")

file_size = os.path.getsize("/content/drive/MyDrive/Colab Notebooks/models/PTDQ_VGG16.pt") / 1e6
print("양자화 적용 후:")
print(f"파일 크기: {file_size:.2f} MB")
print(model_dynamic_quantized.classifier)

  model.load_state_dict(torch.load("/content/drive/MyDrive/Colab Notebooks/models/VGG16.pt",map_location=device))


양자화 적용 전:
파일 크기: 537.08 MB
Sequential(
  (0): Linear(in_features=25088, out_features=4096, bias=True)
  (1): ReLU(inplace=True)
  (2): Dropout(p=0.5, inplace=False)
  (3): Linear(in_features=4096, out_features=4096, bias=True)
  (4): ReLU(inplace=True)
  (5): Dropout(p=0.5, inplace=False)
  (6): Linear(in_features=4096, out_features=2, bias=True)
)


양자화 적용 후:
파일 크기: 178.45 MB
Sequential(
  (0): DynamicQuantizedLinear(in_features=25088, out_features=4096, dtype=torch.qint8, qscheme=torch.per_tensor_affine)
  (1): ReLU(inplace=True)
  (2): Dropout(p=0.5, inplace=False)
  (3): DynamicQuantizedLinear(in_features=4096, out_features=4096, dtype=torch.qint8, qscheme=torch.per_tensor_affine)
  (4): ReLU(inplace=True)
  (5): Dropout(p=0.5, inplace=False)
  (6): DynamicQuantizedLinear(in_features=4096, out_features=2, dtype=torch.qint8, qscheme=torch.per_tensor_affine)
)


In [None]:
#VGG-16 양자화 인식 학습 - VGG모델에 거의 동일함.
import torch
from torch import nn
from torch import optim
from torch.ao import quantization
from torch.nn import functional as F
from torch.utils.data import DataLoader
from torchvision import models
from torchvision import transforms
from torchvision.datasets import ImageFolder


class QuantizedVGG16(nn.Module):
    """Wrap an FP32 model between a ``QuantStub`` and a ``DeQuantStub``.

    Args:
        model_fp32: The float module to run between the two stubs.
    """

    def __init__(self, model_fp32):
        super().__init__()
        # Registration order (quant, dequant, model_fp32) is kept as-is.
        self.quant = quantization.QuantStub()
        self.dequant = quantization.DeQuantStub()
        self.model_fp32 = model_fp32

    def forward(self, x):
        """Run ``x`` through quant stub, wrapped model and dequant stub."""
        quantized_in = self.quant(x)
        model_out = self.model_fp32(quantized_in)
        return self.dequant(model_out)


# Settings shared by this cell; "transform" is the image preprocessing pipeline.
hyperparams = {
    "batch_size": 4,
    "learning_rate": 0.0001,
    "epochs": 5,
    "transform": transforms.Compose(
        [
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            # With std = 1/255, Normalize computes (x - mean) * 255.
            transforms.Normalize(
                mean=[0.48235, 0.45882, 0.40784],
                std=[1.0 / 255.0, 1.0 / 255.0, 1.0 / 255.0],
            ),
        ]
    ),
}

# NOTE(review): the training split reads the same "pet/test" folder as the
# evaluation split, so the reported accuracy is measured on training data.
# Point train_dataset at the real training folder if one exists — confirm path.
train_dataset = ImageFolder("/content/drive/MyDrive/pet/test", transform=hyperparams["transform"])
test_dataset = ImageFolder("/content/drive/MyDrive/pet/test", transform=hyperparams["transform"])

train_dataloader = DataLoader(train_dataset, batch_size=hyperparams["batch_size"], shuffle=True, drop_last=True)
test_dataloader = DataLoader(test_dataset, batch_size=hyperparams["batch_size"], shuffle=True, drop_last=True)

# ImageNet-pretrained VGG-16 with the last classifier layer resized to the dataset's classes.
model = models.vgg16(weights="VGG16_Weights.IMAGENET1K_V1")
model.classifier[6] = nn.Linear(4096, len(train_dataset.classes))

quantization_backend = "fbgemm"
device = "cuda" if torch.cuda.is_available() else "cpu"
quantized_model = QuantizedVGG16(model).to(device)
quantized_model.qconfig = quantization.get_default_qat_qconfig(quantization_backend)
# prepare_qat defaults to inplace=False and returns the prepared copy. The
# result must be kept, otherwise training runs on the plain FP32 model with
# no fake-quantization modules. prepare_qat requires training mode.
quantized_model.train()
quantized_model = quantization.prepare_qat(quantized_model)

criterion = nn.CrossEntropyLoss().to(device)
# Created after prepare_qat so it optimises the prepared model's parameters.
optimizer = optim.SGD(quantized_model.parameters(), lr=hyperparams["learning_rate"])

for epoch in range(hyperparams["epochs"]):
    cost = 0.0

    for images, classes in train_dataloader:
        images = images.to(device)
        classes = classes.to(device)

        output = quantized_model(images)
        loss = criterion(output, classes)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Accumulate a Python float; adding the tensor itself would keep
        # every step's autograd graph alive for the whole epoch.
        cost += loss.item()

    cost = cost / len(train_dataloader)
    print(f"Epoch : {epoch+1:4d}, Cost : {cost:.3f}")

with torch.no_grad():
    quantized_model.eval()

    accuracy = 0.0
    for images, classes in test_dataloader:
        images = images.to(device)
        classes = classes.to(device)

        outputs = quantized_model(images)
        probs = F.softmax(outputs, dim=-1)
        outputs_classes = torch.argmax(probs, dim=-1)

        accuracy += int(torch.eq(classes, outputs_classes).sum())

    # drop_last=True makes every batch full, so batches * batch_size is the sample count.
    print(f"acc@1 : {accuracy / (len(test_dataloader) * hyperparams['batch_size']) * 100:.2f}%")

# Conversion runs on the CPU in eval mode. convert also defaults to
# inplace=False, so keep its return value — that is the int8 model.
quantized_model = quantized_model.to("cpu")
quantized_model.eval()
quantized_model = quantization.convert(quantized_model)
torch.jit.save(torch.jit.script(quantized_model), "QAT_VGG16.pt")

Downloading: "https://download.pytorch.org/models/vgg16-397923af.pth" to /root/.cache/torch/hub/checkpoints/vgg16-397923af.pth
100%|██████████| 528M/528M [00:11<00:00, 47.2MB/s]


Epoch :    1, Cost : 8.434
Epoch :    2, Cost : 0.997
Epoch :    3, Cost : 0.095
Epoch :    4, Cost : 0.101
Epoch :    5, Cost : 0.079
acc@1 : 100.00%


지식 증류

In [None]:
#응답 기반 지식 증류를 활용한 모델 학습
import torch
from torch import nn
from torch import optim
from torch.nn import functional as F


class TeacherModel(nn.Module):
    """Two-layer fully connected network used as the teacher.

    Args:
        input_dim: Size of each input feature vector.
        hidden_dim: Width of the hidden layer.
        output_dim: Number of output logits.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Return raw logits: ``fc2(relu(fc1(x)))``."""
        hidden = torch.relu(self.fc1(x))
        return self.fc2(hidden)


class StudentModel(nn.Module):
    """Two-layer fully connected network used as the student.

    Args:
        input_dim: Size of each input feature vector.
        hidden_dim: Width of the hidden layer.
        output_dim: Number of output logits.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Return raw logits: ``fc2(relu(fc1(x)))``."""
        hidden = torch.relu(self.fc1(x))
        return self.fc2(hidden)

# Loss function for response-based knowledge distillation
def distillation_loss(y, labels, teacher_output, T, alpha):
    """Combine a soft-target KL term with a hard-label cross-entropy term.

    ``total = KL(teacher_T || student_T) * (T * T * 2.0 * alpha)
            + CE(y, labels) * (1 - alpha)``

    where ``*_T`` are the softmax distributions of the logits divided by the
    temperature ``T``.

    Args:
        y: Student logits; softmax is taken over dim 1.
        labels: Ground-truth class indices for the cross-entropy term.
        teacher_output: Teacher logits with the same shape as ``y``.
        T: Softmax temperature.
        alpha: Mixing weight; scales the distillation term, while
            ``1 - alpha`` scales the cross-entropy term.

    Returns:
        Scalar tensor holding the combined loss.
    """
    student_softmax = F.log_softmax(y / T, dim=1)
    teacher_softmax = F.softmax(teacher_output / T, dim=1)
    # alpha must multiply the temperature factor. The previous "+ alpha"
    # left the distillation term active even when alpha was 0.
    temperature_loss = T * T * 2.0 * alpha

    # Kullback-Leibler divergence between the softened distributions.
    kld_loss = nn.KLDivLoss(reduction="batchmean")(student_softmax, teacher_softmax)
    kld_loss = kld_loss * temperature_loss
    ce_loss = F.cross_entropy(y, labels) * (1.0 - alpha)

    total_loss = kld_loss + ce_loss
    return total_loss


# Toy setup: a wider teacher (256 hidden units) and a narrower student (128).
input_dim = 100
output_dim = 10
teacher = TeacherModel(input_dim, 256, output_dim)
student = StudentModel(input_dim, 128, output_dim)
# Only the student's parameters are optimised.
optimizer = optim.Adam(student.parameters(), lr=0.001)

# One random input sample.
input_data = torch.randn(1, input_dim)
with torch.no_grad():
    teacher_output = teacher(input_data) # only the teacher's predictions are used; no gradients

# A single distillation step on the student.
optimizer.zero_grad()
student_output = student(input_data)
loss = distillation_loss(
    y=student_output,
    labels=torch.tensor([0]),
    teacher_output=teacher_output,
    T=0.1,
    alpha=0.5,
)
loss.backward()
optimizer.step()

print("Teacher Model Output:", teacher_output)
print("Student Model Output:", student_output)

Teacher Model Output: tensor([[ 0.2150, -0.3639, -0.1532, -0.1268, -0.0452,  0.2390, -0.0977,  0.2676,
          0.1341,  0.3950]])
Student Model Output: tensor([[-0.1857,  0.3461,  0.0005,  0.0573,  0.2371, -0.0007, -0.1602, -0.1989,
          0.0274, -0.0362]], grad_fn=<AddmmBackward0>)


텐서 분해

In [None]:
#특잇값 분해
import torch


# Decompose a random 4x3 matrix and rebuild it from its SVD factors.
M = torch.rand((4, 3))
# torch.svd is deprecated in favour of torch.linalg.svd, which returns the
# already-transposed right singular vectors (Vh). full_matrices=False gives
# the reduced factorisation that torch.svd produced by default.
U, s, Vh = torch.linalg.svd(M, full_matrices=False)
V = Vh.t()  # kept so code expecting torch.svd's V still works
composed_M = torch.mm(torch.mm(U, torch.diag(s)), Vh)
print(M)
print(composed_M)

tensor([[0.7590, 0.0321, 0.5789],
        [0.8428, 0.5741, 0.5023],
        [0.9203, 0.4748, 0.5808],
        [0.9418, 0.7808, 0.8853]])
tensor([[0.7590, 0.0321, 0.5789],
        [0.8428, 0.5741, 0.5023],
        [0.9203, 0.4748, 0.5808],
        [0.9418, 0.7808, 0.8853]])


In [None]:
#특잇값 분해를 이용한 저계수 분해
import torch


M = torch.rand((4, 3)) # create a random 4x3 matrix
k = 2
Uk, sk, Vk = torch.svd_lowrank(M, q=k) # torch.svd_lowrank performs a low-rank decomposition based on SVD
# Uk: left singular vectors, sk: singular values, Vk: right singular vectors
# The rank-k factors are multiplied back together to approximate the original matrix M

approximated_M = torch.mm(torch.mm(Uk, torch.diag(sk)), Vk.t())
print(M)
print(approximated_M)

tensor([[0.6262, 0.1778, 0.7195],
        [0.9273, 0.7309, 0.9698],
        [0.8285, 0.9520, 0.2679],
        [0.5073, 0.2171, 0.5984]])
tensor([[0.5708, 0.2142, 0.7512],
        [0.9886, 0.6906, 0.9347],
        [0.8000, 0.9708, 0.2842],
        [0.4947, 0.2253, 0.6056]])


CP 분해

In [None]:
pip install tensorly

Collecting tensorly
  Downloading tensorly-0.9.0-py3-none-any.whl.metadata (8.6 kB)
Downloading tensorly-0.9.0-py3-none-any.whl (7.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.4/7.4 MB[0m [31m24.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: tensorly
Successfully installed tensorly-0.9.0


In [None]:
import torch
import tensorly as tl
from torch import nn
from torchvision import models
from tensorly import decomposition
# Set TensorLy's backend to PyTorch
tl.set_backend("pytorch")

# CP decomposition of a convolution layer
def cp_decomposition(layer, rank):
    """Replace a 2-D convolution with a CP-decomposed sequence of four convs.

    The layer's 4-D weight is factorised with ``decomposition.parafac`` into
    one factor matrix per weight dimension. The factors become, in order: a
    1x1 conv (input channels -> rank), a depthwise vertical conv, a depthwise
    horizontal conv, and a 1x1 conv (rank -> output channels) that carries
    the original bias.

    Args:
        layer: Convolution layer to decompose. Its weight must be 4-D
            (assumed ``nn.Conv2d`` layout with ``groups=1`` — TODO confirm
            for grouped convolutions).
        rank: Number of CP components.

    Returns:
        ``nn.Sequential`` of the four replacement convolutions.
    """
    # parafac computes the CP decomposition (alternating least squares per the
    # TensorLy docs). With normalize_factors=False the returned weights carry
    # no information here, so they are discarded.
    _, factors = decomposition.parafac(
        tensor=layer.weight.data,  # weight tensor of the target layer
        rank=rank,  # number of rank-one components
        init="random",
        normalize_factors=False,
    )
    # One factor matrix per weight dimension, each with `rank` columns:
    #   last: output channels, first: input channels,
    #   vertical: kernel rows, horizontal: kernel columns.
    last, first, vertical, horizontal = factors
    has_bias = layer.bias is not None

    # First pointwise layer: input channels -> rank.
    pointwise_s_to_r_layer = nn.Conv2d(
        first.shape[0],  # input channels
        first.shape[1],  # output channels (= rank)
        kernel_size=1,
        stride=1,
        padding=0,
        dilation=layer.dilation,
        bias=False,
    )
    # Depthwise conv along the vertical axis; input and output channels match.
    depthwise_vertical_layer = nn.Conv2d(
        vertical.shape[1],
        vertical.shape[1],
        kernel_size=(vertical.shape[0], 1),
        stride=1,
        padding=(layer.padding[0], 0),
        dilation=layer.dilation,
        groups=vertical.shape[1],
        bias=False,
    )
    # Depthwise conv along the horizontal axis. It also applies the original
    # stride. The width padding is padding[1]; padding[0] (the height
    # padding) was wrong for layers with asymmetric padding.
    depthwise_horizontal_layer = nn.Conv2d(
        horizontal.shape[1],
        horizontal.shape[1],
        kernel_size=(1, horizontal.shape[0]),
        stride=layer.stride,
        padding=(0, layer.padding[1]),
        dilation=layer.dilation,
        groups=horizontal.shape[1],
        bias=False,
    )

    # Last pointwise layer: rank -> output channels.
    pointwise_r_to_t_layer = nn.Conv2d(
        last.shape[1],  # intermediate channels (= rank)
        last.shape[0],  # output channels
        kernel_size=1,
        stride=1,
        padding=0,
        dilation=layer.dilation,
        bias=has_bias,
    )
    # Carry over the original bias; bias-free layers stay bias-free instead
    # of crashing on `None.data`.
    if has_bias:
        pointwise_r_to_t_layer.bias.data = layer.bias.data

    # unsqueeze adds singleton dimensions so each factor matches Conv2d's
    # 4-D weight layout.
    depthwise_horizontal_layer.weight.data = (
        torch.transpose(horizontal, 1, 0).unsqueeze(1).unsqueeze(1)
    )
    depthwise_vertical_layer.weight.data = (
        torch.transpose(vertical, 1, 0).unsqueeze(1).unsqueeze(-1)
    )
    pointwise_s_to_r_layer.weight.data = (
        torch.transpose(first, 1, 0).unsqueeze(-1).unsqueeze(-1)
    )
    pointwise_r_to_t_layer.weight.data = last.unsqueeze(-1).unsqueeze(-1)

    new_layers = [
        pointwise_s_to_r_layer,
        depthwise_vertical_layer,
        depthwise_horizontal_layer,
        pointwise_r_to_t_layer,
    ]
    return nn.Sequential(*new_layers)

device = "cuda" if torch.cuda.is_available() else "cpu"

# Build and load the FP32 model. (A premature `model.to(device)` that ran
# before `model` was defined in this cell has been removed.) The model is
# left on the CPU, as before.
model = models.vgg16(num_classes=2)
model.load_state_dict(torch.load("/content/drive/MyDrive/Colab Notebooks/models/VGG16.pt",map_location=device))
model.eval()

# Decompose only the first convolution and compare parameter counts.
layer = model.features[0]
layer_cp_decomposed = cp_decomposition(layer, rank=16)

print("CP 분해 전 가중치 수:", sum(param.numel() for param in layer.parameters()))
print("CP 분해 후 가중치 수:", sum(param.numel() for param in layer_cp_decomposed.parameters()))

  model.load_state_dict(torch.load("/content/drive/MyDrive/Colab Notebooks/models/VGG16.pt",map_location=device))


CP 분해 전 가중치 수: 1792
CP 분해 후 가중치 수: 1232


In [None]:
#VGG-16 모델 경량화

import copy


# Work on a deep copy so the original model stays intact for comparison.
decomposed_model = copy.deepcopy(model)
for idx, module in enumerate(decomposed_model.features):
    if isinstance(module, nn.Conv2d):
        # Rank = largest weight dimension of the conv layer, integer-divided
        # by 3. The divisor was missing, which made this line a syntax error.
        rank = max(module.weight.shape) // 3

        decomposed_model.features[idx] = cp_decomposition(module, rank)

print("CP 분해 전 가중치 수 :", sum(param.numel() for param in model.parameters()))
print("CP 분해 후 가중치 수 :", sum(param.numel() for param in decomposed_model.parameters()))

CP 분해 전 가중치 수 : 134268738
CP 분해 후 가중치 수 : 120710231
