In [None]:
!pip3 install pytorch_lightning

# Importing libraries

In [3]:
# General libraries
import pandas as pd  
import numpy as np  
import cv2        
import os
import matplotlib.pyplot as plt

import torch
from torch import nn, optim
import torchvision
from torchvision import transforms, datasets, models, utils
from torch.utils.data import Dataset, DataLoader 
from PIL import Image
from sklearn.model_selection import train_test_split
from torch.nn import functional as F
from skimage import io, transform
from torch.optim import lr_scheduler
from skimage.transform import AffineTransform, warp

import pytorch_lightning as pl
from pytorch_lightning import Trainer

# Loading data using Dataset and DataLoader

In [4]:
class MyData(Dataset):
    def __init__(self, path_dir, test_ratio=0.2, train=True, transform=None):
        """
            Input:
                path_dir: train folder
                test_ratio: split size
            formart image_name: <number_id>_A<age>_G<0,1>.png
        """
        list_age = []
        list_gender = []
        list_path = []
        for image_name in os.listdir(path_dir):
            age = ((image_name.split(".")[0]).split("_")[1]).split("A")[-1]
            gender = ((image_name.split(".")[0]).split("_")[2]).split("G")[-1]
            image_path = os.path.join(path_dir, image_name)
            
            list_age.append(float(age))
            list_gender.append(int(gender))
            list_path.append(image_path)
        
        # max age
        self.max_age = max(list_age)

        # normalize age
        list_age = [age / self.max_age for age in list_age]

        # #Splitting the data into train and validation set
        X_train, X_test, y_age_train, y_age_test, y_gender_train, y_gender_test = \
        train_test_split(list_path, list_age, list_gender, test_size=test_ratio)
        
        if train:
            self.X = X_train
            self.age_y = y_age_train
            self.gender_y = y_gender_train
        else:
            self.X = X_test
            self.age_y = y_age_test
            self.gender_y = y_gender_test
        
        # apply transformation
        self.transform=transform
    
    def __len__(self):
        return len(self.X)
    
    def __getitem__(self, idx):
        image = cv2.imread(self.X[idx])
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = image.astype('float')
        age = np.array(self.age_y[idx]).astype('float')
        gender = np.array(self.gender_y[idx]).astype('float')

        sample={'image': image, 'label_age': age, 'label_gender': gender}

        if self.transform:
            sample = self.transform(sample)
        
        return sample

In [5]:
class RGBToTensor(object):
    """Convert ndarrays in sample to Tensors."""
    def __call__(self, sample):
        image, age, gender = sample['image'], sample['label_age'], sample['label_gender']

        # swap color axis because
        # numpy image: H x W x C
        # torch image: C x H x W
        image = torch.from_numpy(image).permute(2, 0, 1).float()
        age = torch.from_numpy(age).float()
        gender = torch.from_numpy(gender).float()

        return {'image': image,
                'label_age': age,
                'label_gender': gender}

In [6]:
transformed_train_data = MyData(path_dir="mega_age_gender", test_ratio=0.2, train=True, transform=transforms.Compose([RGBToTensor()]))
transformed_test_data = MyData(path_dir="mega_age_gender", test_ratio=0.2, train=False, transform=transforms.Compose([RGBToTensor()]))

train_dataloader = DataLoader(transformed_train_data, batch_size=32, shuffle=True)
test_dataloader = DataLoader(transformed_test_data, batch_size=32, shuffle=True)

# device

In [7]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Model

In [8]:
class AgeGenderModel(pl.LightningModule):
    def __init__(self):
        super().__init__()
        self.feature_extractor = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=6, kernel_size=5, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
            nn.Dropout(p=0.5)
        )

        self.fc_age = nn.Linear(73926, 1)  #For age class
        self.fc_gender = nn.Linear(73926, 1)    #For gender class

        self.criterion_binary= nn.BCELoss()
        self.criterion_regression = nn.MSELoss()
        
    def forward(self, x):
        x = self.feature_extractor(x)
        x = x.view(x.size(0), -1)

        age = self.fc_age(x)
        gender= torch.sigmoid(self.fc_gender(x))  

        return {'age': age, 'gender': gender}

    def training_step(self, batch, batch_idx):
      image, label_age, label_gender = batch['image'], batch['label_age'], batch['label_gender']
      
      label_hat = self(image)
      label_age_hat = label_hat['age']
      label_gender_hat = label_hat['gender']

      loss_age = self.criterion_regression(label_age_hat, label_age)
      loss_gender = self.criterion_binary(label_gender_hat, label_gender.unsqueeze(-1))
      loss = loss_age + loss_gender

      self.log('train_loss', loss, on_step=False, on_epoch=True)
      self.log('train_loss_age', loss_age, on_step=False, on_epoch=True)
      self.log('train_loss_gender', loss_gender, on_step=False, on_epoch=True)

      return loss

    def validation_step(self, batch, batch_idx):
      image, label_age, label_gender = batch['image'], batch['label_age'], batch['label_gender']
      
      label_hat = self(image)
      label_age_hat = label_hat['age']
      label_gender_hat = label_hat['gender']

      loss_age = self.criterion_regression(label_age_hat, label_age)
      loss_gender = self.criterion_binary(label_gender_hat, label_gender.unsqueeze(-1))
      loss = loss_age + loss_gender

      self.log('val_loss', loss, on_step=False, on_epoch=True)
      self.log('val_loss_age', loss_age, on_step=False, on_epoch=True)
      self.log('val_loss_gender', loss_gender, on_step=False, on_epoch=True)

      return loss
    
    def test_step(self, batch, batch_idx):
      image, label_age, label_gender = batch['image'], batch['label_age'], batch['label_gender']
      
      label_hat = self(image)
      label_age_hat = label_hat['age']
      label_gender_hat = label_hat['gender']

      loss_age = self.criterion_regression(label_age_hat, label_age)
      loss_gender = self.criterion_binary(label_gender_hat, label_gender.unsqueeze(-1))
      loss = loss_age + loss_gender

      self.log('test_loss', loss, on_step=False, on_epoch=True)
      self.log('test_loss_age', loss_age, on_step=False, on_epoch=True)
      self.log('test_loss_gender', loss_gender, on_step=False, on_epoch=True)

      return loss
    
    def configure_optimizers(self):
      optimizer = torch.optim.SGD(self.parameters(), lr=0.001, momentum=0.9)

      return optimizer

# Train

In [9]:
from pytorch_lightning.callbacks import ModelCheckpoint

checkpoint_callback = ModelCheckpoint(
    monitor="val_loss",
    dirpath="model_checkpoint",
    filename="AgeGender-{epoch:02d}-{val_loss:.2f}",
    save_top_k=3,
    mode="min",
)

In [10]:
pl.seed_everything(0)
model = AgeGenderModel()

trainer = pl.Trainer(
    callbacks=[checkpoint_callback],
    max_epochs=5
)
trainer.fit(model, train_dataloader, test_dataloader)

Global seed set to 0
GPU available: True, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
  "GPU available but not used. Set the gpus flag in your trainer"

  | Name                 | Type       | Params
----------------------------------------------------
0 | feature_extractor    | Sequential | 456   
1 | fc_age               | Linear     | 73.9 K
2 | fc_gender            | Linear     | 73.9 K
3 | criterion_binary     | BCELoss    | 0     
4 | criterion_regression | MSELoss    | 0     
----------------------------------------------------
148 K     Trainable params
0         Non-trainable params
148 K     Total params
0.593     Total estimated model params size (MB)


Validation sanity check: 0it [00:00, ?it/s]

  f"Your {mode}_dataloader has `shuffle=True`, it is best practice to turn"
  return torch.max_pool2d(input, kernel_size, stride, padding, dilation, ceil_mode)
  return F.mse_loss(input, target, reduction=self.reduction)
Global seed set to 0


Training: -1it [00:00, ?it/s]

  return F.mse_loss(input, target, reduction=self.reduction)


Validating: 0it [00:00, ?it/s]

  return F.mse_loss(input, target, reduction=self.reduction)


Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

# Load model

In [11]:
checkpoint_callback.best_model_path

'/content/model_checkpoint/AgeGender-epoch=04-val_loss=0.75.ckpt'

In [13]:
loaded_model = AgeGenderModel()
loaded_model = loaded_model.load_from_checkpoint("model_checkpoint/AgeGender-epoch=04-val_loss=0.75.ckpt")

In [20]:
def predict(image, loaded_model, max_age, gender_threshold=0.5):
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = torch.from_numpy(image).permute(2, 0, 1).float()
    image = image.unsqueeze(0) 

    predicted = loaded_model(image)
    age = predicted["age"].item() * max_age
    prob_gender = predicted["gender"].item()
    gender = 1 if prob_gender > gender_threshold else 0
    
    return age, gender

In [21]:
image_path = "mega_age_gender/0_A14_G0.png"
image = cv2.imread(image_path)
pred_age, pred_gender = predict(image, loaded_model, 69)
print("pred_age: {}, pred_gender: {}".format(pred_age, pred_gender))

pred_age: 30.563735783100128, pred_gender: 1
