In [1]:
!nvcc --version

nvcc: NVIDIA (R) Cuda compiler driver
Copyright (c) 2005-2020 NVIDIA Corporation
Built on Mon_Oct_12_20:54:10_Pacific_Daylight_Time_2020
Cuda compilation tools, release 11.1, V11.1.105
Build cuda_11.1.relgpu_drvr455TC455_06.29190527_0


In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.datasets as datasets

# Use the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [3]:
device

device(type='cuda')

In [4]:
# Normalize each channel by subtracting the mean and dividing by the standard deviation.
# NOTE(review): these constants look like the ImageNet channel statistics rather than
# values computed on CIFAR-10 — confirm this is intended.

transforms_cifar = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

In [5]:
# Build the CIFAR-10 train/test datasets with the shared preprocessing pipeline.
# download=True fetches the archive into ./data only when it is not already present, so the
# notebook also runs on a fresh checkout instead of raising "Dataset not found".
train_dataset = datasets.CIFAR10(root='./data', train=True, download=True, transform=transforms_cifar)
test_dataset = datasets.CIFAR10(root='./data', train=False, download=True, transform=transforms_cifar)

# Shuffle only the training batches; the test order is kept fixed.
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=128, shuffle=True, num_workers=2)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=128, shuffle=False, num_workers=2)

> ### Defining model classes and utility functions

In [6]:
class DeepNN(nn.Module):
    """Larger convolutional classifier (the "deep" network of this notebook).

    ``features`` stacks four 3x3 convolutions (3 -> 128 -> 64 -> 64 -> 32 channels), each
    followed by ReLU, with a 2x2 max-pool after every second convolution.
    ``classifier`` is a 2048 -> 512 -> ``num_classes`` MLP with dropout 0.1, so the
    flattened feature map must hold 2048 values per sample (e.g. 3x32x32 inputs).
    """

    def __init__(self, num_classes=10):
        super().__init__()

        conv_stack = [
            nn.Conv2d(3, 128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(128, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(64, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(64, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        ]
        self.features = nn.Sequential(*conv_stack)

        head = [
            nn.Linear(2048, 512),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(512, num_classes),
        ]
        self.classifier = nn.Sequential(*head)

    def forward(self, x):
        """Return class logits for a batch of images."""
        feature_maps = self.features(x)
        # Keep the batch dimension and collapse everything else before the MLP head.
        return self.classifier(feature_maps.flatten(start_dim=1))

In [7]:
class LightNN(nn.Module):
    """Small convolutional classifier (the "light" network of this notebook).

    ``features`` applies two 3x3 convolutions (3 -> 16 -> 16 channels), each followed by
    ReLU and a 2x2 max-pool. ``classifier`` is a 1024 -> 256 -> ``num_classes`` MLP with
    dropout 0.1, so the flattened feature map must hold 1024 values per sample
    (e.g. 3x32x32 inputs).
    """

    def __init__(self, num_classes=10):
        super().__init__()

        conv_stack = [
            nn.Conv2d(3, 16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(16, 16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        ]
        self.features = nn.Sequential(*conv_stack)

        head = [
            nn.Linear(1024, 256),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(256, num_classes),
        ]
        self.classifier = nn.Sequential(*head)

    def forward(self, x):
        """Return class logits for a batch of images."""
        feature_maps = self.features(x)
        # Keep the batch dimension and collapse everything else before the MLP head.
        return self.classifier(feature_maps.flatten(start_dim=1))

> ### Classification task를 위한 2가지 함수(train, test) 정의. 
model : 학습 또는 평가할 모델 인스턴스

train_loader : 모델에 학습 데이터를 배치 단위로 공급하는 DataLoader

epochs : 데이터셋 전체를 몇 번 반복해서 학습할 것인지

learning_rate : 한 번의 업데이트에서 가중치를 얼마나 크게 움직일지 결정하는 스텝 크기

device : 연산을 CPU 혹은 GPU 중 어디에서 수행할지 지정

In [8]:
def train(model, train_loader, epochs, learning_rate, device):
    """Train ``model`` in place with cross-entropy loss and the Adam optimizer.

    Args:
        model: ``nn.Module`` whose forward pass returns class logits.
        train_loader: iterable of ``(inputs, labels)`` batches that supports ``len()``.
        epochs: number of passes over ``train_loader``.
        learning_rate: step size handed to Adam.
        device: device the model and every batch are moved to.

    Prints the mean batch loss after every epoch and returns ``None``.
    """
    criterion = nn.CrossEntropyLoss()

    # Move the model before building the optimizer, as test() and train_cosine_loss() do;
    # otherwise a model still on the CPU fails against batches moved to ``device``.
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    model.train()

    for epoch in range(epochs):
        running_loss = 0.0
        for inputs, labels in train_loader:
            # inputs: one batch of images
            # labels: one integer class index per image in the batch
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)

            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        print(f"Epoch {epoch+1}/{epochs}, loss: {running_loss / len(train_loader)}")

def test(model, test_loader, device):
    model.to(device)
    model.eval() # 평가모드로 전환
    
    correct = 0
    total = 0
    
    with torch.no_grad():
        # no_grad로 기울기 업데이트 안함
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            
            outputs = model(inputs)
            # torch.max로 가장 높은 값을 가진 인덱스 반환
            _, predicted = torch.max(outputs.data, 1)
            
            # 배치 내 샘플 개수를 반환. 즉 batch_size=32라면 32를 반환함
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    
    accuracy = 100 * correct / total
    print(f"Test Accuracy: {accuracy:.2f}%")
    return accuracy

In [9]:
# Train and evaluate the deep network; its accuracy is kept for the later comparisons.
torch.manual_seed(42) # fix the seed so the experiment is reproducible
nn_deep = DeepNN(num_classes=10).to(device)
train(nn_deep, train_loader, epochs=10, learning_rate=0.001, device=device)
test_accuracy_deep = test(nn_deep, test_loader, device)

# Re-seed before building the light network so its initial weights are reproducible.
torch.manual_seed(42)
nn_light = LightNN(num_classes=10).to(device)

Epoch 1/10, loss: 1.3376230263649045
Epoch 2/10, loss: 0.8801519657339891
Epoch 3/10, loss: 0.6984434832087563
Epoch 4/10, loss: 0.5518185598466098
Epoch 5/10, loss: 0.4360514838067467
Epoch 6/10, loss: 0.32272086584049725
Epoch 7/10, loss: 0.2277922832866764
Epoch 8/10, loss: 0.1762107626994705
Epoch 9/10, loss: 0.1397753664485329
Epoch 10/10, loss: 0.12448156816537118
Test Accuracy: 75.25%


> ### 똑같은 모델을 하나 더 만들어서 같은 초기값인지 확인

In [10]:
# Same seed as the one used right before nn_light was created, so this second LightNN
# instance is expected to start from the same initial weights.
torch.manual_seed(42)
new_nn_light = LightNN(num_classes=10).to(device)

In [11]:
# Compare the weight norm of the first conv layer of both LightNN instances.
print("Norm of 1st layer of nn_light:", torch.norm(nn_light.features[0].weight).item())
print("Norm of 1st layer of new_nn_light:", torch.norm(new_nn_light.features[0].weight).item())

Norm of 1st layer of nn_light: 2.327361822128296
Norm of 1st layer of new_nn_light: 2.327361822128296


In [12]:
# Count every parameter element of each network and print it with thousands separators.
total_params_deep = "{:,}".format(sum(p.numel() for p in nn_deep.parameters()))
print(f"DeepNN parameters: {total_params_deep}")
total_params_light = "{:,}".format(sum(p.numel() for p in nn_light.parameters()))
print(f"LightNN parameters: {total_params_light}")

DeepNN parameters: 1,186,986
LightNN parameters: 267,738


In [13]:
# Train and evaluate nn_light with plain cross-entropy loss (no teacher involved).
train(nn_light, train_loader, epochs=10, learning_rate=0.001, device=device)
test_accuracy_light_nn = test(nn_light, test_loader, device)

Epoch 1/10, loss: 1.4702755151807194
Epoch 2/10, loss: 1.1610109941733768
Epoch 3/10, loss: 1.025268422825562
Epoch 4/10, loss: 0.9218627793709641
Epoch 5/10, loss: 0.8466211883613216
Epoch 6/10, loss: 0.7808126970325284
Epoch 7/10, loss: 0.7174562125864541
Epoch 8/10, loss: 0.6567044667423229
Epoch 9/10, loss: 0.6046204406129735
Epoch 10/10, loss: 0.5547780798524237
Test Accuracy: 70.76%


위와 같은 결과를 토대로 더 깊은 모델은 teacher가 될 수 있고, 가벼운 모델은 student 모델이 될 수 있음. 



In [14]:
# Baseline accuracies: the deep network acts as teacher, the light one as student.
print(f"Teacher accuracy: {test_accuracy_deep:.2f}%")
print(f"Student accuracy: {test_accuracy_light_nn:.2f}%")

Teacher accuracy: 75.25%
Student accuracy: 70.76%


> ## Knowledge Distillation RUN
> ### Teacher를 통해 Student의 accuracy를 개선
지식 증류는 기존 cross entropy loss에 추가적인 loss(teacher 모델의 softmax 출력 기반)를 더해 함께 학습하는 방식임.

정답 label뿐 아니라 나머지 클래스에 부여된 확률까지 함께 학습할 수 있도록 teacher의 soft label을 활용함. 

반면, 정답 label만 사용하는 cross entropy는 이러한 정보를 활용하지 못함. 

In [15]:
def train_knowledge_distillation(teacher, student, train_loader, epochs, learning_rate,
                                 T, soft_target_loss_weight, ce_loss_weight, device):
    """Train ``student`` on ground-truth labels plus the teacher's softened predictions.

    Args:
        teacher: trained ``nn.Module`` returning class logits; only used for inference.
        student: ``nn.Module`` returning class logits; updated in place.
        train_loader: iterable of ``(inputs, labels)`` batches that supports ``len()``.
        epochs: number of passes over ``train_loader``.
        learning_rate: step size handed to Adam.
        T: temperature that both logit sets are divided by before (log-)softmax.
        soft_target_loss_weight: weight of the distillation (KL) term.
        ce_loss_weight: weight of the ground-truth cross-entropy term.
        device: device both models and every batch are moved to.

    Prints the mean batch loss after every epoch and returns ``None``.
    """
    ce_loss = nn.CrossEntropyLoss()  # loss against the ground-truth labels

    # Move both models first (as train_cosine_loss() does) so neither can end up on a
    # different device than the batches.
    teacher.to(device)
    student.to(device)
    optimizer = optim.Adam(student.parameters(), lr=learning_rate)

    teacher.eval()   # the teacher is no longer trained
    student.train()  # the student learns from the labels and from the teacher

    for epoch in range(epochs):
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()

            # The teacher only provides targets, so no graph is built for it.
            with torch.no_grad():
                teacher_logits = teacher(inputs)
                # Soften the teacher distribution with the temperature.
                soft_targets = nn.functional.softmax(teacher_logits / T, dim=-1)

            student_logits = student(inputs)
            # kl_div expects log-probabilities as its first argument.
            soft_prob = nn.functional.log_softmax(student_logits / T, dim=-1)

            # Distillation term, scaled by T^2 as in "Distilling the Knowledge in a Neural
            # Network". The soft-target cross-entropy form that used to be computed next to
            # it was never used in the loss (it differs from the KL form only by the
            # teacher's entropy, which is constant with respect to the student).
            soft_targets_loss_kl = nn.functional.kl_div(soft_prob, soft_targets, reduction='batchmean') * (T**2)

            # Ground-truth term.
            label_loss = ce_loss(student_logits, labels)

            # Weighted sum of both terms.
            loss = soft_target_loss_weight * soft_targets_loss_kl + ce_loss_weight * label_loss
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        print(f"Epoch {epoch+1}/{epochs}, Loss: {running_loss / len(train_loader)}")

> ### KD 학습을 위한 하이퍼파라미터
> ### T : 2, Distillation Loss 가중치(soft_target_loss_weight)는 0.25, Ground Truth와 Student 간의 Cross Entropy 가중치(ce_loss_weight)는 0.75로 설정

In [16]:
# Distill nn_deep (teacher) into new_nn_light (student), then evaluate the student.
train_knowledge_distillation(teacher=nn_deep, student=new_nn_light, train_loader=train_loader,
                             epochs=10, learning_rate=0.001, T=2, soft_target_loss_weight=0.25, ce_loss_weight=0.75, device=device) # the soft-target and CE weights are hyperparameters; find good values by search
test_accuracy_light_ce_kd = test(new_nn_light, test_loader, device)

Epoch 1/10, Loss: 2.3522917474322305
Epoch 2/10, Loss: 1.8351799155135289
Epoch 3/10, Loss: 1.6122780648033943
Epoch 4/10, Loss: 1.452798021723852
Epoch 5/10, Loss: 1.3281061784995487
Epoch 6/10, Loss: 1.2145687071868525
Epoch 7/10, Loss: 1.1228389831455163
Epoch 8/10, Loss: 1.0379482465022056
Epoch 9/10, Loss: 0.9645864333947907
Epoch 10/10, Loss: 0.8997774232379006
Test Accuracy: 70.77%


In [17]:
# Side-by-side comparison: teacher, student trained alone, student trained with CE + KD.
print(f"Teacher accuracy: {test_accuracy_deep:.2f}%")
print(f"Student accuracy without teacher : {test_accuracy_light_nn:.2f}%")
print(f"Student accuracy with CE + KD: {test_accuracy_light_ce_kd:.2f}%")

Teacher accuracy: 75.25%
Student accuracy without teacher : 70.76%
Student accuracy with CE + KD: 70.77%


> ### Teacher의 내부 표현을 흉내내도록 하는 지식 증류에는 문제점이 있는데, student와 teacher의 학습 수용량(capacity)이 서로 다르다는 것임.
> ### 내적 표현 흉내내기를 위해 CosineEmbedding Loss를 적용해서 영향력을 알아보려고함.

> ### output layer를 증류시키기 위해서는 같은 수의 뉴런이 있어야함.
> ### 하지만, Teacher 모델의 뉴런 수는 student에 비해 많기 때문에(flatten 했을 때) 매칭을 시켜주는 작업이 필요함. -> loss function에는 같은 차원의 벡터가 입력으로 들어가야 한다는 조건이 있음
> ### Student의 차원에 맞춰주기 위해, Average Pooling을 적용하여 Teacher의 차원을 줄일 것임

In [18]:
class ModifiedDeepNNCosine(nn.Module):
    """DeepNN variant whose forward pass also returns a pooled hidden representation.

    The layers (and therefore the state-dict keys) are identical to ``DeepNN``, so weights
    trained with ``DeepNN`` can be loaded via ``load_state_dict``.

    ``forward`` returns ``(logits, hidden)`` where ``logits`` has shape
    ``(batch, num_classes)`` and ``hidden`` is the flattened 2048-value convolutional
    output average-pooled in pairs down to ``(batch, 1024)``, matching the size of the
    student's flattened feature map.
    """

    def __init__(self, num_classes=10):
        super(ModifiedDeepNNCosine, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(128, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(64, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(64, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.classifier = nn.Sequential(
            nn.Linear(2048, 512),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(512, num_classes)
        )

    def forward(self, x):
        x = self.features(x)
        flattened_conv_output = torch.flatten(x, 1)
        # Classify the plain 2-D tensor so the logits stay (batch, num_classes); feeding the
        # unsqueezed tensor produced (batch, 1, num_classes), which breaks argmax over
        # dim 1 and CrossEntropyLoss.
        x = self.classifier(flattened_conv_output)
        # avg_pool1d pools along the last dimension; add a channel dimension only for the
        # pooling call and drop it again so the result is (batch, 1024).
        flattened_conv_output_after_pooling = torch.nn.functional.avg_pool1d(
            flattened_conv_output.unsqueeze(1), 2
        ).squeeze(1)

        return x, flattened_conv_output_after_pooling
    

class ModifiedLightNNCosine(nn.Module):
    """LightNN variant whose forward pass also returns its hidden representation.

    The layers are the same as in ``LightNN``. ``forward`` returns ``(logits, hidden)``
    where ``hidden`` is the flattened output of ``features`` (1024 values per sample, as
    required by the first linear layer).
    """

    def __init__(self, num_classes=10):
        super().__init__()

        conv_stack = [
            nn.Conv2d(3, 16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(16, 16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        ]
        self.features = nn.Sequential(*conv_stack)

        head = [
            nn.Linear(1024, 256),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(256, num_classes),
        ]
        self.classifier = nn.Sequential(*head)

    def forward(self, x):
        """Return ``(logits, flattened_features)`` for a batch of images."""
        hidden = self.features(x).flatten(start_dim=1)
        logits = self.classifier(hidden)
        return logits, hidden

In [19]:
# Build the cosine-loss teacher and copy the trained DeepNN weights into it.
modified_deepNN = ModifiedDeepNNCosine(num_classes=10).to(device)
modified_deepNN.load_state_dict(nn_deep.state_dict())


# Check that the first layer's weights are the same in both models.

print("Norm of 1st layer deep_nn", torch.norm(nn_deep.features[0].weight).item())
print("Norm of 1st layer modified_deep_nn", torch.norm(modified_deepNN.features[0].weight).item())



# Seed before creating the student so its initial weights are reproducible.
torch.manual_seed(42)
modified_lightNN = ModifiedLightNNCosine(num_classes=10).to(device)


Norm of 1st layer deep_nn 7.515564441680908
Norm of 1st layer modified_deep_nn 7.515564441680908


> ### 기존과 달리 hidden 표현을 return 해줘야하기 때문에 train loop이 바뀌어야함. 

In [20]:
# Random batch of shape (128, 3, 32, 32), used only to inspect the output shapes.
sample_input = torch.randn(128, 3, 32, 32).to(device)

logits, hidden_representation = modified_lightNN(sample_input)

print("lightNN")
print(logits.shape)
print(hidden_representation.shape)

logits, hidden_representation = modified_deepNN(sample_input)
# Force both teacher outputs to 2-D (batch, features) so they can be compared with the
# student's outputs printed above.
logits = logits.reshape(128, 10)
hidden_representation = hidden_representation.reshape(128, 1024)


print("deepNN")
print(logits.shape)
print(hidden_representation.shape)

lightNN
torch.Size([128, 10])
torch.Size([128, 1024])
deepNN
torch.Size([128, 10])
torch.Size([128, 1024])


In [21]:
# Cosine-loss minimization: the cosine similarity between the teacher's and the student's
# hidden representations has to be maximized.

def train_cosine_loss(teacher, student, train_loader, epochs, learning_rate, hidden_rep_loss_weight, ce_loss_weight, device):
    """Train ``student`` on labels while aligning its hidden representation with the teacher's.

    Args:
        teacher: ``nn.Module`` whose forward pass returns ``(logits, hidden)``; inference only.
        student: ``nn.Module`` whose forward pass returns ``(logits, hidden)``; updated in place.
        train_loader: iterable of ``(inputs, labels)`` batches that supports ``len()``.
        epochs: number of passes over ``train_loader``.
        learning_rate: step size handed to Adam.
        hidden_rep_loss_weight: weight of the cosine-embedding term.
        ce_loss_weight: weight of the ground-truth cross-entropy term.
        device: device both models and every batch are moved to.

    After flattening, the teacher's hidden representation must have the same number of
    values per sample as the student's. Prints the mean batch loss after every epoch and
    returns ``None``.
    """
    ce_loss = nn.CrossEntropyLoss()
    cosine_loss = nn.CosineEmbeddingLoss()

    # Move both models to the target device; the teacher is only evaluated, the student trained.
    teacher.to(device)
    student.to(device)
    optimizer = optim.Adam(student.parameters(), lr=learning_rate)

    teacher.eval()
    student.train()

    for epoch in range(epochs):
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()

            with torch.no_grad():
                _, teacher_hidden_representation = teacher(inputs)

            student_logits, student_hidden_representation = student(inputs)

            # Collapse everything but the batch dimension instead of hard-coding 1024, so
            # the loop works for any hidden size shared by teacher and student.
            teacher_hidden_representation = teacher_hidden_representation.flatten(1)

            # Open question from the author: isn't a loss computed on an intermediate layer
            # "feature-based" knowledge?
            # A target of +1 for every sample asks the loss to pull each pair together.
            # It is created directly on ``device`` to avoid a host-to-device copy per batch.
            target = torch.ones(inputs.size(0), device=device)
            hidden_rep_loss = cosine_loss(student_hidden_representation, teacher_hidden_representation, target=target)

            label_loss = ce_loss(student_logits, labels)

            loss = hidden_rep_loss_weight * hidden_rep_loss + ce_loss_weight * label_loss

            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        print(f"Epoch {epoch+1}/{epochs}, Loss:{running_loss / len(train_loader)}")

In [22]:
def test_multiple_outputs(model, test_loader, device):
    model.to(device)
    model.eval()
    
    correct = 0
    total = 0
    
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            
            outputs, _ = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    
    accuracy = 100 * correct / total
    print(f"Test Accuracy: {accuracy:.2f}%")
    return accuracy

In [23]:
# Train the student with CE + cosine loss against the teacher's hidden representation,
# then evaluate it (its forward pass returns two values, hence test_multiple_outputs).
train_cosine_loss(teacher=modified_deepNN, student=modified_lightNN, train_loader=train_loader, epochs=10, learning_rate=0.001, hidden_rep_loss_weight=0.25, ce_loss_weight=0.75, device=device)
test_accuracy_lightNN = test_multiple_outputs(modified_lightNN, test_loader, device)

Epoch 1/10, Loss:1.306582652394424
Epoch 2/10, Loss:1.0682969283874688
Epoch 3/10, Loss:0.9670999458683726
Epoch 4/10, Loss:0.8919807530729972
Epoch 5/10, Loss:0.8381563493662783
Epoch 6/10, Loss:0.7938108398481403
Epoch 7/10, Loss:0.7531564029891168
Epoch 8/10, Loss:0.7165703799413599
Epoch 9/10, Loss:0.6806020036987637
Epoch 10/10, Loss:0.6526263821155519
Test Accuracy: 70.40%


> ### Intermediate regressor run
> ### 우리가 다루려는 벡터는 크기가 1024로 커서 의미 있는 유사성 정보를 추출하기 힘듦. 또한, 굳이 1 대 1 매칭을 목표로 해야 할 이유도 없음
> ### Regressor로 문제점을 개선해보려고 함.
> ### 1차적으로 Teacher 모델의 Conv 층의 Feature map을 추출하고, Student도 똑같이 feature map 추출 수행
> ### 2차적으로 Teacher 및 Student의 feature map을 맞추는 과정이 필요함. 이를 위해 Regressor를 활용(학습 가능)
> ### Regressor는 Teacher와 Student와의 차원성을 맵핑해주는 역할을 함.
> ### 이를 활용한 Loss Function은 Teaching "Path"를 제공해주며 Student의 가중치 업데이트에 기여함

In [24]:
# Compare the output shapes of the student's and the teacher's convolutional feature extractors.

conv_feature_student = nn_light.features(sample_input)
conv_feature_teacher = nn_deep.features(sample_input)

print("student feature :", conv_feature_student.shape)
print("teacher feature :", conv_feature_teacher.shape)


student feature : torch.Size([128, 16, 8, 8])
teacher feature : torch.Size([128, 32, 8, 8])


In [25]:
class ModifiedDeepNNRegressor(nn.Module):
    """DeepNN variant whose forward pass also returns the convolutional feature map.

    The layers are the same as in ``DeepNN``. ``forward`` returns
    ``(logits, conv_feature_map)`` where ``conv_feature_map`` is the un-flattened output
    of ``features`` (32 channels).
    """

    def __init__(self, num_classes=10):
        super().__init__()

        conv_stack = [
            nn.Conv2d(3, 128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(128, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(64, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(64, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        ]
        self.features = nn.Sequential(*conv_stack)

        head = [
            nn.Linear(2048, 512),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(512, num_classes),
        ]
        self.classifier = nn.Sequential(*head)

    def forward(self, x):
        """Return ``(logits, conv_feature_map)`` for a batch of images."""
        conv_feature_map = self.features(x)
        logits = self.classifier(conv_feature_map.flatten(start_dim=1))
        return logits, conv_feature_map
    

class ModifiedLightNNRegressor(nn.Module):
    """LightNN variant with an extra convolutional regressor on top of its feature map.

    ``features`` and ``classifier`` are the same as in ``LightNN``. ``regressor`` is a
    single 3x3 convolution that maps the 16-channel feature map to 32 channels.
    ``forward`` returns ``(logits, regressor_output)``; the logits are computed from the
    16-channel feature map, not from the regressor output.
    """

    def __init__(self, num_classes=10):
        super().__init__()

        conv_stack = [
            nn.Conv2d(3, 16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(16, 16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        ]
        self.features = nn.Sequential(*conv_stack)

        self.regressor = nn.Sequential(nn.Conv2d(16, 32, kernel_size=3, padding=1))

        head = [
            nn.Linear(1024, 256),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(256, num_classes),
        ]
        self.classifier = nn.Sequential(*head)

    def forward(self, x):
        """Return ``(logits, regressor_output)`` for a batch of images."""
        feature_map = self.features(x)
        regressor_output = self.regressor(feature_map)
        logits = self.classifier(feature_map.flatten(start_dim=1))
        return logits, regressor_output

> ### Regressor를 포함함에 따라 train loop의 변동이 필요함.
> ### Student의 regressor output과 teacher의 feature map을 각각 추출한 뒤, 둘 사이의 차이를 MSE로 계산함(두 텐서의 크기가 같아야 함)

In [None]:
def train_mse