In [None]:
import sys
sys.path.append("../")

import os
import argparse
import numpy as np
import GPUtil
import time
import itertools
from tqdm import tqdm 
import datetime
from typing import Any, Union, Tuple, Dict, List

import torch
from torch import nn
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
import torchvision.models as models
from torchvision.models.feature_extraction import create_feature_extractor, get_graph_node_names

from facenet_pytorch import InceptionResnetV1
from sklearn.model_selection import train_test_split

from util import save_json, load_json, create_logger, resolve_path
from fairface import Fairface
import umap
import matplotlib.pyplot as plt

In [None]:

def _str_to_bool(value: str) -> bool:
    """Parse a command-line boolean such as ``true``/``false``, ``yes``/``no`` or ``1``/``0``.

    Raises:
        argparse.ArgumentTypeError: if ``value`` is not a recognised spelling.
    """
    normalized = value.strip().lower()
    if normalized in ("true", "t", "yes", "y", "1", "on"):
        return True
    # The empty string is kept falsy, as it was with the previous ``type=bool``.
    if normalized in ("false", "f", "no", "n", "0", "off", ""):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")


def get_options() -> Any:
    """Build the argument parser for this step and parse ``sys.argv``.

    Returns:
        The ``argparse.Namespace`` holding the identifier, folders and
        experiment conditions. ``attributes`` is ``None`` when
        ``--attributes`` is not given on the command line.
    """
    parser = argparse.ArgumentParser()

    # Timestamp
    time_stamp = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M")
    parser.add_argument("--identifier", type=str, default=time_stamp, help="name of result folder")

    # Folders
    # NOTE: ``type=bool`` would turn every non-empty string (including "False")
    # into True, so the value is parsed explicitly instead.
    parser.add_argument("--save", type=_str_to_bool, default=False, help="if save log and data")
    parser.add_argument("--dataset_dir", type=str, default="~/nas/dataset/fairface", help="path to directory which includes dataset(Fairface)")
    parser.add_argument("--result_dir", type=str, default="~/nas/results/attribute_inversion/step1", help="path to directory which includes results")

    # Conditions
    parser.add_argument("--gpu_idx", type=int, default=0, help="index of cuda devices")
    parser.add_argument("--batch_size", type=int, default=64, help="size of the batches")
    parser.add_argument("--img_channels", type=int, default=3, help="number of image channels")
    parser.add_argument("--img_size", type=int, default=64, help="size of each image dimension")
    parser.add_argument("--data_num", type=int, default=10000, help="number of data to use")
    parser.add_argument("--attack_model", type=str, default="ResNet152", help="attack model(feature extraction): 'VGG16', 'ResNet152', 'FaceNet'")
    parser.add_argument("--target_model", type=str, default="FaceNet", help="target model(feature extraction): 'VGG16', 'ResNet152', 'FaceNet'")
    parser.add_argument("--attributes", nargs='+', help="attributes to slice the dataset(age, gender, race)")

    opt = parser.parse_args()

    return opt


In [None]:

def load_model(model: str) -> Tuple[nn.Module, int, Union[str, None]]:
    """Instantiate one of the supported pretrained feature extractors.

    Args:
        model: Name of the model to load, either ``"ResNet152"`` or ``"FaceNet"``.

    Returns:
        A tuple ``(network, image_size, layer_name)``:
          * for ``"ResNet152"``: a torchvision feature extractor exposing the
            ``"flatten"`` node, the size 112, and the layer name ``"flatten"``;
          * for ``"FaceNet"``: an ``InceptionResnetV1`` with VGGFace2 weights,
            the size 160, and ``None`` (no layer lookup is needed).

    Raises:
        ValueError: if ``model`` is not one of the supported names.
    """
    img_size_resnet152 = 112
    img_size_facenet = 160

    if model == "ResNet152":
        layer_name = "flatten"
        backbone = models.resnet152(pretrained=True)  # pre-trained on ImageNet
        # Map the graph node to an output key of the same name.
        extractor = create_feature_extractor(backbone, {layer_name: layer_name})
        return extractor, img_size_resnet152, layer_name

    if model == "FaceNet":
        facenet = InceptionResnetV1(classify=False, num_classes=None, pretrained="vggface2")
        return facenet, img_size_facenet, None

    # Fail loudly here: returning None would only surface later as an opaque
    # "cannot unpack non-iterable NoneType" error at the call site.
    raise ValueError(
        f"Unsupported model {model!r}; expected 'ResNet152' or 'FaceNet'"
    )


In [None]:

def load_fairface_dataset_loader(
    base_dir: str,
    usage: str,
    data_num: int,
    attribute_group_list: dict,
    options: Any,
) -> "Tuple[Dict[tuple, Fairface], Dict[tuple, DataLoader], List[tuple]]":
    """Create one Fairface dataset and DataLoader per attribute combination.

    The attribute groups named in ``options.attributes`` are looked up in
    ``attribute_group_list`` and their Cartesian product is taken; every
    resulting tuple gets its own dataset and loader.

    Args:
        base_dir: Forwarded to ``Fairface`` as ``base_dir``.
        usage: Forwarded to ``Fairface`` as ``usage``.
        data_num: Forwarded to ``Fairface`` as ``data_num``.
        attribute_group_list: Mapping from attribute-group name to its values
            (assumed to be an iterable of values per group -- TODO confirm
            against ``attributes.json``).
        options: Parsed options; ``attributes`` and ``batch_size`` are read.

    Returns:
        ``(datasets, dataloaders, attributes_list)`` where both dicts are keyed
        by the attribute-combination tuples listed in ``attributes_list``.

    Raises:
        ValueError: if ``options.attributes`` is ``None`` (``--attributes``
            was not supplied).
    """
    if options.attributes is None:
        # Without this guard the membership test below fails with
        # "argument of type 'NoneType' is not iterable".
        raise ValueError(
            "no attributes selected; pass --attributes with one or more of: "
            + ", ".join(map(str, attribute_group_list))
        )

    transform = transforms.Compose([
        transforms.ToTensor()
    ])

    # Keep the groups in the order of attribute_group_list, not of the CLI.
    attribute_list = [
        attribute_group_list[attribute_group]
        for attribute_group in attribute_group_list
        if attribute_group in options.attributes
    ]

    attributes_list = list(itertools.product(*attribute_list))

    # os.cpu_count() may return None when the count cannot be determined.
    worker_count = os.cpu_count() or 0

    datasets = dict()
    dataloaders = dict()

    for attributes in attributes_list:
        dataset = Fairface(
            base_dir=base_dir,
            usage=usage,
            transform=transform,
            data_num=data_num,
            attributes=attributes
        )
        datasets[attributes] = dataset

        dataloaders[attributes] = DataLoader(
            dataset,
            batch_size=options.batch_size,
            shuffle=False,
            # Optimization:
            num_workers=worker_count,
            pin_memory=True
        )

    return datasets, dataloaders, attributes_list


In [None]:

def extract_feature(batch: Any,
                    device: Any,
                    transform: Any,
                    model: nn.Module,
                    layer_name: Union[str, None]) -> torch.Tensor:
    """Run one batch through ``model`` and return the extracted feature.

    Args:
        batch: Indexable batch; only ``batch[0]`` (the inputs) is used.
        device: Device the transformed inputs are moved to before the forward pass.
        transform: Callable applied to ``batch[0]`` before inference.
        model: Network to evaluate. It may return the feature directly or a
            mapping of layer name to feature.
        layer_name: Key to read when ``model`` returns a mapping; ignored otherwise.

    Returns:
        The model output, or the ``layer_name`` entry of it when the output is
        a dict (expected to be a tensor in both cases).
    """
    inputs = transform(batch[0]).to(device)
    feature = model(inputs)
    # isinstance also covers dict subclasses such as OrderedDict, which an
    # exact ``type(...) is dict`` check would silently pass through unchanged.
    if isinstance(feature, dict):
        feature = feature[layer_name]

    return feature


In [None]:

def show_umap(features_T: torch.Tensor,
                   features_F: torch.Tensor,
                   attribute_datanum: dict,
                   options: Any):
    """Plot the per-attribute centroids of two UMAP embeddings side by side.

    ``features_T`` and ``features_F`` are each projected with their own UMAP
    fit. Rows are consumed in consecutive slices, in the iteration order of
    ``attribute_datanum``; the mean embedding of each slice is drawn as one
    point. The left panel shows ``features_T``, the right one ``features_F``.

    Args:
        features_T: Features of the target model, one row per sample.
        features_F: Features of the attack model, one row per sample.
        attribute_datanum: Maps each attribute key to the number of
            consecutive rows that belong to it. Keys are joined with
            ``''.join`` for the legend, so they are assumed to be strings or
            tuples of strings -- TODO confirm with the caller.
        options: Parsed options; ``result_dir`` determines where ``umap.png``
            is written.
    """
    # One independent 2-D projection per model (T first, then F).
    embeddings = (
        (umap.UMAP().fit_transform(features_T.detach().cpu().numpy()), "_T"),
        (umap.UMAP().fit_transform(features_F.detach().cpu().numpy()), "_F"),
    )

    fig, axes = plt.subplots(nrows=1, ncols=2, sharex=False)
    for ax, (embedding, suffix) in zip(axes, embeddings):
        start = 0
        for attribute, datanum in attribute_datanum.items():
            # Centroid of the rows belonging to this attribute.
            centroid = np.mean(embedding[start:start + datanum], axis=0)
            ax.scatter(centroid[0], centroid[1], label=(''.join(attribute) + suffix))
            start += datanum
        ax.legend(prop={'size': 5})

    # plt.title would only label the last (right-hand) axes; title the figure.
    fig.suptitle("UMAP")
    fig.savefig(resolve_path(options.result_dir, 'umap.png'))


In [None]:

options = get_options()

# Decide device
device = f"cuda:{options.gpu_idx}" if torch.cuda.is_available() else "cpu"
options.device = device

# Create the result directory, persist the options and set up the logger.
# The log file is only written when saving is enabled.
if options.save:
    # Create directory to save results
    result_dir = resolve_path(options.result_dir, options.identifier)
    os.makedirs(result_dir, exist_ok=True)

    # Save step1 options
    save_json(resolve_path(result_dir, "step1.json"), vars(options))

    logger = create_logger("Step 1", resolve_path(result_dir, "training.log"))
else:
    logger = create_logger("Step 1")

# Log options
logger.info(vars(options))

# Load attribute list
attribute_group_list = load_json("attributes.json")

# T is the target model, F the attack model; both are used for inference only.
T, img_size_T, _ = load_model(options.target_model)
F, img_size_F, layer_name_F = load_model(options.attack_model)
T.to(device)
F.to(device)
T.eval()
F.eval()

datasets, dataloaders, attributes_list = load_fairface_dataset_loader(
    options.dataset_dir,
    'train',
    options.data_num,
    attribute_group_list,
    options,
)

# Each model gets the images resized to its own input resolution.
transform_T = transforms.Compose([
    transforms.Resize((img_size_T, img_size_T)),
])
transform_F = transforms.Compose([
    transforms.Resize((img_size_F, img_size_F)),
])

attribute_feature_T = dict()
attribute_feature_F = dict()
# Features are only stored, never back-propagated through, so skip building
# autograd graphs for the forward passes.
with torch.no_grad():
    for dataloader_name in tqdm(dataloaders):
        # Collect per-batch features and concatenate once per loader instead
        # of re-copying the growing tensor on every batch.
        batch_features_T = []
        batch_features_F = []
        for batch in dataloaders[dataloader_name]:
            batch_features_T.append(extract_feature(batch, device, transform_T, T, None))
            batch_features_F.append(extract_feature(batch, device, transform_F, F, layer_name_F))

        # An empty loader yields an empty tensor on the device, as before.
        features_T = torch.cat(batch_features_T, 0) if batch_features_T else torch.Tensor().to(device)
        features_F = torch.cat(batch_features_F, 0) if batch_features_F else torch.Tensor().to(device)
        attribute_feature_T[dataloader_name] = features_T
        attribute_feature_F[dataloader_name] = features_F


In [None]:

def _mean_cosine_similarity_to_centroid(features: torch.Tensor) -> float:
    """Return the mean cosine similarity between each row and the mean row.

    ``features`` is expected to be 2-D with one sample per row. An empty
    tensor yields ``nan`` because no average can be formed.
    """
    if features.numel() == 0:
        return float("nan")
    centroid = torch.mean(features, dim=0)
    # Repeat the centroid so it is compared against every sample row.
    centroid_rows = centroid.expand(features.shape[0], -1)
    similarities = nn.CosineSimilarity(dim=1)(centroid_rows, features)
    # Reduce over samples: dividing the per-sample vector by N (without
    # summing it first) would not be an average.
    return similarities.mean().item()


# Report, for every attribute combination, how tightly the features of each
# model cluster around their own mean vector.
for attribute in attribute_feature_T:
    features_T = attribute_feature_T[attribute]
    features_F = attribute_feature_F[attribute]
    print("attribute:", attribute)
    print("average cosine similarity of T:", _mean_cosine_similarity_to_centroid(features_T))
    print("average cosine similarity of F:", _mean_cosine_similarity_to_centroid(features_F))