In [None]:
# Mount Google Drive under /content/gdrive/ (forcing a remount if it is already
# mounted); the dataset, checkpoint and model paths used below live under it.
from google.colab import drive
drive.mount("/content/gdrive/", force_remount=True)

In [None]:
!pip install tensorboardX
!pip install pytorchcv

import numpy as np
import torch
from tqdm import tqdm
from torch.utils.data import DataLoader
import torch.cuda
import torch.multiprocessing
import warnings
import torch.nn as nn
from tensorboardX import SummaryWriter
import os
from pytorchcv.model_provider import get_model as ptcv_get_model
from torch.utils.data import Dataset
from imgaug import augmenters as iaa
from PIL import Image
import dlib
import cv2
from collections import OrderedDict

import matplotlib.pyplot as plt
import matplotlib.image as image

# Suppress every Python warning for the rest of the session.
warnings.filterwarnings(action='ignore')

# Turn on cuDNN benchmark mode.
torch.backends.cudnn.benchmark = True

# Module-level counter: train() increments `step` once per batch and uses it as
# the TensorBoard global step.
step = 0
# Declared `global` inside train(), but never read or updated in the visible code.
Acc_val_max = 0.0

In [None]:
# Per-channel (3 values each) mean and std used by the dataset classes to
# normalise image tensors: tensor.sub_(mean).div_(std) after scaling to [0, 1].
imagenet_stats = {'mean': [0.485, 0.456, 0.406],
                    'std': [0.229, 0.224, 0.225]}


def datapair_generator(arg, data, data_age, lb_list, up_list):
    """Draw one random comparison partner ("test" sample) for every sample.

    For each reference sample a uniform random number selects one of five age
    windows (far below / just below / within / just above / far above the
    reference age). A partner age is drawn from the ages that exist in
    ``data_age`` inside that window, then a random sample of that age is picked.

    Args:
        arg: Config object; only ``arg.tau`` is read (the "hard" bounds use
            ``arg.tau + 5``).
        data: NumPy array of sample identifiers, aligned with ``data_age``.
        data_age: NumPy integer array with the age of every sample.
        lb_list, up_list: Lower/upper bounds from ``get_bounds`` for ``arg.tau``,
            indexed by ``age - 18``.

    Returns:
        ``(ref_list, test_list)``: two arrays taken from ``data``; ``ref_list``
        is every sample in order, ``test_list`` the partner chosen for each.
    """
    ref_list = []
    test_list = []

    data_age_max = data_age.max()
    data_age_min = data_age.min()

    # Wider bounds delimiting the "far below" / "far above" windows.
    lb_list_hard, up_list_hard = get_bounds(data_age_min, data_age_max, tau=arg.tau + 5)

    # Ages in [0, max] for which the data set holds no sample at all.
    invalid_age = np.setdiff1d(np.arange(int(data_age_max) + 1), np.unique(data_age))

    # Sample indices per age, computed once instead of scanning the whole
    # array again for every reference sample.
    indices_by_age = {int(age): np.where(data_age == age)[0] for age in np.unique(data_age)}

    for i, age in enumerate(data_age):
        offset = age - 18  # the bound lists start at age 18
        flag = np.random.rand()

        if flag < 0.1:
            start, end = data_age_min, lb_list_hard[offset]
        elif flag < 0.3:
            start, end = lb_list_hard[offset], lb_list[offset]
        elif flag < 0.7:
            start, end = lb_list[offset], up_list[offset]
        elif flag < 0.9:
            start, end = up_list[offset], up_list_hard[offset]
        else:
            start, end = up_list_hard[offset], data_age_max

        candidate_age = np.setdiff1d(np.arange(int(start), int(end) + 1), invalid_age)
        if candidate_age.size == 0:
            # No sample exists for any age in this window; np.random.choice
            # would raise on an empty array, so fall back to the reference's
            # own age, which is present by construction.
            candidate_age = np.array([age])
        test_age = np.random.choice(candidate_age, 1)

        idx_test = indices_by_age[int(test_age[0])]

        ref_list.append(i)
        test_list.append(np.random.choice(idx_test, 1)[0].tolist())

    ref_list = data[np.array(ref_list)]
    test_list = data[np.array(test_list)]

    return ref_list, test_list


def ImgAugTransform(img):
    """Training-time augmentation: crop to at most 224x224, then flip left-right with p=0.5.

    Args:
        img: Image-like object accepted by ``np.array``.

    Returns:
        The augmented image as returned by the imgaug pipeline.
    """
    pipeline = iaa.Sequential([
        iaa.CropToFixedSize(width=224, height=224),
        iaa.Fliplr(0.5),
    ])
    return pipeline(image=np.array(img))


def ImgAugTransform_Test(img):
    """Evaluation-time transform: centre crop to 224x224 with no flipping.

    Args:
        img: Image-like object accepted by ``np.array``.

    Returns:
        The cropped image as returned by the imgaug pipeline.
    """
    center_crop = iaa.Sequential([
        iaa.CropToFixedSize(width=224, height=224, position="center")
    ])
    return center_crop(image=np.array(img))


In [None]:
class AgeDataLoader(Dataset):
    """Dataset of (reference image, test image, ordering label) training pairs.

    The image list is read from ``<arg.data_path>/train_image.txt``; the age of
    an entry is the integer before the first ``/`` of its relative path. Pairs
    are drawn by ``datapair_generator`` when the object is constructed, so a
    new instance yields a new random pairing.

    Labels: 0 if the test age is below the reference's lower bound, 2 if it is
    above the upper bound, 1 otherwise.
    """

    def __init__(self, arg):
        """Read the image list and sample one partner for every image.

        Args:
            arg: Config object providing ``data_path`` and ``tau``.
        """
        self.path = arg.data_path
        with open(os.path.join(arg.data_path, "train_image.txt"), "r") as fp:
            self.data = [d.splitlines()[0] for d in fp.readlines()]

        self.age_data = [int(d.split("/")[0]) for d in self.data]
        self.data, self.age_data = np.array(self.data), np.array(self.age_data)
        lb_list, up_list = get_bounds(self.age_data.min(), self.age_data.max(), tau=arg.tau)
        ref_list, test_list = datapair_generator(arg, self.data, self.age_data, lb_list, up_list)

        self.reg_img_list = ref_list
        self.test_img_list = test_list
        self.label_list = []

        for ref_name, test_name in zip(self.reg_img_list, self.test_img_list):
            ref_age = int(ref_name.split("/")[0])
            test_age = int(test_name.split("/")[0])

            # The bound lists are indexed by (age - 18).
            if test_age < lb_list[ref_age - 18]:
                class_name = 0
            elif test_age > up_list[ref_age - 18]:
                class_name = 2
            else:
                class_name = 1

            self.label_list.append(class_name)

    def __len__(self):
        """Number of sampled pairs."""
        return len(self.label_list)

    def _load_image(self, relative_path):
        """Load one image as a normalised float32 CHW tensor.

        The image is forced to RGB so that grayscale or RGBA files still give
        the three channels that the transpose and normalisation below expect.
        """
        img_path = os.path.join(self.path, relative_path)
        with Image.open(str(img_path)) as raw:
            img = raw.convert("RGB").resize((224, 224))

        arr = ImgAugTransform(img).astype(np.float32) / 255.
        tensor = torch.from_numpy(np.transpose(arr, (2, 0, 1)))

        mean = torch.as_tensor(imagenet_stats['mean'], dtype=tensor.dtype, device=tensor.device)
        std = torch.as_tensor(imagenet_stats['std'], dtype=tensor.dtype, device=tensor.device)
        tensor.sub_(mean[:, None, None]).div_(std[:, None, None])
        return tensor

    def __getitem__(self, idx):
        """Return ``(ref_img, test_img, label)`` for pair ``idx``."""
        label = self.label_list[idx]
        # Reference first, then test: keeps the order of the random augmentations.
        ref_img = self._load_image(self.reg_img_list[idx])
        test_img = self._load_image(self.test_img_list[idx])

        return ref_img, test_img, label

In [None]:
class Model_BN_VGG16(torch.nn.Module):
    """Siamese network: a shared bn_vgg16 feature encoder plus a 3-way comparator.

    Attributes:
        encoder: ``features`` part of pytorchcv's pretrained ``bn_vgg16``.
        comparator: ``Comparator_v1(1024, 512)`` applied to the concatenation
            of the two per-image feature vectors.
        avg_pool: 7x7 average pooling applied after the last encoder stage.
    """

    def __init__(self):
        super(Model_BN_VGG16, self).__init__()
        self.encoder = ptcv_get_model("bn_vgg16", pretrained=True).features
        self.comparator = Comparator_v1(1024, 512)
        self.avg_pool = nn.AvgPool2d(kernel_size=7)

    def forward_siamese(self, x):
        """Encode a batch of images into one flat feature vector per image.

        Returns:
            Tensor of shape ``(batch, features)``. The batch dimension is kept
            even for a single image; a plain ``squeeze()`` would drop it and
            break the ``dim=1`` concatenation in ``forward``.
        """
        x = self.encoder.stage1(x)
        x = self.encoder.stage2(x)
        x = self.encoder.stage3(x)
        x = self.encoder.stage4(x)
        x = self.encoder.stage5(x)
        x = self.avg_pool(x)

        return torch.flatten(x, 1)

    def forward(self, phase, **kwargs):
        """Run the network in one of three modes.

        Args:
            phase: ``'train'`` (images ``x_1``, ``x_2`` -> comparator logits),
                ``'test'`` (pre-extracted features ``ref``, ``test`` ->
                comparator logits) or ``'extraction'`` (images ``x`` -> features).
            **kwargs: The tensors named above for the chosen phase.

        Returns:
            Comparator logits, or features for ``'extraction'``.

        Raises:
            ValueError: If ``phase`` is not one of the three supported values.
        """
        if phase == 'train':
            x_1 = self.forward_siamese(kwargs['x_1'])
            x_2 = self.forward_siamese(kwargs['x_2'])

            return self.comparator(torch.cat([x_1, x_2], dim=1))

        if phase == 'test':
            x_1, x_2 = kwargs['ref'], kwargs['test']

            return self.comparator(torch.cat([x_1, x_2], dim=1))

        if phase == 'extraction':
            return self.forward_siamese(kwargs['x'])

        # Previously an unknown phase fell through and returned None silently.
        raise ValueError("Unknown phase: {!r}".format(phase))


class Comparator_v1(torch.nn.Sequential):
    """Three-layer MLP producing 3 logits from a concatenated feature pair.

    Layer order: Linear -> ReLU -> Linear -> ReLU -> Dropout(0.8) -> Linear(3).
    The ``leakyrelu*`` attributes are plain ``nn.ReLU`` modules.
    """

    def __init__(self, input_channel, output_channel):
        """
        Args:
            input_channel: Size of the input vector.
            output_channel: Width of both hidden layers.
        """
        super(Comparator_v1, self).__init__()
        self.fcA = nn.Linear(input_channel, output_channel)
        self.leakyreluA = nn.ReLU(inplace=True)
        self.fcB = nn.Linear(output_channel, output_channel)
        self.leakyreluB = nn.ReLU(inplace=True)
        self.dropout = nn.Dropout(p=0.8)
        self.fcC = nn.Linear(output_channel, 3)

    def forward(self, x):
        """Apply the six layers in order and return the 3-way logits."""
        pipeline = (self.fcA, self.leakyreluA,
                    self.fcB, self.leakyreluB,
                    self.dropout, self.fcC)
        for layer in pipeline:
            x = layer(x)

        return x


def create_model():
    """Build a ``Model_BN_VGG16`` and move it to the GPU when one is available.

    An unconditional ``.cuda()`` raises on CPU-only machines even though the
    callers already fall back to a CPU device, so the device is chosen here
    the same way.

    Returns:
        The model, on CUDA if available, otherwise on the CPU.
    """
    print('Get BN_Vgg16')
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = Model_BN_VGG16().to(device)

    return model

In [None]:
def get_bounds(age_min, age_max, tau=3):
    """Compute clipped ``age +/- tau`` bounds for every age from 18 to ``age_max``.

    Args:
        age_min: Smallest age; lower bounds are clipped to it.
        age_max: Largest age; upper bounds are clipped to it.
        tau: Half-width of the window around each age.

    Returns:
        ``(lb_list, up_list)``: two lists where entry ``k`` holds the bound for
        age ``18 + k``. Both are empty when ``age_max < 18``.
    """
    ages = range(18, age_max + 1)
    lb_list = [max(age - tau, age_min) for age in ages]
    up_list = [min(age + tau, age_max) for age in ages]

    return lb_list, up_list

def train(arg):
    """Train the siamese comparator model and checkpoint on training-accuracy gains.

    A new ``AgeDataLoader`` is built every epoch, so the image pairs are
    re-sampled each time. A checkpoint ``Epoch_XXXX.pth`` is written to
    ``arg.test_model_path`` whenever the epoch's training accuracy exceeds the
    best value seen so far.

    Args:
        arg: Config object providing ``test_model_path``, ``lr``, ``pretrained``,
            ``pretrained_model_path``, ``epoch``, ``train_batch_size`` and the
            fields read by ``AgeDataLoader`` (``data_path``, ``tau``).

    Side effects:
        Increments the module-level ``step`` once per batch, writes TensorBoard
        scalars through ``SummaryWriter`` and saves checkpoint files.
    """
    global step
    os.makedirs(arg.test_model_path, exist_ok=True)

    device = torch.device("cuda:%s" % 0 if torch.cuda.is_available() else "cpu")
    print(device)

    model = create_model()
    model.to(device)

    # NOTE(review): this freezes the comparator, so only the encoder is
    # updated. With pretrained=False the comparator stays at its random
    # initialisation - confirm this is intended.
    for param in model.comparator.parameters():
        param.requires_grad = False

    optimizer = torch.optim.Adam(list(model.parameters()), lr=arg.lr)

    if arg.pretrained:
        initial_model = os.path.join(arg.pretrained_model_path)
        checkpoint = torch.load(initial_model, map_location=device)
        saved_state = checkpoint['model_state_dict']

        # Take every tensor the checkpoint provides; keep the current value
        # for keys the checkpoint does not contain.
        new_model_state_dict = OrderedDict()
        for k, v in model.state_dict().items():
            if k in saved_state:
                new_model_state_dict[k] = saved_state[k]
                print(f'Loaded\t{k}')
            else:
                new_model_state_dict[k] = v
                print(f'Not Loaded\t{k}')

        model.load_state_dict(new_model_state_dict)

        optimizer_dict = optimizer.state_dict()
        optimizer_dict.update(checkpoint['optimizer_state_dict'])
        optimizer.load_state_dict(optimizer_dict)

        print("=> loaded checkpoint '{}'".format(initial_model))

    criterion = nn.CrossEntropyLoss()
    criterion.to(device)
    writer = SummaryWriter()

    model.train()

    best_acc = 0.0
    try:
        for epoch in range(0, arg.epoch):
            running_loss = 0.0
            correct_total = 0

            Train = AgeDataLoader(arg)
            dataloader_train = DataLoader(Train, batch_size=arg.train_batch_size, shuffle=True, num_workers=4)

            for data in tqdm(dataloader_train, desc="Train"):
                ref, test, labels = data[0], data[1], data[2]
                labels = labels.view(-1)
                ref, test, labels = ref.to(device), test.to(device), labels.to(device)
                # The last batch of an epoch can be shorter than
                # arg.train_batch_size, so use the real size for averaging.
                batch_size = labels.size(0)

                optimizer.zero_grad()

                with torch.set_grad_enabled(mode=True):
                    # Logits are already (batch, 3); squeezing them would
                    # drop the batch dimension for a batch of one.
                    outputs = model('train', x_1=ref, x_2=test)
                    loss = criterion(outputs, labels)

                    loss.backward()
                    optimizer.step()

                batch_loss = loss.item()
                batch_correct = (torch.argmax(outputs, 1) == labels).sum().item()

                running_loss += batch_loss * batch_size
                correct_total += batch_correct

                writer.add_scalar('train/loss', batch_loss, global_step=step)
                writer.add_scalar('train/acc', batch_correct / batch_size, global_step=step)

                step += 1

            acc = correct_total / len(Train)

            print('Train')
            print('epoch : %d, loss: %.4f, acc: %.4f' % (epoch + 1, running_loss / len(Train), acc))

            if acc > best_acc:
                best_acc = acc
                torch.save({
                    'epoch': epoch,
                    'model_state_dict': model.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict(),
                    'loss': running_loss
                }, os.path.join(arg.test_model_path, 'Epoch_{:04d}.pth'.format(epoch + 1)))
    finally:
        # Flush and release the TensorBoard event file even if training fails.
        writer.close()
    print('Finished Training')


In [None]:
import easydict

# Locations on the mounted Drive for the dataset and the training checkpoints.
dataset_path = "/content/gdrive/MyDrive/2024_MCL_Internship/AgeEstimation/dataset/AFAD-Lite"
ckpt_path = "/content/gdrive/MyDrive/2024_MCL_Internship/AgeEstimation/checkpoints_day7"
# makedirs creates missing parent directories too and tolerates an existing
# directory, whereas exists()+mkdir fails when a parent is missing.
os.makedirs(ckpt_path, exist_ok=True)

# Hyper-parameters and paths read by train(), AgeDataLoader and predict_age_random().
train_arg = easydict.EasyDict({
    "tau": 3,
    "lr" : 1e-4,
    "train_batch_size": 18,
    "test_batch_size": 1,
    "epoch": 100,
    "pretrained": False,
    "pretrained_model_path": None,
    "test_model_path": ckpt_path,
    "data_path": dataset_path
})

train(train_arg)

In [None]:
class SingleImage(Dataset):
    """Dataset yielding one normalised, centre-cropped image tensor per entry.

    Each entry of ``data`` is a path relative to ``arg.data_path``.
    """

    def __init__(self, arg, data, img_size=256):
        """
        Args:
            arg: Config object; only ``arg.data_path`` is read.
            data: Sequence of image paths relative to ``arg.data_path``.
            img_size: Side length images are resized to before the 224 centre crop.
        """
        self.data = np.array(data)
        self.img_size = img_size

        self.img = [str(os.path.join(arg.data_path, da)) for da in self.data]

    def __len__(self):
        """Number of images."""
        return len(self.img)

    def __getitem__(self, idx):
        """Return image ``idx`` as a float32 CHW tensor normalised with ``imagenet_stats``."""
        img_path = self.img[idx]

        # Force three channels: grayscale or RGBA files would otherwise break
        # the HWC -> CHW transpose and the 3-channel normalisation below.
        with Image.open(str(img_path)) as raw:
            img = raw.convert("RGB").resize((self.img_size, self.img_size))
        img = ImgAugTransform_Test(img).astype(np.float32) / 255.

        img = torch.from_numpy(np.transpose(img, (2, 0, 1)))

        dtype = img.dtype
        mean = torch.as_tensor(imagenet_stats['mean'], dtype=dtype, device=img.device)
        std = torch.as_tensor(imagenet_stats['std'], dtype=dtype, device=img.device)
        img.sub_(mean[:, None, None]).div_(std[:, None, None])

        return img


class SingleImage_val(Dataset):
    """Dataset over a list of ready-to-open image paths (used for ad-hoc test images).

    Unlike ``SingleImage`` the paths are used as given and are not joined with
    ``arg.data_path``.
    """

    def __init__(self, arg, data, img_size=256):
        """
        Args:
            arg: Unused; kept so the signature matches ``SingleImage``.
            data: List of image paths.
            img_size: Side length images are resized to before the 224 centre crop.
        """
        self.img_size = img_size

        # Keep every path. Only data[0] used to be stored, which silently
        # dropped the rest while callers still looped over all test images.
        self.img = [str(path) for path in data]

    def __len__(self):
        """Number of images."""
        return len(self.img)

    def __getitem__(self, idx):
        """Return image ``idx`` as a float32 CHW tensor normalised with ``imagenet_stats``."""
        img_path = self.img[idx]

        # Force three channels so the transpose and normalisation below hold
        # for grayscale or RGBA files as well.
        with Image.open(str(img_path)) as raw:
            img = raw.convert("RGB").resize((self.img_size, self.img_size))
        img = ImgAugTransform_Test(img).astype(np.float32) / 255.

        img = torch.from_numpy(np.transpose(img, (2, 0, 1)))

        dtype = img.dtype
        mean = torch.as_tensor(imagenet_stats['mean'], dtype=dtype, device=img.device)
        std = torch.as_tensor(imagenet_stats['std'], dtype=dtype, device=img.device)
        img.sub_(mean[:, None, None]).div_(std[:, None, None])

        return img


def _extract_features(dataloader, model, device, desc):
    """Run the model in 'extraction' mode over a loader and stack the per-image features."""
    collected = []
    with torch.no_grad():
        for inputs in tqdm(dataloader, desc=desc):
            inputs = inputs.to(device)

            outputs = model('extraction', x=inputs)
            # One row per image, whatever the feature width, even if the
            # model collapsed the batch dimension for a single image.
            collected.extend(outputs.cpu().detach().numpy().reshape(inputs.size(0), -1))

    return np.array(collected)


def feature_extraction(arg, train_data, test_data, model, device):
    """Extract encoder features for the training images and the test images.

    Args:
        arg: Config object passed through to the dataset classes.
        train_data: Image paths relative to ``arg.data_path``.
        test_data: Either a ``list`` of ready-to-open image paths (handled by
            ``SingleImage_val``) or any other sequence of paths relative to
            ``arg.data_path`` (handled by ``SingleImage``).
        model: Model supporting ``model('extraction', x=batch)``.
        device: Device the input batches are moved to.

    Returns:
        Dict with NumPy arrays under ``'train'`` and ``'test'``, one row per image.
    """
    Images_train = SingleImage(arg, train_data)
    dataloader_Images_train = DataLoader(Images_train, batch_size=100, shuffle=False, num_workers=4)

    test_dataset_cls = SingleImage_val if isinstance(test_data, list) else SingleImage
    Images_test = test_dataset_cls(arg, test_data)
    dataloader_Images_test = DataLoader(Images_test, batch_size=100, shuffle=False, num_workers=4)

    features = {
        'train': _extract_features(dataloader_Images_train, model, device, 'Extract Train Data Features'),
        'test': _extract_features(dataloader_Images_test, model, device, 'Extract Test Data Features'),
    }

    return features

In [None]:
def select_reference_randomly(train_data, train_data_age, limit=20):
    """Pick ``limit`` reference sample indices for every age from 0 to 89.

    Ages with at least ``limit`` samples contribute ``limit`` evenly strided
    indices. Ages with fewer samples contribute all of them, topped up to
    ``limit`` by random draws with replacement. Ages with no sample contribute
    empty lists.

    Args:
        train_data: NumPy array of samples aligned with ``train_data_age``
            (only used to count the samples per age).
        train_data_age: Sequence with the age of every sample.
        limit: Number of references wanted per age.

    Returns:
        ``(refer_idx, refer_age)``: two lists of 90 lists; ``refer_idx[a]``
        holds sample indices for age ``a`` and ``refer_age[a]`` repeats ``a``
        once per selected index.
    """
    ages = np.array(train_data_age)
    refer_idx, refer_age = [], []

    for age in range(90):
        mask = ages == age
        count = len(train_data[mask])

        if count >= limit or count > 0:
            stride = int(count / min(count, limit))
            offsets = np.arange(0, min(count, stride * limit), stride)
            picked = np.where(mask)[0][offsets].tolist()

            if count < limit:
                # Too few samples of this age: pad with random repeats.
                shortfall = limit - len(picked)
                picked = picked + np.random.choice(picked, shortfall, replace=True).tolist()

            refer_idx.append(picked)
            refer_age.append([age] * len(picked))
        else:
            refer_idx.append([])
            refer_age.append([])

    return refer_idx, refer_age

In [None]:
def _plot_comparison_examples(arg, train_data, refer_idx, outputs, test_image_path):
    """Show the test face in the centre of a 3x3 grid with example references around it.

    Top row: references the comparator labelled 0, middle row: label 1
    (left and right of the test face), bottom row: label 2. For every label
    the first matching reference and, if there is another, the last one are shown.
    """
    fig = plt.figure()
    axes = {}
    for position in (1, 2, 4, 5, 6, 8, 9):
        ax = fig.add_subplot(3, 3, position)
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
        axes[position] = ax

    # (slot of the first example, slot of the last example) per comparator label.
    slots = {0: (1, 2), 1: (4, 6), 2: (8, 9)}
    for label, (first_slot, last_slot) in slots.items():
        matches = np.flatnonzero(outputs == label)
        if matches.size == 0:
            continue
        chosen = [(first_slot, matches[0])]
        if matches.size > 1:
            chosen.append((last_slot, matches[-1]))
        for slot, j in chosen:
            # j indexes the reference list; refer_idx[j] maps it back to the
            # position of that image in train_data.
            ref_path = os.path.join(arg.data_path, train_data[refer_idx[j]])
            axes[slot].imshow(image.imread(ref_path))

    axes[5].imshow(image.imread(test_image_path))
    plt.show()
    plt.close(fig)


def predict_age_random(arg, test_data, epoch):
    """Estimate the age of each test face by voting over comparator decisions.

    Every test image is face-aligned with ``preprocess``, compared against a
    fixed number of reference images per age, and each comparison votes for
    the age range it implies. The age with the most votes is the prediction.

    Args:
        arg: Config object providing ``data_path`` and ``tau``.
        test_data: List of image file paths.
        epoch: Currently unused, see the note on the checkpoint path below.

    Returns:
        List with one predicted age per test image.
    """
    device = torch.device("cuda:%s" % 0 if torch.cuda.is_available() else "cpu")
    print(device)

    model = create_model()
    model.to(device)

    for param in model.comparator.parameters():
        param.requires_grad = False

    # NOTE(review): the checkpoint path is hard-coded, so the `epoch` argument
    # is ignored. train() only saves an epoch when accuracy improves, so
    # Epoch_0100.pth is not guaranteed to exist - confirm which file is wanted.
    # initial_model = os.path.join(os.path.join(arg.test_model_path, 'Epoch_{:04d}.pth'.format(epoch)))
    initial_model = "/content/gdrive/MyDrive/2024_MCL_Internship/AgeEstimation/checkpoints_day7/Epoch_0100.pth"
    print(initial_model)

    checkpoint = torch.load(initial_model, map_location=device)
    saved_state = checkpoint['model_state_dict']

    # Take every tensor the checkpoint provides; keep the current value for
    # keys it does not contain. No optimizer is needed for inference.
    new_model_state_dict = OrderedDict()
    for k, v in model.state_dict().items():
        new_model_state_dict[k] = saved_state[k] if k in saved_state else v

    model.load_state_dict(new_model_state_dict)

    print("=> loaded checkpoint '{}'".format(initial_model))

    model.eval()

    with open(os.path.join(arg.data_path, "train_image.txt"), "r") as fp:
        train_data = np.array([d.splitlines()[0] for d in fp.readlines()])

    # preprocess() always returns a list of aligned image paths.
    test_data = preprocess(test_data)

    age_data = np.array([int(d.split("/")[0]) for d in train_data])

    features = feature_extraction(arg, train_data, test_data, model, device)
    refer_idx, refer_age = select_reference_randomly(train_data, age_data, limit=5)
    refer_idx, refer_age = sum(refer_idx, []), sum(refer_age, [])

    lb, ub = get_bounds(age_data.min(), age_data.max(), arg.tau)

    # One vote bin per age starting at 18.
    # NOTE(review): 23 bins is kept from the original code - presumably
    # ages 18..40; confirm against the data set's age range.
    num_age_bins = 23

    predicted_age = []

    with torch.no_grad():
        for i in tqdm(range(len(test_data)), "Test"):
            # np.int was removed from NumPy; the builtin int is equivalent.
            vote = np.zeros(shape=num_age_bins, dtype=int)

            test_duplicate = [i] * len(refer_idx)
            ref_features = torch.as_tensor(
                np.array(features['train'][refer_idx]), dtype=torch.float32).to(device)
            test_feature = torch.as_tensor(
                np.array(features['test'][test_duplicate]), dtype=torch.float32).to(device)
            # argmax(dim=1) already yields one label per reference.
            outputs = model('test', ref=ref_features, test=test_feature).argmax(dim=1).cpu().detach().numpy()

            for j, ref_age in enumerate(refer_age):
                lower_bin = int(lb[ref_age - 18] - 18)
                upper_bin = int(ub[ref_age - 18] + 1 - 18)
                if outputs[j] == 0:
                    # Label 0: vote for every age below the reference's lower bound.
                    vote[:lower_bin] += 1
                elif outputs[j] == 2:
                    # Label 2: vote for every age above the reference's upper bound.
                    vote[upper_bin:] += 1
                else:
                    # Otherwise: vote for the ages inside the reference's bounds.
                    vote[lower_bin:upper_bin] += 1

            _plot_comparison_examples(arg, train_data, refer_idx, outputs, test_data[i])

            predicted_age.append(vote.argmax() + 18)
            print(f"Age {vote.argmax() + 18}")
    return predicted_age

In [None]:
# Index ranges (start, end) into the landmark array of a 68-point predictor;
# FaceAligner.align slices the landmarks with them (shape[start:end]).
FACIAL_LANDMARKS_68_IDXS = OrderedDict([
    ("mouth", (48, 68)),
    ("inner_mouth", (60, 68)),
    ("right_eyebrow", (17, 22)),
    ("left_eyebrow", (22, 27)),
    ("right_eye", (36, 42)),
    ("left_eye", (42, 48)),
    ("nose", (27, 36)),
    ("jaw", (0, 17))
])

# For dlib's 5-point facial landmark detector:
# NOTE(review): `(4)` below is the plain integer 4, not a (start, end) tuple
# like the other entries; only the two eye entries are read in this file.
FACIAL_LANDMARKS_5_IDXS = OrderedDict([
    ("right_eye", (2, 3)),
    ("left_eye", (0, 1)),
    ("nose", (4))
])


def shape_to_np(shape, dtype="int"):
    """Convert a landmark container into a ``(num_parts, 2)`` array of (x, y) rows.

    Args:
        shape: Object exposing ``num_parts`` and ``part(i)`` whose result has
            ``x`` and ``y`` attributes.
        dtype: dtype of the returned array.

    Returns:
        NumPy array with one ``(x, y)`` row per landmark.
    """
    count = shape.num_parts
    points = [(shape.part(k).x, shape.part(k).y) for k in range(count)]

    # reshape keeps the (0, 2) shape when there are no landmarks at all
    return np.array(points, dtype=dtype).reshape(count, 2)


class FaceAligner:
    """Rotate, scale and translate a face so that the eyes land at fixed positions."""

    def __init__(self, predictor, desiredLeftEye=(0.35, 0.35),
                 desiredFaceWidth=256, desiredFaceHeight=None):
        """
        Args:
            predictor: Callable ``predictor(gray, rect)`` returning facial landmarks.
            desiredLeftEye: Target (x, y) position of the left eye as fractions
                of the output width and height.
            desiredFaceWidth: Output width in pixels.
            desiredFaceHeight: Output height in pixels; defaults to the width.
        """
        self.predictor = predictor
        self.desiredLeftEye = desiredLeftEye
        self.desiredFaceWidth = desiredFaceWidth
        # a missing height means a square output
        self.desiredFaceHeight = desiredFaceWidth if desiredFaceHeight is None else desiredFaceHeight

    def align(self, image, gray, rect):
        """Return ``image`` warped so that the face inside ``rect`` is aligned.

        Args:
            image: Image to warp.
            gray: Image handed to the landmark predictor.
            rect: Face rectangle handed to the landmark predictor.

        Returns:
            The warped image of size (desiredFaceWidth, desiredFaceHeight).
        """
        landmarks = shape_to_np(self.predictor(gray, rect))

        # choose the index table that matches the predictor's landmark count
        if len(landmarks) == 68:
            index_table = FACIAL_LANDMARKS_68_IDXS
        else:
            index_table = FACIAL_LANDMARKS_5_IDXS
        (left_from, left_to) = index_table["left_eye"]
        (right_from, right_to) = index_table["right_eye"]

        # centre of mass of each eye
        left_center = landmarks[left_from:left_to].mean(axis=0).astype("int")
        right_center = landmarks[right_from:right_to].mean(axis=0).astype("int")

        # angle of the line through both eye centres
        delta_y = right_center[1] - left_center[1]
        delta_x = right_center[0] - left_center[0]
        angle = np.degrees(np.arctan2(delta_y, delta_x)) - 180

        # the right eye mirrors the left eye horizontally in the output
        target_right_x = 1.0 - self.desiredLeftEye[0]

        # scale = desired eye distance / current eye distance
        current_dist = np.sqrt((delta_x ** 2) + (delta_y ** 2))
        target_dist = (target_right_x - self.desiredLeftEye[0])
        target_dist *= self.desiredFaceWidth
        scale = target_dist / current_dist

        # midpoint between the eyes in the input image
        mid_x = int((left_center[0] + right_center[0]) // 2)
        mid_y = int((left_center[1] + right_center[1]) // 2)
        eyes_mid = (mid_x, mid_y)

        # rotation + scaling about the eye midpoint
        matrix = cv2.getRotationMatrix2D(eyes_mid, angle, scale)

        # shift the eye midpoint to its target location in the output
        shift_x = self.desiredFaceWidth * 0.5
        shift_y = self.desiredFaceHeight * self.desiredLeftEye[1]
        matrix[0, 2] += (shift_x - eyes_mid[0])
        matrix[1, 2] += (shift_y - eyes_mid[1])

        out_size = (self.desiredFaceWidth, self.desiredFaceHeight)
        return cv2.warpAffine(image, matrix, out_size,
                              flags=cv2.INTER_CUBIC)


def preprocess(test_data):
    """Face-align every test image and save it next to the original as ``*_aligned.png``.

    If a face is detected, the largest detection is aligned to 224x224 with
    ``FaceAligner``; otherwise the image is simply resized to 224x224.

    Args:
        test_data: Iterable of image file paths.

    Returns:
        List with the path of the written ``*_aligned.png`` file for each input.

    Raises:
        FileNotFoundError: If an input image cannot be read.
        IOError: If an aligned image cannot be written.
    """
    detector = dlib.get_frontal_face_detector()
    predictor = dlib.shape_predictor('/content/gdrive/MyDrive/2024_MCL_Internship/AgeEstimation/datasets/shape_predictor_68_face_landmarks.dat')
    fa = FaceAligner(predictor, desiredFaceWidth=224, desiredFaceHeight=224)

    aligned_data = []
    for f in test_data:
        img = cv2.imread(f)
        if img is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError("Could not read image: {}".format(f))
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

        rects = detector(gray, 2)

        if len(rects) > 0:
            # Align a single face. Looping over all detections would re-warp
            # the already aligned image with landmarks from the original one.
            face = max(rects, key=lambda r: r.width() * r.height())
            img = fa.align(img, gray, face)
        else:
            img = cv2.resize(img, (224, 224))

        # splitext removes only the final extension, so dots elsewhere in the
        # path (e.g. in directory names) do not truncate it.
        out_path = os.path.splitext(f)[0] + "_aligned" + ".png"
        if not cv2.imwrite(out_path, img):
            raise IOError("Could not write aligned image: {}".format(out_path))
        aligned_data.append(out_path)

    return aligned_data

In [None]:
# Run age estimation on a single face image stored on Drive.
# NOTE(review): predict_age_random loads a hard-coded checkpoint path, so the
# epoch argument (41) does not select which checkpoint is used.
test_data = ["/content/gdrive/MyDrive/2024_MCL_Internship/AgeEstimation/face.jpg"]
predict_age_random(train_arg, test_data, 41)