# pytorchによるpix2pixの写経
## 参照URL
https://github.com/GINK03/pytorch-pix2pix

## import

In [1]:
import torch
import torch.nn as nn
from torch.autograd import  Variable
import numpy as np

## 重みの初期化

In [2]:
def weights_init(m):
    classname = m.__class__.__name__
    if classname.find('Conv') != -1:
        m.weight.data.normal_(0.0, 0.02)
    elif classname.find('BatchNorm2d') != -1 or classname.find('InstanceNorm2d') != -1:
        m.weight.data.normal_(1.0, 0.02)
        m.bias.data.fill_(0)

## 正規化層の取得

In [3]:
def get_norm_layer(norm_type):
    if norm_type == 'batch':
        norm_layer = nn.BatchNorm2d
    elif norm_type == 'instance':
        norm_layer = nn.InstanceNorm2d
    else:
        print('normalization layer [%s] is not found' % norm_type)
    
    return norm_layer

## Generatorの生成関数
`ResnetGenerator`のラッパー関数という印象を受ける。特にやっていることは、正規化層の選択と、GPUを利用するならこの時点で送るといったところか。

しかし、`cuda()`するのと`to(device)`するのはどっちが主流なのか？←https://qiita.com/vintersnow/items/91545c27e2003f62ebc4　によると`to(device)`はバージョンアップで使えるようになった表現らしい。これで、条件分岐なしでCPU・GPU両方に向けたコードが書ける。


因みに`torch.nn.Module#apply(fn)`
は子の`nn.Module`全てに`fn`を適用するメソッドである。つまり、`netG.apply(weights_init)`で全ての層のパラメータを初期化しているわけである。

In [4]:
def define_G(input_nc, output_nc, ngf, norm='batch', use_dropout=False, gpu_ids=[]):
    netG = None
    use_gpu = len(gpu_ids) > 0
    norm_layer = get_norm_layer(norm_type=norm)
    
    if use_gpu:
        assert(torch.cuda.is_available())
    
    netG = ResnetGenerator(input_nc, output_nc, ngf, 
                           norm_layer=norm_layer, use_dropout=use_dropout, n_blocks=9, gpu_ids=gpu_ids)
    
    if len(gpu_ids) > 0:
        netG.cuda()
        
    netG.apply(weights_init)
    return netG

## Discriminatorの生成関数
`define_G()`と同様に`NLayerDiscriminator`のラッパーという感じだ。

In [5]:
def define_D(input_nc, ndf, norm='batch', use_sigmoid=False, gpu_ids=[]):
    netD = None
    use_gpu = len(gpu_ids) > 0
    norm_layer = get_norm_layer(norm_type=norm)
    
    if use_gpu:
        assert(torch.cuda.is_available())
        
    netD = NLayerDiscriminator(input_nc, ndf, n_layers=3, 
                               norm_layer=norm_layer, use_sigmoid=use_sigmoid, gpu_ids=gpu_ids)
    
    if use_gpu:
        netD.cuda()
    
    netD.apply(weights_init)
    return netD

In [6]:
def print_network(net):
    num_params = 0
    for param in net.parameters():
        num_params += param.numel()
    
    print(net)
    print('Total number of parameters: %d' % num_params)

## class GANLoss
githubによると
```
LSGANや通常のGANで用いられるGANの損失を定義する
```
とある。

使われ方を見たり、中身を見たらわかるだろう（希望的観測）。

真偽の数値を決めておいて、使用する損失関数を状況に合わせて変えるためのAdapterといってもよいのだろうか？

### numel()

### Binary Cross Entropy Loss (BCE Loss)
$N$をバッチサイズとして、
\begin{align*}
l(x, y ) &= L = \lbrace l_1, l_2, \dots, l_N \rbrace ^T \\
l_n &= -w_n \left(  y_n \cdot \log x_n + (1 - y_n)\cdot \log(1 - x_n) \right) 
\end{align*}

### Mean Squared Error Loss (MSE Loss)
\begin{align*}
l(x, y ) &= L = \lbrace l_1, l_2, \dots, l_N \rbrace ^T \\
l_n &= (x_n - y_n)^2 
\end{align*}
2乗距離。

In [23]:
class GANLoss(nn.Module):
    def __init__(self, use_lsgan=True, target_real_label=1.0, target_fake_label=0.0,
                 tensor=torch.FloatTensor):
        super(GANLoss, self).__init__()
        self.real_label = target_real_label
        self.fake_label = target_fake_label
        self.real_label_var = self.fake_label_var = None
        self.Tensor = tensor
        if use_lsgan:
            self.loss = nn.MSELoss()
        else:
            self.loss = nn.BCELoss()
        
    def get_target_tensor(self, input, target_is_real):
        target_tensor = None
        if target_is_real:
            create_label = ((self.real_label_var is None) or
                            (self.real_label_var.numel() != input.numel()))
            if create_label:
                real_tensor = self.Tensor(input.size()).fill_(self.real_label)
                self.real_label_var = Variable(real_tensor, requires_grad = False)
            target_tensor  = self.fake_label_var
        else:
            create_label = ((self.fake_label_var is None) or
                            (self.fake_label_var.numel() != input.numel()))
            if create_label:
                fake_tensor = self.Tensor(input.size()).fill_(self.fake_label)
                self.fake_label_var = Variable(fake_tensor, requires_grad=False)
            target_tensor = fake_tensor
            
        return target_tensor
    
    # get_target_tensorのラッパ
    # forward()に対する__call__()のようなものだろう
    def __call__(self,input, target_is_real):
        target_tensor = self.get_target_tensor(input, target_is_real)
        return self.loss(input, target_tensor.cuda())

## class ResnetGenerator
本丸その１：生成器。

### torch.cuda.FloatTensor
`cuda()`や`to(device)`でcuda対応のGPUに転送されたテンソルのクラス。現在のバージョンではコンストラクタの様式が変わった(`torch.Tensor(・, dtype=・, device=・)`)ので、型がどうなっているのか不思議だったがisinstanceしたところ、`dtype`, `device`により型が変わるようだ。

In [8]:
class ResnetGenerator(nn.Module):
    def __init__(self, input_nc, output_nc, ngf=64, norm_layer=nn.BatchNorm2d, 
                 use_dropout=False, n_blocks=6, gpu_ids=[]):
        assert(n_blocks >= 0)
        super(ResnetGenerator, self).__init__
        self.input_nc = input_nc
        self.output_nc = output_nc
        self.ngf = ngf
        self.gpu_ids = gpu_ids
        
        model = [nn.Conv2d(input_nc, ngf, kernel_size=7, padding=3),
                 norm_layer(ngf, affine=True),
                 nn.ReLU(True)]
        
        n_downsampling = 2
        # チャンネル数を増やす
        for i in range(n_downsampling):
            mult = 2**i
            model += [
                nn.Conv2d(ngf * mult, ngf * mult * 2, kernel_size=3, 
                          stride=2, padding=1),
                norm_layer(ngf * mult * 2, affine=True),
                nn.ReLU(True)
            ]
            
        
        mult = 2**n_downsampling
        # 残余ネットワークの層を追加
        for i in range(n_blocks):
            model += [ResNetBlock(ngf * mult, 'zero', norm_layer=norm_layer, use_dropout=use_dropout)]
        
        # チャンネル数を小さくする
        for i in range(n_downsampling):
            mult = 2**(n_downsampling - 1)
            model += [
                nn.ConvTranspose2d(ngf * mult, int(ngf * mult / 2),
                                   kernel_size=3, stride=2,
                                   padding=1, output_padding=1),
                norm_layer(int(ngf * mult / 2), affine=True),
                nn.ReLU(True)
            ]
        
        # 出力層用に形を整える
        model += [nn.Conv2d(ngf, output_nc, kernel_size=7, padding=3)]
        model += [nn.Tanh()]
        
        self.model = nn.Sequential(*model)
        
    def forward(self, input):
        if self.gpu_ids and isinstance(input.data, torch.cuda.FloatTensor):
            return nn.parallel.data_parallel(self.model, input, self.gpu_ids)
        else:
            return self.model(input)

## class ResnetBlock
親玉。一番よくわからない。https://deepage.net/deep_learning/2016/11/30/resnet.html でちゃんと勉強しよう。

In [None]:
class ResnetBlock(nn.Module):
    def __init__(self, dim, padding_type, norm_layer, use_dropout):
        super(ResnetBlock, self).__init__()
        self.conv_block = self.build_conv_block(dim, padding_type, norm_layer, use_dropout)
        
    def build_conv_block(self, dim, padding_type, norm_layer, use_dropout):
        conv_block = []
        p = 0
        assert(padding_type == 'zero')
        p = 1
        
        conv_block += [
            nn.Conv2d(dim, dim, kernel_size=3, padding=p),
            norm_layer(dim, affine=True),
            nn.ReLU(True)
        ]
        
        if use_dropout:
            conv_block  += [nn.Dropout(0.5)]
        
        conv_block += [
            nn.Conv2d(dim, dim, kernel_size=3, padding=p),
            norm_layer(dim,affine(True))
        ]
        return nn.Sequential(*conv_block)
    
    def forward(self, x):
        out = x + self.conv_block(x)
        return out

## class NLayerDiscriminator
本丸その２。

In [None]:
class NLayerDiscriminator(nn.Module):
    def __init__(self, input_nc, ndf=64, n_layers=3, 
                 norm_layer=nn.BatchNorm2d, use_sigmoid=False, gpu_ids=[]):
        super(NLayerDiscriminator, self).__init__()
        self.gpu_ids = gpu_ids
        
        kw = 4
        padw = int(np.ceil((kw-1)/2))
        seq = [
            nn.Conv2d(input_nc, ndf, kernel_size=kw, stride=2, padding=padw),
            nn.LeakyReLU(0.2, True)
        ]
        
        nf_mult = 1
        nf_mult_prev = 1
        
        for n in range(1, n_layers):
            nf_mult_prev = nf_mult
            nf_mult = min(2**n, 8)
            seq += [
                nn.Conv2d(ndf * nf_mult_prev, ndf * nf_mult, kernel_size=kw, stride=2, padding=padw),
                norm_layer(ndf, * nf_mult, affine=True),
                nn.LeakyReLU(0.2, True)
            ]
        
        nf_mult_prev = nf_mult
        nf_mult = min(2**n_layers, 8)
        seq += [
            nn.Conv2d(ndf * nf_mult_prev, ndf * nf_mult, kernel_size=kw, stride=1, padding=padw),
            norm_layer(ndf, * nf_mult, affine=True),
            nn.LeakyReLU(0.2, True)
        ]
        
        seq += [nn.Conv2d(ndf * nf_mult, 1, kernel_size=kw, stride=1, padding=padw)]
        
        if use_sigmoid:
            seq += [nn.Sigmoid()]
        
        self.model = nn.Sequential(*seq)
        
    def forward(self, input):
        # 0.4以降はTensorが計算グラフを持てるので、こうする必要があるかは調査が必要
        if len(self.gpu_ids) and isinstance(input.data, torch.cuda.FloatTensor):
            return nn.parallel.data_parallel(self.model, input, self.gpu_ids)
        else:
            return self.model(input)

## 準備

### パラメータ

In [14]:
input_nc = 3
output_nc = 2
ngf = 64
ndf = 64

batch_size = 64

# Adam用
lr = 0.00015
beta = (0.5, 0.999)

# L1用
lamb = 10


### ネットワーク

In [11]:
netG = define_G(input_nc, output_nc, ngf, norm='batch', use_dropout=False, gpu_ids=[0])
netD = define_D(input_nc, output_nc, ndf, norm='batch', use_sigmoid=False, gpu_ids=[0])

AssertionError: 

### 損失関数

### nn.L1Loss
\begin{align*}
l(x,y) &= L = \lbrace l_1, \dots, l_N \rbrace^\top \\
l_n &= |x_n - y_n|
\end{align*}
要するに、差の絶対値
### nn.MSELoss

In [12]:
criterionGAN = GANLoss()
criterionL1 = nn.L1Loss()
criterionMSE = nn.MSELoss()

### 最適化スキーム

In [13]:
from torch import optim
optimizerG = optim.Adam(netG.parameters(), lr, beta)
optimizerD = optim.Adam(netD.parameters(), lr, beta)

NameError: name 'netG' is not defined

### 変数

In [15]:
real_a = torch.Tensor(batch_size, input_nc, 256, 256)
real_b = torch.Tensor(batch_size, output_nc, 256, 256)

## デバイス(CPU or GPU)の設定 & 変数化

In [22]:
device = torch.device("cuda" if torch.cuda.is_available() else 'cpu')

netG, netD, \
criterionGAN, criterionL1, criterionMSE, \
real_a, real_b = map(lambda x: x.to(device), 
                     [netG, netD, criterionGAN, criterionL1, criterionMSE, real_a, real_b])

real_a = Variable(real_a)
real_b = Variable(real_b)

NameError: name 'netG' is not defined

## 学習

### torch.cat(seq, dim=0, out=None) → Tensor
与えられた`Tensor`の列`seq`を次元`dim`の軸で連結する。
```py
>>> x = torch.randn(2, 3)
>>> x
tensor([[ 0.6580, -1.0969, -0.4614],
        [-0.1034, -0.5790,  0.1497]])
>>> torch.cat((x, x, x), 0)
tensor([[ 0.6580, -1.0969, -0.4614],
        [-0.1034, -0.5790,  0.1497],
        [ 0.6580, -1.0969, -0.4614],
        [-0.1034, -0.5790,  0.1497],
        [ 0.6580, -1.0969, -0.4614],
        [-0.1034, -0.5790,  0.1497]])
>>> torch.cat((x, x, x), 1)
tensor([[ 0.6580, -1.0969, -0.4614,  0.6580, -1.0969, -0.4614,  0.6580,
         -1.0969, -0.4614],
        [-0.1034, -0.5790,  0.1497, -0.1034, -0.5790,  0.1497, -0.1034,
         -0.5790,  0.1497]])
```

In [24]:
def train(epoch):
    for iteration, batch in enumerate(training_data_loader, 1):
        # forward
        real_a_cpu, real_b_cpu = batch[0:2]
        
        # 本物画像
        real_a.data.resize_(real_a_cpu.size()).copy_(real_a_cpu)
        real_b.data.resize_(real_b_cpu.size()).copy_(real_b_cpu)
        # 生成器の作った偽物画像
        fake_b = netG(real_a)
        
        #-----------------------------------------
        # ●判別器の更新
        # max log(D(x,y)) + log(1 - D(x, G(x)))
        #-----------------------------------------
        optimizerD.zero_grad()
        
        # 偽物で学習
        fake_ab = torch.cat((real_a, fake_b), 1)
        pred_fake = netD.forward(fake_ab.detach())   
        # ↑なんで__call__じゃないんだ？
        #   `detach`するのは勾配がGeneratorに伝わらないようにするためだったか？
        loss_d_fake = criterionGAN(pred_fake, target_is_real=False)
        
        # 本物で学習
        real_ab = torch.cat((real_a, real_b), 1)
        pred_real = netD.forward(real_ab)
        loss_d_real = criterionGAN(pred_real, target_is_real=True)
        
        # 損失を合算
        loss_d = (loss_d_fake, loss_d_real) * 0.5
        loss_d.backward()
        optimizerD.step()
        
        #-----------------------------------------
        # ●生成器の更新
        # max log(D(x,G(x))) + L1(y, G(x))
        #-----------------------------------------
        optimizerG.zero_grad()
        # 偽物で騙しにかかる (第１項)
        fake_ab = torch.cat((real_a, fake_b), 1)
        pred_fake = netD.forward(fake_ab)
        loss_g_gan = criterionGAN(pred_fake, target_is_real=True)
        
        # どれだけ本物に近かったか（第２項）
        loss_g_l1 = criterionL1(fake_b, real_b) * lamb
        loss_g = loss_g_gan + loss_g_l1
        loss_g.backward()
        optimizerG.step()
        print("===> Epoch[{}]({}/{}): Loss_D: {:.4f} Loss_G: {:.4f}".format(
                epoch, iteration, len(training_data_loader), loss_d.data[0], loss_g.data[0]))