# ラベルに関する設定

In [None]:
# 画像を格納したzipファイルの名前
class_names = ["beef", "pork"]

# class_names = ["class1", "class2"]
# この場合，以下のファイルが Google Drive にアップロードされている想定
#   class1.zip
#   class2.zip

# GPU割当・ドライブのマウント

In [None]:
# GPU の確認
!nvidia-smi
# Google Drive のマウント（Google Drive のファイルを読み書きできるようにする）
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# 事前に upload したファイルを Google Drive からコピー

data_path = "/content/drive/My Drive/Colaboratory/jts_private/"

import shutil
import os
to_data_dir = "data/"
os.makedirs(to_data_dir, exist_ok=True)

zip_fname_list = [cn + ".zip" for cn in class_names]
for fname in zip_fname_list:
    shutil.copy(data_path + fname, to_data_dir + fname)

In [None]:
# zipファイルの解凍
import zipfile
for zip_name in zip_fname_list:
    with zipfile.ZipFile(to_data_dir + zip_name) as zf:
        zf.extractall(to_data_dir)

# 交差検証用のデータ分割，ラベル付

In [None]:
import csv
from pathlib import Path
import random
import numpy as np
from sklearn.model_selection import KFold

In [None]:
def split_train_test(class_kind, fold):
    paths = []
    labels = []
    for idx, kind in enumerate(class_kind):
        p = Path("./data/{}/".format(kind))
        imgs = list(p.glob("*"))
        for imgpath in imgs:
            paths.append(imgpath)
            labels.append(idx)

    paths = np.array(paths)
    labels = np.array(labels)

    # 分割処理
    kf = KFold(n_splits=fold, random_state=0, shuffle=True)

    for idx, (train_index, test_index) in enumerate(kf.split(paths)):

        # 訓練データ
        with open("./data/train_kfold_{:02}.csv".format(idx), "w") as f:
            writer = csv.writer(f)
            writer.writerow(["filename", "label"])
            for path, label in zip(paths[train_index], labels[train_index]):
                writer.writerow([path, label])

        # テストデータ
        with open("./data/test_kfold_{:02}.csv".format(idx), "w") as f:
            writer = csv.writer(f)
            writer.writerow(["filename", "label"])
            for path, label in zip(paths[test_index], labels[test_index]):
                writer.writerow([path, label])

In [None]:
fold_num = 5
split_train_test(class_names, fold_num)

# 学習コードの実行

In [None]:
import os
import random
import time

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
from torch.utils.data import Dataset
from torchvision.datasets.folder import default_loader
import torchvision.models as models
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix

In [None]:
# 各種設定
BATCH_SIZE = 50
MAX_EPOCH = 25
IMAGE_SIZE = 224
# シード値の固定
random.seed(0)
np.random.seed(0)
torch.manual_seed(0)

torch.cuda.manual_seed_all(0)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

# GPU利用設定
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("device =", device)

In [None]:
# 自作のデータセットの処理
class MyDataset(Dataset):
    def __init__(self, csv_file, root_dir, transform=None, target_transform=None, loader=default_loader):
        self.df = pd.read_csv(csv_file)
        self.root_dir = root_dir  # 画像が入っているディレクトリのパス
        self.loader = loader
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        return len(self.df)

    # dataloaderで読み込むときの処理
    def __getitem__(self, idx):
        # filename取得
        img_name = self.df.iloc[idx, 0]
        # 画像のパス設定
        img_path = os.path.join(self.root_dir, img_name)
        # 画像読み込み
        image = self.loader(img_path)
        # user_id = self.df.iloc[idx, 2]

        label = self.df.iloc[idx, 1]

        if self.transform is not None:
            image = self.transform(image)
        if self.target_transform is not None:
            label = self.target_transform(label)

        return image, label, img_path


# データの前処理を定義
transform = transforms.Compose(
    [
        # 画像のリサイズ
        transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
        # tensorに変換
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                             std=[0.229, 0.224, 0.225])
    ]
)

# 訓練用のデータ前処理を定義
train_transform = transforms.Compose(
    [
        # 画像のリサイズ
        transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
        # ここにData augmentation処理を記述する
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomRotation(10),
        transforms.ColorJitter(brightness=0.9, contrast=0.9),
        # tensorに変換
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                             std=[0.229, 0.224, 0.225])
    ]
)




In [None]:
K = 5
train_dataset_list = []
test_dataset_list = []
train_loader_list = []
valid_loader_list = []
test_loader_list = []
model_list = []

for i in range(K):
    # 訓練データを取得
    train_dataset = MyDataset(
        csv_file="./data/train_kfold_{:02}.csv".format(i),
        root_dir='./',  # 画像を保存したディレクトリ(適宜書き換えて)
        transform=transform,
    )
    # 訓練データの一部を検証データとして使用
    num_train = len(train_dataset)
    num_valid = int(num_train * 0.2)
    train_dataset, valid_dataset = torch.utils.data.random_split(
        train_dataset,
        [num_train - num_valid, num_valid],
    )
    # DataLoaderを作成

    # 訓練時のtransformを設定
    train_dataset.dataset.transform = train_transform

    train_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=BATCH_SIZE,
        shuffle=True,
    )
    valid_loader = torch.utils.data.DataLoader(
        valid_dataset,
        batch_size=BATCH_SIZE,
        shuffle=False,
    )
    # テストデータも同様
    test_dataset = MyDataset(
        "./data/test_kfold_{:02}.csv".format(i),
        root_dir='./',
        transform=transform,
    )
    test_loader = torch.utils.data.DataLoader(
        test_dataset,
        batch_size=BATCH_SIZE,
        shuffle=False,
    )

    # ニューラルネットワーク
    model = models.resnet18(pretrained=True)
    # print(model)
    model.fc = nn.Linear(512, 2)

    model = model.to(device)

    train_dataset_list.append(train_dataset)
    test_dataset_list.append(test_dataset)
    valid_loader_list.append(valid_loader)
    train_loader_list.append(train_loader)
    test_loader_list.append(test_loader)
    model_list.append(model)

In [None]:
v_acc_list = []
t_acc_list = []
v_accuracy_list = []
cm_list = []
test_accuracy_list = []
for i in range(K):
    # 損失関数
    loss_function = nn.CrossEntropyLoss()
    # 勾配降下法を行うoptimizer
    optimizer = optim.Adam(model_list[i].parameters(), lr=0.0001)

    start = time.time()
    print('processing train data ...')

    v_acc = []
    t_acc = []
    best_v_acc = 0
    v_accuracy = []
    print("epoch\ttrain loss\tvalid loss\tvalid accuracy\tprocessed time")
    for epoch in range(MAX_EPOCH):
        model_list[i].train()
        train_loss_list = []
        # DataLoaderをfor文で回すと入力と正解ラベルが得られる
        for x, label, img_path in train_loader_list[i]:
            x = x.to(device)
            label = label.to(device)
            # 勾配を0に初期化
            optimizer.zero_grad()
            # 順伝播
            output = model_list[i](x)
            # 誤差の計算
            loss = loss_function(output, label)
            # 誤差逆伝播
            loss.backward()
            # パラメータ更新
            optimizer.step()
            # ミニバッチの訓練誤差をlistに追加
            train_loss_list.append(loss.item())
        # 各ミニバッチでの訓練誤差の平均を取り，本エポックでの訓練誤差とする
        train_loss_mean = np.mean(train_loss_list)

        # 検証データでも同様に誤差を計算
        # モデルを評価する時は model.eval() とする
        model_list[0].eval()
        valid_loss_list = []
        valid_correct, valid_total = 0, 0
        for x, label, img_path in valid_loader_list[i]:
            x = x.to(device)
            label = label.to(device)

            output = model_list[i](x)
            loss = loss_function(output, label)
            valid_loss_list.append(loss.item())

            pred = output.argmax(dim=1, keepdim=True)
            # 正解ラベルと比較，一致している数を加算
            valid_correct += pred.eq(label.view_as(pred)).sum().item()
            # 正解率(accuracy)を出すためにテストデータの数も加算
            valid_total += label.size()[0]

        valid_loss_mean = np.mean(valid_loss_list)

        valid_accuracy = valid_correct / valid_total

        print("{0}\t{1:.6f}\t{2:.6f}\t{3:.6f}\t{4:.6f}".format(epoch, train_loss_mean, valid_loss_mean, valid_accuracy, time.time() - start))
        t_acc.append(train_loss_mean)
        v_acc.append(valid_loss_mean)
        v_accuracy.append(valid_accuracy)
        # モデル保存
        if valid_accuracy > best_v_acc:
            best_v_acc = valid_accuracy
            model_dir = "./model/"
            os.makedirs(model_dir, exist_ok=True)
            save_path = model_dir + "cnn{:02}.pt".format(i)
            torch.save(model_list[i].state_dict(), save_path)

    # 可視化コード
    fig, ax = plt.subplots(1,1)
    ax.plot(t_acc, label="train loss", marker="o")
    ax.plot(v_acc, label="valid loss", marker="*")
    ax.legend()
    plt.savefig("losscurve.png", bbox_inches="tight")

    fig, ax = plt.subplots(1,1)
    ax.plot(v_accuracy, label="valid acc", marker="o")
    ax.legend()
    plt.savefig("acccurve.png", bbox_inches="tight")

    # モデル読込
    model_list[i].load_state_dict(torch.load("./model/cnn{:02}.pt".format(i)))

    # モデルの評価(テストデータを使用)
    print('processing test data ...')
    model_list[i].eval()
    test_loss = 0
    test_correct = 0
    test_total = 0
    for n, (x, label, img_path) in enumerate(test_loader_list[i]):
        x = x.to(device)
        label = label.to(device)
        output = model_list[i](x)
        loss = loss_function(output, label)
        # 出力値が最大のインデックスを取得
        pred = output.argmax(dim=1, keepdim=True)

        preds = pred if n == 0 else torch.cat([preds, pred])
        labels = label if n == 0 else torch.cat([labels, label])

        # 正解ラベルと比較，一致している数を加算
        test_correct += pred.eq(label.view_as(pred)).sum().item()
        # 正解率(accuracy)を出すためにテストデータの数も加算
        test_total += label.size()[0]

    test_accuracy = test_correct / test_total
    print("Test accuracy :", test_accuracy)

    cm = confusion_matrix(labels.cpu(), preds.cpu())

    v_acc_list.append(v_acc)
    t_acc_list.append(t_acc)
    v_accuracy_list.append(v_accuracy)
    test_accuracy_list.append(test_accuracy)
    cm_list.append(cm)



# **分析コードを書きましょう**

## テストデータに対するAccuracyの平均値を算出

In [None]:
np.mean(test_accuracy_list)

## 各データ分割におけるテストデータの混同行列を表示

In [None]:
# 混同行列 をきれいに出力するためのコード
def tune_str(x, class_names=class_names):
    max_length = max([len(cn) for cn in class_names])
    x = str(x)
    while len(x) < max_length:
        x = " " + x
    return x

In [None]:
for i, cm in enumerate(cm_list):
    print("分割:", i)
    header = "{} ".format(tune_str(" "))
    for class_name in class_names:
        header += " {} ".format(tune_str(class_name))
    print(header)
    for row_i, class_name_i in enumerate(class_names):
        txt = "{} ".format(tune_str(class_name_i))
        for col_i, class_name_j in enumerate(class_names):
            txt += " {} ".format(tune_str(cm[row_i][col_i]))
        print(txt)
    print()

## 誤認識があった画像パスを出力

In [None]:
for i in range(K):
    print("分割:", i)
    for n, (x, label, img_path) in enumerate(test_loader_list[i]):
        x = x.to(device)
        label = label.to(device)
        output = model_list[i](x)
        # 出力値が最大のインデックスを取得
        pred = output.argmax(dim=1, keepdim=True)
        
        # ここに処理を記述
        cnt = 0
        for p, l in zip(pred, label):
            if p != l:
                print(img_path[cnt])
            cnt += 1
    print()