In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# pip install pytorch-lightning

Collecting pytorch-lightning
  Downloading pytorch_lightning-2.2.0.post0-py3-none-any.whl (800 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m800.9/800.9 kB[0m [31m11.0 MB/s[0m eta [36m0:00:00[0m
Collecting torchmetrics>=0.7.0 (from pytorch-lightning)
  Downloading torchmetrics-1.3.1-py3-none-any.whl (840 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m840.4/840.4 kB[0m [31m56.9 MB/s[0m eta [36m0:00:00[0m
Collecting lightning-utilities>=0.8.0 (from pytorch-lightning)
  Downloading lightning_utilities-0.10.1-py3-none-any.whl (24 kB)
Installing collected packages: lightning-utilities, torchmetrics, pytorch-lightning
Successfully installed lightning-utilities-0.10.1 pytorch-lightning-2.2.0.post0 torchmetrics-1.3.1


In [None]:
from pytorch_lightning import Trainer
import itertools
import pytorch_lightning as pl
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import transforms
from torchvision.utils import make_grid
import os
from PIL import Image
from torch.utils.data import Dataset
import numpy as np
import torch.nn as nn
import torch.nn.functional as F

IMG_EXTENSIONS = ["png", "jpg"]

In [None]:
import zipfile
zip_ref = zipfile.ZipFile("/content/drive/MyDrive/FaceAging/archive.zip", 'r')
zip_ref.extractall("/content/drive/MyDrive/FaceAging/archive")
zip_ref.close()

In [None]:
# pip install pytorch-lightning==1.5.10

In [None]:
import shutil
import os

inp_dir="/content/drive/MyDrive/FaceAging/archive/UTKFace"
out_dir="/content/drive/MyDrive/FaceAging/archive/processed"


image_names = [x for x in os.listdir(inp_dir) if x.endswith('.jpg')]
print(f"Total images found: {len(image_names)}")

ages = [int(x.split('_')[0]) for x in image_names]

ages_to_keep_a = [x for x in range(18, 29)]
ages_to_keep_b = [x for x in range(40, 120)]

domainA, domainB = [], []
for image_name, age in zip(image_names, ages):
    if age in ages_to_keep_a:
        domainA.append(image_name)
    elif age in ages_to_keep_b:
        domainB.append(image_name)

N = min(len(domainA), len(domainB))
domainA = domainA[:N]
domainB = domainB[:N]

print(f"Image in A: {len(domainA)} and B: {len(domainB)}")

domainA_dir = os.path.join(out_dir, 'trainA')
domainB_dir = os.path.join(out_dir, 'trainB')

os.makedirs(domainA_dir, exist_ok=True)
os.makedirs(domainB_dir, exist_ok=True)

for imageA, imageB in zip(domainA, domainB):
    shutil.copy(os.path.join(inp_dir, imageA), os.path.join(domainA_dir, imageA))
    shutil.copy(os.path.join(inp_dir, imageB), os.path.join(domainB_dir, imageB))


Total images found: 23708
Image in A: 7134 and B: 7134


In [None]:

class ResidualBlock(nn.Module):
    def __init__(self, in_features):
        super(ResidualBlock, self).__init__()

        conv_block = [nn.ReflectionPad2d(1),
                      nn.Conv2d(in_features, in_features, 3),
                      nn.BatchNorm2d(in_features),
                      nn.ReLU(),
                      nn.ReflectionPad2d(1),
                      nn.Conv2d(in_features, in_features, 3),
                      nn.BatchNorm2d(in_features)]

        self.conv_block = nn.Sequential(*conv_block)

    def forward(self, x):
        return x + self.conv_block(x)


class Generator(nn.Module):
    def __init__(self, ngf, n_residual_blocks=9):
        super(Generator, self).__init__()

        # Initial convolution block
        model = [nn.ReflectionPad2d(3),
                 nn.Conv2d(3, ngf, 7),
                 nn.BatchNorm2d(ngf),
                 nn.ReLU()]

        # Downsampling
        in_features = ngf
        out_features = in_features * 2
        for _ in range(2):
            model += [nn.Conv2d(in_features, out_features, 3, stride=2, padding=1),
                      nn.BatchNorm2d(out_features),
                      nn.ReLU()]
            in_features = out_features
            out_features = in_features * 2

        # Residual blocks
        for _ in range(n_residual_blocks):
            model += [ResidualBlock(in_features)]

        # Upsampling
        out_features = in_features // 2
        for _ in range(2):
            model += [nn.ConvTranspose2d(in_features, out_features, 3, stride=2, padding=1, output_padding=1),
                      nn.BatchNorm2d(out_features),
                      nn.ReLU()]
            in_features = out_features
            out_features = in_features // 2

        # Output layer
        model += [nn.ReflectionPad2d(3),
                  nn.Conv2d(ngf, 3, 7),
                  nn.Tanh()]

        self.model = nn.Sequential(*model)

    def forward(self, x):
        return self.model(x)


class Discriminator(nn.Module):
    def __init__(self, ndf):
        super(Discriminator, self).__init__()

        # A bunch of convolutions one after another
        model = [nn.Conv2d(3, ndf, 4, stride=2, padding=1),
                 nn.LeakyReLU(0.2, inplace=True)]

        model += [nn.Conv2d(ndf, ndf * 2, 4, stride=2, padding=1),
                  nn.BatchNorm2d(ndf * 2),
                  nn.LeakyReLU(0.2, inplace=True)]

        model += [nn.Conv2d(ndf * 2, ndf * 4, 4, stride=2, padding=1),
                  nn.InstanceNorm2d(ndf * 4),
                  nn.LeakyReLU(0.2, inplace=True)]

        model += [nn.Conv2d(ndf * 4, ndf * 8, 4, padding=1),
                  nn.InstanceNorm2d(ndf * 8),
                  nn.LeakyReLU(0.2, inplace=True)]

        # FCN classification layer
        model += [nn.Conv2d(ndf * 8, 1, 4, padding=1)]

        self.model = nn.Sequential(*model)

    def forward(self, x):
        x = self.model(x)
        # Average pooling and flatten
        return F.avg_pool2d(x, x.size()[2:]).view(x.size()[0], -1)