In [11]:
import cv2
import pandas as pd
import numpy as np

# trim vdo

In [None]:
import cv2

def crop_frame(frame, crop_percentage=0.2):
    """Crop left and right sides of the frame, keeping the middle portion."""
    height, width = frame.shape[:2]
    crop_pixels = int(width * crop_percentage)
    x_start = crop_pixels
    x_end = width - crop_pixels
    return frame[:, x_start:x_end], x_start

def trim_video(input_path, output_path, start_time, end_time, crop_percentage=0.2):
    # Open the video file
    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        print("Error: Could not open video.")
        return

    # Get video properties
    fps = cap.get(cv2.CAP_PROP_FPS)  # Frames per second
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    # Calculate start and end frames
    start_frame = int(start_time * fps)
    end_frame = int(end_time * fps)

    # Validate start and end times
    if start_frame >= total_frames or end_frame > total_frames or start_frame >= end_frame:
        print("Error: Invalid start or end time.")
        cap.release()
        return

    # Calculate cropped frame width
    cropped_width = int(frame_width * (1 - 2 * crop_percentage))

    # Set up video writer with cropped dimensions
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Codec for MP4
    out = cv2.VideoWriter(output_path, fourcc, fps, (cropped_width, frame_height))

    # Set the starting frame
    cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

    # Read and write frames within the specified range
    current_frame = start_frame
    while current_frame < end_frame:
        ret, frame = cap.read()
        if not ret:
            print("Error: Could not read frame.")
            break
        # Crop the frame
        cropped_frame, _ = crop_frame(frame, crop_percentage)
        out.write(cropped_frame)
        current_frame += 1

    # Release resources
    cap.release()
    out.release()
    print(f"Video trimmed and cropped successfully and saved as {output_path}")


In [13]:
input_video =  "./EMO-AffectNetModel-main/archive_vod/ice_trial0.mp4"
output_video = "output_trimmed_video.mp4"


start_time = 37  # Start at 5 seconds
end_time = 630   # End at 10 seconds

trim_video(input_video, output_video, start_time, end_time, crop_percentage=0.2)

Video trimmed and cropped successfully and saved as output_trimmed_video.mp4


# Detect head movement

In [12]:
import cv2
import numpy as np
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
from functools import partial

def detect_faces(frame_data, face_cascade, scale_factor=1.1, min_neighbors=5):
    frame_idx, frame, x_offset = frame_data
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    faces = face_cascade.detectMultiScale(gray, scaleFactor=scale_factor, minNeighbors=min_neighbors, minSize=(30, 30))
    # Adjust face coordinates to original frame
    if len(faces) > 0:
        adjusted_faces = [(x + x_offset, y, w, h) for x, y, w, h in faces]
        return adjusted_faces, frame_idx, frame
    return None, frame_idx, frame

def detect_head_motion_video(input_path, output_path, motion_threshold=20, batch_size=10, max_workers=4, crop_percentage=0.25):
    # Open the video file
    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        print("Error: Could not open video.")
        return

    # Load Haar Cascade for face detection
    face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
    if face_cascade.empty():
        print("Error: Could not load Haar Cascade.")
        return

    # Get video properties
    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    # Set up video writer
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))

    # Initialize lists for motion encoding and logging
    motion_log = []
    motion_encoding = []

    prev_center = None  # Store previous face center
    frame_number = 0
    frame_buffer = []
    motion_trigger = 1
    trigger_frame = 0

    # Partial function for face detection
    detect_faces_partial = partial(detect_faces, face_cascade=face_cascade, scale_factor=1.2, min_neighbors=5)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        while True:
            # Read batch of frames
            for _ in range(batch_size):
                ret, frame = cap.read()
                if not ret:
                    break
                # Crop frame
                cropped_frame, x_offset = crop_frame(frame, crop_percentage)
                frame_buffer.append((frame_number, cropped_frame, x_offset, frame))
                frame_number += 1

            if not frame_buffer:
                print("Reached end of video or error reading frame.")
                break

            # Process frames in parallel for face detection
            results = list(executor.map(detect_faces_partial, [(idx, cropped, x_offset) for idx, cropped, x_offset, _ in frame_buffer]))

            # Process each frame in the batch
            for (frame_idx, _, _, original_frame), (faces, result_idx, processed_frame) in zip(frame_buffer, results):
                motion_detected = False
                significant_motion_detected = False
                current_center = None

                if faces is not None:
                    # Use the first detected face
                    (x, y, w, h) = faces[0]
                    current_center = (x + w // 2, y + h // 2)

                    # Draw rectangle around the face on original frame
                    cv2.rectangle(original_frame, (x, y), (x + w, y + h), (0, 255, 0), 2)

                    # Check for motion
                    if prev_center is not None:
                        distance = np.sqrt((current_center[0] - prev_center[0])**2 + 
                                          (current_center[1] - prev_center[1])**2)
                        if distance > motion_threshold:
                            motion_detected = True
                            timestamp = frame_idx / fps
                            motion_log.append(f"Frame {frame_idx} (time: {timestamp:.2f}s, distance: {distance:.2f}px)")

                    prev_center = current_center
                else:
                    prev_center = None

                # Encode motion
                motion_encoding.append({'Frame': frame_idx, 'Motion': 1 if motion_detected else 0})

                if motion_trigger and motion_detected: 
                        f_range = trigger_frame*fps*1
                        motion_trigger +=1

                # Mark frame for standard motion (red circle)
                if motion_detected and frame < f_range:
                    cv2.circle(original_frame, (50, 50), 20, (0, 0, 255), -1)
                    cv2.putText(original_frame, "Motion Detected", (70, 60), 
                               cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)

                if motion_trigger > fps+1:
                    motion_trigger = 0


                # Write frame to output video
                out.write(original_frame)

            # Clear buffer
            frame_buffer = []

    # Write motion log to file
    with open("motion_log.txt", "w") as f:
        if motion_log:
            f.write("\n".join(motion_log))
        else:
            f.write("No head motion detected.")

    # Write motion encoding to CSV
    df = pd.DataFrame(motion_encoding)
    df.to_csv("motion_encoding.csv", index=False)

    # Release resources
    cap.release()
    out.release()
    print(f"Video processed and saved as {output_path}")
    print(f"Motion log saved as motion_log.txt")
    print(f"Motion encoding saved as motion_encoding.csv")



In [None]:

input_video = "output_trimmed_video.mp4"
output_video = "output_motion_detected.mp4"

detect_head_motion_video(input_path=input_video, output_path=output_video, motion_threshold=20, batch_size=10, max_workers=4, crop_percentage=0.25)

# Archive

In [None]:
import cv2
import numpy as np
import pandas as pd

def detect_head_motion_video(input_path, output_path, motion_threshold=20):
    # Open the video file
    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        print("Error: Could not open video.")
        return

    # Load Haar Cascade for face detection
    face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
    if face_cascade.empty():
        print("Error: Could not load Haar Cascade.")
        return

    # Get video properties
    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    # Set up video writer
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))

    # Initialize lists for motion encoding and logging
    motion_log = []
    motion_encoding = []

    prev_center = None  # Store previous face center
    frame_number = 0

    while True:
        ret, frame = cap.read()
        if not ret:
            print("Reached end of video or error reading frame.")
            break

        # Convert frame to grayscale for face detection
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # Detect faces
        faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))

        motion_detected = False
        current_center = None

        if len(faces) > 0:
            # Use the first detected face
            (x, y, w, h) = faces[0]
            # Calculate the center of the face
            current_center = (x + w // 2, y + h // 2)

            # Draw rectangle around the face
            cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)

            # Check for motion by comparing with previous center
            if prev_center is not None:
                # Calculate Euclidean distance between current and previous face centers
                distance = np.sqrt((current_center[0] - prev_center[0])**2 + 
                                  (current_center[1] - prev_center[1])**2)
                if distance > motion_threshold:
                    motion_detected = True
                    # Log frame number and timestamp
                    timestamp = cap.get(cv2.CAP_PROP_POS_MSEC) / 1000
                    motion_log.append(f"Frame {frame_number} (time: {timestamp:.2f}s)")

            prev_center = current_center
        else:
            prev_center = None  # Reset if no face is detected

        # Encode motion: 1 for motion, 0 for no motion
        motion_encoding.append({'Frame': frame_number, 'Motion': 1 if motion_detected else 0})

        # Mark frame if motion is detected
        if motion_detected:
            # Draw a red circle and text in the top-left corner
            cv2.circle(frame, (50, 50), 20, (0, 0, 255), -1)
            cv2.putText(frame, "Motion Detected", (70, 60), 
                       cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)

        # Write frame to output video
        out.write(frame)
        frame_number += 1

    # Write motion log to file
    with open("motion_log.txt", "w") as f:
        if motion_log:
            f.write("\n".join(motion_log))
        else:
            f.write("No head motion detected.")

    # Write motion encoding to CSV
    df = pd.DataFrame(motion_encoding)
    df.to_csv("motion_encoding.csv", index=False)

    # Release resources
    cap.release()
    out.release()
    print(f"Video processed and saved as {output_path}")
    print(f"Motion log saved as motion_log.txt")
    print(f"Motion encoding saved as motion_encoding.csv")
