# Mini-Project on Age Prediction Using Faces: Code Explanation #

- Transfer Learning: Used InceptionResNetV2 Pretrained Model, replaced last layer with single output layer to make age prediction, replaced first layer to accept single-channel inputs. 
- Data Augmentation: Changed images to grayscale
- Cross-Validation: Chose best model from each fold's epochs based on validation score, took average score of each fold's model and returned as prediction

In [None]:
import numpy as np
import pandas as pd
from glob import glob
from os.path import join
from pathlib import Path
from PIL import Image
import matplotlib.pyplot as plt
from tqdm import tqdm

import torch
import torch.nn as nn
import torchvision
from torchvision import transforms
from torchvision.transforms import Compose, Resize, CenterCrop, ToTensor, Normalize
import torch.optim as optim
from torchvision import models
from torch.utils.data import random_split
import torch.nn.functional as F
import timm

from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import KFold

class AgeDataset(torch.utils.data.Dataset):

    def __init__(self, data_path, annot_path, train=True):
        super(AgeDataset, self).__init__()

        self.annot_path = annot_path
        self.data_path = data_path
        self.train = train

        self.ann = pd.read_csv(annot_path)
        self.files = self.ann['file_id']
        if train:
            self.ages = self.ann['age']
        self.transform = self._transform(224)

    @staticmethod    
    def _convert_image_to_rgb(image):
        return image.convert("RGB")

    # def _transform(self, n_px):
    #     mean = [0.485, 0.456, 0.406]
    #     std = [0.229, 0.224, 0.225]
    #     return Compose([
    #         Resize(n_px),
    #         # Resize((299, 299)),
    #         self._convert_image_to_rgb,
    #         ToTensor(),
    #         Normalize(mean, std),
    #     ])
    def _transform(self, size):
        transform = [
            transforms.Grayscale(num_output_channels=1),
            transforms.Resize((size, size)),
            transforms.ToTensor(),
            # transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
            transforms.Normalize(mean=[0.5], std=[0.5]),
        ]
        
        return transforms.Compose(transform)

    def read_img(self, file_name):
        im_path = join(self.data_path,file_name)   
        img = Image.open(im_path)
        img = self.transform(img)
        return img

    def __getitem__(self, index):
        file_name = self.files[index]
        img = self.read_img(file_name)
        if self.train:
            age = self.ages[index]
            return img, age
        else:
            return img

    def __len__(self):
        return len(self.files)





In [None]:
# # Define the ratio of train and validation data
# train_ratio = 0.9
# val_ratio = 1 - train_ratio

# train_path = './smai-24-age-prediction/content/faces_dataset/train'
# train_ann = './smai-24-age-prediction/content/faces_dataset/train.csv'

train_path = '/kaggle/input/smai-24-age-prediction/content/faces_dataset/train'
train_ann = '/kaggle/input/smai-24-age-prediction/content/faces_dataset/train.csv'
# train_dataset = AgeDataset(train_path, train_ann, train=True)

# # Calculate the number of samples to include in each set
# train_size = int(train_ratio * len(train_dataset))
# val_size = len(train_dataset) - train_size

# # Divide the dataset by randomly selecting samples
# train_dataset, val_dataset = random_split(train_dataset, [train_size, val_size])

# # Create data loaders for the training and validation sets
# train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=64, shuffle=True)

# val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=64, shuffle=False)

# Define the number of folds
n_folds = 5

# Create a KFold object
kfold = KFold(n_splits=n_folds, shuffle=True)

# Create the full dataset
full_dataset = AgeDataset(train_path, train_ann, train=True)

test_path = '/kaggle/input/smai-24-age-prediction/content/faces_dataset/test'
test_ann = '/kaggle/input/smai-24-age-prediction/content/faces_dataset/submission.csv'

# test_path = './smai-24-age-prediction/content/faces_dataset/test'
# test_ann = './smai-24-age-prediction/content/faces_dataset/submission.csv'
test_dataset = AgeDataset(test_path, test_ann, train=False)

test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=64, shuffle=False)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')



In [None]:
# MLP model

# class MLP(nn.Module):
#     def __init__(self, input_size, hidden_size, output_size):
#         super(MLP, self).__init__()
#         self.fc1 = nn.Linear(input_size, hidden_size)
#         self.fc2 = nn.Linear(hidden_size, output_size)
#         self.relu = nn.ReLU()

#     def forward(self, x):
#         x = x.view(x.size(0), -1)
#         x = self.fc1(x)
#         x = self.relu(x)
#         x = self.fc2(x)
#         return x
    

    
# class CNN(nn.Module):
#     def __init__(self):
#         super(CNN, self).__init__()
#         self.conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=2)
#         self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=2)
#         self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=2)
#         self.conv4 = nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=2)
#         self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
#         self.dropout = nn.Dropout(0.5) 
#         self.fc1 = nn.Linear(256 * 15 * 15, 1024)
#         self.fc2 = nn.Linear(1024, 512)
#         self.fc3 = nn.Linear(512, 1)

#     def forward(self, x):
#         x = self.pool(F.relu(self.conv1(x)))
#         x = self.pool(F.relu(self.conv2(x)))
#         x = self.pool(F.relu(self.conv3(x)))
#         x = self.pool(F.relu(self.conv4(x)))
#         x = x.view(-1, 256 * 15 * 15)
#         x = F.relu(self.fc1(x))
#         x = self.dropout(x) 
#         x = F.relu(self.fc2(x))
#         x = self.dropout(x)  
#         x = self.fc3(x)
#         return x



# class AgeModel(nn.Module):
#     def __init__(self):
#         super(AgeModel, self).__init__()
#         self.base_model = models.resnet50(pretrained=True)
#         num_ftrs = self.base_model.fc.in_features
#         self.base_model.fc = nn.Linear(num_ftrs, 1)

#     def forward(self, x):
#         return self.base_model(x)
    
# class InceptionModel(nn.Module):
#     def __init__(self):
#         super(InceptionModel, self).__init__()
#         self.base_model = models.inception_v3(pretrained=True)
#         num_ftrs = self.base_model.fc.in_features
#         self.base_model.fc = nn.Linear(num_ftrs, 1)

#     def forward(self, x):
#         return self.base_model(x)

# class ResNeXtModel(nn.Module):
#     def __init__(self):
#         super(ResNeXtModel, self).__init__()
#         self.base_model = models.resnext50_32x4d(pretrained=True)
#         num_ftrs = self.base_model.fc.in_features
#         self.base_model.fc = nn.Linear(num_ftrs, 1)

#     def forward(self, x):
#         return self.base_model(x)
    
# class EfficientNetV2Model(nn.Module):
#     def __init__(self):
#         super(EfficientNetV2Model, self).__init__()
#         self.base_model = timm.create_model('tf_efficientnetv2_s', pretrained=True)
#         num_ftrs = self.base_model.classifier.in_features
#         self.base_model.classifier = nn.Linear(num_ftrs, 1)

#     def forward(self, x):
#         return self.base_model(x)
    
# class SwinSmallTransformerModel(nn.Module):
#     def __init__(self):
#         super(SwinSmallTransformerModel, self).__init__()
#         self.base_model = timm.create_model('swin_tiny_patch4_window7_224', pretrained=True)
#         # Replace the final layer to output a single value for age prediction
#         num_ftrs = self.base_model.head.in_features
#         self.base_model.head = nn.Linear(num_ftrs, 1)
#         self.fc = nn.Linear(49, 1) 

#     def forward(self, x):
#         x = self.base_model(x)
#         x = x.view(x.size(0), -1)  
#         x = self.fc(x)  
#         return x
    
# class MnasNetModel(nn.Module):
#     def __init__(self):
#         super(MnasNetModel, self).__init__()
#         self.base_model = models.mnasnet1_0(pretrained=True)
#         num_ftrs = self.base_model.classifier[1].in_features
#         self.base_model.classifier[1] = nn.Linear(num_ftrs, 1)

#     def forward(self, x):
#         return self.base_model(x)


class InceptionResNetV2Model(nn.Module):
    def __init__(self):
        super(InceptionResNetV2Model, self).__init__()
        self.base_model = timm.create_model('inception_resnet_v2', pretrained=True)
        # Changing first layer to accept single channel input
        self.base_model.conv2d_1a.conv = nn.Conv2d(1, 32, kernel_size=(3, 3), stride=(2, 2), bias=False)
        num_ftrs = self.base_model.classif.in_features
        self.base_model.classif = nn.Linear(num_ftrs, 1)

    def forward(self, x):
        return self.base_model(x)

In [None]:
# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# # model = MLP(224*224*3, 512, 1).to(device)
# # model = CNN().to(device)
# # model=InceptionModel().to(device)
# # model=ResNeXtModel().to(device)
# # model=EfficientNetV2Model().to(device)
# # model=SwinSmallTransformerModel().to(device)
# # model=MnasNetModel().to(device)
# model=InceptionResNetV2Model().to(device)
# criterion = nn.MSELoss()
# optimizer = optim.Adam(model.parameters(), lr=0.001)
# # print(model)

# n_epochs = 15
# train_losses = []
# train_mae = []

# for epoch in range(n_epochs):
#     model.train()
#     running_loss = 0.0
#     all_labels = []
#     all_predictions = []
#     progress_bar = tqdm(enumerate(train_loader), total=len(train_loader), leave=False)
#     for i, data in progress_bar:
#         inputs, labels = data
#         inputs, labels = inputs.to(device), labels.to(device)
#         optimizer.zero_grad()
#         outputs = model(inputs)
#         outputs = outputs.view(-1)
#         loss = criterion(outputs, labels.float())
#         loss.backward()
#         optimizer.step()
#         running_loss += loss.item()
#         all_labels.extend(labels.detach().cpu().numpy())
#         all_predictions.extend(outputs.detach().cpu().numpy())
#         progress_bar.set_description(f'Epoch {epoch+1}')
#     train_losses.append(running_loss/len(train_loader))
#     epoch_mae = mean_absolute_error(all_labels, all_predictions)
#     train_mae.append(epoch_mae)
#     print(f'Epoch {epoch+1}, loss: {running_loss/len(train_loader)}, MAE: {epoch_mae}')
    
#     # Validation phase
#     model.eval()
#     val_running_loss = 0.0
#     val_all_labels = []
#     val_all_predictions = []
#     with torch.no_grad():
#         for i, data in enumerate(val_loader, 0):
#             inputs, labels = data
#             inputs, labels = inputs.to(device), labels.to(device)
#             outputs = model(inputs)
#             outputs = outputs.view(-1)
#             loss = criterion(outputs, labels.float())
#             val_running_loss += loss.item()
#             val_all_labels.extend(labels.detach().cpu().numpy())
#             val_all_predictions.extend(outputs.detach().cpu().numpy())

#             # Print the first 10 predictions and actual values
# #             if i < 1:
# #                 print(f'Predicted: {outputs}, Actual: {labels}')

#     val_epoch_mae = mean_absolute_error(val_all_labels, val_all_predictions)
#     print(f'Validation Epoch {epoch+1}, loss: {val_running_loss/len(val_loader)}, MAE: {val_epoch_mae}')
    


In [None]:
# For each fold...
for fold, (train_ids, val_ids) in enumerate(kfold.split(full_dataset)):
    
    # Initialize the best validation loss to infinity
    best_val_loss = float('inf')
    best_val_mae = float('inf')

    # Create train and validation subsets for this fold
    train_subsampler = torch.utils.data.SubsetRandomSampler(train_ids)
    val_subsampler = torch.utils.data.SubsetRandomSampler(val_ids)

    # Create data loaders for the training and validation sets
    train_loader = torch.utils.data.DataLoader(full_dataset, batch_size=64, sampler=train_subsampler)
    val_loader = torch.utils.data.DataLoader(full_dataset, batch_size=64, sampler=val_subsampler)

    # Create a new model for this fold
    model = InceptionResNetV2Model().to(device)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Train and validate the model on these loaders
    n_epochs = 16
    for epoch in range(n_epochs):
        # Training phase
        model.train()
        train_running_loss = 0.0
        train_all_labels = []
        train_all_predictions = []
        for i, data in tqdm(enumerate(train_loader), total=len(train_loader)):
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.float().to(device)

            optimizer.zero_grad()

            outputs = model(inputs)
            outputs = outputs.view(-1)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            train_running_loss += loss.item()
            train_all_labels.extend(labels.detach().cpu().numpy())
            train_all_predictions.extend(outputs.detach().cpu().numpy())

        train_epoch_mse = train_running_loss / len(train_loader)
        train_epoch_mae = mean_absolute_error(train_all_labels, train_all_predictions)
        print(f'Train Epoch {epoch+1}, MSE: {train_epoch_mse}, MAE: {train_epoch_mae}')

        # Validation phase
        model.eval()
        val_running_loss = 0.0
        val_all_labels = []
        val_all_predictions = []
        with torch.no_grad():
            for i, data in tqdm(enumerate(val_loader), total=len(val_loader)):
                inputs, labels = data
                inputs, labels = inputs.to(device), labels.float().to(device)

                outputs = model(inputs)
                outputs = outputs.view(-1)
                loss = criterion(outputs, labels)

                val_running_loss += loss.item()
                val_all_labels.extend(labels.detach().cpu().numpy())
                val_all_predictions.extend(outputs.detach().cpu().numpy())

        val_epoch_mse = val_running_loss / len(val_loader)
        val_epoch_mae = mean_absolute_error(val_all_labels, val_all_predictions)
        print(f'Validation Epoch {epoch+1}, MSE: {val_epoch_mse}, MAE: {val_epoch_mae}')

        # After each epoch, check if the validation loss has improved
        if val_epoch_mse < best_val_loss:
            # If it has, update the best validation loss
            best_val_loss = val_epoch_mse
            best_val_mae = val_epoch_mae

            # And save the model
            torch.save(model.state_dict(), f'best_model_fold_{fold}.pth')

    print(f'Best validation loss for fold {fold}: {best_val_loss}, Best validation MAE: {best_val_mae}')

In [None]:
@torch.no_grad()
def predict(loader, models):
    # Set all models to evaluation mode
    for model in models:
        model.eval()

    predictions = []

    for img in tqdm(loader):
        img = img.to(device)

        # Compute the prediction of each model
        preds = [model(img) for model in models]

        # Average the predictions
        avg_pred = torch.stack(preds).mean(0)

        predictions.extend(avg_pred.flatten().detach().tolist())

    return predictions

# Load the models
models = [InceptionResNetV2Model().to(device) for _ in range(n_folds)]
for i, model in enumerate(models):
    model.load_state_dict(torch.load(f'best_model_fold_{i}.pth'))

# Compute the predictions
preds = predict(test_loader, models)

submit = pd.read_csv('/kaggle/input/smai-24-age-prediction/content/faces_dataset/submission.csv')
submit['age'] = preds
submit.head()

submit.to_csv('baseline.csv',index=False)