In [None]:
# Mount Google Drive inside the Colab runtime at /content/drive so the
# notebook can read and write the video / CSV paths used further down.
# force_remount=True is passed to request a fresh mount on every run of this cell.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [None]:
!pip install mediapipe opencv-python pandas face-detection-tflite

In [None]:
import cv2
import mediapipe as mp
from fdlite import FaceDetection, FaceDetectionModel
import pandas as pd
from collections import deque
import numpy as np

In [None]:
def initialize_face_detector(min_detection_confidence=0.6):
    """Construct a MediaPipe face detector.

    Args:
        min_detection_confidence: Forwarded unchanged to
            ``mp.solutions.face_detection.FaceDetection``. Defaults to 0.6.

    Returns:
        The newly created ``FaceDetection`` object.
    """
    face_detection_module = mp.solutions.face_detection
    detector = face_detection_module.FaceDetection(
        min_detection_confidence=min_detection_confidence
    )
    return detector

def process_frame(face_detection, frame):
    """Run ``face_detection`` on a single OpenCV frame.

    Args:
        face_detection: Detector object exposing a ``process(image)`` method.
        frame: Image in OpenCV's BGR channel order.

    Returns:
        Whatever ``face_detection.process`` returns for the RGB-converted frame.
    """
    # The detector is fed RGB, so swap the channel order of the BGR frame first.
    return face_detection.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

def extract_bbox(detection, frame_shape):
    """Convert a detection's relative bounding box into pixel centre and size.

    Args:
        detection: Object exposing ``location_data.relative_bounding_box`` with
            ``xmin``, ``ymin``, ``width`` and ``height`` attributes; these are
            scaled by the frame width / height taken from ``frame_shape``.
        frame_shape: Sequence whose first two entries are (height, width).
            Any further entries (e.g. a channel count) are ignored, so both
            2-D and 3-D image shapes are accepted.

    Returns:
        Tuple ``(center_x, center_y, w, h)``. ``w`` and ``h`` are the box size
        truncated to ints; the centre is computed from the truncated top-left
        corner and size, so it may be a float.
    """
    rel_box = detection.location_data.relative_bounding_box
    # Only height and width are needed; slicing avoids failing on shapes that
    # do not have exactly three entries.
    ih, iw = frame_shape[:2]

    x = int(rel_box.xmin * iw)
    y = int(rel_box.ymin * ih)
    w = int(rel_box.width * iw)
    h = int(rel_box.height * ih)

    center_x = x + w / 2
    center_y = y + h / 2
    return center_x, center_y, w, h

def smooth_coordinates(coords, smooth_factor=5):
    """Apply a moving average of width ``smooth_factor`` to each coordinate column.

    Args:
        coords: Iterable of equal-length tuples (one tuple per sample). It is
            transposed so that every tuple position is averaged independently.
        smooth_factor: Length of the uniform averaging window. Defaults to 5.

    Returns:
        A ``zip`` iterator over the smoothed samples, transposed back to one
        tuple per output position. Values come from ``np.convolve`` with
        ``mode='valid'``.
    """
    def moving_average(series):
        # Uniform kernel whose weights sum to 1.
        window = np.ones(smooth_factor) / smooth_factor
        return np.convolve(series, window, mode='valid')

    per_axis = [moving_average(series) for series in zip(*coords)]
    return zip(*per_axis)

def video_process(video_path, output_video):
    """Copy to ``output_video`` only the frames that contain exactly one face.

    Every frame of ``video_path`` is run through the fdlite ``FaceDetection``
    model (back-camera variant). Frames for which the detector returns exactly
    one result are written, unmodified, to ``output_video`` using the input's
    frame rate and frame size; all other frames are dropped.

    Args:
        video_path: Path of the video to read.
        output_video: Path of the video to write.

    Raises:
        IOError: If the input video or the output writer cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    out = None
    try:
        if not cap.isOpened():
            raise IOError(f"Could not open input video: {video_path}")

        frame_rate = cap.get(cv2.CAP_PROP_FPS)
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # NOTE(review): 'XVID' is normally paired with .avi containers; confirm
        # it is the intended codec when output_video ends in .mp4.
        fourcc = cv2.VideoWriter_fourcc(*'XVID')
        out = cv2.VideoWriter(output_video, fourcc, frame_rate, (frame_width, frame_height))
        if not out.isOpened():
            # Without this check every write would be dropped silently.
            raise IOError(f"Could not open output video for writing: {output_video}")

        face_detector = FaceDetection(model_type=FaceDetectionModel.BACK_CAMERA)  # using fdlite

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # OpenCV decodes frames as BGR while fdlite interprets array input
            # as RGB, so convert before detection (the frame that gets written
            # stays in its original BGR form).
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = face_detector(rgb_frame)

            if results and len(results) == 1:
                out.write(frame)
    finally:
        # Release the handles even if detection or writing raised.
        cap.release()
        if out is not None:
            out.release()

    print(f"Output video saved to {output_video}")

def _filter_short_tracks(tracking_data, min_duration):
    """Return the rows whose track ID lasted at least ``min_duration`` overall."""
    longest_span = {}
    for row in tracking_data:
        span = row['end_time'] - row['start_time']
        if span > longest_span.get(row['ID'], float('-inf')):
            longest_span[row['ID']] = span
    return [row for row in tracking_data if longest_span[row['ID']] >= min_duration]


def track_faces(video_path, result_csv_path, result_video_path, change_threshold=65, smooth_factor=3, min_duration=3, min_detection_confidence=0.6):
    """Track a single face through a video, writing an annotated video and a CSV.

    Each frame is run through the MediaPipe face detector. While exactly one
    face is detected, its (optionally smoothed) box is drawn on the frame and a
    row is recorded under the current track ID. A new ID is started when the
    box centre moves by more than ``change_threshold`` pixels along either axis
    between consecutive single-face frames, or after a frame with zero or
    several detections. Every input frame is written to the output video,
    annotated or not.

    Args:
        video_path: Path of the video to read.
        result_csv_path: Destination of the CSV with columns ``ID``, ``frame``,
            ``start_time``, ``end_time`` and ``bbox``
            (``[center_x, center_y, width, height]``).
        result_video_path: Destination of the annotated video.
        change_threshold: Maximum per-axis centre displacement, in pixels,
            before a new ID is assigned.
        smooth_factor: Number of most recent boxes averaged together once that
            many have been collected.
        min_duration: Minimum track length in seconds (frame index divided by
            the video's FPS); rows of shorter tracks are left out of the CSV.
        min_detection_confidence: Passed to ``initialize_face_detector``.

    Raises:
        IOError: If the input video or the output writer cannot be opened.
        ValueError: If the input video reports a non-positive frame rate.
    """
    csv_columns = ['ID', 'frame', 'start_time', 'end_time', 'bbox']
    tracking_data = []

    cap = cv2.VideoCapture(video_path)
    out = None
    try:
        if not cap.isOpened():
            raise IOError(f"Could not open input video: {video_path}")

        frame_rate = cap.get(cv2.CAP_PROP_FPS)
        if not (frame_rate > 0):
            # Timestamps are frame_count / frame_rate, so a zero or invalid
            # rate would otherwise fail later with a bare ZeroDivisionError.
            raise ValueError(f"Invalid frame rate {frame_rate!r} reported for {video_path}")
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        fourcc = cv2.VideoWriter_fourcc(*'XVID')
        out = cv2.VideoWriter(result_video_path, fourcc, frame_rate, (frame_width, frame_height))
        if not out.isOpened():
            raise IOError(f"Could not open output video for writing: {result_video_path}")

        frame_count = 0
        current_id = 1
        start_time = None
        last_bbox = None
        colors = {}
        bbox_history = deque(maxlen=smooth_factor)

        face_detector = initialize_face_detector(min_detection_confidence)

        with face_detector:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                frame_count += 1
                current_time = frame_count / frame_rate
                results = process_frame(face_detector, frame)
                detections = results.detections

                if detections and len(detections) == 1:
                    center_x, center_y, width, height = extract_bbox(detections[0], frame.shape)

                    bbox_history.append((center_x, center_y, width, height))
                    if len(bbox_history) >= smooth_factor:
                        smoothed_rows = smooth_coordinates(bbox_history, smooth_factor)
                        # Cast the NumPy scalars to plain floats so the 'bbox'
                        # list is serialised as ordinary numbers in the CSV.
                        center_x, center_y, width, height = (float(value) for value in next(smoothed_rows))

                    if start_time is None:
                        start_time = current_time

                    if last_bbox is not None:
                        last_center_x, last_center_y = last_bbox[:2]
                        moved_x = abs(center_x - last_center_x) > change_threshold
                        moved_y = abs(center_y - last_center_y) > change_threshold
                        if moved_x or moved_y:
                            # A large jump is treated as a different subject.
                            current_id += 1
                            start_time = current_time

                    last_bbox = (center_x, center_y, width, height)

                    if current_id not in colors:
                        colors[current_id] = (int(current_id * 50 % 256), int(current_id * 80 % 256), int(current_id * 110 % 256))
                    color = colors[current_id]

                    left = int(center_x - width / 2)
                    top = int(center_y - height / 2)
                    right = int(center_x + width / 2)
                    # The drawn box extends 20 px below the detected one.
                    # NOTE(review): presumably to include the chin/mouth area - confirm.
                    bottom = int(center_y + (height + 40) / 2)

                    cv2.rectangle(frame, (left, top), (right, bottom), color, 2)
                    cv2.putText(frame, f'ID: {current_id}', (left, top - 10),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)
                    cv2.putText(frame, f'Frame: {frame_count}', (left, top - 60),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)

                    tracking_data.append({
                        'ID': current_id,
                        'frame': frame_count,
                        'start_time': start_time,
                        'end_time': current_time,
                        'bbox': [center_x, center_y, width, height]
                    })
                elif tracking_data and tracking_data[-1]['ID'] == current_id:
                    # Zero or several faces: close the current track once, so
                    # the next single-face frame starts a fresh ID.
                    current_id += 1
                    start_time = None
                    last_bbox = None
                    bbox_history.clear()

                out.write(frame)
    finally:
        # Release the handles even if detection or writing raised.
        cap.release()
        if out is not None:
            out.release()

    # Filter out IDs with a duration less than min_duration. The duration is
    # evaluated per ID (not per row) so the early rows of a long track are kept.
    filtered_data = _filter_short_tracks(tracking_data, min_duration)

    # Explicit columns keep the CSV header present even when no row survives.
    df = pd.DataFrame(filtered_data, columns=csv_columns)
    df.to_csv(result_csv_path, index=False)

    print(f"Tracking data saved to {result_csv_path}")
    print(f"Output video saved to {result_video_path}")

In [None]:
if __name__ == "__main__":
    # Tracking parameters handed to track_faces below.
    change_threshold = 65
    smooth_factor = 3
    min_duration = 3
    min_detection_confidence = 0.6

    # Input video and the three artefacts produced by the two stages.
    video_path = '/content/drive/MyDrive/ColabDataset/LipReading/data/video_raw/video01.mov'
    output_video = '/content/drive/MyDrive/ColabDataset/LipReading/data/video_output/video01.mp4'
    result_csv_path = '/content/drive/MyDrive/ColabDataset/LipReading/data/result/result_csv/video01.csv'
    result_video_path = '/content/drive/MyDrive/ColabDataset/LipReading/data/result/result_video/video01.mp4'

    # Stage 1: keep only the frames that contain exactly one face.
    video_process(video_path=video_path, output_video=output_video)

    # Stage 2: track the face in the filtered video and export CSV + annotated video.
    track_faces(
        video_path=output_video,
        result_csv_path=result_csv_path,
        result_video_path=result_video_path,
        change_threshold=change_threshold,
        smooth_factor=smooth_factor,
        min_duration=min_duration,
        min_detection_confidence=min_detection_confidence,
    )

Output video saved to /content/drive/MyDrive/ColabDataset/LipReading/data/video_output/video01.mp4




Tracking data saved to /content/drive/MyDrive/ColabDataset/LipReading/data/result/result_csv/video01.csv
Output video saved to /content/drive/MyDrive/ColabDataset/LipReading/data/result/result_video/video01.mp4
