<a href="https://colab.research.google.com/github/TanishqAgarwal29/DL-projects/blob/main/therapy.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive at /content/drive. Later cells change into, read from and
# write to paths under this mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Switch the working directory to the project folder on Drive. The dlib landmark
# model is loaded further down via a relative file name, so it is presumably
# expected to live in this folder.
%cd /content/drive/MyDrive/gazeboi

/content/drive/MyDrive/gazeboi


In [None]:
# Install the third-party packages imported by the next cell.
# Versions are not pinned; the recorded output of this run shows it resolved
# face_recognition 1.3.0, mediapipe 0.10.14 and fer 22.5.1.
!pip install opencv-python dlib face_recognition mediapipe fer tqdm numpy

Collecting face_recognition
  Downloading face_recognition-1.3.0-py2.py3-none-any.whl.metadata (21 kB)
Collecting mediapipe
  Downloading mediapipe-0.10.14-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.7 kB)
Collecting fer
  Downloading fer-22.5.1-py3-none-any.whl.metadata (6.4 kB)
Collecting face-recognition-models>=0.3.0 (from face_recognition)
  Downloading face_recognition_models-0.3.0.tar.gz (100.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m100.1/100.1 MB[0m [31m23.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting protobuf<5,>=4.25.3 (from mediapipe)
  Downloading protobuf-4.25.4-cp37-abi3-manylinux2014_x86_64.whl.metadata (541 bytes)
Collecting sounddevice>=0.4.4 (from mediapipe)
  Downloading sounddevice-0.5.0-py3-none-any.whl.metadata (1.4 kB)
Collecting facenet-pytorch (from fer)
  Downloading facenet_pytorch-2.6.0-py3-none-any.whl.metadata (12 kB)
Collecting ffmpeg==1.4 (fr

In [None]:
import cv2
import numpy as np
import os
import dlib
import face_recognition
import mediapipe as mp
from fer import FER
from tqdm import tqdm

# --- Shared model instances: created once here, reused by the functions below ---

# dlib frontal face detector and 68-point landmark predictor. The predictor
# weights are read from a file in the current working directory.
detector = dlib.get_frontal_face_detector()
predictor = dlib.shape_predictor("shape_predictor_68_face_landmarks.dat")

# Facial-expression classifier from the `fer` package, with its MTCNN option on.
emotion_detector = FER(mtcnn=True)

# MediaPipe hand tracking. Both confidence thresholds are deliberately set low
# (0.1) to loosen hand detection; up to four hands are reported per frame.
mp_hands = mp.solutions.hands
hands = mp_hands.Hands(
    static_image_mode=False,
    max_num_hands=4,
    min_detection_confidence=0.1,
    min_tracking_confidence=0.1,
)
mp_drawing = mp.solutions.drawing_utils

def estimate_gaze(eye_landmarks):
    """Estimate a unit-length 2-D gaze direction from six eye-contour points.

    The "iris" position is only approximated here: it is taken as the mean of
    the two upper-lid points (indices 1-2) and the two lower-lid points
    (indices 4-5). No pupil detection is performed. The gaze vector is the
    offset of that point from the mean of all landmarks, scaled by the eye
    width (distance between indices 0 and 3), with the vertical component
    halved, and finally normalised to length 1.

    Args:
        eye_landmarks: Sequence or array of at least six (x, y) points, ordered
            so that indices 0 and 3 are the eye corners.

    Returns:
        ``(gaze_vector, eye_center)`` where ``gaze_vector`` is a float NumPy
        array ``[x, y]`` of unit length and ``eye_center`` is a tuple of ints
        (the truncated mean of all landmarks). ``(None, None)`` is returned
        when the input is malformed or the geometry is degenerate (zero eye
        width, zero offset, or any non-finite intermediate value).
    """
    try:
        # Work on a float copy so lists of tuples are accepted and the caller's
        # data is never modified by the in-place operations below.
        points = np.asarray(eye_landmarks, dtype=float)
        if points.ndim != 2 or points.shape[0] < 6 or points.shape[1] < 2:
            return None, None

        eye_center = np.mean(points, axis=0)

        # Eye width, used to make the offset independent of face size/distance.
        eye_width = np.linalg.norm(points[0] - points[3])
        if not np.isfinite(eye_width) or eye_width == 0:
            # Coincident corners would turn the division below into inf/NaN.
            return None, None

        # Use the middle top and bottom landmarks for the iris approximation.
        iris_top = np.mean(points[1:3], axis=0)
        iris_bottom = np.mean(points[4:6], axis=0)
        iris_center = np.mean([iris_top, iris_bottom], axis=0)

        gaze_vector = (iris_center - eye_center) / eye_width

        # Halve the vertical component to counter the tendency to estimate the
        # gaze as too low.
        gaze_vector[1] *= 0.5

        gaze_vector_norm = np.linalg.norm(gaze_vector)
        if not np.isfinite(gaze_vector_norm) or gaze_vector_norm == 0:
            return None, None
        gaze_vector /= gaze_vector_norm

        return gaze_vector, tuple(map(int, eye_center))
    except Exception:
        # Best effort: any malformed input yields "no estimate". Unlike a bare
        # ``except``, this lets KeyboardInterrupt/SystemExit propagate.
        return None, None

def get_gaze_direction(gaze_vector, threshold=0.1):
    """Convert a 2-D gaze vector into a coarse direction label.

    Each axis is classified independently against ``threshold``. Negative y is
    "up" and positive y is "down"; negative x is "left" and positive x is
    "right". Values within ``[-threshold, threshold]`` count as "center" on
    that axis.

    Args:
        gaze_vector: Iterable of two numbers ``(x, y)``, or ``None``.
        threshold: Dead-zone half-width applied to both axes. Defaults to 0.1.

    Returns:
        ``"unknown"`` if ``gaze_vector`` is ``None`` or contains a non-finite
        value; ``"center"`` if both axes are centred; otherwise
        ``"<vertical>-<horizontal>"``, e.g. ``"up-left"``, ``"center-right"``
        or ``"down-center"``.
    """
    if gaze_vector is None:
        return "unknown"

    x, y = gaze_vector
    if not (np.isfinite(x) and np.isfinite(y)):
        # NaN fails every comparison below and would otherwise be reported as
        # "center", which is indistinguishable from a real centred gaze.
        return "unknown"

    vertical = "up" if y < -threshold else "down" if y > threshold else "center"
    horizontal = "left" if x < -threshold else "right" if x > threshold else "center"

    if vertical == "center" and horizontal == "center":
        return "center"
    return f"{vertical}-{horizontal}"

def robust_emotion_classification(emotions, min_confidence=0.3):
    """Pick the dominant emotion if its share of the total score is high enough.

    Scores are normalised by their sum; the emotion with the largest share is
    accepted only when that share is at least ``min_confidence``. On ties the
    first emotion in the dict's iteration order wins.

    Args:
        emotions: Mapping of emotion label -> numeric score.
        min_confidence: Minimum normalised share required to accept the top
            emotion. Defaults to 0.3.

    Returns:
        ``(label, raw_score)`` where ``raw_score`` is the un-normalised value
        stored in ``emotions``; or ``(None, None)`` when the mapping is empty,
        the scores do not sum to a positive finite number, or the top share is
        below ``min_confidence``.
    """
    if not emotions:
        # Nothing to rank (previously this raised IndexError).
        return None, None

    labels = list(emotions.keys())
    scores = np.asarray(list(emotions.values()), dtype=float)

    total = scores.sum()
    if not np.isfinite(total) or total <= 0:
        # Avoid dividing by zero / propagating NaN when every score is 0.
        return None, None

    # argmax returns the first maximum, matching a stable descending sort.
    top_index = int(np.argmax(scores))
    top_emotion = labels[top_index]
    top_share = scores[top_index] / total

    if top_share >= min_confidence:
        return top_emotion, emotions[top_emotion]
    return None, None

def calculate_engagement(prev_gaze, current_gaze, emotion, hand_movement, hand_positions, face_positions):
    """Heuristic engagement score in [0, 1] for one person in one frame.

    Starting from a neutral 0.5, the score is adjusted as follows:

    * +0.1 if the gaze label differs from the previous frame's label.
    * +0.1 if the current gaze label contains the substring "center"
      (so labels such as "up-center" or "center-left" also qualify).
    * +0.2 for emotion "happy"; -0.1 for "sad", "angry" or "fear".
    * +0.1 if ``hand_movement`` is truthy, plus up to +0.2 more when exactly
      two hand positions and exactly two face positions are given and the
      mean hand position lies within 200 units of the mean face position
      (the bonus grows linearly as that distance shrinks).

    Args:
        prev_gaze: Gaze label from the previous frame.
        current_gaze: Gaze label for this frame (a string).
        emotion: Emotion label, or ``None``.
        hand_movement: Truthy when hands were detected.
        hand_positions: Sequence of (x, y) hand centres.
        face_positions: Sequence of (x, y) face centres.

    Returns:
        The score clamped to the range 0..1.
    """
    score = 0.5  # neutral baseline

    # --- gaze contribution ---
    gaze_changed = prev_gaze != current_gaze
    if gaze_changed:
        score += 0.1
    if "center" in current_gaze:
        score += 0.1

    # --- emotion contribution ---
    if emotion == "happy":
        score += 0.2
    elif emotion in ("sad", "angry", "fear"):
        score -= 0.1

    # --- hand contribution ---
    if not hand_movement:
        return max(0, min(score, 1))

    score += 0.1

    exactly_two_each = len(hand_positions) == 2 and len(face_positions) == 2
    if exactly_two_each:
        # Offset between the average hand location and the average face location.
        offset = np.mean(hand_positions, axis=0) - np.mean(face_positions, axis=0)
        distance = np.linalg.norm(offset)

        # Closer hands earn a larger bonus; nothing beyond 200 units.
        if distance < 200:
            score += 0.2 * (1 - distance / 200)

    return max(0, min(score, 1))

def identify_child_and_therapist(face_locations, frame_shape):
    """Guess which detected face is the child and which is the therapist.

    Faces are ranked by bounding-box area, largest first; ties are broken by
    the vertical centre of the box (higher in the frame first). The largest
    face is taken to be the therapist and the second largest the child. Any
    further faces are ignored.

    Args:
        face_locations: Sequence of ``(top, right, bottom, left)`` boxes.
        frame_shape: Shape of the frame; only ``frame_shape[0]`` (height) is
            used, to normalise the vertical position.

    Returns:
        ``(child_index, therapist_index)`` as indices into ``face_locations``,
        or ``(None, None)`` when fewer than two faces are supplied.
    """
    if len(face_locations) < 2:
        return None, None

    # One record per face: (index, box area, normalised vertical centre).
    ranked = [
        (idx, (right - left) * (bottom - top), (top + bottom) / 2 / frame_shape[0])
        for idx, (top, right, bottom, left) in enumerate(face_locations)
    ]

    # Biggest area first; among equal areas, the face nearer the top first.
    ranked.sort(key=lambda record: (-record[1], record[2]))

    therapist_index = ranked[0][0]
    child_index = ranked[1][0]
    return child_index, therapist_index

def process_frame(frame, prev_gaze_child, prev_gaze_therapist):
    """Annotate one video frame and score engagement for child and therapist.

    Steps:
      1. Detect faces with ``face_recognition`` (CNN model). If fewer than two
         are found, the frame is returned untouched with neutral scores.
      2. Pick the child and therapist via ``identify_child_and_therapist``.
      3. For those two faces only: fit dlib landmarks, estimate gaze from the
         eye landmarks (points 36-41 and 42-47), classify emotion with FER,
         and draw a box, label, gaze arrows and text onto ``frame``.
      4. Detect hands with MediaPipe, draw them, and record each hand's mean
         landmark position in pixels.
      5. Compute both engagement scores and draw them at the top-left.

    Args:
        frame: BGR image (converted here with ``COLOR_BGR2RGB`` and
            ``COLOR_BGR2GRAY``). It is drawn on in place.
        prev_gaze_child: Child gaze label from the previous frame.
        prev_gaze_therapist: Therapist gaze label from the previous frame.

    Returns:
        ``(frame, child_gaze, therapist_gaze, child_engagement,
        therapist_engagement)``. When fewer than two faces are available the
        previous gaze labels are passed through and both scores are 0.5.
    """
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    face_locations = face_recognition.face_locations(rgb_frame, model="cnn", number_of_times_to_upsample=2)
    if len(face_locations) < 2:
        # Two roles cannot be assigned: carry the previous gaze labels forward.
        return frame, prev_gaze_child, prev_gaze_therapist, 0.5, 0.5

    child_index, therapist_index = identify_child_and_therapist(face_locations, frame.shape)

    if child_index is None or therapist_index is None:
        return frame, prev_gaze_child, prev_gaze_therapist, 0.5, 0.5

    child_gaze, therapist_gaze = "unknown", "unknown"
    child_emotion, therapist_emotion = None, None
    face_positions = []

    for i, (top, right, bottom, left) in enumerate(face_locations):
        # Only the two selected participants are analysed. Additional faces
        # used to be labelled "Therapist" and added to face_positions, which
        # mislabelled bystanders and disabled the two-face proximity bonus.
        if i != child_index and i != therapist_index:
            continue

        face = dlib.rectangle(left, top, right, bottom)
        landmarks = predictor(gray, face)
        # Landmark points 36-41 and 42-47 are used as the two eye contours.
        left_eye_points = np.array([(landmarks.part(n).x, landmarks.part(n).y) for n in range(36, 42)])
        right_eye_points = np.array([(landmarks.part(n).x, landmarks.part(n).y) for n in range(42, 48)])

        label = "Child" if i == child_index else "Therapist"
        color = (0, 255, 0) if i == child_index else (0, 0, 255)
        cv2.rectangle(frame, (left, top), (right, bottom), color, 2)
        cv2.putText(frame, label, (left, top - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)

        left_gaze, left_eye_center = estimate_gaze(left_eye_points)
        right_gaze, right_eye_center = estimate_gaze(right_eye_points)

        gaze_direction = "unknown"
        gaze_usable = (
            left_gaze is not None
            and right_gaze is not None
            # A non-finite component would make the int() conversions for the
            # arrow end points below raise.
            and bool(np.all(np.isfinite(left_gaze)))
            and bool(np.all(np.isfinite(right_gaze)))
        )
        if gaze_usable:
            avg_gaze = (left_gaze + right_gaze) / 2
            gaze_direction = get_gaze_direction(avg_gaze)

            # Draw a 50-pixel arrow from each eye centre along its gaze vector.
            cv2.arrowedLine(frame, left_eye_center, tuple(map(int, (left_eye_center[0] + left_gaze[0] * 50, left_eye_center[1] + left_gaze[1] * 50))), (0, 0, 255), 2)
            cv2.arrowedLine(frame, right_eye_center, tuple(map(int, (right_eye_center[0] + right_gaze[0] * 50, right_eye_center[1] + right_gaze[1] * 50))), (0, 0, 255), 2)
            cv2.putText(frame, f"Gaze: {gaze_direction}", (left, bottom + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

        face_image = rgb_frame[top:bottom, left:right]
        # Skip emotion detection on an empty crop instead of handing the
        # detector a zero-sized image.
        emotion_results = emotion_detector.detect_emotions(face_image) if face_image.size > 0 else []

        emotion_label = None
        if emotion_results:
            # Only the first result returned for the crop is used.
            emotions = emotion_results[0]['emotions']
            emotion_label, emotion_score = robust_emotion_classification(emotions, min_confidence=0.3)

            if emotion_label:
                emotion_text = f"Emotion: {emotion_label} ({emotion_score:.2f})"
                cv2.putText(frame, emotion_text, (left, bottom + 50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

        face_center = ((left + right) // 2, (top + bottom) // 2)
        face_positions.append(face_center)

        if i == child_index:
            child_gaze = gaze_direction
            child_emotion = emotion_label
        else:
            therapist_gaze = gaze_direction
            therapist_emotion = emotion_label

    hand_results = hands.process(rgb_frame)
    hand_positions = []
    if hand_results.multi_hand_landmarks:
        for hand_landmarks in hand_results.multi_hand_landmarks:
            mp_drawing.draw_landmarks(frame, hand_landmarks, mp_hands.HAND_CONNECTIONS)

            # Mean landmark position, scaled by frame width/height to pixels.
            hand_pos = np.mean([(lm.x * frame.shape[1], lm.y * frame.shape[0]) for lm in hand_landmarks.landmark], axis=0)
            hand_positions.append(hand_pos)

    # Despite the name, this flags hand *presence* in the current frame; no
    # frame-to-frame motion is measured.
    hand_movement = len(hand_positions) > 0

    # The same hand data is used for both people; hands are not attributed to
    # an individual.
    child_engagement = calculate_engagement(prev_gaze_child, child_gaze, child_emotion, hand_movement, hand_positions, face_positions)
    therapist_engagement = calculate_engagement(prev_gaze_therapist, therapist_gaze, therapist_emotion, hand_movement, hand_positions, face_positions)

    cv2.putText(frame, f"Child Engagement: {child_engagement:.2f}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 255, 0), 2)
    cv2.putText(frame, f"Therapist Engagement: {therapist_engagement:.2f}", (10, 70), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 255, 0), 2)

    return frame, child_gaze, therapist_gaze, child_engagement, therapist_engagement

def process_video(input_path, output_path):
    """Run ``process_frame`` over every frame of a video and save the result.

    The annotated frames are written to ``output_path`` with the 'mp4v' codec
    at the input's frame rate and frame size. When finished, the output path
    and the mean child/therapist engagement over all frames are printed.

    Args:
        input_path: Path of the video to read.
        output_path: Path of the annotated video to write.

    Returns:
        None.

    Raises:
        IOError: If the input video cannot be opened, or the output writer
            cannot be created.
    """
    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        cap.release()
        raise IOError(f"Could not open input video: {input_path}")

    out = None
    pbar = None
    child_engagements, therapist_engagements = [], []

    try:
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Keep the frame rate as a float: truncating e.g. 29.97 to 29 would
        # change the playback duration of the output.
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        if not out.isOpened():
            # Fail before the slow per-frame work rather than silently
            # dropping every frame.
            raise IOError(f"Could not open output video for writing: {output_path}")

        # A non-positive frame count means it is unknown; tqdm then shows an
        # open-ended counter.
        pbar = tqdm(total=total_frames if total_frames > 0 else None, desc="Processing video")

        prev_gaze_child, prev_gaze_therapist = "unknown", "unknown"

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            processed_frame, child_gaze, therapist_gaze, child_engagement, therapist_engagement = process_frame(frame, prev_gaze_child, prev_gaze_therapist)
            out.write(processed_frame)

            # This frame's gaze labels become the next frame's "previous" ones.
            prev_gaze_child, prev_gaze_therapist = child_gaze, therapist_gaze
            child_engagements.append(child_engagement)
            therapist_engagements.append(therapist_engagement)

            pbar.update(1)
    finally:
        # Release everything even if a frame fails part-way through.
        if pbar is not None:
            pbar.close()
        cap.release()
        if out is not None:
            out.release()

    print(f"Processed video saved to: {output_path}")

    if not child_engagements:
        # np.mean of an empty list would warn and print "nan".
        print("No frames were processed; engagement averages are unavailable.")
        return

    avg_child_engagement = np.mean(child_engagements)
    avg_therapist_engagement = np.mean(therapist_engagements)
    print(f"Average Child Engagement: {avg_child_engagement:.2f}")
    print(f"Average Therapist Engagement: {avg_therapist_engagement:.2f}")

# Set paths
# Both the input clip and the annotated output are absolute paths inside the
# mounted Drive project folder.
input_video = "/content/drive/MyDrive/gazeboi/test/ABA Therapy - Social Engagement.mp4"
output_video = "/content/drive/MyDrive/gazeboi/processed_video.mp4"

# Process the video
# This runs the whole pipeline when the cell executes. The recorded run below
# took about 27 minutes for 2095 frames (~1.29 frames/s).
process_video(input_video, output_video)


Processing video: 100%|██████████| 2095/2095 [27:01<00:00,  1.29it/s]

Processed video saved to: /content/drive/MyDrive/gazeboi/processed_video.mp4
Average Child Engagement: 0.63
Average Therapist Engagement: 0.55



