In [1]:
import numpy as np
from tqdm import tqdm
from PIL import Image
import json
import os
import copy
import re

import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader

### 데이터셋 생성(이미지)

In [2]:
class Scalp_Health_Dataset(Dataset) :
    def __init__(self, image_path_list, label_list) : # 용량을 고려해 이미지는 경로만 받는걸로
        self.image_path_list = image_path_list
        self.label_list = label_list
    def __len__(self) : 
        return len(self.image_path_list)

    def __getitem__(self, index) : # 한 개의 데이터 가져오는 함수
        # 224 X 224로 전처리
        
        to_tensor = transforms.ToTensor()
        img = to_tensor(Image.open(self.image_path_list[index]).convert('RGB'))
        img.resize_(3, 224, 224)
        img = torch.divide(img, 255.0) # 텐서로 변경 후 이미지 리사이징하고 각 채널을 0~1 사이의 값으로 만들어버림
        
        label = torch.tensor(self.label_list[index])
        
        return img, label


### 데이터셋 생성(두피 상태, 중증도 분별)

In [3]:
# 이미지 데이터셋의 라벨에 해당하는 val1~val6이 여기서 입력값임
class Scalp_classifier_Dataset(Dataset) :
    def __init__(self, state_list, label_list) : # state_list : val1~val6 label_list : "모낭사이홍반_0.양호" 등의 문자열
        self.state_list = state_list
        self.label_list = label_list
        self.state_str_list = ["미세각질", "피지과다", "모낭사이홍반", "모낭홍반농포", "비듬", "탈모", "양호"]
    def __len__(self) : 
        return len(self.state_list)

    def __getitem__(self, index) : # 한 개의 데이터 가져오는 함수
        
        state = torch.Tensor(self.state_list[index])
        label_str = self.label_list[index]
        
        # "모낭사이홍반_0.양호" 문자열이 들어있는 label_list를 분리
        if self.label_list[index].find("중등도") != -1 :  
            class_str = label_str[:-6] # 모낭사이홍반 등 증상
        else :
            class_str = label_str[:-5]
        
        
        severity_num = float(re.sub(r'[^0-9]', '', label_str))
        severity = torch.Tensor([severity_num]).type(torch.float32)/3.0
        
        # one-hot encoding
        if torch.eq(severity, 0.0) == True :
            class_str_index = self.state_str_list.index("양호")
            label_one_hot = torch.zeros(len(self.state_str_list)).type(torch.LongTensor)
            label_one_hot[class_str_index] = 1
        else : 
            class_str_index = self.state_str_list.index(class_str)
            label_one_hot = torch.zeros(len(self.state_str_list)).type(torch.LongTensor)
            label_one_hot[class_str_index] = 1
        
        label = [label_one_hot, severity]
        
        
        return state, label

### 최종 데이터셋 생성

In [4]:
def make_dataset(dataset_path, category) : # root_path + '/Train'이나 root_path + '/Label'을 받음
    
    
    image_group_folder_path = dataset_path + '/Image'
    label_group_folder_path = dataset_path + '/Label'
    
    ori_label_folder_list = os.listdir(label_group_folder_path) # '[라벨]피지과다_3.중증' 등 폴더명 알기
    
    label_folder_list = []
    
    for i in range(len(ori_label_folder_list)) :
        if ori_label_folder_list[i] != '.DS_Store' : # '.DS_Store'가 생성되었을 수 있으니 폴더 목록에서 제외
            label_folder_list.append(ori_label_folder_list[i])
    
    image_path_list = []
    label_list = []
    label_class_list = []
    
    desc_str = category + "_make_dataset"
    
    for i in tqdm(range(len(label_folder_list)), desc = desc_str) :
                  
        label_folder_path = label_group_folder_path + "/" + label_folder_list[i]
        
        # label_folder_list에서 '라벨'을 '원천'으로 만 바꿔도 image파일들이 들어있는 폴더명으로 만들 수 있다
        image_folder_path = image_group_folder_path + "/" + label_folder_list[i].replace('라벨', '원천')
        
        json_list = os.listdir(label_folder_path) # json파일 목록 담기

        for j in range(len(json_list)) : 
            json_file_path = label_folder_path + '/' + json_list[j]

            with open(json_file_path, "r", encoding="utf8") as f: 
                contents = f.read() # string 타입 
                json_content = json.loads(contents) # 딕셔너리로 저장

            image_file_name = json_content['image_file_name'] # 라벨 데이터에 이미지 파일의 이름이 들어있다
            
            image_file_path = image_folder_path + "/" + image_file_name

            y_true = []
            y_true.append(int(json_content['value_1']))
            y_true.append(int(json_content['value_2']))
            y_true.append(int(json_content['value_3']))
            y_true.append(int(json_content['value_4']))
            y_true.append(int(json_content['value_5']))
            y_true.append(int(json_content['value_6']))

            y_true = np.asarray(y_true)

            image_path_list.append(image_file_path)
            label_list.append(y_true/3.0) # val1~val6의 범위가 0,1,2,3이라 3으로 나눠줌
            label_class_list.append(label_folder_list[i][4:]) # 이미지마다 할당된 클래스를 담음

    return image_path_list, label_list, label_class_list
    
    

In [5]:
def get_dataset(root_path) : 
    
    Train_image_path_list, Train_label_list, Train_label_class_list = make_dataset(root_path + '/Train', "Train")
    Test_image_path_list, Test_label_list, Test_label_class_list = make_dataset(root_path + '/Test', "Test")
    
    Train_Scalp_Health_Dataset = Scalp_Health_Dataset(Train_image_path_list, Train_label_list)
    Test_Scalp_Health_Dataset = Scalp_Health_Dataset(Test_image_path_list, Test_label_list)
    
    
    Train_Scalp_classifier_Dataset = Scalp_classifier_Dataset(Train_label_list, Train_label_class_list)
    Test_Scalp_classifier_Dataset = Scalp_classifier_Dataset(Test_label_list, Test_label_class_list)
    
    return Train_Scalp_Health_Dataset, Test_Scalp_Health_Dataset, Train_Scalp_classifier_Dataset, Test_Scalp_classifier_Dataset
    
    

### 학습시키는 메소드

In [6]:
def train_model(model, criterion, optimizer, epochs, data_loader, device, desc_str) :
    # 학습 -> 성능 측정
    
    pred_loss = -1.0
    checkpoint_model = 0
    
    pbar = tqdm(range(epochs), desc = desc_str, mininterval=0.01)
    

    for epoch in pbar:  # loop over the dataset multiple times
            
        if epoch == int(epochs * 0.5) or int(epochs * 0.75) : # 31, 61번 째 에포크에서 학습률을 10배 줄임
            optimizer.param_groups[0]['lr'] = optimizer.param_groups[0]['lr'] / 10.0
            
        
        for inputs, labels in data_loader:
            
            # get the inputs
            inputs = inputs.to(device)
            
            # label data로 텐서가 왔는지 리스트가 왔는지 확인
            if type(labels) == type([]) :
                labels[0] = labels[0].to(device)
                labels[1] = labels[1].to(device)
            else : 
                labels = labels.to(device)
                

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            loss.backward()
            optimizer.step()

            # print statistics
            pbar_str = "training, [loss = %.4f]" % loss.item()
            pbar.set_description(pbar_str)
            
            # 체크포인트
            # 배치 단위로 학습시킬 때마다 체크포인트 저장 여부 확인
            if pred_loss == -1.0 :
                pred_loss = loss
                checkpoint_model = copy.deepcopy(model)
            elif torch.lt(loss, pred_loss) == True : # loss를 더 줄였으면
                pred_loss = loss
                checkpoint_model = copy.deepcopy(model)
    
    # 체크포인트에 저장했던걸 최종 모델로
    model = checkpoint_model
    
    return model

### 구현

In [7]:
root_path = '/home/ubuntu/CUAI_2021/Advanced_Minkyu_Kim/Scalp_Health_Dataset'
# root_path = '/Users/minkyukim/Downloads/Scalp_Health_Dataset'
Train_Scalp_Health_Dataset, Test_Scalp_Health_Dataset, Train_Scalp_classifier_Dataset, Test_Scalp_classifier_Dataset = get_dataset(root_path)

Train_make_dataset: 100%|██████████| 24/24 [00:01<00:00, 16.12it/s]
Test_make_dataset: 100%|██████████| 24/24 [00:00<00:00, 113.00it/s]


In [8]:
# 학습에 사용
BATCH_SIZE = 64

data_loader_Scalp_Health_Dataset = torch.utils.data.DataLoader(dataset=Train_Scalp_Health_Dataset, # 사용할 데이터셋
                                          batch_size=BATCH_SIZE, # 미니배치 크기
                                          shuffle=True, # 에포크마다 데이터셋 셔플할건가? 
                                          drop_last=True) # 마지막 배치가 BATCH_SIZE보다 작을 수 있다. 나머지가 항상 0일 수는 없지 않는가. 이 때 마지막 배치는 사용하지 않으려면 drop_last = True를, 사용할거면 drop_last = False를 입력한다

data_loader_Scalp_classifier_Dataset = torch.utils.data.DataLoader(dataset=Train_Scalp_classifier_Dataset, # 사용할 데이터셋
                                          batch_size=BATCH_SIZE, # 미니배치 크기
                                          shuffle=True, # 에포크마다 데이터셋 셔플할건가? 
                                          drop_last=True) 

#### 네트워크 생성 : DenseNet_161에 classifier만 수정해서 학습시킨다

In [9]:
# 디바이스 설정
device = 'cuda' if torch.cuda.is_available() else 'cpu'

model_CNN = models.densenet161(pretrained = True, memory_efficient = True).to(device)

# ImageNet으로 학습시킨 CNN은 수정하지 못하게끔
for param in model_CNN.features.parameters():
    param.requires_grad = False

In [10]:
# 모발 분류를 위한 linear model 생성 후 교체
new_classifier = nn.Sequential(
            nn.Linear(in_features=2208, out_features=6, bias=True)
            , nn.Sigmoid() # 0~1사이 값으로 만들어버림
        ).to(device)

for m in new_classifier.modules():
    if isinstance(m, nn.Linear) :
        nn.init.kaiming_uniform_(m.weight)
        
model_CNN.classifier = new_classifier # 교체

In [11]:
# model_CNN(torch.unsqueeze(Train_Scalp_Health_Dataset[0][0], 0).to(device))

### 이미지에서 얻은 val1~val6을 입력값으로 받아 (증상 종류), (중증도)를 출력하는 네트워크 생성

In [12]:
class scalp_state_diagnoser(torch.nn.Module):
    def __init__(self):
        super(scalp_state_diagnoser, self).__init__()
        
        self.linear = nn.Sequential(
            nn.Linear(in_features=6, out_features=16, bias=True),
            nn.LeakyReLU(),
            nn.Linear(in_features=16, out_features=8, bias=True)
        )
        # 모발 상태 판단
        self.classifier = nn.Sequential(
            nn.Linear(in_features=8, out_features=7, bias=True), # [미세각질, 피지과다, 모낭사이홍반, 모낭홍반농포, 비듬, 탈모, 양호]
            nn.Softmax(dim = 0)
        )
        # 중증도 판단
        self.severity = nn.Sequential(
            nn.Linear(in_features=8, out_features=1, bias=True), # 0, 0.33, 0.66, 1.0
            nn.Sigmoid()
        )
        
        for m in self.classifier.modules():
            if isinstance(m, nn.Linear) :
                nn.init.kaiming_uniform_(m.weight)
                
        for m in self.severity.modules():
            if isinstance(m, nn.Linear) :
                nn.init.kaiming_uniform_(m.weight)
                
    # 정전파 
    def forward(self, x):
        out = self.linear(x)
        
        classifier_out = self.classifier(out)
        severity_out = self.severity(out)
        
        return [classifier_out, severity_out]
    
def loss_Diagnoser(y_pred, y_true) :
    # y는 [상태, 중증도]의 리스트로 구성 
    class_pred = y_pred[0]
    severity_pred = y_pred[1]
    
    class_true = y_true[0]
    severity_true = y_true[1]
    
    loss = nn.CrossEntropyLoss()
    
    class_loss = loss(class_pred, class_true)
    severity_loss = loss(severity_pred, severity_true)
    
    total_loss = torch.add(class_loss, severity_loss)
    
    return total_loss
    
    

In [13]:
# CNN 학습에 필요한 것들

optimizer_CNN = torch.optim.SGD(model_CNN.parameters(), lr=0.1, momentum = 0.9, weight_decay = 1e-4)
EPOCHS = 300
loss_CNN = nn.MSELoss() # 5종류의 출력값을 예측하는 선형회귀 모델이라 MSE사용

In [14]:
# Diagnoser 모델과 모델 학습에 필요한 것들

model_Diagnoser = scalp_state_diagnoser().to(device)
optimizer_Diagnoser = torch.optim.SGD(model_Diagnoser.parameters(), lr=0.001, momentum = 0.9, weight_decay = 1e-4)

#### 학습

In [11]:
model_CNN = train_model(model_CNN, loss_CNN, optimizer_CNN, EPOCHS, data_loader_Scalp_Health_Dataset, device, "training CNN")

training, [loss = 0.0782]: 100%|██████████| 300/300 [19:39:14<00:00, 235.85s/it]  


In [17]:
model_Diagnoser = train_model(model_Diagnoser, loss_Diagnoser, optimizer_Diagnoser, EPOCHS, data_loader_Scalp_classifier_Dataset, device, "training Diagnoser")

training Diagnoser:   0%|          | 0/300 [00:00<?, ?it/s]

#### 학습한 모델 저장하기

In [17]:
def save_models(PATH) : # PATH = '/home/ubuntu/CUAI_2021/Advanced_Minkyu_Kim/Scalp_model_parameters/'
    torch.save(model_CNN.state_dict(), PATH + 'model_CNN_parameter.pt')
    torch.save(model_Diagnoser.state_dict(), PATH + 'model_Diagnoser_parameter.pt')

## 테스트

### 모델 불러오기

In [None]:
def get_model_trained(PATH) :
    model_CNN = models.densenet161(pretrained = True, memory_efficient = True).to(device)

    new_classifier = nn.Sequential(
                nn.Linear(in_features=2208, out_features=6, bias=True)
                ).to(device)

    for m in new_classifier.modules():
        if isinstance(m, nn.Linear) :
            nn.init.kaiming_uniform_(m.weight)

    model_CNN.classifier = new_classifier.to(device) # 교체
    
    model_Diagnoser = scalp_state_diagnoser().to(device)
    
    # 학습된 가중치들 불러오기
    PATH_model_CNN = PATH + 'model_CNN_parameter.pt'
    PATH_model_Diagnoser = PATH + 'model_Diagnoser_parameter.pt'
    
    model_CNN.load_state_dict(torch.load(PATH_model_CNN))
    model_Diagnoser.load_state_dict(torch.load(PATH_model_Diagnoser))
    
    return model_CNN, model_Diagnoser

### 값 출력

In [None]:
def get_value(input_data, model_CNN, model_Diagnoser, device) :
    
    state_str_list = ["미세각질", "피지과다", "모낭사이홍반", "모낭홍반농포", "비듬", "탈모", "양호"]
    
    input_data = torch.unsqueeze(input_data, 0).to(device)
    
    output = model_CNN(input_data)
    output = model_Diagnoser(output)
    
    scalp_state_idx = torch.argmax(output[0])
    scalp_state = state_str_list[state_idx] # 두피 상태. state_str_list에 있는 문자열 중 하나가 나옴
    severity = torch.round(output[1] * 3).type(torch.int32) # 중증도. 0,1,2,3중 하나가 나옴
    
    return scalp_state, severity
    