In [1]:
import random
import pandas as pd
import numpy as np
from glob import glob
import os
import cv2
import json
import copy
from sklearn.model_selection import train_test_split

import time
import datetime
import torch
from tqdm import tqdm
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
from sklearn.metrics import confusion_matrix, plot_confusion_matrix
from sklearn.metrics import multilabel_confusion_matrix

In [2]:
# origin_path는 원천데이터(이미지데이터)가 저장되어있는 경로로 지정해야합니다.
# label_path는 라벨링데이터가 저장되어있는 경로로 지정해야합니다.
origin_path='./4.Sample/01.원천데이터'
label_path='./4.Sample/02.라벨링데이터'

#시드 고정
random_seed = 42
random.seed(random_seed)

# 데이터 경로 포함 프레임 구축하기

In [3]:
img_list=[] #이미지 파일명 까지 경로
img_id=[] #이미지 파일명
label_list=[] #라벨링 파일명 까지 경로

#이미지 파일명 경로 및 이미지 파일명 Get
for i in os.listdir(origin_path):
    for j in os.listdir(origin_path+'/'+i):
        for k in os.listdir(origin_path+'/'+i+'/'+j):
            img_id.extend(os.listdir(origin_path+'/'+i+'/'+j+'/'+k))
            img_list.extend(glob(origin_path+'/'+i+'/'+j+'/'+k+'/*'))   
img_list=[path.replace('\\','/') for path in img_list]

#라벨링 파일명 및 경로 Get
for i in os.listdir(label_path):
    for j in os.listdir(label_path+'/'+i):
        for k in os.listdir(label_path+'/'+i+'/'+j):
            label_list.extend(glob(label_path+'/'+i+'/'+j+'/'+k+'/*'))
label_list=[path.replace('\\','/') for path in label_list]

In [4]:
#데이터 프레임 구축
data={'이미지명':img_id,'이미지경로':img_list,'라벨링경로':label_list}
data=pd.DataFrame(data)

In [5]:
#Class List
class_list = ['EG_NA', 'LA_NA', 'PU_NA', 'AB_LI', 'AB_CA', 'AB_BI', 'AB_AP','QB_LI', 'QB_CA', 'QB_BI', 'QB_AP','AA_NA']

#라벨링값 채우기
label=[]
for i in data.index:
    img_name=data['이미지명'][i]
    matching=[word for word in class_list if word in img_name]
    label.append(class_list.index(matching[0]))
    
data['라벨값']=label

# 데이터 분할 및 분석 데이터 셋 구축

In [6]:
# train_test_split 의 stratify 이용해서 분할
train, val = train_test_split(data, test_size = 0.1, random_state = 42, stratify = data['라벨값'])
train, test= train_test_split(train,test_size = 0.1, random_state = 42, stratify = train['라벨값'])

In [7]:
trans_resize = transforms.Resize((224, 224))
trans_tensor = transforms.ToTensor()
def dataset(data):
    data1=[]
    for i in tqdm(data.index):
        with open(data['라벨링경로'][i], 'r', encoding='UTF-8') as f:
            jsonfile=json.load(f)
        if jsonfile['INFO']['DATASET_DETAIL']=='Bounding Box':
            for j in jsonfile['ANNOTATION_INFO']:
                x_data = trans_tensor(trans_resize(Image.open(data['이미지경로'][i]).crop((j['XTL'], j['YTL'], j['XBR'], j['YBR']))))
                y_data= data['라벨값'][i]
                data_list=[[x_data,y_data,jsonfile['IMAGE']['IMAGE_FILE_NAME']+'_'+str((jsonfile['ANNOTATION_INFO'].index(j))+1)]]
                data1.extend(data_list)
            
        elif jsonfile['INFO']['DATASET_DETAIL']=='Polygon':
            for m in jsonfile['ANNOTATION_INFO']:
                XTL = min(m['POLYGON'][0::2])
                XBR = max(m['POLYGON'][0::2])
                YTL = min(m['POLYGON'][0::2])
                YBR = max(m['POLYGON'][0::2])
                x_data = trans_tensor(trans_resize(Image.open(data['이미지경로'][i]).crop((XTL, YTL, XBR, YBR))))
                y_data = data['라벨값'][i]
                data_list=[[x_data,y_data,jsonfile['IMAGE']['IMAGE_FILE_NAME']+'_'+str((jsonfile['ANNOTATION_INFO'].index(m))+1)]]
                data1.extend(data_list)
    print(f'{len(data1)}개 업로드 완료')
    return data1

In [8]:
traindataset = dataset(data=train)
valdataset = dataset(data=val)
testdataset = dataset(data=test)

100%|██████████████████████████████████████████████████████████████████████████████| 1377/1377 [00:37<00:00, 36.43it/s]


4280개 업로드 완료


100%|████████████████████████████████████████████████████████████████████████████████| 170/170 [00:03<00:00, 43.05it/s]


471개 업로드 완료


100%|████████████████████████████████████████████████████████████████████████████████| 153/153 [00:04<00:00, 35.79it/s]

466개 업로드 완료





# Train Start

In [None]:
class SEBlock(nn.Module):
    def __init__(self, in_channel, r=1):
        super().__init__()
        
        self.squeeze = nn.AdaptiveAvgPool2d((1,1))
        
        self.excitation = nn.Sequential(
            nn.Linear(in_channel, (in_channel//r)),
            nn.ReLU(),
            nn.Linear((in_channel//r), in_channel),
            nn.Sigmoid()
            )
        
    def forward(self, x):
        block_input = x
        x = self.squeeze(x)
        x = x.view(x.size(0), -1)
        x = self.excitation(x)
        x = x.view(x.size(0), x.size(1), 1, 1)
        
        return x * block_input

In [9]:
class BasicBlock(nn.Module):
    expansion = 1
    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()

        # BatchNorm에 bias가 포함되어 있으므로, conv2d는 bias=False로 설정합니다.
        self.residual_function = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
            nn.Conv2d(out_channels, out_channels * BasicBlock.expansion, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(out_channels * BasicBlock.expansion),
            SEBlock(in_channel=out_channels * BasicBlock.expansion, r=16)
        )

        # identity mapping, input과 output의 feature map size, filter 수가 동일한 경우 사용.
        self.shortcut = nn.Sequential()

        self.relu = nn.ReLU()

        # projection mapping using 1x1conv
        if stride != 1 or in_channels != BasicBlock.expansion * out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels * BasicBlock.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels * BasicBlock.expansion)
            )

    def forward(self, x):
        x = self.residual_function(x) + self.shortcut(x)
        x = self.relu(x)
        return x

In [10]:
class BottleNeck(nn.Module):
    expansion = 4
    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()

        self.residual_function = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
            nn.Conv2d(out_channels, out_channels * BottleNeck.expansion, kernel_size=1, stride=1, bias=False),
            nn.BatchNorm2d(out_channels * BottleNeck.expansion),
            SEBlock(in_channel=out_channels * BottleNeck.expansion, r=16)
        )

        self.shortcut = nn.Sequential()

        self.relu = nn.ReLU()

        if stride != 1 or in_channels != out_channels * BottleNeck.expansion:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels*BottleNeck.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels*BottleNeck.expansion)
            )
            
    def forward(self, x):
        x = self.residual_function(x) + self.shortcut(x)
        x = self.relu(x)
        return x

In [11]:
class ResNet(nn.Module):
    def __init__(self, block, num_block, num_classes=14, init_weights=True):
        super().__init__()

        self.in_channels=64

        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        )

        self.conv2_x = self._make_layer(block, 64, num_block[0], 1)
        self.conv3_x = self._make_layer(block, 128, num_block[1], 2)
        self.conv4_x = self._make_layer(block, 256, num_block[2], 2)
        self.conv5_x = self._make_layer(block, 512, num_block[3], 2)

        self.avg_pool = nn.AdaptiveAvgPool2d((1,1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

        # weights inittialization
        if init_weights:
            self._initialize_weights()

    def _make_layer(self, block, out_channels, num_blocks, stride):
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_channels, out_channels, stride))
            self.in_channels = out_channels * block.expansion

        return nn.Sequential(*layers)

    def forward(self,x):
        output = self.conv1(x)
        output = self.conv2_x(output)
        x = self.conv3_x(output)
        x = self.conv4_x(x)
        x = self.conv5_x(x)
        x = self.avg_pool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

    # define weight initialization function
    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.constant_(m.bias, 0)

In [12]:
class ResNetModel(nn.Module):
    '''
    train_dataset (dataset)         : 모형 학습에서 train dataset 로 사용할 데이터 셋
    val_dataset (dataset)           : 모형 학습에서 validation dataset 로 사용할 데이터 셋
    epoch (integer)              : 학습을 반복할 횟수
    learning_rate (float)        : 모형 학습에서 사용할 learning_rate
    batch_size (integer)         : 한 번 학습에 사용할 이미지의 개수
    loader_num_workers (integer) : DataLoader에서 사용할 코어의 개수
        
    GoogleNet 모형을 학습시키는 일련의 과정을 구현
    '''
    def __init__(self, train_dataset, val_dataset, num_classes, epoch, learning_rate, batch_size, loader_num_workers, init_weights, num_block):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.train_dataset = train_dataset
        self.val_dataset = val_dataset   
        self.num_classes = num_classes
        self.epoch = epoch
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.loader_num_workers = loader_num_workers
        self.num_block = num_block
        self.patience_i = int(min(np.ceil(epoch*0.1), 10))
        
        
    def ResNetTrain(self):
        # resnet18
        # ResNet(BasicBlock, [2,2,2,2])
        # resnet34
        # ResNet(BasicBlock, [3, 4, 6, 3])
        # resnet50
        # ResNet(BottleNeck, [3,4,6,3])
        # resnet101
        # ResNet(BottleNeck, [3, 4, 23, 3])
        # resnet152
        # ResNet(BottleNeck, [3, 8, 36, 3])
        model = ResNet(BottleNeck, num_block=self.num_block, num_classes=self.num_classes, init_weights=True).to(self.device)
        
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=self.learning_rate, weight_decay=1e-4)
        lr_scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=self.patience_i, verbose=True)
        
        train_loader = DataLoader(self.train_dataset, batch_size=self.batch_size, shuffle=True, num_workers=self.loader_num_workers)
        val_loader = DataLoader(self.val_dataset, batch_size=self.batch_size, shuffle=False, num_workers=self.loader_num_workers)
        dataloaders = {"train": train_loader, "val": val_loader}
                
        since = time.time()
        train_acc_history = []
        train_loss_history = []
        val_acc_history = []
        val_loss_history = []
        best_model_wts = copy.deepcopy(model.state_dict())
        best_acc = 0.0
        use_auxiliary=False

        for i in range(self.epoch):
            epoch_time = time.time()
            print('Epoch {}/{}'.format(i+1, self.epoch))
            print('-' * 10)

            for phase in ['train', 'val']: # Each epoch has a training and validation phase
                if phase == 'train':
                    model.train()  # Set model to training mode
                else:
                    model.eval()   # Set model to evaluate mode

                running_loss = 0.0
                running_corrects = 0

                for inputs, labels in dataloaders[phase]: # Iterate over dataset

                    inputs = inputs.to(self.device)

                    labels = labels.to(self.device)

                    optimizer.zero_grad() # Zero the parameter gradients

                    with torch.set_grad_enabled(phase == 'train'): # Forward. Track history if only in train

                        if phase == 'train': # Backward + optimize only if in training phase
                            if use_auxiliary:
                                outputs, aux1, aux2 = model(inputs)
                                loss = criterion(outputs, labels) + 0.3 * criterion(aux1, labels) + 0.3 * criterion(aux2, labels)
                            else:
                                outputs = model(inputs)
                                loss = criterion(outputs, labels)

                            _, preds = torch.max(outputs, 1)
                            loss.backward()
                            optimizer.step()

                        if phase == 'val':
                            outputs = model(inputs)
                            loss = criterion(outputs, labels)
                            _, preds = torch.max(outputs, 1)

                    # Statistics
                    running_loss += loss.item() * inputs.size(0)
                    running_corrects += torch.sum(preds == labels.data)

                epoch_loss = running_loss / len(dataloaders[phase].dataset)

                if phase == 'val': # Adjust learning rate based on val loss
                    lr_scheduler.step(epoch_loss)

                epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)
                
                print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))
                e_time = time.time() - epoch_time

                # deep copy the model
                if phase == 'val' and epoch_acc > best_acc:
                    best_epoch = i+1
                    best_acc = epoch_acc
                    best_model_wts = copy.deepcopy(model.state_dict())
                if phase == 'val':
                    val_acc_history.append(float(epoch_acc))
                    val_loss_history.append(float(epoch_loss))
                if phase == 'train':
                    train_acc_history.append(float(epoch_acc))
                    train_loss_history.append(float(epoch_loss))

            print('Epoch Time : {:.0f}m {:.0f}s'.format(e_time//60, e_time%60))
            print()

        time_elapsed = time.time() - since
        print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
        print('Best Model is {0}'.format(best_epoch))
        print('Best val Acc: {:4f}'.format(best_acc))

        # load best model weights
        model.load_state_dict(best_model_wts)
        
        return model, self.epoch, train_loss_history, train_acc_history, val_loss_history, val_acc_history

In [13]:
model, epoch, train_loss_history, train_acc_history, val_loss_history, val_acc_history = ResNetModel(train_dataset = [i[0:2] for i in traindataset],
                                                                                                     val_dataset = [i[0:2] for i in valdataset],
                                                                                                     num_classes = 12, epoch = 20, 
                                                                                                     learning_rate = 0.001, batch_size = 64,
                                                                                                     loader_num_workers = 2,
                                                                                                     init_weights = True,
                                                                                                     num_block = [3, 8, 36, 3]).ResNetTrain()
if (os.path.exists('./models')==False):
    os.makedirs('./models')
torch.save(model, f'./models/ResNet_SE_Lifecycle_AA_Final.pt')
torch.save(model.state_dict(), f'./models/ResNet_SE_stat_Lifecycle_AA_Final.pt')

Epoch 1/20
----------


OutOfMemoryError: CUDA out of memory. Tried to allocate 98.00 MiB (GPU 0; 6.00 GiB total capacity; 3.83 GiB already allocated; 0 bytes free; 3.89 GiB reserved in total by PyTorch) If reserved memory is >> allocated memory try setting max_split_size_mb to avoid fragmentation.  See documentation for Memory Management and PYTORCH_CUDA_ALLOC_CONF

In [16]:
try:
    if torch.cuda.is_available():
        device = torch.device('cuda')
    else:
        device = torch.device('cpu')
except:
    device = torch.device('cpu')
    
Image.MAX_IMAGE_PIXELS = None
try:
    torch.backends.cudnn.deterministic = True    
    torch.backends.cudnn.benchmark = False 
except:
    pass
np.random.seed(random_seed)
try:
    torch.cuda.manual_seed(random_seed)
    torch.cuda.manual_seed_all(random_seed)
except:
    pass

model = torch.load('./models/GoogleNet_Lifecycle_AA_Final.pt', map_location=device)
model.load_state_dict(torch.load('./models/GoogleNet_stat_Lifecycle_AA_Final.pt', map_location=device))
model = model.to(device)

test_loader = DataLoader([x[:3] for x in testdataset], batch_size=4, shuffle=False, num_workers=0)

label_list = list()
prediction_list = list()

print('GoogleNet Test 결과 출력', '(', datetime.datetime.now(), ')')

classes = ['EG_NA', 'LA_NA', 'PU_NA', 'AB_LI', 'AB_CA', 'AB_BI', 'AB_AP','QB_LI', 'QB_CA', 'QB_BI', 'QB_AP','AA_NA']
correct_pred = {classname: 0 for classname in classes}
total_pred = {classname: 0 for classname in classes}

print(f'{device}로 결과를 출력중입니다. cpu로 계산할 경우 시간이 오래 걸리니 기다려주세요.', '(', datetime.datetime.now(), ')\n')

model.eval()
with torch.no_grad():
    for data in test_loader:
        images, labels, _ = data
        images = images.to(device)
        labels = labels.to(device)
        outputs = model(images)

        _, predictions = torch.max(outputs[0], 1)    

        for label, prediction in zip(labels, predictions):
            if label == prediction:
                correct_pred[classes[label]] += 1
            total_pred[classes[label]] += 1
            label_list.append(label.tolist())
            prediction_list.append(prediction.tolist())
mean = 0
for classname, correct_count in correct_pred.items():
    accuracy = 100 * float(correct_count) / total_pred[classname]
    print(f'Accuracy for class : {classname:7s} is {accuracy:.2f} %', '(', datetime.datetime.now(), ')')
    mean += accuracy
print(f'\nTotal classes : {mean/len(classes):.2f} %', '(', datetime.datetime.now(), ')')

GoogleNet Test 결과 출력 ( 2023-12-05 17:02:04.514566 )
cuda로 결과를 출력중입니다. cpu로 계산할 경우 시간이 오래 걸리니 기다려주세요. ( 2023-12-05 17:02:04.514566 )



AttributeError: 'InceptionBlock' object has no attribute 'se_block'

# 유효성 증빙

In [None]:
# 1. 개별 결과값 Test data set

test_ID=[]
for i in range(len(testdataset)):
    ID=testdataset[i][2]
    test_ID.append(ID)

    
test_ID=pd.DataFrame(test_ID)
label_data=pd.DataFrame(label_list)
pred_data=pd.DataFrame(prediction_list)

df_list=pd.concat([test_ID,label_data],axis=1)
df_list=pd.concat([df_list,pred_data],axis=1)
df_list.columns=['file_name','label','prediction']
df_list

# 문제당 개별 결과값 저장
df_list.to_csv('./filelist'+'문제 당 개별 결과값.csv',encoding='cp949',index=False)

In [None]:
# 2. confusion matrix

cf=confusion_matrix(label_list, prediction_list)

cf=pd.DataFrame(cf).rename(index={0:'True_백묵병음성',1:'True_백묵병양성'}
                             ,columns={0:'Pred_백묵병음성',1:'pred_백묵병양성'})
#계산할 때 사용된 값 저장 (Confusion matrix 기반 TP, FP, TN, FN)
cf.to_csv('./filelist'+'Confusion Matrix 기반 TP, FP, TN, FN(백묵병 분류 모델).csv',encoding='cp949')