In [1]:
import glob
import os
import torchvision.transforms as transforms
import torchvision
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
import numpy as np
import cv2
import torch

from torchvision import datasets, models, transforms
from PIL import Image
from torch.utils.data import Dataset, DataLoader

In [2]:
# Report the installed torch version and whether a CUDA GPU can be used
for _label, _value in (
    ("torch version >>", torch.__version__),
    ("GPU Available >> ", torch.cuda.is_available()),
):
  print(_label, _value)

torch version >> 1.9.0+cu102
GPU Available >>  True


In [3]:
# Device selection: use the GPU when CUDA is available, otherwise the CPU
if torch.cuda.is_available():
  device = "cuda"
else:
  device = "cpu"

In [4]:
# Dataset sanity-check helper
# Example root: "./content/drive/MyDrive/classification/cat_dog_data"


def dataset_check(output_dir):
  """Print how many ``*.jpg`` files each split/class folder contains.

  The dataset is expected to be laid out as
  ``output_dir/<split>/<class>/*.jpg`` with splits ``train``, ``val``,
  ``test`` and classes ``cat``, ``dog``. One line is printed per
  split/class pair, in that order. A missing folder simply counts as 0
  because ``glob`` returns an empty list for it.

  Args:
    output_dir: Root directory of the dataset.

  Returns:
    None. The counts are only printed.
  """
  try:
    for split in ("train", "val", "test"):
      for animal in ("cat", "dog"):
        image_paths = glob.glob(os.path.join(output_dir, split, animal, "*.jpg"))
        # Label every line with its own split and class (previously all six
        # lines were labelled "train cat").
        print("{} {} image number >> ".format(split, animal), len(image_paths))

  except Exception as e:
    # Best-effort helper: report the problem instead of aborting the notebook.
    print("check error 코드 구현을 완료했는지 검토하십시요~")
    print(e)


# Root directory of the cat/dog dataset (Google Drive mount)
data_dir = "/content/drive/MyDrive/classification/cat_dog_data"
# Alternative location on the local Colab disk:
# data_dir = "/content/cat_dog_data"

# Print the number of images found for every split/class folder
dataset_check(output_dir=data_dir)

train cat image number >>  1000
train cat image number >>  1000
train cat image number >>  500
train cat image number >>  500
train cat image number >>  1000
train cat image number >>  1000


In [5]:
# Training hyperparameters

learning_rate = 1e-2  # initial learning rate handed to the optimizer
num_epochs = 40       # number of passes over the training set
batch_size = 32       # samples per mini-batch for every DataLoader

In [6]:
class catdogDataset(Dataset):
  """Cat/dog image dataset read from ``data_dir/<mode>/<class>/<file>``.

  Every file matched by ``data_dir/mode/*/*`` becomes one sample. The
  label is derived from the file name: ``0`` when the base name starts
  with ``"cat"``, ``1`` otherwise.

  Args:
    data_dir: Root directory of the dataset.
    mode: Split sub-directory to read, e.g. ``"train"``, ``"val"`` or ``"test"``.
    transform: Optional callable applied to the RGB PIL image.
  """

  def __init__(self, data_dir, mode, transform=None):
    # Sorted so that the index -> file mapping is deterministic.
    self.all_data = sorted(glob.glob(os.path.join(data_dir, mode, "*", "*")))
    self.transform = transform

  def __getitem__(self, item):
    """Return ``(image, label)`` for the sample at index ``item``."""
    data_path = self.all_data[item]

    # Image.open is lazy and keeps the file handle open. convert() loads the
    # pixels and returns an independent image, so the handle can be released
    # right away instead of waiting for garbage collection.
    with Image.open(data_path) as raw_img:
      img = raw_img.convert("RGB")

    # Label comes from the file name prefix: "cat*" -> 0, anything else -> 1.
    label = 0 if os.path.basename(data_path).startswith("cat") else 1

    if self.transform is not None:
      img = self.transform(img)

    return img, label

  def __len__(self):
    """Return the number of image files found for this split."""
    return len(self.all_data)

In [7]:
from torchvision.transforms.functional import normalize
# Preprocessing pipelines, one per split.
# Every split resizes to 224x224, converts to a tensor and normalizes per
# channel; only the training split adds random augmentation before that.
data_transforms = {
    'train': transforms.Compose([
        transforms.RandomRotation(degrees=5),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomVerticalFlip(p=0.5),
        transforms.Resize(size=[224, 224]),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
}
# 'val' and 'test' share the same deterministic steps; each gets its own Compose.
data_transforms.update({
    split: transforms.Compose([
        transforms.Resize(size=[224, 224]),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    for split in ('val', 'test')
})

In [8]:
# Dataset and DataLoader definitions
# Datasets (train, val, test): all read from the `data_dir` root defined
# earlier in the notebook, so switching the dataset location takes one edit.
train_data_set = catdogDataset(data_dir=data_dir, mode="train", transform=data_transforms["train"])
val_data_set = catdogDataset(data_dir=data_dir, mode="val", transform=data_transforms["val"])
test_data_set = catdogDataset(data_dir=data_dir, mode="test", transform=data_transforms["test"])


# Data loaders
# Training shuffles and drops the last incomplete batch so every step sees a
# full batch. Evaluation keeps the incomplete batch: dropping it would
# silently exclude samples from the reported validation/test metrics.
train_loader = DataLoader(train_data_set, batch_size=batch_size, shuffle=True, drop_last=True)
val_loader = DataLoader(val_data_set, batch_size=batch_size, shuffle=False, drop_last=False)
test_loader = DataLoader(test_data_set, batch_size=batch_size, shuffle=False, drop_last=False)

In [9]:
# Simple CNN model definition

class SimpleCNN(nn.Module):
  """Five-stage convolutional classifier with two output logits.

  Each stage is Conv2d(kernel_size=3, no padding) -> BatchNorm2d -> ReLU ->
  MaxPool2d(2). The stages are followed by two fully connected layers. The
  first linear layer expects ``256 * 5 * 5`` features, which is what the
  convolutional stack produces for 224x224 inputs.
  """

  def __init__(self):
    super().__init__()

    # (in_channels, out_channels) of every convolutional stage, in order.
    channel_plan = [(3, 32), (32, 64), (64, 128), (128, 128), (128, 256)]

    stages = []
    for c_in, c_out in channel_plan:
      stages += [
          nn.Conv2d(in_channels=c_in, out_channels=c_out, kernel_size=3),
          nn.BatchNorm2d(c_out),
          nn.ReLU(),
          nn.MaxPool2d(2),
      ]
    self.conv = nn.Sequential(*stages)

    # Classifier head: flattened features -> 512 hidden units -> 2 logits.
    self.fc1 = nn.Linear(256 * 5 * 5, 512)
    self.fc2 = nn.Linear(512, 2)

  def forward(self, x):
    """Return the raw class logits, one row of two values per input sample."""
    features = self.conv(x)
    flat = features.view(features.size(0), -1)
    hidden = F.relu(self.fc1(flat))
    return self.fc2(hidden)


In [10]:
# Instantiate the network; Module.to() moves its parameters in place.
net = SimpleCNN()
net.to(device)
# print(net)

# Alternative: VGG16
# net = models.vgg16(pretrained=True).to(device)
# net.classifier[6] = nn.Linear(4096, 2)

# Alternative: ResNet18
# net = models.resnet18(pretrained=True).to(device)
# num_ftrs = net.fc.in_features
# net.fc = nn.Linear(num_ftrs, 2)


# `model` and `net` refer to the same module object, already on `device`.
model = net

In [11]:
# Loss function, optimizer and learning-rate scheduler
criterion = nn.CrossEntropyLoss()
criterion = criterion.to(device)
optimizer = optim.SGD(params=net.parameters(), momentum=0.9, lr=learning_rate)
# StepLR multiplies the learning rate by `gamma` every `step_size` scheduler steps.
lr_scheduler = optim.lr_scheduler.StepLR(optimizer=optimizer, gamma=0.1, step_size=20)

In [12]:
def train(num_epochs, model, data_loader, criterion, optimizer, save_dir, val_every, device,
          val_data_loader=None, scheduler=None):
  """Train ``model`` and save the weights with the lowest validation loss.

  Args:
    num_epochs: Number of passes over ``data_loader``.
    model: Module to optimise. It is the object that is trained, validated
      and saved.
    data_loader: Iterable of ``(images, labels)`` training batches. Must
      support ``len()``.
    criterion: Loss callable taking ``(outputs, labels)``.
    optimizer: Optimizer that updates the model's parameters.
    save_dir: Directory passed to ``save_model`` for the best checkpoint.
    val_every: Run validation every ``val_every`` epochs.
    device: Device the batches are moved to.
    val_data_loader: Optional loader used for validation. When omitted, the
      notebook-level ``val_loader`` is used (the original behaviour).
    scheduler: Optional learning-rate scheduler, stepped once per epoch
      after the optimizer updates.

  Returns:
    None.
  """
  print("Start Training")
  best_loss = float("inf")
  steps_per_epoch = len(data_loader)

  for epoch in range(num_epochs):
    # Make sure BatchNorm/Dropout layers are in training mode for this epoch.
    model.train()

    for i, (imgs, labels) in enumerate(data_loader):
      images, labels = imgs.to(device), labels.to(device)

      # Forward through the model that was passed in, not a notebook global.
      outputs = model(images)
      loss = criterion(outputs, labels)

      optimizer.zero_grad()
      loss.backward()
      optimizer.step()

      argmax = outputs.argmax(dim=1)
      accuracy = (labels == argmax).float().mean()

      # Log every third step to keep the cell output readable.
      if (i + 1) % 3 == 0:
        print("epoch [{}/{}], step [{}/{}], loss: {:.4f}, Accuracy: {:.2f}%"
        .format(epoch + 1, num_epochs, i + 1, steps_per_epoch, loss.item(), accuracy.item() * 100))

    if scheduler is not None:
      scheduler.step()

    if (epoch + 1) % val_every == 0:
      if val_data_loader is None:
        # Backward compatibility: older calls rely on the global `val_loader`.
        val_data_loader = val_loader
      # float() accepts both a Python number and a 0-dim tensor.
      avrg_loss = float(validation(epoch + 1, model, val_data_loader, criterion, device))

      if avrg_loss < best_loss:
        print("Best Performace at epoch: {}".format(epoch+1))
        print("save model in", save_dir)
        best_loss = avrg_loss
        save_model(model, save_dir)

In [13]:
def validation(epoch, model, data_loader, criterion, device):
  """Evaluate ``model`` on ``data_loader`` and return the average loss.

  The model is switched to eval mode for the pass and restored to whatever
  mode it was in before the call. The loss is averaged over samples, which
  assumes ``criterion`` returns the mean loss of a batch (the default
  reduction of ``nn.CrossEntropyLoss``).

  Args:
    epoch: Epoch number, used only in the printed summary.
    model: Module to evaluate.
    data_loader: Iterable of ``(images, labels)`` batches.
    criterion: Loss callable taking ``(outputs, labels)``.
    device: Device the batches are moved to.

  Returns:
    The average loss as a Python float.

  Raises:
    ValueError: If ``data_loader`` yields no samples.
  """
  print("Start val")
  was_training = model.training
  model.eval()

  total = 0
  correct = 0
  total_loss = 0.0

  try:
    with torch.no_grad():
      for imgs, labels in data_loader:
        images, labels = imgs.to(device), labels.to(device)
        # Evaluate the model that was passed in, not a notebook global.
        output = model(images)

        n_samples = labels.size(0)
        # .item() keeps the running sum a plain float instead of a tensor;
        # weighting by batch size keeps a short last batch from skewing it.
        total_loss += criterion(output, labels).item() * n_samples
        correct += (output.argmax(dim=1) == labels).sum().item()
        total += n_samples
  finally:
    # Restore the caller's mode even if evaluation raised.
    model.train(was_training)

  if total == 0:
    raise ValueError("validation data_loader yielded no samples")

  avrg_loss = total_loss / total
  print("Validation #{} Accuracy {:.2f} % Average Loss: {:.4f}".format(epoch, correct/total * 100, avrg_loss))

  return avrg_loss

In [17]:
# Save model weights

def save_model(model, save_dir, file_name="best_model_simple_cnn.pth"):
  """Write ``model.state_dict()`` to ``save_dir/file_name``.

  Args:
    model: Module whose weights are saved.
    save_dir: Target directory; created if it does not exist.
    file_name: Name of the checkpoint file inside ``save_dir``.

  Returns:
    None.
  """
  os.makedirs(save_dir, exist_ok=True)
  output_path = os.path.join(save_dir, file_name)
  # Save the model that was passed in, not whatever global `net` refers to.
  torch.save(model.state_dict(), output_path)

In [18]:
# test

def test(model, data_loader, device):
  print("Start test")

  correct = 0
  total = 0
  with torch.no_grad():

    for i, (img, labels) in enumerate(data_loader):
      imgs, labels = imgs.to(device), labels.to(device)

      outputs = net(img)
      _, argmax = torch.max(outputs, 1)
      total += imgs.size(0)
      correct += (labels == argmax).sum().item()

    print("Test Accuracy for {} images: {:.2f}%".format(total, correct / total * 100))

In [19]:
# Training run configuration (`model` is the network instantiated above)
save_dir = "/content/drive/MyDrive/classification/weights/"  # checkpoint directory
val_every = 1  # validate after every epoch

train(
    num_epochs=num_epochs,
    model=model,
    data_loader=train_loader,
    criterion=criterion,
    optimizer=optimizer,
    save_dir=save_dir,
    val_every=val_every,
    device=device,
)

Start Training


  return torch.max_pool2d(input, kernel_size, stride, padding, dilation, ceil_mode)


epoch [1/40], step [3/62], loss: 0.7386, Accuracy: 40.62%
epoch [1/40], step [6/62], loss: 0.5984, Accuracy: 65.62%
epoch [1/40], step [9/62], loss: 0.8684, Accuracy: 53.12%
epoch [1/40], step [12/62], loss: 0.7242, Accuracy: 59.38%
epoch [1/40], step [15/62], loss: 0.6338, Accuracy: 59.38%
epoch [1/40], step [18/62], loss: 0.6558, Accuracy: 59.38%
epoch [1/40], step [21/62], loss: 0.6825, Accuracy: 59.38%
epoch [1/40], step [24/62], loss: 0.6391, Accuracy: 71.88%
epoch [1/40], step [27/62], loss: 0.6631, Accuracy: 62.50%
epoch [1/40], step [30/62], loss: 0.6419, Accuracy: 59.38%
epoch [1/40], step [33/62], loss: 0.6238, Accuracy: 65.62%
epoch [1/40], step [36/62], loss: 0.6330, Accuracy: 62.50%
epoch [1/40], step [39/62], loss: 0.5625, Accuracy: 78.12%
epoch [1/40], step [42/62], loss: 0.6246, Accuracy: 65.62%
epoch [1/40], step [45/62], loss: 0.7116, Accuracy: 56.25%
epoch [1/40], step [48/62], loss: 0.6131, Accuracy: 75.00%
epoch [1/40], step [51/62], loss: 0.6804, Accuracy: 56.25%
