## StarGAN

### Initialize

In [18]:
import argparse
import os
import random

import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import classification_report
import torch
from torch.nn.functional import softmax
import torchvision
from stargan.data_loader import get_loader
from stargan.psolver import Disruptor
from stargan.attacks import LinfPGDAttack

In [19]:
def set_seed(seed):
    """Set seed"""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False
    os.environ["PYTHONHASHSEED"] = str(seed)

In [20]:
parser = argparse.ArgumentParser()

# Model configuration.
parser.add_argument('--c_dim', type=int, default=5, help='dimension of domain labels (1st dataset)')
parser.add_argument('--c2_dim', type=int, default=8, help='dimension of domain labels (2nd dataset)')
parser.add_argument('--celeba_crop_size', type=int, default=178, help='crop size for the CelebA dataset')
parser.add_argument('--rafd_crop_size', type=int, default=256, help='crop size for the RaFD dataset')
parser.add_argument('--image_size', type=int, default=128, help='image resolution')
parser.add_argument('--g_conv_dim', type=int, default=64, help='number of conv filters in the first layer of G')
parser.add_argument('--d_conv_dim', type=int, default=64, help='number of conv filters in the first layer of D')
parser.add_argument('--g_repeat_num', type=int, default=6, help='number of residual blocks in G')
parser.add_argument('--d_repeat_num', type=int, default=6, help='number of strided conv layers in D')
parser.add_argument('--lambda_cls', type=float, default=1, help='weight for domain classification loss')
parser.add_argument('--lambda_rec', type=float, default=10, help='weight for reconstruction loss')
parser.add_argument('--lambda_gp', type=float, default=10, help='weight for gradient penalty')
parser.add_argument('--eps', type=float, default=0.05, help='epsilon for perturbation')
parser.add_argument('--order', type=int, default=2, help='distance metric')
parser.add_argument('--detector', type=str, default='xception', choices=['xception', 'resnet18', 'resnet50'])

# Training configuration.
parser.add_argument('--seed', type=int, default=0, help='seed for experiments')
parser.add_argument('--dataset', type=str, default='CelebA', choices=['CelebA', 'RaFD', 'Both'])
parser.add_argument('--batch_size', type=int, default=32, help='mini-batch size')
parser.add_argument('--epochs', type=int, default=30, help='number of total epochs for training P')
parser.add_argument('--lr', type=float, default=1e-4, help='learning rate for P')
parser.add_argument('--beta1', type=float, default=0.99, help='beta1 for Adam optimizer')
parser.add_argument('--beta2', type=float, default=0.999, help='beta2 for Adam optimizer')
parser.add_argument('--resume', default=False, action='store_true', help='resume training from last epoch')
parser.add_argument('--selected_attrs', '--list', nargs='+', help='selected attributes for the CelebA dataset',
                    default=['Black_Hair', 'Blond_Hair', 'Brown_Hair', 'Male', 'Young'])
parser.add_argument('--alpha', type=float, default=0.1, help="alpha for gradnorm")

# Miscellaneous.
parser.add_argument('--num_workers', type=int, default=48)
parser.add_argument('--mode', type=str, default='train', choices=['train', 'test'])
parser.add_argument('--disable_tensorboard', action='store_true', default=False)

# Directories.
parser.add_argument('--outputs_dir', type=str, default='experiment1')
parser.add_argument('--gen_ckpt', type=str,
                    default='stargan/stargan_celeba_128/models/200000-G.ckpt')
parser.add_argument('--detector_path', type=str,
                    default='detection/detector_c23.pth')
parser.add_argument('--celeba_image_dir', type=str, default='stargan/data/celeba/images')
parser.add_argument('--attr_path', type=str, default='stargan/data/celeba/list_attr_celeba.txt')
parser.add_argument('--log_dir', type=str, default='logs')
parser.add_argument('--model_save_dir', type=str,
                    default='model_save_dir')
parser.add_argument('--sample_dir', type=str, default='samples')
parser.add_argument('--result_dir', type=str, default='results')

# Step size.
parser.add_argument('--log_step', type=int, default=1)
parser.add_argument('--sample_step', type=int, default=1)

config = parser.parse_args(args=[])
print(config)

Namespace(alpha=0.1, attr_path='stargan/data/celeba/list_attr_celeba.txt', batch_size=32, beta1=0.99, beta2=0.999, c2_dim=8, c_dim=5, celeba_crop_size=178, celeba_image_dir='stargan/data/celeba/images', d_conv_dim=64, d_repeat_num=6, dataset='CelebA', detector='xception', detector_path='detection/detector_c23.pth', disable_tensorboard=False, epochs=30, eps=0.05, g_conv_dim=64, g_repeat_num=6, gen_ckpt='stargan/stargan_celeba_128/models/200000-G.ckpt', image_size=128, lambda_cls=1, lambda_gp=10, lambda_rec=10, log_dir='logs', log_step=1, lr=0.0001, mode='train', model_save_dir='model_save_dir', num_workers=48, order=2, outputs_dir='experiment1', rafd_crop_size=256, result_dir='results', resume=False, sample_dir='samples', sample_step=1, seed=0, selected_attrs=['Black_Hair', 'Blond_Hair', 'Brown_Hair', 'Male', 'Young'])


### Instantiate Objects & Define Methods

In [21]:
set_seed(config.seed)
torch.cuda.set_device(0)
device = torch.device("cuda")

In [38]:
# Solver for training and testing DeepFake Disruptor
config.num_workers = 1
config.mode = 'test'
config.batch_size = 1
# bb_dir = '/home/data/bb_celeba/'

celeba_loader = get_loader(config.celeba_image_dir, config.attr_path, config.selected_attrs,
                            config.celeba_crop_size, config.image_size, config.batch_size,
                            'CelebA', config.mode, config.num_workers)

config.detector = "resnet50"
resnet18 = "garbage/resnet_18_weights/last.ckpt"
resnet50 = "garbage/resnet_50_weights/r50.ckpt"

config.detector_path = resnet50

solver = Disruptor(config, celeba_loader).cuda()

G = solver.G

D = solver.D.eval()
D.eval()

P = solver.P
P.eval()
P.load_state_dict(torch.load("stargan/128/perturbation_models/best.ckpt", map_location="cuda"))

del solver

pgd_attack = LinfPGDAttack(model=G, device=device)

Finished preprocessing the CelebA dataset...




In [23]:
def denorm(x):
    """Convert the range from [-1, 1] to [0, 1]."""
    out = (x + 1) / 2
    return out.clamp_(0, 1)

def get_images_without_attr(celeba_loader, attr, num_images=100, bb=False):
    attrs = ['Black_Hair', 'Blond_Hair', 'Brown_Hair', 'Male', 'Young']
    i = 0
    for i, attribute in enumerate(attrs):
        if attribute == attr:
            break
    images = []
    if bb:
        bboxes = []
        for x, c, bb in celeba_loader:
            # target must not be bald
            images.extend(x[(c[:, i] != 1) & (c[:, :3].sum(dim=1) != 0)])
            bboxes.extend(bb[(c[:, i] != 1) & (c[:, :3].sum(dim=1) != 0)])
            if len(images) >= 100:
                images = images[:100]
                bboxes = bboxes[:100]
                break
        bboxes = torch.stack(bboxes)
        images = torch.stack(images)
        return images, bboxes
    else:
        for x, c in celeba_loader:
            # target must not be bald
            images.extend(x[(c[:, i] != 1) & (c[:, :3].sum(dim=1) != 0)])
            if len(images) >= 100:
                images = images[:100]
                break
        images = torch.stack(images)
        return images

def show_images(x):
    images = denorm(x.cpu())
    grid_img = torchvision.utils.make_grid(images, nrow=10)
    plt.figure(figsize=(15, 10))
    plt.imshow(grid_img.permute(1, 2, 0))

In [24]:
@torch.no_grad()
def generate(x, c):
    return G(x, c)[0]

def joint_class_attack(x_real, x_fake, c):
    x_adv, perturb = pgd_attack.perturb(x_real, x_fake, c)
    x_adv = x_real + perturb
    return x_adv
    
@torch.no_grad()
def detect(x):
    output = D(x)
    output = softmax(output, 1)
    prediction = output.argmin(1, keepdim=False)
    return prediction

@torch.no_grad()
def perturb(x):
    return P(x) + x

In [25]:
attribute = "Black_Hair"
x = get_images_without_attr(celeba_loader, attribute, bb=False)
x = x.cuda()
c = torch.tensor([1, 0, 0, 0, 0])
c = c.tile(100).view(100, 5).cuda()

### Qualitative Results

In [None]:
show_images(x)

In [None]:
x_fake = generate(x, c)
show_images(x_fake)

In [None]:
x_adv = joint_class_attack(x, x_fake, c)
show_images(x_adv)

In [None]:
x_pert = perturb(x)
show_images(x_pert)

In [None]:
x_adv_fake = generate(x_adv, c)
show_images(x_adv_fake)

In [None]:
x_pert_fake = generate(x_pert, c)
show_images(x_pert_fake)

### Evaluation

In [26]:
def get_l2_distance(x_fake, xp_fake):
    B = x_fake.size(0)
    x_fake = x_fake.view(B, -1)
    xp_fake = xp_fake.view(B, -1)
    return torch.linalg.norm(x_fake - xp_fake, dim=1, ord=2)

In [39]:
def get_metrics(x, c):
    xp = perturb(x)
    x_fake = generate(x, c)
    xp_fake = generate(xp, c)
    x_adv = joint_class_attack(x, x_fake, c)
    x_adv_fake = generate(x_adv, c)
    
    predicted_real = detect(x).cpu().numpy()
    predicted_real_p = detect(xp).cpu().numpy()

    predicted_fake = detect(x_fake).cpu().numpy()
    predicted_fake_p = detect(xp_fake).cpu().numpy()
    predicted_fake_adv = detect(x_adv_fake).cpu().numpy()
    
    print("Success Rate G[x+P(x)]:", predicted_fake_p.sum() / 100)

    y_pred = np.hstack((predicted_real, predicted_fake))
    yp_pred = np.hstack((predicted_real_p, predicted_fake_p))
    # yp_pred = np.hstack((predicted_real, predicted_fake_p))
    yadv_pred = np.hstack((predicted_real, predicted_fake_adv))
    y_true = np.hstack((np.ones(100), np.zeros(100)))

    report_stargan = classification_report(y_true, y_pred, target_names=["fake", "real"])
    report_pgd = classification_report(y_true, yadv_pred, target_names=["fake", "real"])
    report_disruptor = classification_report(y_true, yp_pred, target_names=["fake", "real"])

    return report_stargan, report_pgd, report_disruptor

In [40]:
report_stargan, report_pgd, report_disruptor = get_metrics(x, c)

Success Rate G[x+P(x)]: 0.27


In [41]:
print(report_stargan)

              precision    recall  f1-score   support

        fake       0.53      0.08      0.14       100
        real       0.50      0.93      0.65       100

    accuracy                           0.51       200
   macro avg       0.52      0.51      0.40       200
weighted avg       0.52      0.51      0.40       200



In [42]:
print(report_pgd)

              precision    recall  f1-score   support

        fake       0.88      0.53      0.66       100
        real       0.66      0.93      0.78       100

    accuracy                           0.73       200
   macro avg       0.77      0.73      0.72       200
weighted avg       0.77      0.73      0.72       200



In [43]:
print(report_disruptor)

              precision    recall  f1-score   support

        fake       0.94      0.73      0.82       100
        real       0.78      0.95      0.86       100

    accuracy                           0.84       200
   macro avg       0.86      0.84      0.84       200
weighted avg       0.86      0.84      0.84       200



## Inference Test

In [25]:
attribute = "Black_Hair"
x = get_images_without_attr(celeba_loader, attribute, bb=False)
x = x.cuda()
c = torch.tensor([1, 0, 0, 0, 0])
c = c.tile(100).view(100, 5).cuda()


In [30]:
start = torch.cuda.Event(enable_timing=True)
end = torch.cuda.Event(enable_timing=True)


In [28]:
P_ev = P.eval()

In [32]:
ds = celeba_loader.dataset

In [34]:
len(ds)

1999

In [46]:
x_100 = torch.zeros((100, 1, 3, 128, 128), device="cuda")
c_100 = torch.zeros((100, 1, 5), device="cuda")
x_fake = torch.zeros((100, 1, 3, 128, 128), device="cuda")

for i, (x, _) in enumerate(celeba_loader):
    if i == 100:
        break
    x_100[i, 0] = x.cuda()
    idx = torch.randint(low=0, high=len(ds), size=(1,)).item()
    c = ds[idx][1].cuda()
    c_100[i, 0] = c
    x_fake[i, 0] = generate(x.cuda(), c.unsqueeze(0))
    

In [49]:
start.record()
for i in range(100):
    P(x_100[i])
end.record()

# Waits for everything to finish running
torch.cuda.synchronize()

print(start.elapsed_time(end) / 100 / 1000) 

0.007794022216796875


In [50]:
start.record()
for i in range(100):
    pgd_attack.perturb(x_100[i], x_fake[i], c_100[i])
end.record()

# Waits for everything to finish running
torch.cuda.synchronize()

print(start.elapsed_time(end) / 100 / 1000) 

0.2134470703125
