In [None]:
from torch.utils.data import random_split
import torchvision.datasets as ds
import os
import re
import shutil
import torchvision.transforms as tf

In [None]:
import sys
import pandas as pd

In [None]:
import numpy as np
from torch.utils.data import DataLoader, Dataset
import torch.nn as nn
import torch
from tqdm import tqdm
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from sklearn.model_selection import train_test_split


In [None]:
def get_dataset(root_dir, seed=None):
    """Load an image-folder dataset and split it into train / val / test subsets.

    Every image goes through ``tf.Resize(224)``, ``tf.CenterCrop(224)`` and
    ``tf.ToTensor()``. One fifth of the samples (integer division) is held out
    as the test split, and one fifth of the remainder as the validation split.

    Args:
        root_dir: Directory handed to ``ImageFolder`` as ``root``.
        seed: Optional integer. When given, both splits are drawn from a
            dedicated generator seeded with this value, so the same partition
            is reproduced on every run (e.g. when a checkpoint is evaluated
            after a kernel restart). When ``None`` the global torch RNG is
            used, exactly as before.

    Returns:
        Tuple ``(train, val, test)`` of ``Subset`` objects.
    """
    dataset = ds.ImageFolder(
        root=root_dir,
        transform=tf.Compose([
            tf.Resize(224),
            tf.CenterCrop(224),
            tf.ToTensor(),
        ])
    )

    # Only pass a generator when a seed was requested, so the default call
    # keeps drawing from torch's global RNG.
    split_kwargs = {}
    if seed is not None:
        split_kwargs['generator'] = torch.Generator().manual_seed(seed)

    total = len(dataset)
    test_len = total // 5
    tmp, test = random_split(
        dataset=dataset, lengths=[total - test_len, test_len], **split_kwargs
    )

    remaining = len(tmp)
    val_len = remaining // 5
    train, val = random_split(
        dataset=tmp, lengths=[remaining - val_len, val_len], **split_kwargs
    )
    return train, val, test


In [None]:
def conv3x3(in_planes, out_planes, stride=1):
    """Return a bias-free 3x3 ``nn.Conv2d`` with padding 1 and the given stride."""
    layer = nn.Conv2d(
        in_planes,
        out_planes,
        kernel_size=3,
        stride=stride,
        padding=1,
        bias=False,
    )
    return layer


def conv1x1(in_planes, out_planes, stride=1):
    """Return a bias-free 1x1 ``nn.Conv2d`` (no padding) with the given stride."""
    layer = nn.Conv2d(
        in_planes,
        out_planes,
        kernel_size=1,
        stride=stride,
        bias=False,
    )
    return layer


class ResidualBlock(nn.Module):
    """Two-convolution residual block: conv3x3 -> BN -> ReLU -> conv3x3 -> BN, plus skip.

    The skip connection is the identity when input and output have the same
    shape. Otherwise it is a strided 1x1 convolution followed by batch
    normalisation, so that the residual can be added to the main path.

    Args:
        in_channels: Number of channels of the input tensor.
        out_channels: Number of channels produced by the block.
        stride: Stride of the first convolution (and of the projection
            shortcut, when one is needed).
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super(ResidualBlock, self).__init__()

        self.conv1 = conv3x3(in_channels, out_channels, stride)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)

        self.conv2 = conv3x3(out_channels, out_channels)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.downsample = None
        # A projection is required whenever the main path changes the tensor
        # shape: spatially (stride != 1) or in channel count. Checking only the
        # stride would make `out += residual` fail for a stride-1 block that
        # widens or narrows the channels.
        if stride != 1 or in_channels != out_channels:
            self.downsample = nn.Sequential(
                conv1x1(in_planes=in_channels, out_planes=out_channels, stride=stride),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        """Apply the block to ``x`` and return the activated sum of main path and skip."""
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        residual = x
        if self.downsample is not None:
            residual = self.downsample(x)

        out += residual
        out = self.relu(out)
        return out

class ResNet18(nn.Module):
    """ResNet-18 style image classifier.

    The network reduces the spatial size by a factor of 32 overall (five
    stride-2 stages), so the input height and width should both be multiples
    of 32; other sizes are untested. 224 x 224 (ImageNet-sized) input is the
    intended case. Non-square input also works as long as both sides are
    multiples of 32 and match the ``heigth`` / ``width`` given here.

    Args:
        in_channels: Number of channels of the input images.
        heigth: Expected input height (the misspelling is kept so existing
            keyword callers keep working).
        width: Expected input width.
        labelNum: Number of output classes.

    Note:
        The stem applies ``conv -> bn -> relu -> maxpool``. Earlier revisions
        defined ``bn`` / ``relu`` but skipped them in ``forward``. The
        state-dict keys are unchanged, but weights trained with the old
        forward pass will produce different activations and should be
        retrained.
    """

    def __init__(self, in_channels, heigth=224, width=224, labelNum=1000):
        super(ResNet18, self).__init__()

        self.conv = nn.Conv2d(in_channels=in_channels, out_channels=64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxp = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.block1 = nn.Sequential(
            ResidualBlock(in_channels=64, out_channels=64),
            ResidualBlock(in_channels=64, out_channels=64)
        )
        self.block2 = nn.Sequential(
            ResidualBlock(in_channels=64, out_channels=128, stride=2),
            ResidualBlock(in_channels=128, out_channels=128)
        )
        self.block3 = nn.Sequential(
            ResidualBlock(in_channels=128, out_channels=256, stride=2),
            ResidualBlock(in_channels=256, out_channels=256)
        )
        self.block4 = nn.Sequential(
            ResidualBlock(in_channels=256, out_channels=512, stride=2),
            ResidualBlock(in_channels=512, out_channels=512)
        )
        # AvgPool2d interprets a tuple kernel_size as (height, width); the
        # final feature map is (heigth // 32, width // 32), so pooling with
        # that kernel collapses it to 1 x 1 for non-square inputs as well.
        self.avgp = nn.AvgPool2d(kernel_size=(heigth//32, width//32))
        self.fc = nn.Linear(in_features=512, out_features=labelNum)

    def forward(self, x):
        """Return class logits of shape ``(batch, labelNum)`` for an image batch ``x``."""
        # Stem: the batch norm and ReLU defined in __init__ belong between the
        # first convolution and the max pool.
        o = self.conv(x)
        o = self.bn(o)
        o = self.relu(o)
        o = self.maxp(o)

        o = self.block1(o)
        o = self.block2(o)
        o = self.block3(o)
        o = self.block4(o)

        o = self.avgp(o)
        # Flatten (batch, 512, 1, 1) to (batch, 512) for the linear classifier.
        o = o.view(o.size(0), -1)
        o = self.fc(o)
        return o

In [None]:
# Mount Google Drive at /content/drive (Colab only); the dataset and checkpoint
# paths used in later cells live under this mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Notebook-wide configuration: dataset location and default hyper-parameters.
IMAGE_ROOT = '/content/drive/MyDrive/kcars'
BATCH_SIZE = 16
LR = 0.001
EPOCH = 30


# Use the GPU when one is available, otherwise fall back to the CPU.
gpu_flag = torch.cuda.is_available()
device = torch.device("cuda" if gpu_flag else "cpu")
# device = torch.device("cpu")
print(device)

# Only needs to be run once.
# NOTE(review): reorg_root is not defined in the visible cells — presumably a
# one-off helper that rearranges the image directory; confirm before enabling.
#reorg_root(IMAGE_ROOT)

# Build the train / validation / test subsets from the image folder.
trainSet, valSet, testSet = get_dataset(IMAGE_ROOT)





In [None]:
# During training, evaluate on the validation split instead of the test split.
def train():
    """Train a fresh ResNet18 (100 classes) and save a checkpoint after the last epoch.

    Reads the notebook-level globals ``train_dataloader``, ``val_dataloader``,
    ``LR`` and ``EPOCH``. After every epoch the mean training loss, mean
    validation loss and validation accuracy are printed. When all epochs are
    done, the model and optimizer state dicts, the last epoch number and the
    per-epoch history are written to ``/content/drive/MyDrive/result``.

    Validation accuracy is the fraction of correctly classified samples over
    the whole validation set, reported as a plain Python float.
    """
    gpu_flag = torch.cuda.is_available()
    device = torch.device("cuda" if gpu_flag else "cpu")
    print(device)

    model = ResNet18(in_channels=3, labelNum=100)
    optim = torch.optim.Adam(model.parameters(), lr=LR)
    loss_func = nn.CrossEntropyLoss()

    train_loss_arr = []
    val_loss_arr = []
    val_acc_arr = []

    model.to(device)
    epoch = 0  # keeps the checkpoint well-defined even if EPOCH is 0
    for epoch in range(1, EPOCH+1):
        train_loss = 0
        val_loss = 0
        val_correct = 0
        val_seen = 0

        # Switch modes once per phase instead of once per batch.
        model.train()
        for x, y in tqdm(train_dataloader):
            x, y = x.to(device), y.to(device)
            optim.zero_grad()
            pred = model(x)

            loss = loss_func(pred, y)
            train_loss += loss.item()

            loss.backward()
            optim.step()

        model.eval()
        # No gradients are needed for evaluation; skipping the autograd graph
        # saves memory and time.
        with torch.no_grad():
            for x, y in val_dataloader:
                x, y = x.to(device), y.to(device)
                pred = model(x)

                val_loss += loss_func(pred, y).item()

                # Count hits per sample so a smaller final batch does not get
                # the same weight as a full one.
                val_correct += (torch.argmax(pred, dim=1) == y).sum().item()
                val_seen += y.size(0)

        train_loss = train_loss / len(train_dataloader)
        val_loss = val_loss / len(val_dataloader)
        val_acc = val_correct / val_seen
        train_loss_arr.append(train_loss)
        val_loss_arr.append(val_loss)
        val_acc_arr.append(val_acc)

        print(f'epoch: {epoch}, train_loss: {train_loss}, val_loss: {val_loss}, acc: {val_acc}')

    state = {
        'model': model.state_dict(),
        'optimizer': optim.state_dict(),
        'epoch': epoch,
        # Extra key: keeps the learning curves instead of discarding them.
        'history': {
            'train_loss': train_loss_arr,
            'val_loss': val_loss_arr,
            'val_acc': val_acc_arr,
        },
    }
    torch.save(state, '/content/drive/MyDrive/result')
      


In [None]:
# Run 1: hyper-parameters for this training run. train() reads LR and EPOCH
# (and the data loaders below) from these notebook-level names.
BATCH_SIZE = 16
LR = 0.001
EPOCH = 30

# Reshuffle the training samples every epoch; without shuffle=True the model
# sees the batches in exactly the same order each epoch. The evaluation
# loaders keep a fixed order.
train_dataloader = DataLoader(trainSet, batch_size=BATCH_SIZE, shuffle=True, num_workers=1)
val_dataloader = DataLoader(valSet, batch_size=BATCH_SIZE, num_workers=1)
test_dataloader = DataLoader(testSet, batch_size=BATCH_SIZE, num_workers=1)
train()

In [None]:
# Run 2: higher learning rate and more epochs. train() reads LR and EPOCH
# (and the data loaders below) from these notebook-level names, and writes its
# checkpoint to the same path as the previous run.
BATCH_SIZE = 16
LR = 0.003
EPOCH = 50

# Reshuffle the training samples every epoch; without shuffle=True the model
# sees the batches in exactly the same order each epoch. The evaluation
# loaders keep a fixed order.
train_dataloader = DataLoader(trainSet, batch_size=BATCH_SIZE, shuffle=True, num_workers=1)
val_dataloader = DataLoader(valSet, batch_size=BATCH_SIZE, num_workers=1)
test_dataloader = DataLoader(testSet, batch_size=BATCH_SIZE, num_workers=1)

train()

In [None]:
# Rebuild the model, optimizer and loss the same way train() does, then restore
# the state saved at the end of training.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = ResNet18(in_channels=3, labelNum=100)
# Move the model first so the optimizer state restored below ends up on the
# same device as the parameters it belongs to.
model.to(device)
optim = torch.optim.Adam(model.parameters(), lr=LR)
loss_func = nn.CrossEntropyLoss()

# map_location lets a checkpoint written on a GPU runtime be restored on a
# CPU-only runtime (and vice versa) instead of failing during deserialization.
checkpoint = torch.load('/content/drive/MyDrive/result', map_location=device)
model.load_state_dict(checkpoint['model'])
optim.load_state_dict(checkpoint['optimizer'])
start_epoch = checkpoint['epoch']+1


In [None]:
# Evaluate the restored model on the held-out test split.
test_loss = 0
test_correct = 0
test_seen = 0
gpu_flag = torch.cuda.is_available()
device = torch.device("cuda" if gpu_flag else "cpu")

print(device)

# Move the model and switch to eval mode once, not on every batch.
model.to(device)
model.eval()

# Inference only: skip building the autograd graph.
with torch.no_grad():
    for x, y in test_dataloader:
        x, y = x.to(device), y.to(device)
        pred = model(x)

        test_loss += loss_func(pred, y).item()

        # Count hits per sample so a smaller final batch is not over-weighted.
        test_correct += (torch.argmax(pred, dim=1) == y).sum().item()
        test_seen += y.size(0)

test_loss = test_loss / len(test_dataloader)
test_acc = test_correct / test_seen
print(f'Accuracy: {test_acc * 100 :.2f}%')

In [None]:
# Collect every 4-D parameter of the model (its convolution kernels), printing
# each shape as it is collected.
filter_map = []
for param in model.parameters():
    if param.dim() != 4:
        continue
    print(param.shape)
    filter_map.append(param)

In [None]:
# Helper for visualizing convolution filters (weight tensors).
# tensor = 4-D filter tensor to draw, ch = index of the input channel to display
import matplotlib.pyplot as plt
import torchvision.datasets as dataset
import torchvision.transforms as transforms
from torchvision import utils

def visTensor(tensor, ch=3, allkernels=False, nrow=8, padding=1):
    """Draw a 4-D tensor as a grid of small images in a new matplotlib figure.

    Args:
        tensor: 4-D tensor; it is copied to the CPU before plotting.
        ch: Index along dimension 1 to display. Only used when ``allkernels``
            is false and dimension 1 does not have size 3.
        allkernels: When true, dimensions 0 and 1 are merged so every 2-D
            slice becomes its own tile.
        nrow: Passed to ``utils.make_grid`` as ``nrow``; also the figure width.
        padding: Passed to ``utils.make_grid`` as ``padding``.
    """
    tensor = tensor.cpu()
    dim0, dim1, dim2, dim3 = tensor.shape

    if allkernels:
        tensor = tensor.view(dim0 * dim1, -1, dim2, dim3)
    else:
        # A size-3 dimension 1 is left untouched; anything else is reduced to
        # the single requested slice.
        if dim1 != 3:
            tensor = tensor[:, ch, :, :].unsqueeze(dim=1)

    # Figure height grows with the number of tiles but is capped at 64.
    tiles = tensor.shape[0]
    rows = np.min((tiles // nrow + 1, 64))
    grid = utils.make_grid(tensor, nrow=nrow, normalize=True, padding=padding)

    plt.figure(figsize=(nrow, rows))
    # Reorder the grid's axes from (0, 1, 2) to (1, 2, 0) before handing it to imshow.
    image = grid.numpy().transpose((1, 2, 0))
    plt.imshow(image)

In [None]:
# Draw one figure per collected weight tensor; `ch` is forwarded to visTensor
# and used in the figure title.
ch = 0
for n, weight in enumerate(filter_map):
    visTensor(weight, ch=ch, allkernels=False)
    plt.title(f'{n+1}\'th convolution layer {ch+1}\'th channel filter')

In [None]:
# 4. Return the best set of hyper-parameters for the model.
# NOTE(review): `gs` is not defined in any visible cell of this notebook.
# Presumably it is a fitted grid-search object exposing `best_score_` and
# `best_params_` — confirm it exists, otherwise this cell raises NameError on
# a fresh Restart & Run All.
print(f'Accuracy: {gs.best_score_ * 100 :.2f}%')
print(gs.best_params_)