<a href="https://colab.research.google.com/github/Meenakshi2434/Project_Person-Detection-and-Tracking/blob/main/Main_code.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import yt_dlp
import logging
import os

# Function to download a video from YouTube
def download_video_from_youtube(url, index, save_path):
    ydl_opts = {
        'outtmpl': os.path.join(save_path, f'video_{index}.%(ext)s'),
        'format': 'mp4'
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info_dict = ydl.extract_info(url, download=True)
        video_file = ydl.prepare_filename(info_dict)
    return video_file

# Function to download videos from a text file
def download_videos_from_file(file_path, save_path):
    with open(file_path, 'r') as file:
        urls = file.readlines()

    for index, url in enumerate(urls, start=1):
        url = url.strip()
        if not url:
            continue

        try:
            logging.info(f"Downloading video {index} from {url}")
            download_video_from_youtube(url, index, save_path)
        except Exception as e:
            logging.error(f"Failed to download video {index} from {url}: {e}")

# Usage
if __name__ == "__main__":
    # Ensure the log file is created
    logging.basicConfig(filename='download_log.txt', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

    # Specify the path to your text file with video URLs
    file_path = 'test_videos.txt'

    # Specify the save path for the downloaded videos
    save_path = r'C:\Users\hp\Videos'

    # Create the save directory if it doesn't exist
    os.makedirs(save_path, exist_ok=True)

    # Download videos listed in the text file
    download_videos_from_file(file_path, save_path)

[youtube] Extracting URL: https://www.youtube.com/watch?v=V9YDDpo9LWg
[youtube] V9YDDpo9LWg: Downloading webpage
[youtube] V9YDDpo9LWg: Downloading ios player API JSON
[youtube] V9YDDpo9LWg: Downloading web creator player API JSON
[youtube] V9YDDpo9LWg: Downloading m3u8 information
[info] V9YDDpo9LWg: Downloading 1 format(s): 18
[download] Destination: C:\Users\shlok\OneDrive\Desktop\Computer Vision\Approach 2\Videos\video_1.mp4
[download] 100% of    7.52MiB in 00:00:02 at 3.31MiB/s   
[youtube] Extracting URL: https://www.youtube.com/watch?v=JBoc3w5EKfI
[youtube] JBoc3w5EKfI: Downloading webpage
[youtube] JBoc3w5EKfI: Downloading ios player API JSON
[youtube] JBoc3w5EKfI: Downloading web creator player API JSON
[youtube] JBoc3w5EKfI: Downloading m3u8 information
[info] JBoc3w5EKfI: Downloading 1 format(s): 18
[download] Destination: C:\Users\shlok\OneDrive\Desktop\Computer Vision\Approach 2\Videos\video_2.mp4
[download] 100% of    3.85MiB in 00:00:02 at 1.77MiB/s   
[youtube] Extracti



[youtube:tab] PL5B0D2D5B4BFE92C1 page 1: Downloading API JSON




[youtube:tab] PL5B0D2D5B4BFE92C1 page 1: Downloading API JSON




[youtube:tab] PL5B0D2D5B4BFE92C1 page 1: Downloading API JSON




[youtube:tab] Playlist In The Son-Rise Program® Playroom for Autism: Downloading 6 items of 6
[download] Downloading item 1 of 6
[youtube] Extracting URL: https://www.youtube.com/watch?v=Hd9marUY5GQ
[youtube] Hd9marUY5GQ: Downloading webpage
[youtube] Hd9marUY5GQ: Downloading ios player API JSON
[youtube] Hd9marUY5GQ: Downloading web creator player API JSON
[youtube] Hd9marUY5GQ: Downloading m3u8 information
[info] Hd9marUY5GQ: Downloading 1 format(s): 18
[download] Destination: C:\Users\shlok\OneDrive\Desktop\Computer Vision\Approach 2\Videos\video_15.mp4
[download] 100% of   21.01MiB in 00:00:11 at 1.90MiB/s     
[download] Downloading item 2 of 6
[youtube] Extracting URL: https://www.youtube.com/watch?v=81fShK4roew
[youtube] 81fShK4roew: Downloading webpage
[youtube] 81fShK4roew: Downloading ios player API JSON
[youtube] 81fShK4roew: Downloading web creator player API JSON
[youtube] 81fShK4roew: Downloading m3u8 information
[info] 81fShK4roew: Downloading 1 format(s): 18
[download] 

In [None]:
import cv2
import tensorflow as tf
import numpy as np
import tensorflow_hub as hub

# Function to load the TensorFlow model
def load_model():
    model_url = "https://tfhub.dev/tensorflow/ssd_mobilenet_v2/2"
    model = hub.load(model_url)
    return model

# Function to perform object detection
def detect_objects(model, image):
    input_tensor = tf.convert_to_tensor(image)
    input_tensor = input_tensor[tf.newaxis, ...]
    detections = model(input_tensor)
    return detections

# Function to assign unique IDs to detected people
def assign_unique_id(detected_people, unique_ids, next_id):
    new_unique_ids = {}
    for person in detected_people:
        person_tuple = tuple(person)  # Convert list to tuple for hashable key
        if person_tuple not in unique_ids:
            unique_ids[person_tuple] = next_id
            next_id += 1
        new_unique_ids[person_tuple] = unique_ids[person_tuple]
    return new_unique_ids, next_id

# Main function to process the video
def process_video(video_path, model, output_frame_dir):
    cap = cv2.VideoCapture(video_path)
    frame_num = 0
    unique_ids = {}
    next_id = 1

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame_num += 1

        # Skip some frames to reduce processing load
        if frame_num % 10 != 0:
            continue

        # Perform object detection
        detections = detect_objects(model, frame)

        # Extract person detections
        person_detections = []
        for i in range(int(detections['num_detections'][0])):
            if detections['detection_classes'][0][i].numpy() == 1:  # Class 1 corresponds to 'person'
                ymin, xmin, ymax, xmax = detections['detection_boxes'][0][i].numpy()
                h, w, _ = frame.shape
                person_detections.append([xmin * w, ymin * h, xmax * w, ymax * h])

        # Assign unique IDs to detected people
        new_unique_ids, next_id = assign_unique_id(person_detections, unique_ids, next_id)

        # Save the frames where persons are detected
        for detection, uid in zip(person_detections, new_unique_ids.values()):
            x1, y1, x2, y2 = detection
            person_image = frame[int(y1):int(y2), int(x1):int(x2)]
            person_image_path = f'{output_frame_dir}/person_{uid}_frame_{frame_num}.jpg'
            cv2.imwrite(person_image_path, person_image)

    cap.release()
    print("Detection and ID assignment complete.")

# Usage
if __name__ == "__main__":
    video_path = r"C:\Users\hp\Videos\your_video.mp4"  # Update with the correct video filename
    output_frame_dir = r"C:\Users\hp\Frames"

    # Load the TensorFlow model
    model = load_model()

    # Process the video
    process_video(video_path, model, output_frame_dir)

Detection and ID assignment complete.


In [None]:
def process_video(video_path, model, output_frame_dir):
    cap = cv2.VideoCapture(video_path)
    frame_num = 0
    unique_ids = {}
    next_id = 1

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame_num += 1

        # Skip frames to reduce processing load (process every 10th frame)
        if frame_num % 10 != 0:
            continue

        # Perform object detection
        detections = detect_objects(model, frame)

        # Extract person detections
        person_detections = []
        for i in range(int(detections['num_detections'][0])):
            if detections['detection_classes'][0][i].numpy() == 1:  # Class 1 corresponds to 'person'
                ymin, xmin, ymax, xmax = detections['detection_boxes'][0][i].numpy()
                h, w, _ = frame.shape
                person_detections.append([xmin * w, ymin * h, xmax * w, ymax * h])

        # Assign unique IDs to detected people
        new_unique_ids = {}
        for person in person_detections:
            person_tuple = tuple(person)
            if person_tuple not in unique_ids:
                unique_ids[person_tuple] = next_id
                next_id += 1
            new_unique_ids[person_tuple] = unique_ids[person_tuple]

        # Save detected person images
        for detection, uid in zip(person_detections, new_unique_ids.values()):
            x1, y1, x2, y2 = detection
            person_image = frame[int(y1):int(y2), int(x1):int(x2)]
            person_image_path = os.path.join(output_frame_dir, f'person_{uid}_frame_{frame_num}.jpg')
            cv2.imwrite(person_image_path, person_image)

    cap.release()
    print(f"Completed processing for video: {os.path.basename(video_path)}")
    return unique_ids

In [None]:
def process_all_videos(videos_dir, frames_dir, model):
    all_unique_ids = {}

    # Ensure frames directory exists
    os.makedirs(frames_dir, exist_ok=True)

    # Iterate over all video files in the directory
    for video_file in os.listdir(videos_dir):
        if video_file.endswith('.mp4'):
            video_path = os.path.join(videos_dir, video_file)
            video_name = os.path.splitext(video_file)[0]

            # Create a specific frames directory for each video
            video_frames_dir = os.path.join(frames_dir, video_name)
            os.makedirs(video_frames_dir, exist_ok=True)

            # Process video and get unique IDs
            unique_ids = process_video(video_path, model, video_frames_dir)

            # Merge unique IDs into the aggregated dictionary
            all_unique_ids[video_name] = unique_ids

    return all_unique_ids


In [None]:
import csv
import os

def save_to_csv(all_unique_ids, output_csv_dir):
    os.makedirs(output_csv_dir, exist_ok=True)

    for video_name, unique_ids in all_unique_ids.items():
        csv_path = os.path.join(output_csv_dir, f"{video_name}_detection.csv")

        # Count occurrences
        id_counts = {}
        for uid in unique_ids.values():
            id_counts[uid] = id_counts.get(uid, 0) + 1

        # Write to CSV
        with open(csv_path, mode='w', newline='') as file:
            writer = csv.writer(file)
            writer.writerow(['Person ID', 'Count'])
            for uid, count in id_counts.items():
                writer.writerow([uid, count])

        print(f"CSV file saved for {video_name} at {csv_path}")

In [None]:
if __name__ == "__main__":
    # Paths
    videos_dir = r'C:\Users\hp\Videos'
    frames_dir = r'C:\Users\hp\Frames'
    output_csv_dir = r'C:\Users\hp\CSV_Outputs'

    # Load model
    model = load_model()

    # Process all videos and get unique IDs
    all_unique_ids = process_all_videos(videos_dir, frames_dir, model)

    # Save results to CSV files
    save_to_csv(all_unique_ids, output_csv_dir)


Completed processing for video: video_1.mp4
Completed processing for video: video_10.mp4
Completed processing for video: video_11.mp4
Completed processing for video: video_12.mp4
Completed processing for video: video_13.mp4
Completed processing for video: video_14.mp4
Completed processing for video: video_15.mp4
Completed processing for video: video_16.mp4
Completed processing for video: video_17.mp4
Completed processing for video: video_18.mp4
Completed processing for video: video_19.mp4
Completed processing for video: video_2.mp4
Completed processing for video: video_20.mp4
Completed processing for video: video_3.mp4
Completed processing for video: video_4.mp4
Completed processing for video: video_5.mp4
Completed processing for video: video_6.mp4
Completed processing for video: video_7.mp4
Completed processing for video: video_8.mp4
Completed processing for video: video_9.mp4
CSV file saved for video_1 at C:\Users\shlok\OneDrive\Desktop\Computer Vision\Approach 2\CSV_Outputs\video_1_

In [None]:
def process_video(video_path, model, output_frame_dir, video_number):
    cap = cv2.VideoCapture(video_path)
    frame_num = 0
    unique_ids = {}
    next_id = 1
    video_data = []

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame_num += 1

        # Skip some frames to reduce processing load
        if frame_num % 10 != 0:
            continue

        # Perform object detection
        detections = detect_objects(model, frame)

        # Extract person detections
        person_detections = []
        for i in range(int(detections['num_detections'][0])):
            if detections['detection_classes'][0][i].numpy() == 1:  # Class 1 corresponds to 'person'
                ymin, xmin, ymax, xmax = detections['detection_boxes'][0][i].numpy()
                h, w, _ = frame.shape
                person_detections.append([xmin * w, ymin * h, xmax * w, ymax * h])

        # Assign unique IDs to detected people
        new_unique_ids, next_id = assign_unique_id(person_detections, unique_ids, next_id)

        # Save the frames where persons are detected
        for detection, uid in zip(person_detections, new_unique_ids.values()):
            x1, y1, x2, y2 = detection
            person_image = frame[int(y1):int(y2), int(x1):int(x2)]
            person_image_path = f'{output_frame_dir}/person_{uid}_frame_{frame_num}.jpg'
            cv2.imwrite(person_image_path, person_image)

            # Collect data for CSV
            video_data.append({
                'Person ID': uid,
                'Image Path': person_image_path,
                'Frame Number': frame_num,
                'Video Number': video_number,
                'Count': 1  # This is just a placeholder; it will be counted later
            })

    cap.release()
    return video_data

In [None]:
def process_all_videos(videos_dir, frames_dir, model):
    all_video_data = []

    for video_number, video_file in enumerate(os.listdir(videos_dir), start=1):
        video_path = os.path.join(videos_dir, video_file)
        video_frames_dir = os.path.join(frames_dir, f"video_{video_number}_frames")
        os.makedirs(video_frames_dir, exist_ok=True)

        # Process video and collect data
        video_data = process_video(video_path, model, video_frames_dir, video_number)

        # Merge video data into the aggregated list
        all_video_data.extend(video_data)

    return all_video_data


In [None]:
import csv
import os

def save_to_csv(all_video_data, output_csv_path):
    # Create a dictionary to count occurrences of each Person ID
    id_counts = {}
    for data in all_video_data:
        uid = data["Person ID"]
        id_counts[uid] = id_counts.get(uid, 0) + 1

    # Add count information to each entry
    for data in all_video_data:
        uid = data["Person ID"]
        data["Count"] = id_counts[uid]

    # Write all data to a single CSV file
    with open(output_csv_path, mode='w', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=["Person ID", "Image Path", "Frame Number", "Video Number", "Count"])
        writer.writeheader()
        writer.writerows(all_video_data)

    print(f"CSV file saved at {output_csv_path}")


In [None]:
#from part2 import load_model  # Ensure part2.py has these functions

if __name__ == "__main__":
    # Paths
    videos_dir = r'C:\Users\hp\Videos'
    frames_dir = r'C:\hp\Frames'
    output_csv_path = r'C:\Users\hp\CSV_Outputs\detections_summary.csv'

    # Load model
    model = load_model()

    # Process all videos and collect data
    all_video_data = process_all_videos(videos_dir, frames_dir, model)

    # Save results to a single CSV file
    save_to_csv(all_video_data, output_csv_path)