# 概要

pix2pixとPANはLoss以外全て共通と言えます．よって，「本家のpix2pix実装にPANのlossを付け加える」という形で実装を行いました．このnotebookでは，ネットワーク構造，学習パートの要点を説明します．これだけだと動きません．動かしたい場合はREADMEを参考にして下さい．

本家のpix2pix実装: https://github.com/junyanz/pytorch-CycleGAN-and-pix2pix  
（厳密にはlua実装 https://github.com/phillipi/pix2pix が本家ですが，↑も同じチームの人が作ったpytorch版ということなので，こちらを「本家」と呼びます．）

## 共通部分(./models/network.py)

ネットワークの構造は共通です．pix2pixとPANのパラメータ数は，GもDも全く同じになります．ネットワークの構造は，torchのシーケンスをprint関数で出力したもの(./G_printed.txtと./D_printed.txt)が分かりやすいかもしれません．これらはprint出力をただ貼り付けただけのテキストファイルなので実装とは何の関係もありません．実際のGとDの実装は以下のようになります．

### G
まずGはU-Netです．Skip-connectionの表現で再帰っぽい記述を使っていますが，特に大きな意味はないと思います．（ハードコーディングを避けただけだと思います．）

In [2]:
import torch
import torch.nn as nn
from torch.nn import init
import functools
from torch.autograd import Variable
from torch.optim import lr_scheduler
import numpy as np

def define_G(input_nc, output_nc, ngf, which_model_netG, norm='batch', use_dropout=False, init_type='normal', gpu_ids=[]):
    """Build the generator, move it to the first requested GPU and initialise it.

    Args:
        input_nc: forwarded to ``UnetGenerator`` as ``input_nc``.
        output_nc: forwarded to ``UnetGenerator`` as ``output_nc``.
        ngf: forwarded to ``UnetGenerator`` as ``ngf``.
        which_model_netG: ``'unet_128'`` (``num_downs=7``) or ``'unet_256'``
            (``num_downs=8``).
        norm: normalisation name handed to ``get_norm_layer``.
        use_dropout: forwarded to ``UnetGenerator``.
        init_type: initialisation scheme name handed to ``init_weights``.
        gpu_ids: GPU ids; when non-empty the network is moved to
            ``gpu_ids[0]``. The list is only read here, never mutated.

    Returns:
        The constructed generator, after ``init_weights`` has been applied.

    Raises:
        AssertionError: if ``gpu_ids`` is non-empty but CUDA is unavailable.
        NotImplementedError: if ``which_model_netG`` is not a supported name.
    """
    use_gpu = len(gpu_ids) > 0
    norm_layer = get_norm_layer(norm_type=norm)

    if use_gpu:
        assert(torch.cuda.is_available())

    # All experiments in this notebook used 'unet_256'.
    if which_model_netG == 'unet_128':
        num_downs = 7
    elif which_model_netG == 'unet_256':
        num_downs = 8
    else:
        raise NotImplementedError('Generator model name [%s] is not recognized' % which_model_netG)

    netG = UnetGenerator(input_nc, output_nc, num_downs, ngf, norm_layer=norm_layer, use_dropout=use_dropout, gpu_ids=gpu_ids)
    if use_gpu:
        # Pass the device positionally: the keyword was ``device_id`` in old
        # PyTorch releases and ``device`` in later ones.
        netG.cuda(gpu_ids[0])
    init_weights(netG, init_type=init_type)
    return netG

class UnetGenerator(nn.Module):
    """U-Net generator assembled from nested ``UnetSkipConnectionBlock`` levels.

    The network is built from the inside out: the bottleneck block first,
    then ``num_downs - 5`` blocks at ``ngf * 8`` channels, then three blocks
    that step the width down to ``ngf``, and finally the outermost block that
    maps ``input_nc`` channels in and ``output_nc`` channels out.
    """

    def __init__(self, input_nc, output_nc, num_downs, ngf=64,
                 norm_layer=nn.BatchNorm2d, use_dropout=False, gpu_ids=[]):
        super(UnetGenerator, self).__init__()
        self.gpu_ids = gpu_ids

        widest = ngf * 8

        # Innermost level (the bottleneck).
        core = UnetSkipConnectionBlock(widest, widest, input_nc=None, submodule=None,
                                       norm_layer=norm_layer, innermost=True)

        # Wrap the bottleneck in full-width levels; only these receive the
        # use_dropout flag.
        remaining = num_downs - 5
        while remaining > 0:
            core = UnetSkipConnectionBlock(widest, widest, input_nc=None, submodule=core,
                                           norm_layer=norm_layer, use_dropout=use_dropout)
            remaining -= 1

        # Levels that narrow the channel width towards the outside.
        for outer_mult, inner_mult in ((4, 8), (2, 4), (1, 2)):
            core = UnetSkipConnectionBlock(ngf * outer_mult, ngf * inner_mult, input_nc=None,
                                           submodule=core, norm_layer=norm_layer)

        # Outermost level: the network's input and output layers.
        self.model = UnetSkipConnectionBlock(output_nc, ngf, input_nc=input_nc, submodule=core,
                                             outermost=True, norm_layer=norm_layer)

    def forward(self, input):
        """Run the U-Net, using data_parallel for CUDA float input when GPUs are set."""
        run_parallel = self.gpu_ids and isinstance(input.data, torch.cuda.FloatTensor)
        if not run_parallel:
            return self.model(input)
        return nn.parallel.data_parallel(self.model, input, self.gpu_ids)
        
# 再帰的に下のような構造をつくる(イメージ)
# これまでに構成した[submodule]というのが白い四角□
# これから構成するdownやupが黒い四角■
# ■                ■
# ■                ■
# ■ □      □ ■
# ■ □ □ □ ■
class UnetSkipConnectionBlock(nn.Module):
    """One nesting level of the U-Net: ``down -> submodule -> up``.

    Every block except the outermost one returns its input concatenated with
    its own output along dim 1 (the skip connection). That is why the
    up-convolution of a block that wraps a submodule takes ``inner_nc * 2``
    input channels.

    Args:
        outer_nc: number of channels produced by the up-convolution.
        inner_nc: number of channels produced by the down-convolution.
        input_nc: number of input channels; falls back to ``outer_nc`` when
            ``None``.
        submodule: the already-built inner block; not used when
            ``innermost`` is true.
        outermost: build the input/output level (no norm layers, ``Tanh`` at
            the end, no skip concatenation in ``forward``).
        innermost: build the bottleneck level (no submodule, no down norm).
        norm_layer: callable that takes a channel count and returns a
            normalisation module; may be a ``functools.partial``.
        use_dropout: append ``Dropout(0.5)``; only honoured for intermediate
            (neither outermost nor innermost) blocks.
    """

    def __init__(self, outer_nc, inner_nc, input_nc=None,
                 submodule=None, outermost=False, innermost=False, norm_layer=nn.BatchNorm2d, use_dropout=False):
        super(UnetSkipConnectionBlock, self).__init__()
        self.outermost = outermost

        # Convolutions get a bias term only when the normalisation is
        # InstanceNorm2d. Unwrap a functools.partial to inspect the class.
        norm_cls = norm_layer.func if isinstance(norm_layer, functools.partial) else norm_layer
        use_bias = norm_cls == nn.InstanceNorm2d

        if input_nc is None:
            input_nc = outer_nc

        # Layers are created in a fixed order and kept at fixed positions in
        # the Sequential so parameter names stay stable.
        downconv = nn.Conv2d(input_nc, inner_nc, kernel_size=4,
                             stride=2, padding=1, bias=use_bias)
        downrelu = nn.LeakyReLU(0.2, True)
        downnorm = norm_layer(inner_nc)
        uprelu = nn.ReLU(True)
        upnorm = norm_layer(outer_nc)

        if outermost:
            # Outermost level (network input and output).
            upconv = nn.ConvTranspose2d(inner_nc * 2, outer_nc,
                                        kernel_size=4, stride=2,
                                        padding=1)
            down = [downconv]
            up = [uprelu, upconv, nn.Tanh()]
            model = down + [submodule] + up
        elif innermost:
            # Bottleneck: nothing is nested inside, so the up-convolution
            # sees inner_nc channels rather than inner_nc * 2.
            upconv = nn.ConvTranspose2d(inner_nc, outer_nc,
                                        kernel_size=4, stride=2,
                                        padding=1, bias=use_bias)
            down = [downrelu, downconv]
            up = [uprelu, upconv, upnorm]
            model = down + up
        else:
            # Intermediate level wrapped around the blocks built so far.
            upconv = nn.ConvTranspose2d(inner_nc * 2, outer_nc,
                                        kernel_size=4, stride=2,
                                        padding=1, bias=use_bias)
            down = [downrelu, downconv, downnorm]
            up = [uprelu, upconv, upnorm]
            model = down + [submodule] + up
            if use_dropout:
                model = model + [nn.Dropout(0.5)]

        self.model = nn.Sequential(*model)

    def forward(self, x):
        """Apply the block; non-outermost blocks prepend ``x`` as a skip connection."""
        if self.outermost:
            return self.model(x)
        # Skip connection, expressed recursively through the nested blocks.
        return torch.cat([x, self.model(x)], 1)

### D
次に，Dです．convとLeakyReLUとBNを使ったシンプルなTrue/False判別器です．inputは「変換前画像-変換後本物画像ペア」または「変換前画像-生成された偽物画像ペア」のどちらかを想定しています．「ペア」というのは単純に2つの画像をconcatしたものを入力することで表現しています．

PANで中間層の出力を使いたいので，register_forward_hookで中間層の出力をself.intermediate_outputsに格納しています．シーケンス内の各convレイヤにhookを登録することにより，「○○Layerのforwardが実行されて出力が得られた時に，その出力を格納する/関数を実行する」みたいなことが可能になります．

In [4]:
class NLayerDiscriminator(nn.Module):
    """Convolutional discriminator that records the output of every conv layer.

    The network is a stack of 4x4 convolutions: one stride-2 input conv,
    ``n_layers - 1`` stride-2 convs, one stride-1 conv and a final stride-1
    conv with a single output channel, optionally followed by ``Sigmoid``.
    A forward hook on each convolution appends that layer's output to
    ``self.intermediate_outputs``; the list is replaced by a fresh one at the
    start of every ``forward`` call.

    Args:
        input_nc: number of input channels.
        ndf: number of channels produced by the first convolution.
        n_layers: number of stride-2 stages (including the input conv).
        norm_layer: callable that takes a channel count and returns a
            normalisation module; may be a ``functools.partial``.
        use_sigmoid: append ``nn.Sigmoid()`` after the last convolution.
        gpu_ids: GPU ids used for ``data_parallel`` in ``forward``.
    """

    def __init__(self, input_nc, ndf=64, n_layers=3, norm_layer=nn.BatchNorm2d, use_sigmoid=False, gpu_ids=[]):
        super(NLayerDiscriminator, self).__init__()
        self.gpu_ids = gpu_ids

        # Normalised convolutions get a bias term only with InstanceNorm2d.
        # Unwrap a functools.partial to inspect the class.
        norm_cls = norm_layer.func if isinstance(norm_layer, functools.partial) else norm_layer
        use_bias = norm_cls == nn.InstanceNorm2d

        kw = 4
        padw = 1

        # Input stage: no normalisation, default conv bias.
        # NOTE(review): this conv is followed directly by an in-place
        # LeakyReLU and the hook stores a tensor built from output.data, so
        # the recorded feature presumably reflects the activated values.
        # Confirm this is intended.
        sequence = [
            self._hooked_conv(input_nc, ndf, kernel_size=kw, stride=2, padding=padw),
            nn.LeakyReLU(0.2, True)
        ]

        # Stride-2 stages; the channel multiplier doubles and is capped at 8.
        nf_mult = 1
        nf_mult_prev = 1
        for n in range(1, n_layers):
            nf_mult_prev = nf_mult
            nf_mult = min(2**n, 8)
            sequence += [
                self._hooked_conv(ndf * nf_mult_prev, ndf * nf_mult,
                                  kernel_size=kw, stride=2, padding=padw, bias=use_bias),
                norm_layer(ndf * nf_mult),
                nn.LeakyReLU(0.2, True)
            ]

        # One more stage at stride 1.
        nf_mult_prev = nf_mult
        nf_mult = min(2**n_layers, 8)
        sequence += [
            self._hooked_conv(ndf * nf_mult_prev, ndf * nf_mult,
                              kernel_size=kw, stride=1, padding=padw, bias=use_bias),
            norm_layer(ndf * nf_mult),
            nn.LeakyReLU(0.2, True)
        ]

        # Final single-channel prediction map (default conv bias).
        sequence += [self._hooked_conv(ndf * nf_mult, 1, kernel_size=kw, stride=1, padding=padw)]

        if use_sigmoid:
            sequence += [nn.Sigmoid()]

        self.model = nn.Sequential(*sequence)
        self.intermediate_outputs = []

    def _hooked_conv(self, *conv_args, **conv_kwargs):
        """Create an ``nn.Conv2d`` whose output is recorded via a forward hook."""
        conv = nn.Conv2d(*conv_args, **conv_kwargs)
        conv.register_forward_hook(self.add_intermediate_output)
        return conv

    def forward(self, input):
        """Run the discriminator and collect the conv outputs of this pass."""
        # Start a new list rather than clearing the old one, so a list
        # handed out by get_intermediate_outputs() after an earlier pass
        # keeps its contents.
        self.intermediate_outputs = []
        if len(self.gpu_ids) and isinstance(input.data, torch.cuda.FloatTensor):
            return nn.parallel.data_parallel(self.model, input, self.gpu_ids)
        else:
            return self.model(input)

    def add_intermediate_output(self, conv, input, output):
        """Forward-hook callback: store ``output`` without gradient tracking."""
        self.intermediate_outputs.append(Variable(output.data, requires_grad=False))

    def get_intermediate_outputs(self):
        """Return the list of conv outputs recorded during the latest forward pass."""
        return self.intermediate_outputs

## 共通しない部分(Loss)

主に違うのはbackward_D( )とbackward_G( )の中だけです．

### pix2pix

In [8]:
class Pix2PixModel(BaseModel):
    """pix2pix training logic: GAN loss on (input, output) pairs plus weighted L1.

    Initialisation is omitted in this excerpt. The methods rely on attributes
    created elsewhere: ``input_A``/``input_B``, ``netG``/``netD``,
    ``criterionGAN``/``criterionL1``, ``optimizer_G``/``optimizer_D`` and
    ``opt.lambda_A``.
    """

    # Initialisation is omitted here.

    def forward(self):
        """Wrap the current inputs and generate ``fake_B`` from ``real_A``."""
        self.real_A = Variable(self.input_A)
        self.real_B = Variable(self.input_B)
        self.fake_B = self.netG.forward(self.real_A)

    # no backprop gradients
    def test(self):
        """Same as ``forward`` but with the inputs wrapped as volatile Variables."""
        self.real_A = Variable(self.input_A, volatile=True)
        self.real_B = Variable(self.input_B, volatile=True)
        self.fake_B = self.netG.forward(self.real_A)

    def backward_D(self):
        """Compute the discriminator loss on a fake and a real pair and backprop it."""
        # Fake pair: input concatenated with the generated image along dim 1
        # (the original note records torch.Size([1, 6, 256, 256])).
        generated_pair = torch.cat((self.real_A, self.fake_B), 1)
        # detach() so this loss does not backpropagate into the generator.
        self.pred_fake = self.netD.forward(generated_pair.detach())
        self.loss_D_fake = self.criterionGAN(self.pred_fake, False)

        # Real pair: input concatenated with the ground-truth image.
        target_pair = torch.cat((self.real_A, self.real_B), 1)
        self.pred_real = self.netD.forward(target_pair)
        self.loss_D_real = self.criterionGAN(self.pred_real, True)

        # Average of the two discriminator terms.
        self.loss_D = (self.loss_D_fake + self.loss_D_real) * 0.5
        self.loss_D.backward()

    def backward_G(self):
        """Compute the generator loss (GAN term + weighted L1 term) and backprop it."""
        # GAN term: the discriminator should label the generated pair as real.
        generated_pair = torch.cat((self.real_A, self.fake_B), 1)
        self.loss_G_GAN = self.criterionGAN(self.netD.forward(generated_pair), True)

        # Reconstruction term: G(A) should match B, weighted by lambda_A.
        self.loss_G_L1 = self.criterionL1(self.fake_B, self.real_B) * self.opt.lambda_A

        self.loss_G = self.loss_G_GAN + self.loss_G_L1
        self.loss_G.backward()

    def optimize_parameters(self):
        """Run one training step: forward pass, then update D, then update G."""
        self.forward()

        update_steps = (
            (self.optimizer_D, self.backward_D),
            (self.optimizer_G, self.backward_G),
        )
        for optimizer, run_backward in update_steps:
            optimizer.zero_grad()
            run_backward()
            optimizer.step()

（セルの実行結果）`NameError: name 'BaseModel' is not defined` — BaseModelはこのnotebook内では定義していないため，このセル単体で実行するとこのエラーになります．

### PAN

In [9]:
class PanModel(BaseModel):
    """PAN training logic: GAN loss plus a perceptual adversarial loss.

    The perceptual term is a weighted sum, over the discriminator's recorded
    intermediate outputs, of ``criterionPAN(fake_feature, real_feature)``.
    The methods rely on attributes created elsewhere: ``input_A``/``input_B``,
    ``netG``/``netD``, ``criterionGAN``/``criterionPAN``, ``pan_lambdas``,
    ``pan_mergin_m``, ``Tensor`` and ``optimizer_G``/``optimizer_D``.
    """

    def forward(self):
        """Wrap the current inputs and generate ``fake_B`` from ``real_A``."""
        self.real_A = Variable(self.input_A)
        self.fake_B = self.netG.forward(self.real_A)
        self.real_B = Variable(self.input_B)

    # no backprop gradients
    def test(self):
        """Same as ``forward`` but with the inputs wrapped as volatile Variables."""
        self.real_A = Variable(self.input_A, volatile=True)
        self.fake_B = self.netG.forward(self.real_A)
        self.real_B = Variable(self.input_B, volatile=True)

    def backward_D(self):
        """Compute the discriminator loss (GAN + margin term) and backprop it.

        Also stores the weighted perceptual loss in ``self.loss_PAN`` for
        later use by ``backward_G``.
        """
        # Fake pair: input concatenated with the generated image along dim 1
        # (the original note records torch.Size([1, 6, 256, 256])).
        fake_AB = torch.cat((self.real_A, self.fake_B), 1)

        # detach() so this loss does not backpropagate into the generator.
        self.pred_fake = self.netD.forward(fake_AB.detach())
        self.loss_D_fake = self.criterionGAN(self.pred_fake, False)

        # Intermediate-layer outputs for the fake pair. They must be fetched
        # before netD runs again; presumably netD starts a new list on each
        # forward pass, so verify against the discriminator.
        fake_inters = self.netD.get_intermediate_outputs()

        # Real pair: input concatenated with the ground-truth image.
        real_AB = torch.cat((self.real_A, self.real_B), 1)
        self.pred_real = self.netD.forward(real_AB)
        self.loss_D_real = self.criterionGAN(self.pred_real, True)

        # Intermediate-layer outputs for the real pair.
        real_inters = self.netD.get_intermediate_outputs()

        # Perceptual adversarial loss: weighted sum of per-layer distances.
        # NOTE(review): if netD records detached outputs (as
        # NLayerDiscriminator.add_intermediate_output does), loss_PAN carries
        # no gradient to either network. Confirm this is intended.
        self.loss_PAN = 0
        for (fake_i, real_i, lam) in zip(fake_inters, real_inters, self.pan_lambdas):
            self.loss_PAN += self.criterionPAN(fake_i, real_i) * lam

        # Margin term for D: max(0, m - loss_PAN).
        # NOTE(review): ``.data[0]`` assumes the loss is a 1-element tensor,
        # as in the PyTorch version this code targets.
        # ``float`` replaces the ``np.float`` alias, which newer NumPy
        # releases no longer provide.
        if self.loss_PAN.data[0] > self.pan_mergin_m:
            margin_term = Variable(self.Tensor(np.array([0], dtype=float)), requires_grad=False)
        else:
            margin_term = Variable(self.Tensor(np.array([self.pan_mergin_m], dtype=float)), requires_grad=False) - self.loss_PAN

        # Combined loss
        self.loss_D = (self.loss_D_fake + self.loss_D_real) * 0.5 + margin_term

        self.loss_D.backward()

    def backward_G(self, retain=False):
        """Compute the generator loss (GAN term + ``loss_PAN``) and backprop it.

        Args:
            retain: forwarded as ``retain_graph`` to ``backward``. Defaults to
                ``False`` so the method can be called without arguments, as
                ``optimize_parameters`` does.
        """
        # First, G(A) should fake the discriminator
        fake_AB = torch.cat((self.real_A, self.fake_B), 1)
        pred_fake = self.netD.forward(fake_AB)
        self.loss_G_GAN = self.criterionGAN(pred_fake, True)

        # Combined loss; loss_PAN is the value computed in backward_D.
        self.loss_G = self.loss_G_GAN + self.loss_PAN

        self.loss_G.backward(retain_graph=retain)

    def optimize_parameters(self):
        """Run one training step: forward pass, then update D, then update G."""
        self.forward()

        # update D (this also refreshes self.loss_PAN used by backward_G)
        self.optimizer_D.zero_grad()
        self.backward_D()
        self.optimizer_D.step()

        # update G
        self.optimizer_G.zero_grad()
        self.backward_G()
        self.optimizer_G.step()

（セルの実行結果）`NameError: name 'BaseModel' is not defined` — BaseModelはこのnotebook内では定義していないため，このセル単体で実行するとこのエラーになります．