In [None]:
!pip install ninja



In [None]:
!pip uninstall -y datasets


[0m

In [None]:
!gdown --id 1EM87UquaoQmk17Q8d5kYIAHqu0dkYqdT
!mv /content/HFGI/stylegan2-ffhq-config-f.pt /content/HFGI/pretrained



Downloading...

From (original): https://drive.google.com/uc?id=1EM87UquaoQmk17Q8d5kYIAHqu0dkYqdT

From (redirected): https://drive.google.com/uc?id=1EM87UquaoQmk17Q8d5kYIAHqu0dkYqdT&confirm=t&uuid=795edc69-c31e-44b4-a8cf-add54d1aeae6

To: /content/HFGI/stylegan2-ffhq-config-f.pt

100% 381M/381M [00:10<00:00, 38.1MB/s]


In [None]:
!gdown --id 1KW7bjndL3QG3sxBbZxreGHigcCCpsDgn
!mv /content/HFGI/model_ir_se50.pth /content/HFGI/pretrained



Downloading...

From (original): https://drive.google.com/uc?id=1KW7bjndL3QG3sxBbZxreGHigcCCpsDgn

From (redirected): https://drive.google.com/uc?id=1KW7bjndL3QG3sxBbZxreGHigcCCpsDgn&confirm=t&uuid=589f5da2-3612-476e-b38c-c3e0ce471721

To: /content/HFGI/model_ir_se50.pth

100% 175M/175M [00:01<00:00, 136MB/s]


In [None]:
%%writefile /content/HFGI/configs/transforms_config.py
from abc import abstractmethod
import torchvision.transforms as transforms


class TransformsConfig(object):

	def __init__(self, opts):
		self.opts = opts

	@abstractmethod
	def get_transforms(self):
		pass


class EncodeTransforms(TransformsConfig):

	def __init__(self, opts):
		super(EncodeTransforms, self).__init__(opts)

	def get_transforms(self):
		transforms_dict = {
			'transform_gt_train': transforms.Compose([
				transforms.Resize((256, 256)),
				transforms.ToTensor(),
				transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])]),
			'transform_source': transforms.Compose([
				transforms.Resize((256, 256)),
        transforms.Grayscale(num_output_channels=3),
				transforms.ToTensor(),
				transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])]),
			'transform_test': transforms.Compose([
				transforms.Resize((256, 256)),
				transforms.ToTensor(),
				transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])]),
			'transform_inference': transforms.Compose([
				transforms.Resize((256, 256)),
				transforms.Grayscale(num_output_channels=3),
				transforms.ToTensor(),
				transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])])
		}
		return transforms_dict


class CarsEncodeTransforms(TransformsConfig):

	def __init__(self, opts):
		super(CarsEncodeTransforms, self).__init__(opts)

	def get_transforms(self):
		transforms_dict = {
			'transform_gt_train': transforms.Compose([
				transforms.Resize((192, 256)),
				transforms.RandomHorizontalFlip(0.5),
				transforms.ToTensor(),
				transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])]),
			'transform_source': None,
			'transform_test': transforms.Compose([
				transforms.Resize((192, 256)),
				transforms.ToTensor(),
				transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])]),
			'transform_inference': transforms.Compose([
				transforms.Resize((192, 256)),
				transforms.ToTensor(),
				transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])])
		}
		return transforms_dict


Overwriting /content/HFGI/configs/transforms_config.py


In [None]:
%%writefile /content/HFGI/configs/paths_config.py
dataset_paths = {
	#  Face Datasets (FFHQ - train, CelebA-HQ - test)
	'ffhq': '/content/HFGI/celebahq-resized-256x256',
	'ffhq_val': '/content/HFGI/test_imgs',

	#  Cars Dataset (Stanford cars)
	'cars_train': '',
	'cars_val': '',
}

model_paths = {
	'stylegan_ffhq': './pretrained/stylegan2-ffhq-config-f.pt',
	'ir_se50': './pretrained/model_ir_se50.pth',
	'shape_predictor': './pretrained/shape_predictor_68_face_landmarks.dat',
	'moco': './pretrained/moco_v2_800ep_pretrain.pt'
}


Overwriting /content/HFGI/configs/paths_config.py


In [None]:
%%writefile /content/HFGI/training/coach.py
import os
import random
import matplotlib
import matplotlib.pyplot as plt

matplotlib.use('Agg')
import numpy as np
import torch
from torch import nn, autograd
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
import torch.nn.functional as F

from utils import common, train_utils
from criteria import id_loss, moco_loss
from configs import data_configs
from datasets.images_dataset import ImagesDataset
from criteria.lpips.lpips import LPIPS
from models.psp import pSp
from training.ranger import Ranger

random.seed(0)
torch.manual_seed(0)


class Coach:
    def __init__(self, opts):
        self.opts = opts
        self.global_step = 0
        self.device = 'cuda'
        self.opts.device = self.device
        self.net = pSp(self.opts).to(self.device) # modify it to your basic encoder

        # Initialize loss
        if self.opts.lpips_lambda > 0:
            self.lpips_loss = LPIPS(net_type=self.opts.lpips_type).to(self.device).eval()
        if self.opts.id_lambda > 0:
            if 'ffhq' in self.opts.dataset_type or 'celeb' in self.opts.dataset_type:
                self.id_loss = id_loss.IDLoss().to(self.device).eval()
            else:
                self.id_loss = moco_loss.MocoLoss(opts).to(self.device).eval()
        self.mse_loss = nn.MSELoss().to(self.device).eval()

        # Initialize optimizer
        self.optimizer = self.configure_optimizers()

        # Initialize dataset
        self.train_dataset, self.test_dataset = self.configure_datasets()
        self.train_dataloader = DataLoader(self.train_dataset,
                                           batch_size=self.opts.batch_size,
                                           shuffle=True,
                                           num_workers=int(self.opts.workers),
                                           drop_last=True)
        self.test_dataloader = DataLoader(self.test_dataset,
                                          batch_size=self.opts.test_batch_size,
                                          shuffle=False,
                                          num_workers=int(self.opts.test_workers),
                                          drop_last=True)

        # Initialize logger
        log_dir = os.path.join(opts.exp_dir, 'logs')
        os.makedirs(log_dir, exist_ok=True)
        self.logger = SummaryWriter(log_dir=log_dir)

        # Initialize checkpoint dir
        self.checkpoint_dir = os.path.join(opts.exp_dir, 'checkpoints')
        os.makedirs(self.checkpoint_dir, exist_ok=True)


    def train(self):
        self.net.train()
        while self.global_step < self.opts.max_steps:
            for batch_idx, batch in enumerate(self.train_dataloader):
                loss_dict = {}

                x, y, y_hat, latent, res_delta, rec = self.forward(batch)
                loss, encoder_loss_dict, id_logs = self.calc_loss(x, y, y_hat, latent, res_delta)
                loss_dict = {**loss_dict, **encoder_loss_dict}
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()

                # Logging related
                if self.global_step % self.opts.image_interval == 0:
                    self.parse_and_log_images(id_logs, x, y, y_hat, title='images/train/faces')
                if self.global_step % self.opts.board_interval == 0:
                    self.print_metrics(loss_dict, prefix='train')
                    self.log_metrics(loss_dict, prefix='train')

                # Validation related
                val_loss_dict = None
                if self.global_step % self.opts.val_interval == 0:
                    val_loss_dict = self.validate()

                if self.global_step % self.opts.save_interval == 0:
                    if val_loss_dict is not None:
                        self.checkpoint_me(val_loss_dict)
                    else:
                        self.checkpoint_me(loss_dict)

                if self.global_step == self.opts.max_steps:
                    break

                self.global_step += 1


    def validate(self):
        self.net.eval()
        agg_loss_dict = []
        for batch_idx, batch in enumerate(self.test_dataloader):
            cur_loss_dict = {}
            with torch.no_grad():
                x, y, y_hat, latent, res_delta, rec = self.forward(batch)
                loss, cur_encoder_loss_dict, id_logs = self.calc_loss(x, y, y_hat, latent, res_delta)
                cur_loss_dict = {**cur_loss_dict, **cur_encoder_loss_dict}
            agg_loss_dict.append(cur_loss_dict)

            # Logging related
            self.parse_and_log_images(id_logs, x, y, y_hat,
                                      title='images/test/faces',
                                      subscript='{:04d}'.format(batch_idx))

            # For first step just do sanity test on small amount of data
            if self.global_step == 0 and batch_idx >= 4:
                self.net.train()
                return None  # Do not log, inaccurate in first batch

        loss_dict = train_utils.aggregate_loss_dict(agg_loss_dict)
        self.log_metrics(loss_dict, prefix='test')
        self.print_metrics(loss_dict, prefix='test')

        self.net.train()
        return loss_dict

    def checkpoint_me(self, loss_dict):
        save_name =  'iteration_{}.pt'.format(self.global_step)
        save_dict = self.__get_save_dict()
        checkpoint_path = os.path.join(self.checkpoint_dir, save_name)
        torch.save(save_dict, checkpoint_path)
        with open(os.path.join(self.checkpoint_dir, 'timestamp.txt'), 'a') as f:
            f.write('Step - {}, \n{}\n'.format(self.global_step, loss_dict))

    def configure_optimizers(self):
        params = list(self.net.residue.parameters())
        params  += list(self.net.grid_align.parameters())
        if self.opts.train_decoder:
            params += list(self.net.decoder.parameters())
        else:
            self.requires_grad(self.net.decoder, False)
            self.requires_grad(self.net.encoder, False)
        if self.opts.optim_name == 'adam':
            optimizer = torch.optim.Adam(params, lr=self.opts.learning_rate)
        else:
            optimizer = Ranger(params, lr=self.opts.learning_rate)
        return optimizer

    def configure_datasets(self):
        if self.opts.dataset_type not in data_configs.DATASETS.keys():
            Exception('{} is not a valid dataset_type'.format(self.opts.dataset_type))
        print('Loading dataset for {}'.format(self.opts.dataset_type))
        dataset_args = data_configs.DATASETS[self.opts.dataset_type]
        transforms_dict = dataset_args['transforms'](self.opts).get_transforms()
        train_dataset = ImagesDataset(source_root=dataset_args['train_source_root'],
                                      target_root=dataset_args['train_target_root'],
                                      source_transform=transforms_dict['transform_source'],
                                      target_transform=transforms_dict['transform_gt_train'],
                                      opts=self.opts)
        test_dataset = ImagesDataset(source_root=dataset_args['test_source_root'],
                                     target_root=dataset_args['test_target_root'],
                                     source_transform=transforms_dict['transform_source'],
                                     target_transform=transforms_dict['transform_test'],
                                     opts=self.opts)
        print("Number of training samples: {}".format(len(train_dataset)))
        print("Number of test samples: {}".format(len(test_dataset)))
        return train_dataset, test_dataset

    def calc_loss(self, x, y, y_hat, latent, res_delta):
        loss_dict = {}
        loss = 0.0
        id_logs = None

        if self.opts.id_lambda > 0:
            loss_id, sim_improvement, id_logs = self.id_loss(y_hat, y, x)
            loss_dict['loss_id'] = float(loss_id)
            loss_dict['id_improve'] = float(sim_improvement)
            loss += loss_id * self.opts.id_lambda
        if self.opts.l2_lambda > 0:
            loss_l2 = F.mse_loss(y_hat, y)
            loss_dict['loss_l2'] = float(loss_l2)
            loss += loss_l2 * self.opts.l2_lambda
        if self.opts.lpips_lambda > 0:
            loss_lpips = self.lpips_loss(y_hat, y)
            loss_dict['loss_lpips'] = float(loss_lpips)
            loss += loss_lpips * self.opts.lpips_lambda

        if self.opts.res_lambda > 0:
            target = torch.zeros_like(res_delta)
            loss_res = F.l1_loss(res_delta, target)
            loss_dict['loss_res'] = float(loss_res)
            loss += loss_res * self.opts.res_lambda

        loss_dict['loss'] = float(loss)
        return loss, loss_dict, id_logs

    def forward(self, batch):
        x, y = batch
        x, y = x.to(self.device).float(), y.to(self.device).float()
        y_hat, latent, res_delta, rec = self.net.forward(x, return_latents=True)
        if self.opts.dataset_type == "cars_encode":
            y_hat = y_hat[:, :, 32:224, :]
        return x, y, y_hat, latent, res_delta, rec

    def log_metrics(self, metrics_dict, prefix):
        for key, value in metrics_dict.items():
            self.logger.add_scalar('{}/{}'.format(prefix, key), value, self.global_step)

    def print_metrics(self, metrics_dict, prefix):
        print('Metrics for {}, step {}'.format(prefix, self.global_step))
        for key, value in metrics_dict.items():
            print('\t{} = '.format(key), value)

    def parse_and_log_images(self, id_logs, x, y, y_hat, title, subscript=None, display_count=2):
        im_data = []
        for i in range(display_count):
            cur_im_data = {
                'input_face': common.log_input_image(x[i], self.opts),
                'target_face': common.tensor2im(y[i]),
                'output_face': common.tensor2im(y_hat[i]),
            }
            if id_logs is not None:
                for key in id_logs[i]:
                    cur_im_data[key] = id_logs[i][key]
            im_data.append(cur_im_data)
        self.log_images(title, im_data=im_data, subscript=subscript)

    def log_images(self, name, im_data, subscript=None, log_latest=False):
        fig = common.vis_faces(im_data)
        step = self.global_step
        if log_latest:
            step = 0
        if subscript:
            path = os.path.join(self.logger.log_dir, name, '{}_{:04d}.jpg'.format(subscript, step))
        else:
            path = os.path.join(self.logger.log_dir, name, '{:04d}.jpg'.format(step))
        os.makedirs(os.path.dirname(path), exist_ok=True)
        fig.savefig(path)
        plt.close(fig)

    def __get_save_dict(self):
        save_dict = {
            'state_dict': self.net.state_dict(),
            'opts': vars(self.opts)}

        if self.opts.start_from_latent_avg:
            save_dict['latent_avg'] = self.net.latent_avg
        return save_dict

    @staticmethod
    def requires_grad(model, flag=True):
        for p in model.parameters():
            p.requires_grad = flag

Overwriting /content/HFGI/training/coach.py


In [None]:
%%writefile /content/HFGI/inference.sh
python ./scripts/inference.py \
--images_dir=/content/drive/MyDrive/485/celebahq256_all  --n_sample=2000 --edit_attribute='inversion'  \
--save_dir=./experiment/inference_results  ./content/drive/MyDrive/485/hfgi_iteration_30000.pt


Overwriting /content/HFGI/inference.sh


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [31]:
!python ./scripts/inference.py \
--images_dir=/content/drive/MyDrive/485/celebahq256_all  --n_sample=2000 --edit_attribute='inversion'  \
--save_dir=./experiment/inference_results  /content/drive/MyDrive/485/hfgi_iteration_30000.pt


Loading basic encoder from checkpoint: /content/drive/MyDrive/485/hfgi_iteration_30000.pt

images path: /content/drive/MyDrive/485/celebahq256_all

dataset length: 30000


  self.pid = os.fork()

inference finished!


In [33]:
! mkdir ~/.kaggle

In [39]:
! cp kaggle.json ~/.kaggle/

In [40]:
! chmod 600 ~/.kaggle/kaggle.json

In [38]:
!mv "kaggle-2.json" "kaggle.json"

In [41]:
!kaggle datasets download xhlulu/flickrfaceshq-dataset-nvidia-resized-256px


Dataset URL: https://www.kaggle.com/datasets/xhlulu/flickrfaceshq-dataset-nvidia-resized-256px

License(s): unknown

Downloading flickrfaceshq-dataset-nvidia-resized-256px.zip to /content/HFGI

100% 1.88G/1.89G [00:39<00:00, 37.8MB/s]

100% 1.89G/1.89G [00:39<00:00, 50.8MB/s]


In [55]:
import os
import shutil
import random

# Path to the downloaded dataset folder
dataset_path = "/kaggle/input/flickrfaceshq-dataset-nvidia-resized-256px/resized"

# Path to the folder where you want to save the subsets
output_folder = "/kaggle/working/outputs"

# Create output folders if they don't exist
os.makedirs(output_folder, exist_ok=True)
os.makedirs(os.path.join(output_folder, "test3"), exist_ok=True)
os.makedirs(os.path.join(output_folder, "ground_truth3"), exist_ok=True)

# List all image files in the dataset folder
image_files = [f for f in os.listdir(dataset_path) if os.path.isfile(os.path.join(dataset_path, f))]

# Shuffle the list of image files
random.shuffle(image_files)

# Take the first 2000 images for each folder
folder1_images = image_files[:2500]
folder2_images = image_files[2500:5000]

# Copy images to folder1
for image in folder1_images:
    src = os.path.join(dataset_path, image)
    dst = os.path.join(output_folder, "test3", image)
    shutil.copy(src, dst)

# Copy images to folder2
for image in folder2_images:
    src = os.path.join(dataset_path, image)
    dst = os.path.join(output_folder, "ground_truth3", image)
    shutil.copy(src, dst)

In [44]:
!python ./scripts/inference.py \
--images_dir=/content/HFGI/images/test  --n_sample=2000 --edit_attribute='inversion'  \
--save_dir=/content/HFGI/images/test_results  /content/drive/MyDrive/485/hfgi_iteration_30000.pt

Loading basic encoder from checkpoint: /content/drive/MyDrive/485/hfgi_iteration_30000.pt

images path: /content/HFGI/images/test

dataset length: 2000


  self.pid = os.fork()


In [2]:
!pip install clean-fid

Collecting clean-fid
  Downloading clean_fid-0.1.35-py3-none-any.whl.metadata (36 kB)
Downloading clean_fid-0.1.35-py3-none-any.whl (26 kB)
Installing collected packages: clean-fid
Successfully installed clean-fid-0.1.35


In [46]:
from cleanfid import fid
hfgi_score = fid.compute_fid('/content/HFGI/images/test_results', '/content/HFGI/images/ground_truth')




compute FID between two folders

Found 2000 images in the folder /content/HFGI/images/test_results



  self.pid = os.fork()

FID test_results : 100%|██████████| 63/63 [00:24<00:00,  2.53it/s]


Found 2000 images in the folder /content/HFGI/images/grand_truth


FID grand_truth : 100%|██████████| 63/63 [00:26<00:00,  2.36it/s]


In [47]:
hfgi_score

106.35741032881893

In [50]:
!mv "/content/drive/MyDrive/485/melih.zip" "/content/HFGI/psp_melih/melih.zip"

NotImplementedError: A UTF-8 locale is required. Got ANSI_X3.4-1968

In [60]:
!export LANG=en_US.UTF-8

NotImplementedError: A UTF-8 locale is required. Got ANSI_X3.4-1968

In [61]:
!unzip /content/HFGI/psp_melih/melih.zip

NotImplementedError: A UTF-8 locale is required. Got ANSI_X3.4-1968

In [62]:
import subprocess

subprocess.run(['unzip', '/content/HFGI/psp_melih/melih.zip'], encoding='utf-8')

CompletedProcess(args=['unzip', '/content/HFGI/psp_melih/melih.zip'], returncode=0)

In [63]:
from cleanfid import fid
hfgi_score = fid.compute_fid('/content/HFGI/output', '/content/HFGI/melih_ground_truth')




compute FID between two folders

Found 1991 images in the folder /content/HFGI/output



  self.pid = os.fork()


  self.pid = os.fork()

FID output : 100%|██████████| 63/63 [00:27<00:00,  2.28it/s]


Found 1991 images in the folder /content/HFGI/melih_ground_truth


FID melih_ground_truth : 100%|██████████| 63/63 [00:24<00:00,  2.55it/s]


ValueError: Imaginary component 0.011012653718757792

In [69]:
from cleanfid import fid

# Reduce the number of workers to 2
# Compute FID score with reduced workers
psp_score = fid.compute_fid('/content/HFGI/output', '/content/HFGI/images/ground_truth')
print(f"psp Score: {psp_score}")

compute FID between two folders

Found 2000 images in the folder /content/HFGI/images/test_results


FID test_results : 100%|██████████| 63/63 [00:26<00:00,  2.36it/s]


Found 2000 images in the folder /content/HFGI/images/ground_truth


FID ground_truth : 100%|██████████| 63/63 [00:28<00:00,  2.18it/s]


psp Score: 106.35741032881893


In [7]:
import os
os.mkdir("/kaggle/working/psp")
os.chdir('/kaggle/working/psp')
CODE_DIR = 'pixel2style2pixel'

In [8]:
!git clone https://github.com/eladrich/pixel2style2pixel.git $CODE_DIR

Cloning into 'pixel2style2pixel'...
remote: Enumerating objects: 418, done.[K
remote: Counting objects: 100% (4/4), done.[K
remote: Compressing objects: 100% (4/4), done.[K
remote: Total 418 (delta 0), reused 2 (delta 0), pack-reused 414[K
Receiving objects: 100% (418/418), 92.94 MiB | 54.32 MiB/s, done.
Resolving deltas: 100% (147/147), done.


In [72]:
import subprocess

subprocess.run(['git', 'clone', 'https://github.com/eladrich/pixel2style2pixel.git', CODE_DIR], encoding='utf-8')

CompletedProcess(args=['git', 'clone', 'https://github.com/eladrich/pixel2style2pixel.git', 'pixel2style2pixel'], returncode=0)

In [9]:
!wget https://github.com/ninja-build/ninja/releases/download/v1.8.2/ninja-linux.zip
!sudo unzip ninja-linux.zip -d /usr/local/bin/
!sudo update-alternatives --install /usr/bin/ninja ninja /usr/local/bin/ninja 1 --force

--2024-05-19 19:22:21--  https://github.com/ninja-build/ninja/releases/download/v1.8.2/ninja-linux.zip
Resolving github.com (github.com)... 140.82.116.3
Connecting to github.com (github.com)|140.82.116.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/1335132/d2f252e2-9801-11e7-9fbf-bc7b4e4b5c83?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=releaseassetproduction%2F20240519%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240519T192221Z&X-Amz-Expires=300&X-Amz-Signature=c4a4de85a9ef68bf739f6d15f2365cc39c8463ed584d78d09d996271a3b91090&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=1335132&response-content-disposition=attachment%3B%20filename%3Dninja-linux.zip&response-content-type=application%2Foctet-stream [following]
--2024-05-19 19:22:21--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/1335132/d2f252e2-9801-11e7-9fbf-bc7b4e4b5c83?X-Amz-Algor

In [74]:
import locale
locale.getpreferredencoding = lambda: "UTF-8"

In [75]:
!wget https://github.com/ninja-build/ninja/releases/download/v1.8.2/ninja-linux.zip
!sudo unzip ninja-linux.zip -d /usr/local/bin/
!sudo update-alternatives --install /usr/bin/ninja ninja /usr/local/bin/ninja 1 --force

--2024-05-19 18:19:14--  https://github.com/ninja-build/ninja/releases/download/v1.8.2/ninja-linux.zip

Resolving github.com (github.com)... 140.82.113.4

Connecting to github.com (github.com)|140.82.113.4|:443... connected.

HTTP request sent, awaiting response... 302 Found

Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/1335132/d2f252e2-9801-11e7-9fbf-bc7b4e4b5c83?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=releaseassetproduction%2F20240519%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240519T181915Z&X-Amz-Expires=300&X-Amz-Signature=045075abb7080c2f13c52aa035461790f264888db38b7ac4c151bbefd7b71b0c&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=1335132&response-content-disposition=attachment%3B%20filename%3Dninja-linux.zip&response-content-type=application%2Foctet-stream [following]

--2024-05-19 18:19:15--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/1335132/d2f252e2-9801-11e7-9fbf-bc7b4e4b5c83?X-Amz-

In [10]:
os.chdir(f'/kaggle/working/psp/pixel2style2pixel')

In [35]:
%%writefile /kaggle/working/psp/pixel2style2pixel/models/encoders/psp_encoders.py
import numpy as np
import torch
import torch.nn.functional as F
from torch import nn
from torch.nn import Linear, Conv2d, BatchNorm2d, PReLU, Sequential, Module

from models.encoders.helpers import get_blocks, Flatten, bottleneck_IR, bottleneck_IR_SE
from models.stylegan2.model import EqualLinear

import torch.nn as nn

class TransposeConvNet(nn.Module):
    def __init__(self):
        super(TransposeConvNet, self).__init__()
        
        # Transpose convolution layers

        self.conv1 = nn.ConvTranspose2d(in_channels=3, out_channels=64, kernel_size=4, stride=2, padding=1)
        self.conv2 = nn.ConvTranspose2d(in_channels=64, out_channels=128, kernel_size=4, stride=2, padding=1)
        self.conv3 = nn.ConvTranspose2d(in_channels=128, out_channels=128, kernel_size=3, stride=1, padding=1)
        
        # Batch normalization layers
        self.bn1 = nn.BatchNorm2d(64)
        self.bn2 = nn.BatchNorm2d(128)
        
        # ReLU activation function
        self.relu = nn.ReLU()
        
    def forward(self, x):
        # Forward pass through the network
        
        x = self.relu(self.bn1(self.conv1(x)))
        x = self.relu(self.bn2(self.conv2(x)))
        x = self.conv3(x)
        return x

class GradualStyleBlock(Module):
    def __init__(self, in_c, out_c, spatial):
        super(GradualStyleBlock, self).__init__()
        self.out_c = out_c
        self.spatial = spatial
        num_pools = int(np.log2(spatial))
        modules = []
        modules += [Conv2d(in_c, out_c, kernel_size=3, stride=2, padding=1),
                    nn.LeakyReLU()]
        for i in range(num_pools - 1):
            modules += [
                Conv2d(out_c, out_c, kernel_size=3, stride=2, padding=1),
                nn.LeakyReLU()
            ]
        self.convs = nn.Sequential(*modules)
        self.linear = EqualLinear(out_c, out_c, lr_mul=1)

    def forward(self, x):
        x = self.convs(x)
        x = x.view(-1, self.out_c)
        x = self.linear(x)
        return x


class GradualStyleEncoder(Module):
    def __init__(self, num_layers, mode='ir', opts=None):
        super(GradualStyleEncoder, self).__init__()
        assert num_layers in [50, 100, 152], 'num_layers should be 50,100, or 152'
        assert mode in ['ir', 'ir_se'], 'mode should be ir or ir_se'
        blocks = get_blocks(num_layers)
        if mode == 'ir':
            unit_module = bottleneck_IR
        elif mode == 'ir_se':
            unit_module = bottleneck_IR_SE
        self.condition_layer = TransposeConvNet()
        
        
        
        self.cat_size = 768
        self.out_size = 512
        
        self.conv = nn.Conv2d(self.cat_size, self.out_size, kernel_size=1)
        self.input_layer = Sequential(Conv2d(3, 64, (3, 3), 1, 1, bias=False),
                                      BatchNorm2d(64),
                                      PReLU(64))
        modules = []
        for block in blocks:
            for bottleneck in block:
                modules.append(unit_module(bottleneck.in_channel,
                                           bottleneck.depth,
                                           bottleneck.stride))
        self.body = Sequential(*modules)

        self.styles = nn.ModuleList()
        self.style_count = opts.n_styles
        self.coarse_ind = 3
        self.middle_ind = 7
        for i in range(self.style_count):
            if i < self.coarse_ind:
                style = GradualStyleBlock(512, 512, 16)
            elif i < self.middle_ind:
                style = GradualStyleBlock(512, 512, 32)
            else:
                style = GradualStyleBlock(512, 512, 64)
            self.styles.append(style)
        self.latlayer1 = nn.Conv2d(256, 512, kernel_size=1, stride=1, padding=0)
        self.latlayer2 = nn.Conv2d(128, 512, kernel_size=1, stride=1, padding=0)
        self.alpha = 0.2

    def _upsample_add(self, x, y):
        '''Upsample and add two feature maps.
        Args:
          x: (Variable) top feature map to be upsampled.
          y: (Variable) lateral feature map.
        Returns:
          (Variable) added feature map.
        Note in PyTorch, when input size is odd, the upsampled feature map
        with `F.upsample(..., scale_factor=2, mode='nearest')`
        maybe not equal to the lateral feature map size.
        e.g.
        original input size: [N,_,15,15] ->
        conv2d feature map size: [N,_,8,8] ->
        upsampled feature map size: [N,_,16,16]
        So we choose bilinear upsample which supports arbitrary output sizes.
        '''
        _, _, H, W = y.size()
        return F.interpolate(x, size=(H, W), mode='bilinear', align_corners=True) + y

    def forward(self, x, y_hist):
        #condition = self.condition_layer(y_hist)
        #print(condition.shape)
        #print(f"{x.shape}: xshape")
        #print(f"{y_hist.shape}: y_hist shape")
        #print(f"{condition.shape}: condition shape")
        #print(condition.shape)
        # batch size = 4
        x = self.input_layer(x)

        latents = []
        modulelist = list(self.body._modules.values())
        for i, l in enumerate(modulelist):
            x = l(x)
            if i == 6:
                c1 = x
            elif i == 20:
                c2 = x
            elif i == 23:
                c3 = x
        
        #c1_conditioned = torch.cat((c1, condition), dim=1)
        
        #c1_conditioned = self.conv(c1_conditioned)

        for j in range(self.coarse_ind):
            latents.append(self.styles[j](c3))
        
        out = torch.stack(latents, dim=1)

        p2 = self._upsample_add(c3, self.latlayer1(c2))
        for j in range(self.coarse_ind, self.middle_ind):
            latents.append(self.styles[j](p2))
        
        out = torch.stack(latents, dim=1)

        p1 = self._upsample_add(p2, self.latlayer2(c1))
        
        p1_conditioned = torch.cat((p1, y_hist), dim=1)
        
        p1_conditioned = self.conv(p1_conditioned)
        
        
        for j in range(self.middle_ind, self.style_count):
            latents.append(self.styles[j](p1_conditioned))
            

        out = torch.stack(latents, dim=1)
        return out

"""
After input layer:  torch.Size([4, 64, 256, 256])
c1 shape:  torch.Size([4, 128, 64, 64])
c2 shape:  torch.Size([4, 256, 32, 32])
c3 shape:  torch.Size([4, 512, 16, 16])
After coarse layers:  torch.Size([4, 3, 512])
p2 shape:  torch.Size([4, 512, 32, 32])
After mid layers:  torch.Size([4, 7, 512])
p1 shape:  torch.Size([4, 512, 64, 64])
Output shape:  torch.Size([4, 18, 512])
"""
class BackboneEncoderUsingLastLayerIntoW(Module):
    def __init__(self, num_layers, mode='ir', opts=None):
        super(BackboneEncoderUsingLastLayerIntoW, self).__init__()
        print('Using BackboneEncoderUsingLastLayerIntoW')
        assert num_layers in [50, 100, 152], 'num_layers should be 50,100, or 152'
        assert mode in ['ir', 'ir_se'], 'mode should be ir or ir_se'
        blocks = get_blocks(num_layers)
        if mode == 'ir':
            unit_module = bottleneck_IR
        elif mode == 'ir_se':
            unit_module = bottleneck_IR_SE
        self.input_layer = Sequential(Conv2d(opts.input_nc, 64, (3, 3), 1, 1, bias=False),
                                      BatchNorm2d(64),
                                      PReLU(64))
        self.output_pool = torch.nn.AdaptiveAvgPool2d((1, 1))
        self.linear = EqualLinear(512, 512, lr_mul=1)
        modules = []
        for block in blocks:
            for bottleneck in block:
                modules.append(unit_module(bottleneck.in_channel,
                                           bottleneck.depth,
                                           bottleneck.stride))
        self.body = Sequential(*modules)

    def forward(self, x):
        x = self.input_layer(x)
        x = self.body(x)
        x = self.output_pool(x)
        x = x.view(-1, 512)
        x = self.linear(x)
        return x


class BackboneEncoderUsingLastLayerIntoWPlus(Module):
    def __init__(self, num_layers, mode='ir', opts=None):
        super(BackboneEncoderUsingLastLayerIntoWPlus, self).__init__()
        print('Using BackboneEncoderUsingLastLayerIntoWPlus')
        assert num_layers in [50, 100, 152], 'num_layers should be 50,100, or 152'
        assert mode in ['ir', 'ir_se'], 'mode should be ir or ir_se'
        blocks = get_blocks(num_layers)
        if mode == 'ir':
            unit_module = bottleneck_IR
        elif mode == 'ir_se':
            unit_module = bottleneck_IR_SE
        self.n_styles = opts.n_styles
        self.input_layer = Sequential(Conv2d(opts.input_nc, 64, (3, 3), 1, 1, bias=False),
                                      BatchNorm2d(64),
                                      PReLU(64))
        self.output_layer_2 = Sequential(BatchNorm2d(512),
                                         torch.nn.AdaptiveAvgPool2d((7, 7)),
                                         Flatten(),
                                         Linear(512 * 7 * 7, 512))
        self.linear = EqualLinear(512, 512 * self.n_styles, lr_mul=1)
        modules = []
        for block in blocks:
            for bottleneck in block:
                modules.append(unit_module(bottleneck.in_channel,
                                           bottleneck.depth,
                                           bottleneck.stride))
        self.body = Sequential(*modules)

    def forward(self, x):
        x = self.input_layer(x)
        x = self.body(x)
        x = self.output_layer_2(x)
        x = self.linear(x)
        x = x.view(-1, self.n_styles, 512)
        return x


Overwriting /kaggle/working/psp/pixel2style2pixel/models/encoders/psp_encoders.py


In [36]:
%%writefile /kaggle/working/psp/pixel2style2pixel/models/psp.py
"""
This file defines the core research contribution
"""
import matplotlib
matplotlib.use('Agg')
import math

import torch
from torch import nn
from models.encoders import psp_encoders
from models.stylegan2.model import Generator
from configs.paths_config import model_paths


def get_keys(d, name):
	if 'state_dict' in d:
		d = d['state_dict']
	d_filt = {k[len(name) + 1:]: v for k, v in d.items() if k[:len(name)] == name}
	return d_filt


class pSp(nn.Module):

	def __init__(self, opts):
		super(pSp, self).__init__()
		self.set_opts(opts)
		# compute number of style inputs based on the output resolution
		self.opts.n_styles = int(math.log(self.opts.output_size, 2)) * 2 - 2
		# Define architecture
		self.encoder = self.set_encoder()
		self.decoder = Generator(self.opts.output_size, 512, 8)
		self.face_pool = torch.nn.AdaptiveAvgPool2d((256, 256))
		# Load weights if needed
		self.load_weights()

	def set_encoder(self):
		if self.opts.encoder_type == 'GradualStyleEncoder':
			encoder = psp_encoders.GradualStyleEncoder(50, 'ir_se', self.opts)
		elif self.opts.encoder_type == 'BackboneEncoderUsingLastLayerIntoW':
			encoder = psp_encoders.BackboneEncoderUsingLastLayerIntoW(50, 'ir_se', self.opts)
		elif self.opts.encoder_type == 'BackboneEncoderUsingLastLayerIntoWPlus':
			encoder = psp_encoders.BackboneEncoderUsingLastLayerIntoWPlus(50, 'ir_se', self.opts)
		else:
			raise Exception('{} is not a valid encoders'.format(self.opts.encoder_type))
		return encoder

	def load_weights(self):
		if self.opts.checkpoint_path is not None:
			print('Loading pSp from checkpoint: {}'.format(self.opts.checkpoint_path))
			ckpt = torch.load(self.opts.checkpoint_path, map_location='cpu')
			self.encoder.load_state_dict(get_keys(ckpt, 'encoder'), strict=True)
			self.decoder.load_state_dict(get_keys(ckpt, 'decoder'), strict=True)
			self.__load_latent_avg(ckpt)
		else:
			print('Loading encoders weights from irse50!')
			encoder_ckpt = torch.load(model_paths['ir_se50'])
			# if input to encoder is not an RGB image, do not load the input layer weights
			if self.opts.label_nc != 0:
				encoder_ckpt = {k: v for k, v in encoder_ckpt.items() if "input_layer" not in k}
			self.encoder.load_state_dict(encoder_ckpt, strict=False)
			print('Loading decoder weights from pretrained!')
			ckpt = torch.load(self.opts.stylegan_weights)
			self.decoder.load_state_dict(ckpt['g_ema'], strict=False)
			if self.opts.learn_in_w:
				self.__load_latent_avg(ckpt, repeat=1)
			else:
				self.__load_latent_avg(ckpt, repeat=self.opts.n_styles)

	def forward(self, x, y_hist, resize=True, latent_mask=None, input_code=False, randomize_noise=True,
	            inject_latent=None, return_latents=False, alpha=None):
		if input_code:
			codes = x
		else:
            
			codes = self.encoder(x, y_hist)
			# normalize with respect to the center of an average face
			if self.opts.start_from_latent_avg:
				if self.opts.learn_in_w:
					codes = codes + self.latent_avg.repeat(codes.shape[0], 1)
				else:
					codes = codes + self.latent_avg.repeat(codes.shape[0], 1, 1)


		if latent_mask is not None:
			for i in latent_mask:
				if inject_latent is not None:
					if alpha is not None:
						codes[:, i] = alpha * inject_latent[:, i] + (1 - alpha) * codes[:, i]
					else:
						codes[:, i] = inject_latent[:, i]
				else:
					codes[:, i] = 0

		input_is_latent = not input_code
		images, result_latent = self.decoder([codes],
		                                     input_is_latent=input_is_latent,
		                                     randomize_noise=randomize_noise,
		                                     return_latents=return_latents)

		if resize:
			images = self.face_pool(images)

		if return_latents:
			return images, result_latent
		else:
			return images

	def set_opts(self, opts):
		self.opts = opts

	def __load_latent_avg(self, ckpt, repeat=None):
		if 'latent_avg' in ckpt:
			self.latent_avg = ckpt['latent_avg'].to(self.opts.device)
			if repeat is not None:
				self.latent_avg = self.latent_avg.repeat(repeat, 1)
		else:
			self.latent_avg = None


Overwriting /kaggle/working/psp/pixel2style2pixel/models/psp.py


In [37]:
from argparse import Namespace
import time
import sys
import pprint
import numpy as np
from PIL import Image
import torch
import torchvision.transforms as transforms

sys.path.append(".")
sys.path.append("..")

from datasets import augmentations
from utils.common import tensor2im, log_input_image
from models.psp import pSp

%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [38]:
!pip install gdown
!gdown 1o9GsYgdMci5isBuCgcDx8xcSAzIPlrmn

Collecting gdown
  Downloading gdown-5.2.0-py3-none-any.whl.metadata (5.8 kB)
Downloading gdown-5.2.0-py3-none-any.whl (18 kB)
Installing collected packages: gdown
Successfully installed gdown-5.2.0
Downloading...
From (original): https://drive.google.com/uc?id=1o9GsYgdMci5isBuCgcDx8xcSAzIPlrmn
From (redirected): https://drive.google.com/uc?id=1o9GsYgdMci5isBuCgcDx8xcSAzIPlrmn&confirm=t&uuid=6026485b-942a-4b1f-bfd6-5b6284fb7ced
To: /kaggle/working/psp/pixel2style2pixel/1d_hist_loss.pt
100%|███████████████████████████████████████| 1.20G/1.20G [00:06<00:00, 200MB/s]


In [13]:
import shutil

# Define the path to the file in your Google Drive
drive_file_path = '/kaggle/input/model_30k/pytorch/rgb-30k/1/model_rgb_loss.pt'

os.mkdir("/kaggle/working/psp/pixel2style2pixel/pretrained_models")

# Define the destination path in your Colab environment
destination_path = '/kaggle/working/psp/pixel2style2pixel/pretrained_models/model_rgb_loss.pt'

# Create the destination folder if it doesn't exist
#!mkdir -p /content/my_folder

# Copy the file
shutil.copy(drive_file_path, destination_path)

'/kaggle/working/psp/pixel2style2pixel/pretrained_models/model_rgb_loss.pt'

In [14]:
#@title Select which experiment you wish to perform inference on: { run: "auto" }
experiment_type = 'ffhq_encode' #@param ['ffhq_encode', 'ffhq_frontalize', 'celebs_sketch_to_face', 'celebs_seg_to_face', 'celebs_super_resolution', 'toonify']

In [39]:
EXPERIMENT_DATA_ARGS = {
    "ffhq_encode": {
        "model_path": "/kaggle/working/psp/pixel2style2pixel/1d_hist_loss.pt",
        "image_path": "/home/melih.cosgun/GENERATIVE/fid_psp/images/test/00031.jpg",
        "transform": transforms.Compose([
            transforms.Resize((256, 256)),
            transforms.Grayscale(num_output_channels=3),
            transforms.ToTensor(),
            transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])])
    },
    "ffhq_frontalize": {
        "model_path": "pretrained_models/psp_ffhq_frontalization.pt",
        "image_path": "notebooks/images/input_img.jpg",
        "transform": transforms.Compose([
            transforms.Resize((256, 256)),
            transforms.ToTensor(),
            transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])])
    },
    "celebs_sketch_to_face": {
        "model_path": "pretrained_models/psp_celebs_sketch_to_face.pt",
        "image_path": "notebooks/images/input_sketch.jpg",
        "transform": transforms.Compose([
            transforms.Resize((256, 256)),
            transforms.ToTensor()])
    },
    "celebs_seg_to_face": {
        "model_path": "pretrained_models/psp_celebs_seg_to_face.pt",
        "image_path": "notebooks/images/input_mask.png",
        "transform": transforms.Compose([
            transforms.Resize((256, 256)),
            augmentations.ToOneHot(n_classes=19),
            transforms.ToTensor()])
    },
    "celebs_super_resolution": {
        "model_path": "pretrained_models/psp_celebs_super_resolution.pt",
        "image_path": "notebooks/images/input_img.jpg",
        "transform": transforms.Compose([
            transforms.Resize((256, 256)),
            augmentations.BilinearResize(factors=[16]),
            transforms.Resize((256, 256)),
            transforms.ToTensor(),
            transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])])
    },
    "toonify": {
        "model_path": "pretrained_models/psp_ffhq_toonify.pt",
        "image_path": "notebooks/images/input_img.jpg",
        "transform": transforms.Compose([
            transforms.Resize((256, 256)),
            transforms.ToTensor(),
            transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])])
    },
}

In [40]:
EXPERIMENT_ARGS = EXPERIMENT_DATA_ARGS[experiment_type]

In [41]:
if os.path.getsize(EXPERIMENT_ARGS['model_path']) < 1000000:
  raise ValueError("Pretrained model was unable to be downlaoded correctly!")

In [42]:
model_path = EXPERIMENT_ARGS['model_path']
ckpt = torch.load(model_path, map_location='cpu')

In [43]:
opts = ckpt['opts']
pprint.pprint(opts)

{'batch_size': 4,
 'board_interval': 50,
 'checkpoint_path': None,
 'dataset_type': 'ffhq_encode',
 'device': 'cuda:0',
 'encoder_type': 'GradualStyleEncoder',
 'exp_dir': 'exp/run2',
 'id_lambda': 0.1,
 'image_interval': 100,
 'input_nc': 3,
 'l2_lambda': 1.0,
 'l2_lambda_crop': 0,
 'label_nc': 0,
 'learn_in_w': False,
 'learning_rate': 0.0001,
 'lpips_lambda': 0.8,
 'lpips_lambda_crop': 0,
 'max_steps': 500000,
 'moco_lambda': 0,
 'n_styles': 18,
 'optim_name': 'ranger',
 'output_size': 1024,
 'resize_factors': None,
 'save_interval': 10000,
 'start_from_latent_avg': True,
 'stylegan_weights': 'pretrained_models/stylegan2-ffhq-config-f.pt',
 'test_batch_size': 4,
 'test_workers': 4,
 'train_decoder': False,
 'use_wandb': False,
 'val_interval': 10000,
 'w_norm_lambda': 0,
 'workers': 4}


In [44]:
# update the training options
opts['checkpoint_path'] = model_path
if 'learn_in_w' not in opts:
    opts['learn_in_w'] = False
if 'output_size' not in opts:
    opts['output_size'] = 1024

In [45]:
opts = Namespace(**opts)
net = pSp(opts)
net.eval()
net.cuda()
print('Model successfully loaded!')

Loading pSp from checkpoint: /kaggle/working/psp/pixel2style2pixel/1d_hist_loss.pt
Model successfully loaded!


In [23]:
!wget http://dlib.net/files/shape_predictor_68_face_landmarks.dat.bz2
!bzip2 -dk shape_predictor_68_face_landmarks.dat.bz2

--2024-05-19 19:25:54--  http://dlib.net/files/shape_predictor_68_face_landmarks.dat.bz2
Resolving dlib.net (dlib.net)... 107.180.26.78
Connecting to dlib.net (dlib.net)|107.180.26.78|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 64040097 (61M)
Saving to: 'shape_predictor_68_face_landmarks.dat.bz2'


2024-05-19 19:25:54 (78.5 MB/s) - 'shape_predictor_68_face_landmarks.dat.bz2' saved [64040097/64040097]



In [89]:
def run_alignment(image_path):
  import dlib
  from scripts.align_all_parallel import align_face
  predictor = dlib.shape_predictor("shape_predictor_68_face_landmarks.dat")
  aligned_image = align_face(filepath=image_path, predictor=predictor)
  print("Aligned image has shape: {}".format(aligned_image.size))
  return aligned_image

In [24]:
img_transforms = EXPERIMENT_ARGS['transform']

In [46]:
import cv2
from PIL import Image
import torch
import torch.nn.functional as F

def compute_histogram(image, N=64):
    # Split the image into its RGB components
    b = image[0, :, :]  # Blue channel
    g = image[1, :, :]  # Green channel
    r = image[2, :, :]  # Red channel
    
    # Calculate histograms
    hist_r = cv2.calcHist([r], [0], None, [256], [0, 256])
    hist_g = cv2.calcHist([g], [0], None, [256], [0, 256])
    hist_b = cv2.calcHist([b], [0], None, [256], [0, 256])
    
    # Flatten histograms and convert to a torch tensor
    histogram = np.array([hist_r, hist_g, hist_b])
    histogram = torch.from_numpy(histogram)
    
    hist = histogram.unsqueeze(0).unsqueeze(0)
    
    # Resize histogram to (256, 64, 64)
    histogram_resized = F.interpolate(hist, size=(256, 64, 64), mode='nearest')  # Shape: (1, 256, 64, 64)
    
    # Squeeze and return
    return histogram_resized.squeeze().squeeze()

In [None]:

cond_path = "/kaggle/input/celebhq8020/celebahq256_all/test/00042.jpg"
condition_img = Image.open(cond_path)
condition_img = condition_img.convert('RGB')

transform = transforms.Compose([
				transforms.Resize((256, 256)),
				transforms.ToTensor(),
				transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])])

condition_img = transform(condition_img)
condition_img = np.array(condition_img)
def compute_histogram(image, N=64):
    # Split the image into its RGB components
    b = image[0, :, :]  # Blue channel
    g = image[1, :, :]  # Green channel
    r = image[2, :, :]  # Red channel
    
    # Calculate histograms
    hist_r = cv2.calcHist([r], [0], None, [256], [0, 256])
    hist_g = cv2.calcHist([g], [0], None, [256], [0, 256])
    hist_b = cv2.calcHist([b], [0], None, [256], [0, 256])
    
    # Flatten histograms and convert to a torch tensor
    histogram = np.array([hist_r, hist_g, hist_b])
    histogram = torch.from_numpy(histogram)
    
    hist = histogram.unsqueeze(0).unsqueeze(0)
    
    # Resize histogram to (256, 64, 64)
    histogram_resized = F.interpolate(hist, size=(256, 64, 64), mode='nearest')  # Shape: (1, 256, 64, 64)
    
    # Squeeze and return
    return histogram_resized.squeeze().squeeze()


In [48]:
def run_on_batch(inputs, cond, net, latent_mask=None):
    if latent_mask is None:
        result_batch = net(inputs.to("cuda").float(),cond.to("cuda").float(),randomize_noise=False)
    else:
        result_batch = []
        for image_idx, input_image in enumerate(inputs):
            # get latent vector to inject into our input image
            vec_to_inject = np.random.randn(1, 512).astype('float32')
            _, latent_to_inject = net(torch.from_numpy(vec_to_inject).to("cuda"),
                                      input_code=True,
                                      return_latents=True)
            # get output image with injected style vector
            res = net(input_image.unsqueeze(0).to("cuda").float(),
                      latent_mask=latent_mask,
                      inject_latent=latent_to_inject)
            result_batch.append(res)
        result_batch = torch.cat(result_batch, dim=0)
    return result_batch

In [26]:
#if experiment_type in ["celebs_sketch_to_face", "celebs_seg_to_face"]:
#    latent_mask = [8, 9, 10, 11, 12, 13, 14, 15, 16, 17]
#else:
latent_mask = None

In [56]:
input_folder = '/kaggle/working/outputs/test3'
output_folder = '/kaggle/working/outputs/cond_output3'
os.makedirs(output_folder, exist_ok=True)

# List all files in the input folder
image_names = os.listdir(input_folder)

counter = 0
fault_counter = 0


#images = [Image.open(os.path.join(input_folder, image_name)) for image_name in image_names]

#transformed = img_transforms(image_names)


# Iterate through each image
for image_name in image_names:
    try:
        print(f"Count: {counter}")
        input_image = Image.open(os.path.join(input_folder, image_name))
        #image_path = os.path.join(input_folder, image_name)
        #input_image = run_alignment(image_path)

        img_transforms = EXPERIMENT_ARGS['transform']
        transformed_image = img_transforms(input_image)
        
        transform = transforms.Compose([
                transforms.Resize((256, 256)),
                transforms.ToTensor(),
                transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])])

        condition_img = transform(input_image)
        condition_img = np.array(condition_img)
        condition = compute_histogram(condition_img)

        with torch.no_grad():
            tic = time.time()
            print(latent_mask)
            result_image = run_on_batch(transformed_image.unsqueeze(0), condition.unsqueeze(0), net, latent_mask)[0]
            toc = time.time()
            print('Inference took {:.4f} seconds.'.format(toc - tic))

        input_vis_image = log_input_image(transformed_image, opts)
        output_image = tensor2im(result_image)

        # Save the image
        output_image_path = os.path.join(output_folder, image_name)

        # Assuming `result_image` is a PIL image object
        output_image.save(output_image_path)

        # Assuming `result_image` is a NumPy array
        # Convert it back to a PIL image before saving if necessary
        # Image.fromarray(result_image).save(output_image_path)

        print(f"Image saved to: {output_image_path}")

        counter += 1
    except UnboundLocalError as e:
        print(f"An error occurred: {e}")
        fault_counter += 1
        continue

Count: 0
None
Inference took 0.0435 seconds.
Image saved to: /kaggle/working/outputs/cond_output3/27328.jpg
Count: 1
None
Inference took 0.0347 seconds.
Image saved to: /kaggle/working/outputs/cond_output3/36990.jpg
Count: 2
None
Inference took 0.0356 seconds.
Image saved to: /kaggle/working/outputs/cond_output3/06417.jpg
Count: 3
None
Inference took 0.0348 seconds.
Image saved to: /kaggle/working/outputs/cond_output3/52351.jpg
Count: 4
None
Inference took 0.0357 seconds.
Image saved to: /kaggle/working/outputs/cond_output3/15060.jpg
Count: 5
None
Inference took 0.0340 seconds.
Image saved to: /kaggle/working/outputs/cond_output3/25460.jpg
Count: 6
None
Inference took 0.0351 seconds.
Image saved to: /kaggle/working/outputs/cond_output3/58559.jpg
Count: 7
None
Inference took 0.0362 seconds.
Image saved to: /kaggle/working/outputs/cond_output3/37613.jpg
Count: 8
None
Inference took 0.0352 seconds.
Image saved to: /kaggle/working/outputs/cond_output3/50858.jpg
Count: 9
None
Inference took

In [57]:
from cleanfid import fid
psp_score = fid.compute_fid('/kaggle/working/outputs/ground_truth3', '/kaggle/working/outputs/cond_output3')
print(f"psp Score: {psp_score}")

compute FID between two folders
Found 2500 images in the folder /kaggle/working/outputs/ground_truth3


FID ground_truth3 : 100%|██████████| 79/79 [00:19<00:00,  3.99it/s]


Found 2500 images in the folder /kaggle/working/outputs/cond_output3


FID cond_output3 : 100%|██████████| 79/79 [00:19<00:00,  4.11it/s]


psp Score: 131.87282379156045


In [28]:
import numpy as np
import torch
import torchvision.transforms as transforms
from torchvision.models import inception_v3
from scipy.linalg import sqrtm
from PIL import Image
import os

def load_images(image_paths, transform, device):
    images = []
    for path in image_paths:
        img = Image.open(path).convert('RGB')
        img = transform(img)
        images.append(img)
    images = torch.stack(images).to(device)
    return images

def get_activations(images, model):
    with torch.no_grad():
        pred = model(images)
    return pred.cpu().numpy()

def calculate_fid(mu1, sigma1, mu2, sigma2):
    ssdiff = np.sum((mu1 - mu2) ** 2.0)
    covmean = sqrtm(sigma1.dot(sigma2))
    if np.iscomplexobj(covmean):
        covmean = covmean.real
    fid = ssdiff + np.trace(sigma1 + sigma2 - 2.0 * covmean)
    return fid

def compute_statistics(images, model):
    act = get_activations(images, model)
    mu = np.mean(act, axis=0)
    sigma = np.cov(act, rowvar=False)
    return mu, sigma

def main(real_dir, gen_dir):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    transform = transforms.Compose([
        transforms.Resize((299, 299)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

    inception_model = inception_v3(pretrained=True, transform_input=False).to(device)
    inception_model.eval()

    real_image_paths = [os.path.join(real_dir, img) for img in os.listdir(real_dir)]
    gen_image_paths = [os.path.join(gen_dir, img) for img in os.listdir(gen_dir)]

    real_images = load_images(real_image_paths, transform, device)
    gen_images = load_images(gen_image_paths, transform, device)

    mu_real, sigma_real = compute_statistics(real_images, inception_model)
    mu_gen, sigma_gen = compute_statistics(gen_images, inception_model)

    fid_value = calculate_fid(mu_real, sigma_real, mu_gen, sigma_gen)
    print(f'FID: {fid_value}')


In [29]:
a = main('/kaggle/working/outputs/ground_truth', "/kaggle/working/outputs/output")


Downloading: "https://download.pytorch.org/models/inception_v3_google-0cc3c7bd.pth" to /root/.cache/torch/hub/checkpoints/inception_v3_google-0cc3c7bd.pth
100%|██████████| 104M/104M [00:01<00:00, 83.8MB/s] 


OutOfMemoryError: CUDA out of memory. Tried to allocate 5.29 GiB. GPU 0 has a total capacty of 15.89 GiB of which 4.95 GiB is free. Process 2737 has 10.95 GiB memory in use. Of the allocated memory 10.52 GiB is allocated by PyTorch, and 129.17 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting max_split_size_mb to avoid fragmentation.  See documentation for Memory Management and PYTORCH_CUDA_ALLOC_CONF

In [99]:
torch.cuda.empty_cache()