In [5]:
import torch
import torchvision

# パッケージのインポート
import os
import random
import numpy as np

import torch.nn as nn
import torch.optim as optim
import torch.utils.data
import torchvision.datasets as dset
import torchvision.transforms as transforms
import torchvision.utils as vutils
# import torchsummary

In [6]:
# 使用するデバイス
# GPU を使用しない環境（CPU環境）で実行する場合は DEVICE = 'cpu' とする
# os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'
# DEVICE = 'cuda:0'
DEVICE = 'cpu'

# 全ての訓練データを一回ずつ使用することを「1エポック」として，何エポック分学習するか
# 再開モードの場合も, このエポック数の分だけ追加学習される（N_EPOCHSは最終エポック番号ではない）
N_EPOCHS = 20

# 学習時のバッチサイズ
BATCH_SIZE = 100

# 訓練データセット（画像ファイルリスト）のファイル名
DATASET_CSV = './MNIST/train_list.csv'

# 画像ファイルの先頭に付加する文字列（データセットが存在するディレクトリのパス）
DATA_DIR = './MNIST/'

# 特徴ベクトルの次元数
N = 32

# バッチ正規化を使用するか否か
USE_BATCH_NORM = True

# 学習結果の保存先フォルダ
MODEL_DIR = './cgan_models_MNIST/'

# 学習結果のニューラルネットワークの保存先
MODEL_FILE_ENC = os.path.join(MODEL_DIR, 'mnist_encoder_model.pth') # エンコーダ
MODEL_FILE_DEC = os.path.join(MODEL_DIR, 'mnist_decoder_model.pth') # デコーダ

# 中断／再開の際に用いる一時ファイルの保存先
CHECKPOINT_EPOCH = os.path.join(MODEL_DIR, 'checkpoint_epoch.pkl')
CHECKPOINT_ENC_MODEL = os.path.join(MODEL_DIR, 'checkpoint_enc_model.pth')
CHECKPOINT_DEC_MODEL = os.path.join(MODEL_DIR, 'checkpoint_dec_model.pth')
CHECKPOINT_ENC_OPT = os.path.join(MODEL_DIR, 'checkpoint_enc_opt.pth')
CHECKPOINT_DEC_OPT = os.path.join(MODEL_DIR, 'checkpoint_dec_opt.pth')


# 設定
workers = 2
batch_size=50
nz = 100
nch_g = 128
nch_d = 128
n_epoch = 10
lr = 0.0002
beta1 = 0.5
outf = './result_3_3-CGAN'
display_interval = 600
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# 保存先ディレクトリを作成
try:
    os.makedirs(outf, exist_ok=True)
except OSError as error: 
    print(error)
    pass

# 乱数のシード（種）を固定
random.seed(0)
np.random.seed(0)
torch.manual_seed(0)

<torch._C.Generator at 0x20e384bf6b0>

##### 訓練データセットの読み込み

In [7]:
from torch.utils.data import DataLoader, random_split
from mylib.data_io import CSVBasedDataset
from mylib.utility import save_datasets, load_datasets_from_file


# 前回の試行の続きを行いたい場合は True にする -> 再開モードになる
RESTART_MODE = False


# 再開モードの場合は，前回使用したデータセットをロードして使用する
if RESTART_MODE:
    train_dataset, valid_dataset = load_datasets_from_file(MODEL_DIR)
    if train_dataset is None:
        print('error: there is no checkpoint previously saved.')
        exit()
    train_size = len(train_dataset)
    valid_size = len(valid_dataset)

# そうでない場合は，データセットを読み込む
else:

    # CSVファイルを読み込み, 訓練データセットを用意
    dataset = CSVBasedDataset(
        filename = DATASET_CSV,
        items = [
            'File Path', # X
        ],
        dtypes = [
            'image', # Xの型
        ],
        dirname = DATA_DIR,
    )

    # 訓練データセットを分割し，一方を検証用に回す
    dataset_size = len(dataset)
    valid_size = int(0.01 * dataset_size) # 全体の 1% を検証用に
    train_size = dataset_size - valid_size # 残りの 99% を学習用に
    train_dataset, valid_dataset = random_split(dataset, [train_size, valid_size])

    # データセット情報をファイルに保存
    save_datasets(MODEL_DIR, train_dataset, valid_dataset)

# 訓練データおよび検証用データをミニバッチに分けて使用するための「データローダ」を用意
train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, pin_memory=True)
valid_dataloader = DataLoader(valid_dataset, batch_size=BATCH_SIZE, shuffle=False, pin_memory=True)

##### ネットワークの定義

In [8]:
class Generator(nn.Module):
    """
    生成器Gのクラス
    """
    def __init__(self, nz=100, nch_g=64, nch=1):
        """
        :param nz: 入力ベクトルzの次元
        :param nch_g: 最終層の入力チャネル数
        :param nch: 出力画像のチャネル数
        """
        super(Generator, self).__init__()

        # ニューラルネットワークの構造を定義する
        self.layers = nn.ModuleDict({
            'layer0': nn.Sequential(
                nn.ConvTranspose2d(nz, nch_g * 4, 3, 1, 0),     # 転置畳み込み
                nn.BatchNorm2d(nch_g * 4),                      # バッチノーマライゼーション
                nn.ReLU()                                       # ReLU
            ),  # (B, nz, 1, 1) -> (B, nch_g*4, 3, 3)
            'layer1': nn.Sequential(
                nn.ConvTranspose2d(nch_g * 4, nch_g * 2, 3, 2, 0),
                nn.BatchNorm2d(nch_g * 2),
                nn.ReLU()
            ),  # (B, nch_g*4, 3, 3) -> (B, nch_g*2, 7, 7)
            'layer2': nn.Sequential(
                nn.ConvTranspose2d(nch_g * 2, nch_g, 4, 2, 1),
                nn.BatchNorm2d(nch_g),
                nn.ReLU()
            ),  # (B, nch_g*2, 7, 7) -> (B, nch_g, 14, 14)
            'layer3': nn.Sequential(
                nn.ConvTranspose2d(nch_g, nch, 4, 2, 1),
                nn.Tanh()
            )   # (B, nch_g, 14, 14) -> (B, nch, 28, 28)
        })

    def forward(self, z):
        """
        順方向の演算
        :param z: 入力ベクトル
        :return: 生成画像
        """
        for layer in self.layers.values():  # self.layersの各層で演算を行う
            z = layer(z)
        return z

class Discriminator(nn.Module):
    """
    識別器Dのクラス
    """
    def __init__(self, nch=1, nch_d=64):
        """
        :param nch: 入力画像のチャネル数
        :param nch_d: 先頭層の出力チャネル数
        """
        super(Discriminator, self).__init__()

        # ニューラルネットワークの構造を定義する
        self.layers = nn.ModuleDict({
            'layer0': nn.Sequential(
                nn.Conv2d(nch, nch_d, 4, 2, 1),     # 畳み込み
                nn.LeakyReLU(negative_slope=0.2)    # leaky ReLU関数
            ),  # (B, nch, 28, 28) -> (B, nch_d, 14, 14)
            'layer1': nn.Sequential(
                nn.Conv2d(nch_d, nch_d * 2, 4, 2, 1),
                nn.BatchNorm2d(nch_d * 2),
                nn.LeakyReLU(negative_slope=0.2)
            ),  # (B, nch_d, 14, 14) -> (B, nch_d*2, 7, 7)
            'layer2': nn.Sequential(
                nn.Conv2d(nch_d * 2, nch_d * 4, 3, 2, 0),
                nn.BatchNorm2d(nch_d * 4),
                nn.LeakyReLU(negative_slope=0.2)
            ),  # (B, nch_d*2, 7, 7) -> (B, nch_d*4, 3, 3)
            'layer3': nn.Sequential(
                nn.Conv2d(nch_d * 4, 1, 3, 1, 0),
                nn.Sigmoid()
            )    
            # (B, nch_d*4, 3, 3) -> (B, 1, 1, 1)
        })

    def forward(self, x):
        """
        順方向の演算
        :param x: 本物画像あるいは生成画像
        :return: 識別信号
        """
        for layer in self.layers.values():  # self.layersの各層で演算を行う
            x = layer(x)
        return x.squeeze()     # Tensorの形状を(B)に変更して戻り値とする

In [9]:
def weights_init(m):
    """
    ニューラルネットワークの重みを初期化する。作成したインスタンスに対しapplyメソッドで適用する
    :param m: ニューラルネットワークを構成する層
    """
    classname = m.__class__.__name__
    if classname.find('Conv') != -1:            # 畳み込み層の場合
        m.weight.data.normal_(0.0, 0.02)
        m.bias.data.fill_(0)
    elif classname.find('Linear') != -1:        # 全結合層の場合
        m.weight.data.normal_(0.0, 0.02)
        m.bias.data.fill_(0)
    elif classname.find('BatchNorm') != -1:     # バッチノーマライゼーションの場合
        m.weight.data.normal_(1.0, 0.02)
        m.bias.data.fill_(0)

In [10]:
# 生成器G。ランダムベクトルから生成画像を作成する
netG = Generator(nz=nz+10, nch_g=nch_g).to(device) #10はn_class=10を指す。出し分けに必要なラベル情報
netG.apply(weights_init)    # weights_init関数で初期化
print(netG)

Generator(
  (layers): ModuleDict(
    (layer0): Sequential(
      (0): ConvTranspose2d(110, 512, kernel_size=(3, 3), stride=(1, 1))
      (1): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU()
    )
    (layer1): Sequential(
      (0): ConvTranspose2d(512, 256, kernel_size=(3, 3), stride=(2, 2))
      (1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU()
    )
    (layer2): Sequential(
      (0): ConvTranspose2d(256, 128, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
      (1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU()
    )
    (layer3): Sequential(
      (0): ConvTranspose2d(128, 1, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
      (1): Tanh()
    )
  )
)


In [11]:
# 生成器G。ランダムベクトルから生成画像を作成する
netG = Generator(nz=nz+10, nch_g=nch_g).to(device) #10はn_class=10を指す。出し分けに必要なラベル情報
netG.apply(weights_init)    # weights_init関数で初期化
print(netG)

Generator(
  (layers): ModuleDict(
    (layer0): Sequential(
      (0): ConvTranspose2d(110, 512, kernel_size=(3, 3), stride=(1, 1))
      (1): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU()
    )
    (layer1): Sequential(
      (0): ConvTranspose2d(512, 256, kernel_size=(3, 3), stride=(2, 2))
      (1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU()
    )
    (layer2): Sequential(
      (0): ConvTranspose2d(256, 128, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
      (1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU()
    )
    (layer3): Sequential(
      (0): ConvTranspose2d(128, 1, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
      (1): Tanh()
    )
  )
)


In [12]:
# 識別器D。画像が本物か生成かを識別する
netD = Discriminator(nch=1+10, nch_d=nch_d).to(device) #10はn_class=10を指す。分類に必要なラベル情報
netD.apply(weights_init)
print(netD)

Discriminator(
  (layers): ModuleDict(
    (layer0): Sequential(
      (0): Conv2d(11, 128, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
      (1): LeakyReLU(negative_slope=0.2)
    )
    (layer1): Sequential(
      (0): Conv2d(128, 256, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
      (1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): LeakyReLU(negative_slope=0.2)
    )
    (layer2): Sequential(
      (0): Conv2d(256, 512, kernel_size=(3, 3), stride=(2, 2))
      (1): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): LeakyReLU(negative_slope=0.2)
    )
    (layer3): Sequential(
      (0): Conv2d(512, 1, kernel_size=(3, 3), stride=(1, 1))
      (1): Sigmoid()
    )
  )
)


##### 学習の実行

In [13]:
criterion = nn.BCELoss()    # バイナリークロスエントロピー（Sigmoid関数無し）

# オプティマイザ−のセットアップ
optimizerD = optim.Adam(netD.parameters(), lr=lr, betas=(beta1, 0.999), weight_decay=1e-5)  # 識別器D用
optimizerG = optim.Adam(netG.parameters(), lr=lr, betas=(beta1, 0.999), weight_decay=1e-5)  # 生成器G用

In [14]:
def onehot_encode(label, device, n_class=10):
    """
    カテゴリカル変数のラベルをOne-Hoe形式に変換する
    :param label: 変換対象のラベル
    :param device: 学習に使用するデバイス。CPUあるいはGPU
    :param n_class: ラベルのクラス数
    :return:
    """
    eye = torch.eye(n_class, device=device)
    # ランダムベクトルあるいは画像と連結するために(B, c_class, 1, 1)のTensorにして戻す
    return eye[label].view(-1, n_class, 1, 1) 

In [15]:
def concat_image_label(image, label, device, n_class=10):
    """
    画像とラベルを連結する
    :param image:　画像
    :param label: ラベル
    :param device: 学習に使用するデバイス。CPUあるいはGPU
    :param n_class: ラベルのクラス数
    :return:　画像とラベルをチャネル方向に連結したTensor
    """
    B, C, H, W = image.shape    # 画像Tensorの大きさを取得
    
    oh_label = onehot_encode(label, device)         # ラベルをOne-Hotベクトル化
    oh_label = oh_label.expand(B, n_class, H, W)    # 画像のサイズに合わせるようラベルを拡張する
    return torch.cat((image, oh_label), dim=1)      # 画像とラベルをチャネル方向（dim=1）で連結する

In [16]:
def concat_noise_label(noise, label, device):
    """
    ノイズ（ランダムベクトル）とラベルを連結する
    :param noise: ノイズ
    :param label: ラベル
    :param device: 学習に使用するデバイス。CPUあるいはGPU
    :return:　ノイズとラベルを連結したTensor
    """
    oh_label = onehot_encode(label, device)     # ラベルをOne-Hotベクトル化
    return torch.cat((noise, oh_label), dim=1)  # ノイズとラベルをチャネル方向（dim=1）で連結する

In [17]:
# 生成器のエポックごとの画像生成に使用する確認用の固定ノイズ
fixed_noise = torch.randn(batch_size, nz, 1, 1, device=device) 
# 確認用のラベル。0〜9のラベルの繰り返し
fixed_label = [i for i in range(10)] * (batch_size // 10)
fixed_label = torch.tensor(fixed_label, dtype=torch.long, device=device)
# 確認用のノイズとラベルを連結
fixed_noise_label = concat_noise_label(fixed_noise, fixed_label, device) 
print(fixed_noise.shape)
print(fixed_label)
print(fixed_noise_label.shape)

torch.Size([50, 100, 1, 1])
tensor([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3,
        4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7,
        8, 9])
torch.Size([50, 110, 1, 1])


In [None]:
# 学習のループ
for epoch in range(n_epoch):
    for itr, data in enumerate(dataloader):
        real_image = data[0].to(device)     # 本物画像
        real_label = data[1].to(device)     # 本物画像に対応するラベル
        # 本物画像とラベルを連結
        real_image_label = concat_image_label(real_image, real_label, device) 
        sample_size = real_image.size(0)    # 画像枚数

        # 標準正規分布からノイズを生成
        noise = torch.randn(sample_size, nz, 1, 1, device=device)
        # 生成画像生成用のラベル
        fake_label = torch.randint(10, (sample_size,), dtype=torch.long, device=device)
        # ノイズとラベルを連結
        fake_noise_label = concat_noise_label(noise, fake_label, device)        
        # 本物画像に対する識別信号の目標値「1」
        real_target = torch.full((sample_size,), 1., device=device)
        # 生成画像に対する識別信号の目標値「0」
        fake_target = torch.full((sample_size,), 0., device=device)
        
        ############################
        # 識別器Dの更新
        ###########################
        netD.zero_grad()    # 勾配の初期化
        
        # 識別器Dで本物画像とラベルの組み合わせに対する識別信号を出力
        output = netD(real_image_label)
        # 本物画像に対する識別信号の損失値
        errD_real = criterion(output, real_target)

        D_x = output.mean().item()  # 本物画像の識別信号の平均

        fake_image = netG(fake_noise_label)  # 生成器Gでラベルに対応した生成画像を生成
        # 生成画像とラベルを連結
        fake_image_label = concat_image_label(fake_image, fake_label, device)   
        
        # 識別器Dで本物画像に対する識別信号を出力
        output = netD(fake_image_label.detach()) 
        # 生成画像に対する識別信号の損失値
        errD_fake = criterion(output, fake_target)  
        D_G_z1 = output.mean().item()# 生成画像の識別信号の平均

        errD = errD_real + errD_fake    # 識別器Dの全体の損失
        errD.backward()    # 誤差逆伝播
        optimizerD.step()   # Dのパラメーターを更新

        ############################
        # 生成器Gの更新
        ###########################
        netG.zero_grad()    # 勾配の初期化
        
        output = netD(fake_image_label)     # 更新した識別器Dで改めて生成画像とラベルの組み合わせに対する識別信号を出力
        errG = criterion(output, real_target)   # 生成器Gの損失値。Dに生成画像を本物画像と誤認させたいため目標値は「1」
        errG.backward()     # 誤差逆伝播
        D_G_z2 = output.mean().item()# 更新した識別器Dによる生成画像の識別信号の平均

        optimizerG.step()   # Gのパラメータを更新

        if itr % display_interval == 0: 
            print('[{}/{}][{}/{}] Loss_D: {:.3f} Loss_G: {:.3f} D(x): {:.3f} D(G(z)): {:.3f}/{:.3f}'
                  .format(epoch + 1, n_epoch,
                          itr + 1, len(dataloader),
                          errD.item(), errG.item(), D_x, D_G_z1, D_G_z2))
            
        if epoch == 0 and itr == 0:     # 初回に本物画像を保存する
            vutils.save_image(real_image, '{}/real_samples.png'.format(outf),
                              normalize=True, nrow=10)

    ############################
    # 確認用画像の生成
    ############################
    fake_image = netG(fixed_noise_label)  # 1エポック終了ごとに確認用の生成画像を生成する
    vutils.save_image(fake_image.detach(), '{}/fake_samples_epoch_{:03d}.png'.format(outf, epoch + 1),
                      normalize=True, nrow=10)

    ############################
    # モデルの保存
    ############################
    if (epoch + 1) % 10 == 0:   # 10エポックごとにモデルを保存する
        torch.save(netG.state_dict(), '{}/netG_epoch_{}.pth'.format(outf, epoch + 1))
        torch.save(netD.state_dict(), '{}/netD_epoch_{}.pth'.format(outf, epoch + 1))