# CS5805 Final Project: YOLO Implementation on Facial Detection/Pixelization
### Corey Huang, Allison Deaton, Ken Lin, Rebecca Yeung

In [None]:
import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
from ultralytics import YOLO
import torch
import random
import shutil

## Load Data

In [None]:
# Parse annotation files
def parse_annotations(annotation_file, images_dir):
    data = []
    with open(annotation_file, 'r') as f:
        lines = f.readlines()
    i = 0
    while i < len(lines):
        # Read image path
        relative_image_path = lines[i].strip()
        image_path = os.path.join(images_dir, relative_image_path)
        i += 1
        # Skip if image doesn't exist
        if not os.path.exists(image_path):
            while i < len(lines) and lines[i].strip().isdigit():
                i += 1
            continue
        # Number of faces in image
        num_faces = int(lines[i].strip())
        i += 1
        # Read bounding boxes
        bounding_boxes = []
        for _ in range(num_faces):
            if i >= len(lines):
                break
            # Get position of bounding box
            bounding_box = list(map(int, lines[i].strip().split()[:4]))
            x_min, y_min, width, height = bounding_box
            x_max = x_min + width
            y_max = y_min + height
            bounding_boxes.append([x_min, y_min, x_max, y_max])
            i += 1
        # Add image path and bounding boxes
        data.append({'image_path': image_path, 'bounding_boxes': bounding_boxes})
    return data

# Dataset directories
train_images_dir = "data/WIDER_train"
val_images_dir = "data/WIDER_val"
test_images_dir = "data/WIDER_test"

# Annotation files
train_annotation_file = "data/wider_face_split/wider_face_train_bbx_gt.txt"
val_annotation_file = "data/wider_face_split/wider_face_val_bbx_gt.txt"

# Load annotations for training and validation datasets
train_data = parse_annotations(train_annotation_file, train_images_dir)
val_data = parse_annotations(val_annotation_file, val_images_dir)

In [None]:
# Format data for YOLO using random samples
def sampled_yolo(data, output_dir, sample_size):
    # Clear directory of previous data
    if os.path.exists(output_dir):
        for filename in os.listdir(output_dir):
            file_path = os.path.join(output_dir, filename)
            if os.path.isfile(file_path):
                os.remove(file_path)
    # Make directory if it doesn't exist
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    # Random sample of data
    sampled_data = random.sample(data, min(sample_size, len(data)))
    for item in sampled_data:
        image_path = item['image_path']
        image_name = os.path.basename(image_path)
        annotation_name = os.path.splitext(image_name)[0] + ".txt"
        # Copy image to output directory
        shutil.copy(image_path, os.path.join(output_dir, image_name))
        # Create YOLO annotations
        yolo_annotations = []
        try:
            import cv2
            image = cv2.imread(image_path)
            height, width, _ = image.shape
        except Exception as e:
            continue
        for (x_min, y_min, x_max, y_max) in item['bounding_boxes']:
            x_center = (x_min + x_max) / 2 / width
            y_center = (y_min + y_max) / 2 / height
            box_width = (x_max - x_min) / width
            box_height = (y_max - y_min) / height
            yolo_annotations.append(f"0 {x_center} {y_center} {box_width} {box_height}")
        # Save annotations to file
        with open(os.path.join(output_dir, annotation_name), 'w') as f:
            f.write("\n".join(yolo_annotations))

# Output directories for sampled datasets
sampled_train_dir = "data/sampled_train"
sampled_val_dir = "data/sampled_val"
sampled_test_dir = "data/sampled_test"

# Number of images for sampling (train ~70%, val ~15%, test ~15%)
train_sample_size = 500
val_sample_size = 100
test_sample_size = 100

# Get sample datasets
sampled_yolo(train_data, sampled_train_dir, train_sample_size)
sampled_yolo(val_data, sampled_val_dir, val_sample_size)
sampled_yolo(val_data, sampled_test_dir, test_sample_size)

## Train Model

In [None]:
# Check if CUDA is available (GPU or CPU)
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"Using device: {device}")

# Load YOLO model
model = YOLO('yolov8s.pt')
model.to(device)

# Path to dataset YAML file (must change path inside file to your own)
data_yaml = 'data.yaml'

# Train model
model.train(
    data=data_yaml,
    epochs=50,
    imgsz=640,
    batch=16,
    workers=4,
    device=device
)

# Evaluate model with validation data
metrics = model.val()
print(metrics)

In [None]:
# Find results data
def find_results(base_dir):
    # List of results files
    results_files = []
    for root, dirs, files in os.walk(base_dir):
        if 'results.csv' in files:
            results_file = os.path.join(root, 'results.csv')
            results_files.append(results_file)
    if results_files:
        # Sort by latest results files
        results_files.sort(key=lambda x: os.path.getmtime(x), reverse=True)
        return results_files[0]
    return None

# Load results file
results_dir = find_results("runs/detect")
results_data = pd.read_csv(results_dir)

# Extract relevant data
epochs = results_data['epoch']
map50_95 = results_data['metrics/mAP50-95(B)']

# Plot mAP@50-95 over epochs
plt.figure(figsize=(7, 5))
plt.plot(epochs, map50_95, marker='o', linestyle='-', color='b', label='mAP@50-95')
plt.title('Performance Over Training Epochs', fontsize=16)
plt.xlabel('Epoch', fontsize=14)
plt.ylabel('mAP@50-95', fontsize=14)
plt.grid(True)
plt.legend(fontsize=12)
plt.tight_layout()
plt.show()

## Test Model

In [None]:
# Run model on test data
results = model.predict(
    source=sampled_test_dir,
    conf=0.5,
    imgsz=640,
    device=device,
    save=True,
    save_txt=True
)

## Pixelate Faces

In [None]:
# Pixelate faces for image
def pixelate_face(image_path, bounding_boxes):
    image = cv2.imread(image_path)
    for (x_min, y_min, x_max, y_max) in bounding_boxes:
        # Extract face region in bounding box
        face = image[y_min:y_max, x_min:x_max]
        # Resize region to 16x16 pixels using linear interpolation
        face = cv2.resize(face, (16, 16), interpolation=cv2.INTER_LINEAR)
        # Resize region back to bounding box size using nearest-neighbor interpolation
        face = cv2.resize(face, (x_max - x_min, y_max - y_min), interpolation=cv2.INTER_NEAREST)
        # Replace original image with pixelated face
        image[y_min:y_max, x_min:x_max] = face
    return image

# Apply pixelizations to images
def pixelization(image_dir, results_dir, output_dir):
    # Clear directory of previous data
    if os.path.exists(output_dir):
        for filename in os.listdir(output_dir):
            file_path = os.path.join(output_dir, filename)
            if os.path.isfile(file_path):
                os.remove(file_path)
    # Make directory if it doesn't exist
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    for file in os.listdir(results_dir):
        if not file.endswith('.txt'):
            continue
        image_path = os.path.join(image_dir, file.replace('.txt', '.jpg'))
        if not os.path.exists(image_path):
            continue
        bounding_boxes = []
        with open(os.path.join(results_dir, file), 'r') as f:
            for line in f:
                class_id, x_center, y_center, width, height = map(float, line.split())
                # Load image to calculate dimensions
                image = cv2.imread(image_path)
                img_height, img_width = image.shape[:2]
                x_min = int((x_center - width / 2) * img_width)
                y_min = int((y_center - height / 2) * img_height)
                x_max = int((x_center + width / 2) * img_width)
                y_max = int((y_center + height / 2) * img_height)
                bounding_boxes.append((x_min, y_min, x_max, y_max))
        # Pixelate faces in image
        pixelated_image = pixelate_face(image_path, bounding_boxes)
        # Save pixelated image to output directory
        output_path = os.path.join(output_dir, os.path.basename(image_path))
        cv2.imwrite(output_path, pixelated_image)

# Find directory for test data labels
def find_labels_dir(base_dir):
    # List of label directories
    labels_dirs = []
    for root, dirs, files in os.walk(base_dir):
        if 'labels' in dirs:
            labels_path = os.path.join(root, 'labels')
            labels_dirs.append(labels_path)
    if labels_dirs:
        # Sort by latest label directory
        labels_dirs.sort(key=lambda x: os.path.getmtime(x), reverse=True)
        return labels_dirs[0]
    return None

labels_dir = find_labels_dir("runs/detect")
pixelated_images_dir = "pixelated_images"

pixelation(sampled_test_dir, labels_dir, pixelated_images_dir)