In [None]:
# Mount Google Drive at /content/drive (uses the google.colab helper module).
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Change the working directory. 'Your Folder' is a placeholder: replace it with the
# project folder, since later relative paths such as Path('data', ...) resolve against it.
%cd 'Your Folder'

In [None]:
import glob
from pathlib import Path
import random
import numpy as np
import json
from PIL import Image
from tqdm import tqdm
import matplotlib.pyplot as plt
%matplotlib inline

import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import torchvision
from torchvision import models, transforms

In [None]:
# Run configuration shared by the cells below.
BATCH_SIZE = 32  # mini-batch size passed to both DataLoaders
CLASSES = 2  # number of output units of the replaced final classifier layer
NUM_EPOCHS = 2  # passed to train_model; note its first epoch only runs validation
PRETRAINED_FLG = True  # forwarded as `pretrained=` to models.vgg16
SEED = 41  # seed applied to torch, numpy and random
VERBOSE = 0 # When truthy (e.g. 1), run the optional sanity-check prints and plots

In [None]:
"""
Expected folder layout (replace '***' with the actual dataset folder name)
- data
    - ***
        - train
            - dataset1
            - dataset2
            ...
        - val
            - dataset1
            - dataset2
            ...
"""
# Root folders of the training and validation images ('***' is a placeholder).
train_path = Path('data', '***', 'train')
val_path = Path('data', '***', 'val')


def list_files(root):
  """Return every non-directory entry below ``root`` as a sorted list of POSIX path strings.

  ``Path.glob`` yields entries in an arbitrary, filesystem-dependent order.
  Sorting keeps the sample order stable between runs and machines, so the
  seeded shuffling of the training DataLoader stays reproducible.

  Parameters
  ----------------
  root : str or Path
    Folder that is searched recursively

  Returns
  ----------------
  list of str
    Sorted paths; empty when nothing is found
  """
  return sorted(
      path.as_posix() for path in Path(root).glob("**/*") if not path.is_dir()
  )


train_list = list_files(train_path)
val_list = list_files(val_path)

In [None]:
# Seed the PyTorch, NumPy and Python `random` generators with the same SEED.
torch.manual_seed(SEED)
np.random.seed(SEED)
random.seed(SEED)

## Uncomment the lines below when reproducible results are required
# torch.backends.cudnn.deterministic = True
# torch.backends.cudnn.benchmark = False

In [None]:
class ImageTransform:
  """
  Image preprocessing with one pipeline per phase.

  'train' applies a random resized crop (scale 0.5-1.0) and a random
  horizontal flip; 'val' applies a resize followed by a center crop.
  Both pipelines then convert to a tensor and normalize with mean / std.

  Attributes
  --------------
  resize : int
    Target size handed to the resize / crop transforms
  mean : (R, G, B)
    Per-channel mean used for normalization
  std : (R, G, B)
    Per-channel standard deviation used for normalization
  """
  def __init__(self, resize, mean, std):
    # Training pipeline: random augmentation, then tensor conversion + normalization
    train_pipeline = transforms.Compose([
        transforms.RandomResizedCrop(resize, scale=(0.5, 1.0)),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize(mean, std),
    ])
    # Validation pipeline: deterministic resize + center crop, same tensor steps
    val_pipeline = transforms.Compose([
        transforms.Resize(resize),
        transforms.CenterCrop(resize),
        transforms.ToTensor(),
        transforms.Normalize(mean, std),
    ])
    self.data_transform = {'train': train_pipeline, 'val': val_pipeline}

  def __call__(self, img, phase='train'):
    """
    Apply the pipeline registered for ``phase`` to ``img``.

    Parameters
    ----------------
    img : image accepted by the torchvision transforms
      Image to preprocess
    phase : 'train' or 'val'
      Selects the preprocessing mode
    """
    pipeline = self.data_transform[phase]
    return pipeline(img)

In [None]:
# Preprocessing parameters. They live outside the VERBOSE branch because the
# dataset-construction cell reads `size`, `mean` and `std` regardless of VERBOSE.
size = 224
mean = (0.485, 0.456, 0.406)
std = (0.229, 0.224, 0.225)

if VERBOSE:
    # Sanity check: show one image before and after the 'train' preprocessing.
    img_path = Path('data', '****')  # placeholder; point this at a real image file
    img = Image.open(img_path.as_posix())

    plt.imshow(img)
    plt.show()

    transform = ImageTransform(size, mean, std)
    img_transformed = transform(img, 'train')

    # Move the channel axis last for imshow. The values are still normalized
    # (no inverse of Normalize is applied), so they are only clipped to [0, 1]
    # and the colors will not match the original image.
    img_transformed = img_transformed.numpy().transpose((1, 2, 0))
    img_transformed = np.clip(img_transformed, 0, 1)
    plt.imshow(img_transformed)
    plt.show()

In [None]:
class BaseDataset(data.Dataset):
  """
  Dataset that loads images from a list of paths and preprocesses them.

  Attributes
  --------------
  file_list : list
    List holding the image file paths
  transform : object
    Preprocessing callable invoked as ``transform(img, phase)``;
    when None the RGB PIL image is returned without preprocessing
  phase : 'train' or 'val'
    Selects the training or validation preprocessing
  """
  def __init__(self, file_list, transform=None, phase='train'):
    self.file_list = file_list
    self.transform = transform
    self.phase = phase
    # Extend here when label information has to be loaded separately

  def __len__(self):
    return len(self.file_list)

  def __getitem__(self, index):
    '''
    Return the preprocessed image and its label for ``index``.
    '''
    img_path = self.file_list[index]
    # The pipelines normalize with 3-channel statistics, so every input is
    # converted to RGB (covers grayscale, palette, RGBA, CMYK, ...).
    # convert() loads the pixels and returns a new image, so the file handle
    # can be closed as soon as the `with` block ends.
    with Image.open(img_path) as raw_img:
      img = raw_img.convert("RGB")

    if self.transform is not None:
      img_transformed = self.transform(img, self.phase)
    else:
      img_transformed = img

    # The label is expected to be derived from the image path; customize below.
    # NOTE(review): the None placeholder must be replaced by a real label
    # (e.g. an integer class index) before training -- the default DataLoader
    # collate function is not expected to batch None values.
    label = None # dummy
    return img_transformed, label

In [None]:
# Wrap the file lists in datasets; each dataset gets its own ImageTransform instance.
train_dataset = BaseDataset(
    file_list=train_list,
    transform=ImageTransform(size, mean, std),
    phase='train',
)
val_dataset = BaseDataset(
    file_list=val_list,
    transform=ImageTransform(size, mean, std),
    phase='val',
)

if VERBOSE:
    # Sanity check: tensor size and label of the first sample of each dataset
    print(train_dataset[0][0].size())
    print(train_dataset[0][1])
    print(val_dataset[0][0].size())
    print(val_dataset[0][1])

In [None]:
# Batch loaders: the training loader shuffles, the validation loader keeps file order.
train_dataloader = data.DataLoader(
    dataset=train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
)
val_dataloader = data.DataLoader(
    dataset=val_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
)

# Phase name -> loader, the mapping that train_model iterates over
dataloaders_dict = dict(train=train_dataloader, val=val_dataloader)

if VERBOSE:
    # Sanity check: pull a single training batch and show its size and labels
    batch_iterator = iter(dataloaders_dict["train"])
    inputs, labels = next(batch_iterator)
    print(inputs.size())
    print(labels)

In [None]:
# Build VGG16 (with pretrained weights when PRETRAINED_FLG is True).
net = models.vgg16(pretrained=PRETRAINED_FLG)
# Replace the last classifier layer with a CLASSES-way output. The input width
# is read from the layer being replaced instead of hard-coding it.
net.classifier[6] = nn.Linear(
    in_features=net.classifier[6].in_features, out_features=CLASSES
)
net.train()

criterion = nn.CrossEntropyLoss()

params_to_update = [] # parameters trained during transfer learning
update_param_names = ["classifier.6.weight", "classifier.6.bias"] # names of those parameters

# Freeze every parameter except the ones listed above (no gradients are computed for them)
for name, param in net.named_parameters():
  param.requires_grad = name in update_param_names
  if param.requires_grad:
    params_to_update.append(param)
    print(name)

print("---------")
# Show the shapes only; printing the tensors themselves floods the cell output
print([tuple(param.shape) for param in params_to_update])

optimizer = optim.SGD(params=params_to_update, lr=0.001, momentum=0.9)

In [None]:
def train_model(net, dataloaders_dict, criterion, optimizer, num_epochs):
  """
  Run the training / validation loop and print loss and accuracy per phase.

  The training phase of the first epoch is skipped on purpose so that the
  validation score of the untrained model is reported first.

  Parameters
  ----------------
  net : nn.Module
    Model to train; batches are moved to the device of its parameters
  dataloaders_dict : dict
    Maps 'train' and 'val' to the DataLoader of that phase
  criterion : callable
    Loss function called as ``criterion(outputs, labels)``
  optimizer : torch.optim.Optimizer
    Optimizer stepped once per training batch
  num_epochs : int
    Number of epochs, including the validation-only first epoch
  """
  # Feed batches on whatever device the model already lives on (CPU fallback
  # for a model without parameters).
  first_param = next(net.parameters(), None)
  device = first_param.device if first_param is not None else torch.device('cpu')

  for epoch in range(num_epochs):
    print("Epoch {}/{}".format(epoch+1, num_epochs))
    print("-------------------")

    for phase in ['train', 'val']:
      if phase == 'train':
        net.train()
      else:
        net.eval()

      # Skip training in epoch 0 to check the validation performance before any learning
      if (epoch == 0) and (phase == 'train'):
        continue

      num_samples = len(dataloaders_dict[phase].dataset)
      if num_samples == 0:
        # Nothing to average over; avoid dividing by zero below
        print('{} skipped: dataset is empty'.format(phase))
        continue

      epoch_loss = 0.0
      epoch_corrects = 0

      for inputs, labels in tqdm(dataloaders_dict[phase]):
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Gradients are only tracked while training
        with torch.set_grad_enabled(phase == 'train'):
          outputs = net(inputs)
          loss = criterion(outputs, labels) # compute the loss
          _, preds = torch.max(outputs, 1) # predicted labels
          if phase == 'train':
            optimizer.zero_grad() # reset gradients of the previous batch
            loss.backward()
            optimizer.step()

        # loss is a batch mean, so weight it by the batch size before summing
        epoch_loss += loss.item() * inputs.size(0)
        epoch_corrects += (preds == labels).sum().item()

      epoch_loss = epoch_loss / num_samples
      epoch_acc = epoch_corrects / num_samples

      print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

In [None]:
# Run the training / validation loop for NUM_EPOCHS epochs.
train_model(net, dataloaders_dict, criterion, optimizer, num_epochs=NUM_EPOCHS)