In [None]:
! pip install opencv-python tqdm numpy torch matplotlib

In [1]:
import os
import cv2
from tqdm import tqdm

import torch
from torch.utils.data import Dataset, DataLoader, random_split

import torchvision
import torch.optim as optim
import torch.nn as nn

# from PIL import Image

import matplotlib.pyplot as plt

# FDDB Parsing

In [8]:
# ===============================================
# CONFIG
# ===============================================

# Paths
dataset_path = r"C:\PROJECTS\MILO\MILO\FaceRecognition\data\Dataset_FDDB\images"
labels_path = r"C:\PROJECTS\MILO\MILO\FaceRecognition\data\Dataset_FDDB\label.txt"
output_dir  = r"C:\PROJECTS\MILO\MILO\FaceRecognition\data\Augmented"  # Where to save results

output_label_file = os.path.join(output_dir, "all_augmentations.txt")

# Target resolution
TARGET_HEIGHT, TARGET_WIDTH = 1080, 1920

# Define a list of scales (feel free to adjust)
SCALES = [0.5, 1.0]
SCALES.extend([1/scale for scale in SCALES])

# Define alignment anchors for both axes
# e.g. 0 → align to the "top" or "left", 0.5 → align "center", 1.0 → align "bottom" or "right".
# You could also do 0, 0.25, 0.5, 0.75, 1.0 to get 5 steps for each axis.
ALIGNMENT_X = [0.0, 0.5, 1.0]  # Left, Center, Right
ALIGNMENT_Y = [0.0, 0.5, 1.0]  # Top, Center, Bottom

# Interpolation for upscaling/downscaling
# - cv2.INTER_LANCZOS4 is best in many cases for upscaling (but slower)
# - cv2.INTER_CUBIC is also good for upscaling
# - cv2.INTER_AREA is often better for downscaling
UPSCALE_INTERPOLATION   = cv2.INTER_LANCZOS4
DOWNSCALE_INTERPOLATION = cv2.INTER_AREA

# ===============================================
# FUNCTIONS
# ===============================================

def parse_labels(label_path):
    """ Reads the label file and returns a dictionary: image_path -> list of bounding boxes. """
    labels = {}
    with open(label_path, 'r') as f:
        lines = f.readlines()
        image_path = None
        for line in lines:
            line = line.strip()
            if line.startswith("#"):
                # New image path
                image_path = line[2:]  # Remove '# '
                labels[image_path] = []
            else:
                # bounding box in "x_min y_min x_max y_max" format
                bbox = list(map(int, line.split()))
                labels[image_path].append(bbox)
    return labels


def scale_image(image, bboxes, scale):
    """
    Scale the image by a certain factor.
    Returns the scaled image and updated bounding boxes.
    """
    orig_h, orig_w = image.shape[:2]
    
    # Ensure at least 1 pixel in width and height
    new_w = int(orig_w * scale)
    new_h = int(orig_h * scale)

    if new_w < 1 or new_h < 1:
        print(f"Skipping scale {scale} for this image because dimension would be too small.")
        return image.copy(), bboxes.copy()  # or just skip

    
    if scale < 1.0:
        # Downscale => INTER_AREA is typically good for reducing sizes
        interp = DOWNSCALE_INTERPOLATION
    else:
        # Upscale => e.g., INTER_LANCZOS4 or INTER_CUBIC
        interp = UPSCALE_INTERPOLATION
    
    # Resize with safe dimension sizes
    scaled_image = cv2.resize(image, (new_w, new_h), interpolation=interp)
    
    # Scale bounding boxes
    scaled_bboxes = []
    for (x_min, y_min, x_max, y_max) in bboxes:
        x_min_s = int(x_min * (new_w / orig_w))
        y_min_s = int(y_min * (new_h / orig_h))
        x_max_s = int(x_max * (new_w / orig_w))
        y_max_s = int(y_max * (new_h / orig_h))
        scaled_bboxes.append([x_min_s, y_min_s, x_max_s, y_max_s])
    
    return scaled_image, scaled_bboxes


def pad_image(image, bboxes, align_x=0.5, align_y=0.5):
    """
    Pads the image to the target resolution (1920x1080).
    `align_x` and `align_y` ∈ [0, 1], controlling the alignment along each axis:
        - 0.0 => flush to top/left
        - 0.5 => center
        - 1.0 => flush to bottom/right
    Returns the padded image and updated bounding boxes.
    """
    orig_h, orig_w = image.shape[:2]
    
    # How much total padding is needed on each dimension
    pad_x = max(0, TARGET_WIDTH - orig_w)
    pad_y = max(0, TARGET_HEIGHT - orig_h)
    
    # Compute how much goes on the top vs bottom
    # e.g. if align_y=0, all the padding goes to bottom
    #      if align_y=1, all the padding goes to top
    #      if align_y=0.5, half top, half bottom
    top = int(pad_y * align_y)
    bottom = pad_y - top
    
    # Similarly for left, right
    left = int(pad_x * align_x)
    right = pad_x - left
    
    # Pad the image
    padded_image = cv2.copyMakeBorder(
        image,
        top, bottom, left, right,
        cv2.BORDER_CONSTANT, value=(0, 0, 0)
    )
    
    # Shift bounding boxes
    updated_bboxes = []
    for (x_min, y_min, x_max, y_max) in bboxes:
        x_min_pad = x_min + left
        x_max_pad = x_max + left
        y_min_pad = y_min + top
        y_max_pad = y_max + top
        updated_bboxes.append([x_min_pad, y_min_pad, x_max_pad, y_max_pad])
    
    return padded_image, updated_bboxes



# ===============================================
# MAIN PROCESS
# ===============================================

# 1) Parse labels
labels_dict = parse_labels(labels_path)

# 2) Create output folder
os.makedirs(output_dir, exist_ok=True)

# 3) Calculate total steps for the progress bar
num_images   = len(labels_dict)
num_scales   = len(SCALES)
num_positions = len(ALIGNMENT_X) * len(ALIGNMENT_Y)
total_steps = num_images * num_scales * num_positions

# 4) Open one single text file for all augmented results
with open(output_label_file, 'w') as label_out, tqdm(total=total_steps, desc="Processing") as pbar:

    # 5) Iterate over images
    for rel_img_path, bboxes in labels_dict.items():
        full_img_path = os.path.join(dataset_path, rel_img_path)
        if not os.path.isfile(full_img_path):
            print(f"[Warning] Image not found: {full_img_path}")
            continue

        # Load image
        image = cv2.imread(full_img_path)
        if image is None:
            print(f"[Warning] Failed to load: {full_img_path}")
            continue

        # 6) For each scale
        for scale in SCALES:
            scaled_img, scaled_bboxes = scale_image(image, bboxes, scale)

            # 7) For each alignment
            for ax in ALIGNMENT_X:
                for ay in ALIGNMENT_Y:
                    padded_img, padded_bboxes = pad_image(scaled_img, scaled_bboxes,
                                                            align_x=ax, align_y=ay)

                    # Create a file name for the augmented image
                    base_name = os.path.splitext(os.path.basename(rel_img_path))[0]
                    out_name  = f"{base_name}_s{scale}_ax{ax}_ay{ay}.jpg"

                    # Save the augmented image (comment out if not needed)
                    out_path = os.path.join(output_dir, out_name)
                    cv2.imwrite(out_path, padded_img)

                    # ======================================
                    # Write to single annotation file
                    # ======================================
                    # 1) Write a header line for the new image version
                    label_out.write(f"# {out_name}\n")

                    # 2) Write each bounding box on its own line
                    for (x_min, y_min, x_max, y_max) in padded_bboxes:
                        label_out.write(f"{x_min} {y_min} {x_max} {y_max}\n")

                    pbar.update(1)


Processing: 100%|██████████| 102060/102060 [11:41<00:00, 145.41it/s]


# Prep Dataset for Use

In [10]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class FaceDataset(Dataset):
    def __init__(self, images_folder, labels_file, transform = None):
        self.images_folder = images_folder
        self.transform = transform # technically should be a parameter, but due to situation, we are doing that seperately beforehand
        self.data = []

        with open(labels_file, "r") as f:
            lines = f.readlines()
            image_path = None
            boxes = []
            for line in lines:
                line = line.strip()
                if line.startswith("#"):
                    if image_path:  # Save previous image
                        self.data.append((image_path, boxes))
                    image_path = os.path.join(images_folder, line[2:])
                    boxes = []
                else:
                    boxes.append(list(map(int, line.split())))
            if image_path:  # Save last image
                self.data.append((image_path, boxes))

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        image_path, boxes = self.data[idx]
        image = cv2.imread(image_path)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        # image = Image.open(image).convert('RGB')

        # keep in for easy refactor in the future
        if self.transform:
            image = self.transform(image)

        # # Convert to PyTorch tensor
        image = torch.from_numpy(image).float().permute(2, 0, 1) / 255.0  # Normalize to [0,1]

        # Convert bounding boxes to tensor
        boxes = torch.tensor(boxes, dtype=torch.float32)

        # Labels (assuming all objects belong to class 1, since it's face detection)
        labels = torch.ones((boxes.shape[0],), dtype=torch.int64)

        target = {"boxes": boxes, "labels": labels}
        return image, target


In [11]:
full_images_path = r"C:\PROJECTS\MILO\MILO\FaceRecognition\data\Dataset_FDDB\aug_images"
full_labels_path = r"C:\PROJECTS\MILO\MILO\FaceRecognition\data\Dataset_FDDB\aug_label.txt"
dataset = FaceDataset(full_images_path, full_labels_path)

train_size = int(0.8 * len(dataset))

indices = torch.randperm(len(dataset)).tolist()
train_indices, test_indices = indices[:train_size], indices[train_size:]

train_dataset = torch.utils.data.Subset(dataset, train_indices)
test_dataset = torch.utils.data.Subset(dataset, test_indices)

def collate_fn(batch):
    images, targets = zip(*batch)
    return list(images), list(targets)

train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=4, shuffle=False, collate_fn=collate_fn)

# NEW MODEL TRAINING

In [9]:
from torchvision.models.detection import ssdlite320_mobilenet_v3_large, SSDLite320_MobileNet_V3_Large_Weights, ssdlite
from torchvision.models.detection.anchor_utils import DefaultBoxGenerator
import math

# # Check if CUDA is available
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load SSD300 with VGG16 backbone
model = ssdlite320_mobilenet_v3_large(num_classes = 2)  # Set to True if you want pretrained weights
# model.to(device)



In [10]:
# Initialize lists to store losses
train_losses = []
eval_losses = []

# Live plotting function
def plot_losses():
    plt.figure(figsize=(8, 5))
    plt.plot(train_losses, label="Train Loss", marker="o")
    plt.plot(eval_losses, label="Eval Loss", marker="o", linestyle="dashed")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.title("Training vs Evaluation Loss")
    plt.legend()
    plt.grid()
    plt.show(block=False)
    plt.pause(0.1)  # Pause to update the plot

In [11]:
optimizer = optim.Adam(model.parameters(), lr=0.001)
prev_eval = math.inf
# Training Loop with tqdm & Live Graph
def train_model(model, train_loader, test_loader, num_epochs=10):
    model.train()
    for epoch in range(num_epochs):
        total_train_loss = 0
        # tqdm progress bar for training
        progress_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}", unit="batch")
        for images, targets in progress_bar:
            optimizer.zero_grad()
            # Move data to GPU if available
            images = [img.to(device) for img in images]
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            # Forward pass
            outputs = model(images, targets)

            # Compute loss
            # loss = criterion(outputs, targets)
            loss = sum(loss for loss in outputs.values())
            loss.backward()
            optimizer.step()
            
            total_train_loss += loss.item()
            progress_bar.set_postfix(loss=f"{loss.item():.4f}")

        avg_train_loss = total_train_loss / len(train_loader)
        train_losses.append(avg_train_loss)

        # Evaluate Model After Each Epoch
        avg_eval_loss = evaluate_model(model, test_loader)
        eval_losses.append(avg_eval_loss)

        if avg_eval_loss < prev_eval:
            torch.save(model.state_dict(), r"C:\PROJECTS\MILO\MILO\FaceRecognition\model_default.pth")


        # Print Epoch Summary
        print(f"\nEpoch {epoch+1}: Train Loss = {avg_train_loss:.4f}, Eval Loss = {avg_eval_loss:.4f}")


# Evaluation Function with tqdm Progress Bar
def evaluate_model(model, test_loader):
    model.train()
    total_loss = 0

    with torch.no_grad():
        progress_bar = tqdm(test_loader, desc="Evaluating", unit="batch")
        for images, targets in progress_bar:
            images = [img.to(device) for img in images]
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            # When targets are provided, the model returns a dictionary of losses.
            loss_dict = model(images, targets)
            loss = sum(loss for loss in loss_dict.values())
            total_loss += loss.item()

            progress_bar.set_postfix(loss=f"{loss.item():.4f}")

    avg_loss = total_loss / len(test_loader)
    return avg_loss

# Check if CUDA is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# device = torch.device("cpu")

In [None]:
optimizer = optim.Adam(model.parameters(), lr=0.0001)
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"
os.environ["TORCH_USE_CUDA_DSA"] = "1"
print(device)
model.to(device)
# Train Model with Evaluation Each Epoch
train_model(model, train_loader, test_loader, num_epochs=20)
plot_losses()

cuda


Epoch 1/20: 100%|██████████| 5103/5103 [12:12<00:00,  6.97batch/s, loss=2.6525]
Evaluating: 100%|██████████| 1276/1276 [02:25<00:00,  8.77batch/s, loss=4.8882] 



Epoch 1: Train Loss = 2.7857, Eval Loss = 3.9285


Epoch 2/20: 100%|██████████| 5103/5103 [10:59<00:00,  7.74batch/s, loss=3.7874]
Evaluating: 100%|██████████| 1276/1276 [01:48<00:00, 11.72batch/s, loss=5.2102]



Epoch 2: Train Loss = 2.6984, Eval Loss = 3.9426


Epoch 3/20: 100%|██████████| 5103/5103 [10:47<00:00,  7.89batch/s, loss=2.2836]
Evaluating: 100%|██████████| 1276/1276 [01:49<00:00, 11.66batch/s, loss=5.2013] 



Epoch 3: Train Loss = 2.6537, Eval Loss = 3.9631


Epoch 4/20: 100%|██████████| 5103/5103 [10:41<00:00,  7.96batch/s, loss=4.5308]
Evaluating: 100%|██████████| 1276/1276 [01:49<00:00, 11.62batch/s, loss=5.2242] 



Epoch 4: Train Loss = 2.6237, Eval Loss = 3.9775


Epoch 5/20: 100%|██████████| 5103/5103 [10:40<00:00,  7.97batch/s, loss=4.6423]
Evaluating: 100%|██████████| 1276/1276 [01:49<00:00, 11.66batch/s, loss=5.1551] 



Epoch 5: Train Loss = 2.6098, Eval Loss = 3.9820


Epoch 6/20:  56%|█████▋    | 2883/5103 [08:00<04:36,  8.03batch/s, loss=1.5773]

# Model Training

## SSDLite

In [None]:
from torchvision.models.detection import ssdlite320_mobilenet_v3_large, SSDLite320_MobileNet_V3_Large_Weights, ssdlite
from torchvision.models.detection.anchor_utils import DefaultBoxGenerator
import math

# # Check if CUDA is available
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load SSD300 with VGG16 backbone
model = ssdlite320_mobilenet_v3_large(weights=SSDLite320_MobileNet_V3_Large_Weights.DEFAULT)  # Set to True if you want pretrained weights
# model.to(device)

# Get the number of input features for the classification head
in_channels = [list(m.parameters())[0].shape[0] for m in model.head.classification_head.module_list]

# # Check input channels
# num_anchors = model.head.classification_head.module_list[0][1].out_channels // 91  # Default COCO classes is 91

# dummy_input = torch.randn(1, 3, 1080, 1920)
# features = model.backbone(dummy_input)
# feature_map_shapes = [f.shape[-2:] for f in features.values()]


# # For example, using the height (1080) as the reference:
# new_anchor_generator = get_new_anchor_generator(1080, feature_map_shapes)
# # Replace the model's anchor generator:
# model.anchor_generator = new_anchor_generator


# # Modify classification head to have 2 classes 
# model.head.classification_head.num_classes = 2  # Update class count
# model.head.classification_head = ssdlite.SSDLiteClassificationHead(
#     in_channels = in_channels,
#     num_anchors = [num_anchors], 
#     num_classes = 2,
#     norm_layer = nn.BatchNorm2d
# )

# Compute the number of feature maps from the backbone
dummy_input = torch.randn(1, 3, 1080, 1920)
features = model.backbone(dummy_input)
feature_map_shapes = [f.shape[-2:] for f in features.values()]
num_feature_maps = len(feature_map_shapes)

# Define aspect ratios (one per feature map)
aspect_ratios = [[1.0, 2.0, 0.5]] * num_feature_maps

# Compute the number of anchors per feature map:
# (Typically, it's len(aspect_ratios[i]) + 1 for the extra anchor)
num_anchors_list = [len(ratios) + 1 for ratios in aspect_ratios]  # e.g. [4, 4, ..., 4]

def get_new_anchor_generator(input_size, feature_map_shapes, aspect_ratios=None):
    num_feature_maps = len(feature_map_shapes)
    s_min, s_max = 0.2, 0.9
    scales = [s_min + (s_max - s_min) * k / num_feature_maps for k in range(num_feature_maps + 1)]
    
    if aspect_ratios is None:
        aspect_ratios = [[1.0, 2.0, 0.5]] * num_feature_maps
    
    anchor_generator = DefaultBoxGenerator(aspect_ratios, scales=scales)
    return anchor_generator

new_anchor_generator = get_new_anchor_generator(1080, feature_map_shapes, aspect_ratios=aspect_ratios)
model.anchor_generator = new_anchor_generator

# Compute the in_channels for each feature map head as before:
in_channels = [list(m.parameters())[0].shape[0] for m in model.head.classification_head.module_list]

model.head.classification_head = ssdlite.SSDLiteClassificationHead(
    in_channels=in_channels,
    num_anchors=num_anchors_list,  # now a list for each feature map
    num_classes=2,
    norm_layer=nn.BatchNorm2d
)

model.head.regression_head = ssdlite.SSDLiteRegressionHead(
    in_channels=in_channels,
    num_anchors=num_anchors_list,
    norm_layer=nn.BatchNorm2d
)


# Modify for 1080p Input
model.size = (1080, 1920)

# Freeze all layers by default
for param in model.parameters():
    param.requires_grad = False

# # Unfreeze the first few layers of the backbone
# for layer in list(model.backbone.features)[:1]:  # Modify the number as needed
#     for param in layer.parameters():
#         param.requires_grad = True

# Unfreeze the last layers of the detection head (classification + box regression)
for param in model.head.classification_head.parameters():
    param.requires_grad = True

for param in model.head.regression_head.parameters():
    param.requires_grad = True

# # Print which layers are trainable
# trainable_layers = [name for name, param in model.named_parameters() if param.requires_grad]
# print("Trainable layers:", trainable_layers)



In [None]:
# Initialize lists to store losses
train_losses = []
eval_losses = []

# Live plotting function
def plot_losses():
    plt.figure(figsize=(8, 5))
    plt.plot(train_losses, label="Train Loss", marker="o")
    plt.plot(eval_losses, label="Eval Loss", marker="o", linestyle="dashed")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.title("Training vs Evaluation Loss")
    plt.legend()
    plt.grid()
    plt.show(block=False)
    plt.pause(0.1)  # Pause to update the plot

In [None]:
optimizer = optim.Adam(model.parameters(), lr=0.001)
# Training Loop with tqdm & Live Graph
def train_model(model, train_loader, test_loader, num_epochs=10):
    model.train()
    for epoch in range(num_epochs):
        total_train_loss = 0
        # tqdm progress bar for training
        progress_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}", unit="batch")
        for images, targets in progress_bar:
            optimizer.zero_grad()
            # Move data to GPU if available
            images = [img.to(device) for img in images]
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            # Forward pass
            outputs = model(images, targets)

            # Compute loss
            # loss = criterion(outputs, targets)
            loss = sum(loss for loss in outputs.values())
            loss.backward()
            optimizer.step()
            
            total_train_loss += loss.item()
            progress_bar.set_postfix(loss=f"{loss.item():.4f}")

        avg_train_loss = total_train_loss / len(train_loader)
        train_losses.append(avg_train_loss)

        # Evaluate Model After Each Epoch
        avg_eval_loss = evaluate_model(model, test_loader)
        eval_losses.append(avg_eval_loss)

        # Print Epoch Summary
        print(f"\nEpoch {epoch+1}: Train Loss = {avg_train_loss:.4f}, Eval Loss = {avg_eval_loss:.4f}")


# Evaluation Function with tqdm Progress Bar
def evaluate_model(model, test_loader):
    model.train()
    total_loss = 0

    with torch.no_grad():
        progress_bar = tqdm(test_loader, desc="Evaluating", unit="batch")
        for images, targets in progress_bar:
            images = [img.to(device) for img in images]
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            # When targets are provided, the model returns a dictionary of losses.
            loss_dict = model(images, targets)
            loss = sum(loss for loss in loss_dict.values())
            total_loss += loss.item()

            progress_bar.set_postfix(loss=f"{loss.item():.4f}")

    avg_loss = total_loss / len(test_loader)
    return avg_loss

# Check if CUDA is available
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device = torch.device("cpu")

In [None]:
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"
os.environ["TORCH_USE_CUDA_DSA"] = "1"
print(device)
model.to(device)
for _ in range(1):
    # Train Model with Evaluation Each Epoch
    train_model(model, train_loader, test_loader, num_epochs=5)
    plot_losses()
    torch.save(model.state_dict(), r"C:\PROJECTS\MILO\MILO\FaceRecognition\model.pth")
    

In [None]:
# Convert to ONNX with 1080p Input
dummy_input = torch.randn(1, 3, 1080, 1920).to(device)  # Adjusted for 1080p
torch.onnx.export(model, dummy_input, ["ssd_1080p.onnx"], dynamo=True)

In [None]:
torch.save(model.state_dict(), r"C:\PROJECTS\MILO\MILO\FaceRecognition\model.pth")

In [None]:
model = TheModelClass(*args, **kwargs)
model.load_state_dict(torch.load(PATH, weights_only=True))
model.eval()