In [32]:
import os
import cv2
import numpy as np
import random
from PIL import Image
import torch
from torchvision import transforms
from pathlib import Path
import re

# 1. Main processing pipeline
def process_all_videos(vid_folder, output_folder):
    os.makedirs(output_folder, exist_ok=True)
    for video_file in os.listdir(vid_folder):
        full_path = os.path.join(vid_folder, video_file)
        extract_frames(full_path, output_folder)
    add_noise(output_folder)
    rename_images(output_folder)

# 2. Extract frames
def extract_frames(video_path, output_dir, output_size=(256, 256), fps=1, noise_ratio=0.1):
    cap = cv2.VideoCapture(video_path)
    count = 0
    frame_rate = int(cap.get(cv2.CAP_PROP_FPS) / fps)
    vid_name = os.path.splitext(os.path.basename(video_path))[0]

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        if count % frame_rate == 0:
            output_path = os.path.join(output_dir, f"{vid_name}_frame_{count}.jpg")
            frame_resized = cv2.resize(frame, output_size)
            cv2.imwrite(output_path, frame_resized)

            # augment
            augment_data(frame_resized, output_path)

            # noisy
            if random.random() < noise_ratio:
                std = random.uniform(0.45, 0.6)
                noisy_img = gaussian_noise(frame_resized, std=std)
                noisy_output = output_path.replace('.jpg', '_noisy.jpg')
                cv2.imwrite(noisy_output, noisy_img)

        count += 1
    cap.release()

# 3. Augment with torchvision
def augment_data(img, output_path, aug_count=2):
    pil_img = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))

    aug_transforms = transforms.Compose([
        transforms.RandomRotation(10),
        transforms.RandomResizedCrop(256, scale=(0.9, 1.1)),
        transforms.RandomHorizontalFlip(),
        transforms.ColorJitter(brightness=0.1),
        transforms.ToTensor()
    ])

    for i in range(aug_count):
        augmented = aug_transforms(pil_img)
        aug_img = transforms.ToPILImage()(augmented)
        aug_img.save(f"{output_path}_aug_{i}.jpg")

# 4. Gaussian noise
def gaussian_noise(img, mean=0, std = 0):
    noise = np.random.normal(mean, std, img.shape).astype(np.uint8)
    return cv2.add(img, noise)

# 5. Add noise to random images
def add_noise(folder_path, ratio=0.1):
    images = [f for f in os.listdir(folder_path) if f.endswith('.jpg')]
    sample_size = int(len(images) * ratio)
    selected = random.sample(images, sample_size)

    for img_file in selected:
        img_path = os.path.join(folder_path, img_file)
        img = cv2.imread(img_path)
        noisy_img = gaussian_noise(img)
        output_path = img_path.replace('.jpg', '_noisy.jpg')
        cv2.imwrite(output_path, noisy_img)

# 6. Rename image files
def rename_images(folder_path):
    pat = re.compile(r"^(.*?)(?:\.jpg)?(?:_aug_(\d+))?(?:_noisy)*\.jpg$", re.VERBOSE)

    for p in Path(folder_path).glob("*.jpg"):
        m = pat.match(p.name)
        if not m:
            print(f"SKIP {p.name}")
            continue

        base, aug_idx = m.group(1), m.group(2)
        base = re.sub(r'(?:_noisy)+$', '', base)
        aug_part = f"_aug_{aug_idx}" if aug_idx else ""
        noisy_part = "_noisy" if "_noisy" in p.name else ""
        new_name = f"{base}{aug_part}{noisy_part}.jpg"
        new_path = p.with_name(new_name)

        if not new_path.exists():
            print(f"{p.name} -> {new_path.name}")
            p.rename(new_path)

In [34]:
# Main run
if __name__ == "__main__":
    process_all_videos("newvid", "outputimg")

01_02__meeting_serious__YVGY8LOK_frame_0.jpg_aug_0.jpg -> 01_02__meeting_serious__YVGY8LOK_frame_0_aug_0.jpg
01_02__meeting_serious__YVGY8LOK_frame_0.jpg_aug_1.jpg -> 01_02__meeting_serious__YVGY8LOK_frame_0_aug_1.jpg
01_02__meeting_serious__YVGY8LOK_frame_1008.jpg_aug_0.jpg -> 01_02__meeting_serious__YVGY8LOK_frame_1008_aug_0.jpg
01_02__meeting_serious__YVGY8LOK_frame_1008.jpg_aug_1.jpg -> 01_02__meeting_serious__YVGY8LOK_frame_1008_aug_1.jpg
01_02__meeting_serious__YVGY8LOK_frame_1032.jpg_aug_0.jpg -> 01_02__meeting_serious__YVGY8LOK_frame_1032_aug_0.jpg
01_02__meeting_serious__YVGY8LOK_frame_1032.jpg_aug_1.jpg -> 01_02__meeting_serious__YVGY8LOK_frame_1032_aug_1.jpg
01_02__meeting_serious__YVGY8LOK_frame_120.jpg_aug_0.jpg -> 01_02__meeting_serious__YVGY8LOK_frame_120_aug_0.jpg
01_02__meeting_serious__YVGY8LOK_frame_120.jpg_aug_1.jpg -> 01_02__meeting_serious__YVGY8LOK_frame_120_aug_1.jpg
01_02__meeting_serious__YVGY8LOK_frame_144.jpg_aug_0.jpg -> 01_02__meeting_serious__YVGY8LOK_fra

In [36]:
from retinaface.pre_trained_models import get_model
from pathlib import Path


def run_retinaface_on_folder(input_folder="outputimg", output_folder="outputimg_faces", conf_threshold=0.6, model_type="resnet50_2020-07-20"):

    input_path = Path(input_folder)
    output_path = Path(output_folder)
    output_path.mkdir(exist_ok=True)

    # Load RetinaFace model
    model = get_model(model_type, max_size=1024)
    model.eval()

    for img_file in input_path.glob("*.jpg"):
        img = cv2.imread(str(img_file))
        if img is None:
            print(f"Failed to load {img_file.name}")
            continue

        results = model.predict_jsons(img)

        for face in results:
            if face['score'] < conf_threshold:
                continue
            x1, y1, x2, y2 = map(int, face['bbox'])
            cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 0), 2)

        out_path = output_path / img_file.name
        cv2.imwrite(str(out_path), img)
        print(f"Processed {img_file.name} -> {out_path.name}")

In [38]:
run_retinaface_on_folder("outputimg", "facedetect")

Processed 01_02__meeting_serious__YVGY8LOK_frame_0.jpg -> 01_02__meeting_serious__YVGY8LOK_frame_0.jpg
Processed 01_02__meeting_serious__YVGY8LOK_frame_0_aug_0.jpg -> 01_02__meeting_serious__YVGY8LOK_frame_0_aug_0.jpg
Processed 01_02__meeting_serious__YVGY8LOK_frame_0_aug_1.jpg -> 01_02__meeting_serious__YVGY8LOK_frame_0_aug_1.jpg
Processed 01_02__meeting_serious__YVGY8LOK_frame_1008.jpg -> 01_02__meeting_serious__YVGY8LOK_frame_1008.jpg
Processed 01_02__meeting_serious__YVGY8LOK_frame_1008_aug_0.jpg -> 01_02__meeting_serious__YVGY8LOK_frame_1008_aug_0.jpg
Processed 01_02__meeting_serious__YVGY8LOK_frame_1008_aug_1.jpg -> 01_02__meeting_serious__YVGY8LOK_frame_1008_aug_1.jpg
Processed 01_02__meeting_serious__YVGY8LOK_frame_1032.jpg -> 01_02__meeting_serious__YVGY8LOK_frame_1032.jpg
Processed 01_02__meeting_serious__YVGY8LOK_frame_1032_aug_0.jpg -> 01_02__meeting_serious__YVGY8LOK_frame_1032_aug_0.jpg
Processed 01_02__meeting_serious__YVGY8LOK_frame_1032_aug_1.jpg -> 01_02__meeting_seri

KeyboardInterrupt: 