### この実験は選択実験とします．難しい場合はパスしても構いません．

# 実験3-5 自己教師あり学習

自己教師あり学習は，比較的最近に提案された学習手法で，ラベルを必要としない教師なし学習の手法として着目されています．
その基本的な考え方は，

- 同じデータから派生されるデータ拡張は同じ（似たような）表現にマップされるべき
- 異なるデータ（のデータ拡張）は，異なる表現になるべき

というアイディアに基づいた学習手法です．
教師あり学習では，ラベルとよばれる絶対的な指標がありますが，ここでは，表現が似るべきというわりと曖昧なコンセプトに基づいて学習を考えていきます．

## 実験準備

この実験では

 - `STL10`, `CIFAR10`, `Fashin-MNIST` データセット
 
を用います．これらは結構なディスク容量を消費するので，IED で実験を行う場合，少々小細工が必要となります．

まず，実験2-4 と同様に，画像のデータセットをダウンロードする先を設定します．
IED では `/usr/local/class/media/dataset` にありますので，
```code:python
datadir = '/usr/local/class/media/dataset/'
```
を指定してください．自分の家や google Colab で頑張る場合は，適宜設定してください．

以下のコードではデータディレクトリを `./data` と指定していますが，IEDでこれをやっていくと使用制限を超えて，quota が溢れたというエラーが出てきて実験ができくなるので注意が必要です．

In [None]:
import os

# GPU の指定
os.environ['CUDA_VISIBLE_DEVICES'] = '0' # 使うGPUを指定

# データディレクトリの指定 (IED の場合は，下のほうをコメントアウト)
datadir = './data' # 自前のディレクトリに置く場合
# datadir = '/usr/local/class/media/dataset' # IED で実験する場合

## 自己教師付き学習のためのデータローダー

ここでは SimCLR （の簡易版）を考えてみます．
自己教師あり学習のキーポイントの一つはデータ拡張の部分にあります．
すなわち，一つの画像から，異なるデータ拡張を施した画像のペアを生成させます．

ここではランダムな，画像切り抜き，フリップを考えています．
データセットとしては `CIFAR10` や `STL10` を考えればよいです．ここでは視認性をとって `STL10` で実験を行います．
なお，`STL10` データセットはかなり大きいデータセットなので，ディスク容量には気をつけてください．

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Dataset


# 基本となるデータセットの設定
#base_dataset = datasets.CIFAR10(root=datadir, train=True, download=True, transform=None)
base_dataset = datasets.STL10(root=datadir, split='train', download=True, transform=None)

#cifar10_labels_map = ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']
stl10_labels_map = ['airplane', 'bird', 'car', 'cat', 'deer', 'dog', 'horse', 'monkey', 'ship', 'truck']
labels_map = stl10_labels_map

# CIFAR-10 は 32x32 の画像サイズ, STL-10 は 96x96 の画像サイズ
# CIFAR-10 でやる場合は，RandomResizedCrop の size を 32 にする必要がある
# データセットの画像サイズ
SIZE = 96
W, H = SIZE, SIZE

transform_original = transforms.Compose([
    transforms.Resize((SIZE, SIZE)), 
    transforms.ToTensor()
])

transform_1 = transforms.Compose([
    transforms.RandomResizedCrop(size=SIZE),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor()
])

transform_2 = transforms.Compose([
    transforms.RandomResizedCrop(size=SIZE),
    transforms.RandomVerticalFlip(),
    transforms.ToTensor()
])

# データ拡張を適用したデータセットを作成するクラス
class SelfSupervisedDataset(Dataset):
    def __init__(self, base_dataset, transform_1, transform_2):
        """
        Args:
            base_dataset (Dataset): 元となるデータセット（例: STL-10）。
            transform_1 (callable): 1つ目のデータ拡張。
            transform_2 (callable): 2つ目のデータ拡張。
        """
        self.base_dataset = base_dataset
        self.transform_1 = transform_1
        self.transform_2 = transform_2

    def __len__(self):
        return len(self.base_dataset)

    def __getitem__(self, idx):
        img, label = self.base_dataset[idx]  # 元画像とラベルを取得

        # base が PIL 形式であることを仮定
        # なので，ToTensor を base_dataset の段階でとるとしくじる
        # img = transforms.ToPILImage()(img) # tensor -> PILImage

        augmented_1 = self.transform_1(img)  # データ拡張1を適用
        augmented_2 = self.transform_2(img)  # データ拡張2を適用
        return augmented_1, augmented_2, label


#self_supervised_dataset = SelfSupervisedDataset(base_dataset, transform_1, transform_2)
self_supervised_dataset = SelfSupervisedDataset(base_dataset, transform_original, transform_1)

# DataLoaderを作成
dataloader = DataLoader(self_supervised_dataset, batch_size=256, shuffle=True, num_workers=4) # num_workers は適宜変更する．デッドロックの可能性あり

In [None]:
# 取得データの確認
import matplotlib.pyplot as plt

# サンプルループ（1バッチ分のデータを取得）
for batch in dataloader:
    augmented_1, augmented_2, labels = batch
    print(f"Augmented 1 shape: {augmented_1.shape}, Augmented 2 shape: {augmented_2.shape}")
    break

#next(iter(dataloader)) # num_workers と，next(iter(dataloader)) の組み合わせはデッドロックを生じさせる可能性あり
fig, ax = plt.subplots(3, 2, figsize=(8, 12))
for i in range(3):
    ax[i, 0].imshow(augmented_1[i].permute(1, 2, 0))
    ax[i, 1].imshow(augmented_2[i].permute(1, 2, 0))
    ax[i, 0].set_title(f"Augmented 1: {labels_map[labels[i].item()]}")
    ax[i, 1].set_title(f"Augmented 2: {labels_map[labels[i].item()]}")
    ax[i, 0].axis("off")
    ax[i, 1].axis("off")
    

## SimCLR の構成

### ネットワークモデルの構成

ここまで来たら，あとは好きなようにモデルを組み立てれば良いのですが，ここでは SimCLR を利用するため，
ネットワークの構成としては，表現を得るためのモデル(`encoder`)と，その表現を投影し似ているかどうかを判定するためのモデル(`projector`)を規定します．

- `encoder`: 単純な CNN で，３回 畳み込み+プーリング操作 を行い，特徴層を作ります．
- `projector`: 特徴を MLP で射影し，64次元のベクトルとして取り出します．

In [12]:
class SimpleCNN(nn.Module):
    def __init__(self, W, H):
        super(SimpleCNN, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1), # 3xWxH -> 64xWxH
            nn.ReLU(),
            nn.MaxPool2d(2), # 64xWxH -> 64x(W/2)x(H/2)
            nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2), # 64x(W/4)x(H/4)
            nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2), # 64x(W/8)x(H/8)
            nn.Flatten() # 64x(W/8)x(H/8) -> 64*(W/8)*(H/8)
        )
        self.projector = nn.Sequential(
            nn.Linear(64*(W//8)*(H//8), 64),
            nn.ReLU(),
            nn.Linear(64, 64)
        )

    def forward(self, x):
        x = self.encoder(x)
        x = x.view(x.size(0), -1)
        x = self.projector(x)
        return x

### 損失関数の設計

損失関数は，投影された先の特徴量間の類似度を,正例の場合は大きく，負例の場合は小さくなるように設計します．
これは NT-Xent 関数と呼ばれ，下記のように定義されます．

ドラフト版では 余弦関数を用いたもので規定していましたが，いわゆるモード崩壊が起きてしまうので，`NT-Xent loss` 関数と呼ばれる損失を頑張って書いておきました．

In [13]:
# NT-Xent Loss の実装例
# 同じ画像から作られたペア→正例
# 異なる画像から作られたペア→負例
# 正例同士は似て，負例同士は異なるようにロスを設計する

def nt_xent_loss(z1, z2, temperature):
    z = torch.cat([z1, z2], dim=0)
    n = z.size(0)
    sim_matrix = torch.matmul(z, z.T) / temperature
    # 数値安定性を考慮して，行の最大値を基準に差っ引いとく
    sim_matrix = sim_matrix - torch.max(sim_matrix, dim=1, keepdim=True)[0]
    sim_matrix = torch.exp(sim_matrix)

    # 対角成分は同じ画像同士の類似度なので正例，それ以外が負例
    mask = (torch.ones_like(sim_matrix) - torch.eye(n, device=sim_matrix.device)).bool()
    pos_sim = sim_matrix[range(n), range(n)]
    neg_sim = sim_matrix[mask].reshape(n, -1)

    loss = -torch.log(pos_sim / (pos_sim + neg_sim.sum(dim=-1)))
    loss = loss.mean()
    return loss



### SimCLR での学習

データとモデルと損失関数の準備が終わったので，学習させます．
学習ループは，下記のようにかけます．

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = SimpleCNN(W, H).to(device)

optimizer = optim.Adam(model.parameters(), lr=1e-3)

history = [] # 学習履歴
num_epochs = 15 # エポック数，性能に合わせて調整

for epoch in range(num_epochs):
    for (images1, images2, _) in dataloader:
        images1, images2 = images1.to(device), images2.to(device)

        # 特徴抽出→projector
        z1 = model(images1)
        z2 = model(images2)

        # 類似性損失: NT-Xent 損失を計算
        loss = nt_xent_loss(z1, z2, temperature=0.4)  # temperature はハイパーパラメータ

        history.append(loss.item())

        # 学習
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    print(f"Epoch {epoch+1}, Loss: {loss.item()}")


多分，これでなんとか回って，ロスが下がるはずです．

In [None]:
# history のプロット
plt.semilogy(history)
plt.xlabel("Iterations")
plt.ylabel("Loss")
plt.grid()


## 性能評価

`SimpleCNN.encoder` がうまく学習できていればよいな，と思って性能を測ってみます．

In [None]:
#train_dataset = datasets.CIFAR10(root=datadir, train=True, download=True, transform=transform_original)
#test_dataset = datasets.CIFAR10(root=datadir, train=False, download=True, transform=transform_original)
train_dataset = datasets.STL10(root=datadir, split='train', download=True, transform=transform_original)
test_dataset = datasets.STL10(root=datadir, split='test', download=True, transform=transform_original)

train_ratio = 1.0 # 学習パターンを使う割合
num_train_samples = int(len(train_dataset) * train_ratio)
subset_train_dataset, _ = torch.utils.data.random_split(train_dataset, [num_train_samples, len(train_dataset) - num_train_samples])

train_dataloader = DataLoader(subset_train_dataset, batch_size=64, shuffle=True, num_workers=4)
test_dataloader = DataLoader(test_dataset, batch_size=64, shuffle=True, num_workers=4)

In [None]:
# 特徴抽出層は上で学習したものを固定して使う
for param in model.parameters():
    param.requires_grad = False

# 簡単な分類器を訓練 3層のMLP
classifier = nn.Sequential(
    nn.Linear(64*(W//8)*(H//8), 128),
    nn.ReLU(),
    nn.Linear(128, 10).to(device)
).to(device)

optimizer = optim.Adam(classifier.parameters(), lr=5e-4)
criterion = nn.CrossEntropyLoss()

history = []

# ラベルを使って学習
num_epochs = 100
for epoch in range(num_epochs):
    average_loss = 0.0
    for (images, labels) in train_dataloader:
        images, labels = images.to(device), labels.to(device)
        features = model.encoder(images)
        outputs = classifier(features)

        loss = criterion(outputs, labels)
        history.append(loss.item())
        average_loss += loss.item()

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    average_loss /= len(train_dataloader)
    print(f"Epoch {epoch+1}, Loss: {average_loss}")
    
plt.semilogy(history)
plt.xlabel("Iterations")
plt.ylabel("Loss")
plt.grid()

あとは，識別性能をテストでーたで測っておきます．

In [None]:
# 識別性能をテストデータ測る
# ついでに混同行列も作成

from sklearn.metrics import confusion_matrix
import seaborn as sns
import numpy as np

conf_matrix = np.zeros((10, 10))
with torch.no_grad():
    for (images, labels) in test_dataloader:
        images, labels = images.to(device), labels.to(device)
        features = model.encoder(images)
        outputs = classifier(features)
        _, predicted = torch.max(outputs, 1)
        conf_matrix += confusion_matrix(labels.cpu().numpy(), predicted.cpu().numpy(), labels=np.arange(10))

conf_matrix = np.array(conf_matrix, dtype=int)
correct = np.sum(np.diag(conf_matrix))
total = np.sum(conf_matrix)
print(f"Accuracy: {correct / total:.4f}")


plt.figure(figsize=(8, 6))
sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues", xticklabels=labels_map, yticklabels=labels_map)
plt.xlabel("Predicted")
plt.ylabel("True")
plt.title("Confusion Matrix{:.4f}".format(correct / total))


まぁ，それなりに．
なお，`STL-10` の 識別性能は下記で確認できます．

https://paperswithcode.com/sota/image-classification-on-stl-10?tag_filter=231%2C0

# 実験3-5

1. 自己教師あり学習による識別器を構成し，`CIFAR10` を用いた場合の識別性能を評価しなさい．（識別器はロジスティック回帰やSVM を用いて構わない）
2. 自己教師あり学習によって得られた `CIFAR10` の特徴表現を，PCA や t-SNE を用いて図示し，各クラスのデータが構造を持つかどうかを考察しなさい．
3. 自己教師あり学習によって得られた `CIFAR10` の特徴表現を k-means 法によりクラスタリングを行い，自己教師あり学習の特徴がクラスタリングに有効かどうかを評価しなさい．