In [1]:
from google.colab import drive
import os

# Mount Google Drive at /content/drive. Later cells read the input video from, and write the
# annotated output video to, paths under /content/drive/MyDrive/.
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
!pip install ultralytics opencv-python roboflow
!pip install autodistill autodistill-grounding-dino
!pip install deep-sort-realtime
!pip install deepface





Collecting ultralytics
  Downloading ultralytics-8.2.95-py3-none-any.whl.metadata (39 kB)
Collecting roboflow
  Downloading roboflow-1.1.45-py3-none-any.whl.metadata (9.7 kB)
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.6-py3-none-any.whl.metadata (9.1 kB)
Collecting idna==3.7 (from roboflow)
  Downloading idna-3.7-py3-none-any.whl.metadata (9.9 kB)
Collecting python-dotenv (from roboflow)
  Downloading python_dotenv-1.0.1-py3-none-any.whl.metadata (23 kB)
Collecting requests-toolbelt (from roboflow)
  Downloading requests_toolbelt-1.0.0-py2.py3-none-any.whl.metadata (14 kB)
Collecting filetype (from roboflow)
  Downloading filetype-1.2.0-py2.py3-none-any.whl.metadata (6.5 kB)
Downloading ultralytics-8.2.95-py3-none-any.whl (872 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m872.8/872.8 kB[0m [31m23.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading roboflow-1.1.45-py3-none-any.whl (80 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━

In [6]:
from autodistill_grounding_dino import GroundingDINO
from autodistill.detection import CaptionOntology
import cv2
import numpy as np
from collections import deque
from scipy.spatial import distance

# Define separate ontologies with expanded variations
# NOTE(review): autodistill documents CaptionOntology as {prompt caption: class label}. As
# written, the prompt would be the short key ("child" / "therapist") and the comma-separated
# variations would only be the class label — confirm the mapping is intended this way round.
ontology_child = CaptionOntology({
    "child": "toddler, young child, small child, little boy, little girl, preschool child, kid, infant, baby, youth"
})

ontology_therapist = CaptionOntology({
    "therapist": "adult, therapist, healthcare professional, doctor, nurse, counselor"
})

# Load GroundingDINO models (one per ontology, so the caller decides the label per model)
base_model_child = GroundingDINO(ontology=ontology_child)
base_model_therapist = GroundingDINO(ontology=ontology_therapist)

# Open the video file
video_path = '/content/drive/MyDrive/face_detection/Naturalistic Teaching - Autism Therapy Video.mp4'
cap = cv2.VideoCapture(video_path)
# Fail loudly on a bad path / unmounted Drive instead of silently producing an empty output.
if not cap.isOpened():
    raise IOError(f"Could not open input video: {video_path}")

# Get video writer setup (same frame rate and frame size as the input)
output_path = '/content/drive/MyDrive/face_detection/output_video.mp4'
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
if not out.isOpened():
    # Do not leave the capture handle open when the writer could not be created.
    cap.release()
    raise IOError(f"Could not open output video for writing: {output_path}")

# Function to extract color histogram from the bounding box
def extract_color_histogram(image, bbox):
    """Return a flattened, normalized hue/saturation histogram of the region inside ``bbox``.

    Args:
        image: BGR frame as a NumPy array of shape (height, width, channels).
        bbox: Iterable ``(x1, y1, x2, y2)``; values are truncated to int.

    Returns:
        1-D float32 array with 50 * 60 = 3000 entries (50 hue bins x 60 saturation bins).
        If the box does not overlap the frame at all, an all-zero histogram is returned
        instead of raising.
    """
    frame_h, frame_w = image.shape[:2]
    x1, y1, x2, y2 = map(int, bbox)

    # Clamp to the frame: a negative start index would otherwise wrap around in NumPy
    # slicing and produce an empty (or wrong) crop that cv2.cvtColor rejects.
    x1, y1 = max(0, x1), max(0, y1)
    x2, y2 = min(frame_w, x2), min(frame_h, y2)

    if x2 <= x1 or y2 <= y1:
        # Degenerate / fully out-of-frame box: nothing to measure.
        return np.zeros(50 * 60, dtype=np.float32)

    roi = image[y1:y2, x1:x2]
    hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)
    # Channels 0 and 1 of the HSV image: hue in [0, 180), saturation in [0, 256).
    hist = cv2.calcHist([hsv], [0, 1], None, [50, 60], [0, 180, 0, 256])
    cv2.normalize(hist, hist)
    return hist.flatten()

# Function to calculate Intersection over Union (IoU)
def iou(bbox1, bbox2):
    """Intersection over Union of two ``(x1, y1, x2, y2)`` boxes.

    Widths and heights are measured with the inclusive-pixel convention
    (``x2 - x1 + 1``), so two identical boxes score exactly 1.

    Returns:
        Intersection area divided by union area.
    """
    left_a, top_a, right_a, bottom_a = bbox1
    left_b, top_b, right_b, bottom_b = bbox2

    # Extent of the overlap rectangle; a negative extent means no overlap on that axis.
    overlap_w = max(0, min(right_a, right_b) - max(left_a, left_b) + 1)
    overlap_h = max(0, min(bottom_a, bottom_b) - max(top_a, top_b) + 1)
    intersection = overlap_w * overlap_h

    area_a = (right_a - left_a + 1) * (bottom_a - top_a + 1)
    area_b = (right_b - left_b + 1) * (bottom_b - top_b + 1)
    union = area_a + area_b - intersection

    return intersection / union

# Non-Maximum Suppression (NMS) to remove overlapping bounding boxes
def non_max_suppression(detections, iou_threshold=0.5):
    """Greedy, class-agnostic non-maximum suppression.

    Args:
        detections: List of ``(label, box, hist, confidence)`` tuples, where ``box`` is
            ``(x1, y1, x2, y2)``.
        iou_threshold: A detection is dropped when its IoU with an already-kept,
            higher-confidence detection is greater than or equal to this value.

    Returns:
        The kept detections, ordered by descending confidence. An empty input is
        returned unchanged.
    """
    if len(detections) == 0:
        return detections

    # Rank by the confidence score (index 3). Index 2 is the colour histogram, which
    # says nothing about detection quality and must not drive the ordering.
    ranked = sorted(detections, key=lambda det: float(det[3]), reverse=True)

    keep = []
    for candidate in ranked:
        # Keep a candidate only if it does not overlap too much with anything kept so far.
        if all(iou(kept[1], candidate[1]) < iou_threshold for kept in keep):
            keep.append(candidate)

    return keep

# Trackers and parameters
# NOTE(review): histograms come from extract_color_histogram, which calls cv2.normalize with
# its default arguments. If that default is the L2 norm, Euclidean distances between two
# histograms stay far below 100, so this threshold would never reject a match — confirm and tune.
max_distance = 100  # Max distance for color histogram matching
trackers = deque(maxlen=100)  # Active Tracker objects; replaced by a fresh deque every frame
detection_history = deque(maxlen=10)  # Store recent detections for smoothing
track_labels = {}  # To keep track of labels assigned to track IDs (not referenced elsewhere in this cell)

class Tracker:
    """One tracked person: identity, last bounding box, appearance and a Kalman filter.

    The Kalman filter has a 4-value state and a 2-value measurement. The measurement
    matrix selects the first two state entries, and ``update`` feeds it ``bbox[0]`` and
    ``bbox[1]``, so the filter follows the box's first corner; the remaining two state
    entries act as its per-step velocity.

    Attributes:
        track_id: Identifier assigned by the caller.
        bbox: Most recent ``(x1, y1, x2, y2)`` box.
        hist: Colour histogram captured when the tracker was created.
        label: Class label given by the caller (e.g. "child" or "therapist").
        kalman: The underlying ``cv2.KalmanFilter``.
    """

    def __init__(self, track_id, bbox, hist, label):
        self.track_id = track_id
        self.bbox = bbox
        self.hist = hist
        self.label = label

        self.kalman = cv2.KalmanFilter(4, 2)
        # Measurement (2x4): observe only the two position entries of the state.
        self.kalman.measurementMatrix = np.array(
            [[1, 0, 0, 0],
             [0, 1, 0, 0]], np.float32)
        # Transition must be square (4x4) for a 4-value state: constant-velocity model,
        # position += velocity, velocity unchanged.
        self.kalman.transitionMatrix = np.array(
            [[1, 0, 1, 0],
             [0, 1, 0, 1],
             [0, 0, 1, 0],
             [0, 0, 0, 1]], np.float32)
        # Process noise (4x4): velocity entries are allowed to drift more than position.
        self.kalman.processNoiseCov = np.array(
            [[1, 0, 0, 0],
             [0, 1, 0, 0],
             [0, 0, 5, 0],
             [0, 0, 0, 5]], np.float32)
        # Seed both the predicted and the corrected state so that predict() and
        # correct() each start from the initial box position rather than the origin.
        initial_state = np.array([[bbox[0]], [bbox[1]], [0], [0]], np.float32)
        self.kalman.statePre = initial_state
        self.kalman.statePost = initial_state.copy()

    def predict(self):
        """Advance the filter one step and return the predicted state."""
        return self.kalman.predict()

    def update(self, bbox):
        """Correct the filter with the new box's first corner and store the box."""
        self.kalman.correct(np.array([[bbox[0]], [bbox[1]]], np.float32))
        self.bbox = bbox

# Frame processing
frame_count = 0
track_id_counter = 0
confidence_threshold = 0.5  # Minimum detection confidence; constant for the whole run

# try/finally guarantees the capture and writer are released (so the output file is
# finalized) even if detection or tracking raises part-way through the video.
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        frame_count += 1

        print(f"Processing frame {frame_count}")

        # GroundingDINO detection for both child and therapist
        results_child = base_model_child.predict(frame)
        results_therapist = base_model_therapist.predict(frame)

        # Collect (label, box, hist, confidence) for every sufficiently confident box.
        detections = []
        for results, label in [(results_child, "child"), (results_therapist, "therapist")]:
            for box, confidence in zip(results.xyxy, results.confidence):
                if confidence >= confidence_threshold:
                    hist = extract_color_histogram(frame, box)
                    detections.append((label, box, hist, confidence))

        # Apply Non-Maximum Suppression (NMS)
        detections = non_max_suppression(detections)

        # Store a *copy* of this frame's real detections. `detections` may be extended
        # with carried-over boxes below; storing the same list object would write those
        # stale boxes back into the history and keep them alive indefinitely.
        detection_history.append(list(detections))

        # Smoothing: if no child was detected in this frame, reuse the child detections
        # from the most recent frame in the history window that had any.
        if not any(det[0] == "child" for det in detections):
            for past_detections in reversed(detection_history):
                past_children = [det for det in past_detections if det[0] == "child"]
                if past_children:
                    detections.extend(past_children)
                    break

        # Associate each detection with the closest same-label tracker (by histogram
        # distance) that has not already been claimed in this frame.
        updated_trackers = []
        matched_ids = set()
        for label, box, hist, _ in detections:
            best_tracker = None
            best_dist = max_distance
            for tracker in trackers:
                if tracker.label != label or tracker.track_id in matched_ids:
                    continue
                dist = distance.euclidean(tracker.hist, hist)
                if dist < best_dist:
                    best_tracker, best_dist = tracker, dist

            if best_tracker is None:
                # New track
                track_id_counter += 1
                best_tracker = Tracker(track_id_counter, box, hist, label)
            else:
                best_tracker.update(box)

            matched_ids.add(best_tracker.track_id)
            updated_trackers.append(best_tracker)

        # Remove unmatched trackers
        trackers = deque(updated_trackers, maxlen=100)

        # Draw results with different colors for "child" and "therapist" (BGR order)
        for tracker in trackers:
            x1, y1, x2, y2 = map(int, tracker.bbox)
            color = (0, 0, 255) if tracker.label == "child" else (0, 255, 0)  # red / green

            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
            cv2.putText(frame, f"{tracker.label} ID: {tracker.track_id}", (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)

        # Write frame to output
        out.write(frame)
finally:
    cap.release()
    out.release()
    cv2.destroyAllWindows()


trying to load grounding dino directly
final text_encoder_type: bert-base-uncased
trying to load grounding dino directly
final text_encoder_type: bert-base-uncased
Processing frame 1
Processing frame 2
Processing frame 3
Processing frame 4
Processing frame 5
Processing frame 6
Processing frame 7
Processing frame 8
Processing frame 9
Processing frame 10
Processing frame 11
Processing frame 12
Processing frame 13
Processing frame 14
Processing frame 15
Processing frame 16
Processing frame 17
Processing frame 18
Processing frame 19
Processing frame 20
Processing frame 21
Processing frame 22
Processing frame 23
Processing frame 24
Processing frame 25
Processing frame 26
Processing frame 27
Processing frame 28
Processing frame 29
Processing frame 30
Processing frame 31
Processing frame 32
Processing frame 33
Processing frame 34
Processing frame 35
Processing frame 36
Processing frame 37
Processing frame 38
Processing frame 39
Processing frame 40
Processing frame 41
Processing frame 42
Proce

In [13]:
from autodistill_grounding_dino import GroundingDINO
from autodistill.detection import CaptionOntology
import cv2
import numpy as np
from collections import deque
from scipy.spatial import distance

# Define ontologies with expanded variations
# NOTE(review): autodistill documents CaptionOntology as {prompt caption: class label}. As
# written, the prompt would be the short key ("child" / "therapist") and the comma-separated
# variations would only be the class label — confirm the mapping is intended this way round.
ontology_child = CaptionOntology({
    "child": "toddler, young child, small child, little boy, little girl, preschool child, kid, infant, baby, youth"
})

ontology_therapist = CaptionOntology({
    "therapist": "adult, therapist, healthcare professional, doctor, nurse, counselor"
})

# Load GroundingDINO models (one per ontology, so the caller decides the label per model)
base_model_child = GroundingDINO(ontology=ontology_child)
base_model_therapist = GroundingDINO(ontology=ontology_therapist)

# Open the video file
video_path = '/content/drive/MyDrive/face_detection/Naturalistic Teaching - Autism Therapy Video.mp4'
cap = cv2.VideoCapture(video_path)
# Fail loudly on a bad path / unmounted Drive instead of silently producing an empty output.
if not cap.isOpened():
    raise IOError(f"Could not open input video: {video_path}")

# Get video writer setup (same frame rate and frame size as the input)
output_path = '/content/drive/MyDrive/face_detection/output_video.mp4'
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
if not out.isOpened():
    # Do not leave the capture handle open when the writer could not be created.
    cap.release()
    raise IOError(f"Could not open output video for writing: {output_path}")

# Function to extract color histogram from the bounding box
def extract_color_histogram(image, bbox):
    """Return a flattened, normalized hue/saturation histogram of the region inside ``bbox``.

    Args:
        image: BGR frame as a NumPy array of shape (height, width, channels).
        bbox: Iterable ``(x1, y1, x2, y2)``; values are truncated to int.

    Returns:
        1-D float32 array with 50 * 60 = 3000 entries (50 hue bins x 60 saturation bins).
        If the box does not overlap the frame at all, an all-zero histogram is returned
        instead of raising.
    """
    frame_h, frame_w = image.shape[:2]
    x1, y1, x2, y2 = map(int, bbox)

    # Clamp to the frame: a negative start index would otherwise wrap around in NumPy
    # slicing and produce an empty (or wrong) crop that cv2.cvtColor rejects.
    x1, y1 = max(0, x1), max(0, y1)
    x2, y2 = min(frame_w, x2), min(frame_h, y2)

    if x2 <= x1 or y2 <= y1:
        # Degenerate / fully out-of-frame box: nothing to measure.
        return np.zeros(50 * 60, dtype=np.float32)

    roi = image[y1:y2, x1:x2]
    hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)
    # Channels 0 and 1 of the HSV image: hue in [0, 180), saturation in [0, 256).
    hist = cv2.calcHist([hsv], [0, 1], None, [50, 60], [0, 180, 0, 256])
    cv2.normalize(hist, hist)
    return hist.flatten()

# Non-Maximum Suppression (NMS) to remove overlapping bounding boxes
def _box_iou(box_a, box_b):
    """IoU of two (x1, y1, x2, y2) boxes, using inclusive-pixel (+1) widths and heights."""
    ax1, ay1, ax2, ay2 = box_a
    bx1, by1, bx2, by2 = box_b

    inter_w = max(0, min(ax2, bx2) - max(ax1, bx1) + 1)
    inter_h = max(0, min(ay2, by2) - max(ay1, by1) + 1)
    inter_area = inter_w * inter_h

    area_a = (ax2 - ax1 + 1) * (ay2 - ay1 + 1)
    area_b = (bx2 - bx1 + 1) * (by2 - by1 + 1)

    return inter_area / (area_a + area_b - inter_area)


def non_max_suppression(detections, iou_threshold=0.5):
    """Greedy, class-agnostic non-maximum suppression.

    The overlap measure is computed by a helper defined alongside this function, so the
    cell does not rely on an ``iou`` function left over in the kernel from another cell.

    Args:
        detections: List of ``(label, box, hist, confidence)`` tuples, where ``box`` is
            ``(x1, y1, x2, y2)``.
        iou_threshold: A detection is dropped when its IoU with an already-kept,
            higher-confidence detection is greater than or equal to this value.

    Returns:
        The kept detections, ordered by descending confidence. An empty input is
        returned unchanged.
    """
    if len(detections) == 0:
        return detections

    # Rank by the confidence score stored at index 3.
    ranked = sorted(detections, key=lambda det: float(det[3]), reverse=True)

    keep = []
    for candidate in ranked:
        # Keep a candidate only if it does not overlap too much with anything kept so far.
        if all(_box_iou(kept[1], candidate[1]) < iou_threshold for kept in keep):
            keep.append(candidate)

    return keep

# Trackers and parameters
# NOTE(review): histograms come from extract_color_histogram, which calls cv2.normalize with
# its default arguments. If that default is the L2 norm, Euclidean distances between two
# histograms stay far below 100, so this threshold would never reject a match — confirm and tune.
max_distance = 100  # Max distance for color histogram matching
trackers = deque(maxlen=100)  # Active Tracker objects; replaced by a fresh deque every frame
detection_history = deque(maxlen=10)  # Store recent detections for smoothing
track_labels = {}  # To keep track of labels assigned to track IDs (not referenced elsewhere in this cell)

class Tracker:
    """One tracked person: identity, last bounding box, appearance and a Kalman filter.

    The Kalman filter has a 4-value state and a 2-value measurement. The measurement
    matrix selects the first two state entries, and ``update`` feeds it ``bbox[0]`` and
    ``bbox[1]``, so the filter follows the box's first corner; the remaining two state
    entries act as its per-step velocity.

    Attributes:
        track_id: Identifier assigned by the caller.
        bbox: Most recent ``(x1, y1, x2, y2)`` box.
        hist: Colour histogram captured when the tracker was created.
        label: Class label given by the caller (e.g. "child" or "therapist").
        kalman: The underlying ``cv2.KalmanFilter``.
    """

    def __init__(self, track_id, bbox, hist, label):
        self.track_id = track_id
        self.bbox = bbox
        self.hist = hist
        self.label = label

        self.kalman = cv2.KalmanFilter(4, 2)
        # Measurement (2x4): observe only the two position entries of the state.
        self.kalman.measurementMatrix = np.array(
            [[1, 0, 0, 0],
             [0, 1, 0, 0]], np.float32)
        # Transition must be square (4x4) for a 4-value state: constant-velocity model,
        # position += velocity, velocity unchanged.
        self.kalman.transitionMatrix = np.array(
            [[1, 0, 1, 0],
             [0, 1, 0, 1],
             [0, 0, 1, 0],
             [0, 0, 0, 1]], np.float32)
        # Process noise must also be 4x4; velocity entries are allowed to drift more
        # than the position entries.
        self.kalman.processNoiseCov = np.array(
            [[1, 0, 0, 0],
             [0, 1, 0, 0],
             [0, 0, 5, 0],
             [0, 0, 0, 5]], np.float32)
        # Seed both the predicted and the corrected state so that predict() and
        # correct() each start from the initial box position rather than the origin.
        initial_state = np.array([[bbox[0]], [bbox[1]], [0], [0]], np.float32)
        self.kalman.statePre = initial_state
        self.kalman.statePost = initial_state.copy()

    def predict(self):
        """Advance the filter one step and return the predicted state."""
        return self.kalman.predict()

    def update(self, bbox):
        """Correct the filter with the new box's first corner and store the box."""
        self.kalman.correct(np.array([[bbox[0]], [bbox[1]]], np.float32))
        self.bbox = bbox

# Frame processing
frame_count = 0
track_id_counter = 0
confidence_threshold = 0.5  # Minimum detection confidence; constant for the whole run

# try/finally guarantees the capture and writer are released (so the output file is
# finalized) even if detection or tracking raises part-way through the video.
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        frame_count += 1

        print(f"Processing frame {frame_count}")

        # GroundingDINO detection for both child and therapist
        results_child = base_model_child.predict(frame)
        results_therapist = base_model_therapist.predict(frame)

        # Collect (label, box, hist, confidence) for every sufficiently confident box.
        detections = []
        for results, label in [(results_child, "child"), (results_therapist, "therapist")]:
            for box, confidence in zip(results.xyxy, results.confidence):
                if confidence >= confidence_threshold:
                    hist = extract_color_histogram(frame, box)
                    detections.append((label, box, hist, confidence))

        # Apply Non-Maximum Suppression (NMS)
        detections = non_max_suppression(detections)

        # Store a *copy* of this frame's real detections. `detections` may be extended
        # with carried-over boxes below; storing the same list object would write those
        # stale boxes back into the history and keep them alive indefinitely.
        detection_history.append(list(detections))

        # Smoothing: if no child was detected in this frame, reuse the child detections
        # from the most recent frame in the history window that had any.
        if not any(det[0] == "child" for det in detections):
            for past_detections in reversed(detection_history):
                past_children = [det for det in past_detections if det[0] == "child"]
                if past_children:
                    detections.extend(past_children)
                    break

        # Associate each detection with the closest same-label tracker (by histogram
        # distance) that has not already been claimed in this frame.
        updated_trackers = []
        matched_ids = set()
        for label, box, hist, _ in detections:
            best_tracker = None
            best_dist = max_distance
            for tracker in trackers:
                if tracker.label != label or tracker.track_id in matched_ids:
                    continue
                dist = distance.euclidean(tracker.hist, hist)
                if dist < best_dist:
                    best_tracker, best_dist = tracker, dist

            if best_tracker is None:
                # New track
                track_id_counter += 1
                best_tracker = Tracker(track_id_counter, box, hist, label)
            else:
                best_tracker.update(box)

            matched_ids.add(best_tracker.track_id)
            updated_trackers.append(best_tracker)

        # Remove unmatched trackers
        trackers = deque(updated_trackers, maxlen=100)

        # Draw results with different colors for "child" and "therapist" (BGR order)
        for tracker in trackers:
            x1, y1, x2, y2 = map(int, tracker.bbox)
            color = (0, 0, 255) if tracker.label == "child" else (0, 255, 0)  # red / green

            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
            cv2.putText(frame, f"{tracker.label} ID: {tracker.track_id}", (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)

        # Write frame to output
        out.write(frame)
finally:
    cap.release()
    out.release()
    cv2.destroyAllWindows()


trying to load grounding dino directly
final text_encoder_type: bert-base-uncased
trying to load grounding dino directly
final text_encoder_type: bert-base-uncased
Processing frame 1
Processing frame 2
Processing frame 3
Processing frame 4
Processing frame 5
Processing frame 6
Processing frame 7
Processing frame 8
Processing frame 9
Processing frame 10
Processing frame 11
Processing frame 12
Processing frame 13
Processing frame 14
Processing frame 15
Processing frame 16
Processing frame 17
Processing frame 18
Processing frame 19
Processing frame 20
Processing frame 21
Processing frame 22
Processing frame 23
Processing frame 24
Processing frame 25
Processing frame 26
Processing frame 27
Processing frame 28
Processing frame 29
Processing frame 30
Processing frame 31
Processing frame 32
Processing frame 33
Processing frame 34
Processing frame 35
Processing frame 36
Processing frame 37
Processing frame 38
Processing frame 39
Processing frame 40
Processing frame 41
Processing frame 42
Proce