In [2]:
import os
import cv2
import dlib
import numpy as np
import random

In [3]:
# ---------------------------------------------------------------------------
# Input locations: raw DeeperForensics-1.0 videos (real and manipulated).
# ---------------------------------------------------------------------------
real_videos_dir = "F:/dataset/DeeperForensics-1.0/source_videos"
fake_videos_dir = "F:/dataset/DeeperForensics-1.0/manipulated_videos/end_to_end_level_1"

# ---------------------------------------------------------------------------
# "Check" directories, one triple (faces / optical flow / edges) per emotion.
# They are only read: traverse_and_process() looks in them for files that
# were already produced from a video and skips that video if any are found.
# Nothing in this cell creates these directories.
# ---------------------------------------------------------------------------
check_real_faces_sad = "E:/dataset_1/sad/real"
check_optical_flow_sad = "E:/dataset_1/sad/optical_flow" 
check_edges_sad = "E:/dataset_1/sad/edges"

check_real_faces_angry = "E:/dataset_1/angry/real"
check_optical_flow_angry = "E:/dataset_1/angry/optical_flow" 
check_edges_angry = "E:/dataset_1/angry/edges"

check_real_faces_contempt = "E:/dataset_1/contempt/real"
check_optical_flow_contempt = "E:/dataset_1/contempt/optical_flow" 
check_edges_contempt = "E:/dataset_1/contempt/edges"

check_real_faces_disgust = "E:/dataset_1/disgust/real"
check_optical_flow_disgust = "E:/dataset_1/disgust/optical_flow" 
check_edges_disgust = "E:/dataset_1/disgust/edges"

check_real_faces_fear = "E:/dataset_1/fear/real"
check_optical_flow_fear = "E:/dataset_1/fear/optical_flow" 
check_edges_fear = "E:/dataset_1/fear/edges"

check_real_faces_happy = "E:/dataset_1/happy/real"
check_optical_flow_happy = "E:/dataset_1/happy/optical_flow" 
check_edges_happy = "E:/dataset_1/happy/edges"

check_real_faces_neutral = "E:/dataset_1/neutral/real"
check_optical_flow_neutral = "E:/dataset_1/neutral/optical_flow"
check_edges_neutral = "E:/dataset_1/neutral/edges"

check_real_faces_surprise = "E:/dataset_1/surprise/real"
check_optical_flow_surprise = "E:/dataset_1/surprise/optical_flow" 
check_edges_surprise = "E:/dataset_1/surprise/edges"

# "Check" directories used when processing the manipulated (fake) videos.
# NOTE(review): all three live under the "happy" emotion folder, and the flow
# and edge paths are the same as check_optical_flow_happy / check_edges_happy.
# Confirm this is intended and not a copy-paste leftover.
check_fake_faces = "E:/dataset_1/happy/fake"
check_fake_optical_flow = "E:/dataset_1/happy/optical_flow"
check_fake_edges = "E:/dataset_1/happy/edges"

# Save paths: where this run writes its crops. Real and fake face crops go to
# separate folders; optical-flow and edge crops share one folder each.
output_real_faces = "E:/dataset_1/test_set/real"
output_fake_faces = "E:/dataset_1/test_set/fake"
output_optical_flow = "E:/dataset_1/test_set/optical_flow"
output_edges = "E:/dataset_1/test_set/edges"

# Create the output directories if they do not exist yet (the "check"
# directories above are not created here).
for path in [output_real_faces, output_optical_flow, output_edges, output_fake_faces]:
    os.makedirs(path, exist_ok=True)

# Load the dlib frontal face detector and the landmark predictor; the
# predictor is loaded from the shape_predictor_68_face_landmarks.dat file.
detector = dlib.get_frontal_face_detector()
predictor_path = "D:/school/Research/TC-FET-Deepfake-Detector/1preprocessing/shape_predictor_68_face_landmarks.dat"  
predictor = dlib.shape_predictor(predictor_path)

def compute_dense_optical_flow(prev_frame, next_frame):
    """Return a BGR image visualising the dense optical flow between two frames.

    Both frames are converted from BGR to grayscale, Farneback dense flow is
    computed from ``prev_frame`` to ``next_frame``, and the resulting vector
    field is painted into an HSV image (hue from flow direction, value from
    min-max normalised flow magnitude, saturation fixed at 255) which is then
    converted back to BGR.

    Args:
        prev_frame: Earlier frame, a BGR image array.
        next_frame: Later frame, a BGR image array.

    Returns:
        A BGR image with the same shape and dtype as ``prev_frame``.
    """
    gray_before, gray_after = (
        cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) for image in (prev_frame, next_frame)
    )

    # Positional Farneback parameters: pyr_scale, levels, winsize, iterations,
    # poly_n, poly_sigma, flags.
    flow_field = cv2.calcOpticalFlowFarneback(
        gray_before, gray_after, None, 0.5, 3, 15, 3, 5, 1.2, 0
    )
    horizontal = flow_field[..., 0]
    vertical = flow_field[..., 1]
    magnitude, angle = cv2.cartToPolar(horizontal, vertical)

    # The canvas inherits the input frame's shape and dtype, so the float
    # values assigned below are cast on assignment.
    hsv_canvas = np.zeros_like(prev_frame)
    hsv_canvas[..., 0] = angle * 180 / np.pi / 2  # radians -> half-degrees (hue)
    hsv_canvas[..., 1] = 255
    hsv_canvas[..., 2] = cv2.normalize(magnitude, None, 0, 255, cv2.NORM_MINMAX)

    bgr_visualisation = cv2.cvtColor(hsv_canvas, cv2.COLOR_HSV2BGR)
    return bgr_visualisation

def extract_edges(frame):
    """Return a 3-channel Canny edge map of a BGR frame.

    Args:
        frame: A BGR image array.

    Returns:
        The Canny edge mask (thresholds 100 and 200) of the grayscale frame,
        expanded to three channels via a GRAY -> BGR conversion.
    """
    lower_threshold, upper_threshold = 100, 200
    edge_mask = cv2.Canny(
        cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY),
        lower_threshold,
        upper_threshold,
    )
    edges_bgr = cv2.cvtColor(edge_mask, cv2.COLOR_GRAY2BGR)
    return edges_bgr

def process_video(video_path, save_faces_dir, save_flow_dir, save_edges_dir):
    """Extract face crops plus matching optical-flow and edge crops from a video.

    The first frame is consumed only as the "previous frame" for optical flow.
    Every later frame is run through the module-level dlib ``detector``; for
    each detected face a square-ish region is cropped from the frame, from the
    frame's dense optical-flow visualisation and from its Canny edge map. Each
    crop is resized to 299x299 and written as a JPEG.

    Output files are named
    ``{parent_folder}_{video_file}_frame{k}_face{i}[_flow|_edges].jpg`` where
    ``k`` counts frames starting at 0 for the *second* frame of the video.

    Args:
        video_path: Path of the video file to read.
        save_faces_dir: Directory that receives the face crops.
        save_flow_dir: Directory that receives the optical-flow crops.
        save_edges_dir: Directory that receives the edge-map crops.

    Returns:
        None. Per-video counts are printed to stdout.
    """
    folder_name = os.path.basename(os.path.dirname(video_path))  # Used as filename prefix
    video_name = os.path.basename(video_path)

    # Counters for statistics
    face_count = 0
    flow_count = 0
    edge_count = 0

    cap = cv2.VideoCapture(video_path)
    try:
        # Prime the optical-flow reference; prev_frame is None if the read fails.
        _, prev_frame = cap.read()
        frame_count = 0

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            faces = detector(gray)

            # Full-frame maps are identical for every face in this frame, so
            # compute them at most once, and only when a valid face crop needs
            # them.
            flow_map = None
            edge_map = None

            for i, face in enumerate(faces):
                landmarks = predictor(gray, face)
                # Horizontal distance between landmarks 42 and 39 (presumably
                # the inner eye corners of the 68-point model -- TODO confirm).
                interocular_distance = landmarks.part(42).x - landmarks.part(39).x

                x, y = face.left(), face.top()
                size = 5 * interocular_distance  # Increase bounding box size

                # NOTE(review): the box extends +/- size around the detector
                # rectangle's top-left corner, not around the face centre.
                # Kept as is so newly generated crops match existing data;
                # confirm whether centring on the face was intended.
                x1 = max(0, x - size)
                y1 = max(0, y - size)
                x2 = min(frame.shape[1], x + size)
                y2 = min(frame.shape[0], y + size)

                face_crop = frame[y1:y2, x1:x2]
                if face_crop.shape[0] == 0 or face_crop.shape[1] == 0:
                    continue  # Degenerate crop (e.g. non-positive size); nothing to save

                face_resized = cv2.resize(face_crop, (299, 299))

                filename_base = f"{folder_name}_{video_name}_frame{frame_count}_face{i}"
                filename_face = f"{filename_base}.jpg"
                filename_flow = f"{filename_base}_flow.jpg"
                filename_edges = f"{filename_base}_edges.jpg"

                # Save face
                cv2.imwrite(os.path.join(save_faces_dir, filename_face), face_resized)
                face_count += 1

                # Optical flow, cropped to the same face region
                if prev_frame is not None:
                    if flow_map is None:
                        flow_map = compute_dense_optical_flow(prev_frame, frame)
                    flow_crop = flow_map[y1:y2, x1:x2]
                    if flow_crop.shape[0] > 0 and flow_crop.shape[1] > 0:
                        flow_resized = cv2.resize(flow_crop, (299, 299))
                        cv2.imwrite(os.path.join(save_flow_dir, filename_flow), flow_resized)
                        flow_count += 1

                # Edge map, cropped to the same face region
                if edge_map is None:
                    edge_map = extract_edges(frame)
                edge_crop = edge_map[y1:y2, x1:x2]
                if edge_crop.shape[0] > 0 and edge_crop.shape[1] > 0:
                    edge_resized = cv2.resize(edge_crop, (299, 299))
                    cv2.imwrite(os.path.join(save_edges_dir, filename_edges), edge_resized)
                    edge_count += 1

            prev_frame = frame
            frame_count += 1
    finally:
        # Release the capture handle even if detection or writing raises.
        cap.release()

    # Print statistics for this video
    print(f"\n Video Processed: {video_name}")
    print(f"    Faces Extracted: {face_count}")
    print(f"    Optical Flow Maps: {flow_count}")
    print(f"    Edge Maps: {edge_count}")
    print("=" * 50)

def traverse_and_process(root_dir, save_faces_dir, save_flow_dir, save_edges_dir, filter, 
                         check_faces_dir, check_flow_dir, check_edges_dir, n, check=False):
    """
    Traverses folders recursively and collects all video paths.
    Then shuffles them to process in a random order, optionally filtering 
    based on 'disgust', and skips videos that are already processed.
    Processes up to n videos.
    """
    # Collect all valid video paths
    video_paths = []
    for root, _, files in os.walk(root_dir):
        # Skip if we must only process 'disgust' folders and this folder isn't one
        if check and filter not in root.lower():
            continue
        
        for file in files:
            if file.endswith(".mp4") or file.endswith(".avi"):
                full_path = os.path.join(root, file)
                video_paths.append(full_path)

    # Shuffle the list of video paths
    random.shuffle(video_paths)

    # Initialize counter for processed videos
    processed_count = 0

    # Now iterate over videos in random order
    for video_path in video_paths:
        # Break out of the loop if we've processed n videos
        if processed_count >= n:
            print(f"Processed {n} videos, stopping further processing.")
            break

        folder_name = os.path.basename(os.path.dirname(video_path))
        file_name = os.path.basename(video_path)

        # Generate expected output filenames
        base_filename = f"{folder_name}_{file_name}_frame"

        # Check if at least one output exists
        processed = any(
            filename.startswith(base_filename)
            for directory in [check_faces_dir, check_flow_dir, check_edges_dir]
            for filename in os.listdir(directory)
        )

        if processed:
            print(f"Skipping {video_path} (already processed)")
            continue

        print(f"Processing {video_path}...")
        process_video(video_path, save_faces_dir, save_flow_dir, save_edges_dir)
        processed_count += 1


# Real videos (source_videos): one pass per emotion, restricted to folders whose
# path contains the emotion name. Each row is (emotion, faces, flow, edges)
# where the last three are that emotion's "check" directories. The order of
# the rows is the order in which the emotions are processed.
real_video_jobs = [
    ("angry", check_real_faces_angry, check_optical_flow_angry, check_edges_angry),
    ("happy", check_real_faces_happy, check_optical_flow_happy, check_edges_happy),
    ("sad", check_real_faces_sad, check_optical_flow_sad, check_edges_sad),
    ("disgust", check_real_faces_disgust, check_optical_flow_disgust, check_edges_disgust),
    ("neutral", check_real_faces_neutral, check_optical_flow_neutral, check_edges_neutral),
    ("surprise", check_real_faces_surprise, check_optical_flow_surprise, check_edges_surprise),
    ("contempt", check_real_faces_contempt, check_optical_flow_contempt, check_edges_contempt),
    ("fear", check_real_faces_fear, check_optical_flow_fear, check_edges_fear),
]

for emotion, emotion_faces_dir, emotion_flow_dir, emotion_edges_dir in real_video_jobs:
    # Up to 20 new videos per emotion.
    traverse_and_process(
        real_videos_dir,
        output_real_faces,
        output_optical_flow,
        output_edges,
        emotion,
        emotion_faces_dir,
        emotion_flow_dir,
        emotion_edges_dir,
        20,
        check=True,
    )

# Fake videos (manipulated_videos): no folder filtering, up to 160 new videos.
traverse_and_process(
    fake_videos_dir,
    output_fake_faces,
    output_optical_flow,
    output_edges,
    "",
    check_fake_faces,
    check_fake_optical_flow,
    check_fake_edges,
    160,
    check=False,
)

print("\nProcessing complete!")


Processing F:/dataset/DeeperForensics-1.0/source_videos\W135\light_down\angry\camera_leftfront\W135_light_down_angry_camera_leftfront.mp4...

 Video Processed: W135_light_down_angry_camera_leftfront.mp4
    Faces Extracted: 376
    Optical Flow Maps: 376
    Edge Maps: 376
Processing F:/dataset/DeeperForensics-1.0/source_videos\W015\light_left\angry\camera_leftfront\W015_light_left_angry_camera_leftfront.mp4...

 Video Processed: W015_light_left_angry_camera_leftfront.mp4
    Faces Extracted: 376
    Optical Flow Maps: 376
    Edge Maps: 376
Processing F:/dataset/DeeperForensics-1.0/source_videos\M133\light_leftdown\angry\camera_right\M133_light_leftdown_angry_camera_right.mp4...

 Video Processed: M133_light_leftdown_angry_camera_right.mp4
    Faces Extracted: 370
    Optical Flow Maps: 370
    Edge Maps: 370
Processing F:/dataset/DeeperForensics-1.0/source_videos\W018\light_down\angry\camera_front\W018_light_down_angry_camera_front.mp4...

 Video Processed: W018_light_down_angry_came