# 学習の実行と提出ファイルの作成  

pytorchフレームワークを使用して学習・評価、提出データの作成をします。  
「ランタイム」-「ランタイムのタイプを変更」により学習に使用する  
『ハードウェア アクセラレータ』の設定が可能です。

### ライブラリのインポート

In [None]:
from google.colab import drive
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision.models import resnet18, ResNet18_Weights
import torchvision.transforms.v2 as T
import pandas as pd
from sklearn.model_selection import train_test_split
from tqdm.notebook import trange, tqdm
import random
import numpy as np
from PIL import Image
from glob import glob
import os

In [None]:
# GoogleDriveのマウント
from google.colab import drive
drive.mount('/content/drive')
WorkDir="/content/drive/MyDrive/industry_defect_detection"

In [None]:
# 作業ディレクトリの作成
%cd /content
!mkdir dataset

In [None]:
# 学習に使用するデータをGoogleDriveからコピーします。 5分程度(目安)
!cp -rf /content/drive/MyDrive/industry_defect_detection/train ./dataset
!cp -rf /content/drive/MyDrive/industry_defect_detection/test ./dataset
!cp -rf /content/drive/MyDrive/industry_defect_detection/train_annotations.tsv ./dataset
!cp -rf /content/drive/MyDrive/industry_defect_detection/sample_submit.tsv ./dataset
%cd dataset

In [None]:
RandomState = 42  # ランダムのパターンを固定し、再現性担保

random.seed(RandomState)
np.random.seed(RandomState)
torch.manual_seed(RandomState)
torch.cuda.manual_seed_all(RandomState)
torch.backends.cudnn.deterministic = True

class AverageMeter:  # ログ用
    def __init__(self):
        self.val, self.sum, self.count, self.avg = None, None, None, None
        self.reset()

    def reset(self):
        self.val, self.sum, self.count, self.avg = 0, 0, 0, 0

    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count

## 学習・テストデータ定義
学習データに関するモジュールを定義します。  
* 学習・テストデータの定義  
train_test_splitを使用して学習データ、テストデータに分けます。  
* PyTorch データセット定義  
学習中に使用するデータセットを定義しています。  
* データ拡張定義  
torchvisionのComposeを使用して、複数のTransform(データ拡張)を適用します。  
学習データ用、テストデータ用のデータ拡張をそれぞれ定義します。  
  * 学習データ用  
  ランダムクロップ　：　画像を切り取り、学習データを増やすことが可能  
  正規化　：　学習データを正規化し、安定した学習が可能  
  　　　　※『MlCompe2023_02_学習データの可視化.ipynb』で算出した平均と標準偏差を使用。  
  * テストデータ用  
  リサイズ　：　入力データサイズの変更。使用するモデルの入力に合わせてサイズを変更します。  
  正規化　：　学習データ用と同じように正規化します。





In [None]:
# 学習・テストデータ定義
def split_df(val_ratio, random_state=42):
    df_all = pd.read_table("/content/dataset/train_annotations.tsv", index_col=0)
    if val_ratio == 0.0:
        return df_all, []
    df_train, df_val = [], []
    for label in (0, 1):
        for pattern in ("A", "B", "C", "D"):
            temp = df_all[(df_all.label == label) & (df_all.pattern == pattern)]
            temp_train, temp_val = train_test_split(temp, test_size=val_ratio, random_state=random_state)
            df_train.append(temp_train)
            df_val.append(temp_val)
    return pd.concat(df_train), pd.concat(df_val)

# PyTorch データセット定義
class ImageDataset(Dataset):
    def __init__(self, root, df, transform=None):
        super().__init__()
        self.root = root
        self.df = df
        self.transform = transform

    def __len__(self):
        return len(self.df)

    def __getitem__(self, item):
        ann = self.df.iloc[item]
        image = Image.open(f"{self.root}/{ann.name}.png").convert("RGB")
        if self.transform is not None:
            image = self.transform(image)
        label = ann.label
        return image, label

    def get_label(self, item):
        return self.df.iloc[item].label

# データ拡張定義
transform_train = T.Compose([
    T.ToImage(),
    T.ToDtype(torch.float32, scale=True),
    T.RandomResizedCrop(size=224, scale=(0.8, 1.0),
                        ratio=(3.0 / 4.0, 4.0 / 3.0),
                        antialias=True),
    # Add Other Transforms >>>
    # ... Ref: https://note.com/kiyo_ai_note/n/ndb9df1fa3d24
    # <<< Add Other Transforms
    T.Normalize(mean=tuple(x/255 for x in (119.84508152, 119.84508152, 119.84508152)),
                std=tuple(x/255 for x in (54.22375462, 54.22375462, 54.22375462))),
])

transform_val = T.Compose([
    T.ToImage(),
    T.ToDtype(torch.float32, scale=True),
    T.Resize(size=224, antialias=True),
    T.Normalize(mean=tuple(x/255 for x in (119.84508152, 119.84508152, 119.84508152)),
                std=tuple(x/255 for x in (54.22375462, 54.22375462, 54.22375462))),
])

## 学習の実行
学習時のテストで最も良かったモデルを、**best_model.pth**という名前で保存します。  
※目安：20分程度  
**※※ 作成したモデルをGoogleDriveへ保存せずにブラウザを閉じた場合、削除されます。**

In [None]:
# ハイパーパラメータ定義
ValRatio = 0.2  # 0.0 でもOK (提供された画像はすべて使うが、学習中の評価は不可能)
BatchSize = 64
LearningRate = 0.01
SGDMomentum = 0.9
WeightDecay = 0.0005
Epochs = 30
AmpEnabled = True

# モデル定義
model = resnet18(weights=ResNet18_Weights.DEFAULT)
model.fc = torch.nn.Linear(in_features=model.fc.in_features,
                           out_features=2,  # 正常異常に変更
                           bias=True)

# データローダ作成
df_train, df_val = split_df(ValRatio, random_state=RandomState)
print(f"Pandas -> Train: {len(df_train)}, Valid: {len(df_val)}")

dataset_train = ImageDataset(root="train", df=df_train, transform=transform_train)
dataset_val = ImageDataset(root="train", df=df_val, transform=transform_val)
print(f"Dataset -> Train: {len(dataset_train)}, Valid: {len(dataset_val)}")

dataloader_train = DataLoader(dataset=dataset_train, batch_size=BatchSize,
                              num_workers=2, shuffle=True, drop_last=True)
dataloader_val = DataLoader(dataset=dataset_val, batch_size=BatchSize,
                            num_workers=2, shuffle=False, drop_last=False)
print(f"Dataloader -> Train: {len(dataloader_train)}, Valid: {len(dataloader_val)}")

# 損失関数
criteria = torch.nn.CrossEntropyLoss()

# 最適化関数
optim = torch.optim.SGD(model.parameters(),
                        lr=LearningRate,
                        momentum=SGDMomentum,
                        weight_decay=WeightDecay,
                        nesterov=True)

# 学習率スケジューラ
sched = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer=optim,
                                                             T_0=Epochs)

# fp16
amp_scaler = torch.cuda.amp.GradScaler(enabled=AmpEnabled)

# 学習検証開始
bar = trange(Epochs)
model = model.cuda()
train_loss_meter, train_acc_meter = AverageMeter(), AverageMeter()
valid_loss_meter, valid_acc_meter = AverageMeter(), AverageMeter()
best_acc = -1
for ep in bar:
    ep_lr = optim.param_groups[0]["lr"]
    train_loss_meter.reset()
    train_acc_meter.reset()
    valid_loss_meter.reset()
    valid_acc_meter.reset()

    model.train()
    for image, target in tqdm(dataloader_train, leave=False):
        image, target = image.cuda(), target.cuda()
        with torch.cuda.amp.autocast(enabled=amp_scaler.is_enabled()):
            pred = model(image)
            loss = criteria(pred, target)
        optim.zero_grad()
        amp_scaler.scale(loss).backward()
        amp_scaler.step(optim)
        amp_scaler.update()
        train_loss_meter.update(loss.item(), image.shape[0])
        train_acc_meter.update(pred.argmax(dim=1).eq(target).float().mean().item(), image.shape[0])
    sched.step()

    if len(dataset_val):
        model.eval()
        with torch.no_grad():
            for image, target in tqdm(dataloader_val, leave=False):
                image, target = image.cuda(), target.cuda()
                with torch.cuda.amp.autocast(enabled=amp_scaler.is_enabled()):
                    pred = model(image)
                    loss = criteria(pred, target)
                valid_loss_meter.update(loss.item(), image.shape[0])
                valid_acc_meter.update(pred.argmax(dim=1).eq(target).float().mean().item(), image.shape[0])
        bar.set_description(f"LR: {ep_lr:.6f}, Loss/Train: {train_loss_meter.avg:.5f}, Loss/Valid: {valid_loss_meter.avg:.5f}, Acc/Train: {train_acc_meter.avg:.5f}, Acc/Valid: {valid_acc_meter.avg:.5f}")
        use_acc = valid_acc_meter.avg
    else:
        bar.set_description(f"LR: {ep_lr:.6f}, Loss/Train: {train_loss_meter.avg:.5f}, Acc/Train: {train_acc_meter.avg:.5f}")
        use_acc = train_acc_meter.avg
    if best_acc <= use_acc:
        best_acc = use_acc
        torch.save(model.state_dict(), "best_model.pth")

# 学習時の最も良かったモデルが、best_model.pth として保存される。

In [None]:
# 作成したモデルを、GoogleDriveへコピー
!cp "best_model.pth" "/content/drive/MyDrive/industry_defect_detection"

## 提出ファイルの作成
推論結果が submit_data.tsv として保存します。  
GoogleColabからダウンロードして、コンテストページへ投稿してください。  
※目安 ：40秒程度

In [None]:
# 提出ファイル作成
test_model = resnet18(weights=ResNet18_Weights.DEFAULT)
test_model.fc = torch.nn.Linear(in_features=test_model.fc.in_features,
                                out_features=2,  # 正常異常に変更
                                bias=True)
print(test_model.load_state_dict(torch.load("best_model.pth", map_location="cpu")))
test_model = test_model.cuda().eval()

submit_data = []
with torch.no_grad():
    for path in tqdm(sorted(glob("test/*.png"))):
        image = transform_val(Image.open(path).convert("RGB")).cuda()[None]
        score = test_model(image).softmax(dim=1)
        abnormal_score = score[:, 1].item()
        submit_data.append(f"{os.path.splitext(os.path.basename(path))[0]}\t{abnormal_score:.5f}\n")
with open("submit_data.tsv", "w") as f:
    f.write("".join(submit_data))
# best_model.pth での推論結果が submit_data.tsv として保存される。
# これを提出