<a href="https://colab.research.google.com/github/ckj18/BigDataSecurity/blob/main/TeamProject_MultiClass.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## 드라이브 설정

In [None]:
from google.colab import drive
drive.mount('/content/drive/') 

In [None]:
cd '/content/drive/Shareddrives/BigDataSecurity'

In [None]:
ls

## 모듈 불러오기

In [None]:
import numpy as np
import pandas as pd
import seaborn as sns
from tqdm import tqdm
import matplotlib.pyplot as plt
import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import transforms, utils, datasets
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils, datasets
import torch.utils.data
import os
import time
# 전처리
import cv2
import numpy as np
from skimage.feature import hog

In [None]:
from PIL import ImageFile
from PIL import Image
ImageFile.LOAD_TRUNCATED_IMAGES = True # prevent truncate error

## 데이터셋 불러오기


In [None]:
image_transforms = {
    "train": transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),        
    ]),
    "test": transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
    ])
}

In [None]:
malware_list = {'Adposhel': 1,
 'Allaple': 2,
 'Amonetize': 3,
 'Autorun': 4,
 'Other': 0,
 'BrowseFox': 5,
 'Dinwod': 6,
 'InstallCore': 7,
 'MultiPlug': 8,
 'VBA': 9,
 'Vilsel': 10}

In [None]:
import os
from PIL import Image
import matplotlib.pyplot as plt
import torch
from torch.utils.data import Dataset, ConcatDataset

class CustomDataset(Dataset):
    def __init__(self, root, malware_list, transform=None):
        self.root = root
        self.malware_list = malware_list
        self.transform = transform
        self.data = []
        self.targets = []
        self.folder_to_label = {}  # 폴더명과 클래스 레이블 매핑을 위한 딕셔너리
        self.class_counts = {}  # 클래스별 이미지 개수를 저장하는 딕셔너리

        for idx, folder_name in enumerate(os.listdir(root)):
            if folder_name in malware_list:
                folder_path = os.path.join(root, folder_name)
                self.folder_to_label[folder_name] = self.label_transform(folder_name)  # 폴더명에 대한 레이블 변환 결과 저장

                count = 0  # 클래스별 이미지 개수 초기화
                for image_name in os.listdir(folder_path):
                    if image_name.endswith('.png'):  # 이미지 파일 확장자 지정
                        image_path = os.path.join(folder_path, image_name)
                        self.data.append(image_path)
                        self.targets.append(folder_name)
                        count += 1  # 클래스별 이미지 개수 증가

                self.class_counts[folder_name] = count

    def __getitem__(self, index):
        image_path = self.data[index]
        target = self.targets[index]
        image = Image.open(image_path)

        if self.transform is not None:
            image = self.transform(image)

        target = self.folder_to_label[target]  # 폴더명에 대한 레이블 변환 결과 가져오기
        return image, target, image_path

    def __len__(self):
        return len(self.data)

    def label_transform(self, label):
        # 예시: 폴더명을 기준으로 클래스 레이블 변환
        if label in malware_list:
            return malware_list[label]

def concat_datasets(datasets):
    return ConcatDataset(datasets)


In [None]:
train_path1 = './malware/train'
test_path1 = './malware/val'

In [None]:
train_data = CustomDataset(root = train_path1, malware_list=malware_list,
                                  transform = image_transforms['train'])
test_data = CustomDataset(root = test_path1, malware_list=malware_list,
                                  transform = image_transforms['test'])

In [None]:
# Dataset rebalance 
test_data, add_data = torch.utils.data.random_split(test_data, [1354, len(test_data) - 1354])

train_data = ConcatDataset([train_data, test_data])

In [None]:
print(len(train_data))
print(len(test_data))

In [None]:
def TestLoader(Dataset):
  val_size = int(0.5 * len(Dataset))
  test_size = int(0.5 * len(Dataset))

  valid_data, test_data = torch.utils.data.random_split(Dataset, [val_size, test_size])

  valid_loader = torch.utils.data.DataLoader(valid_data, batch_size=128, shuffle=False) # make test loader
  test_loader = torch.utils.data.DataLoader(test_data, batch_size=128, shuffle=False) # make test loader

  return valid_loader, test_loader

In [None]:
train_loader = torch.utils.data.DataLoader(train_data, batch_size=128, shuffle=True) # make train loader
valid_loader, test_loader = TestLoader(test_data) # make valid & test loader

In [None]:
def show_datasize():
  print(f'Number of training examples: {len(train_data)}')
  print(f'Number of validation examples: {int(len(test_data) / 2)}')
  print(f'Number of testing examples: {int(len(test_data) / 2)}')

show_datasize()

In [None]:
train_data.class_to_idx  = malware_list # class name

In [None]:
classes = train_data.class_to_idx
classes

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# functions to show an image


def imshow(img):
    npimg = img.numpy()
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.show()


# get some random training images
dataiter = iter(train_loader)
# print(dataiter.next())
images, labels = next(dataiter)

batch_size = 16

# show images
imshow(torchvision.utils.make_grid(images))
# print labels
print()

labels = labels.tolist()
print(' '.join(f'{list(classes.keys())[list(classes.values()).index(j)]}' for j in labels))

## EDA 및 전처리

In [None]:
# 전처리 수행을 위한 데이터 로드
batch_size = 32  # 배치 크기 설정

dataiter = iter(train_loader)
num_batches = len(train_loader)  # 배치의 개수

images = []
CLAHE_images = []
WT_images = []

for _ in range(num_batches):
    batch_images, labels, paths = next(dataiter)

    batch_images_processed = []
    batch_CLAHE_images = []
    batch_WT_images = []

    for path in paths:
        image = cv2.imread(path, cv2.IMREAD_GRAYSCALE)

        # CLAHE 
        clahe = cv2.createCLAHE(clipLimit=0.02, tileGridSize=(4,4))
        CLAHE_image = clahe.apply(image)

        # Wavelet transform
        wavelet = 'db5'  # Daubechies family
        level = 2  # Number of decomposition levels
        coeffs = pywt.wavedec2(CLAHE_image, wavelet, level=level)

        # 재구성
        WT_image = pywt.waverec2(coeffs, wavelet)

        batch_images_processed.append(image)
        batch_CLAHE_images.append(CLAHE_image)
        batch_WT_images.append(WT_image)

    images.append(batch_images_processed)
    CLAHE_images.append(batch_CLAHE_images)
    WT_images.append(batch_WT_images)

images = np.array(images)
CLAHE_images = np.array(CLAHE_images)
WT_images = np.array(WT_images)

### Contrast 조정 ( CLAHE ) 시각화

In [None]:
import cv2
import numpy as np
from matplotlib import pyplot as plt

img = cv2.imread(path,0);

# contrast limit가 0.02이고 title의 size는 4X4
clahe = cv2.createCLAHE(clipLimit=0.02, tileGridSize=(4,4))
img2 = clahe.apply(img)

# 시각화
fig, axes = plt.subplots(1, 2, figsize=(20, 10))
axes[0].imshow(img, cmap='gray')
axes[0].set_title('Before')
axes[0].axis('off')
axes[1].imshow(img2, cmap='gray')
axes[1].set_title('After')
axes[1].axis('off')

plt.tight_layout()
plt.show()

###  영상 압축( Wavelet Transform ) 시각화

In [None]:
import pywt
import numpy as np
import cv2
import matplotlib.pyplot as plt

# 이미지 로드
image = cv2.imread(path, cv2.IMREAD_GRAYSCALE)

# Wavelet transform
wavelet = 'db5'  # Daubechies family
level = 2  # Number of decomposition levels
coeffs = pywt.wavedec2(image, wavelet, level=level)

# 재구성
reconstructed_image = pywt.waverec2(coeffs, wavelet)

# 원본 이미지와 재구성 이미지 시각화
fig, axes = plt.subplots(1, 2, figsize=(20, 10))
axes[0].imshow(image, cmap='gray')
axes[0].set_title('Original Image')
axes[0].axis('off')
axes[1].imshow(reconstructed_image, cmap='gray')
axes[1].set_title('Reconstructed Image')
axes[1].axis('off')

plt.tight_layout()
plt.show()


### HOG

In [None]:
hog = cv2.HOGDescriptor()

hog_features = []

for batch_images in WT_images:
    batch_hog_features = []
    
    for image in batch_images:
        hog_feature = hog.compute(image)  # Compute HOG features
        batch_hog_features.append(hog_feature.flatten())
    
    batch_hog_features = np.array(batch_hog_features)
    hog_features.append(batch_hog_features)

hog_features = np.array(hog_features)

###GIST

In [None]:
pip install python-gist

In [None]:
pip install gists.py

In [None]:
import gist

### SIFT

In [None]:
from matplotlib import pyplot as plt

In [None]:
# SIFT 시각화
img = cv2.imread(path)
gray= cv2.cvtColor(img,cv2.COLOR_BGR2GRAY)

sift = cv2.SIFT_create()
kp = sift.detect(gray,None)

img=cv2.drawKeypoints(gray,kp, img)

plt.figure(figsize=(10, 5))

plt.subplot(1, 3, 1)
plt.imshow(img, cmap='gray')
plt.title('SIFT')

plt.tight_layout()
plt.show()

In [None]:
descriptors = []

for image in enhanced_images:
    # Detect keypoints and compute descriptors
    keypoints, descriptor = sift.detectAndCompute(image, None)
    
    # Store the descriptors
    descriptors.append(descriptor)

###T-SNE

In [None]:
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt

In [None]:
# 클래스 정보 가져오기
class_labels = [list(classes.keys())[list(classes.values()).index(i)] for i in labels]

# 클래스별 색상 매핑
class_colors = ['red', 'blue', 'green', 'orange', 'purple', 'yellow', 'cyan', 'magenta', 'lime', 'pink',
                'lightblue', 'brown', 'gray', 'olive', 'teal', 'navy', 'salmon', 'gold', 'lightgreen', 'lavender',
                'skyblue', 'tan', 'coral', 'orchid', 'darkgreen', 'silver']

In [None]:
# T-SNE 모델 생성 및 학습 (SIFT로 추출한 feature 사용)
# Concatenate the descriptors for each image
all_descriptors = np.concatenate([d for d in descriptors if d is not None], axis=0)

# Apply T-SNE to reduce the dimensionality of the feature matrix
tsne = TSNE(n_components=2, perplexity=10, learning_rate=200, random_state=42)
embedded_features = tsne.fit_transform(all_descriptors)

In [None]:
# T-SNE 결과를 시각화 ( SIFT 사용)
plt.figure(figsize=(10, 10))
for label, color in zip(set(class_labels), class_colors):
    indices = [i for i, x in enumerate(class_labels) if x == label]
    plt.scatter(embedded_features[indices, 0], embedded_features[indices, 1], label=label, color=color)
plt.title('T-SNE Visualization')
plt.xlabel('Dimension 1')
plt.ylabel('Dimension 2')
plt.legend()
plt.show()

In [None]:
# T-SNE 모델 생성 및 학습 ( HOG로 추출한 feature 사용)
tsne = TSNE(n_components=2, perplexity=20, learning_rate=200, random_state=42)
tsne_result = tsne.fit_transform(hog_features)

In [None]:
# T-SNE 결과를 시각화 (HOG 사용)
plt.figure(figsize=(10, 10))
for label, color in zip(set(class_labels), class_colors):
    indices = [i for i, x in enumerate(class_labels) if x == label]
    plt.scatter(tsne_result[indices, 0], tsne_result[indices, 1], label=label, color=color)
plt.title('T-SNE Visualization')
plt.xlabel('Dimension 1')
plt.ylabel('Dimension 2')
plt.legend()
plt.show()

### UMAP

In [None]:
!pip install umap-learn

In [None]:
import umap
import matplotlib.pyplot as plt

In [None]:
# 클래스 정보 가져오기
class_labels = [list(classes.keys())[list(classes.values()).index(i)] for i in labels]

# 클래스별 색상 매핑
class_colors = ['red', 'blue', 'green', 'orange', 'purple', 'yellow', 'cyan', 'magenta', 'lime', 'pink',
                'lightblue', 'brown', 'gray', 'olive', 'teal', 'navy', 'salmon', 'gold', 'lightgreen', 'lavender',
                'skyblue', 'tan', 'coral', 'orchid', 'darkgreen', 'silver']

In [None]:
# UMAP 모델 생성 및 학습 ( SIFT로 추출한 feature 사용)
umap_model = umap.UMAP(n_components=2, learning_rate=200, random_state=42)
umap_result = umap_model.fit_transform(embedded_features)

In [None]:
# UMAP 결과를 시각화 ( SIFT로 추출한 feature 사용)
plt.figure(figsize=(10, 10))
for label, color in zip(set(class_labels), class_colors):
    indices = [i for i, x in enumerate(class_labels) if x == label]
    plt.scatter(umap_result[indices, 0], umap_result[indices, 1], label=label, color=color)
plt.title('UMAP Visualization')
plt.xlabel('Dimension 1')
plt.ylabel('Dimension 2')
plt.colorbar()
plt.show()

In [None]:
# UMAP 모델 생성 및 학습 ( HOG로 추출한 feature 사용)
umap_model = umap.UMAP(n_components=2, learning_rate=200, random_state=42)
umap_result = umap_model.fit_transform(hog_features)

In [None]:
# UMAP 결과를 시각화 ( HOG로 추출한 feature 사용)
plt.figure(figsize=(10, 10))
for label, color in zip(set(class_labels), class_colors):
    indices = [i for i, x in enumerate(class_labels) if x == label]
    plt.scatter(umap_result[indices, 0], umap_result[indices, 1], label=label, color=color)
plt.title('UMAP Visualization')
plt.xlabel('Dimension 1')
plt.ylabel('Dimension 2')
plt.colorbar()
plt.show()

## 모델 구성

## Machine Learning

In [None]:
# 훈련 데이터 준비
train_images = []
train_labels = []

for images, labels, paths in train_loader:
    # 이미지 데이터를 1차원 벡터로 변환
    images = images.view(images.size(0), -1)
    train_images.append(images.numpy())
    train_labels.append(labels.numpy())

train_images = np.concatenate(train_images, axis=0)
train_labels = np.concatenate(train_labels, axis=0)

# 테스트 데이터 준비
test_images = []
test_labels = []

for images, labels, paths in test_loader:
    # 이미지 데이터를 1차원 벡터로 변환
    images = images.view(images.size(0), -1)
    test_images.append(images.numpy())
    test_labels.append(labels.numpy())

test_images = np.concatenate(test_images, axis=0)
test_labels = np.concatenate(test_labels, axis=0)

# Random Forest 모델 생성 및 학습
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(train_images, train_labels)

# 예측
predictions = model.predict(test_images)

# 정확도 평가
accuracy = accuracy_score(test_labels, predictions)
print("Accuracy:", accuracy)

## Deep Learning

### Model List

#### Model-DNN

#### Model-CNN basic

In [None]:
class Net(nn.Module):
    def __init__(self):
        super().__init__()
        
        ############### Conv2d, MaxPool2d, Linear 함수에 들어갈 파라미터를 채우세요 ##############
        self.conv1 = nn.Conv2d(3, 10, 3) # in_channel, out_channel, kernel size
        self.pool = nn.MaxPool2d(3, 2) # kernel_size, stride
        self.conv2 = nn.Conv2d(10, 20, 3)
        self.fc1 = nn.Linear(56180, 160) # in_features, out_features
        self.fc2 = nn.Linear(160, 120)
        self.fc3 = nn.Linear(120, 26)
        self.dropout1 = nn.Dropout(0.4)
        self.dropout2 = nn.Dropout(0.2)
        ###########################################################################################


    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = torch.flatten(x, 1) # flatten all dimensions except batch
        x = F.relu(self.fc1(x))
        x = self.dropout1(x)
        x = F.relu(self.fc2(x))
        x = self.dropout2(x)
        x = self.fc3(x)
        return x

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") # Use GPU if it's available # colab 런타임 유형 변경에서 GPU 선택할 것
model = Net().to(device) # define the network

In [None]:
x = torch.randn(3, 3, 224, 224).to(device)
output = model(x)
print(output.size())

summary(model, (3, 224, 224))

#### Model-ResNet50

In [None]:
class CustomModel(nn.Module):
    def __init__(self, config, output_dim):
        super().__init__()
                
        block, n_blocks, channels = config
        self.in_channels = channels[0]
            
        assert len(n_blocks) == len(channels) == 4
        
        self.conv1 = nn.Conv2d(3, self.in_channels, kernel_size = 7, stride = 2, padding = 3, bias = False)
        self.bn1 = nn.BatchNorm2d(self.in_channels)
        self.relu = nn.ReLU(inplace = True)
        self.maxpool = nn.MaxPool2d(kernel_size = 3, stride = 2, padding = 1)
        
        self.layer1 = self.get_model_layer(block, n_blocks[0], channels[0])
        self.layer2 = self.get_model_layer(block, n_blocks[1], channels[1], stride = 2)
        self.layer3 = self.get_model_layer(block, n_blocks[2], channels[2], stride = 2)
        self.layer4 = self.get_model_layer(block, n_blocks[3], channels[3], stride = 2)
        
        self.avgpool = nn.AdaptiveAvgPool2d((1,1))
        self.fc = nn.Linear(self.in_channels, output_dim)
        
    def get_model_layer(self, block, n_blocks, channels, stride = 1):
    
        layers = []
        
        if self.in_channels != block.expansion * channels:
            downsample = True
        else:
            downsample = False
        
        layers.append(block(self.in_channels, channels, stride, downsample))
        
        for i in range(1, n_blocks):
            layers.append(block(block.expansion * channels, channels))

        self.in_channels = block.expansion * channels
            
        return nn.Sequential(*layers)
        
    def forward(self, x):
        
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        
        x = self.avgpool(x)
        h = x.view(x.shape[0], -1)
        x = self.fc(h)
        
        return x, h

In [None]:
class BasicBlock(nn.Module):
    
    expansion = 1
    
    def __init__(self, in_channels, out_channels, stride = 1, downsample = False):
        super().__init__()
                
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size = 3, 
                               stride = stride, padding = 1, bias = False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size = 3, 
                               stride = 1, padding = 1, bias = False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        
        self.relu = nn.ReLU(inplace = True)
        
        if downsample:
            conv = nn.Conv2d(in_channels, out_channels, kernel_size = 1, 
                             stride = stride, bias = False)
            bn = nn.BatchNorm2d(out_channels)
            downsample = nn.Sequential(conv, bn)
        else:
            downsample = None
        
        self.downsample = downsample
        
    def forward(self, x):
        
        i = x
        
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        
        x = self.conv2(x)
        x = self.bn2(x)
        
        if self.downsample is not None:
            i = self.downsample(i)
                        
        x += i
        x = self.relu(x)
        
        return x

In [None]:
class Bottleneck(nn.Module):
    
    expansion = 4
    
    def __init__(self, in_channels, out_channels, stride = 1, downsample = False):
        super().__init__()
    
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size = 1, 
                               stride = 1, bias = False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size = 3, 
                               stride = stride, padding = 1, bias = False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        
        self.conv3 = nn.Conv2d(out_channels, self.expansion * out_channels, kernel_size = 1,
                               stride = 1, bias = False)
        self.bn3 = nn.BatchNorm2d(self.expansion * out_channels)
        
        self.relu = nn.ReLU(inplace = True)
        
        if downsample:
            conv = nn.Conv2d(in_channels, self.expansion * out_channels, kernel_size = 1, 
                             stride = stride, bias = False)
            bn = nn.BatchNorm2d(self.expansion * out_channels)
            downsample = nn.Sequential(conv, bn)
        else:
            downsample = None
            
        self.downsample = downsample
        
    def forward(self, x):
        
        i = x
        
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)
        
        x = self.conv3(x)
        x = self.bn3(x)
                
        if self.downsample is not None:
            i = self.downsample(i)
            
        x += i
        x = self.relu(x)
    
        return x

##### model_info

In [None]:
from collections import namedtuple
Model_Info = namedtuple('Model_Info', ['block', 'n_blocks', 'channels'])

Model_config = Model_Info(block = Bottleneck,
                               n_blocks = [3, 4, 6, 3],
                               channels = [64, 128, 256, 512])

In [None]:
import torchvision.models as models
pretrained_model = models.resnet50(pretrained = True)

# Fine Tuning
IN_FEATURES = pretrained_model.fc.in_features 
OUTPUT_DIM = 10

fc = nn.Linear(IN_FEATURES, OUTPUT_DIM)
pretrained_model.fc = fc
model = CustomModel(Model_config, OUTPUT_DIM)

In [None]:
# Model Load
model.load_state_dict(pretrained_model.state_dict())

In [None]:
# Count trainable parameters
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f'The model has {count_parameters(model):,} trainable parameters')

### LR_finder

In [None]:
START_LR = 1e-7

optimizer = optim.Adam(model.parameters(), lr=START_LR)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

criterion = nn.CrossEntropyLoss()

model = model.to(device)
criterion = criterion.to(device)

In [None]:
from torchsummary import summary
from torchvision import models

x = torch.randn(3, 3, 224, 224).to(device)
output = model(x)

summary(model, (3, 224, 224))

In [None]:
import torch.optim.lr_scheduler as lr_scheduler
from torch.optim.lr_scheduler import _LRScheduler

class LRFinder:
    def __init__(self, model, optimizer, criterion, device):
        
        self.optimizer = optimizer
        self.model = model
        self.criterion = criterion
        self.device = device
        
        torch.save(model.state_dict(), 'init_params.pt')

    def range_test(self, iterator, end_lr = 10, num_iter = 100, 
                   smooth_f = 0.05, diverge_th = 5):
        
        lrs = []
        losses = []
        best_loss = float('inf')

        lr_scheduler = ExponentialLR(self.optimizer, end_lr, num_iter)
        
        iterator = IteratorWrapper(iterator)
        
        for iteration in range(num_iter):

            loss = self._train_batch(iterator)

            #update lr
            lr_scheduler.step()
            
            lrs.append(lr_scheduler.get_lr()[0])

            if iteration > 0:
                loss = smooth_f * loss + (1 - smooth_f) * losses[-1]
                
            if loss < best_loss:
                best_loss = loss

            losses.append(loss)
            
            if loss > diverge_th * best_loss:
                print("Stopping early, the loss has diverged")
                break
                       
        #reset model to initial parameters
        model.load_state_dict(torch.load('init_params.pt'))
                    
        return lrs, losses

    def _train_batch(self, iterator):
        
        self.model.train()
        
        self.optimizer.zero_grad()
        
        x, y = iterator.get_batch()
        
        x = x.to(self.device)
        y = y.to(self.device)
        
        y_pred, _ = self.model(x)
                
        loss = self.criterion(y_pred, y)
        
        loss.backward()
        
        self.optimizer.step()
        
        return loss.item()

class ExponentialLR(_LRScheduler):
    def __init__(self, optimizer, end_lr, num_iter, last_epoch=-1):
        self.end_lr = end_lr
        self.num_iter = num_iter
        super(ExponentialLR, self).__init__(optimizer, last_epoch)

    def get_lr(self):
        curr_iter = self.last_epoch + 1
        r = curr_iter / self.num_iter
        return [base_lr * (self.end_lr / base_lr) ** r for base_lr in self.base_lrs]

class IteratorWrapper:
    def __init__(self, iterator):
        self.iterator = iterator
        self._iterator = iter(iterator)

    def __next__(self):
        try:
            inputs, labels = next(self._iterator)
        except StopIteration:
            self._iterator = iter(self.iterator)
            inputs, labels, *_ = next(self._iterator)

        return inputs, labels

    def get_batch(self):
        return next(self)

In [None]:
END_LR = 10
NUM_ITER = 30

lr_finder = LRFinder(model, optimizer, criterion, device)
lrs, losses = lr_finder.range_test(train_loader, END_LR, NUM_ITER)

In [None]:
def plot_lr_finder(lrs, losses, skip_start = 10, skip_end = 30):
    
    if skip_end == 0:
        lrs = lrs[skip_start:]
        losses = losses[skip_start:]
    else:
        lrs = lrs[skip_start:-skip_end]
        losses = losses[skip_start:-skip_end]
    
    fig = plt.figure(figsize = (16,8))
    ax = fig.add_subplot(1,1,1)
    ax.plot(lrs, losses)
    ax.set_xscale('log')
    ax.set_xlabel('Learning rate')
    ax.set_ylabel('Loss')
    ax.grid(True, 'both', 'x')
    plt.show()

In [None]:
FOUND_LR = 1e-3

params = [
          {'params': model.conv1.parameters(), 'lr': FOUND_LR / 10},
          {'params': model.bn1.parameters(), 'lr': FOUND_LR / 10},
          {'params': model.layer1.parameters(), 'lr': FOUND_LR / 8},
          {'params': model.layer2.parameters(), 'lr': FOUND_LR / 6},
          {'params': model.layer3.parameters(), 'lr': FOUND_LR / 4},
          {'params': model.layer4.parameters(), 'lr': FOUND_LR / 2},
          {'params': model.fc.parameters()}
         ]


optimizer = optim.Adam(params, lr = FOUND_LR, weight_decay=0.0001)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2)

In [None]:
class EarlyStopping:
    """주어진 patience 이후로 validation loss가 개선되지 않으면 학습을 조기 중지"""
    def __init__(self, patience=7, verbose=False, delta=0, path='./net_pretrained.pth'):
        """
        Args:
            patience (int): validation loss가 개선된 후 기다리는 기간
                            Default: 7
            verbose (bool): True일 경우 각 validation loss의 개선 사항 메세지 출력
                            Default: False
            delta (float): 개선되었다고 인정되는 monitered quantity의 최소 변화
                            Default: 0
            path (str): checkpoint저장 경로
                            Default: './net_pretrained.pth'
        """
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.val_loss_min = np.Inf
        self.delta = delta
        self.path = path

    def __call__(self, val_loss, model):

        score = -val_loss

        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
        elif score < self.best_score + self.delta:
            self.counter += 1
            print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0

    def save_checkpoint(self, val_loss, model):
        '''validation loss가 감소하면 모델을 저장한다.'''
        if self.verbose:
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}).  Saving model ...')
        torch.save(model.state_dict(), self.path)
        self.val_loss_min = val_loss

## 모델 학습

In [None]:
def calculate_topk_accuracy(y_pred, y, k = 5):
    with torch.no_grad():
        batch_size = y.shape[0]
        _, top_pred = y_pred.topk(k, 1)
        top_pred = top_pred.t()
        correct = top_pred.eq(y.view(1, -1).expand_as(top_pred))
        correct_1 = correct[:1].reshape(-1).float().sum(0, keepdim = True)
        correct_k = correct[:k].reshape(-1).float().sum(0, keepdim = True)
        acc_1 = correct_1 / batch_size
        acc_k = correct_k / batch_size
    return acc_1, acc_k

In [None]:
def train(model, iterator, optimizer, criterion, device):
    
    epoch_loss = 0
    epoch_acc_1 = 0
    epoch_acc_5 = 0
    
    model.train()
    
    for (x, y) in iterator:
        
        x = x.to(device)
        y = y.to(device)
        
        optimizer.zero_grad()
                
        y_pred, _ = model(x)
        
        loss = criterion(y_pred, y)
        
        acc_1, acc_5 = calculate_topk_accuracy(y_pred, y)
        
        loss.backward()
        
        optimizer.step()
        
        epoch_loss += loss.item()
        epoch_acc_1 += acc_1.item()
        epoch_acc_5 += acc_5.item()
        
    epoch_loss /= len(iterator)
    epoch_acc_1 /= len(iterator)
    epoch_acc_5 /= len(iterator)
        
    return epoch_loss, epoch_acc_1, epoch_acc_5

In [None]:
def evaluate(model, iterator, criterion, device):
    
    epoch_loss = 0
    epoch_acc_1 = 0
    epoch_acc_5 = 0
    
    model.eval()
    
    with torch.no_grad():

        for (x, y) in iterator:

           x = x.to(device)
           y = y.to(device)

           y_pred, _ = model(x)

           loss = criterion(y_pred, y)

           acc_1, acc_5 = calculate_topk_accuracy(y_pred, y)

           epoch_loss += loss.item()
           epoch_acc_1 += acc_1.item()
           epoch_acc_5 += acc_5.item()

          
    epoch_loss /= len(iterator)
    epoch_acc_1 /= len(iterator)
    epoch_acc_5 /= len(iterator)
        
    return epoch_loss, epoch_acc_1, epoch_acc_5

In [None]:
def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

In [None]:
best_valid_loss = float('inf')
result_list = []
lr_list = []

patience = 5

early_stopping = EarlyStopping(patience = patience, verbose = True)

for epoch in range(100):
    
    start_time = time.monotonic()
    
    train_loss, train_acc_1, train_acc_5 = train(model, train_loader, optimizer, criterion, device)
    valid_loss, valid_acc_1, valid_acc_5 = evaluate(model, valid_loader, criterion, device)
      
    early_stopping(valid_loss, model)
    lr_list.append(optimizer.param_groups[0]["lr"]) 

    # patience 동안 val_loss가 감소하지 않으면 조기 종료
    if early_stopping.early_stop:
      print("Early stopping")
      break

    # val_loss 감소하면 best model 불러오기 
    model.load_state_dict(torch.load('./net_pretrained.pth'))

    end_time = time.monotonic()

    epoch_mins, epoch_secs = epoch_time(start_time, end_time)
    
    print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc_1: {train_acc_1*100:6.2f}% | Train Acc_5: {train_acc_5*100:6.2f}%')
    print(f'\tValid Loss: {valid_loss:.3f} | Valid Acc_1: {valid_acc_1*100:6.2f}% | Train Acc_5: {train_acc_5*100:6.2f}%')

    result = {
    'EPOCH': epoch,
    'Train Loss': train_loss,
    'Train acc_1': train_acc_1,
    'Train acc_5': train_acc_5,
    'Valid Loss': valid_loss,
    'Valid acc_1': valid_acc_1,
    'Valid acc_5': valid_acc_5}
  
    result_list.append(result)
  
result_df = pd.DataFrame(result_list)

## 모델 평가

In [None]:
# Loss 및 Acc 변화 그래프
fig, axes = plt.subplots(nrows=1, ncols=3, figsize=(15, 5))

axes[0].plot(result_df['EPOCH'], result_df['Train Loss'], label='Train Loss')
axes[0].plot(result_df['EPOCH'], result_df['Valid Loss'], label='Valid Loss')
axes[0].legend()
axes[0].set_title('Loss')

axes[1].plot(result_df['EPOCH'], result_df['Train acc_1'], label='Train acc_1')
axes[1].plot(result_df['EPOCH'], result_df['Valid acc_1'], label='Valid acc_1')
axes[1].legend()
axes[1].set_title('ACC_1')

axes[2].plot(result_df['EPOCH'], result_df['Train acc_5'], label='Train acc_5')
axes[2].plot(result_df['EPOCH'], result_df['Valid acc_5'], label='Valid acc_5')
axes[2].legend()
axes[2].set_title('ACC_5')

plt.show()

In [None]:
plt.plot(lr_list)
plt.xlabel("Epochs")
plt.ylabel("Learning Rate")

In [None]:
model.load_state_dict(torch.load('./net_pretrained.pth'))

In [None]:
test_loss, test_acc_1, test_acc_5 = evaluate(model, test_loader, criterion, device)

print(f'Test Loss: {test_loss:.3f} | Test Acc @1: {test_acc_1*100:6.2f}% | Test Acc @5: {test_acc_5*100:6.2f}%')