<a href="https://colab.research.google.com/github/dhanushp08/CCTV-Crowd-Flow-Analyzer/blob/main/crowd_flow.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive/ so that
# files stored under "MyDrive" can be read and written by path.
from google.colab import drive
drive.mount('/content/drive/')

In [None]:
# Install the third-party packages into the notebook runtime
# (the leading "!" runs each line as a shell command).
# NOTE(review): keras and deep_sort_realtime are not imported by the cells
# visible here -- confirm they are still required before keeping them.
!pip install -q keras
!pip install cvzone
!pip install deep_sort_realtime
!pip install ultralytics



In [None]:
import cv2
import math
from ultralytics import YOLO
import cvzone
from collections import defaultdict
# --- Input video and detector ------------------------------------------------
video_path = r"/content/drive/MyDrive/final/raw.mp4"
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    # Without this check a missing/unreadable video silently yields a 0x0
    # writer and an empty output file.
    raise FileNotFoundError(f"Could not open input video: {video_path}")

model = YOLO('yolov8n.pt')

# Class labels indexed by the integer class id reported for each detection.
# NOTE(review): this is the 80-entry COCO label list; it must stay in the same
# order as the class ids produced by the loaded weights.
classNames = [
    "person", "bicycle", "car", "motorcycle", "airplane", "bus",
    "train", "truck", "boat", "traffic light", "fire hydrant",
    "stop sign", "parking meter", "bench", "bird", "cat", "dog",
    "horse", "sheep", "cow", "elephant", "bear", "zebra",
    "giraffe", "backpack", "umbrella", "handbag", "tie",
    "suitcase", "frisbee", "skis", "snowboard", "sports ball",
    "kite", "baseball bat", "baseball glove", "skateboard",
    "surfboard", "tennis racket", "bottle", "wine glass", "cup",
    "fork", "knife", "spoon", "bowl", "banana", "apple",
    "sandwich", "orange", "broccoli", "carrot", "hot dog", "pizza",
    "donut", "cake", "chair", "couch", "potted plant", "bed",
    "dining table", "toilet", "TV", "laptop", "mouse", "remote",
    "keyboard", "cell phone", "microwave", "oven", "toaster",
    "sink", "refrigerator", "book", "clock", "vase", "scissors",
    "teddy bear", "hair drier", "toothbrush"
]

# --- Output video -------------------------------------------------------------
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Keep the frame rate as a float: truncating e.g. 29.97 to 29 makes the
# written video play at a different speed than the source.
fps = cap.get(cv2.CAP_PROP_FPS)
if not fps or fps != fps or fps < 0:
    # Some containers report 0 (or NaN) for FPS; fall back to a usable value.
    # NOTE(review): 30.0 is an arbitrary default -- adjust if the source differs.
    fps = 30.0

output_path = r'/content/drive/MyDrive/final/output-vid.mp4'
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))
if not out.isOpened():
    cap.release()
    raise RuntimeError(f"Could not open output video for writing: {output_path}")

# --- Region-of-interest mask --------------------------------------------------
# The mask is AND-ed with every frame before detection, so it must exist and
# have exactly the frame's dimensions.
mask_path = "/content/drive/MyDrive/newmask.png"
mask = cv2.imread(mask_path)
if mask is None:
    # cv2.imread returns None instead of raising when the file cannot be read.
    cap.release()
    out.release()
    raise FileNotFoundError(f"Could not read mask image: {mask_path}")
if mask.shape[:2] != (frame_height, frame_width):
    # Nearest-neighbour keeps the mask's hard edges instead of blurring them.
    mask = cv2.resize(mask, (frame_width, frame_height),
                      interpolation=cv2.INTER_NEAREST)

# --- Counting line (pixel coordinates in the frame) ---------------------------
slant_line_start = (710, 650)
slant_line_end = (1350, 280)

# --- Tracking / counting state -------------------------------------------------
# tracker maps a person id to a dict holding that person's last 'centroid',
# the 'side' of the counting line they were last seen on, and the
# 'counted_in' / 'counted_out' flags.
tracker = defaultdict(dict)
next_id = 0
people_in = 0
people_out = 0

# Maximum centroid distance (in pixels) for a detection to be treated as the
# same person as an existing track.
distance_threshold = 60

def is_left_of_line(point, line_start, line_end):
    """Report which side of a directed line a point lies on.

    The sign of the 2-D cross product between the line direction
    (``line_end - line_start``) and the vector from ``line_start`` to
    ``point`` decides the result.

    Args:
        point: ``(x, y)`` coordinates of the point to classify.
        line_start: ``(x, y)`` coordinates of the first line endpoint.
        line_end: ``(x, y)`` coordinates of the second line endpoint.

    Returns:
        True when the cross product is strictly positive; False when it is
        negative or zero (a point exactly on the line yields False).
    """
    px, py = point
    ax, ay = line_start
    bx, by = line_end
    cross = (bx - ax) * (py - ay) - (by - ay) * (px - ax)
    return cross > 0


# Detections at or below this confidence are ignored.
conf_threshold = 0.4


def _nearest_free_track(centroid, tracks, claimed, max_distance):
    """Return the id of the closest unclaimed track, or None.

    Args:
        centroid: ``(x, y)`` centre of the current detection.
        tracks: Mapping of track id to a dict containing a ``'centroid'`` entry.
        claimed: Ids already assigned to another detection in this frame;
            these are skipped so two detections never share one id.
        max_distance: Only tracks strictly closer than this are candidates.

    Returns:
        The id of the nearest eligible track, or None when there is none.
    """
    cx, cy = centroid
    best_id, best_distance = None, max_distance
    for track_id, info in tracks.items():
        if track_id in claimed:
            continue
        px, py = info['centroid']
        distance = math.hypot(cx - px, cy - py)
        if distance < best_distance:
            best_id, best_distance = track_id, distance
    return best_id


try:
    while True:
        success, img = cap.read()
        if not success:
            # End of the video (or a read failure): stop processing.
            break

        # Run detection only on the masked region, but draw on the full frame.
        imgRegion = cv2.bitwise_and(img, mask)
        results = model(imgRegion, stream=True)

        # Ids seen in this frame; a set gives O(1) membership tests below.
        current_ids = set()

        for r in results:
            for box in r.boxes:
                cls = int(box.cls[0])
                conf = float(box.conf[0])
                if classNames[cls] != "person" or conf <= conf_threshold:
                    continue

                x1, y1, x2, y2 = (int(v) for v in box.xyxy[0])
                w, h = x2 - x1, y2 - y1

                cvzone.cornerRect(img, (x1, y1, w, h), l=9)
                cvzone.putTextRect(img, f'{conf:.2f}', (max(0, x1), max(35, y1)),
                                   scale=0.8, thickness=1, offset=3)

                cx, cy = x1 + w // 2, y1 + h // 2

                # Match to the nearest existing track that has not already
                # been taken by another detection in this frame; otherwise
                # start a new track.
                matched_id = _nearest_free_track(
                    (cx, cy), tracker, current_ids, distance_threshold)
                if matched_id is None:
                    matched_id = next_id
                    next_id += 1

                current_ids.add(matched_id)
                track = tracker[matched_id]
                track['centroid'] = (cx, cy)
                current_side = is_left_of_line(
                    (cx, cy), slant_line_start, slant_line_end)

                if 'side' not in track:
                    # First sighting: remember the starting side, count nothing.
                    track['side'] = current_side
                    track['counted_in'] = False
                    track['counted_out'] = False

                previous_side = track['side']

                if previous_side != current_side:
                    # The centroid crossed the line since the last frame.
                    if previous_side and not track['counted_in']:
                        people_in += 1
                        track['counted_in'] = True
                        track['counted_out'] = False
                        print(f"Person {matched_id} entered. Total In: {people_in}")
                    elif not previous_side and not track['counted_out']:
                        people_out += 1
                        track['counted_out'] = True
                        track['counted_in'] = False
                        print(f"Person {matched_id} exited. Total Out: {people_out}")
                    track['side'] = current_side

        # Drop every track that was not matched in this frame.
        for person_id in [pid for pid in tracker if pid not in current_ids]:
            del tracker[person_id]

        cv2.line(img, slant_line_start, slant_line_end, (0, 255, 0), 2)

        # Overlay labels now agree with the counters and the log messages
        # (they were previously swapped).
        cvzone.putTextRect(img, f'IN: {people_in}', (50, 50), scale=1, thickness=2, offset=10)
        cvzone.putTextRect(img, f'OUT: {people_out}', (200, 50), scale=1, thickness=2, offset=10)
        out.write(img)
finally:
    # Always release the capture and finalise the output file, even when a
    # frame fails to process.
    cap.release()
    out.release()
    cv2.destroyAllWindows()

print(f"Video saved at {output_path}")
