# Dataset preprocessing

**Steps:**
1. MiVOLO (YOLO person/face detector) + SPIGA: detect the face and its keypoints
2. cv2: convert to grayscale
3. ndimage.shift: take the nose keypoint as the center of the face and move it to the center of the image
4. ndimage.rotate: compute the angle between the line through the eyes and the horizontal axis, and rotate by it
5. crop: compute a square bounding box from the eye and mouth keypoints and use it to crop the image
6. ndimage.zoom: resize so that every output image has the same size
7. normalize: histogram stretching

In [1]:
import os
import csv
import cv2
import numpy as np
from tqdm import tqdm
import math
from scipy import ndimage
import matplotlib.pyplot as plt
import shutil

# Root folder of the dataset and the sub-folders derived from it.
dataset_root = 'C:/DATASETS/AGE-FER'
# Folder the raw input images are read from (used by preprocess()).
dataset_imgs_path_in = os.path.join(dataset_root, 'images')
# NOTE(review): presumably a target folder for debug renders; it is not
# referenced by the code visible in this notebook.
dataset_imgs_path_debug = os.path.join(dataset_root, 'images-debug')
# Column names (and order) of the label CSV files written by preprocess().
csv_columns = ['dataset','user_id','name','class','age','gender','race','perspective', 'age_group', 'subset', 'auto_age', 'auto_gender', 'auto_perspective', 'age_group_clean', 'gaze']

# Mode used for ndimage transformations
MODE = 'reflect'

# Image size: side length passed as img_size to preprocess() for the square output images
IMG_SIZE = 224

# Positions of each keypoint inside the list returned by get_keypoints()
EYE_L = 0
EYE_R = 1
NOSE = 2
MOUTH_L = 3
MOUTH_R = 4

def preprocess_img(img_name, path_in, path_out, path_no_face, debug=False, skip_face_detection=False, skip_landmarks=False, grayscale=False, img_size=224):
    """Align, crop, resize and normalize one image, then write it to ``path_out``.

    Pipeline (the alignment steps run only when ``skip_landmarks`` is False):
    optional grayscale conversion, shift so the nose keypoint sits at the image
    center, rotation by the eye-line angle, square crop derived from the eye
    and mouth keypoints, resize to ``img_size`` x ``img_size`` and histogram
    stretching.

    Args:
        img_name: File name of the image, relative to ``path_in``.
        path_in: Folder the image is read from.
        path_out: Folder the processed image is written to.
        path_no_face: Folder rejected images are copied to, or None to skip
            the copy.
        debug: If True, draw the keypoints on the saved image.
        skip_face_detection: If True, ``get_keypoints`` is called with
            ``use_detector=False``.
        skip_landmarks: If True, skip keypoint detection and every step that
            depends on it (shift, rotate, crop).
        grayscale: If True, convert the image to a single channel first.
        img_size: Side length in pixels of the square output image.

    Returns:
        True if the image was processed and saved, False if it was rejected
        (unreadable file, no keypoints, or an unexpected output size).
    """
    src_path = os.path.join(path_in, img_name)

    def reject(message=None):
        # Copy the source image to the "no face" folder (if configured),
        # optionally log the reason, and signal failure to the caller.
        if path_no_face is not None and os.path.exists(src_path):
            shutil.copy(src_path, os.path.join(path_no_face, img_name))
        if message is not None:
            print(message, img_name)
        return False

    img = cv2.imread(src_path)
    if img is None:
        # cv2.imread does not raise on a missing/corrupt file, it returns None.
        return reject('Unreadable image:')

    # To grayscale
    img_gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) if grayscale else img
    keypoints = None

    if not skip_landmarks:

        # Get keypoints (always detected on the original colour image)
        keypoints = get_keypoints(img, use_detector=not skip_face_detection)
        if keypoints is None:
            return reject()

        # Shift: move the nose keypoint to the image center
        middle_point = (img.shape[1] // 2, img.shape[0] // 2)
        shift_x = middle_point[0] - keypoints[NOSE][0]
        shift_y = middle_point[1] - keypoints[NOSE][1]
        # ndimage works in (row, col[, channel]) order; channels are never shifted.
        shift_vector = (shift_y, shift_x) + (0,) * (img_gray.ndim - 2)
        img_gray = ndimage.shift(img_gray, shift_vector, mode=MODE)
        keypoints = translate_points(keypoints, (shift_x, shift_y))

        # Rotate by the angle of the eye line
        angle = get_eye_rotation(keypoints[EYE_L], keypoints[EYE_R])
        img_gray = ndimage.rotate(img_gray, radians_to_degrees(angle), reshape=False, mode=MODE)
        keypoints = rotate_points(keypoints, angle, middle_point)

        # Crop: square box whose side is twice the eye-center to mouth-center distance
        mouth_center = np.mean([keypoints[MOUTH_L], keypoints[MOUTH_R]], axis=0)
        eye_center = np.mean([keypoints[EYE_L], keypoints[EYE_R]], axis=0)
        padding_h = np.linalg.norm(mouth_center - eye_center) / 2
        height = width = int(4 * padding_h)
        eye_d = keypoints[EYE_R][0] - keypoints[EYE_L][0]
        padding_w = (width - eye_d) / 2
        p1_x = int(keypoints[EYE_L][0] - padding_w)
        p1_y = int(keypoints[EYE_L][1] - padding_h)
        p2_x = p1_x + width
        p2_y = p1_y + height

        # Add borders if bbox is out of bounds
        left = max(0, -p1_x)
        top = max(0, -p1_y)
        right = max(0, p2_x - img_gray.shape[1])
        bottom = max(0, p2_y - img_gray.shape[0])
        if left or top or right or bottom:
            img_gray = cv2.copyMakeBorder(img_gray, top, bottom, left, right, cv2.BORDER_REFLECT_101)

        # The bbox is expressed in the (possibly padded) image; the keypoints
        # are moved into the coordinate system of the crop.
        bbox = (p1_x + left, p1_y + top, p2_x + left, p2_y + top)
        keypoints = translate_points(keypoints, (-p1_x, -p1_y))
        img_gray = crop_img(img_gray, bbox)

    # Zero size check
    if img_gray.size == 0:
        return reject('Zero size:')

    # Resize: zoom[0] scales rows (y), zoom[1] scales columns (x)
    zoom = np.array([img_size, img_size]) / img_gray.shape[:2]
    zoom_factors = tuple(zoom) + (1,) * (img_gray.ndim - 2)
    img_gray = ndimage.zoom(img_gray, zoom_factors, mode=MODE)

    # Resize keypoints. They are (x, y) pairs, so x must use the column factor
    # and y the row factor, hence the swapped order.
    if not skip_landmarks:
        keypoints = resize_points(keypoints, (zoom[1], zoom[0]))

    # Zero size check
    if img_gray.size == 0 or img_gray.shape[0] != img_size or img_gray.shape[1] != img_size:
        return reject('Zero size (2):')

    # Normalize
    img_gray = img_to_float(img_gray)
    img_gray = histogram_stretching(img_gray)
    img_gray = img_to_uint8(img_gray)

    if debug:
        # Only single-channel images need converting before drawing in colour.
        if img_gray.ndim == 2:
            img_gray = cv2.cvtColor(img_gray, cv2.COLOR_GRAY2BGR)
        if not skip_landmarks:
            for kp in keypoints:
                cv2.circle(img_gray, (int(kp[0]), int(kp[1])), radius=int(img_size/30), color=(0,0,255), thickness=-1)

    # Save result
    cv2.imwrite(os.path.join(path_out, img_name), img_gray)
    return True

def preprocess(dataset_imgs_path_out, dataset_imgs_path_no_face, dataset_labels_path, dataset_labels_path_out, dataset_labels_path_no_face, grayscale=True, img_size=224):
    """Preprocess every image listed in a label CSV and split the labels.

    Each row of ``dataset_labels_path`` is run through ``preprocess_img``
    (reading images from the module-level ``dataset_imgs_path_in``). Rows whose
    image was processed are written to ``dataset_labels_path_out``; all other
    rows go to ``dataset_labels_path_no_face``. Images already present in one
    of the two output folders are not processed again, so an interrupted run
    can be resumed; both CSV files are rewritten from scratch on every run.

    Args:
        dataset_imgs_path_out: Folder for successfully processed images.
        dataset_imgs_path_no_face: Folder for rejected images.
        dataset_labels_path: Input label CSV (needs 'name' and 'dataset' columns).
        dataset_labels_path_out: Output CSV for processed rows.
        dataset_labels_path_no_face: Output CSV for rejected rows.
        grayscale: Forwarded to ``preprocess_img``.
        img_size: Forwarded to ``preprocess_img``.
    """
    # Datasets for which face detection is skipped and the whole image is used.
    # NOTE(review): presumably because these datasets ship pre-cropped faces.
    no_detection_datasets = frozenset(['AffectNet', 'RAF-DB', 'NHFI', 'FER2013', 'ExpW', 'Google-FE-Test'])

    # Create output folders (including missing parents; no error if they exist)
    os.makedirs(dataset_imgs_path_out, exist_ok=True)
    os.makedirs(dataset_imgs_path_no_face, exist_ok=True)

    # newline='' on every file, as the csv module documentation recommends.
    with open(dataset_labels_path, 'r', newline='') as csv_input, open(dataset_labels_path_out, 'w', newline='') as csv_output, open(dataset_labels_path_no_face, 'w', newline='') as csv_no_face:

        # CSV reader and writer
        reader = csv.DictReader(csv_input, delimiter=',', quotechar='"')
        writer = csv.DictWriter(csv_output, delimiter=',', quotechar='"', fieldnames=csv_columns)
        writer_no_face = csv.DictWriter(csv_no_face, delimiter=',', quotechar='"', fieldnames=csv_columns)

        # Write header
        writer.writeheader()
        writer_no_face.writeheader()

        # Count rows for the progress bar, then rewind and re-create the reader
        total_lines = sum(1 for _ in reader)
        csv_input.seek(0)
        reader = csv.DictReader(csv_input, delimiter=',', quotechar='"')

        # Process each row
        for row in tqdm(reader, total=total_lines):
            name = row['name']

            if os.path.exists(os.path.join(dataset_imgs_path_out, name)):
                # Already processed in a previous run
                has_face = True
            elif os.path.exists(os.path.join(dataset_imgs_path_no_face, name)):
                # Already rejected in a previous run
                has_face = False
            else:
                has_face = preprocess_img(
                    name,
                    dataset_imgs_path_in,
                    dataset_imgs_path_out,
                    dataset_imgs_path_no_face,
                    debug=False,
                    skip_face_detection=row['dataset'] in no_detection_datasets,
                    skip_landmarks=False,
                    grayscale=grayscale,
                    img_size=img_size,
                )

            if has_face:
                writer.writerow(row)
            else:
                writer_no_face.writerow(row)

def get_eye_rotation(eye1, eye2):
    """Return the signed angle (radians) of the eye1 -> eye2 direction.

    The magnitude is the angle between that direction and the vector (1, 0);
    the result is positive when the direction's second component is positive
    and negated otherwise.
    """
    direction = get_vector(eye1, eye2)
    unsigned_angle = get_angle(direction, (1, 0))
    if direction[1] > 0:
        return unsigned_angle
    return -unsigned_angle

def get_angle(v1, v2):
    """Get angle between two vectors.

    Args:
        v1: First vector (sequence of numbers).
        v2: Second vector, same length as ``v1``.

    Returns:
        The unsigned angle in radians, in the range [0, pi].
    """
    cos_angle = np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))
    # Floating-point rounding can push the cosine marginally outside [-1, 1]
    # (e.g. for parallel vectors), which would make math.acos raise a domain
    # error. np.clip keeps NaN as NaN, so zero-length input is not masked.
    return math.acos(np.clip(cos_angle, -1.0, 1.0))

def get_unit_vector(v):
    """Scale the first two components of ``v`` by 1 / magnitude(v)."""
    length = magnitude(v)
    first, second = v[0], v[1]
    return (first / length, second / length)
    
def get_vector(p1, p2):
    """Return the unit-length direction vector pointing from p1 to p2."""
    delta_x = p2[0] - p1[0]
    delta_y = p2[1] - p1[1]
    return get_unit_vector((delta_x, delta_y))

def magnitude(v):
    """Return the Euclidean length of vector ``v``."""
    squared_total = 0
    for component in v:
        squared_total += component ** 2
    return math.sqrt(squared_total)

def crop_img(img, bbox):
    """Return the sub-image of ``img`` covered by ``bbox``.

    ``bbox`` is indexed as (x_start, y_start, x_stop, y_stop); x selects
    columns and y selects rows, any remaining axes are kept whole.
    """
    row_range = slice(bbox[1], bbox[3])
    col_range = slice(bbox[0], bbox[2])
    return img[row_range, col_range, ...]

def resize_points(keypoints, zoom):
    """Scale each keypoint: first coordinate by zoom[0], second by zoom[1]."""
    factor_first, factor_second = zoom[0], zoom[1]
    scaled = []
    for point in keypoints:
        scaled.append((point[0] * factor_first, point[1] * factor_second))
    return scaled

def translate_points(keypoints, movement):
    """Offset every keypoint by ``movement`` (first and second coordinate)."""
    offset_first, offset_second = movement[0], movement[1]
    moved = []
    for point in keypoints:
        moved.append((point[0] + offset_first, point[1] + offset_second))
    return moved
    
def rotate_points(keypoints, radians, center):
    """Rotate all keypoints by ``radians`` around the point ``center``."""
    center_x, center_y = center[0], center[1]

    # Move the rotation center to the origin
    centered = translate_points(keypoints, (-center_x, -center_y))

    # Rotate around the origin
    rotated = [rotate_point_origin(point, radians) for point in centered]

    # Move the rotation center back to where it was
    return translate_points(rotated, (center_x, center_y))

def rotate_point_origin(xy, radians):
    """Rotate the point ``xy`` around (0, 0).

    Applies the matrix [[cos, sin], [-sin, cos]] to (x, y) and returns the
    resulting pair.
    """
    x, y = xy
    cos_a = math.cos(radians)
    sin_a = math.sin(radians)
    return x * cos_a + y * sin_a, -x * sin_a + y * cos_a

def radians_to_degrees(rad):
    """Convert an angle from radians to degrees."""
    scaled = rad * 180
    return scaled / math.pi

def show_img(img):
    """Display an OpenCV image with matplotlib.

    Images with a channel axis (3-D arrays) are treated as BGR; 2-D arrays
    are treated as single-channel grayscale. Both are converted to RGB for
    ``plt.imshow``.
    """
    # A grayscale image has shape (rows, cols), i.e. two dimensions, so the
    # channel test must be "> 2"; "> 1" sent grayscale images down the BGR path.
    if len(img.shape) > 2:
        plt.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
    else:
        plt.imshow(cv2.cvtColor(img, cv2.COLOR_GRAY2RGB))

def histogram_stretching(img, h_min=0, h_max=1):
    """Linearly rescale ``img`` so its values span [h_min, h_max].

    Args:
        img: Numeric array.
        h_min: Value the smallest input value is mapped to.
        h_max: Value the largest input value is mapped to.

    Returns:
        The rescaled array, or ``img`` unchanged when it is empty, constant,
        or its extrema are NaN (no range to stretch).
    """
    if np.size(img) == 0:
        return img

    min_value = np.min(img)
    max_value = np.max(img)

    # "not >" (rather than "==") also catches NaN extrema. The sign of the
    # values is irrelevant: any non-degenerate range can be stretched.
    if not max_value > min_value:
        return img

    return h_min + (h_max - h_min) * (img - min_value) / (max_value - min_value)
    
def img_to_uint8(img):
    """Convert a float image in the [0, 1] range to ``uint8`` in [0, 255].

    Values are rounded to the nearest integer (a plain cast truncates, which
    darkens the image by up to one level) and clipped to [0, 255] so that
    out-of-range inputs saturate instead of wrapping around.
    """
    scaled = np.rint(np.asarray(img, dtype=float) * 255)
    return np.clip(scaled, 0, 255).astype('uint8')

def img_to_float(img):
    """Divide ``img`` by 255, mapping 8-bit levels into the [0, 1] range."""
    max_level = 255
    return img / max_level

## Preprocess

### Using SPIGA

In [None]:
from ultralytics.yolo.engine.model import YOLO
from spiga.inference.config import ModelConfig
from spiga.inference.framework import SPIGAFramework

# YOLO detector used by get_bbox_hw() to locate the face in an image.
weights = '../weights/yolov8x_person_face.pt'
yolo = YOLO(weights)
# NOTE(review): presumably fuses layers for faster inference (the cell output
# reports a "fused" model summary) -- confirm against the ultralytics docs.
yolo.fuse()

# SPIGA model used by get_spiga_feature(), configured with the 'wflw' preset
# and weights taken from the local '../weights' folder.
dataset = 'wflw'
cfg = ModelConfig(dataset)
# NOTE(review): presumably disables fetching the weights from a URL so that
# the local file below is used -- TODO confirm in the SPIGA ModelConfig source.
cfg.load_model_url = None
cfg.model_weights_path = '../weights'
cfg.model_weights = 'spiga_wflw.pt'
processor = SPIGAFramework(cfg)

def get_bbox_hw(img):
    """Detect a face with YOLO and return its box as [x, y, width, height].

    Only detections of class id 1 are considered (treated as the face class
    here); the first such detection is used. Returns None when YOLO yields
    no result or no class-1 detection.
    """
    # YOLO detect face
    results = yolo(img, conf=.4, iou=.7, half=True, verbose=False)
    if len(results) < 1:
        return None

    detections = results[0].boxes
    classes = detections.cls.numpy(force=True)
    bboxes = detections.xyxy.numpy(force=True)

    if 1 not in classes:
        return None

    # Corner coordinates of the first class-1 box, converted to x/y/w/h
    face_rows = np.where(classes == 1)[0]
    x1, y1, x2, y2 = bboxes[face_rows][0].astype('int')
    return np.array([x1, y1, x2 - x1, y2 - y1])

def get_spiga_feature(img, use_detector=True, feature='landmarks'):
    """Run SPIGA on ``img`` and return one feature of the first face.

    With ``use_detector`` the face box comes from ``get_bbox_hw``; otherwise
    the whole image is used as the box. Returns the requested entry of the
    SPIGA result as a numpy array, or None when no face is detected or the
    result (or the requested entry) is None.
    """
    whole_image_box = [0, 0, img.shape[1], img.shape[0]]
    face_bbox_hw = get_bbox_hw(img) if use_detector else whole_image_box

    if face_bbox_hw is None:
        print('No face detected.')
        return None

    features = processor.inference(img, [face_bbox_hw])
    if features is None or features[feature] is None:
        return None

    # A single box was passed in, so take the first (only) result
    return np.array(features[feature][0])

def get_keypoints(img, use_detector=True):
    """Reduce the SPIGA landmarks of ``img`` to five keypoints.

    The returned list is ordered to match the EYE_L, EYE_R, NOSE, MOUTH_L and
    MOUTH_R index constants: the mean of landmarks 60-67, the mean of
    landmarks 68-75, then landmarks 53, 88 and 92. Returns None when no
    landmarks are available.
    NOTE(review): the indices presumably follow the WFLW landmark layout --
    confirm against the SPIGA/WFLW annotation scheme.
    """
    landmarks = get_spiga_feature(img, use_detector, 'landmarks')
    if landmarks is None:
        return None

    first_eye = np.mean(landmarks[60:68], axis=0)
    second_eye = np.mean(landmarks[68:76], axis=0)
    nose = landmarks[53]
    first_mouth_corner = landmarks[88]
    second_mouth_corner = landmarks[92]
    return [first_eye, second_eye, nose, first_mouth_corner, second_mouth_corner]

def get_pose(img, use_detector=True):
    """Return the 'headpose' feature SPIGA computes for ``img`` (or None)."""
    pose = get_spiga_feature(img, use_detector, feature='headpose')
    return pose

Model summary (fused): 268 layers, 68125494 parameters, 0 gradients


SPIGA model loaded!


In [None]:
# Input label file listing the images to preprocess
dataset_labels_path = os.path.join(dataset_root, 'labels5 - excluded no age.csv')
# Output label files: rows whose image was processed / rows that were rejected
dataset_labels_path_out = os.path.join(dataset_root, '24-datasets.csv')
dataset_labels_path_no_face = os.path.join(dataset_root, 'labels6 - no-face.csv')
# Output image folders: processed images / copies of rejected images
dataset_imgs_path_out = os.path.join(dataset_root, 'images-preprocessed')
dataset_imgs_path_no_face = os.path.join(dataset_root, 'images-no-face')

# Run the full pipeline, producing IMG_SIZE x IMG_SIZE grayscale images
preprocess(dataset_imgs_path_out, dataset_imgs_path_no_face, dataset_labels_path, dataset_labels_path_out, dataset_labels_path_no_face, grayscale=True, img_size=IMG_SIZE)

100%|█████████▉| 444517/446365 [00:45<00:00, 17069.06it/s]

No face detected.


100%|██████████| 446365/446365 [04:23<00:00, 1695.50it/s] 
