In [2]:
! pip install deepsort



In [5]:
! pip install deep_sort_realtime

Collecting deep_sort_realtime
  Using cached deep_sort_realtime-1.3.2-py3-none-any.whl.metadata (12 kB)
Using cached deep_sort_realtime-1.3.2-py3-none-any.whl (8.4 MB)
Installing collected packages: deep_sort_realtime
Successfully installed deep_sort_realtime-1.3.2


#Importing Important Package


In [6]:
import os
import tensorflow as tf
import tensorflow_hub as hub
import numpy as np
import cv2
import matplotlib.pyplot as plt
import torch
import torchvision
from deep_sort_realtime.deepsort_tracker import DeepSort

Loading the Pre-trained Model for object detection

In [7]:
def load_model():
  model_url = "https://tfhub.dev/tensorflow/ssd_mobilenet_v2/2"
  model = hub.load(model_url)
  return model

In [10]:
def objectdetection(model, image):
  input_tensor = tf.convert_to_tensor(image)
  input_tensor = input_tensor[tf.newaxis, ...]
  detections = model(input_tensor)
  return detections

#function to assign unique ids to detected person
def assign_unique_ids(detected_person, unique_ids, next_id):
  new_ids = {}
  for person in detected_person:
    person_tuple = tuple(person)
    if person_tuple not in unique_ids:
      unique_ids[person_tuple] = next_id # Assign new ID if person not in unique_ids
      next_id += 1
    new_ids[person_tuple] = unique_ids[person_tuple]
  return new_ids, next_id


#function to process videos
def vid_process(video_path, model, output_frame_dir):
  cap = cv2.VideoCapture(video_path)
  frame_num = 0
  unique_ids = {}
  next_id = 1

  while True:
    ret, frame = cap.read()
    if not  ret:
      break
    frame_num += 1

    #skip some frames to reduce processing load
    if frame_num % 10 != 0:
      continue

    # perform obj detection
    detections = objectdetection(model, frame)

    #Extract person detection
    person_detections =[]
    for i in range(int(detections['num_detections'][0])):
      if detections['detection_classes'][0][i].numpy()==1:
        ymin, xmin, ymax, xmax = detections['detection_boxes'][0][i].numpy()
        h, w, _ = frame.shape
        person_detections.append([xmin * w, ymin * h, xmax * w, ymax * h])

    # assign unique IDs to detected persons
    new_ids, next_id = assign_unique_ids(person_detections, unique_ids, next_id)

    # Save the frames where persons are detected
    for detection, uid in zip(person_detections, new_ids.values()):
      x1, y1, x2, y2 = detection
      person_image = frame[int(y1):int(y2), int(x1):int(x2)]
      person_image_path = f'{output_frame_dir}/person_{uid}_frame_{frame_num}.jpg'
      cv2.imwrite(person_image_path, person_image)
  cap.release()
  print("Detection and ID assignment complete.")


# Usage
if __name__ == "__main__":
    video_path = r"/content/drive/MyDrive/Computer Vision/videos/ABA Therapy - Learning about Animals.mp4"  # Update with the correct video filename
    output_frame_dir = r"/content/drive/MyDrive/Computer Vision/Frames"

    # Load the TensorFlow model
    model = load_model()

    # Process the video
    vid_process(video_path, model, output_frame_dir)


Detection and ID assignment complete.


In [11]:
def process_video(video_path, model, output_frame_dir):
    cap = cv2.VideoCapture(video_path)
    frame_num = 0
    unique_ids = {}
    next_id = 1

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame_num += 1

        # Skip frames to reduce processing load (process every 10th frame)
        if frame_num % 10 != 0:
            continue

        # Perform object detection
        detections = objectdetection(model, frame)

        # Extract person detections
        person_detections = []
        for i in range(int(detections['num_detections'][0])):
            if detections['detection_classes'][0][i].numpy() == 1:  # Class 1 corresponds to 'person'
                ymin, xmin, ymax, xmax = detections['detection_boxes'][0][i].numpy()
                h, w, _ = frame.shape
                person_detections.append([xmin * w, ymin * h, xmax * w, ymax * h])

        # Assign unique IDs to detected people
        new_ids = {}
        for person in person_detections:
            person_tuple = tuple(person)
            if person_tuple not in unique_ids:
                unique_ids[person_tuple] = next_id
                next_id += 1
            new_ids[person_tuple] = unique_ids[person_tuple]

        # Save detected person images
        for detection, uid in zip(person_detections, new_ids.values()):
            x1, y1, x2, y2 = detection
            person_image = frame[int(y1):int(y2), int(x1):int(x2)]
            person_image_path = os.path.join(output_frame_dir, f'person_{uid}_frame_{frame_num}.jpg')
            cv2.imwrite(person_image_path, person_image)

    cap.release()
    print(f"Completed processing for video: {os.path.basename(video_path)}")
    return unique_ids

In [12]:
def process_all_videos(videos_dir, frames_dir, model):
    all_unique_ids = {}

    # Ensure frames directory exists
    os.makedirs(frames_dir, exist_ok=True)

    # Iterate over all video files in the directory
    for video_file in os.listdir(videos_dir):
        if video_file.endswith('.mp4'):
            video_path = os.path.join(videos_dir, video_file)
            video_name = os.path.splitext(video_file)[0]

            # Create a specific frames directory for each video
            video_frames_dir = os.path.join(frames_dir, video_name)
            os.makedirs(video_frames_dir, exist_ok=True)

            # Process video and get unique IDs
            unique_ids = process_video(video_path, model, video_frames_dir)

            # Merge unique IDs into the aggregated dictionary
            all_unique_ids[video_name] = unique_ids

    return all_unique_ids

In [13]:
import csv


def save_to_csv(all_unique_ids, output_csv_dir):
    os.makedirs(output_csv_dir, exist_ok=True)

    for video_name, unique_ids in all_unique_ids.items():
        csv_path = os.path.join(output_csv_dir, f"{video_name}_detection.csv")

        # Count occurrences
        id_counts = {}
        for uid in unique_ids.values():
            id_counts[uid] = id_counts.get(uid, 0) + 1

        # Write to CSV
        with open(csv_path, mode='w', newline='') as file:
            writer = csv.writer(file)
            writer.writerow(['Person ID', 'Count'])
            for uid, count in id_counts.items():
                writer.writerow([uid, count])

        print(f"CSV file saved for {video_name} at {csv_path}")

In [14]:
if __name__ == "__main__":
    # Paths
    videos_dir = r"/content/drive/MyDrive/Computer Vision/videos"
    frames_dir = r"/content/drive/MyDrive/Computer Vision/Frames"
    output_csv_dir = r"/content/drive/MyDrive/Computer Vision/CSV OUPUTS"

    # Load model
    model = load_model()

    # Process all videos and get unique IDs
    all_unique_ids = process_all_videos(videos_dir, frames_dir, model)

    # Save results to CSV files
    save_to_csv(all_unique_ids, output_csv_dir)

Completed processing for video: Exploring the Therapeutic Playroom.mp4
Completed processing for video: ABA Therapy_ Daniel - Communication.mp4
Completed processing for video: Group Therapy for Autism Spectrum Disorder.mp4
Completed processing for video: Play Therapy Session working on Feelings with Candy Land Game.mp4
Completed processing for video: Discrete Trial Training.mp4
Completed processing for video: ABA Therapy - Play.mp4
Completed processing for video: Sensory Play at Home_ Proprioceptive Games.mp4
Completed processing for video: ABA Therapy - Learning about Animals.mp4
Completed processing for video: Matching.mp4
Completed processing for video: videoplayback.mp4
Completed processing for video: Preference Assessment with Toys_ Multiple Stimulus without Replacement (MSWO).mp4
Completed processing for video: How to Do Play Therapy _ Building a Growth Mindset Role Play.mp4
Completed processing for video: MASS TRIAL (Gross motor imitation).mp4
Completed processing for video: Augm

In [15]:
def process_video(video_path, model, output_frame_dir, video_number):
  cap = cv2.VideoCapture(video_path)
  frame_num = 0
  unique_ids = {}
  next_id = 1
  video_data = []


  while True:
    ret, frame = cap.read()
    if not ret:
      break
    frame_num += 1

    # Skip frames to reduce processing load
    if frame_num % 10 != 0:
      continue
    detections = objectdetection(model, frame)

    person_detections = []
    for i in range(int(detections['num_detections'][0])):
      if detections['detection_classes'][0][i].numpy() == 1:
        ymin, xmin, ymax, xmax = detections['detection_boxes'][0][i].numpy()
        h, w, _ = frame.shape
        person_detections.append([xmin * w, ymin * h, xmax * w, ymax * h])

    new_ids, next_id = assign_unique_ids(person_detections, unique_ids, next_id)

    for detection, uid in zip(person_detections, new_ids.values()):
      x1, y1, x2, y2 = detection
      person_image = frame[int(y1):int(y2), int(x1):int(x2)]
      person_image_path = os.path.join(output_frame_dir, f'person_{uid}_frame_{frame_num}.jpg')
      cv2.imwrite(person_image_path, person_image)
      # Collect data for CSV
      video_data.append({'Person ID': uid,'Image Path': person_image_path,'Frame Number': frame_num,'Video Number': video_number,'Count': 1 })  # This is just a placeholder; it will be counted later

  cap.release()
  return video_data




In [16]:
def process_all_videos(videos_dir, frames_dir, model):
    all_video_data = []

    for video_number, video_file in enumerate(os.listdir(videos_dir), start=1):
        video_path = os.path.join(videos_dir, video_file)
        video_frames_dir = os.path.join(frames_dir, f"video_{video_number}_frames")
        os.makedirs(video_frames_dir, exist_ok=True)

        # Process video and collect data
        video_data = process_video(video_path, model, video_frames_dir, video_number)

        # Merge video data into the aggregated list
        all_video_data.extend(video_data)

    return all_video_data

In [17]:
def save_to_csv(all_video_data, output_csv_path):
    # Create a dictionary to count occurrences of each Person ID
    id_counts = {}
    for data in all_video_data:
        uid = data["Person ID"]
        id_counts[uid] = id_counts.get(uid, 0) + 1

    # Add count information to each entry
    for data in all_video_data:
        uid = data["Person ID"]
        data["Count"] = id_counts[uid]

    # Write all data to a single CSV file
    with open(output_csv_path, mode='w', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=["Person ID", "Image Path", "Frame Number", "Video Number", "Count"])
        writer.writeheader()
        writer.writerows(all_video_data)

    print(f"CSV file saved at {output_csv_path}")

In [None]:
#from part2 import load_model  # Ensure part2.py has these functions

if __name__ == "__main__":
    # Paths
    videos_dir = r'/content/drive/MyDrive/Computer Vision/videos'
    frames_dir = r'/content/drive/MyDrive/Computer Vision/Frames'
    output_csv_path = r'/content/drive/MyDrive/Computer Vision/CSV OUPUTS'

    # Load model
    model = load_model()

    # Process all videos and collect data
    all_video_data = process_all_videos(videos_dir, frames_dir, model)

    # Save results to a single CSV file
    save_to_csv(all_video_data, output_csv_path)