# 第5回講義 演習

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.autograd as autograd
import torch.optim as optim
import torch.nn.functional as F
from sklearn.utils import shuffle

from tqdm import tqdm

# Seed the NumPy and PyTorch random number generators with a fixed value.
np.random.seed(34)
torch.manual_seed(34)

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

## 課題 3. WGAN-GP character言語モデル

In [None]:
N_C_ITERS = 10 # number of critic (D) updates per generator update
N_ITERS = 10000 # number of outer training iterations
BATCH_SIZE = 64
VALID_INTERVAL = 100 # print losses and generated samples every this many iterations

X_LEN = 32 # length of the generated sentences
Z_DIM = 128 # dimensionality of the noise z fed to G
H_DIM = 512 # hidden size of G and D

K_SIZE = 5 # kernel width of the convolutions in G and D

LMD = 10 # coefficient of the gradient penalty

TRAIN_X_PATH = 'data/billion_word/train_x.txt'

### 1. データの読み込み

Billion word コーパス (http://www.statmt.org/lm-benchmark/) を使用します.

今回は単語レベルではなく文字レベルでの学習となるため, 新しくVocabを作ります.

In [None]:
class Vocab:
    """Two-way lookup table between characters and integer ids."""

    def __init__(self, chars):
        """Give every character in ``chars`` its enumeration index as id.

        :param chars: iterable of characters.
        """
        self.char2id = {}
        for idx, ch in enumerate(chars):
            self.char2id[ch] = idx
        self.id2char = dict((idx, ch) for ch, idx in self.char2id.items())

    def encode(self, sentence):
        """Return the ids of the characters of ``sentence`` as a list.

        :param sentence: iterable of characters (typically a str).
        :raises KeyError: if a character is not part of the vocabulary.
        """
        return list(map(self.char2id.__getitem__, sentence))

In [None]:
def load_data(path, x_len=32, n_data=10e+10):
    """Read sentences from a UTF-8 text file and cut them to a fixed length.

    Each line is stripped of surrounding whitespace. Lines shorter than
    ``x_len`` characters are dropped; longer ones are truncated to ``x_len``.

    :param path: path of the text file, one sentence per line.
    :param x_len: int. Required length of every returned sentence.
    :param n_data: maximum number of lines to read from the file
        (counted before the length filter is applied).
    :return: list of str, each exactly ``x_len`` characters long.
    """
    data = []
    # The context manager closes the file even if reading raises.
    with open(path, encoding='utf-8') as f:
        for i, line in enumerate(f):
            if i >= n_data:
                break

            sentence = line.strip()
            if len(sentence) < x_len:
                continue

            data.append(sentence[:x_len])
    return data

In [None]:
class Dataloader:
    """Endless iterator that yields one-hot encoded mini-batches.

    The data is served in order; once the read position has passed the end
    of the data, the data is shuffled and iteration restarts from the
    beginning, so ``next()`` never raises ``StopIteration``.
    """

    def __init__(self, data_x, device, batch_size, char_size):
        """
        :param data_x: sequence of equally long lists of character ids.
        :param device: torch device the batches are placed on.
        :param batch_size: int. Maximum number of sentences per batch.
        :param char_size: int. Vocabulary size (width of the one-hot axis).
        """
        self.data_x = data_x
        self.pointer = 0
        self.batch_size = batch_size
        self.device = device
        self.char_size = char_size

    def reset(self):
        """Shuffle the data and rewind to the first batch."""
        self.data_x = shuffle(self.data_x)
        self.pointer = 0

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next batch as a (batch, x_len, char_size) float tensor.

        The last batch before a reshuffle may hold fewer than ``batch_size``
        sentences.
        """
        if self.pointer >= len(self.data_x):
            self.reset()

        start = self.pointer
        end = start + self.batch_size
        self.pointer = end

        batch_x = self.convert_to_tensor(self.data_x[start:end], torch.long)
        return self.to_one_hot(batch_x)

    def convert_to_tensor(self, batch, dtype):
        """Turn a nested list into a tensor of ``dtype`` on ``self.device``."""
        return torch.tensor(batch, dtype=dtype).to(self.device)

    def to_one_hot(self, batch):
        """One-hot encode an id tensor.

        :param batch: long tensor (batch, x_len) of character ids.
        :return: float tensor (batch, x_len, char_size).
        """
        # Use the loader's own device rather than a module-level global, and
        # scatter the whole batch at once instead of looping per sentence.
        one_hot = torch.zeros(
            batch.size(0), batch.size(1), self.char_size, device=self.device
        )
        return one_hot.scatter_(2, batch.unsqueeze(2), 1.0)

In [None]:
# Character inventory of the model. A raw string is used so that the
# backslash is taken literally instead of starting an (invalid) escape.
CHARS = set(r'abcdefghijklmnopqrstuvwxyz0123456789-,;.!?:’/\|_@#$%ˆ&* ̃‘+-=<>()[]{}')
CHAR_SIZE = len(CHARS)

# Set iteration order is not stable across interpreter sessions, so sort the
# characters to give every character the same id on every run.
vocab = Vocab(sorted(CHARS))

# Load the fixed-length training sentences and encode them as id lists.
sens_train_x = load_data(TRAIN_X_PATH, X_LEN)
train_x = [vocab.encode(sen) for sen in sens_train_x]

In [None]:
# Batch iterator over the encoded training sentences; yields one-hot tensors.
dataloader_train = Dataloader(train_x, device, BATCH_SIZE, CHAR_SIZE)

### 2. モデルの定義

Generator, DiscriminatorともにCNN + Residual Blockで実装します.

In [None]:
def sample_z_prior(batch_size):
    """Draw latent vectors z from the standard-normal prior p(z).

    :param batch_size: int. Number of latent vectors to draw.
    :return: tensor (batch_size, Z_DIM) placed on ``device``.
    """
    noise_shape = (batch_size, Z_DIM)
    return torch.randn(*noise_shape).to(device)

In [None]:
class ResBlock(nn.Module):
    """Residual block: two ReLU -> Conv1d stages added back to the input."""

    def __init__(self, h_dim, k_size):
        """
        :param h_dim: int. Number of input and output channels.
        :param k_size: int. Kernel width of both convolutions.
        """
        super().__init__()
        # With an odd kernel this padding leaves the sequence length unchanged.
        pad = int((k_size - 1) / 2)
        self.Conv1 = nn.Conv1d(h_dim, h_dim, k_size, padding=pad)
        self.Conv2 = nn.Conv1d(h_dim, h_dim, k_size, padding=pad)

    def forward(self, x):
        """
        :param x: tensor (batch size, h_dim, sequence length)
        :return: tensor of the same shape as ``x``.
        """
        branch = self.Conv1(F.relu(x))
        branch = self.Conv2(F.relu(branch))
        # The residual branch is scaled by 0.3 before being added to the input.
        return x + 0.3 * branch

In [None]:
class Generator(nn.Module):
    """Maps a noise vector to per-position character distributions.

    A linear layer expands z to a (h_dim, x_len) feature map, five residual
    blocks refine it, and a width-1 convolution followed by a softmax turns
    the features into a distribution over characters at every position.
    """

    def __init__(self, z_dim, h_dim, x_len, char_size, k_size):
        """
        :param z_dim: int. Dimensionality of the input noise.
        :param h_dim: int. Number of hidden channels.
        :param x_len: int. Length of the generated sequence.
        :param char_size: int. Vocabulary size.
        :param k_size: int. Kernel width of the residual blocks.
        """
        super().__init__()
        self.Linear_1 = nn.Linear(z_dim, x_len * h_dim)
        self.ResBlock_1 = ResBlock(h_dim, k_size)
        self.ResBlock_2 = ResBlock(h_dim, k_size)
        self.ResBlock_3 = ResBlock(h_dim, k_size)
        self.ResBlock_4 = ResBlock(h_dim, k_size)
        self.ResBlock_5 = ResBlock(h_dim, k_size)
        self.Conv = nn.Conv1d(h_dim, char_size, 1)

        self.h_dim = h_dim
        self.x_len = x_len

    def forward(self, z):
        """
        :param z: tensor (batch size, z_dim)
        :return: tensor (batch size, sequence length, vocabulary size)
        """
        hidden = self.Linear_1(z)
        hidden = hidden.reshape(z.size(0), self.h_dim, self.x_len)  # (batch size, h_dim, sequence length)

        res_blocks = (
            self.ResBlock_1,
            self.ResBlock_2,
            self.ResBlock_3,
            self.ResBlock_4,
            self.ResBlock_5,
        )
        for block in res_blocks:
            hidden = block(hidden)

        logits = self.Conv(hidden).permute(0, 2, 1)
        # Normalise over the vocabulary axis.
        return F.softmax(logits, dim=-1)

In [None]:
class Discriminator(nn.Module):
    """Critic that scores a (soft) one-hot character sequence with one scalar.

    A width-1 convolution embeds the characters, five residual blocks
    process the sequence, and a linear layer maps the flattened features to
    a single unbounded score per sample.
    """

    def __init__(self, h_dim, x_len, char_size, k_size):
        """
        :param h_dim: int. Number of hidden channels.
        :param x_len: int. Length of the input sequence.
        :param char_size: int. Vocabulary size.
        :param k_size: int. Kernel width of the residual blocks.
        """
        super(Discriminator, self).__init__()
        self.Conv       = nn.Conv1d(char_size, h_dim, 1)
        self.ResBlock_1 = ResBlock(h_dim, k_size)
        self.ResBlock_2 = ResBlock(h_dim, k_size)
        self.ResBlock_3 = ResBlock(h_dim, k_size)
        self.ResBlock_4 = ResBlock(h_dim, k_size)
        self.ResBlock_5 = ResBlock(h_dim, k_size)
        self.Linear     = nn.Linear(x_len * h_dim, 1)

    def forward(self, x):
        """
        :param x: tensor (batch size, sequence length, vocabulary size)
        :return: tensor (batch size,)
        """
        h = x.permute(0, 2, 1) # (batch size, vocabulary size, sequence length)
        h = self.Conv(h)
        h = self.ResBlock_1(h)
        h = self.ResBlock_2(h)
        h = self.ResBlock_3(h)
        h = self.ResBlock_4(h)
        h = self.ResBlock_5(h)
        h = h.reshape(h.size(0), h.size(1) * h.size(2))
        # Squeeze only the trailing unit axis: a bare squeeze() would also
        # drop the batch axis when the batch holds a single sample.
        return self.Linear(h).squeeze(1) # (batch size,)

In [None]:
# Build the generator and the critic and move them to the selected device.
G = Generator(Z_DIM, H_DIM, X_LEN, CHAR_SIZE, K_SIZE).to(device)
D = Discriminator(H_DIM, X_LEN, CHAR_SIZE, K_SIZE).to(device)

# One Adam optimizer per network, both with lr=1e-4 and betas=(0.5, 0.9).
optimizer_G = optim.Adam(G.parameters(), lr=1e-4, betas=(0.5, 0.9))
optimizer_D = optim.Adam(D.parameters(), lr=1e-4, betas=(0.5, 0.9))

### 3. 学習

WGAN-GPではDiscriminator (Critic) の勾配を安定させるため, 勾配に対するペナルティ項を追加します.

ペナルティ項の勾配の計算には真のデータと生成されたデータの内分点を用います.

Dの誤差関数
$$
    \mathcal{L}(\theta_D) = D(\tilde{x}) - D(x) + \lambda\left(||\nabla_{\hat{x}}D(\hat{x})||_2 - 1\right)^2
$$

Gの誤差関数
$$
    \mathcal{L}(\theta_G) = - D\left(G(z)\right)
$$

参考: [DL輪読会] Improved Training of Wasserstein GANs (https://www.slideshare.net/DeepLearningJP2016/dlimproved-training-of-wasserstein-gans-81010174)

元論文: I. Gulrajani et al. "Improved Training of Wasserstein GANs". NIPS. 2017

In [None]:
def compute_loss_D(x_real, train=False):
    """Compute the WGAN-GP critic loss and optionally take one critic step.

    loss_D = mean(D(x_fake)) - mean(D(x_real))
             + LMD * mean((||grad_{x_soft} D(x_soft)||_2 - 1)^2)

    where x_soft is a random per-sample interpolation between the real and
    the generated batch.

    :param x_real: tensor (batch size, sequence length, vocabulary size)
    :param train: bool. When True, D's parameters are updated with
        ``optimizer_D``.
    :return: scalar tensor holding the critic loss.
    """
    batch_size = x_real.size(0)

    # Generate fake data. The generator is not updated here, so detach its
    # output to avoid back-propagating through G.
    z = sample_z_prior(batch_size)
    x_fake = G(z).detach()

    # Interpolate between real and fake data with one coefficient per sample.
    # The interpolate must track gradients so that dD/dx_soft can be taken.
    eps = torch.rand(batch_size, 1, 1).to(device)
    x_soft = (eps * x_real + (1 - eps) * x_fake).requires_grad_(True)

    # Score the real, fake and interpolated data.
    y_real = D(x_real)
    y_fake = D(x_fake)
    y_soft = D(x_soft)

    # Gradient of the critic output w.r.t. the interpolated input.
    # create_graph=True keeps the penalty itself differentiable.
    y_soft_grad = autograd.grad(
        outputs=y_soft,
        inputs=x_soft,
        grad_outputs=torch.ones_like(y_soft),
        create_graph=True,
        retain_graph=True
    )[0] # (batch_size, sen_len, char_size)

    # The penalty uses the per-sample L2 norm over ALL input dimensions, so
    # flatten the sequence and vocabulary axes before taking the norm.
    grad_norm = y_soft_grad.reshape(batch_size, -1).norm(2, dim=1)
    grad_pen = ((grad_norm - 1) ** 2).mean()
    loss_D = y_fake.mean() - y_real.mean() + LMD * grad_pen

    if train:
        # Update the critic's parameters.
        D.zero_grad()
        loss_D.backward()
        optimizer_D.step()

    return loss_D

In [None]:
def compute_loss_G(train=False):
    """Compute the generator loss -mean(D(G(z))) and optionally update G.

    :param train: bool. When True, G's parameters are updated with
        ``optimizer_G``.
    :return: scalar tensor holding the generator loss.
    """
    # Generate a batch of fake data and let the critic score it.
    noise = sample_z_prior(BATCH_SIZE)
    fake_scores = D.forward(G.forward(noise))

    # The generator tries to maximise the critic's score.
    loss_G = - fake_scores.mean()

    if not train:
        return loss_G

    # Update the generator's parameters.
    G.zero_grad()
    loss_G.backward()
    optimizer_G.step()
    return loss_G

<img src="figure/gulrajani_arxiv2017.png" width="600mm">

In [None]:
# Main training loop: N_C_ITERS critic updates followed by one generator
# update per iteration, with periodic logging of losses and samples.
for i in range(N_ITERS):
    # Train the discriminator (critic).
    losses_D = []
    for _ in range(N_C_ITERS):
        # Fetch a batch of real data.
        x_real = next(dataloader_train)

        # Compute the loss and update the parameters.
        loss_D = compute_loss_D(x_real, train=True)
        loss_D_value = loss_D.item()  # read the value from the tensor once
        losses_D.append(loss_D_value)
        print(loss_D_value)

    # Train the generator: compute the loss and update the parameters.
    loss_G = compute_loss_G(train=True)
    print(loss_G.item())

    if (i + 1) % VALID_INTERVAL == 0:
        print('ITERS: {}, D\'s Loss: {:.3f}, G\'s Loss: {:.3f}'.format(
            i + 1, np.mean(losses_D), loss_G.item()
        ))
        # Generate a few samples. No gradients are needed for inspection,
        # so skip building the autograd graph.
        n_samples = 3
        z = sample_z_prior(n_samples)
        with torch.no_grad():
            x_fake = G.forward(z).argmax(2).cpu().numpy()
        for n, x_fake_n in enumerate(x_fake):
            # char_id (not i) so the outer loop index is not shadowed.
            print(n, ':', ''.join(vocab.id2char[char_id] for char_id in x_fake_n))
        print()

### 4. 生成

学習させたジェネレータで文を生成してみます.

In [None]:
# Sample sentences from the trained generator: take the most likely
# character at every position and map the ids back to characters.
n_samples = 10
z = sample_z_prior(n_samples)
# Inference only, so do not record operations for autograd.
with torch.no_grad():
    x_fake = G.forward(z).argmax(2).cpu().numpy()
for n, x_fake_n in enumerate(x_fake):
    print(n, ':', ''.join(vocab.id2char[char_id] for char_id in x_fake_n))

参考文献

- 原論文: http://papers.nips.cc/paper/7159-improved-training-of-wasserstein-gans
- TensorFlow (公式) : https://github.com/igul222/improved_wgan_training