In [None]:
import PIL
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import numpy as np
import json
import os
from tqdm import tqdm

In [None]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
# Mount Google Drive at /content/drive (Colab-only module)
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Save a tensor as a .npy file.
# By default the file is written to '/content/drive/MyDrive/DF/전처리 파일/<filename>.npy'.
def tensor_toFile(data, filename, base_dir='/content/drive/MyDrive/DF/전처리 파일/'):
  """Write ``data`` to ``base_dir`` in NumPy ``.npy`` format.

  Args:
    data: torch.Tensor to persist. It is detached from the autograd graph and
      copied to host memory before conversion, so tensors that require grad or
      live on another device can be saved too.
    filename: file name inside ``base_dir``; ``np.save`` appends ``.npy`` when
      the name does not already end with it.
    base_dir: directory that receives the file. Defaults to the Drive folder
      this notebook has always used.
  """
  # .numpy() raises on a tensor that requires grad, hence the detach().
  data_np = data.detach().cpu().numpy()
  np.save(os.path.join(base_dir, filename), data_np)


# Load a .npy file and return its contents as a tensor.
def file_toTensor(filename, base_dir='/content/drive/MyDrive/DF/전처리 파일/'):
  """Read ``<base_dir>/<filename>.npy`` and wrap the array in a torch.Tensor.

  Args:
    filename: file name without the ``.npy`` extension (it is appended here).
    base_dir: directory that holds the file. Defaults to the Drive folder this
      notebook has always used.

  Returns:
    A tensor created with ``torch.from_numpy``, i.e. it shares memory with the
    loaded NumPy array and keeps its dtype.
  """
  np_load = np.load(os.path.join(base_dir, filename + '.npy'))
  return torch.from_numpy(np_load)

## 데이터 불러와서 합치기

In [None]:
# Load the PP samples and shrink them by a factor of 8 with bicubic interpolation
PP_data = F.interpolate(file_toTensor('PP_data'), scale_factor=1/8, mode='bicubic')
PP_data.shape

torch.Size([1700, 3, 32, 32])

In [None]:
# Load the PS samples and shrink them by a factor of 8 with bicubic interpolation
PS_data = F.interpolate(file_toTensor('ps_data'), scale_factor=1/8, mode='bicubic')
PS_data.shape

torch.Size([930, 3, 32, 32])

In [None]:
# Load the PE samples and shrink them by a factor of 8 with bicubic interpolation
PE_data = F.interpolate(file_toTensor('pe_data'), scale_factor=1/8, mode='bicubic')
PE_data.shape

torch.Size([3152, 3, 32, 32])

In [None]:
# Load the PET samples and shrink them by a factor of 8 with bicubic interpolation
PET_data = F.interpolate(file_toTensor('PET_data'), scale_factor=1/8, mode='bicubic')
PET_data.shape

torch.Size([2524, 3, 32, 32])

In [None]:
# Load the stored label tensor for each class (same load order as before: PET, PP, PS, PE)
PET_class, PP_class, PS_class, PE_class = (
  file_toTensor(name) for name in ('PET_class', 'PP_class', 'ps_class', 'pe_class')
)

In [None]:
# Show the size of every data tensor followed by every label tensor
tuple(t.size() for t in (PET_data, PP_data, PS_data, PE_data,
                         PET_class, PP_class, PS_class, PE_class))

(torch.Size([2524, 3, 32, 32]),
 torch.Size([1700, 3, 32, 32]),
 torch.Size([930, 3, 32, 32]),
 torch.Size([3152, 3, 32, 32]),
 torch.Size([2524]),
 torch.Size([1700]),
 torch.Size([930]),
 torch.Size([3152]))

In [None]:
# Stack the four per-class batches along the sample dimension (order: PET, PP, PS, PE)
data = torch.cat([PET_data, PP_data, PS_data, PE_data], dim=0)

In [None]:
data.shape

torch.Size([8306, 3, 32, 32])

In [None]:
# Concatenate the label tensors in the same order as the data: PET, PP, PS, PE
target = torch.cat([PET_class, PP_class, PS_class, PE_class], dim=0)

In [None]:
target.shape

torch.Size([8306])

In [None]:
# Relabel the concatenated targets by source class: PET=0, PP=1, PS=2, PE=3.
# Segment lengths are read from the per-class label tensors (same order as the
# concatenation that built `target`) instead of hard-coded indices, so the labels
# stay aligned if any class file changes size.
class_sizes = torch.tensor([len(PET_class), len(PP_class), len(PS_class), len(PE_class)])
assert int(class_sizes.sum()) == len(target), 'target length does not match the per-class label tensors'
# repeat_interleave emits label i exactly class_sizes[i] times: 0..0, 1..1, 2..2, 3..3.
# Slice assignment writes in place, so `target` keeps its dtype.
target[:] = torch.repeat_interleave(torch.arange(len(class_sizes)), class_sizes).to(target.dtype)

target.unique()

tensor([0, 1, 2, 3])

In [None]:
# Persist the merged samples and their labels as 'data' and 'target'
tensor_toFile(data=data, filename='data')
tensor_toFile(data=target, filename='target')

## 모델링

In [None]:
# Reload the merged tensors saved earlier and report their sizes
data, target = file_toTensor('data'), file_toTensor('target')
print('데이터 크기:', data.shape)
print('타겟 크기:', target.shape)

데이터 크기: torch.Size([8306, 3, 32, 32])
타겟 크기: torch.Size([8306])


In [None]:
# Label encoding: PET=0, PP=1, PS=2, PE=3 -- print the distinct labels and the count of each
print(torch.unique(target))
print(torch.bincount(target))

tensor([0, 1, 2, 3])
tensor([2524, 1700,  930, 3152])


In [None]:
from sklearn.model_selection import train_test_split

# Shuffled 70/30 split, stratified on the labels, with a fixed seed so the split is repeatable
X_train, X_test, y_train, y_test = train_test_split(
  data,
  target,
  test_size=0.3,
  random_state=42,
  shuffle=True,
  stratify=target,
)

In [None]:
# Report how many samples ended up in each split
print('train 데이터 크기:', X_train.shape)
print('test 데이터 크기:', X_test.shape)

train 데이터 크기: torch.Size([5814, 3, 32, 32])
test 데이터 크기: torch.Size([2492, 3, 32, 32])


In [None]:
class IdentityPadding(nn.Module):
  """Parameter-free shortcut: append zero channels, then subsample spatially.

  Args:
    ch_in: number of channels of the incoming tensor.
    ch_out: number of channels the shortcut must produce.
    stride: spatial subsampling step.
  """
  def __init__(self, ch_in, ch_out, stride):
    super().__init__()

    # How many all-zero channels have to be appended to reach ch_out.
    self.add_channels = ch_out - ch_in
    # A max-pool with a 1x1 window keeps every `stride`-th position unchanged.
    self.pooling = nn.MaxPool2d(1, stride=stride)

  def forward(self, x):
    # F.pad lists pads from the last dimension backwards; the final pair targets the
    # third-from-last dimension (channels for NCHW input) and pads only its end.
    widened = F.pad(x, (0, 0, 0, 0, 0, self.add_channels))
    return self.pooling(widened)

In [None]:
class ResidualBlock(nn.Module):
  """Two 3x3 conv + batch-norm layers wrapped by an additive shortcut.

  Args:
    ch_in: channels of the input tensor.
    ch_out: channels produced by both convolutions.
    stride: stride of the first convolution (the second always uses 1).
    down_sample: when true, the shortcut goes through ``IdentityPadding`` so it
      matches the main path; otherwise the input is added unchanged.
  """
  def __init__(self, ch_in, ch_out, stride=1, down_sample=False):
    super().__init__()
    self.stride = stride

    # First conv carries the stride; bias is omitted because batch-norm follows.
    self.conv1 = nn.Conv2d(ch_in, ch_out, kernel_size=3, stride=stride, padding=1, bias=False)
    self.bn1 = nn.BatchNorm2d(ch_out)
    self.relu = nn.ReLU(inplace=True)

    self.conv2 = nn.Conv2d(ch_out, ch_out, kernel_size=3, stride=1, padding=1, bias=False)
    self.bn2 = nn.BatchNorm2d(ch_out)

    self.down_sample = IdentityPadding(ch_in, ch_out, stride) if down_sample else None

  def forward(self, x):
    # Shortcut branch: the raw input, or its padded / subsampled version.
    identity = x if self.down_sample is None else self.down_sample(x)

    # Main branch: conv -> bn -> relu -> conv -> bn.
    out = self.relu(self.bn1(self.conv1(x)))
    out = self.bn2(self.conv2(out))

    out += identity
    return self.relu(out)

In [None]:
class ResNet(nn.Module):
  """Small ResNet: a 3x3 stem, three stages of residual blocks (16, 32 and 64
  channels), global average pooling and a linear classifier.

  Args:
    block: residual block class. It is called as
      ``block(ch_in, ch_out, stride, down_sample)`` for the first block of a stage
      and as ``block(ch_out, ch_out)`` for the remaining ones.
    num_layer: number of blocks in each of the three stages.
    num_classes: number of outputs of the final linear layer.
  """
  def __init__(self, block, num_layer, num_classes=4):
    super().__init__()
    self.n_layer = num_layer
    self.conv1 = nn.Conv2d(3, 16, 3, 1, 1, bias=False)
    self.bn1 = nn.BatchNorm2d(16)
    self.relu = nn.ReLU(inplace=True)

    self.layer_2n = self.make_layer(block, 16, 16, stride=1)
    self.layer_4n = self.make_layer(block, 16, 32, stride=2)
    self.layer_6n = self.make_layer(block, 32, 64, stride=2)

    # Global average pooling. On an 8x8 feature map this equals the former
    # AvgPool2d(8, stride=1), but it also collapses any other spatial size to 1x1,
    # so the 64-feature classifier no longer depends on the input resolution.
    self.avgpool = nn.AdaptiveAvgPool2d(1)
    self.fc = nn.Linear(64, num_classes)


  def make_layer(self, block, ch_in, ch_out, stride):
    """Build one stage of ``self.n_layer`` blocks.

    Only the first block receives ``stride`` and the channel change; it is told to
    down-sample its shortcut when ``stride`` is 2. The remaining blocks map
    ``ch_out`` to ``ch_out`` with the block's defaults.
    """
    down_sample = stride == 2

    blocks = [block(ch_in, ch_out, stride, down_sample)]
    blocks.extend(block(ch_out, ch_out) for _ in range(self.n_layer - 1))

    return nn.Sequential(*blocks)


  def forward(self, x):
    """Return the class logits, shape (batch, num_classes), for a batch of 3-channel images."""
    x = self.conv1(x)
    x = self.bn1(x)
    x = self.relu(x)

    x = self.layer_2n(x)
    x = self.layer_4n(x)
    x = self.layer_6n(x)

    x = self.avgpool(x)
    # Drop the 1x1 spatial dimensions: (batch, 64, 1, 1) -> (batch, 64).
    x = torch.flatten(x, 1)
    x = self.fc(x)
    return x

In [None]:
# Network size: 4 output classes, 3 residual blocks per stage
num_classes, num_layer = 4, 3

# Build the network on the selected device, then the loss and the Adam optimizer
model = ResNet(block=ResidualBlock, num_layer=num_layer, num_classes=num_classes).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

In [None]:
# ResNet training loop
epochs = 50
batch_size = 64
# Number of mini-batches per epoch (the last one may be smaller than batch_size)
total_batch_num = int(np.ceil(len(X_train) / batch_size))

for epoch in range(epochs):
  model.train()  # make sure batch-norm is in training mode for this epoch
  avg_cost = 0.0

  # Walk through the training split in consecutive slices of batch_size samples
  for i in range(0, len(X_train), batch_size):
    b_x = X_train[i:i+batch_size].to(device)
    b_y = y_train[i:i+batch_size].to(device)

    logits = model(b_x) # forward propagation
    loss = criterion(logits, b_y) # get cost

    optimizer.zero_grad()
    loss.backward() # backward propagation
    optimizer.step() # update parameters

    # Accumulate a plain Python float. Adding the loss tensor itself would keep
    # every batch's autograd history referenced until the end of the epoch.
    avg_cost += loss.item() / total_batch_num

  print('Epoch : {} / {}, cost : {}'.format(epoch+1, epochs, avg_cost))

Epoch : 1 / 50, cost : 0.65633624792099
Epoch : 2 / 50, cost : 0.4083254039287567
Epoch : 3 / 50, cost : 0.32314223051071167
Epoch : 4 / 50, cost : 0.2793947756290436
Epoch : 5 / 50, cost : 0.24513591825962067
Epoch : 6 / 50, cost : 0.2014555037021637
Epoch : 7 / 50, cost : 0.1869429647922516
Epoch : 8 / 50, cost : 0.17372514307498932
Epoch : 9 / 50, cost : 0.15326380729675293
Epoch : 10 / 50, cost : 0.12183242291212082
Epoch : 11 / 50, cost : 0.10489952564239502
Epoch : 12 / 50, cost : 0.09579841047525406
Epoch : 13 / 50, cost : 0.08716394752264023
Epoch : 14 / 50, cost : 0.06082341447472572
Epoch : 15 / 50, cost : 0.07120712101459503
Epoch : 16 / 50, cost : 0.04488501697778702
Epoch : 17 / 50, cost : 0.04359811916947365
Epoch : 18 / 50, cost : 0.043434835970401764
Epoch : 19 / 50, cost : 0.04015945643186569
Epoch : 20 / 50, cost : 0.03561476618051529
Epoch : 21 / 50, cost : 0.03253360465168953
Epoch : 22 / 50, cost : 0.037687379866838455
Epoch : 23 / 50, cost : 0.025764914229512215
E

In [None]:
# ResNet accuracy measurement on the test split

model.eval()
total = X_test.size(0)

x, y = X_test.to(device), y_test.to(device)

# One gradient-free forward pass over the whole test split; the predicted class is
# the index of the largest logit.
with torch.no_grad():
  logits = model(x)
  predicts = logits.argmax(dim=1)

correct = predicts.eq(y).sum().item()

print(f'Accuracy of the network on test images: {100 * correct / total} %')