<a href="https://colab.research.google.com/github/Re14m/training/blob/master/2022_04_26_recipe464.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# 画像データセットのダウンロード（https://github.com/albumentations-team/albumentations）
!wget "https://drive.google.com/uc?export=download&id=1rVD5yS_BI4wd_IBlHXKhRNTpLU6oCaN2" -O sample.jpg

In [None]:
# DLした画像の表示
import cv2
from google.colab.patches import cv2_imshow

image = cv2.imread('sample.jpg')
# cv2.imread() returns None instead of raising when the file is missing or
# cannot be decoded; fail here with a clear message rather than inside cv2_imshow.
if image is None:
    raise FileNotFoundError("sample.jpg could not be read; re-run the download cell")
cv2_imshow(image)

In [None]:
# ライブラリのインストール
!pip install -U albumentations
!pip install "opencv-python-headless<4.3"

In [None]:
# Albumentationsのデータ拡張をリスト化
import albumentations as A

# Collect the demo augmentations; each one is created with p=1.0
ops = []
ops.append(A.HorizontalFlip(p=1.0))  # horizontal flip
ops.append(A.RandomRotate90(p=1.0))  # random 90-degree rotation
ops.append(A.RandomBrightnessContrast(brightness_limit=0.5, p=1.0))  # random brightness / contrast change

In [None]:
# Pass the list of augmentations to Compose
transforms = A.Compose(ops)

In [None]:
# Run the augmentation pipeline on the sample image
augment_image = transforms(image=image)

In [None]:
# Show the result
cv2_imshow(augment_image["image"])

In [None]:
# Wrap the same list in OneOf so that one augmentation out of the list is picked
transforms = A.OneOf(ops, p=1.0)
augment_image = transforms(image=image)

# Show the result
cv2_imshow(augment_image["image"])

In [None]:
# datasetのダウンロード
!wget https://download.microsoft.com/download/3/E/1/3E1C3F21-ECDB-4869-8368-6DEBA77B919F/kagglecatsanddogs_3367a.zip -O kagglecatsanddogs_3367a.zip

In [None]:
# datasetの解凍
!unzip kagglecatsanddogs_3367a.zip

In [None]:
# 下準備1
from glob import glob

# Collect the path of every .jpg file below PetImages/.
# glob() returns paths in arbitrary, filesystem-dependent order; sorting them makes
# the later seeded shuffle (and therefore the train/val/test split) reproducible.
file_path_list = sorted(glob('PetImages/**/*.jpg', recursive=True))

In [None]:
# 下準備2
import cv2

# Keep only the images that cv2.imread() manages to decode.
# The loop uses its own variable name so it no longer overwrites the global
# `image` that the augmentation demo cells above read.
correct_image_path_list = []
for file_path in file_path_list:
    decoded = cv2.imread(file_path)
    if decoded is None:
        # imread() signals an unreadable file by returning None
        print('Corrupted image:', file_path)
    else:
        correct_image_path_list.append(file_path)

In [None]:
# datasetの分割
import random

# Seed the global RNG, then shuffle a *copy* of the path list.
# Shuffling in place made this cell non-idempotent: re-running it reshuffled the
# already shuffled list and silently produced a different split.
random.seed(123)
shuffled_image_path_list = list(correct_image_path_list)
random.shuffle(shuffled_image_path_list)

# Split into training (500), validation (100) and test (10) subsets
train_file_path_list = shuffled_image_path_list[:500]
val_file_path_list = shuffled_image_path_list[500:600]
test_file_path_list = shuffled_image_path_list[600:610]

print(len(train_file_path_list), len(val_file_path_list), len(test_file_path_list))

In [None]:
# Model training (without RandAugment)
import torch.backends.cudnn as cudnn

# Set the cudnn.benchmark flag
cudnn.benchmark = True

In [None]:
# モデル構築（ResNet50の学習済モデル）
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.models as models

# Device: use the GPU when one is available and fall back to the CPU otherwise,
# so the notebook does not crash on a runtime without CUDA
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# Learning rate
learning_rate = 0.001

# ResNet50 with a single output. pretrained=False means no pretrained weights are
# loaded, i.e. the network is trained from scratch.
model = models.resnet50(pretrained=False, num_classes=1)
model = model.to(device)

# Loss function and optimizer
criterion = nn.BCEWithLogitsLoss().to(device)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [None]:
# datasetの準備（pytorch向け）
import os
from torch.utils.data import Dataset, DataLoader

class CatsAndDogsDataset(Dataset):
    """Dataset that reads images from a list of paths and labels them by folder name.

    An image whose parent directory is named "Cat" gets label 1.0; every other
    image gets label 0.0.
    """

    def __init__(self, image_file_path, transform=None):
        """Store the list of image paths and the optional transform."""
        self.image_file_path = image_file_path
        self.transform = transform

    def __len__(self):
        """Return the number of image paths."""
        return len(self.image_file_path)

    def __getitem__(self, idx):
        """Return (image, label) for the path at position `idx`."""
        path = self.image_file_path[idx]

        # OpenCV reads BGR; convert to RGB
        rgb_image = cv2.cvtColor(cv2.imread(path), cv2.COLOR_BGR2RGB)

        # The label comes from the name of the directory that holds the file
        parent_dir = os.path.normpath(path).split(os.sep)[-2]
        label = 1.0 if parent_dir == "Cat" else 0.0

        # Without a transform the RGB image is returned as read
        if self.transform is None:
            return rgb_image, label

        # The transform is called with image=... and its "image" entry is returned
        return self.transform(image=rgb_image)["image"], label

In [None]:
# dataset変換関数の準備（リサイズ,正規化,Tensor形式への変換）
import numpy as np
import albumentations as A
from albumentations.pytorch import ToTensorV2

def _resize_normalize_to_tensor():
    """Build the resize -> normalize -> tensor pipeline used for training and validation."""
    return A.Compose(
        [
            A.Resize(height=128, width=128),
            A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
            ToTensorV2(),
        ]
    )


# Training and validation use the same steps, each with its own Compose object
train_transform = _resize_normalize_to_tensor()
val_transform = _resize_normalize_to_tensor()

In [None]:
# Load the training and validation data
batch_size = 128
num_workers = 4

# Options common to both loaders
loader_options = dict(batch_size=batch_size, num_workers=num_workers, pin_memory=True)

# Training data (shuffled)
train_dataset = CatsAndDogsDataset(image_file_path=train_file_path_list, transform=train_transform)
train_loader = DataLoader(train_dataset, shuffle=True, **loader_options)

# Validation data (not shuffled)
val_dataset = CatsAndDogsDataset(image_file_path=val_file_path_list, transform=val_transform)
val_loader = DataLoader(val_dataset, shuffle=False, **loader_options)

In [None]:
# 訓練
%%time

import copy
from tqdm import tqdm

# エポック数
epochs = 100

# ベストモデル保存用変数
best_acc = 0
best_model_weight = copy.deepcopy(model.state_dict())

# Accuracy計算用の関数
def calculate_accuracy(output, target):
    """Return the fraction of samples whose thresholded prediction matches the label.

    A prediction counts as positive when sigmoid(output) >= 0.5; a label counts
    as positive when it equals 1.0. The result is a Python float.
    """
    predicted_positive = torch.sigmoid(output) >= 0.5
    labelled_positive = target == 1.0
    # Count the matches along the batch dimension
    n_correct = (labelled_positive == predicted_positive).sum(dim=0)
    n_samples = output.size(0)
    return torch.true_divide(n_correct, n_samples).item()

# Training loop: one training pass and one validation pass per epoch
for epoch in range(1, epochs + 1):
    # Per-batch metrics gathered during this epoch
    train_acc, train_loss = [], []
    val_acc, val_loss = [], []

    # ---- training pass ----
    model.train()
    stream = tqdm(train_loader)
    for i, (images, target) in enumerate(stream, start=1):
        images = images.to(device, non_blocking=True)
        # Labels are cast to float and reshaped into a single column
        target = target.to(device, non_blocking=True).float().view(-1, 1)

        output = model(images)
        loss = criterion(output, target)
        accuracy = calculate_accuracy(output, target)

        train_acc.append(accuracy)
        train_loss.append(loss.item())

        # Parameter update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Progress bar text: averages over the batches seen so far in this epoch
        mean_loss = sum(train_loss) / len(train_loss)
        mean_acc = sum(train_acc) / len(train_acc)
        log_text = (
            'Epoch: {epoch}. Train.      '.format(epoch=epoch)
            + "Loss: {avg:.3f} ".format(avg=mean_loss)
            + "Accuracy: {avg:.3f} ".format(avg=mean_acc)
        )
        stream.set_description(log_text)

    # ---- validation pass (no gradients) ----
    model.eval()
    stream = tqdm(val_loader)
    with torch.no_grad():
        for i, (images, target) in enumerate(stream, start=1):
            images = images.to(device, non_blocking=True)
            target = target.to(device, non_blocking=True).float().view(-1, 1)

            output = model(images)
            loss = criterion(output, target)
            accuracy = calculate_accuracy(output, target)

            val_acc.append(accuracy)
            val_loss.append(loss.item())

            mean_loss = sum(val_loss) / len(val_loss)
            mean_acc = sum(val_acc) / len(val_acc)
            log_text = (
                'Epoch: {epoch}. Validation. '.format(epoch=epoch)
                + "Loss: {avg:.3f} ".format(avg=mean_loss)
                + "Accuracy: {avg:.3f} ".format(avg=mean_acc)
            )
            stream.set_description(log_text)

    # Keep a copy of the weights whenever the mean validation accuracy improves
    val_acc_avg = sum(val_acc) / len(val_acc)
    if best_acc < val_acc_avg:
        best_acc = val_acc_avg
        best_model_weight = copy.deepcopy(model.state_dict())

# Restore the best weights
model.load_state_dict(best_model_weight)

In [None]:
# Show the best validation accuracy
print(best_acc)

In [None]:
# モデル構築（RandAugument）
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.models as models

# Device: use the GPU when one is available and fall back to the CPU otherwise,
# so the notebook does not crash on a runtime without CUDA
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# Learning rate
learning_rate = 0.001

# ResNet50 with a single output. pretrained=False means no pretrained weights are
# loaded, i.e. the network is trained from scratch.
model2 = models.resnet50(pretrained=False, num_classes=1)
model2 = model2.to(device)

# Loss function and optimizer for the RandAugment run
criterion2 = nn.BCEWithLogitsLoss().to(device)
optimizer2 = optim.Adam(model2.parameters(), lr=learning_rate)

In [None]:
# RandAugumentation処理
import cv2
import numpy as np
import albumentations as A

# オートコントラスト
def _stretch_channel(channel):
    """Linearly remap one uint8 channel so its min becomes 0 and its max 255."""
    if channel.size == 0:
        return channel
    lo, hi = int(channel.min()), int(channel.max())
    if hi <= lo:
        # Flat channel: there is no range to stretch, keep it unchanged
        return channel
    scale_coef = 255.0 / (hi - lo)
    offset = -lo * scale_coef
    # 256-entry lookup table; the cast truncates like the original int() did
    lut = np.clip(np.arange(256) * scale_coef + offset, 0, 255).astype(np.uint8)
    return lut[channel]


def auto_contrast(image, **kwargs):
    """Stretch the contrast of every channel to the full 0-255 range.

    Fixes relative to the previous version:
    * the result is always uint8 (a flat image used to come back as float32,
      which uint8-only transforms later in the pipeline cannot process);
    * the value range is measured per channel instead of taking the range of
      channel 0 and applying it to all channels.

    Args:
        image: uint8 image, either 2-D or with the channels on the last axis.
        **kwargs: accepted and ignored, so the function can be used as an
            albumentations Lambda callback.

    Returns:
        uint8 array with the same shape as `image`.
    """
    image = np.uint8(image)
    if image.ndim == 2:
        return _stretch_channel(image)

    stretched = np.empty_like(image)
    for ch in range(image.shape[-1]):
        stretched[..., ch] = _stretch_channel(image[..., ch])
    return stretched
    
# カラーバランス調整
def color_balance(factor):
    """Create an image function that adjusts colour balance by `factor`.

    For each of the first three channels the result is
    ((1 + 2*factor) * own + (1 - factor) * other_a + (1 - factor) * other_b) / 3,
    so factor == 1 leaves the image unchanged and factor == 0 replaces every
    channel with the mean of the three.

    Args:
        factor: blend factor.

    Returns:
        A function `f(image, **kwargs)` that returns a uint8 array with the
        shape of `image`.
    """
    def color_balance_function(image, **kwargs):
        # Compute in float64 so an integer `factor` cannot overflow uint8 arithmetic
        pixels = np.asarray(image, dtype=np.float64)
        new_image = np.zeros(pixels.shape)
        for ch in range(3):
            first, second = [k for k in range(3) if k != ch]
            new_image[:, :, ch] = ((1 + 2 * factor) * pixels[:, :, ch] +
                                   (1 - factor) * pixels[:, :, first] +
                                   (1 - factor) * pixels[:, :, second]) / 3
        # Clip before the cast: for factor != 1 values can leave [0, 255], and a
        # bare uint8 cast would wrap them around instead of saturating
        return np.clip(new_image, 0, 255).astype(np.uint8)
    return color_balance_function

def create_rand_augment_ops(N, M, p):
    """Draw N augmentation ops at magnitude index M for a RandAugment-style pipeline.

    Args:
        N: number of ops to draw (with replacement) from the candidate list.
        M: magnitude index (0-9) into the 10-entry tables below.
        p: probability passed to every candidate op except NoOp.

    Returns:
        list of N albumentations transforms (an op drawn twice is the same object).

    Raises:
        ValueError: if M is outside the 10-entry tables.
    """
    n_levels = 10
    if not 0 <= M < n_levels:
        # A negative M used to wrap around silently and pick entries from the end
        raise ValueError("M must be in the range [0, {}), got {}".format(n_levels, M))

    # Magnitude (M) search space
    rot = np.linspace(-30, 30, n_levels)
    sola = np.linspace(0, 256, n_levels)
    color_factor = np.linspace(0.1, 1.9, n_levels)
    post = [4, 4, 5, 5, 6, 6, 7, 7, 8, 8]
    cont = [np.linspace(-0.8, -0.1, n_levels), np.linspace(0.1, 2, n_levels)]
    bright = np.linspace(0.1, 0.7, n_levels)
    shar = np.linspace(0.1, 0.9, n_levels)
    shear = np.linspace(0, 10, n_levels)
    shift_x = np.linspace(0, 150, n_levels)
    shift_y = np.linspace(0, 150, n_levels)

    # The 0-150 translation table is far outside the fractional range that
    # ShiftScaleRotate's shift_limit expects, so it is treated as pixels and
    # applied through Affine(translate_px=...). This also drops the random
    # scaling that ShiftScaleRotate adds through its default scale_limit.
    shift_x_px = int(round(shift_x[M]))
    shift_y_px = int(round(shift_y[M]))

    # Transformation (N) search space
    transform = [
        A.NoOp(),
        A.Lambda(image=auto_contrast, p=p),  # p is now honoured like for the other ops
        A.Equalize(p=p),
        A.Affine(rotate=rot[M], p=p),
        A.Solarize(threshold=sola[M], p=p),
        A.Lambda(image=color_balance(color_factor[M]), p=p),
        A.Posterize(num_bits=post[M], p=p),
        # Contrast only: brightness_limit=0 switches off the default brightness change
        A.RandomBrightnessContrast(brightness_limit=0, contrast_limit=[cont[0][M], cont[1][M]], p=p),
        # Brightness only: contrast_limit=0 switches off the default contrast change
        A.RandomBrightnessContrast(brightness_limit=bright[M], contrast_limit=0, p=p),
        A.Sharpen(alpha=shar[M], lightness=shar[M], p=p),
        # Both axes are spelled out so the other axis never picks up a library default
        A.Affine(shear={'x': shear[M], 'y': 0}, p=p),
        A.Affine(shear={'x': 0, 'y': shear[M]}, p=p),
        A.Affine(translate_px={'x': (-shift_x_px, shift_x_px), 'y': 0}, p=p),
        A.Affine(translate_px={'x': 0, 'y': (-shift_y_px, shift_y_px)}, p=p),
    ]

    # Draw N indices (with replacement) instead of running np.random.choice on a
    # list of transform objects, which first has to coerce them into an array
    picks = np.random.randint(len(transform), size=N)
    ops = [transform[idx] for idx in picks]

    return ops

In [None]:
# Transforms for the RandAugment run
N, M = 3, 1  # N and M arguments of create_rand_augment_ops
p = 1.0      # p argument of create_rand_augment_ops

rand_augment_ops = create_rand_augment_ops(N, M, p=p)

# Training pipeline: resize -> sampled ops -> normalize -> tensor
train_steps = [A.Resize(height=128, width=128)]
train_steps.extend(rand_augment_ops)
train_steps.append(A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)))
train_steps.append(ToTensorV2())
train_transform = A.Compose(train_steps)

# Validation pipeline: resize -> normalize -> tensor, without the sampled ops
val_steps = [
    A.Resize(height=128, width=128),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2(),
]
val_transform = A.Compose(val_steps)

In [None]:
# Load the training and validation data for the RandAugment run
batch_size = 128
num_workers = 4

# Keyword arguments that both loaders have in common
common_loader_kwargs = {
    "batch_size": batch_size,
    "num_workers": num_workers,
    "pin_memory": True,
}

# Training data (shuffled)
train_dataset = CatsAndDogsDataset(image_file_path=train_file_path_list, transform=train_transform)
train_loader = DataLoader(train_dataset, shuffle=True, **common_loader_kwargs)

# Validation data (not shuffled)
val_dataset = CatsAndDogsDataset(image_file_path=val_file_path_list, transform=val_transform)
val_loader = DataLoader(val_dataset, shuffle=False, **common_loader_kwargs)

In [None]:
# 訓練
%%time

import copy
from tqdm import tqdm

# エポック数
epochs = 100

# ベストモデル保存用変数
best_acc = 0
best_model_weight = copy.deepcopy(model.state_dict())

# Training loop for model2; the augmentation ops are re-sampled after every epoch
for epoch in range(1, epochs + 1):
    # Per-batch metrics gathered during this epoch
    train_acc, train_loss = [], []
    val_acc, val_loss = [], []

    # ---- training pass ----
    model2.train()
    stream = tqdm(train_loader)
    for i, (images, target) in enumerate(stream, start=1):
        images = images.to(device, non_blocking=True)
        # Labels are cast to float and reshaped into a single column
        target = target.to(device, non_blocking=True).float().view(-1, 1)

        output = model2(images)
        loss = criterion2(output, target)
        accuracy = calculate_accuracy(output, target)

        train_acc.append(accuracy)
        train_loss.append(loss.item())

        # Parameter update
        optimizer2.zero_grad()
        loss.backward()
        optimizer2.step()

        # Progress bar text: averages over the batches seen so far in this epoch
        mean_loss = sum(train_loss) / len(train_loss)
        mean_acc = sum(train_acc) / len(train_acc)
        log_text = (
            'Epoch: {epoch}. Train.      '.format(epoch=epoch)
            + "Loss: {avg:.3f} ".format(avg=mean_loss)
            + "Accuracy: {avg:.3f} ".format(avg=mean_acc)
        )
        stream.set_description(log_text)

    # ---- validation pass (no gradients) ----
    model2.eval()
    stream = tqdm(val_loader)
    with torch.no_grad():
        for i, (images, target) in enumerate(stream, start=1):
            images = images.to(device, non_blocking=True)
            target = target.to(device, non_blocking=True).float().view(-1, 1)

            output = model2(images)
            loss = criterion2(output, target)
            accuracy = calculate_accuracy(output, target)

            val_acc.append(accuracy)
            val_loss.append(loss.item())

            mean_loss = sum(val_loss) / len(val_loss)
            mean_acc = sum(val_acc) / len(val_acc)
            log_text = (
                'Epoch: {epoch}. Validation. '.format(epoch=epoch)
                + "Loss: {avg:.3f} ".format(avg=mean_loss)
                + "Accuracy: {avg:.3f} ".format(avg=mean_acc)
            )
            stream.set_description(log_text)

    # Keep a copy of the weights whenever the mean validation accuracy improves
    val_acc_avg = sum(val_acc) / len(val_acc)
    if best_acc < val_acc_avg:
        best_acc = val_acc_avg
        best_model_weight = copy.deepcopy(model2.state_dict())

    # Re-sample the RandAugment ops and rebuild the training pipeline and loader
    rand_augment_ops = create_rand_augment_ops(N, M, p=p)
    resampled_steps = [A.Resize(height=128, width=128)]
    resampled_steps.extend(rand_augment_ops)
    resampled_steps.append(A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)))
    resampled_steps.append(ToTensorV2())
    train_transform = A.Compose(resampled_steps)
    train_dataset = CatsAndDogsDataset(image_file_path=train_file_path_list, transform=train_transform)
    train_loader = DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=num_workers,
        pin_memory=True,
    )

# Restore the best weights
model2.load_state_dict(best_model_weight)

In [None]:
# Checking the results: load the test data
# Test pipeline: resize -> normalize -> tensor
test_steps = [
    A.Resize(height=128, width=128),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2(),
]
test_transform = A.Compose(test_steps)

test_dataset = CatsAndDogsDataset(image_file_path=test_file_path_list, transform=test_transform)
test_loader = DataLoader(
    test_dataset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=num_workers,
    pin_memory=True,
)

In [None]:
# 描画用関数
import matplotlib.pyplot as plt

def display_image_grid(images_file_path, predicted_labels=(), cols=5):
    """Show the images in a grid, titled with their (predicted) label.

    The true label of an image is the name of its parent directory. A title is
    green when the shown label equals the true label and red otherwise.

    Args:
        images_file_path: sequence of image paths to display.
        predicted_labels: one label per image; when empty, the true labels are shown.
        cols: number of grid columns.
    """
    n_images = len(images_file_path)
    if n_images == 0:
        # Nothing to draw; a grid with zero rows cannot be created
        return

    # Round up so a trailing partial row is kept; floor division dropped those
    # images and produced zero rows whenever there were fewer images than columns
    rows = -(-n_images // cols)
    # squeeze=False always yields a 2-D array of axes, even for a 1x1 grid.
    # The height grows with the row count (3 per row, i.e. 12x6 for two rows).
    figure, ax = plt.subplots(nrows=rows, ncols=cols, figsize=(12, 3 * rows), squeeze=False)
    axes = ax.ravel()

    for i, image_filepath in enumerate(images_file_path):
        # Read the image and convert BGR -> RGB
        image = cv2.imread(image_filepath)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Correct labels are shown in green, incorrect ones in red
        true_label = os.path.normpath(image_filepath).split(os.sep)[-2]
        predicted_label = predicted_labels[i] if predicted_labels else true_label
        color = "green" if true_label == predicted_label else "red"

        # Set the image and the label text
        axes[i].imshow(image)
        axes[i].set_title(predicted_label, color=color)
        axes[i].set_axis_off()

    # Hide the cells of the last row that have no image
    for unused_ax in axes[n_images:]:
        unused_ax.set_axis_off()

    plt.tight_layout()
    plt.show()

In [None]:
# Prediction (model trained without RandAugment)
model = model.eval()

predicted_labels = []
with torch.no_grad():
    for images, _ in test_loader:
        # Inference on the current batch
        images = images.to(device, non_blocking=True)
        output = model(images)

        # sigmoid(output) >= 0.5 maps to "Cat", everything else to "Dog"
        predictions = (torch.sigmoid(output) >= 0.5)[:, 0].cpu().numpy()
        predicted_labels.extend(["Cat" if is_cat else "Dog" for is_cat in predictions])

In [None]:
# Show the test images with the predicted labels
display_image_grid(test_file_path_list, predicted_labels)

In [None]:
# Prediction (model trained with RandAugment)
model2 = model2.eval()

predicted_labels = []
with torch.no_grad():
    for images, _ in test_loader:
        # Inference on the current batch
        images = images.to(device, non_blocking=True)
        output = model2(images)

        # sigmoid(output) >= 0.5 maps to "Cat", everything else to "Dog"
        predictions = (torch.sigmoid(output) >= 0.5)[:, 0].cpu().numpy()
        predicted_labels.extend(["Cat" if is_cat else "Dog" for is_cat in predictions])

In [None]:
# Show the test images with the predicted labels
display_image_grid(test_file_path_list, predicted_labels)