In [None]:
!gdown --id 1zl2nsDqjSjHCwVnl2d_H3BPd0_HikW3P --output hw2_data.zip
!unzip -q hw2_data.zip
!rm hw2_data.zip

Downloading...
From: https://drive.google.com/uc?id=1zl2nsDqjSjHCwVnl2d_H3BPd0_HikW3P
To: /content/hw2_data.zip
100% 642M/642M [00:03<00:00, 164MB/s]


In [1]:
import glob
import numpy as np
import os
import random
import torch
import torch.nn as nn
import torchvision
from PIL import Image
from torch.utils import data
from torch.utils.data import DataLoader
from torchvision.transforms import transforms
from tqdm.auto import tqdm

In [None]:
!bash hw2_1.sh ./test_repo

In [None]:
def fix_random_seeds(seed):
    """Seed every random number generator used here and pin the cuDNN flags.

    Args:
        seed: value passed to Python's ``random``, NumPy and PyTorch (CPU,
            current CUDA device and all CUDA devices); its string form is
            also stored in the ``PYTHONHASHSEED`` environment variable.
    """
    # Python-level sources of randomness.
    os.environ['PYTHONHASHSEED'] = str(seed)
    random.seed(seed)
    np.random.seed(seed)

    # PyTorch generators: CPU first, then the CUDA ones.
    for seed_fn in (torch.manual_seed, torch.cuda.manual_seed, torch.cuda.manual_seed_all):
        seed_fn(seed)

    # cuDNN switches: no autotuning, deterministic kernels, and cuDNN itself
    # turned off.
    cudnn = torch.backends.cudnn
    cudnn.benchmark = False
    cudnn.deterministic = True
    cudnn.enabled = False

In [None]:
class facedataset(data.Dataset):
    """Dataset that loads images from a list of file paths.

    Args:
        inputs_path: list of image file paths; one sample per path.
        transforms: callable applied to each loaded PIL image; whatever it
            returns is the sample.
    """

    def __init__(self, inputs_path: list, transforms):
        self.inputs_path = inputs_path
        self.transforms = transforms

    def __len__(self):
        """Return the number of image paths."""
        return len(self.inputs_path)

    def __getitem__(self, index: int):
        """Load the image at ``index`` as RGB and return the transformed result."""
        image_path = self.inputs_path[index]
        # Open inside a context manager so the file handle is released as soon
        # as the pixels are decoded, and force 3-channel RGB: grayscale or
        # RGBA files would otherwise break 3-channel normalisation downstream.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert('RGB')
        return self.transforms(image)

In [None]:
def get_dataset(batch_size, n_workers, image_size):
    """Build the shuffled training ``DataLoader`` over ``./hw2_data/face/train``.

    Args:
        batch_size: number of images per batch.
        n_workers: number of DataLoader worker processes.
        image_size: images are resized to ``image_size x image_size``.

    Returns:
        A ``DataLoader`` yielding batches of transformed images.
    """
    # Resize, convert to tensor, then apply (x - 0.5) / 0.5 per channel.
    preprocess = transforms.Compose([
        transforms.Resize((image_size, image_size)),
        transforms.ToTensor(),
        transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
    ])

    # Sorted so the file order does not depend on directory listing order.
    image_paths = sorted(glob.glob('./hw2_data/face/train/*'))

    return DataLoader(
        facedataset(image_paths, preprocess),
        batch_size=batch_size,
        shuffle=True,
        num_workers=n_workers,
        pin_memory=True,
    )

In [2]:
# model
class EqualConv2d(nn.Module):
    """2-D convolution with standard-normal weights and a fixed runtime scale.

    The input is multiplied by ``sqrt(2 / (in_channels * kernel_size ** 2))``
    before the convolution. The bias lives on this wrapper (not on the inner
    ``conv``) and is added after the convolution, so it is not scaled.
    """

    def __init__(self, in_channels, out_channels, kernel_size=3, stride=1, padding=1):
        super().__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding)

        fan_in = in_channels * kernel_size ** 2
        self.scale = (2 / fan_in) ** 0.5

        # Re-initialise: N(0, 1) weights, zero bias.
        with torch.no_grad():
            self.conv.weight.normal_()
            self.conv.bias.zero_()

        # Move the bias parameter from the inner conv onto this module.
        detached_bias = self.conv.bias
        self.conv.bias = None
        self.bias = detached_bias

    def forward(self, x):
        """Scale ``x``, convolve it, then add the per-channel bias."""
        scaled = x * self.scale
        bias = self.bias.view(1, -1, 1, 1)
        return self.conv(scaled) + bias

class PixelNorm(nn.Module):
    """Divide each position by the root-mean-square of its values along dim 1."""

    def __init__(self):
        super().__init__()
        self.eps = 1e-8  # added under the square root to avoid division by zero

    def forward(self, x):
        """Return ``x`` normalised by ``sqrt(mean(x^2 over dim 1) + eps)``."""
        mean_square = x.pow(2).mean(dim=1, keepdim=True)
        return x / (mean_square + self.eps).sqrt()

class Conv_Block(nn.Module):
    """Two ``EqualConv2d`` + ``LeakyReLU(0.2)`` stages, each optionally followed
    by ``PixelNorm``.

    Args:
        in_channels: channels entering the first convolution.
        out_channels: channels produced by both convolutions.
        kernel_size, stride, padding: forwarded to both convolutions.
        usepixelnorm: when true, apply ``PixelNorm`` after each stage.
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride, padding, usepixelnorm=True):
        super().__init__()

        self.usepixelnorm = usepixelnorm
        self.pixelnorm = PixelNorm()

        def conv_act(stage_in_channels):
            # One convolution followed by its activation.
            return nn.Sequential(
                EqualConv2d(stage_in_channels, out_channels, kernel_size, stride, padding),
                nn.LeakyReLU(0.2),
            )

        self.conv1 = conv_act(in_channels)
        self.conv2 = conv_act(out_channels)

    def forward(self, x):
        """Run both stages in order, normalising after each one if enabled."""
        for stage in (self.conv1, self.conv2):
            x = stage(x)
            if self.usepixelnorm:
                x = self.pixelnorm(x)
        return x

class Generator(nn.Module):
    """Progressively grown generator.

    ``depth`` selects how many upsampling stages are used: depth 0 returns the
    RGB projection of the initial block, and each further depth adds one
    upsample + ``Conv_Block``. Channel widths come from the notebook-level
    ``factors`` list, which must be defined before construction.

    Args:
        z_dim: number of latent channels; also the width of the initial block.
    """

    def __init__(self, z_dim):
        super().__init__()

        # Latent -> 4x4 feature map (transposed conv with a 4x4 kernel).
        self.initial_conv_block = nn.Sequential(
            PixelNorm(),
            nn.ConvTranspose2d(z_dim, z_dim, 4, 1),
            nn.LeakyReLU(0.2),
            PixelNorm(),
            EqualConv2d(z_dim, z_dim, kernel_size=3, stride=1, padding=1),
            nn.LeakyReLU(0.2),
            PixelNorm(),
        )
        self.initial_to_rgb = EqualConv2d(z_dim, 3, kernel_size=1, stride=1, padding=0)

        self.upsample = nn.Upsample(scale_factor=2, mode='nearest')

        # One (conv block, to-RGB) pair per consecutive pair of factors.
        self.conv_blocks = nn.ModuleList([])
        self.to_rgbs = nn.ModuleList([])
        for factor_in, factor_out in zip(factors[:-1], factors[1:]):
            block_in = int(z_dim * factor_in)
            block_out = int(z_dim * factor_out)
            self.conv_blocks.append(Conv_Block(block_in, block_out, kernel_size=3, stride=1, padding=1))
            self.to_rgbs.append(EqualConv2d(block_out, 3, kernel_size=1, stride=1, padding=0))

        self.tanh = nn.Tanh()

    def fade_in(self, x, alpha, depth):
        """Blend the previous stage's RGB output with the new stage's output.

        ``x`` is upsampled once. The "old" image is the previous stage's
        to-RGB applied to the upsampled features; the "new" image goes through
        the conv block and to-RGB of ``depth``. The result is
        ``tanh((1 - alpha) * old + alpha * new)``.
        """
        upsampled = self.upsample(x)

        previous_to_rgb = self.initial_to_rgb if depth == 1 else self.to_rgbs[depth - 2]
        rgb_old = previous_to_rgb(upsampled)

        rgb_new = self.to_rgbs[depth - 1](self.conv_blocks[depth - 1](upsampled))

        return self.tanh((1 - alpha) * rgb_old + alpha * rgb_new)

    def forward(self, x, depth, alpha=1):
        """Generate an image at the resolution selected by ``depth``.

        For ``depth == 0`` the output is the plain RGB projection (no tanh,
        ``alpha`` unused); otherwise the last stage is blended by ``fade_in``.
        """
        x = self.initial_conv_block(x)

        if depth == 0:
            return self.initial_to_rgb(x)

        # All stages below the current one run at full strength.
        for i in range(depth - 1):
            x = self.conv_blocks[i](self.upsample(x))
        return self.fade_in(x, alpha, depth)

class Discriminator(nn.Module):
    """Progressively grown critic, mirroring the generator.

    ``from_rgbs`` / ``conv_blocks`` are built by walking the notebook-level
    ``factors`` list backwards, so index 0 serves the largest ``depth`` and the
    last index serves ``depth == 1``. ``factors`` must be defined before
    construction; ``forward`` itself does not read any notebook globals.

    Args:
        z_dim: base channel width (width of the final block).
    """

    def __init__(self, z_dim):
        super().__init__()
        self.from_rgbs = nn.ModuleList([])
        self.conv_blocks = nn.ModuleList([])
        for i in range(len(factors) - 1, 0, -1):
            in_channels = int(z_dim * factors[i])
            out_channels = int(z_dim * factors[i - 1])
            self.from_rgbs.append(EqualConv2d(3, in_channels, kernel_size=1, stride=1, padding=0))
            self.conv_blocks.append(Conv_Block(in_channels, out_channels, kernel_size=3, stride=1, padding=1, usepixelnorm=False))

        self.downsample = nn.AvgPool2d(kernel_size=2, stride=2)

        self.final_from_rgb = EqualConv2d(3, z_dim, kernel_size=1, stride=1, padding=0)

        # The "+ 1" input channel is the batch-std statistic appended in forward().
        self.final_conv_block = nn.Sequential(
            EqualConv2d(z_dim + 1, z_dim, kernel_size=3, stride=1, padding=1),
            nn.LeakyReLU(0.2),
            EqualConv2d(z_dim, z_dim, kernel_size=4, stride=1, padding=0),
            nn.LeakyReLU(0.2),
            EqualConv2d(z_dim, 1, kernel_size=1, stride=1, padding=0)
        )

    def fade_in(self, x, depth, alpha=1):
        """Blend the downsampled-image path with the new conv-block path.

        The "old" path downsamples the image and applies the from-RGB of the
        previous depth; the "new" path applies this depth's from-RGB and conv
        block, then downsamples. Returns ``(1 - alpha) * old + alpha * new``.
        """
        x_down = self.downsample(x)
        if depth == 1:
            x_scaled = self.final_from_rgb(x_down)
        else:
            x_scaled = self.from_rgbs[-depth + 1](x_down)

        x_conv = self.from_rgbs[-depth](x)
        x_conv = self.conv_blocks[-depth](x_conv)
        x_conv = self.downsample(x_conv)
        return (1 - alpha) * x_scaled + alpha * x_conv

    def forward(self, x, depth, alpha=1):
        """Score a batch of images at the resolution selected by ``depth``.

        Returns the output of ``final_conv_block`` (one channel per sample).
        """
        if depth == 0:
            x = self.final_from_rgb(x)
        else:
            x = self.fade_in(x, depth, alpha)
            # Remaining lower-resolution blocks. Indexing off the module list
            # (rather than the global `factors`) keeps forward() correct even
            # if that global is reassigned after construction.
            n_blocks = len(self.conv_blocks)
            for i in range(n_blocks + 1 - depth, n_blocks):
                x = self.conv_blocks[i](x)
                x = self.downsample(x)

        # Batch standard deviation, averaged to one scalar and appended as an
        # extra constant channel. The sample std of a single element is NaN,
        # so a batch of one uses 0 instead of poisoning the loss.
        if x.shape[0] > 1:
            x_std = torch.std(x, dim=0, keepdim=True).mean()
        else:
            x_std = x.new_zeros(())
        # Allocate next to `x` instead of relying on a global `device`.
        concat_std = torch.ones((x.shape[0], 1, x.shape[2], x.shape[3]), device=x.device, dtype=x.dtype) * x_std
        x = torch.cat((x, concat_std), dim=1)
        x = self.final_conv_block(x)
        return x

In [4]:
# Smoke test: build both networks (not moved to any device) and print their
# layer structure. `factors` is read as a global by the Generator and
# Discriminator constructors, so it must be assigned before they are built.
# NOTE(review): `z_dim`, `factors`, `G` and `D` are all reassigned later by the
# settings/training cell, so the values here only affect this printout.
z_dim = 128
factors = [1, 1, 1 / 2, 1 / 4, 1 / 8]
G = Generator(z_dim)
D = Discriminator(z_dim)
print(G)
print(D)

Generator(
  (initial_conv_block): Sequential(
    (0): PixelNorm()
    (1): ConvTranspose2d(128, 128, kernel_size=(4, 4), stride=(1, 1))
    (2): LeakyReLU(negative_slope=0.2)
    (3): PixelNorm()
    (4): EqualConv2d(
      (conv): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    )
    (5): LeakyReLU(negative_slope=0.2)
    (6): PixelNorm()
  )
  (initial_to_rgb): EqualConv2d(
    (conv): Conv2d(128, 3, kernel_size=(1, 1), stride=(1, 1), bias=False)
  )
  (upsample): Upsample(scale_factor=2.0, mode=nearest)
  (conv_blocks): ModuleList(
    (0): Conv_Block(
      (pixelnorm): PixelNorm()
      (conv1): Sequential(
        (0): EqualConv2d(
          (conv): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        )
        (1): LeakyReLU(negative_slope=0.2)
      )
      (conv2): Sequential(
        (0): EqualConv2d(
          (conv): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      

In [None]:
# gradient penalty
def gradient_penalty(device, D, fake_imgs, real_imgs, alpha, depth):
    """Gradient penalty on random interpolations between fake and real images.

    Args:
        device: device on which the interpolation coefficients are placed.
        D: critic, called as ``D(images, depth, alpha)``.
        fake_imgs: generated batch.
        real_imgs: real batch, same shape as ``fake_imgs``.
        alpha: fade-in coefficient forwarded to ``D``.
        depth: resolution stage forwarded to ``D``.

    Returns:
        Scalar tensor ``mean((||dD/dx||_2 - 1) ** 2)`` that is differentiable
        with respect to the parameters of ``D``.
    """
    batch_size = fake_imgs.shape[0]
    # One interpolation coefficient per sample, broadcast over C/H/W.
    epsilon = torch.rand((batch_size, 1, 1, 1)).to(device)
    interpolate_imgs = epsilon * fake_imgs + (1 - epsilon) * real_imgs
    # Detach first so this is always a leaf we are allowed to flag, even if a
    # caller passes inputs that are still attached to a graph.
    interpolate_imgs = interpolate_imgs.detach().requires_grad_(True)
    interpolate_labels = D(interpolate_imgs, depth, alpha)

    # create_graph=True is essential: without it the returned gradient is a
    # constant, so the penalty would contribute nothing when the critic loss
    # is back-propagated.
    gradient = torch.autograd.grad(
        outputs=interpolate_labels,
        inputs=interpolate_imgs,
        grad_outputs=torch.ones_like(interpolate_labels),
        create_graph=True,
        retain_graph=True,
    )[0]
    gradient = gradient.reshape(batch_size, -1)
    gradient_norm = gradient.norm(2, dim=1)
    return torch.mean((gradient_norm - 1) ** 2)

In [None]:
# train
def train(device, train_loader, start_epoch, n_epochs, depth, n_critic, z_dim, G, optim_G, D, optim_D, lambda_gp, eps_drift, z_samples):
    """Train the generator/critic pair at one resolution stage.

    After every epoch this appends progress to ``./record.txt``, saves a grid
    of samples under ``./output/<res>x<res>/`` and overwrites the checkpoint
    ``./model_1.ckpt``.

    Args:
        device: device for all tensors created here.
        train_loader: iterable of real image batches (must support ``len``).
        start_epoch: first epoch index to run (non-zero when resuming).
        n_epochs: total number of epochs for this stage.
        depth: resolution stage passed to ``G`` and ``D``.
        n_critic: the generator is updated on batches whose index is a
            multiple of this value.
        z_dim: latent dimensionality.
        G, optim_G: generator and its optimizer.
        D, optim_D: critic and its optimizer.
        lambda_gp: weight of the gradient penalty.
        eps_drift: weight of the ``mean(D(real) ** 2)`` drift term.
        z_samples: fixed latents used for the per-epoch sample grid.

    Raises:
        ValueError: if there are epochs to run but ``train_loader`` is empty.
    """
    iters_per_epoch = len(train_loader)
    if start_epoch < n_epochs and iters_per_epoch == 0:
        raise ValueError('train_loader is empty; nothing to train on')

    total_iters = iters_per_epoch * n_epochs
    alpha_step = 1 / total_iters if total_iters > 0 else 0.0
    # alpha ramps linearly from 0 to 1 over the whole stage. When resuming
    # part-way through, continue from where the ramp had got to rather than
    # restarting at 0 (which would leave alpha short of 1 at the end).
    alpha = min(1.0, start_epoch * iters_per_epoch * alpha_step)

    resolution = 2 ** (depth + 2)
    sample_dir = f'./output/{resolution}x{resolution}'

    def _log(message):
        # Append one progress line to the run log.
        with open('./record.txt', 'a') as f:
            f.write(message)

    for epoch in range(start_epoch, n_epochs):

        G.train()
        D.train()

        for batch_idx, imgs in enumerate(train_loader):

            # Clamp so accumulated rounding can never push alpha above 1.
            alpha = min(1.0, alpha + alpha_step)

            real_imgs = imgs.to(device)
            cur_batch_size = real_imgs.shape[0]

            # ---- Train D ----
            z = torch.randn((cur_batch_size, z_dim, 1, 1)).to(device)
            # The critic step never back-propagates into G, so skip building
            # G's graph altogether.
            with torch.no_grad():
                fake_imgs = G(z, depth, alpha)

            fake_labels = D(fake_imgs, depth, alpha)
            real_labels = D(real_imgs, depth, alpha)
            gp = gradient_penalty(device, D, fake_imgs, real_imgs, alpha, depth)
            loss_D = torch.mean(fake_labels) - torch.mean(real_labels) + lambda_gp * gp + eps_drift * torch.mean(real_labels ** 2)

            D.zero_grad()
            loss_D.backward()
            optim_D.step()

            # ---- Train G ----
            if batch_idx % n_critic == 0:
                z = torch.randn((cur_batch_size, z_dim, 1, 1)).to(device)

                fake_imgs = G(z, depth, alpha)
                fake_labels = D(fake_imgs, depth, alpha)

                loss_G = -torch.mean(fake_labels)

                G.zero_grad()
                loss_G.backward()
                optim_G.step()

        # Losses of the last batch of the epoch.
        _log(f'[ Train | {epoch + 1:03d}/{n_epochs:03d} ] loss_D: {loss_D.item():.5f}, loss_G: {loss_G.item():.5f}\n')

        # evaluate: map generator output from [-1, 1] to [0, 1] for saving
        G.eval()
        with torch.no_grad():
            f_imgs_sample = (G(z_samples.to(device), depth) + 1) / 2.0
        os.makedirs(sample_dir, exist_ok=True)
        filename = os.path.join(sample_dir, f'Epoch_{epoch + 1:03d}.jpg')
        torchvision.utils.save_image(f_imgs_sample, filename, nrow=8)
        _log(f'Save some samples to {filename}.\n')

        # save checkpoint
        torch.save({'last_epoch': epoch + 1,
                    'model_G': G.state_dict(),
                    'optim_G': optim_G.state_dict(),
                    'model_D': D.state_dict(),
                    'optim_D': optim_D.state_dict(),
                    'loss_G': loss_G.item(),
                    'loss_D': loss_D.item(),
                    'depth': depth,
                    }, './model_1.ckpt')

        _log('Saving model\n')

In [None]:
def test(device, G, z_samples, depth, output_repo='./output_repo'):
    """Load the generator from ``./model_1.ckpt`` and write one JPEG per latent.

    Args:
        device: device the latents are moved to before generation.
        G: generator whose weights are replaced by the checkpoint's.
        z_samples: latents; image ``i`` is saved as ``<output_repo>/<i>.jpg``.
        depth: resolution stage to generate at.
        output_repo: destination directory (created if missing).
    """
    # load checkpoint
    ckpt = torch.load('./model_1.ckpt', map_location='cpu')
    # train() stores the generator under 'model_G'; fall back to 'model' for
    # checkpoints that use the other key.
    state_dict = ckpt['model_G'] if 'model_G' in ckpt else ckpt['model']
    G.load_state_dict(state_dict)
    G.eval()

    os.makedirs(output_repo, exist_ok=True)

    # generate images, mapped from [-1, 1] to [0, 1] for saving
    with torch.no_grad():
        generated_imgs = (G(z_samples.to(device), depth) + 1) / 2.0
    for i, img in enumerate(generated_imgs):
        filename = os.path.join(output_repo, f'{i}.jpg')
        torchvision.utils.save_image(img, filename)
    with open('./record.txt', 'a') as f:
        f.write(f'Save {generated_imgs.shape[0]} generated images.\n')

In [None]:
seed = 0
fix_random_seeds(seed)

# settings
device = "cuda" if torch.cuda.is_available() else "cpu"
n_workers = 0
# Channel multipliers read (as a global) by the Generator/Discriminator
# constructors; the lists below have one entry per resolution stage.
factors = [1, 1 / 2, 1 / 4, 1 / 8, 1 / 16]
res = [4, 8, 16, 32, 64]
batch_size = [32, 32, 32, 16, 16]
n_epochs = [80, 60, 40, 30, 30]
n_critic = 1
z_dim = 256
# Fixed latents: the first 32 give the per-epoch preview grid, all 1000 are
# used for the final generated image set.
z_samples = torch.randn((1000, z_dim, 1, 1))
lambda_gp = 10
eps_drift = 0.001

load_model = os.path.exists('./model_1.ckpt')

G = Generator(z_dim).to(device)
D = Discriminator(z_dim).to(device)
optim_G = torch.optim.Adam(G.parameters(), lr=0.001, betas=(0, 0.99))
optim_D = torch.optim.Adam(D.parameters(), lr=0.001, betas=(0, 0.99))

start_epoch = 0
start_depth = 0

if load_model:
    # map_location lets a checkpoint written on a GPU machine be resumed on a
    # CPU-only one (and vice versa).
    ckpt = torch.load('./model_1.ckpt', map_location=device)
    start_epoch = ckpt['last_epoch']
    G.load_state_dict(ckpt['model_G'])
    optim_G.load_state_dict(ckpt['optim_G'])
    D.load_state_dict(ckpt['model_D'])
    optim_D.load_state_dict(ckpt['optim_D'])
    start_depth = ckpt['depth']

# Start a fresh log for this run.
with open('./record.txt', 'w') as f:
    f.write('')

# training
for i in range(start_depth, len(res)):
    with open('record.txt', 'a') as f:
        f.write(f'training {2 ** (i + 2)}x{2 ** (i + 2)}\n')
    # Only the stage being resumed starts part-way through; later stages
    # start from epoch 0.
    if i != start_depth:
        start_epoch = 0
    image_size = res[i]
    train_loader = get_dataset(batch_size[i], n_workers, image_size)
    train(device, train_loader, start_epoch, n_epochs[i], i, n_critic, z_dim, G, optim_G, D, optim_D, lambda_gp, eps_drift, z_samples[:32])
print('finish training')

# testing: generate the final image set at the highest resolution
test(device, G, z_samples, len(res) - 1)

# Test FID

In [None]:
rm ./drive/MyDrive/DLCV/output_repo/*.png

In [None]:
!mkdir './output_repo'

In [None]:
!pip install pytorch-fid

Collecting pytorch-fid
  Downloading pytorch-fid-0.2.1.tar.gz (14 kB)
Building wheels for collected packages: pytorch-fid
  Building wheel for pytorch-fid (setup.py) ... [?25l[?25hdone
  Created wheel for pytorch-fid: filename=pytorch_fid-0.2.1-py3-none-any.whl size=14835 sha256=27c8c8a4fac652a22d67344c7febc5b97c125caed99270db1385dec8ad1e268b
  Stored in directory: /root/.cache/pip/wheels/24/ac/03/c5634775c8a64f702343ef5923278f8d3bb8c651debc4a6890
Successfully built pytorch-fid
Installing collected packages: pytorch-fid
Successfully installed pytorch-fid-0.2.1


In [None]:
# Sanity check: list whatever is currently under the Drive output folder,
# in sorted order, and print how many entries were found.
import glob
files = glob.glob('./drive/MyDrive/DLCV/output_repo/*')
files.sort()
print(files)
print(len(files))

[]
0


In [None]:
!bash hw2_p1.sh './output_repo'

--2021-11-20 10:53:44--  https://www.dropbox.com/s/p6zqifenm366r05/hw2_p1.ckpt?dl=0
Resolving www.dropbox.com (www.dropbox.com)... 162.125.64.18, 2620:100:601b:18::a27d:812
Connecting to www.dropbox.com (www.dropbox.com)|162.125.64.18|:443... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: /s/raw/p6zqifenm366r05/hw2_p1.ckpt [following]
--2021-11-20 10:53:44--  https://www.dropbox.com/s/raw/p6zqifenm366r05/hw2_p1.ckpt
Reusing existing connection to www.dropbox.com:443.
HTTP request sent, awaiting response... 302 Found
Location: https://uce5da45baf1f535b4185b0f0975.dl.dropboxusercontent.com/cd/0/inline/BaXX-SXlZ23IOoTpxo-R358Z2jiY-o5mOFHHTMYkYV7yOhmI8et0loI1-ek1zKUzoHdm35Um1fdgKEN_QMGFl32lAQYHSJ1vkRF-eUrvukeCp96Uty0Fg10cd3iy8543xqCNwlsylO9d6dQIaca6NLjc/file# [following]
--2021-11-20 10:53:45--  https://uce5da45baf1f535b4185b0f0975.dl.dropboxusercontent.com/cd/0/inline/BaXX-SXlZ23IOoTpxo-R358Z2jiY-o5mOFHHTMYkYV7yOhmI8et0loI1-ek1zKUzoHdm35Um1fdgKEN_QMGFl3

In [None]:
!python -m pytorch_fid ./hw2_data/face/test ./output_repo

Downloading: "https://github.com/mseitzer/pytorch-fid/releases/download/fid_weights/pt_inception-2015-12-05-6726825d.pth" to /root/.cache/torch/hub/checkpoints/pt_inception-2015-12-05-6726825d.pth
100% 91.2M/91.2M [00:02<00:00, 39.6MB/s]
100% 53/53 [00:25<00:00,  2.12it/s]
100% 20/20 [00:09<00:00,  2.05it/s]
FID:  39.6029908483398


# Test IS

In [None]:
import torch
from torch import nn
from torch.autograd import Variable
from torch.nn import functional as F
import torch.utils.data

from torchvision.models.inception import inception_v3

import numpy as np
from scipy.stats import entropy

def inception_score(imgs, cuda=True, batch_size=32, resize=False, splits=1):
    """Computes the inception score of the generated images imgs.

    Args:
        imgs: Torch dataset of (3xHxW) images normalized in the range [-1, 1].
        cuda: whether or not to run on GPU.
        batch_size: batch size for feeding into Inception v3.
        resize: if true, bilinearly resize every batch to 299x299 first.
        splits: number of splits; the score is computed on each split of
            ``N // splits`` images.

    Returns:
        ``(mean, std)`` of the per-split scores.
    """
    N = len(imgs)

    assert batch_size > 0
    assert N > batch_size

    # Pick the device
    if cuda:
        device = torch.device('cuda')
    else:
        if torch.cuda.is_available():
            print("WARNING: You have a CUDA device, so you should probably set cuda=True")
        device = torch.device('cpu')

    # Set up dataloader
    dataloader = torch.utils.data.DataLoader(imgs, batch_size=batch_size)

    # Load inception model
    inception_model = inception_v3(pretrained=True, transform_input=False).to(device)
    inception_model.eval()
    # align_corners=False is the implicit default; stated to silence the warning.
    up = nn.Upsample(size=(299, 299), mode='bilinear', align_corners=False)

    def get_pred(x):
        # Class probabilities for one batch, as a NumPy array.
        if resize:
            x = up(x)
        logits = inception_model(x)
        # Softmax over the class dimension (explicit instead of implicit).
        return F.softmax(logits, dim=1).cpu().numpy()

    # Get predictions
    preds = np.zeros((N, 1000))

    # Inference only: no autograd graph needed.
    with torch.no_grad():
        for i, batch in enumerate(dataloader):
            batch = batch.to(device=device, dtype=torch.float32)
            start = i * batch_size
            preds[start:start + batch.size(0)] = get_pred(batch)

    # Now compute the mean kl-div
    split_scores = []
    split_size = N // splits

    for k in range(splits):
        part = preds[k * split_size: (k + 1) * split_size, :]
        py = np.mean(part, axis=0)  # marginal class distribution of the split
        # KL(p(y|x) || p(y)) for every image in the split.
        scores = [entropy(part[i, :], py) for i in range(part.shape[0])]
        split_scores.append(np.exp(np.mean(scores)))

    return np.mean(split_scores), np.std(split_scores)

In [None]:
class facedataset(data.Dataset):
    """Path-list image dataset used for the evaluation cells.

    Re-declares the ``facedataset`` class defined earlier in the notebook, so
    this definition is the one in effect once the cell has run.

    Args:
        inputs_path: list of image file paths; one sample per path.
        transforms: callable applied to each loaded PIL image.
    """

    def __init__(self, inputs_path: list, transforms):
        self.inputs_path = inputs_path
        self.transforms = transforms

    def __len__(self):
        """Return the number of image paths."""
        return len(self.inputs_path)

    def __getitem__(self, index: int):
        """Load the image at ``index`` as RGB and return the transformed result."""
        image_path = self.inputs_path[index]
        # Close the file as soon as it is decoded and always hand a 3-channel
        # image to the transform, whatever mode the file was stored in.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert('RGB')
        return self.transforms(image)

NameError: ignored

In [None]:
def get_dataset(batch_size, n_workers):
    """Build the dataset of generated images stored in ``./output_repo``.

    NOTE: this redefines the three-argument ``get_dataset`` from the training
    part of the notebook; once this cell has run, that version is shadowed.

    Args:
        batch_size: accepted but not used here.
        n_workers: accepted but not used here.

    Returns:
        A ``facedataset`` over the sorted files in ``./output_repo``.
    """
    # No resizing here: convert to tensor, then (x - 0.5) / 0.5 per channel.
    to_normalized_tensor = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
    ])

    generated_paths = sorted(glob.glob('./output_repo/*'))
    return facedataset(generated_paths, to_normalized_tensor)

In [None]:
# Inception Score of the generated images in ./output_repo, computed as a
# single split; resize=True upsamples each batch to 299x299 before Inception.
# The (mean, std) tuple is shown as the cell output.
test_set = get_dataset(32, 0)
inception_score(test_set, cuda=torch.cuda.is_available(), batch_size=32, resize=True, splits=1)