<a href="https://colab.research.google.com/github/ehdrjs4502/mtcnn-face-mosaic/blob/main/face_mosaic.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# WIDER FACE validation images: download WIDER_val.zip and extract it under /content/WIDER_FACE/
!curl -fSLo WIDER_val.zip https://huggingface.co/datasets/CUHK-CSE/wider_face/resolve/main/data/WIDER_val.zip?download=true
!unzip WIDER_val.zip -d /content/WIDER_FACE/


In [None]:
# Download the wider_face_split archive (bounding-box annotations) and extract it under /content/WIDER_FACE/
!curl -fSLo wider_face_split.zip http://shuoyang1213.me/WIDERFACE/support/bbx_annotation/wider_face_split.zip
!unzip wider_face_split.zip -d /content/WIDER_FACE/

In [None]:
# Remove any installed torch / torchvision / torchaudio and facenet-pytorch (-y skips the prompt)
!pip uninstall -y torch torchvision torchaudio
!pip uninstall -y facenet-pytorch

# Reinstall torch and torchvision, then facenet-pytorch (torchaudio is not reinstalled)
!pip install torch torchvision
!pip install facenet-pytorch

In [None]:
import os
import json

def convert_ground_truth_to_json_partial(gt_txt_path, json_output_path, image_dir, fraction=10):
    """
    Convert a sampled subset of a ground-truth TXT annotation file to JSON.

    The TXT file is read as a sequence of records::

        <relative image path>
        <face count>
        <x y w h ...>      one line per face

    Every ``fraction``-th record (records 0, fraction, 2 * fraction, ...) is
    kept. Each kept box is converted from ``x y w h`` to ``[x1, y1, x2, y2]``.
    Records whose image file does not exist under ``image_dir`` are dropped.

    gt_txt_path: path of the ground-truth TXT file
    json_output_path: path the resulting JSON is written to
    image_dir: directory that the image paths in the TXT file are relative to
    fraction: sampling step (e.g. 10 keeps one record out of every 10)

    Raises ValueError if ``fraction`` is smaller than 1.
    """
    if fraction < 1:
        raise ValueError(f"fraction must be a positive integer, got {fraction!r}")

    with open(gt_txt_path, 'r') as f:
        lines = [line.strip() for line in f]

    gt_dict = {}
    total_lines = len(lines)
    i = 0
    record_index = 0  # number of well-formed records seen so far (drives the sampling)
    while i < total_lines:
        # First line of a record: the image file name.
        filename = lines[i]
        i += 1
        if not filename:
            continue  # tolerate blank lines (e.g. at the end of the file)

        # Second line of a record: the number of faces.
        if i >= total_lines:
            print(f"Error reading face count for {filename}, skipping...")
            break
        try:
            face_count = int(lines[i])
        except ValueError:
            # Resynchronise: the line that failed to parse is tried as the next file name.
            print(f"Error reading face count for {filename}, skipping...")
            continue
        i += 1

        # Always consume the record's box lines, whether or not the record is
        # kept, so the cursor lands on the next file name.
        box_lines = lines[i:i + max(face_count, 0)]
        i += len(box_lines)

        # The official WIDER FACE files follow a face count of 0 with one
        # placeholder row of numbers; consume it if present so it is not
        # mistaken for a file name.
        if face_count == 0 and i < total_lines:
            tokens = lines[i].split()
            if len(tokens) >= 4 and all(t.lstrip('-').isdigit() for t in tokens):
                i += 1

        keep = record_index % fraction == 0
        record_index += 1
        if not keep:
            continue

        face_boxes = []
        for box_line in box_lines:
            try:
                x, y, w, h = map(int, box_line.split()[:4])
            except ValueError:
                print(f"Error reading bounding box for {filename}, skipping box...")
                continue
            face_boxes.append([x, y, x + w, y + h])  # [x1, y1, x2, y2]

        # Only keep records whose image actually exists on disk.
        full_path = os.path.join(image_dir, filename)
        if os.path.exists(full_path):
            gt_dict[filename] = face_boxes
        else:
            print(f"Image not found: {full_path}, skipping...")

    # Write the result as JSON.
    with open(json_output_path, 'w') as json_file:
        json.dump(gt_dict, json_file, indent=4)
    print(f"Ground Truth JSON 저장 완료: {json_output_path}")




# Input annotation file, output JSON file, and the image root used to check that files exist.
gt_txt_path = "/content/WIDER_FACE/wider_face_split/wider_face_val_bbx_gt.txt"
json_output_path = "/content/WIDER_FACE/ground_truth_val_partial.json"
image_dir = "/content/WIDER_FACE/WIDER_val/images"

# Convert only one tenth of the annotated images.
convert_ground_truth_to_json_partial(
    gt_txt_path=gt_txt_path,
    json_output_path=json_output_path,
    image_dir=image_dir,
    fraction=10,
)


In [None]:
import os
import json
from PIL import Image
import torch
from facenet_pytorch import MTCNN

# Initialise MTCNN on the GPU when CUDA is available, otherwise on the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
mtcnn = MTCNN(device=device, keep_all=True)

# Intersection-over-Union helper
def calculate_iou(box1, box2):
    """
    Return the intersection-over-union of two boxes.

    box1, box2: bounding boxes given as [x1, y1, x2, y2]
    Returns 0 when the union area is not positive.
    """
    # Corners of the overlapping rectangle (may be inverted when the boxes are disjoint).
    left = max(box1[0], box2[0])
    top = max(box1[1], box2[1])
    right = min(box1[2], box2[2])
    bottom = min(box1[3], box2[3])

    overlap_w = max(0, right - left)
    overlap_h = max(0, bottom - top)
    intersection = overlap_w * overlap_h

    area_first = (box1[2] - box1[0]) * (box1[3] - box1[1])
    area_second = (box2[2] - box2[0]) * (box2[3] - box2[1])
    union = area_first + area_second - intersection

    if union > 0:
        return intersection / union
    return 0

# Dataset evaluation
def _count_true_positives(detected_boxes, gt_boxes, iou_threshold):
    """Count detections that each claim a distinct ground-truth box with IoU >= iou_threshold."""
    unmatched = list(range(len(gt_boxes)))
    matches = 0
    for d_box in detected_boxes:
        best_idx = None
        best_iou = iou_threshold
        for gt_idx in unmatched:
            iou = calculate_iou(d_box, gt_boxes[gt_idx])
            if iou >= best_iou:
                best_idx, best_iou = gt_idx, iou
        if best_idx is not None:
            # A ground-truth face may be claimed only once; further detections
            # of the same face count as false positives, not extra hits.
            unmatched.remove(best_idx)
            matches += 1
    return matches


def evaluate_dataset(dataset_path, ground_truth_path, iou_threshold=0.5):
    """
    Evaluate the module-level ``mtcnn`` detector against ground-truth boxes.

    dataset_path: directory that the image names in the ground truth are relative to
    ground_truth_path: JSON file of the form {image_filename: [[x1, y1, x2, y2], ...]}
    iou_threshold: minimum IoU for a detection to count as a match

    Each ground-truth box can be matched by at most one detection, so
    precision and recall both stay within [0, 1]. Images missing from
    ``dataset_path`` are skipped and do not contribute to the totals.

    Returns (precision, recall, f1_score).
    """
    total_faces = 0
    detected_faces = 0
    true_positives = 0

    # Load ground truth
    with open(ground_truth_path, 'r') as f:
        ground_truth = json.load(f)  # {image_filename: [[x1, y1, x2, y2], ...]}

    total_images = len(ground_truth)
    print(f"평가할 이미지 수: {total_images}")

    for idx, (img_name, gt_boxes) in enumerate(ground_truth.items(), start=1):
        img_path = os.path.join(dataset_path, img_name)
        if not os.path.exists(img_path):
            print(f"[{idx}/{total_images}] 이미지가 존재하지 않습니다: {img_name}, 건너뜁니다.")
            continue

        # Progress output
        print(f"[{idx}/{total_images}] 처리 중: {img_name}")

        # Open the image; the context manager closes the file handle once the
        # RGB copy has been made.
        with Image.open(img_path) as img_file:
            image = img_file.convert('RGB')

        # Detect faces with MTCNN; the boxes are None when nothing was detected.
        detected_boxes, _ = mtcnn.detect(image)
        if detected_boxes is None:
            detected_boxes = []

        total_faces += len(gt_boxes)
        detected_faces += len(detected_boxes)
        true_positives += _count_true_positives(detected_boxes, gt_boxes, iou_threshold)

    precision = true_positives / detected_faces if detected_faces > 0 else 0
    recall = true_positives / total_faces if total_faces > 0 else 0
    f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

    print(f"\n평가 완료!")
    print(f"Precision: {precision:.2f}")
    print(f"Recall: {recall:.2f}")
    print(f"F1-Score: {f1_score:.2f}")
    return precision, recall, f1_score



# 4. Run the evaluation
dataset_path = "/content/WIDER_FACE/WIDER_val/images"  # image root directory
ground_truth_path = "/content/WIDER_FACE/ground_truth_val_partial.json"  # converted JSON file

# Baseline run with the detector configured above.
precision, recall, f1_score = evaluate_dataset(dataset_path, ground_truth_path)

# Hyperparameter grid: IoU thresholds crossed with MTCNN settings.
iou_thresholds = [0.3, 0.5, 0.7]
mtcnn_params = [
    {"thresholds": [0.6, 0.7, 0.7], "min_face_size": 20},  # default values
    {"thresholds": [0.5, 0.6, 0.6], "min_face_size": 30},
    {"thresholds": [0.7, 0.8, 0.8], "min_face_size": 40},
]

# One entry is collected per (IoU threshold, MTCNN setting) pair.
results = []

for iou_threshold in iou_thresholds:
    for params in mtcnn_params:
        print(f"\nEvaluating with IoU: {iou_threshold}, Params: {params}")

        # Rebind the module-level detector that evaluate_dataset reads.
        mtcnn = MTCNN(keep_all=True, device=device, **params)

        # Run the evaluation for this combination.
        precision, recall, f1_score = evaluate_dataset(
            dataset_path,
            ground_truth_path,
            iou_threshold=iou_threshold,
        )

        # Record the settings together with the resulting metrics.
        results.append({
            "iou_threshold": iou_threshold,
            **params,
            "precision": precision,
            "recall": recall,
            "f1_score": f1_score,
        })

# Print the collected results.
print("\n===== 하이퍼파라미터 변경 결과 =====")
for result in results:
    print(
        f"IoU: {result['iou_threshold']}, "
        f"Thresholds: {result['thresholds']}, "
        f"Min Face Size: {result['min_face_size']}, "
        f"Precision: {result['precision']:.2f}, "
        f"Recall: {result['recall']:.2f}, "
        f"F1-Score: {result['f1_score']:.2f}"
    )


In [None]:
from facenet_pytorch import MTCNN
from PIL import Image
import cv2
import numpy as np
from google.colab.patches import cv2_imshow
import torch

# Initialise MTCNN on the GPU when CUDA is available, otherwise on the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
mtcnn = MTCNN(thresholds=[0.7, 0.8, 0.8], device=device, keep_all=True)

# Face-region filters
def apply_filter(image, face_coordinates, filter_type='mosaic', mosaic_level=15):
    """
    Apply an obscuring filter to every face region of an image.

    image: source image (numpy array, colour or grayscale); modified in place
    face_coordinates: face bounding boxes [[x1, y1, x2, y2], ...]
    filter_type: 'mosaic', 'blur' or 'pixelate'
    mosaic_level: downscale factor used by the 'mosaic' filter (values below 1 are treated as 1)

    Boxes are clipped to the image bounds; boxes that are empty after
    clipping are skipped.

    Returns the same ``image`` array that was passed in.
    Raises ValueError for an unknown ``filter_type``, so a typo cannot
    silently leave faces unfiltered.
    """
    if filter_type not in ('mosaic', 'blur', 'pixelate'):
        raise ValueError(
            f"Unknown filter_type {filter_type!r}; expected 'mosaic', 'blur' or 'pixelate'"
        )

    # Guard against a zero/negative/fractional level, which would break the
    # integer division below.
    level = max(1, int(mosaic_level))

    # Only the first two dimensions are needed, so grayscale (2-D) images work too.
    h, w = image.shape[:2]

    for box in face_coordinates:
        x1, y1, x2, y2 = (int(coord) for coord in box)

        # Clip the box so it never extends past the image bounds.
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(w, x2), min(h, y2)

        # Skip invalid boxes (zero or negative width/height).
        if x2 <= x1 or y2 <= y1:
            continue

        face_w, face_h = x2 - x1, y2 - y1
        face = image[y1:y2, x1:x2]

        if filter_type == 'mosaic':
            # Shrink by the mosaic level, then blow back up with nearest-neighbour.
            small_size = (max(1, face_w // level), max(1, face_h // level))
            face = cv2.resize(face, small_size, interpolation=cv2.INTER_LINEAR)
            face = cv2.resize(face, (face_w, face_h), interpolation=cv2.INTER_NEAREST)
        elif filter_type == 'blur':
            face = cv2.GaussianBlur(face, (99, 99), 30)
        else:  # 'pixelate'
            # Fixed 10x10 grid regardless of the face size.
            face = cv2.resize(face, (10, 10), interpolation=cv2.INTER_NEAREST)
            face = cv2.resize(face, (face_w, face_h), interpolation=cv2.INTER_NEAREST)

        image[y1:y2, x1:x2] = face

    return image

# Face detection
def detect_faces(image):
    """
    Detect faces in a BGR image with the module-level ``mtcnn`` detector.

    Returns the detected boxes, or an empty list when nothing was detected.
    """
    rgb_pixels = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    found, _ = mtcnn.detect(Image.fromarray(rgb_pixels))
    return [] if found is None else found

# Still-image processing
def process_image(image_path, output_path="output.jpg", filter_type='mosaic', mosaic_level=15):
    """
    Detect faces in an image file, filter them, then save and display the result.

    image_path: path of the input image
    output_path: path the filtered image is written to
    filter_type: filter passed through to apply_filter
    mosaic_level: mosaic level passed through to apply_filter

    Prints a message and returns without writing anything when the image
    cannot be read or when no face is detected.
    """
    image = cv2.imread(image_path)
    # cv2.imread returns None instead of raising when the file is missing or
    # cannot be decoded; report that here rather than crashing during detection.
    if image is None:
        print(f"이미지를 열 수 없습니다: {image_path}")
        return

    faces = detect_faces(image)
    if len(faces) == 0:
        print("얼굴이 감지되지 않았습니다.")
        return
    print(f"{len(faces)}개의 얼굴을 감지했습니다.")

    processed_image = apply_filter(image, faces, filter_type, mosaic_level)
    # cv2.imwrite reports failure through its return value.
    if not cv2.imwrite(output_path, processed_image):
        print(f"이미지를 저장할 수 없습니다: {output_path}")
    cv2_imshow(processed_image)

# Video processing
def process_video(video_path, output_path="output_video.avi", filter_type='mosaic', mosaic_level=15):
    """
    Detect faces in every frame of a video and write a filtered copy.

    video_path: path of the input video
    output_path: path of the XVID-encoded output video
    filter_type: filter passed through to apply_filter
    mosaic_level: mosaic level passed through to apply_filter

    Prints a message and returns when the input cannot be opened or the
    output writer cannot be created. The capture and the writer are released
    even if processing a frame raises.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"동영상을 열 수 없습니다: {video_path}")
        return

    # Keep the frame rate as a float: truncating e.g. 29.97 to 29 changes the
    # playback speed of the output.
    fps = cap.get(cv2.CAP_PROP_FPS)
    if not fps > 0:
        fps = 30.0  # fallback chosen here for sources that report no usable frame rate

    frame_size = (
        int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
        int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
    )

    # Output writer setup
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    out = cv2.VideoWriter(output_path, fourcc, fps, frame_size)
    if not out.isOpened():
        cap.release()
        print(f"동영상을 저장할 수 없습니다: {output_path}")
        return

    frame_count = 0  # number of frames processed so far

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1

            # Detect faces and filter them when any were found.
            faces = detect_faces(frame)
            if faces is not None and len(faces) > 0:
                frame = apply_filter(frame, faces, filter_type, mosaic_level)

            # Write the (possibly filtered) frame.
            out.write(frame)

            # Periodic progress output
            if frame_count % 10 == 0:
                print(f"Processed {frame_count} frames...")
    finally:
        cap.release()
        out.release()

    print(f"동영상 처리가 완료되었습니다. 저장 경로: {output_path}")


# Entry point
if __name__ == "__main__":
    # Still-image run
    input_image = "image.jpg"  # path of the input image
    process_image(
        image_path=input_image,
        filter_type='pixelate',
        mosaic_level=15,
    )

    # Video run (disabled)
    # input_video = "video.mp4"  # path of the input video
    # process_video(input_video, filter_type='pixelate', mosaic_level=15)
