In [None]:
# %pip install pandas matplotlib tensorflow huggingface datasets dlib scikit-learn albumentations opencv-python

In [None]:
import os
import tensorflow as tf
import numpy as np
from sklearn.model_selection import train_test_split
from datasets import DatasetDict, Dataset ,load_from_disk
import dlib
import cv2
import shutil
import random
import albumentations as alb
import json

In [2]:
_DLIB_FACE_DETECTOR = None  # created on first use, then shared by every call


def dlib_detector(img):
    """Run dlib's frontal face detector on ``img``.

    Args:
        img: Image handed straight to the dlib detector.

    Returns:
        The detector's result for ``img``: a collection of face rectangles
        (callers in this notebook read them through ``.left()``, ``.top()``,
        ``.right()`` and ``.bottom()``). Presumably empty when no face is
        found -- callers should check the length before indexing.
    """
    global _DLIB_FACE_DETECTOR
    if _DLIB_FACE_DETECTOR is None:
        # Building the detector does not depend on the image, so do it once
        # instead of once per processed image.
        _DLIB_FACE_DETECTOR = dlib.get_frontal_face_detector()
    return _DLIB_FACE_DETECTOR(img)

## Resize all images to the same shape

In [None]:
# Letterbox every image under input_dir/<class>/ to TARGET_SIZE: scale it to
# fit while keeping the aspect ratio, then pad with black to the exact size.
TARGET_SIZE = (640, 480)  # (width, height)

input_dir = './Dataset_raw/'
output_dir = './Dataset_resized/'

target_w, target_h = TARGET_SIZE

for class_name in os.listdir(input_dir):
    class_path = os.path.join(input_dir, class_name)
    if not os.path.isdir(class_path):
        # Only real class folders get an output folder. Creating one for a
        # stray file would make it show up as a class in later cells.
        continue

    os.makedirs(os.path.join(output_dir, class_name), exist_ok=True)

    for image_name in os.listdir(class_path):
        image_path = os.path.join(class_path, image_name)

        try:
            image = cv2.imread(image_path)
            if image is None:
                print(f"Error reading {image_path}")
                continue

            # Original dimensions
            h, w = image.shape[:2]

            # Largest scale at which both sides still fit inside the target
            scale = min(target_h / h, target_w / w)
            # int() truncates; keep at least one pixel so a very elongated
            # image does not produce a zero-sized resize request.
            new_w = max(1, int(w * scale))
            new_h = max(1, int(h * scale))

            resized_image = cv2.resize(image, (new_w, new_h))

            # Split the leftover space between both sides; an odd remainder
            # goes to the bottom / right edge.
            pad_h = target_h - new_h
            pad_w = target_w - new_w
            canvas = cv2.copyMakeBorder(
                resized_image,
                pad_h // 2,          # Top padding
                pad_h - pad_h // 2,  # Bottom padding
                pad_w // 2,          # Left padding
                pad_w - pad_w // 2,  # Right padding
                cv2.BORDER_CONSTANT,
                value=(0, 0, 0)  # Black padding
            )

            save_path = os.path.join(output_dir, class_name, image_name)
            # imwrite reports some failures through its return value only.
            if not cv2.imwrite(save_path, canvas):
                print(f"Error writing {save_path}")
        except Exception as e:
            print(f"Error processing {image_path}: {e}")

print("Image resizing with aspect ratio preservation complete!")


## Split the data into train, validation, and test sets

In [None]:
# Copy the images of every class folder in input_dir into
# output_dir/<train|val|test>/<class>/ according to the ratios below.
input_dir = './Dataset_resized/'
output_dir = './Dataset_split/'

# Define split ratios (can be changed as needed)
train_split = 0.65
val_split = 0.25
test_split = 0.1

# The test set receives whatever remains after train and val, so the three
# ratios are only honoured when they add up to 1.
if abs(train_split + val_split + test_split - 1.0) > 1e-6:
    raise ValueError(
        f"Split ratios must sum to 1, got {train_split + val_split + test_split}"
    )

# A fixed seed makes the partition reproducible. Without it, re-running this
# cell copies a different partition on top of the previous one and the same
# image ends up in several splits. If the seed or the ratios are changed,
# delete output_dir before re-running.
SPLIT_SEED = 42
split_rng = random.Random(SPLIT_SEED)

splits = ['train', 'val', 'test']
IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.bmp', '.gif')

# Process each folder (person1, person2, etc.); sorted so the seeded shuffle
# sees the classes in the same order on every run.
for class_name in sorted(os.listdir(input_dir)):
    class_path = os.path.join(input_dir, class_name)

    if not os.path.isdir(class_path):
        continue  # Skip files, only process directories (like person1, person2)

    # Create the split folders only for real classes, so stray files in
    # input_dir never turn into empty class folders.
    for split in splits:
        os.makedirs(os.path.join(output_dir, split, class_name), exist_ok=True)

    # Sort before shuffling: os.listdir order is arbitrary, and the shuffle
    # result depends on the starting order.
    images = sorted(
        f for f in os.listdir(class_path) if f.lower().endswith(IMAGE_EXTENSIONS)
    )
    split_rng.shuffle(images)

    # Split images into train, val, and test (test takes the remainder)
    total_images = len(images)
    train_count = int(total_images * train_split)
    val_count = int(total_images * val_split)

    train_images = images[:train_count]
    val_images = images[train_count:train_count + val_count]
    test_images = images[train_count + val_count:]

    # Copy files to train, val, and test directories
    for image_set, split in zip([train_images, val_images, test_images], splits):
        for image_name in image_set:
            src_path = os.path.join(class_path, image_name)
            dst_path = os.path.join(output_dir, split, class_name, image_name)

            try:
                shutil.copy(src_path, dst_path)
            except Exception as e:
                print(f"Error copying {src_path} to {dst_path}: {e}")

print("Dataset successfully organized!")

## Augment the dataset

In [None]:
# Augmentation pipeline applied to each image together with its face box.
_aug_steps = [
    alb.RandomCrop(width=450, height=450),
    alb.HorizontalFlip(p=0.5),
    alb.RandomBrightnessContrast(p=0.2),
    alb.RandomGamma(p=0.2),
    alb.RGBShift(p=0.2),
    alb.VerticalFlip(p=0.5),
]
# Boxes are passed in the 'albumentations' format; their labels travel in the
# 'class_labels' field of each call.
_aug_bbox_params = alb.BboxParams(
    format='albumentations',
    label_fields=['class_labels'],
)
augmentor = alb.Compose(_aug_steps, bbox_params=_aug_bbox_params)

def create_class_dict(input_dir='Dataset_split'):
    """Build the person-name -> class-index mapping and save it as JSON.

    Collects the names of all sub-directories found under the ``train``,
    ``test`` and ``val`` folders of ``input_dir``, numbers them in sorted
    order starting at 0, writes the mapping to ``class_dict.json`` in the
    current working directory and returns it.
    """
    discovered = {
        entry
        for split_name in ('train', 'test', 'val')
        for entry in os.listdir(os.path.join(input_dir, split_name))
        if os.path.isdir(os.path.join(input_dir, split_name, entry))
    }
    ordered_names = sorted(discovered)
    class_dict = dict(zip(ordered_names, range(len(ordered_names))))

    with open('class_dict.json', 'w') as out_file:
        json.dump(class_dict, out_file, indent=4)

    return class_dict

def _clamp_unit(value):
    """Clamp ``value`` into the closed interval [0, 1]."""
    return min(max(value, 0.0), 1.0)


def create_aug_data(input_dir='Dataset_split', output_dir='aug_data', num_augmentations=60):
    """Write augmented images and JSON annotations for every split.

    For each image under ``input_dir/<partition>/<person>/`` the first face
    returned by ``dlib_detector`` is used as the bounding box. The image is
    then run through ``augmentor`` ``num_augmentations`` times; each result is
    saved to ``output_dir/<partition>/images/`` with a matching annotation in
    ``output_dir/<partition>/labels/``.

    Args:
        input_dir: Root folder containing ``train``, ``test`` and ``val``.
        output_dir: Root folder for the augmented data set (created if needed).
        num_augmentations: Number of augmented copies generated per image.

    Notes:
        * Images that cannot be read, or in which no face is detected, are
          reported and skipped.
        * Output file names are prefixed with the person folder name. All
          persons share one output folder, so without the prefix two persons
          with an identically named source image would overwrite each other.
        * Any other error on an image is reported and processing continues
          with the next image.
    """
    class_dict = create_class_dict(input_dir)
    partitions = ['train', 'test', 'val']

    for partition in partitions:
        os.makedirs(os.path.join(output_dir, partition, 'images'), exist_ok=True)
        os.makedirs(os.path.join(output_dir, partition, 'labels'), exist_ok=True)

    for partition in partitions:
        partition_path = os.path.join(input_dir, partition)
        image_dir = os.path.join(output_dir, partition, 'images')
        label_dir = os.path.join(output_dir, partition, 'labels')

        for person_folder in os.listdir(partition_path):
            person_path = os.path.join(partition_path, person_folder)

            if not os.path.isdir(person_path):
                continue

            class_label = class_dict[person_folder]

            for image_name in os.listdir(person_path):
                image_path = os.path.join(person_path, image_name)

                try:
                    img = cv2.imread(image_path)
                    if img is None:
                        print(f"Error reading image: {image_path}")
                        continue

                    height, width = img.shape[:2]

                    # cv2.imread yields BGR, while the dlib detector is
                    # documented for RGB / grayscale input.
                    faces = dlib_detector(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
                    if len(faces) == 0:
                        print(f"No face detected, skipping: {image_path}")
                        continue
                    face = faces[0]

                    # Normalize to [0, 1]. The detector can report a box that
                    # sticks out past the image border; clamp it, because the
                    # 'albumentations' bbox format rejects out-of-range values.
                    coords = [
                        _clamp_unit(face.left() / width),
                        _clamp_unit(face.top() / height),
                        _clamp_unit(face.right() / width),
                        _clamp_unit(face.bottom() / height),
                    ]
                    if coords[2] <= coords[0] or coords[3] <= coords[1]:
                        print(f"Degenerate face box, skipping: {image_path}")
                        continue

                    # Person prefix keeps names unique across classes.
                    stem = f'{person_folder}_{os.path.splitext(image_name)[0]}'

                    for aug_idx in range(num_augmentations):
                        augmented = augmentor(
                            image=img,
                            bboxes=[coords],
                            class_labels=[class_label]
                        )

                        aug_image_name = f'{stem}.{aug_idx}.jpg'
                        aug_image_path = os.path.join(image_dir, aug_image_name)
                        if not cv2.imwrite(aug_image_path, augmented['image']):
                            # Do not leave a label that points at a missing image.
                            print(f"Error writing {aug_image_path}")
                            continue

                        if len(augmented['bboxes']) > 0:
                            # Plain floats so json.dump never meets numpy scalars.
                            updated_bbox = [float(v) for v in augmented['bboxes'][0][:4]]
                        else:
                            # The crop removed the face: store an all-zero box.
                            # NOTE(review): the class label is still the
                            # person's id here -- confirm the training code
                            # expects that for face-less crops.
                            updated_bbox = [0, 0, 0, 0]

                        annotation = {
                            "image": aug_image_name,
                            "bbox": updated_bbox,
                            "class": class_label  # Class label for this person
                        }

                        aug_json_path = os.path.join(label_dir, f'{stem}.{aug_idx}.json')
                        with open(aug_json_path, 'w') as json_file:
                            json.dump(annotation, json_file)

                except Exception as e:
                    print(f"Error processing {image_path}: {e}")
create_aug_data()

  & (clipped_box_areas / denormalized_box_areas >= min_visibility - epsilon)
