In [None]:
# Import the required libraries
import matplotlib.pyplot as plt
import torch
import torch.nn.functional as F
print(torch.__version__)
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torchvision.datasets import CIFAR10

2.5.1+cu121


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# 残差ブロックの定義
class ResidualBlock(nn.Module):
    """Two-convolution residual block with dropout between the convolutions.

    Main path: conv3x3 -> BatchNorm -> ReLU -> dropout -> conv3x3 -> BatchNorm.
    The result is added to the shortcut (the input itself, or
    ``downsample(x)`` when a projection module is supplied) and passed through
    a final ReLU.

    Args:
        in_channels: Channel count of the incoming feature map.
        out_channels: Channel count produced by both convolutions.
        stride: Stride of the first convolution; the second always uses 1.
        downsample: Optional module applied to the input so that the shortcut
            can be added to the main path. ``None`` keeps the plain identity.
        dropout_prob: Drop probability of the dropout layer.
    """

    def __init__(self, in_channels, out_channels, stride=1, downsample=None, dropout_prob=0.3):
        super().__init__()
        # Both convolutions share everything except their input width and stride.
        conv_kwargs = dict(kernel_size=3, padding=1, bias=False)
        self.conv1 = nn.Conv2d(in_channels, out_channels, stride=stride, **conv_kwargs)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.dropout = nn.Dropout(dropout_prob)
        self.conv2 = nn.Conv2d(out_channels, out_channels, stride=1, **conv_kwargs)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = downsample

    def forward(self, x):
        """Run the block on ``x`` and return the activated sum of both paths."""
        main = self.dropout(F.relu(self.bn1(self.conv1(x))))
        main = self.bn2(self.conv2(main))

        # The shortcut is projected only when a projection module was given.
        shortcut = x if self.downsample is None else self.downsample(x)
        return F.relu(main + shortcut)

# ResNetの定義
class CIFAR10ResNet(nn.Module):
    """ResNet-style image classifier assembled from ``ResidualBlock`` stages.

    Layout: a 3x3 stem convolution (3 -> 32 channels) with BatchNorm and ReLU,
    four stages of two residual blocks each (64, 128, 256, 512 channels with
    strides 1, 2, 2, 2), adaptive average pooling to 1x1, dropout, and a
    linear classifier.

    Args:
        num_classes: Number of output scores produced by the final layer.
        dropout_prob: Drop probability used before the classifier and in the
            last stage. Earlier stages use a fraction of it (1/3, 1/2, 1/1.5).
    """

    def __init__(self, num_classes=10, dropout_prob=0.1):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(32)

        # (attribute, in channels, out channels, stride, dropout probability)
        stage_specs = (
            ("layer1", 32, 64, 1, dropout_prob / 3),
            ("layer2", 64, 128, 2, dropout_prob / 2),
            ("layer3", 128, 256, 2, dropout_prob / 1.5),
            ("layer4", 256, 512, 2, dropout_prob),
        )
        for attr, c_in, c_out, stride, stage_p in stage_specs:
            setattr(self, attr, self._make_layer(c_in, c_out, 2, stride=stride, dropout_prob=stage_p))
        # A fifth stage (512 -> 1024, one block, stride 2) was tried and is disabled.

        # Despite the attribute name this is a global *average* pool.
        self.maxpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, num_classes)
        self.dropout = nn.Dropout(dropout_prob)  # applied right before the classifier

    def _make_layer(self, in_channels, out_channels, blocks, stride=1, dropout_prob=0.1):
        """Build one stage of ``blocks`` residual blocks.

        Only the first block changes the channel count / stride; it receives a
        1x1 convolution + BatchNorm projection for its shortcut whenever the
        stride is not 1 or the channel counts differ.
        """
        needs_projection = stride != 1 or in_channels != out_channels
        projection = (
            nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )
            if needs_projection
            else None
        )

        stage = [ResidualBlock(in_channels, out_channels, stride, projection, dropout_prob)]
        stage.extend(
            ResidualBlock(out_channels, out_channels, dropout_prob=dropout_prob)
            for _ in range(1, blocks)
        )
        return nn.Sequential(*stage)

    def forward(self, x):
        """Return the class scores for the image batch ``x``."""
        features = F.relu(self.bn1(self.conv1(x)))
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            features = stage(features)

        pooled = torch.flatten(self.maxpool(features), 1)
        return self.fc(self.dropout(pooled))

class LabelSmoothingLoss(nn.Module):
    """Cross-entropy against label-smoothed targets.

    The target distribution puts ``1 - smoothing`` on the true class and
    spreads ``smoothing`` evenly over the remaining ``classes - 1`` classes.

    Args:
        classes: Number of classes, i.e. the size of the class axis of the
            predictions.
        smoothing: Probability mass moved from the true class to the others.
        dim: Axis of the predictions that indexes the classes.
    """

    def __init__(self, classes, smoothing=0.1, dim=-1):
        super(LabelSmoothingLoss, self).__init__()
        self.confidence = 1.0 - smoothing
        self.smoothing = smoothing
        self.cls = classes
        self.dim = dim

    def forward(self, pred, target):
        """Return the mean smoothed cross-entropy.

        Args:
            pred: Raw (unnormalised) scores with the classes along ``self.dim``.
            target: Integer class indices shaped like ``pred`` without its
                class axis.

        Returns:
            A scalar tensor: the loss averaged over every non-class position.
        """
        log_probs = pred.log_softmax(dim=self.dim)
        with torch.no_grad():
            # Smoothed target distribution: the off-target mass everywhere,
            # then the confidence written at the true class.
            true_dist = torch.full_like(log_probs, self.smoothing / (self.cls - 1))
            # Scatter along the same axis used by log_softmax/sum. The axis
            # used to be hard-coded to 1, which only matched 2-D inputs.
            true_dist.scatter_(self.dim, target.unsqueeze(self.dim), self.confidence)
        return torch.mean(torch.sum(-true_dist * log_probs, dim=self.dim))



In [None]:
import torch.optim as optim
# 確率勾配降下法で最適化する手法やパラメータを選択する
# ToDo: SGDとAdamの違いについて調査する
#optimizer = optim.Adam(net.parameters(), lr=0.001)
# optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)
#adamに変更

In [None]:
# テンソルを送るデバイスを選択する
# Neural Networkを学習する場合、並列計算を行うため基本的にはGPUを選択
def get_device(gpu_id=-1):
    """Return the torch device to compute on.

    A non-negative ``gpu_id`` selects that CUDA device when CUDA is available.
    A negative id, or a machine without CUDA, yields the CPU device.
    """
    use_cuda = gpu_id >= 0 and torch.cuda.is_available()
    return torch.device("cuda", gpu_id) if use_cuda else torch.device("cpu")

#optimizer = optimizer.to(device)
#loss_fn = F.nll_loss().to(device)
#loss_fn = F.nll_loss()

In [None]:
!nvcc --version

nvcc: NVIDIA (R) Cuda compiler driver
Copyright (c) 2005-2023 NVIDIA Corporation
Built on Tue_Aug_15_22:02:13_PDT_2023
Cuda compilation tools, release 12.2, V12.2.140
Build cuda_12.2.r12.2/compiler.33191640_0


In [None]:
import torch.nn as nn
from sklearn.metrics import accuracy_score
from torch.cuda.amp import GradScaler
from torch.amp import autocast
# import time

scaler = GradScaler()

def train(Epoch, Net, Train_dataloader, Val_daraloader, optimizer_):
    """Train ``Net`` for ``Epoch`` epochs with mixed precision.

    Relies on the notebook-level names ``device``, ``loss_fn`` and
    ``scheduler``, which must be set before calling. The scheduler is stepped
    once per epoch.

    Args:
        Epoch: Number of passes over ``Train_dataloader``.
        Net: Model to optimise; it is switched to training mode every epoch.
        Train_dataloader: Iterable of ``(inputs, labels)`` batches.
        Val_daraloader: Currently unused; kept so existing calls keep working.
        optimizer_: Optimizer that updates ``Net``'s parameters.
    """
    # A fresh scaler per training run. The previous version assigned `scaler`
    # at the end of the function, which made the name local and raised
    # UnboundLocalError on the first `scaler.scale(...)` call.
    scaler = GradScaler()

    for epoch in range(Epoch):
        Net.train()
        running_loss = 0.0
        correct = 0  # correctly classified samples since the last report
        seen = 0     # samples processed since the last report

        for i, (inputs, labels) in enumerate(Train_dataloader):
            inputs, labels = inputs.to(device), labels.to(device)

            # Use the optimizer that was passed in, not a global one.
            optimizer_.zero_grad()
            with autocast(device_type='cuda'):
                outputs = Net(inputs)
                loss = loss_fn(outputs, labels)

            scaler.scale(loss).backward()
            scaler.step(optimizer_)
            scaler.update()

            # Running statistics; accuracy is counted per sample so that it
            # does not depend on a global batch size.
            running_loss += loss.item()
            preds = torch.argmax(outputs, dim=1)
            correct += (preds == labels).sum().item()
            seen += labels.size(0)

            if i % 500 == 499:
                print(f"epoch: {epoch}, iter: {i + 1}, train_loss: {running_loss / 500}, train_acc: {correct / seen * 100}")
                running_loss = 0.0
                correct = 0
                seen = 0

        scheduler.step()

    print('Finished Training')


In [None]:
import pandas as pd
# 検証モード
# ToDo: 訓練モードと比較して振る舞いはどう変化する？
def val(Net, val_dataloader):
    """Evaluate ``Net`` on ``val_dataloader`` and print loss and accuracy.

    Relies on the notebook-level names ``device`` and ``loss_fn``. The model is
    put in evaluation mode and no gradients are recorded.

    Args:
        Net: Model to evaluate.
        val_dataloader: Iterable of ``(inputs, labels)`` batches.

    Returns:
        ``(average_loss, accuracy_percent)``. The loss is the mean of the
        per-batch losses; the accuracy is correct samples over samples seen.
        Both are NaN when the dataloader yields no batches.
    """
    Net.eval()
    total_loss = 0.0
    correct = 0
    seen = 0
    cnt = 0

    with torch.no_grad():
        for inputs, labels in val_dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            with autocast(device_type='cuda'):
                outputs = Net(inputs)
                loss = loss_fn(outputs, labels)

            total_loss += loss.item()
            preds = torch.argmax(outputs, dim=1)
            # Count samples rather than dividing by a fixed batch size: a
            # smaller final batch would otherwise pull the accuracy down.
            correct += (preds == labels).sum().item()
            seen += labels.size(0)
            cnt += 1

    if cnt == 0:
        # Nothing to average; report it instead of dividing by zero.
        print("Validation skipped: the dataloader produced no batches")
        return float("nan"), float("nan")

    avg_loss = total_loss / cnt
    accuracy = correct / seen * 100
    print(f"Validation Loss: {avg_loss}, Validation Accuracy: {accuracy}")
    return avg_loss, accuracy





In [None]:
# net = CIFAR10ResNet(num_classes=10)

# Pick the compute device once for the notebook: GPU 0 when CUDA is available,
# otherwise get_device() falls back to the CPU.
device = get_device(gpu_id=0)
print(device)


# softmax = nn.Softmax(dim=1)  # normalise to probabilities (all class probabilities sum to 1)

# def net_init(tmp_net):
#   optimizer = optim.Adam(tmp_net.parameters(), lr=0.001)
#   tmp_net = tmp_net.to(device)  # actually move the model to the device
  # Loss function
  # The loss is differentiated and every parameter is updated by gradient descent
  # ToDo: look into other loss functions; something other than CrossEntropyLoss may work well


cuda:0


In [None]:
import torch
from torchvision import transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader, Subset
from sklearn.model_selection import KFold

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Per-channel statistics used to normalise every image.
# NOTE(review): these look like the commonly quoted CIFAR-100 statistics rather
# than CIFAR-10 ones - confirm. They are applied identically to training,
# validation and test data, so the pipeline is at least self-consistent.
NORM_MEAN = [0.5071, 0.4867, 0.4408]
NORM_STD = [0.2675, 0.2565, 0.2761]
DATA_ROOT = '~/tmp/cifar'
N_SPLITS = 5
NUM_EPOCHS = 40   # shared by the training loop and the cosine schedule length
batch_size_ = 64  # kept as a notebook-level name because other cells read it

# ---------------------------------------------------------------------------
# Pre-processing
# ---------------------------------------------------------------------------
train_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),                   # random horizontal flip
    transforms.RandomCrop(32, padding=4),                # pad by 4 pixels, then take a random 32x32 crop
    transforms.ToTensor(),
    transforms.Normalize(mean=NORM_MEAN, std=NORM_STD),
    transforms.RandomErasing(p=0.5, scale=(0.05, 0.2), ratio=(0.3, 3.3), value='random'),
])

# Deterministic pipeline for anything that is evaluated (validation and test).
test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=NORM_MEAN, std=NORM_STD),
])
val_transform = test_transform  # alias kept for cells that refer to the old name

# ---------------------------------------------------------------------------
# Datasets
# ---------------------------------------------------------------------------
# The training split is loaded twice: once with augmentation for training and
# once with the evaluation transform for validation. Both are built a single
# time here instead of once per fold.
full_train_data = CIFAR10(root=DATA_ROOT, train=True, download=True, transform=train_transform)
full_val_data = CIFAR10(root=DATA_ROOT, train=True, download=False, transform=test_transform)

kf = KFold(n_splits=N_SPLITS, shuffle=True, random_state=44)

# Test split. It is not used during the folds; the inference cell consumes it.
# Batched loading instead of one image per step keeps that cell fast.
test_data = CIFAR10(root=DATA_ROOT, train=False, download=True, transform=test_transform)
test_loader = DataLoader(test_data, batch_size=batch_size_, shuffle=False)

models = []

# ---------------------------------------------------------------------------
# Train and validate one model per fold
# ---------------------------------------------------------------------------
for fold, (train_indices, val_indices) in enumerate(kf.split(full_train_data)):
    print(f'Fold {fold + 1}')

    # Same indices are split across the augmented and the plain dataset views.
    train_subset = Subset(full_train_data, train_indices)
    val_subset = Subset(full_val_data, val_indices)

    train_loader = DataLoader(train_subset, batch_size=batch_size_, shuffle=True)
    validation_loader = DataLoader(val_subset, batch_size=batch_size_, shuffle=False)

    # Move the model to the device before handing its parameters to the optimizer.
    tmp_net = CIFAR10ResNet(num_classes=10, dropout_prob=0.1).to(device)

    # optimizer / scheduler / loss_fn are notebook-level names read by train() and val().
    optimizer = optim.Adam(tmp_net.parameters(), lr=0.001)
    # optimizer = optim.SGD(tmp_net.parameters(), lr=0.1, momentum=0.9, weight_decay=5e-4)
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=NUM_EPOCHS, eta_min=1e-5)
    loss_fn = LabelSmoothingLoss(classes=10, smoothing=0.1)

    train(NUM_EPOCHS, tmp_net, train_loader, validation_loader, optimizer)
    val(tmp_net, validation_loader)

    # Keep the trained model for the ensemble built in later cells.
    models.append(tmp_net)

    print(f'Training and validation for fold {fold + 1} completed.\n')


# The test data is not used here; it is consumed after all folds have finished
# training (see the inference cell).


Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to /root/tmp/cifar/cifar-10-python.tar.gz


100%|██████████| 170M/170M [00:13<00:00, 13.0MB/s]


Extracting /root/tmp/cifar/cifar-10-python.tar.gz to /root/tmp/cifar
Files already downloaded and verified
Fold 1
epoch: 0, iter: 500, train_loss: 1.787160111427307, train_acc: 40.884375
epoch: 1, iter: 500, train_loss: 1.4478786239624024, train_acc: 58.0125
epoch: 2, iter: 500, train_loss: 1.305425691843033, train_acc: 65.19375
epoch: 3, iter: 500, train_loss: 1.19325439286232, train_acc: 70.865625
epoch: 4, iter: 500, train_loss: 1.1240886706113815, train_acc: 74.121875
epoch: 5, iter: 500, train_loss: 1.0577392665147782, train_acc: 77.075
epoch: 6, iter: 500, train_loss: 1.0158108251094817, train_acc: 79.06875
epoch: 7, iter: 500, train_loss: 0.9748799475431442, train_acc: 80.94687499999999
epoch: 8, iter: 500, train_loss: 0.9552147091627121, train_acc: 81.7125
epoch: 9, iter: 500, train_loss: 0.9243268812894821, train_acc: 83.321875
epoch: 10, iter: 500, train_loss: 0.8968439078330994, train_acc: 84.503125
epoch: 11, iter: 500, train_loss: 0.8761167448759078, train_acc: 85.39375
ep

In [None]:
import numpy as np
import pandas as pd
import torch

# Ensemble inference on the test set, followed by writing the submission file.

# Evaluation mode is a property of the model, so set it once up front rather
# than once per batch.
for model in models:
    model.eval()

predict_l = []

with torch.no_grad():
    for inputs, _ in test_loader:
        # Use the notebook's device instead of a hard-coded .cuda(), so the
        # cell also runs when get_device() fell back to the CPU.
        inputs = inputs.to(device)

        # Average the raw outputs of every fold model. Stacking avoids
        # hard-coding the number of classes for an accumulator tensor.
        final_pred = torch.stack([model(inputs) for model in models]).mean(dim=0)

        # Predicted label = class with the highest averaged score.
        predict_l.extend(final_pred.argmax(dim=1).tolist())

# Image ids are simply the running position in the (unshuffled) test loader.
id_l = list(range(len(predict_l)))

# Build the submission table and write it as CSV.
submit = pd.DataFrame({"image_id": id_l, "labels": predict_l})
submit.to_csv("submition.csv", index=False)

# Download the file when running on Google Colab.
from google.colab import files
files.download('submition.csv')


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

# ベースラインはここまで
## 精度向上のための今後のToDo
### - データ拡張（e.g., Flip, RandomCrop）や正則化（e.g., Dropout）について調べて、いろいろな手法を試してみる
### - これまで使用されてきたハイパーパラメーター（バッチサイズなど）の役割を理解して、最適な数値を探索する
### - モデルに機能を追加する（e.g., Residual Connectionやnormalizationの追加など）
### - 有名な画像認識モデル（ResNetやLeNetなど）について調べて、効果がありそうな機能を追加する
### - 信頼できる精度検証手段について調べて実装してみる（e.g., KFold検証）
### - エラー解析(どのクラスとどのクラスを間違いやすいか)




In [None]:
!cat /proc/uptime | awk '{print $1 / 60 / 60 " hours"}'

2.18751 hours


In [None]:
# Ensemble evaluation on `validation_loader`.
# NOTE(review): `validation_loader` is whatever the last fold left behind. All
# models except the last one were trained on these samples (K-fold validation
# sets are disjoint), so this score is optimistic and is not an unbiased
# estimate of the ensemble's accuracy.

# Evaluation mode only needs to be set once per model.
for model in models:
    model.eval()

total_loss = 0.0
correct = 0
seen = 0
cnt = 0

with torch.no_grad():
    for inputs, labels in validation_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        # Collect the prediction of each of the fold models.
        with autocast(device_type='cuda'):
            all_outputs = [model(inputs) for model in models]

        # Average the predictions across models.
        avg_outputs = torch.stack(all_outputs).mean(dim=0)
        loss = loss_fn(avg_outputs, labels)

        total_loss += loss.item()
        preds = torch.argmax(avg_outputs, dim=1)
        # Count samples instead of dividing by a fixed batch size, which
        # under-reports accuracy when the final batch is smaller.
        correct += (preds == labels).sum().item()
        seen += labels.size(0)
        cnt += 1

print(f"Validation Loss: {total_loss / cnt}, Validation Accuracy: {correct / seen * 100}")


Validation Loss: 0.5611657792595541, Validation Accuracy: 99.15406050955414
