In [1]:
# Check whether an NVIDIA GPU is visible to this runtime.
!nvidia-smi

/bin/bash: line 1: nvidia-smi: command not found


In [2]:
# Install gdown, which is used to fetch the sample video from Google Drive.
# NOTE(review): the version is not pinned; consider pinning it for reproducible runs.
!pip install gdown



In [3]:
# Remove any stale video.mp4 from the working directory.
# NOTE(review): the download cell writes people-counting.mp4, not video.mp4 — confirm this cleanup is still needed.
! rm -rf video.mp4

In [4]:
import gdown

# Google Drive ID of the input clip; replace it to process a different file
file_id = "15C3ZIbvy4CemGBOmIkrGK7H7a2tpYaBJ"
output = "people-counting.mp4"
url = "https://drive.google.com/uc?id=" + file_id

# The call's return value is left as the cell's last expression so it is displayed
gdown.download(url, output, quiet=False)

Downloading...
From: https://drive.google.com/uc?id=15C3ZIbvy4CemGBOmIkrGK7H7a2tpYaBJ
To: /content/people-counting.mp4
100%|██████████| 42.5M/42.5M [00:01<00:00, 38.0MB/s]


'people-counting.mp4'

In [5]:
import os
# Working directory of the runtime; SOURCE_VIDEO_PATH is built relative to it.
HOME = os.getcwd()
print(HOME)

/content


In [6]:
# Full path of the downloaded clip; the tracking cells read their input from it.
SOURCE_VIDEO_PATH = os.path.join(HOME, "people-counting.mp4")

In [7]:
# Pip install method (recommended)
# The "<=8.3.40" bound caps the Ultralytics release this notebook runs against.

!pip install "ultralytics<=8.3.40"

from IPython import display
display.clear_output()  # drop the pip log so only the environment check stays visible

import ultralytics
ultralytics.checks()  # prints the library/Python/torch versions and a hardware summary

Ultralytics 8.3.40 🚀 Python-3.11.11 torch-2.6.0+cu124 CPU (Intel Xeon 2.20GHz)
Setup complete ✅ (2 CPUs, 12.7 GB RAM, 41.2/107.7 GB disk)


In [8]:
# supervision is pinned to 0.3.0; the tracking cells call sv.Detections.from_yolov8.
# NOTE(review): presumably that constructor is not available in newer releases — confirm before upgrading.
!pip install supervision==0.3.0

from IPython import display
display.clear_output()  # drop the pip log

import supervision
print("supervision.__version__:", supervision.__version__)

supervision.__version__: 0.3.0


In [20]:
# settings
# Name of the weights file handed to YOLO() when the tracking model is created.
MODEL = "yolov8x.pt"

In [21]:
from ultralytics import YOLO

# Build the detector from the configured weights; the tracking cells call model.track on this object.
model = YOLO(MODEL)
model.fuse()  # the summary printed for this cell reports the model as "(fused)"

YOLOv8x summary (fused): 268 layers, 68,200,608 parameters, 0 gradients, 257.8 GFLOPs


In [11]:
from ultralytics import YOLO

# Load a small checkpoint only to read the class-name table.
# It gets its own variable on purpose: rebinding `model` here would silently
# replace the tracking model built from MODEL whenever the notebook is run
# top to bottom.
names_model = YOLO('yolov8s.pt')

# Get class names from the model
class_names = names_model.names  # Dictionary {class_id: class_name}

# Print all class IDs and names
for class_id, class_name in class_names.items():
    print(f"Class ID: {class_id}, Class Name: {class_name}")

Downloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8s.pt to 'yolov8s.pt'...


100%|██████████| 21.5M/21.5M [00:00<00:00, 41.2MB/s]

Class ID: 0, Class Name: person
Class ID: 1, Class Name: bicycle
Class ID: 2, Class Name: car
Class ID: 3, Class Name: motorcycle
Class ID: 4, Class Name: airplane
Class ID: 5, Class Name: bus
Class ID: 6, Class Name: train
Class ID: 7, Class Name: truck
Class ID: 8, Class Name: boat
Class ID: 9, Class Name: traffic light
Class ID: 10, Class Name: fire hydrant
Class ID: 11, Class Name: stop sign
Class ID: 12, Class Name: parking meter
Class ID: 13, Class Name: bench
Class ID: 14, Class Name: bird
Class ID: 15, Class Name: cat
Class ID: 16, Class Name: dog
Class ID: 17, Class Name: horse
Class ID: 18, Class Name: sheep
Class ID: 19, Class Name: cow
Class ID: 20, Class Name: elephant
Class ID: 21, Class Name: bear
Class ID: 22, Class Name: zebra
Class ID: 23, Class Name: giraffe
Class ID: 24, Class Name: backpack
Class ID: 25, Class Name: umbrella
Class ID: 26, Class Name: handbag
Class ID: 27, Class Name: tie
Class ID: 28, Class Name: suitcase
Class ID: 29, Class Name: frisbee
Class ID:




#### Track and Count People 

* Count total people in the video
* Count people entering and exiting

In [27]:
import supervision as sv
from ultralytics import YOLO
import os
import cv2
import numpy as np
import json
from datetime import datetime

# Where the annotated video is written
TARGET_VIDEO_PATH = 'output_video.mp4'

# The two counting zones, as int32 polygons in pixel coordinates.
# Every y value carries the same +50 shift on top of its base value.
_Y_SHIFT = 50
area1 = np.array(
    [
        (1169, 1678 + _Y_SHIFT),
        (1942, 2025 + _Y_SHIFT),
        (1816, 2102 + _Y_SHIFT),
        (1085, 1703 + _Y_SHIFT),
    ],
    dtype=np.int32,
)
area2 = np.array(
    [
        (1040, 1710 + _Y_SHIFT),
        (1771, 2117 + _Y_SHIFT),
        (1673, 2142 + _Y_SHIFT),
        (981, 1713 + _Y_SHIFT),
    ],
    dtype=np.int32,
)

# Running totals, all starting from zero
total_count = entering_count = exiting_count = 0

# Per-tracker bookkeeping: ordered zone visits, and the record exported to JSON
tracker_states = dict()
detection_data = dict()

# Open video info
video_info = sv.VideoInfo.from_video_path(SOURCE_VIDEO_PATH)

# Video Sink
# Runs tracking over the source clip one frame at a time, updates each
# tracker's zone-visit state and the counters, draws the overlays, and writes
# every frame to TARGET_VIDEO_PATH.
with sv.VideoSink(TARGET_VIDEO_PATH, video_info) as sink:
    for frame_number, result in enumerate(
        model.track(source=SOURCE_VIDEO_PATH, tracker='bytetrack.yaml', show=False, stream=True, persist=True)
    ):
        # All drawing below is done in place on this image
        frame = result.orig_img
        detections = sv.Detections.from_yolov8(result)

        # Only keep people (COCO class 0)
        PEOPLE_CLASS_ID = 0
        people_detections = []

        # Frames without tracker IDs skip per-person processing but still get the overlays
        if result.boxes.id is not None:  # Ensure we have tracker IDs
            tracker_ids = result.boxes.id.cpu().numpy().astype(int)  # Get tracker IDs
            for i, (bbox, confidence, class_id) in enumerate(zip(detections.xyxy, detections.confidence, detections.class_id)):
                if class_id == PEOPLE_CLASS_ID:
                    # NOTE(review): a None tracker_id would fail at int(tracker_id) below — confirm the lengths always match
                    tracker_id = tracker_ids[i] if i < len(tracker_ids) else None
                    people_detections.append((bbox, confidence, class_id, tracker_id))

        # Process detections
        for bbox, confidence, class_id, tracker_id in people_detections:
            x1, y1, x2, y2 = bbox  # Get bounding box coordinates
            bottom_right = (int(x2), int(y2))  # Bottom-right corner

            # Check if the bottom-right corner is in an area (on the edge counts as inside)
            in_area1 = cv2.pointPolygonTest(area1, bottom_right, False) >= 0
            in_area2 = cv2.pointPolygonTest(area2, bottom_right, False) >= 0

            # First sighting of this tracker: create its state list and its JSON record
            if tracker_id not in tracker_states:
                tracker_states[tracker_id] = []  # Initialize state tracking
                detection_data[tracker_id] = {  # Initialize detection data
                    "tracker_id": int(tracker_id),  # Convert to standard Python int
                    "entry_time": None,
                    "exit_time": None,
                    "bbox_history": [],
                    "confidence": float(confidence),  # Confidence of the first sighting only
                    "class_id": int(class_id)  # Convert to standard Python int
                }

            # Track order of area entry: each area is appended once, in the order it is first touched.
            # entry_time is stamped when area2 is the first area touched, exit_time when area1 is.
            # NOTE(review): datetime.now() is the wall-clock time of processing, not the position in the video.
            if in_area2 and "area2" not in tracker_states[tracker_id]:
                tracker_states[tracker_id].append("area2")
                if tracker_states[tracker_id] == ["area2"]:
                    detection_data[tracker_id]["entry_time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            if in_area1 and "area1" not in tracker_states[tracker_id]:
                tracker_states[tracker_id].append("area1")
                if tracker_states[tracker_id] == ["area1"]:
                    detection_data[tracker_id]["exit_time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

            # Determine if entering or exiting: area2 then area1 is an entry, area1 then area2 an exit.
            # NOTE(review): the state is cleared right after a count; if the corner is still inside the
            # second area on the next frame, that area becomes the first visit of a new sequence and the
            # opposite time field is stamped — confirm this is intended.
            if tracker_states[tracker_id] == ["area2", "area1"]:
                entering_count += 1
                tracker_states[tracker_id] = []  # Reset state after count
            elif tracker_states[tracker_id] == ["area1", "area2"]:
                exiting_count += 1
                tracker_states[tracker_id] = []  # Reset state after count

            total_count = entering_count + exiting_count

            # Update bbox history (one entry per frame in which this tracker is seen)
            detection_data[tracker_id]["bbox_history"].append({
                "frame_number": int(frame_number),  # Convert to standard Python int
                "bbox": [float(x1), float(y1), float(x2), float(y2)],  # Convert to standard Python floats
                "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            })

            # Draw bounding box and ID
            cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (255, 0, 0), 2)
            cv2.putText(frame, f"ID: {tracker_id}", (int(x1), int(y1) - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)

            # Draw bottom-right corner marker (the point used for the area tests)
            cv2.circle(frame, bottom_right, 5, (0, 0, 255), -1)

        # Draw polygons on the frame
        cv2.polylines(frame, [area1], isClosed=True, color=(255, 0, 0), thickness=2)
        cv2.polylines(frame, [area2], isClosed=True, color=(0, 255, 0), thickness=2)

        # Display counts
        cv2.putText(frame, f"Total: {total_count}", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
        cv2.putText(frame, f"Entering: {entering_count}", (50, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        cv2.putText(frame, f"Exiting: {exiting_count}", (50, 150), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

        # Save frame to video
        sink.write_frame(frame)

# Run-level totals, cast to plain Python ints, plus the time the report was built
summary = {
    "total_people": int(total_count),
    "total_entering": int(entering_count),
    "total_exiting": int(exiting_count),
    "total_timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
}

# Tracker IDs are NumPy integers; re-key the records with plain ints
detections_by_id = {}
for tid, record in detection_data.items():
    detections_by_id[int(tid)] = record

json_data = {"summary": summary, "detections": detections_by_id}

# Everything goes into one JSON file
json_path = "detection_data.json"
with open(json_path, "w") as json_file:
    json.dump(json_data, json_file, indent=4)

print("Processing complete. Output video saved.")
print(f"Total: {total_count}, Entering: {entering_count}, Exiting: {exiting_count}")


video 1/1 (frame 1/592) /content/people-counting.mp4: 384x640 1 potted plant, 3863.9ms
video 1/1 (frame 2/592) /content/people-counting.mp4: 384x640 1 car, 1 potted plant, 2814.4ms
video 1/1 (frame 3/592) /content/people-counting.mp4: 384x640 1 car, 1 potted plant, 2830.6ms
video 1/1 (frame 4/592) /content/people-counting.mp4: 384x640 1 car, 1 potted plant, 2758.0ms
video 1/1 (frame 5/592) /content/people-counting.mp4: 384x640 1 car, 1 potted plant, 3898.0ms
video 1/1 (frame 6/592) /content/people-counting.mp4: 384x640 1 person, 1 car, 1 potted plant, 2775.9ms
video 1/1 (frame 7/592) /content/people-counting.mp4: 384x640 1 person, 1 car, 1 potted plant, 2756.7ms
video 1/1 (frame 8/592) /content/people-counting.mp4: 384x640 1 person, 1 car, 1 potted plant, 2747.6ms
video 1/1 (frame 9/592) /content/people-counting.mp4: 384x640 1 person, 1 car, 1 potted plant, 3954.7ms
video 1/1 (frame 10/592) /content/people-counting.mp4: 384x640 1 person, 1 car, 1 bench, 1 potted plant, 2798.2ms
video 

#### Enter/Exit People Count and Age/Gender Detection

##### Enter Exit Count + Age Gender

* Add age and gender detection to the code above

In [None]:
# Install DeepFace, used for the age and gender estimates.
# NOTE(review): the version is not pinned; consider pinning it for reproducible runs.
!pip install deepface


In [None]:
import supervision as sv
from ultralytics import YOLO
import os
import cv2
import numpy as np
import json
from datetime import datetime
from deepface import DeepFace

# Output location of the annotated video
TARGET_VIDEO_PATH = 'output_video.mp4'

# Counting zones: two int32 polygons in pixel coordinates, each y shifted by +50
_Y_SHIFT = 50
_AREA1_BASE = [(1169, 1678), (1942, 2025), (1816, 2102), (1085, 1703)]
_AREA2_BASE = [(1040, 1710), (1771, 2117), (1673, 2142), (981, 1713)]
area1 = np.array([(x, y + _Y_SHIFT) for x, y in _AREA1_BASE], np.int32)
area2 = np.array([(x, y + _Y_SHIFT) for x, y in _AREA2_BASE], np.int32)

# Counters start from zero
total_count, entering_count, exiting_count = 0, 0, 0

# tracker id -> ordered list of zones visited
tracker_states = {}
# tracker id -> record written to the JSON report
detection_data = {}

# Open video info; the result is passed to sv.VideoSink when the output video is opened
video_info = sv.VideoInfo.from_video_path(SOURCE_VIDEO_PATH)

def predict_age_gender(face):
    """Estimate gender and age for an image crop using DeepFace.

    Returns:
        ``(gender, age)`` read from the first analysis result. On any failure
        the error is printed and ``("Unknown", "Unknown")`` is returned.
    """
    try:
        first_result = DeepFace.analyze(face, actions=["age", "gender"], enforce_detection=False)[0]
        age, gender = first_result["age"], first_result["dominant_gender"]
    except Exception as e:
        print(f"Error in age/gender prediction: {e}")
        return "Unknown", "Unknown"
    return gender, age

# Video Sink
# Same tracking and counting pass as before, with an age/gender estimate added
# for every tracked person before the frame is annotated and written out.
# NOTE(review): `model` and persist=True are shared with the other tracking cells —
# confirm tracker state from an earlier run does not carry over into this one.
with sv.VideoSink(TARGET_VIDEO_PATH, video_info) as sink:
    for frame_number, result in enumerate(
        model.track(source=SOURCE_VIDEO_PATH, tracker='bytetrack.yaml', show=False, stream=True, persist=True)
    ):
        # All drawing below is done in place on this image
        frame = result.orig_img
        detections = sv.Detections.from_yolov8(result)

        # Only keep people (COCO class 0)
        PEOPLE_CLASS_ID = 0
        people_detections = []

        if result.boxes.id is not None:  # Ensure we have tracker IDs
            tracker_ids = result.boxes.id.cpu().numpy().astype(int)  # Get tracker IDs
            for i, (bbox, confidence, class_id) in enumerate(zip(detections.xyxy, detections.confidence, detections.class_id)):
                if class_id == PEOPLE_CLASS_ID:
                    tracker_id = tracker_ids[i] if i < len(tracker_ids) else None
                    people_detections.append((bbox, confidence, class_id, tracker_id))

        # Process detections
        for bbox, confidence, class_id, tracker_id in people_detections:
            x1, y1, x2, y2 = bbox  # Get bounding box coordinates
            bottom_right = (int(x2), int(y2))  # Bottom-right corner

            # Crop for gender and age prediction. The slice is the whole person
            # bounding box, not a face region.
            # NOTE(review): this runs for every person on every frame, although only
            # the first estimate per tracker is stored in detection_data — consider caching.
            face = frame[int(y1):int(y2), int(x1):int(x2)]
            gender, age = "Unknown", "Unknown"
            if face.size > 0:
                gender, age = predict_age_gender(face)

            # Check if the bottom-right corner is in an area (on the edge counts as inside)
            in_area1 = cv2.pointPolygonTest(area1, bottom_right, False) >= 0
            in_area2 = cv2.pointPolygonTest(area2, bottom_right, False) >= 0

            # First sighting of this tracker: create its state list and its JSON record
            if tracker_id not in tracker_states:
                tracker_states[tracker_id] = []  # Initialize state tracking
                detection_data[tracker_id] = {  # Initialize detection data
                    "tracker_id": int(tracker_id),  # Convert to standard Python int
                    "gender": gender,  # Estimate from the first sighting only
                    "age": age,  # Estimate from the first sighting only
                    "entry_time": None,
                    "exit_time": None,
                    "bbox_history": [],
                    "confidence": float(confidence),  # Convert to standard Python float
                    "class_id": int(class_id)  # Convert to standard Python int
                }

            # Track order of area entry: each area is appended once, in the order it is first touched.
            # NOTE(review): datetime.now() is the wall-clock time of processing, not the position in the video.
            if in_area2 and "area2" not in tracker_states[tracker_id]:
                tracker_states[tracker_id].append("area2")
                if tracker_states[tracker_id] == ["area2"]:
                    detection_data[tracker_id]["entry_time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            if in_area1 and "area1" not in tracker_states[tracker_id]:
                tracker_states[tracker_id].append("area1")
                if tracker_states[tracker_id] == ["area1"]:
                    detection_data[tracker_id]["exit_time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

            # Determine if entering or exiting: area2 then area1 is an entry, area1 then area2 an exit.
            # NOTE(review): after the reset, a corner still inside the second area starts a new
            # sequence on the next frame and stamps the opposite time field — confirm this is intended.
            if tracker_states[tracker_id] == ["area2", "area1"]:
                entering_count += 1
                tracker_states[tracker_id] = []  # Reset state after count
            elif tracker_states[tracker_id] == ["area1", "area2"]:
                exiting_count += 1
                tracker_states[tracker_id] = []  # Reset state after count

            total_count = entering_count + exiting_count

            # Update bbox history (one entry per frame in which this tracker is seen)
            detection_data[tracker_id]["bbox_history"].append({
                "frame_number": int(frame_number),  # Convert to standard Python int
                "bbox": [float(x1), float(y1), float(x2), float(y2)],  # Convert to standard Python floats
                "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            })

            # Draw bounding box and ID; the label shows this frame's estimate
            cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (255, 0, 0), 2)
            label = f"ID: {tracker_id} | {gender}, {age}"
            cv2.putText(frame, label, (int(x1), int(y1) - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)

            # Draw bottom-right corner marker (the point used for the area tests)
            cv2.circle(frame, bottom_right, 5, (0, 0, 255), -1)

        # Draw polygons on the frame
        cv2.polylines(frame, [area1], isClosed=True, color=(255, 0, 0), thickness=2)
        cv2.polylines(frame, [area2], isClosed=True, color=(0, 255, 0), thickness=2)

        # Display counts
        cv2.putText(frame, f"Total: {total_count}", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
        cv2.putText(frame, f"Entering: {entering_count}", (50, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        cv2.putText(frame, f"Exiting: {exiting_count}", (50, 150), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

        # Save frame to video
        sink.write_frame(frame)

# Build the report in two parts: the totals, then the per-person records
report_summary = dict(
    total_people=int(total_count),
    total_entering=int(entering_count),
    total_exiting=int(exiting_count),
    total_timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
)
# Tracker IDs come out of NumPy; plain ints are used as the keys instead
report_detections = dict((int(tid), record) for tid, record in detection_data.items())
json_data = {"summary": report_summary, "detections": report_detections}

# Write the whole report to a single JSON file
json_path = "detection_data.json"
with open(json_path, "w") as json_file:
    json.dump(json_data, json_file, indent=4)

print("Processing complete. Output video saved.")
print(f"Total: {total_count}, Entering: {entering_count}, Exiting: {exiting_count}")

##### Get metadata

In [None]:
# Install ffmpeg-python, which provides the `ffmpeg.probe` call used by the metadata helpers.
# NOTE(review): the version is not pinned; consider pinning it for reproducible runs.
!pip install ffmpeg-python

###### Get video creation date

In [None]:
import ffmpeg

def get_video_creation_time(video_path):
    """Look up the ``creation_time`` tag of a video file.

    The container-level (format) tags are consulted first; if they give no
    value, the first stream carrying the tag is used.

    Returns:
        The tag value as reported by the probe, or ``None`` when it is absent
        or probing fails (the error is printed).
    """
    try:
        probe = ffmpeg.probe(video_path)

        # 1. Container-level tags take precedence
        has_format_tags = 'format' in probe and 'tags' in probe['format']
        format_tags = probe['format']['tags'] if has_format_tags else {}
        creation_time = format_tags.get('creation_time')
        if creation_time:
            return creation_time

        # 2. Otherwise, the first stream that carries the tag wins
        for stream in probe.get('streams', []):
            if 'tags' in stream and 'creation_time' in stream['tags']:
                return stream['tags']['creation_time']

        return creation_time

    except ffmpeg.Error as e:
        print(f"FFmpeg error: {e.stderr.decode('utf-8')}")
        return None
    except Exception as e:
        print(f"Error: {str(e)}")
        return None

# Usage
# Read the same file the rest of the notebook processes instead of repeating a
# hard-coded absolute path that only exists on one particular runtime.
creation_time = get_video_creation_time(SOURCE_VIDEO_PATH)
if creation_time:
    print(f"Creation time: {creation_time}")
else:
    print("No creation time found in metadata")

###### Get all metadata

In [None]:
import ffmpeg
from datetime import datetime, timezone
import json

def extract_video_metadata(video_path):
    """Extract all available metadata from a video file using ffmpeg-python.

    Args:
        video_path: Path of the video file passed to ``ffmpeg.probe``.

    Returns:
        dict holding whichever container, video-stream, audio-stream, GPS and
        creation-time fields the probe result provides, or ``None`` when
        probing or parsing fails (the error is printed).
    """
    try:
        probe = ffmpeg.probe(video_path)
        metadata = {}
        # Tolerate a probe result that has no "streams" section
        streams = probe.get("streams", [])

        # ===== 1. General Video Information =====
        if "format" in probe:
            format_info = probe["format"]
            metadata.update({
                "filename": format_info.get("filename"),
                "format_name": format_info.get("format_name"),
                "format_long_name": format_info.get("format_long_name"),
                "duration_seconds": float(format_info.get("duration", 0)),
                "size_bytes": int(format_info.get("size", 0)),
                "bitrate": int(format_info.get("bit_rate", 0)),
            })

            # Extract creation_time (if available)
            if "tags" in format_info:
                metadata.update({
                    "creation_time": format_info["tags"].get("creation_time"),
                    "encoder": format_info["tags"].get("encoder"),
                })

        # ===== 2. Video Stream Metadata =====
        video_streams = [s for s in streams if s.get("codec_type") == "video"]
        if video_streams:
            video_info = video_streams[0]

            # avg_frame_rate is a "num/den" string (e.g. "30/1" -> 30.0) and
            # can be "0/0" when the rate is unknown. It is parsed
            # arithmetically: eval() would execute text read from the file's
            # metadata, and a zero denominator must not abort the extraction.
            num, _, den = str(video_info.get("avg_frame_rate", "0/1")).partition("/")
            try:
                fps = float(num) / float(den or 1)
            except (ValueError, ZeroDivisionError):
                fps = 0.0

            metadata.update({
                "video_codec": video_info.get("codec_name"),
                "width": int(video_info.get("width", 0)),
                "height": int(video_info.get("height", 0)),
                "fps": fps,
            })

            # Extract device-specific metadata (iPhone, Android, etc.)
            if "tags" in video_info:
                metadata.update({
                    "device_model": video_info["tags"].get("com.apple.quicktime.model"),
                    "software": video_info["tags"].get("software"),
                })

        # ===== 3. Audio Stream Metadata =====
        audio_streams = [s for s in streams if s.get("codec_type") == "audio"]
        if audio_streams:
            audio_info = audio_streams[0]
            metadata.update({
                "audio_codec": audio_info.get("codec_name"),
                "sample_rate": int(audio_info.get("sample_rate", 0)),
                "channels": int(audio_info.get("channels", 0)),
            })

        # ===== 4. GPS Coordinates (if recorded) =====
        if "format" in probe and "tags" in probe["format"]:
            tags = probe["format"]["tags"]
            if "location" in tags:  # Some Android devices store GPS here
                metadata["gps_coordinates"] = tags["location"]
            elif "com.apple.quicktime.location.ISO6709" in tags:  # iPhone GPS
                metadata["gps_coordinates"] = tags["com.apple.quicktime.location.ISO6709"]

        # ===== 5. Convert ISO Timestamp to Readable Format =====
        # The key may be present with a None value when the tag is missing
        creation_time = metadata.get("creation_time")
        if creation_time:
            try:
                dt = datetime.strptime(creation_time.split(".")[0], "%Y-%m-%dT%H:%M:%S")
                dt = dt.replace(tzinfo=timezone.utc)
                metadata["creation_time_utc"] = dt.strftime("%Y-%m-%d %H:%M:%S UTC")
                metadata["creation_time_local"] = dt.astimezone().strftime("%Y-%m-%d %H:%M:%S %Z")
            except (AttributeError, ValueError, OverflowError, OSError):
                # Best effort: an unparseable timestamp leaves only the raw tag
                pass

        return metadata

    except ffmpeg.Error as e:
        print(f"FFmpeg error: {(e.stderr or b'').decode('utf-8', errors='replace')}")
        return None
    except Exception as e:
        print(f"Error: {str(e)}")
        return None

# Example Usage
# Probe the same file the rest of the notebook processes rather than a
# hard-coded absolute path.
metadata = extract_video_metadata(SOURCE_VIDEO_PATH)
print(json.dumps(metadata, indent=4))

#### Video Metadata Extraction and Carried Object Identification

##### Enter Exit Count + Age Gender + Video Metadata + Carrying Object

* Add video metadata and carrying objects detection to above code


In [None]:
import supervision as sv
from ultralytics import YOLO
import cv2
import numpy as np
import json
from datetime import datetime, timedelta
import os
from deepface import DeepFace
import ffmpeg
from datetime import timezone

# ===== Configuration =====
# Output files of this run
TARGET_VIDEO_PATH = "output_video_updated.mp4"
JSON_OUTPUT_PATH = "detection_data_updated.json"

# Counting zones as int32 polygons in pixel coordinates (every y shifted by +50)
_Y_SHIFT = 50
area1 = np.array(
    [(1169, 1678 + _Y_SHIFT), (1942, 2025 + _Y_SHIFT), (1816, 2102 + _Y_SHIFT), (1085, 1703 + _Y_SHIFT)],
    dtype=np.int32,
)
area2 = np.array(
    [(1040, 1710 + _Y_SHIFT), (1771, 2117 + _Y_SHIFT), (1673, 2142 + _Y_SHIFT), (981, 1713 + _Y_SHIFT)],
    dtype=np.int32,
)

# COCO class IDs of the objects a person may carry
BAG_CLASSES = [
    24,  # backpack
    26,  # handbag
    28,  # suitcase
]
CAT_CLASS, DOG_CLASS = 15, 16

# ===== Initialize =====
total_count = entering_count = exiting_count = 0
tracker_states = dict()   # tracker id -> ordered list of zones visited
detection_data = dict()   # tracker id -> record exported to JSON

# ===== Helper Functions =====
def extract_video_metadata(video_path):
    """Extract all available metadata from a video file using ffmpeg-python.

    Args:
        video_path: Path of the video file passed to ``ffmpeg.probe``.

    Returns:
        dict holding whichever container, video-stream, audio-stream, GPS and
        creation-time fields the probe result provides, or ``None`` when
        probing or parsing fails (the error is printed).
    """
    try:
        probe = ffmpeg.probe(video_path)
        metadata = {}
        # Tolerate a probe result that has no "streams" section
        streams = probe.get("streams", [])

        # ===== 1. General Video Information =====
        if "format" in probe:
            format_info = probe["format"]
            metadata.update({
                "filename": format_info.get("filename"),
                "format_name": format_info.get("format_name"),
                "format_long_name": format_info.get("format_long_name"),
                "duration_seconds": float(format_info.get("duration", 0)),
                "size_bytes": int(format_info.get("size", 0)),
                "bitrate": int(format_info.get("bit_rate", 0)),
            })

            # Extract creation_time (if available)
            if "tags" in format_info:
                metadata.update({
                    "creation_time": format_info["tags"].get("creation_time"),
                    "encoder": format_info["tags"].get("encoder"),
                })

        # ===== 2. Video Stream Metadata =====
        video_streams = [s for s in streams if s.get("codec_type") == "video"]
        if video_streams:
            video_info = video_streams[0]

            # avg_frame_rate is a "num/den" string (e.g. "30/1" -> 30.0) and
            # can be "0/0" when the rate is unknown. It is parsed
            # arithmetically: eval() would execute text read from the file's
            # metadata, and a zero denominator must not abort the extraction.
            num, _, den = str(video_info.get("avg_frame_rate", "0/1")).partition("/")
            try:
                fps = float(num) / float(den or 1)
            except (ValueError, ZeroDivisionError):
                fps = 0.0

            metadata.update({
                "video_codec": video_info.get("codec_name"),
                "width": int(video_info.get("width", 0)),
                "height": int(video_info.get("height", 0)),
                "fps": fps,
            })

            # Extract device-specific metadata (iPhone, Android, etc.)
            if "tags" in video_info:
                metadata.update({
                    "device_model": video_info["tags"].get("com.apple.quicktime.model"),
                    "software": video_info["tags"].get("software"),
                })

        # ===== 3. Audio Stream Metadata =====
        audio_streams = [s for s in streams if s.get("codec_type") == "audio"]
        if audio_streams:
            audio_info = audio_streams[0]
            metadata.update({
                "audio_codec": audio_info.get("codec_name"),
                "sample_rate": int(audio_info.get("sample_rate", 0)),
                "channels": int(audio_info.get("channels", 0)),
            })

        # ===== 4. GPS Coordinates (if recorded) =====
        if "format" in probe and "tags" in probe["format"]:
            tags = probe["format"]["tags"]
            if "location" in tags:  # Some Android devices store GPS here
                metadata["gps_coordinates"] = tags["location"]
            elif "com.apple.quicktime.location.ISO6709" in tags:  # iPhone GPS
                metadata["gps_coordinates"] = tags["com.apple.quicktime.location.ISO6709"]

        # ===== 5. Convert ISO Timestamp to Readable Format =====
        # The key may be present with a None value when the tag is missing
        creation_time = metadata.get("creation_time")
        if creation_time:
            try:
                dt = datetime.strptime(creation_time.split(".")[0], "%Y-%m-%dT%H:%M:%S")
                dt = dt.replace(tzinfo=timezone.utc)
                metadata["creation_time_utc"] = dt.strftime("%Y-%m-%d %H:%M:%S UTC")
                metadata["creation_time_local"] = dt.astimezone().strftime("%Y-%m-%d %H:%M:%S %Z")
                metadata["recording_time"] = metadata["creation_time_local"]  # For backward compatibility
            except (AttributeError, ValueError, OverflowError, OSError):
                # Best effort: an unparseable timestamp leaves only the raw tag
                pass

        return metadata

    except ffmpeg.Error as e:
        print(f"FFmpeg error: {(e.stderr or b'').decode('utf-8', errors='replace')}")
        return None
    except Exception as e:
        print(f"Error: {str(e)}")
        return None

def predict_age_gender(face_img):
    """Predict age and gender using DeepFace.

    Args:
        face_img: Image crop handed to ``DeepFace.analyze``.

    Returns:
        ``(gender, age)`` from the first analysis result, or
        ``("Unknown", "Unknown")`` when the crop is missing or empty, or when
        the analysis fails.
    """
    # An empty crop cannot be analysed, so skip the model call entirely
    if face_img is None or getattr(face_img, "size", None) == 0:
        return "Unknown", "Unknown"
    try:
        analysis = DeepFace.analyze(face_img, actions=["age", "gender"], enforce_detection=False)
        return analysis[0]["dominant_gender"], analysis[0]["age"]
    except Exception:
        # Deliberate best effort: one failed estimate must not stop the video pass
        return "Unknown", "Unknown"

def is_carried(obj_bbox, person_bbox, carry_zone_start=0.6):
    """Check if an object is being carried by a person.

    The object counts as carried when its box lies strictly inside the person
    box and its vertical centre is below ``carry_zone_start`` of the person's
    height, measured from the top of the person box.

    Args:
        obj_bbox: ``(x1, y1, x2, y2)`` of the object.
        person_bbox: ``(x1, y1, x2, y2)`` of the person.
        carry_zone_start: Fraction of the person's height, from the top, where
            the carry zone begins. The default 0.6 limits it to the lower 40%.

    Returns:
        True when both conditions hold, otherwise False.
    """
    px1, py1, px2, py2 = person_bbox
    ox1, oy1, ox2, oy2 = obj_bbox
    obj_center_y = (oy1 + oy2) / 2
    carry_zone_top = py1 + (py2 - py1) * carry_zone_start
    # Strict inequalities: an object touching the person box edge is not contained
    contained = (ox1 > px1) and (ox2 < px2) and (oy1 > py1) and (oy2 < py2)
    in_carry_position = obj_center_y > carry_zone_top
    return contained and in_carry_position

# ===== Main Processing =====
video_metadata = extract_video_metadata(SOURCE_VIDEO_PATH)
video_info = sv.VideoInfo.from_video_path(SOURCE_VIDEO_PATH)

# Get recording time from metadata or use current time as fallback.
# extract_video_metadata returns None on failure (TypeError when subscripted)
# and stores creation_time as None when the tag is missing (AttributeError on
# .split), so both must reach the fallback alongside KeyError/ValueError.
try:
    recording_time = datetime.strptime(video_metadata["creation_time"].split(".")[0], "%Y-%m-%dT%H:%M:%S")
    recording_time = recording_time.replace(tzinfo=timezone.utc)
except (KeyError, ValueError, TypeError, AttributeError):
    print("Warning: Using current time as recording time fallback")
    recording_time = datetime.now(timezone.utc)

# Tracking pass: per frame, split detections into people and candidate carried
# objects, estimate age/gender, update the zone-crossing state and counters,
# then annotate the frame and write it to TARGET_VIDEO_PATH.
# NOTE(review): `model` and persist=True are shared with the other tracking cells —
# confirm tracker state from an earlier run does not carry over into this one.
with sv.VideoSink(TARGET_VIDEO_PATH, video_info) as sink:
    for frame_number, result in enumerate(
        model.track(source=SOURCE_VIDEO_PATH, tracker="bytetrack.yaml", show=False, stream=True, persist=True)
    ):
        # All drawing below is done in place on this image
        frame = result.orig_img
        detections = sv.Detections.from_yolov8(result)

        # Separate detections
        people = []
        objects = []
        # Objects are only collected on frames that carry tracker IDs
        if result.boxes.id is not None:
            tracker_ids = result.boxes.id.cpu().numpy().astype(int)
            for i, (bbox, conf, class_id) in enumerate(zip(detections.xyxy, detections.confidence, detections.class_id)):
                if class_id == 0:  # Person
                    tracker_id = tracker_ids[i] if i < len(tracker_ids) else None
                    people.append((bbox, conf, tracker_id))
                elif class_id in BAG_CLASSES + [CAT_CLASS, DOG_CLASS]:
                    objects.append((bbox, conf, class_id))

        # Process each person
        for person_bbox, confidence, tracker_id in people:
            x1, y1, x2, y2 = person_bbox
            bottom_right = (int(x2), int(y2))  # point used for the area tests

            # Age/gender detection. The slice is the whole person bounding box, not a face region.
            # NOTE(review): this runs for every person on every frame, although only the
            # first estimate per tracker is stored in detection_data — consider caching.
            face_roi = frame[int(y1):int(y2), int(x1):int(x2)]
            gender, age = predict_age_gender(face_roi)

            # Check for carried items: every collected object is tested against this person
            carried_items = []
            for obj_bbox, _, class_id in objects:
                if is_carried(obj_bbox, person_bbox):
                    if class_id in BAG_CLASSES:
                        carried_items.append("bag")
                    elif class_id == CAT_CLASS:
                        carried_items.append("cat")
                    elif class_id == DOG_CLASS:
                        carried_items.append("dog")

            # Initialize or update person data (the record is created on first sighting only)
            if tracker_id not in tracker_states:
                tracker_states[tracker_id] = []
                detection_data[tracker_id] = {
                    "tracker_id": int(tracker_id),
                    "gender": gender,
                    "age": age,
                    # Either a list of item names or the string "no objects"
                    "carrying": carried_items if carried_items else "no objects",
                    "entry_time": None,
                    "exit_time": None,
                    "entry_frame": None,
                    "exit_frame": None,
                    "bbox_history": [],
                    "confidence": float(confidence)
                }

            # Area crossing logic (a point on the polygon edge counts as inside)
            in_area1 = cv2.pointPolygonTest(area1, bottom_right, False) >= 0
            in_area2 = cv2.pointPolygonTest(area2, bottom_right, False) >= 0

            # Times are derived from the frame index: recording_time + frame_number / fps.
            # entry_* is stamped when area2 is the first area touched, exit_* when area1 is.
            if in_area2 and "area2" not in tracker_states[tracker_id]:
                tracker_states[tracker_id].append("area2")
                if tracker_states[tracker_id] == ["area2"]:
                    entry_time = recording_time + timedelta(seconds=frame_number / video_info.fps)
                    detection_data[tracker_id]["entry_time"] = entry_time.strftime("%Y-%m-%d %H:%M:%S")
                    detection_data[tracker_id]["entry_frame"] = frame_number

            if in_area1 and "area1" not in tracker_states[tracker_id]:
                tracker_states[tracker_id].append("area1")
                if tracker_states[tracker_id] == ["area1"]:
                    exit_time = recording_time + timedelta(seconds=frame_number / video_info.fps)
                    detection_data[tracker_id]["exit_time"] = exit_time.strftime("%Y-%m-%d %H:%M:%S")
                    detection_data[tracker_id]["exit_frame"] = frame_number

            # Update counts: area2 then area1 is an entry, area1 then area2 an exit.
            # NOTE(review): after the reset, a corner still inside the second area starts a new
            # sequence on the next frame and stamps the opposite time field — confirm this is intended.
            if tracker_states[tracker_id] == ["area2", "area1"]:
                entering_count += 1
                tracker_states[tracker_id] = []
            elif tracker_states[tracker_id] == ["area1", "area2"]:
                exiting_count += 1
                tracker_states[tracker_id] = []

            total_count = entering_count + exiting_count

            # Store bounding box history (one entry per frame in which this tracker is seen)
            detection_data[tracker_id]["bbox_history"].append({
                "frame_number": int(frame_number),
                "bbox": [float(x1), float(y1), float(x2), float(y2)],
                "timestamp": (recording_time + timedelta(seconds=frame_number / video_info.fps)).strftime("%Y-%m-%d %H:%M:%S"),
                "carrying": carried_items if carried_items else "no objects"
            })

            # Draw bounding box and label; the label shows this frame's estimates
            carrying_str = ", ".join(carried_items) if carried_items else "no objects"
            label = f"ID: {tracker_id} | {gender}, {age} | Carrying: {carrying_str}"
            cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (255, 0, 0), 2)
            cv2.putText(frame, label, (int(x1), int(y1) - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)

        # Draw counting areas and counters
        cv2.polylines(frame, [area1], isClosed=True, color=(255, 0, 0), thickness=2)
        cv2.polylines(frame, [area2], isClosed=True, color=(0, 255, 0), thickness=2)
        cv2.putText(frame, f"Total: {total_count}", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
        cv2.putText(frame, f"Entering: {entering_count}", (50, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        cv2.putText(frame, f"Exiting: {exiting_count}", (50, 150), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

        # Write frame to output video
        sink.write_frame(frame)

# ===== Save Results =====
# Time at which this report is produced, in UTC
processing_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")

# Totals, plus the clip's frame rate and its duration derived from the frame count
run_summary = {
    "total_people": int(total_count),
    "total_entering": int(entering_count),
    "total_exiting": int(exiting_count),
    "fps": float(video_info.fps),
    "duration_seconds": float(video_info.total_frames / video_info.fps),
}

# Re-key the per-person records with plain Python ints
detections_by_id = {}
for tid, record in detection_data.items():
    detections_by_id[int(tid)] = record

json_output = {
    "video_metadata": video_metadata,
    "processing_time": processing_time,
    "summary": run_summary,
    "detections": detections_by_id,
}

with open(JSON_OUTPUT_PATH, "w") as f:
    json.dump(json_output, f, indent=4)

print(f"Processing complete. Results saved to {TARGET_VIDEO_PATH} and {JSON_OUTPUT_PATH}")
print(f"Total: {total_count}, Entering: {entering_count}, Exiting: {exiting_count}")

#### Face Mask Detection and Speed Calculation 

##### Enter Exit Count + Age Gender + Video Metadata + Carrying Object + Face Mask Detection + Speed Calculation 

* Add face mask detection and speed calculation to above code



In [None]:
# Fetch the repository that ships the pre-trained mask classifier loaded in the next cell.
# NOTE(review): presumably this fails on a re-run once the target directory exists — confirm.
!git clone https://github.com/chandrikadeb7/Face-Mask-Detection.git #get Face mask detection pre trained model

In [None]:
from tensorflow.keras.utils import img_to_array
from tensorflow.keras.models import load_model

# Path to the classifier weights inside the repository cloned in the previous cell.
model_path = "Face-Mask-Detection/mask_detector.model"
# Loaded once at notebook level; detect_mask() in the later cells reads this global.
# NOTE(review): newer Keras releases may reject the legacy ".model" extension —
# confirm load_model accepts this file in the target environment.
mask_model = load_model(model_path)


In [None]:
import supervision as sv
from ultralytics import YOLO
import cv2
import numpy as np
import json
from datetime import datetime, timedelta
import os
from deepface import DeepFace
import ffmpeg
from datetime import timezone
import math
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input


# ===== Configuration =====
# Artefacts written by this cell: annotated video and JSON report.
TARGET_VIDEO_PATH = "output_video_mask_speed.mp4"
JSON_OUTPUT_PATH = "detection_data_mask_speed.json"

# Counting zones: int32 polygons in pixel coordinates (y values already include
# the +50 px shift applied to the originally picked points).
area1 = np.array(
    [[1169, 1728], [1942, 2075], [1816, 2152], [1085, 1753]], dtype=np.int32
)
area2 = np.array(
    [[1040, 1760], [1771, 2167], [1673, 2192], [981, 1763]], dtype=np.int32
)

# COCO class ids of interest besides "person"
BAG_CLASSES = [24, 26, 28]  # backpack, handbag, suitcase
CAT_CLASS, DOG_CLASS = 15, 16

# Labels indexed by the arg-max of the mask classifier output.
# NOTE(review): assumes the model's output order matches this list — confirm.
MASK_LABELS = ["mask", "no_mask", "improper"]

# ===== Initialize =====
# Running counters shown on the video and written to the report.
total_count = entering_count = exiting_count = 0
# Per-tracker state, all keyed by tracker id:
#   tracker_states     -> ordered list of zones ("area1"/"area2") touched so far
#   detection_data     -> JSON record of the person
#   previous_positions -> last seen bounding box (used for speed)
tracker_states, detection_data, previous_positions = {}, {}, {}

# ===== Helper Functions =====
def _parse_frame_rate(rate):
    """Convert an ffprobe rational string such as "30000/1001" to a float.

    Returns 0.0 when the value is malformed or has a zero denominator
    (e.g. "0/0"), so a bad rate never aborts metadata extraction.
    """
    numerator, _, denominator = str(rate).partition("/")
    try:
        num = float(numerator)
        den = float(denominator) if denominator else 1.0
    except ValueError:
        return 0.0
    return num / den if den else 0.0


def extract_video_metadata(video_path):
    """Extract container, stream and tag metadata from a video using ffmpeg-python.

    Args:
        video_path: Path of the video file handed to ``ffmpeg.probe``.

    Returns:
        dict | None: Flat dictionary with whatever fields the probe exposes
        (container info, first video/audio stream info, creation time, GPS tag,
        derived ``creation_time_utc`` / ``creation_time_local`` /
        ``recording_time``). ``None`` when probing fails; the error is printed.
    """
    try:
        probe = ffmpeg.probe(video_path)
        metadata = {}

        # ===== 1. General Video Information =====
        if "format" in probe:
            format_info = probe["format"]
            metadata.update({
                "filename": format_info.get("filename"),
                "format_name": format_info.get("format_name"),
                "format_long_name": format_info.get("format_long_name"),
                "duration_seconds": float(format_info.get("duration", 0)),
                "size_bytes": int(format_info.get("size", 0)),
                "bitrate": int(format_info.get("bit_rate", 0)),
            })

            # Extract creation_time (if available); both values may be None
            # when the tag block exists but lacks the key.
            if "tags" in format_info:
                metadata.update({
                    "creation_time": format_info["tags"].get("creation_time"),
                    "encoder": format_info["tags"].get("encoder"),
                })

        # ===== 2. Video Stream Metadata =====
        video_streams = [s for s in probe["streams"] if s["codec_type"] == "video"]
        if video_streams:
            video_stream = video_streams[0]
            metadata.update({
                "video_codec": video_stream.get("codec_name"),
                "width": int(video_stream.get("width", 0)),
                "height": int(video_stream.get("height", 0)),
                # Parsed arithmetically instead of eval(): no code execution on
                # file-provided text, and "0/0" no longer discards all metadata.
                "fps": _parse_frame_rate(video_stream.get("avg_frame_rate", "0/1")),
            })

            # Extract device-specific metadata (iPhone, Android, etc.)
            if "tags" in video_stream:
                metadata.update({
                    "device_model": video_stream["tags"].get("com.apple.quicktime.model"),
                    "software": video_stream["tags"].get("software"),
                })

        # ===== 3. Audio Stream Metadata =====
        audio_streams = [s for s in probe["streams"] if s["codec_type"] == "audio"]
        if audio_streams:
            audio_stream = audio_streams[0]
            metadata.update({
                "audio_codec": audio_stream.get("codec_name"),
                "sample_rate": int(audio_stream.get("sample_rate", 0)),
                "channels": int(audio_stream.get("channels", 0)),
            })

        # ===== 4. GPS Coordinates (if recorded) =====
        if "format" in probe and "tags" in probe["format"]:
            tags = probe["format"]["tags"]
            if "location" in tags:  # Some Android devices store GPS here
                metadata["gps_coordinates"] = tags["location"]
            elif "com.apple.quicktime.location.ISO6709" in tags:  # iPhone GPS
                metadata["gps_coordinates"] = tags["com.apple.quicktime.location.ISO6709"]

        # ===== 5. Convert ISO Timestamp to Readable Format =====
        creation_time = metadata.get("creation_time")
        if creation_time:
            try:
                # Drop fractional seconds / zone suffix after the first ".".
                dt = datetime.strptime(creation_time.split(".")[0], "%Y-%m-%dT%H:%M:%S")
                dt = dt.replace(tzinfo=timezone.utc)
                metadata["creation_time_utc"] = dt.strftime("%Y-%m-%d %H:%M:%S UTC")
                metadata["creation_time_local"] = dt.astimezone().strftime("%Y-%m-%d %H:%M:%S %Z")
                metadata["recording_time"] = metadata["creation_time_local"]  # For backward compatibility
            except (ValueError, AttributeError):
                # Best effort: an unparseable tag keeps the raw value and simply
                # omits the derived fields.
                pass

        return metadata

    except ffmpeg.Error as e:
        print(f"FFmpeg error: {e.stderr.decode('utf-8')}")
        return None
    except Exception as e:
        print(f"Error: {str(e)}")
        return None

def predict_age_gender(face_img):
    """Estimate gender and age for an image crop with DeepFace.

    Returns:
        tuple: ``(dominant_gender, age)`` taken from the first analysis result,
        or ``("Unknown", "Unknown")`` if the analysis raises for any reason.
    """
    fallback = ("Unknown", "Unknown")
    try:
        # enforce_detection=False: analyse the crop even when no face is found.
        first_result = DeepFace.analyze(
            face_img, actions=["age", "gender"], enforce_detection=False
        )[0]
        return first_result["dominant_gender"], first_result["age"]
    except Exception:
        return fallback

def detect_mask(face_img):
    """Classify whether the person in an image crop is wearing a mask.

    Args:
        face_img: BGR image crop (converted to RGB and resized to 224x224
            before being passed to ``mask_model``).

    Returns:
        tuple: ``(label, confidence)`` where ``label`` is the ``MASK_LABELS``
        entry with the highest score and ``confidence`` that score as a float.
        ``("unknown", 0.0)`` for an empty/missing crop or when inference fails.
    """
    # A missing or zero-sized crop (degenerate box) cannot be classified;
    # return the fallback quietly instead of logging an error for every frame.
    if face_img is None or getattr(face_img, "size", 0) == 0:
        return "unknown", 0.0

    try:
        face = cv2.cvtColor(face_img, cv2.COLOR_BGR2RGB)
        face = cv2.resize(face, (224, 224))
        face = img_to_array(face)
        face = preprocess_input(face)
        face = np.expand_dims(face, axis=0)  # add the batch dimension

        # verbose=0: this runs once per person per frame, so the per-call
        # progress bar would flood the notebook output.
        preds = mask_model.predict(face, verbose=0)[0]
        max_idx = int(np.argmax(preds))
        # NOTE(review): assumes the model's output order matches MASK_LABELS — confirm.
        return MASK_LABELS[max_idx], float(preds[max_idx])
    except Exception as e:
        print(f"Mask detection error: {str(e)}")
        return "unknown", 0.0

def is_carried(obj_bbox, person_bbox):
    """Heuristic test for "this person is carrying this object".

    Both arguments are ``(x1, y1, x2, y2)`` boxes. The object counts as carried
    when its box lies strictly inside the person's box and its vertical centre
    is below the line 60% of the way down the person's box.
    """
    person_left, person_top, person_right, person_bottom = person_bbox
    obj_left, obj_top, obj_right, obj_bottom = obj_bbox

    # Strict containment: touching the person's box edge does not count.
    fully_inside = (
        (obj_left > person_left)
        and (obj_right < person_right)
        and (obj_top > person_top)
        and (obj_bottom < person_bottom)
    )

    carry_line_y = person_top + (person_bottom - person_top) * 0.6
    below_carry_line = (obj_top + obj_bottom) / 2 > carry_line_y
    return fully_inside and below_carry_line

def calculate_speed(prev_position, current_position, fps, frame_interval=1):
    """Speed of a box centre in pixels per second.

    Args:
        prev_position: Earlier ``(x1, y1, x2, y2)`` box, or ``None``.
        current_position: Later ``(x1, y1, x2, y2)`` box, or ``None``.
        fps: Frames per second of the video.
        frame_interval: Number of frames separating the two boxes.

    Returns:
        ``0`` when either box is missing, otherwise the Euclidean distance
        between the two box centres scaled by ``fps / frame_interval``.
    """
    if prev_position is None or current_position is None:
        return 0

    def _center(box):
        # Midpoint of an (x1, y1, x2, y2) box.
        return (box[0] + box[2]) / 2, (box[1] + box[3]) / 2

    prev_cx, prev_cy = _center(prev_position)
    curr_cx, curr_cy = _center(current_position)

    pixels_moved = math.sqrt((curr_cx - prev_cx)**2 + (curr_cy - prev_cy)**2)
    return pixels_moved * fps / frame_interval

# ===== Main Processing =====
video_metadata = extract_video_metadata(SOURCE_VIDEO_PATH)
video_info = sv.VideoInfo.from_video_path(SOURCE_VIDEO_PATH)

# Get recording time from metadata or use current time as fallback.
# Besides a missing key (KeyError) or unparseable value (ValueError), the
# fallback must also cover extract_video_metadata() returning None (TypeError)
# and a "creation_time" entry that is present but None (AttributeError).
try:
    recording_time = datetime.strptime(video_metadata["creation_time"].split(".")[0], "%Y-%m-%dT%H:%M:%S")
    recording_time = recording_time.replace(tzinfo=timezone.utc)
except (KeyError, ValueError, TypeError, AttributeError):
    print("Warning: Using current time as recording time fallback")
    recording_time = datetime.now(timezone.utc)

# Per-frame pipeline: track -> split people / candidate objects -> per-person
# attributes (speed, age/gender, mask, carried items) -> zone-crossing counts
# -> annotate the frame and write it to the output video.

# Frame index of the previous sighting of each tracker id. A track can be
# missing for some frames and reappear, so speed must be divided by the real
# frame gap rather than an assumed gap of one frame.
last_seen_frame = {}
# COCO class ids that can be reported as carried items (bags, cat, dog).
carryable_classes = BAG_CLASSES + [CAT_CLASS, DOG_CLASS]

with sv.VideoSink(TARGET_VIDEO_PATH, video_info) as sink:
    for frame_number, result in enumerate(
        model.track(source=SOURCE_VIDEO_PATH, tracker="bytetrack.yaml", show=False, stream=True, persist=True)
    ):
        frame = result.orig_img
        detections = sv.Detections.from_yolov8(result)

        # Timestamp of this frame (recording start + elapsed video time);
        # shared by every record written for the frame.
        frame_timestamp = (
            recording_time + timedelta(seconds=frame_number / video_info.fps)
        ).strftime("%Y-%m-%d %H:%M:%S")

        # Separate detections; frames without tracker ids yield no people.
        people = []
        objects = []
        if result.boxes.id is not None:
            tracker_ids = result.boxes.id.cpu().numpy().astype(int)
            for i, (bbox, conf, class_id) in enumerate(zip(detections.xyxy, detections.confidence, detections.class_id)):
                if class_id == 0:  # Person
                    tracker_id = tracker_ids[i] if i < len(tracker_ids) else None
                    people.append((bbox, conf, tracker_id))
                elif class_id in carryable_classes:
                    objects.append((bbox, conf, class_id))

        # Process each person
        for person_bbox, confidence, tracker_id in people:
            x1, y1, x2, y2 = person_bbox
            # The bottom-right box corner is the reference point for zone tests.
            bottom_right = (int(x2), int(y2))

            # Speed in px/s since the previous sighting of this tracker id.
            current_position = (x1, y1, x2, y2)
            prev_position = previous_positions.get(tracker_id, None)
            frames_elapsed = max(1, frame_number - last_seen_frame.get(tracker_id, frame_number - 1))
            speed = (
                calculate_speed(prev_position, current_position, video_info.fps, frame_interval=frames_elapsed)
                if frame_number > 0 else 0
            )
            previous_positions[tracker_id] = current_position
            last_seen_frame[tracker_id] = frame_number

            # Age/gender and mask classification both run on the full person crop.
            face_roi = frame[int(y1):int(y2), int(x1):int(x2)]
            gender, age = predict_age_gender(face_roi)
            mask_status, mask_confidence = detect_mask(face_roi)

            # Check for carried items
            carried_items = []
            for obj_bbox, _, class_id in objects:
                if is_carried(obj_bbox, person_bbox):
                    if class_id in BAG_CLASSES:
                        carried_items.append("bag")
                    elif class_id == CAT_CLASS:
                        carried_items.append("cat")
                    elif class_id == DOG_CLASS:
                        carried_items.append("dog")

            # First sighting: create the zone state and the JSON record.
            if tracker_id not in tracker_states:
                tracker_states[tracker_id] = []
                detection_data[tracker_id] = {
                    "tracker_id": int(tracker_id),
                    "gender": gender,
                    "age": age,
                    "carrying": carried_items if carried_items else "no objects",
                    "mask_status": mask_status,
                    "mask_confidence": mask_confidence,
                    "bbox_history": [],
                    "confidence": float(confidence)
                }

            # Area crossing logic (points on the polygon edge count as inside).
            in_area1 = cv2.pointPolygonTest(area1, bottom_right, False) >= 0
            in_area2 = cv2.pointPolygonTest(area2, bottom_right, False) >= 0

            if in_area2 and "area2" not in tracker_states[tracker_id]:
                tracker_states[tracker_id].append("area2")
                # area2 touched first -> start of an "entering" crossing.
                if tracker_states[tracker_id] == ["area2"]:
                    detection_data[tracker_id]["entry_time"] = frame_timestamp
                    detection_data[tracker_id]["entry_frame"] = frame_number
                    # Reuse this frame's mask result; the crop is unchanged, so
                    # a second inference would return the same values.
                    detection_data[tracker_id]["entry_mask_status"] = mask_status
                    detection_data[tracker_id]["entry_mask_confidence"] = mask_confidence

            if in_area1 and "area1" not in tracker_states[tracker_id]:
                tracker_states[tracker_id].append("area1")
                # area1 touched first -> start of an "exiting" crossing.
                if tracker_states[tracker_id] == ["area1"]:
                    detection_data[tracker_id]["exit_time"] = frame_timestamp
                    detection_data[tracker_id]["exit_frame"] = frame_number
                    detection_data[tracker_id]["exit_mask_status"] = mask_status
                    detection_data[tracker_id]["exit_mask_confidence"] = mask_confidence

            # Update counts: a completed two-zone sequence is counted once and
            # the state is reset so the same person can be counted again later.
            if tracker_states[tracker_id] == ["area2", "area1"]:
                entering_count += 1
                tracker_states[tracker_id] = []
            elif tracker_states[tracker_id] == ["area1", "area2"]:
                exiting_count += 1
                tracker_states[tracker_id] = []

            total_count = entering_count + exiting_count

            # Store bounding box history
            detection_data[tracker_id]["bbox_history"].append({
                "frame_number": int(frame_number),
                "bbox": [float(x1), float(y1), float(x2), float(y2)],
                "timestamp": frame_timestamp,
                "carrying": carried_items if carried_items else "no objects",
                "speed": speed,
                "mask_status": mask_status,
                "mask_confidence": mask_confidence
            })

            # Draw bounding box and label
            carrying_str = ", ".join(carried_items) if carried_items else "no objects"
            label = f"ID: {tracker_id} | {gender}, {age} | Mask: {mask_status} | Carrying: {carrying_str} | Speed: {speed:.2f} px/s"
            cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (255, 0, 0), 3)
            cv2.putText(frame, label, (int(x1), int(y1) - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)

        # Draw counting areas and counters
        cv2.polylines(frame, [area1], isClosed=True, color=(255, 0, 0), thickness=2)
        cv2.polylines(frame, [area2], isClosed=True, color=(0, 255, 0), thickness=2)
        cv2.putText(frame, f"Total: {total_count}", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
        cv2.putText(frame, f"Entering: {entering_count}", (50, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        cv2.putText(frame, f"Exiting: {exiting_count}", (50, 150), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

        # Write frame to output video
        sink.write_frame(frame)

# ===== Save Results =====
# Report layout: source-video metadata, UTC timestamp of this run, aggregate
# counters, and one record per tracker id (ids cast to plain int).
json_output = dict(
    video_metadata=video_metadata,
    processing_time=datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z"),
    summary=dict(
        total_people=int(total_count),
        total_entering=int(entering_count),
        total_exiting=int(exiting_count),
        fps=float(video_info.fps),
        duration_seconds=float(video_info.total_frames / video_info.fps),
    ),
    detections={int(tracker_id): data for tracker_id, data in detection_data.items()},
)

with open(JSON_OUTPUT_PATH, "w") as f:
    json.dump(json_output, f, indent=4)

# Two-line completion message: output locations, then the final counters.
print(
    f"Processing complete. Results saved to {TARGET_VIDEO_PATH} and {JSON_OUTPUT_PATH}",
    f"Total: {total_count}, Entering: {entering_count}, Exiting: {exiting_count}",
    sep="\n",
)


#### Restricted Area People Entering Monitoring 

##### Enter Exit Count + Age Gender + Video Metadata + Carrying Object + Face Mask Detection + Speed Calculation + Restricted Area

* Add detection and counting of people entering a restricted area to the code above.

In [None]:
import supervision as sv
from ultralytics import YOLO
import cv2
import numpy as np
import json
from datetime import datetime, timedelta
import os
from deepface import DeepFace
import ffmpeg
from datetime import timezone
import math
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input


# ===== Configuration =====
# Artefacts written by this cell: annotated video and JSON report.
TARGET_VIDEO_PATH = "output_video_restrcted.mp4"
JSON_OUTPUT_PATH = "detection_data_restrcted.json"

# Counting zones: int32 polygons in pixel coordinates (y values already include
# the +50 px shift applied to the originally picked points).
area1 = np.array(
    [[1169, 1728], [1942, 2075], [1816, 2152], [1085, 1753]], dtype=np.int32
)
area2 = np.array(
    [[1040, 1760], [1771, 2167], [1673, 2192], [981, 1763]], dtype=np.int32
)

# Restricted zone: placeholder square — adjust the corners to the real scene.
restricted_area = np.array(
    [[500, 500], [800, 500], [800, 800], [500, 800]], dtype=np.int32
)

# COCO class ids of interest besides "person"
BAG_CLASSES = [24, 26, 28]  # backpack, handbag, suitcase
CAT_CLASS, DOG_CLASS = 15, 16

# Labels indexed by the arg-max of the mask classifier output.
# NOTE(review): assumes the model's output order matches this list — confirm.
MASK_LABELS = ["mask", "no_mask", "improper"]

# ===== Initialize =====
# Running counters shown on the video and written to the report.
total_count = entering_count = exiting_count = 0
restricted_area_count = 0  # entries into the restricted zone
# Per-tracker state, all keyed by tracker id:
#   tracker_states         -> ordered list of zones ("area1"/"area2") touched so far
#   restricted_area_states -> True while the person is inside the restricted zone
#   detection_data         -> JSON record of the person
#   previous_positions     -> last seen bounding box (used for speed)
tracker_states, restricted_area_states = {}, {}
detection_data, previous_positions = {}, {}

# ===== Helper Functions =====
def _parse_frame_rate(rate):
    """Convert an ffprobe rational string such as "30000/1001" to a float.

    Returns 0.0 when the value is malformed or has a zero denominator
    (e.g. "0/0"), so a bad rate never aborts metadata extraction.
    """
    numerator, _, denominator = str(rate).partition("/")
    try:
        num = float(numerator)
        den = float(denominator) if denominator else 1.0
    except ValueError:
        return 0.0
    return num / den if den else 0.0


def extract_video_metadata(video_path):
    """Extract container, stream and tag metadata from a video using ffmpeg-python.

    Args:
        video_path: Path of the video file handed to ``ffmpeg.probe``.

    Returns:
        dict | None: Flat dictionary with whatever fields the probe exposes
        (container info, first video/audio stream info, creation time, GPS tag,
        derived ``creation_time_utc`` / ``creation_time_local`` /
        ``recording_time``). ``None`` when probing fails; the error is printed.
    """
    try:
        probe = ffmpeg.probe(video_path)
        metadata = {}

        # ===== 1. General Video Information =====
        if "format" in probe:
            format_info = probe["format"]
            metadata.update({
                "filename": format_info.get("filename"),
                "format_name": format_info.get("format_name"),
                "format_long_name": format_info.get("format_long_name"),
                "duration_seconds": float(format_info.get("duration", 0)),
                "size_bytes": int(format_info.get("size", 0)),
                "bitrate": int(format_info.get("bit_rate", 0)),
            })

            # Extract creation_time (if available); both values may be None
            # when the tag block exists but lacks the key.
            if "tags" in format_info:
                metadata.update({
                    "creation_time": format_info["tags"].get("creation_time"),
                    "encoder": format_info["tags"].get("encoder"),
                })

        # ===== 2. Video Stream Metadata =====
        video_streams = [s for s in probe["streams"] if s["codec_type"] == "video"]
        if video_streams:
            video_stream = video_streams[0]
            metadata.update({
                "video_codec": video_stream.get("codec_name"),
                "width": int(video_stream.get("width", 0)),
                "height": int(video_stream.get("height", 0)),
                # Parsed arithmetically instead of eval(): no code execution on
                # file-provided text, and "0/0" no longer discards all metadata.
                "fps": _parse_frame_rate(video_stream.get("avg_frame_rate", "0/1")),
            })

            # Extract device-specific metadata (iPhone, Android, etc.)
            if "tags" in video_stream:
                metadata.update({
                    "device_model": video_stream["tags"].get("com.apple.quicktime.model"),
                    "software": video_stream["tags"].get("software"),
                })

        # ===== 3. Audio Stream Metadata =====
        audio_streams = [s for s in probe["streams"] if s["codec_type"] == "audio"]
        if audio_streams:
            audio_stream = audio_streams[0]
            metadata.update({
                "audio_codec": audio_stream.get("codec_name"),
                "sample_rate": int(audio_stream.get("sample_rate", 0)),
                "channels": int(audio_stream.get("channels", 0)),
            })

        # ===== 4. GPS Coordinates (if recorded) =====
        if "format" in probe and "tags" in probe["format"]:
            tags = probe["format"]["tags"]
            if "location" in tags:  # Some Android devices store GPS here
                metadata["gps_coordinates"] = tags["location"]
            elif "com.apple.quicktime.location.ISO6709" in tags:  # iPhone GPS
                metadata["gps_coordinates"] = tags["com.apple.quicktime.location.ISO6709"]

        # ===== 5. Convert ISO Timestamp to Readable Format =====
        creation_time = metadata.get("creation_time")
        if creation_time:
            try:
                # Drop fractional seconds / zone suffix after the first ".".
                dt = datetime.strptime(creation_time.split(".")[0], "%Y-%m-%dT%H:%M:%S")
                dt = dt.replace(tzinfo=timezone.utc)
                metadata["creation_time_utc"] = dt.strftime("%Y-%m-%d %H:%M:%S UTC")
                metadata["creation_time_local"] = dt.astimezone().strftime("%Y-%m-%d %H:%M:%S %Z")
                metadata["recording_time"] = metadata["creation_time_local"]  # For backward compatibility
            except (ValueError, AttributeError):
                # Best effort: an unparseable tag keeps the raw value and simply
                # omits the derived fields.
                pass

        return metadata

    except ffmpeg.Error as e:
        print(f"FFmpeg error: {e.stderr.decode('utf-8')}")
        return None
    except Exception as e:
        print(f"Error: {str(e)}")
        return None

def predict_age_gender(face_img):
    """Estimate gender and age for an image crop with DeepFace.

    Returns:
        tuple: ``(dominant_gender, age)`` taken from the first analysis result,
        or ``("Unknown", "Unknown")`` if the analysis raises for any reason.
    """
    fallback = ("Unknown", "Unknown")
    try:
        # enforce_detection=False: analyse the crop even when no face is found.
        first_result = DeepFace.analyze(
            face_img, actions=["age", "gender"], enforce_detection=False
        )[0]
        return first_result["dominant_gender"], first_result["age"]
    except Exception:
        return fallback

def detect_mask(face_img):
    """Classify whether the person in an image crop is wearing a mask.

    Args:
        face_img: BGR image crop (converted to RGB and resized to 224x224
            before being passed to ``mask_model``).

    Returns:
        tuple: ``(label, confidence)`` where ``label`` is the ``MASK_LABELS``
        entry with the highest score and ``confidence`` that score as a float.
        ``("unknown", 0.0)`` for an empty/missing crop or when inference fails.
    """
    # A missing or zero-sized crop (degenerate box) cannot be classified;
    # return the fallback quietly instead of logging an error for every frame.
    if face_img is None or getattr(face_img, "size", 0) == 0:
        return "unknown", 0.0

    try:
        face = cv2.cvtColor(face_img, cv2.COLOR_BGR2RGB)
        face = cv2.resize(face, (224, 224))
        face = img_to_array(face)
        face = preprocess_input(face)
        face = np.expand_dims(face, axis=0)  # add the batch dimension

        # verbose=0: this runs once per person per frame, so the per-call
        # progress bar would flood the notebook output.
        preds = mask_model.predict(face, verbose=0)[0]
        max_idx = int(np.argmax(preds))
        # NOTE(review): assumes the model's output order matches MASK_LABELS — confirm.
        return MASK_LABELS[max_idx], float(preds[max_idx])
    except Exception as e:
        print(f"Mask detection error: {str(e)}")
        return "unknown", 0.0

def is_carried(obj_bbox, person_bbox):
    """Heuristic test for "this person is carrying this object".

    Both arguments are ``(x1, y1, x2, y2)`` boxes. The object counts as carried
    when its box lies strictly inside the person's box and its vertical centre
    is below the line 60% of the way down the person's box.
    """
    person_left, person_top, person_right, person_bottom = person_bbox
    obj_left, obj_top, obj_right, obj_bottom = obj_bbox

    # Strict containment: touching the person's box edge does not count.
    fully_inside = (
        (obj_left > person_left)
        and (obj_right < person_right)
        and (obj_top > person_top)
        and (obj_bottom < person_bottom)
    )

    carry_line_y = person_top + (person_bottom - person_top) * 0.6
    below_carry_line = (obj_top + obj_bottom) / 2 > carry_line_y
    return fully_inside and below_carry_line

def calculate_speed(prev_position, current_position, fps, frame_interval=1):
    """Speed of a box centre in pixels per second.

    Args:
        prev_position: Earlier ``(x1, y1, x2, y2)`` box, or ``None``.
        current_position: Later ``(x1, y1, x2, y2)`` box, or ``None``.
        fps: Frames per second of the video.
        frame_interval: Number of frames separating the two boxes.

    Returns:
        ``0`` when either box is missing, otherwise the Euclidean distance
        between the two box centres scaled by ``fps / frame_interval``.
    """
    if prev_position is None or current_position is None:
        return 0

    def _center(box):
        # Midpoint of an (x1, y1, x2, y2) box.
        return (box[0] + box[2]) / 2, (box[1] + box[3]) / 2

    prev_cx, prev_cy = _center(prev_position)
    curr_cx, curr_cy = _center(current_position)

    pixels_moved = math.sqrt((curr_cx - prev_cx)**2 + (curr_cy - prev_cy)**2)
    return pixels_moved * fps / frame_interval

# ===== Main Processing =====
video_metadata = extract_video_metadata(SOURCE_VIDEO_PATH)
video_info = sv.VideoInfo.from_video_path(SOURCE_VIDEO_PATH)

# Get recording time from metadata or use current time as fallback.
# Besides a missing key (KeyError) or unparseable value (ValueError), the
# fallback must also cover extract_video_metadata() returning None (TypeError)
# and a "creation_time" entry that is present but None (AttributeError).
try:
    recording_time = datetime.strptime(video_metadata["creation_time"].split(".")[0], "%Y-%m-%dT%H:%M:%S")
    recording_time = recording_time.replace(tzinfo=timezone.utc)
except (KeyError, ValueError, TypeError, AttributeError):
    print("Warning: Using current time as recording time fallback")
    recording_time = datetime.now(timezone.utc)

# Per-frame pipeline: track -> split people / candidate objects -> per-person
# attributes (speed, age/gender, mask, carried items) -> restricted-zone and
# counting-zone checks -> annotate the frame and write it to the output video.

# Frame index of the previous sighting of each tracker id. A track can be
# missing for some frames and reappear, so speed must be divided by the real
# frame gap rather than an assumed gap of one frame.
last_seen_frame = {}
# COCO class ids that can be reported as carried items (bags, cat, dog).
carryable_classes = BAG_CLASSES + [CAT_CLASS, DOG_CLASS]

with sv.VideoSink(TARGET_VIDEO_PATH, video_info) as sink:
    for frame_number, result in enumerate(
        model.track(source=SOURCE_VIDEO_PATH, tracker="bytetrack.yaml", show=False, stream=True, persist=True)
    ):
        frame = result.orig_img
        detections = sv.Detections.from_yolov8(result)

        # Timestamp of this frame (recording start + elapsed video time);
        # shared by every record written for the frame.
        frame_timestamp = (
            recording_time + timedelta(seconds=frame_number / video_info.fps)
        ).strftime("%Y-%m-%d %H:%M:%S")

        # Separate detections; frames without tracker ids yield no people.
        people = []
        objects = []
        if result.boxes.id is not None:
            tracker_ids = result.boxes.id.cpu().numpy().astype(int)
            for i, (bbox, conf, class_id) in enumerate(zip(detections.xyxy, detections.confidence, detections.class_id)):
                if class_id == 0:  # Person
                    tracker_id = tracker_ids[i] if i < len(tracker_ids) else None
                    people.append((bbox, conf, tracker_id))
                elif class_id in carryable_classes:
                    objects.append((bbox, conf, class_id))

        # Process each person
        for person_bbox, confidence, tracker_id in people:
            x1, y1, x2, y2 = person_bbox
            # Bottom-right corner drives the counting zones; the box centre
            # drives the restricted-zone test.
            bottom_right = (int(x2), int(y2))
            center = (int((x1 + x2) / 2), int((y1 + y2) / 2))

            # Speed in px/s since the previous sighting of this tracker id.
            current_position = (x1, y1, x2, y2)
            prev_position = previous_positions.get(tracker_id, None)
            frames_elapsed = max(1, frame_number - last_seen_frame.get(tracker_id, frame_number - 1))
            speed = (
                calculate_speed(prev_position, current_position, video_info.fps, frame_interval=frames_elapsed)
                if frame_number > 0 else 0
            )
            previous_positions[tracker_id] = current_position
            last_seen_frame[tracker_id] = frame_number

            # Age/gender and mask classification both run on the full person crop.
            face_roi = frame[int(y1):int(y2), int(x1):int(x2)]
            gender, age = predict_age_gender(face_roi)
            mask_status, mask_confidence = detect_mask(face_roi)

            # Check for carried items
            carried_items = []
            for obj_bbox, _, class_id in objects:
                if is_carried(obj_bbox, person_bbox):
                    if class_id in BAG_CLASSES:
                        carried_items.append("bag")
                    elif class_id == CAT_CLASS:
                        carried_items.append("cat")
                    elif class_id == DOG_CLASS:
                        carried_items.append("dog")

            # First sighting: create the zone states and the JSON record.
            if tracker_id not in tracker_states:
                tracker_states[tracker_id] = []
                restricted_area_states[tracker_id] = False
                detection_data[tracker_id] = {
                    "tracker_id": int(tracker_id),
                    "gender": gender,
                    "age": age,
                    "carrying": carried_items if carried_items else "no objects",
                    "mask_status": mask_status,
                    "mask_confidence": mask_confidence,
                    "bbox_history": [],
                    "confidence": float(confidence),
                    "entered_restricted_area": False,
                    "restricted_area_entry_time": None,
                    "restricted_area_entry_frame": None
                }

            # Restricted area: count each outside -> inside transition. The flag
            # is cleared on leaving, so a later re-entry is counted again and
            # overwrites the stored entry time/frame.
            in_restricted_area = cv2.pointPolygonTest(restricted_area, center, False) >= 0
            if in_restricted_area and not restricted_area_states[tracker_id]:
                restricted_area_states[tracker_id] = True
                detection_data[tracker_id]["entered_restricted_area"] = True
                detection_data[tracker_id]["restricted_area_entry_time"] = frame_timestamp
                detection_data[tracker_id]["restricted_area_entry_frame"] = frame_number
                restricted_area_count += 1
            elif not in_restricted_area and restricted_area_states[tracker_id]:
                restricted_area_states[tracker_id] = False

            # Area crossing logic (points on the polygon edge count as inside).
            in_area1 = cv2.pointPolygonTest(area1, bottom_right, False) >= 0
            in_area2 = cv2.pointPolygonTest(area2, bottom_right, False) >= 0

            if in_area2 and "area2" not in tracker_states[tracker_id]:
                tracker_states[tracker_id].append("area2")
                # area2 touched first -> start of an "entering" crossing.
                if tracker_states[tracker_id] == ["area2"]:
                    detection_data[tracker_id]["entry_time"] = frame_timestamp
                    detection_data[tracker_id]["entry_frame"] = frame_number
                    detection_data[tracker_id]["entry_mask_status"] = mask_status
                    detection_data[tracker_id]["entry_mask_confidence"] = mask_confidence

            if in_area1 and "area1" not in tracker_states[tracker_id]:
                tracker_states[tracker_id].append("area1")
                # area1 touched first -> start of an "exiting" crossing.
                if tracker_states[tracker_id] == ["area1"]:
                    detection_data[tracker_id]["exit_time"] = frame_timestamp
                    detection_data[tracker_id]["exit_frame"] = frame_number
                    detection_data[tracker_id]["exit_mask_status"] = mask_status
                    detection_data[tracker_id]["exit_mask_confidence"] = mask_confidence

            # Update counts: a completed two-zone sequence is counted once and
            # the state is reset so the same person can be counted again later.
            if tracker_states[tracker_id] == ["area2", "area1"]:
                entering_count += 1
                tracker_states[tracker_id] = []
            elif tracker_states[tracker_id] == ["area1", "area2"]:
                exiting_count += 1
                tracker_states[tracker_id] = []

            total_count = entering_count + exiting_count

            # Store bounding box history
            detection_data[tracker_id]["bbox_history"].append({
                "frame_number": int(frame_number),
                "bbox": [float(x1), float(y1), float(x2), float(y2)],
                "timestamp": frame_timestamp,
                "carrying": carried_items if carried_items else "no objects",
                "speed": speed,
                "mask_status": mask_status,
                "mask_confidence": mask_confidence,
                "in_restricted_area": in_restricted_area
            })

            # Draw bounding box and label
            carrying_str = ", ".join(carried_items) if carried_items else "no objects"
            label = f"ID: {tracker_id} | {gender}, {age} | Mask: {mask_status} | Carrying: {carrying_str} | Speed: {speed:.2f} px/s"
            if in_restricted_area:
                label += " | IN RESTRICTED AREA"
                box_color = (0, 0, 255)  # Red for restricted area
            else:
                box_color = (255, 0, 0)  # Blue for normal

            cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), box_color, 3)
            cv2.putText(frame, label, (int(x1), int(y1) - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)

        # Draw counting areas and the restricted area with its caption
        cv2.polylines(frame, [area1], isClosed=True, color=(255, 0, 0), thickness=2)
        cv2.polylines(frame, [area2], isClosed=True, color=(0, 255, 0), thickness=2)
        cv2.polylines(frame, [restricted_area], isClosed=True, color=(0, 0, 255), thickness=2)
        cv2.putText(frame, "Restricted Area", (restricted_area[0][0], restricted_area[0][1] - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

        # Display counters
        cv2.putText(frame, f"Total: {total_count}", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
        cv2.putText(frame, f"Entering: {entering_count}", (50, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        cv2.putText(frame, f"Exiting: {exiting_count}", (50, 150), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
        cv2.putText(frame, f"Restricted Area Entries: {restricted_area_count}", (50, 200),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 165, 255), 2)

        # Write frame to output video
        sink.write_frame(frame)

# ===== Save Results =====
# Report layout: source-video metadata, UTC timestamp of this run, aggregate
# counters (including restricted-area entries), and one record per tracker id
# (ids cast to plain int).
json_output = dict(
    video_metadata=video_metadata,
    processing_time=datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z"),
    summary=dict(
        total_people=int(total_count),
        total_entering=int(entering_count),
        total_exiting=int(exiting_count),
        restricted_area_entries=int(restricted_area_count),
        fps=float(video_info.fps),
        duration_seconds=float(video_info.total_frames / video_info.fps),
    ),
    detections={int(tracker_id): data for tracker_id, data in detection_data.items()},
)

with open(JSON_OUTPUT_PATH, "w") as f:
    json.dump(json_output, f, indent=4)

# Three-line completion message: output locations, then the final counters.
print(
    f"Processing complete. Results saved to {TARGET_VIDEO_PATH} and {JSON_OUTPUT_PATH}",
    f"Total: {total_count}, Entering: {entering_count}, Exiting: {exiting_count}",
    f"Restricted Area Entries: {restricted_area_count}",
    sep="\n",
)

### Frame-Based Detection

In [None]:
import supervision as sv
from ultralytics import YOLO
import cv2
import numpy as np
import json
from datetime import datetime, timedelta
import os
from deepface import DeepFace
import ffmpeg
from datetime import timezone
import math
from tensorflow.keras.models import load_model
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array

# ===== Configuration =====
# Destinations for the annotated video and the JSON report.
TARGET_VIDEO_PATH = "output_video_final_trackingID_based.mp4"
JSON_OUTPUT_PATH = "detection_data_final_trackingID_based.json"

# Detection/tracking model used by this cell (swap the weights file if needed).
model = YOLO("yolov8n.pt")

# Counting polygons in pixel coordinates (int32).
# The y values already include the +50 px shift, e.g. 1678 + 50 = 1728.
area1 = np.array(
    [[1169, 1728], [1942, 2075], [1816, 2152], [1085, 1753]],
    dtype=np.int32,
)
area2 = np.array(
    [[1040, 1760], [1771, 2167], [1673, 2192], [981, 1763]],
    dtype=np.int32,
)

# Restricted zone: example square, adjust the corners for the actual scene.
restricted_area = np.array(
    [[500, 500], [800, 500], [800, 800], [500, 800]],
    dtype=np.int32,
)

# COCO class ids of the non-person objects of interest.
BAG_CLASSES = [24, 26, 28]  # backpack, handbag, suitcase
CAT_CLASS, DOG_CLASS = 15, 16

# Labels for mask detection.
MASK_LABELS = ["mask", "no_mask", "improper"]

# ===== Initialize =====
# Running counters, all starting from zero.
total_count = entering_count = exiting_count = restricted_area_count = 0
tracker_history = dict()   # per-tracker state carried across frames
restricted_people = set()  # tracker ids seen inside the restricted area

# NOTE(review): the helper functions called further down in this cell are not
# defined here; presumably an earlier cell provides them — confirm.

# ===== Main Processing =====
# NOTE(review): extract_video_metadata() and analyze_person() are not defined in
# this cell; presumably an earlier cell provides them — confirm before Run All.
video_metadata = extract_video_metadata(SOURCE_VIDEO_PATH)
video_info = sv.VideoInfo.from_video_path(SOURCE_VIDEO_PATH)

# Recording start time: parsed from the metadata's "creation_time" (everything
# after the first "." is dropped before parsing); otherwise the current UTC time.
try:
    recording_time = datetime.strptime(
        video_metadata["creation_time"].split(".")[0], "%Y-%m-%dT%H:%M:%S"
    ).replace(tzinfo=timezone.utc)
except (KeyError, ValueError, TypeError, AttributeError):
    # TypeError / AttributeError: the metadata is None or creation_time is not a
    # string. Those cases should use the fallback too instead of crashing.
    print("Warning: Using current time as recording time fallback")
    recording_time = datetime.now(timezone.utc)

frame_detections = []  # One dict per processed frame, in frame order

# Non-person classes collected per frame and handed to analyze_person().
# Built once here rather than for every detection.
object_class_ids = BAG_CLASSES + [CAT_CLASS, DOG_CLASS]

with sv.VideoSink(TARGET_VIDEO_PATH, video_info) as sink:
    for frame_number, result in enumerate(
        model.track(source=SOURCE_VIDEO_PATH, tracker="bytetrack.yaml", show=False, stream=True, persist=True)
    ):
        frame = result.orig_img
        detections = sv.Detections.from_yolov8(result)

        # Timestamp of this frame: recording start + frame index / fps.
        frame_timestamp = (
            recording_time + timedelta(seconds=frame_number / video_info.fps)
        ).strftime("%Y-%m-%d %H:%M:%S")

        current_frame_data = {
            "frame_number": frame_number,
            "timestamp": frame_timestamp,
            "detections": []
        }

        # Objects and people found in this frame
        objects = []
        people = []

        if result.boxes.id is not None:
            tracker_ids = result.boxes.id.cpu().numpy().astype(int)
            for i, (bbox, conf, class_id) in enumerate(zip(detections.xyxy, detections.confidence, detections.class_id)):
                if class_id == 0:  # Person
                    if i < len(tracker_ids):
                        # Plain int so history keys and set members are not numpy scalars.
                        people.append((bbox, conf, int(tracker_ids[i])))
                elif class_id in object_class_ids:  # Objects we care about
                    objects.append((bbox, conf, class_id))

        # Process each person in the current frame
        for bbox, conf, tracker_id in people:
            x1, y1, x2, y2 = bbox
            # The bottom-centre of the box is the point tested against the polygons.
            bottom_center = (int((x1+x2)/2), int(y2))

            # Check area membership (points on the polygon edge count as inside)
            in_area1 = cv2.pointPolygonTest(area1, bottom_center, False) >= 0
            in_area2 = cv2.pointPolygonTest(area2, bottom_center, False) >= 0
            in_restricted = cv2.pointPolygonTest(restricted_area, bottom_center, False) >= 0

            # Create the per-tracker state on first sight, otherwise refresh last_seen
            person = tracker_history.get(tracker_id)
            if person is None:
                person = tracker_history[tracker_id] = {
                    'first_seen': frame_number,
                    'last_seen': frame_number,
                    'entered_restricted': False,
                    'entry_frame': None,
                    'exit_frame': None,
                    'entry_time': None,
                    'exit_time': None,
                    'gender': "Unknown",
                    'age': "Unknown",
                    'carrying': "none",
                    'mask_status': "unknown",
                    'mask_confidence': 0.0
                }
            else:
                person['last_seen'] = frame_number

            # Count each tracker at most once for the restricted area
            if in_restricted and not person['entered_restricted']:
                person['entered_restricted'] = True
                restricted_area_count += 1
                restricted_people.add(tracker_id)

            # First time this tracker is inside either counting area:
            # analyse attributes once and record the entry.
            if (in_area1 or in_area2) and person['entry_frame'] is None:
                gender, age, mask_status, mask_conf, carrying = analyze_person(frame, bbox, objects)

                person.update({
                    'entry_frame': frame_number,
                    'entry_time': frame_timestamp,
                    'gender': gender,
                    'age': age,
                    'carrying': carrying,
                    'mask_status': mask_status,
                    'mask_confidence': mask_conf
                })
                entering_count += 1
                total_count += 1

            # Snapshot of this person as known at the current frame
            detection_entry = {
                "tracker_id": int(tracker_id),
                "class_id": 0,  # 0 is for person in COCO
                "class_name": "person",
                "confidence": float(conf),
                "bbox": [float(x1), float(y1), float(x2), float(y2)],
                "in_area1": in_area1,
                "in_area2": in_area2,
                "in_restricted_area": in_restricted,
                "gender": person['gender'],
                "age": person['age'],
                "carrying": person['carrying'],
                "mask_status": person['mask_status'],
                "mask_confidence": person['mask_confidence'],
                "entry_time": person['entry_time'],
                "exit_time": person['exit_time'],
                "first_seen_frame": person['first_seen'],
                "last_seen_frame": person['last_seen'],
                "entered_restricted": person['entered_restricted']
            }

            current_frame_data['detections'].append(detection_entry)

            # Draw a box and label only for people who have an entry recorded
            if person['entry_time']:
                label = f"ID: {tracker_id} | {person['gender']}, {person['age']} | Mask: {person['mask_status']}"
                if person['entered_restricted']:
                    cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0, 0, 255), 3)
                    label += " | RESTRICTED"
                else:
                    cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2)
                cv2.putText(frame, label, (int(x1), int(y1)-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 1)

        # Add frame data to our collection
        frame_detections.append(current_frame_data)

        # Write the (possibly annotated) frame to the output video
        sink.write_frame(frame)


# Update exit times for people who were tracked.
# Anyone with a recorded entry and no exit yet is closed out at the last frame
# they were seen in.
# NOTE(review): this also counts people still in view on the final frame as
# "exiting" — confirm that is the intended meaning of the counter.
for tracker_id, data in tracker_history.items():
    if data['entry_time'] and not data['exit_time']:
        exit_time = recording_time + timedelta(seconds=data['last_seen'] / video_info.fps)
        data['exit_time'] = exit_time.strftime("%Y-%m-%d %H:%M:%S")
        data['exit_frame'] = data['last_seen']
        exiting_count += 1

# The per-frame entries were created before any exit time was known, so their
# "exit_time" is still None. Copy the final value into every entry recorded from
# the person's entry onwards; otherwise the computed exit times never reach the JSON.
for frame_data in frame_detections:
    for detection in frame_data["detections"]:
        if detection["entry_time"]:
            person = tracker_history.get(detection["tracker_id"])
            if person is not None:
                detection["exit_time"] = person["exit_time"]

# ===== Save Results =====
json_output = {
    "video_metadata": video_metadata,
    "processing_time": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z"),
    "summary": {
        "total_people": int(total_count),
        "total_entering": int(entering_count),
        "total_exiting": int(exiting_count),
        "restricted_area_entries": int(restricted_area_count),
        # Renamed loop variable so the builtin id() is not shadowed.
        "restricted_people_ids": [int(person_id) for person_id in restricted_people],
        "fps": float(video_info.fps),
        "duration_seconds": float(video_info.total_frames / video_info.fps)
    },
    "frame_detections": frame_detections
}

with open(JSON_OUTPUT_PATH, "w") as f:
    json.dump(json_output, f, indent=4)

print(f"Processing complete. Results saved to {TARGET_VIDEO_PATH} and {JSON_OUTPUT_PATH}")
print(f"Total: {total_count}, Entering: {entering_count}, Exiting: {exiting_count}")
print(f"Restricted area entries: {restricted_area_count}")

#### Download Output Video

In [None]:
from IPython.display import FileLink

# Link to the video written by the most recently run processing cell
# (TARGET_VIDEO_PATH) rather than a hard-coded 'output_video.mp4', which the
# preceding cell does not produce.
FileLink(TARGET_VIDEO_PATH)