<a href="https://colab.research.google.com/github/hasune613/hello-world/blob/main/%E8%84%B3%E7%A7%91%E5%AD%A6%E3%81%A8%E6%95%99%E5%B8%AB%E3%81%AA%E3%81%97%E5%AD%A6%E7%BF%92%E3%81%AE%E9%96%A2%E4%BF%82%E3%80%82%E6%83%85%E5%A0%B1%E9%87%8F%E6%9C%80%E5%A4%A7%E5%8C%96%E6%95%99%E5%B8%AB%E3%81%AA%E3%81%97%E5%AD%A6%E7%BF%92%E3%81%A7MNIST.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import random
import numpy as np
import torch

In [None]:
seed_value = 0
os.environ['PYTHONHASHSEED'] = str(seed_value)
random.seed(seed_value)
torch.manual_seed(seed_value) # PyTorchを使う場合

<torch._C.Generator at 0x7f3bbe594730>

In [None]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(device)
# GPUを使用。cudaと出力されるのを確認。

cuda


In [None]:
# MNISTの画像をダウンロードし、DataLoaderにする（TrainとTest）
from torchvision import datasets , transforms

batch_size_train = 512

train_loader = torch.utils.data.DataLoader(
    datasets.MNIST('.',train=True, download=True,
                   transform=transforms.Compose([
                    transforms.ToTensor(),
                   ])),
                   batch_size = batch_size_train,shuffle=True, drop_last= True)
# drop_lastは最後のミニバッチが規定のサイズより小さい場合は使用しない設定

test_loader = torch.utils.data.DataLoader(
    datasets.MNIST('.',train=False, transform = transforms.Compose([
        transforms.ToTensor(),
        ])),
        batch_size = 1024,shuffle=False
)

In [None]:
# ディープラーニングモデル
import torch.nn as nn
import torch.nn.functional as F

OVER_CLUSTRING_Rate = 10
# 最終出力層は性能upを見込み、10種類に加え、さらに多くに分類させる（overclustring）
# 最終出力層は10種類版とoverclustering版。損失関数もその両方のアウトプットを計算して総和を使用。

class NetIIC(nn.Module):
    def __init__(self):
        super(NetIIC,self).__init__()

        self.conv1 = nn.Conv2d(1,128,5,2,bias = False)
        self.bn1 = nn.BatchNorm2d(128)
        
        self.conv2 = nn.Conv2d(128,128,5,1,bias = False)
        self.bn2 = nn.BatchNorm2d(128)
        
        self.conv3 = nn.Conv2d(128,128,5,1,bias = False)
        self.bn3 = nn.BatchNorm2d(128)
        
        self.conv4 = nn.Conv2d(128,256,4,1,bias = False)
        self.bn4 = nn.BatchNorm2d(256)

        # 0-9に対応すると期待したい10種類のクラス
        self.fc = nn.Linear(256,10)

        # overclustering
        # 実際の想定よりも多めにクラスタリングさせることで、ネットワークで微細な変化を捉えられるようにする
        self.fc_overclustering = nn.Linear(256,10*OVER_CLUSTRING_Rate)

    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        x = F.relu(self.bn4(self.conv4(x)))
        x_prefinal = x.view(x.size(0),-1)
        y = F.softmax(self.fc(x_prefinal),dim=1)

        y_overclustering = F.softmax(self.fc_overclustering(x_prefinal),dim=1)# overclustering

        return y,y_overclustering
        



In [None]:
import torch.nn.init as init

def weight_init(m):
     """重み初期化"""
     if isinstance(m,nn.Conv2d):
         init.xavier_normal_(m.weight.data)
         if m.bias is not None:
             init.normal_(m.bias.data)
     elif isinstance(m,  nn.BatchNorm2d):
         init.normal_(m.weight.data, mean=1,std=0.02)
         init.constant_(m.bias.data,0)
     elif isinstance(m, nn.Linear):
       
        init.kaiming_normal_(m.weight.data)

        if m.bias is not None:
            init.normal_(m.bias.data)


In [None]:
import torchvision as tv
import torchvision.transforms.functional as TF

In [None]:
def perturb_imagedata(x):
    y = x.clone()
    batch_size = x.size(0)

    # ランダムなアフィン変換を実施
    trans = tv.transforms.RandomAffine(15,(0.2,0.2,),(0.2,0.75,))
    for i in range(batch_size):
        y[i,0] = TF.to_tensor(trans(TF.to_pil_image(y[i,0])))

    noise = torch.randn(batch_size,1,x.size(2),x.size(3))
    div = torch.randint(20,30,(batch_size,),
                        dtype = torch.float32).view(batch_size,1,1,1)
    y += noise / div

    return y
                    



In [None]:
import sys

In [None]:
def compute_joint(x_out, x_tf_out):
    # x_out、x_tf_outは torch.Size([512, 10])。この二つをかけ算して同時分布を求める、torch.Size([2048, 10, 10])にする。
    # torch.Size([512, 10, 1]) * torch.Size([512, 1, 10])
    p_i_j = x_out.unsqueeze(2) * x_tf_out.unsqueeze(1)
    # p_i_j は　torch.Size([512, 10, 10])

    # 全ミニバッチを足し算する ⇒ torch.Size([10, 10])
    p_i_j = p_i_j .sum(dim=0)

    # 転置行列と足し算して割り算（対称化） ⇒ torch.Size([10, 10])   
    p_i_j = (p_i_j + p_i_j.t()) / 2.

    # 規格化 ⇒ torch.Size([10, 10])
    p_i_j = p_i_j / p_i_j.sum()

    return p_i_j
    #p_i_jは通常画像の判定出力10種類と、
    #変換画像の判定10種類の100パターンに対して、
    #全ミニバッチが100パターンのどれだったのかの確率分布表を示す

    

In [None]:
def IID_loss(x_out, x_tf_out, EPS = sys.float_info.epsilon):
    # torch.Size([512, 10])、後ろの10は分類数なので、overclusteringのときは100
    bs,k = x_out.size()
    p_i_j = compute_joint(x_out,x_tf_out)# torch.Size([10, 10])

    #同時確率の分布表から、変換画像の10パターンをsumをして周辺化し、
    #元画像だけの周辺確率の分布表を作る
    p_i = p_i_j.sum(dim=1).view( k, 1).expand(k,  k)

    #同時確率の分布表から、元画像の10パターンをsumをして周辺化し、
    #変換画像だけの周辺確率の分布表を作る
    p_j = p_i_j.sum(dim = 0).view(1,k).expand(k , k)

    # 0に近い値をlogに入れると発散するので、避ける
    p_i_j = torch.where(p_i_j < EPS, torch.tensor(
        [EPS],device = p_i_j.device), p_i_j)
    p_j = torch.where(p_j < EPS, torch.tensor([EPS], device=p_j.device),p_j)
    p_i = torch.where(p_i < EPS, torch.tensor([EPS], device=p_j.device),p_i)

    alpha = 2.0 #通常の相互情報量の計算はalphaは1
    loss = -1 * (p_i_j *(torch.log(p_i_j) -alpha*
                        torch.log(p_j) - alpha * torch.log(p_i))).sum()

    return loss

In [None]:
total_epoch = 3

model = NetIIC()
model.apply(weight_init)
model.to(device)

optimizer = torch.optim.Adam(model.parameters(), lr= 1e-3)

def train (total_epoch, model, train_loader, optimizer, device):
    model.train()
    scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(
        optimizer, T_0=2, T_mult=2
    )

    for epoch in range(total_epoch):
        for batch_idx , (data, target) in enumerate(train_loader):

            scheduler.step()

            data_perturb = perturb_imagedata(data)# ノイズを与える

            data = data.to(device)
            data_perturb = data_perturb.to(device)
            # 最適化関数の初期化
            optimizer.zero_grad()
            # ニューラルネットワーク出力
            output, output_overclustering = model(data)
            output_perturb, output_perturb_overclustering = model(data_perturb)
            # 最適化関数の初期化
            loss1 = IID_loss(output, output_perturb)
            loss2 = IID_loss(output_overclustering,
                            output_perturb_overclustering)
            loss = loss1 = loss2
            # 損失を減らすように更新
            loss.backward()
            optimizer.step()
            # ログ出力
            if batch_idx % 10 ==0:
                print(f'Train Epoch {epoch}:iter{batch_idx} -\tLoss1:{loss1.item():.6f} - \tLoss2:{loss2.item():.6f}-\tLoss_total:{loss1.item()+loss2.item():.6f}')


    return  model, optimizer

model_trained, optimizer = train(
    total_epoch, model , train_loader, optimizer, device
)





Train Epoch 0:iter0 -	Loss1:-7.031949 - 	Loss2:-7.031949-	Loss_total:-14.063898
Train Epoch 0:iter10 -	Loss1:-8.761274 - 	Loss2:-8.761274-	Loss_total:-17.522549
Train Epoch 0:iter20 -	Loss1:-9.085955 - 	Loss2:-9.085955-	Loss_total:-18.171909
Train Epoch 0:iter30 -	Loss1:-9.167633 - 	Loss2:-9.167633-	Loss_total:-18.335266
Train Epoch 0:iter40 -	Loss1:-9.241488 - 	Loss2:-9.241488-	Loss_total:-18.482975
Train Epoch 0:iter50 -	Loss1:-9.273063 - 	Loss2:-9.273063-	Loss_total:-18.546125
Train Epoch 0:iter60 -	Loss1:-9.259229 - 	Loss2:-9.259229-	Loss_total:-18.518457
Train Epoch 0:iter70 -	Loss1:-9.280230 - 	Loss2:-9.280230-	Loss_total:-18.560459
Train Epoch 0:iter80 -	Loss1:-9.306996 - 	Loss2:-9.306996-	Loss_total:-18.613993
Train Epoch 0:iter90 -	Loss1:-9.347582 - 	Loss2:-9.347582-	Loss_total:-18.695164
Train Epoch 0:iter100 -	Loss1:-9.402233 - 	Loss2:-9.402233-	Loss_total:-18.804466
Train Epoch 0:iter110 -	Loss1:-9.391451 - 	Loss2:-9.391451-	Loss_total:-18.782902
Train Epoch 1:iter0 -	Loss1

In [None]:
# モデル分類のクラスターの結果を確認

def test(model, device, train_loader):
    model.eval()

    out_targs = []
    ref_targs = []
    cnt = 0

    with torch.no_grad():
        for data, target in test_loader:
            cnt += 1
            data = data.to(device)
            target = target.to(device)
            outputs , outputs_overclustering = model(data)

            out_targs.append(outputs.argmax(dim=1).cpu())
            ref_targs.append(target.cpu())
            
    out_targs = torch.cat(out_targs)
    ref_targs = torch.cat(ref_targs)

    return out_targs.numpy() , ref_targs.numpy()

out_targs, ref_targs = test(model_trained, device, train_loader)


In [None]:
!pip install -q scipy

In [None]:
import numpy as np
import scipy.stats as stats

# 混同行列（的な）を作る
matrix = np.zeros((10,10))

# 縦に数字の0から9を、横に判定されたクラスの頻度表を作成
for i in range(len(out_targs)):
    row = ref_targs[i]
    col = out_targs[i]
    matrix[row][col] += 1

np.set_printoptions(suppress=True)
print(matrix)



[[  1. 439.  37. 310.  66.   0.   3. 124.   0.   0.]
 [434. 658.   1.   0.   8.   3.  25.   0.   1.   5.]
 [  0. 725.   8.  61.  74.   6.  17.  76.  65.   0.]
 [ 11. 834.   0.   2.  15.  46.  91.   9.   1.   1.]
 [  1. 698.  11.   7.   9.  44.  28. 184.   0.   0.]
 [  8. 701.  72.   7.  32.  50.   9.  13.   0.   0.]
 [ 20.  77. 312.   8. 263.   3.  36. 238.   1.   0.]
 [  0. 528.   0.   5.  21. 189. 253.   8.   1.  23.]
 [ 37. 636. 102.  23.   2. 119.  32.  22.   1.   0.]
 [  5. 801.   3.  18.   6.  41.  17. 115.   1.   2.]]


In [None]:
total_num = matrix.sum().sum()
print(total_num)
correct_num_list = matrix.max(axis = 0)

print(correct_num_list)
print(correct_num_list.sum())

print('正解率：',correct_num_list.sum() / total_num*100)


10000.0
[434. 834. 312. 310. 263. 189. 253. 238.  65.  23.]
2921.0
正解率： 29.21
