# AI-Powered Person Tracking with Virtual Fence Detection
1. Drawing up to 3 virtual boundary lines on an input video
2. Detecting and tracking persons with YOLOv8 + SFSORT
3. Counting and visualizing crossings of the boundary lines
4. Saving an annotated output video

#### 1. Constants and Paths

In [1]:
import os
import glob
from random import randrange

import numpy as np
from numpy import linalg as LA
import cv2
from tqdm import tqdm
from ultralytics import YOLO

from src.SFSORT import SFSORT

In [2]:
# Project root; every path below is resolved relative to it.
ROOT = r"./"

# Input videos: exactly one .avi file is expected inside DATA_DIR.
DATA_DIR    = os.path.join(ROOT, "data")
VIDEO_FILES = glob.glob(os.path.join(DATA_DIR, "*.avi"))

# YOLO weights
WEIGHTS_DIR = os.path.join(ROOT, "weights")
MODEL_PATH  = os.path.join(WEIGHTS_DIR, "best.pt")

# Output folder (created if missing) and path of the annotated video.
OUTPUT_DIR  = os.path.join(ROOT, "outputs")
os.makedirs(OUTPUT_DIR, exist_ok=True)
OUTPUT_VIDEO_PATH = os.path.join(OUTPUT_DIR, "tracked_output_MOT.mp4")
# Alias of OUTPUT_VIDEO_PATH, kept so both historical names stay defined.
OUTPUT_VIDEO = OUTPUT_VIDEO_PATH

# Explicit check instead of `assert`: assertions are removed under `python -O`,
# which would let a wrong number of input videos slip through unnoticed.
if len(VIDEO_FILES) != 1:
    raise ValueError(f"expected 1 video in {DATA_DIR}, found {VIDEO_FILES}")
print("Using video:", VIDEO_FILES[0])
print("Using model:", MODEL_PATH)

Using video: ./data\test.avi
Using model: ./weights\best.pt


#### 2. Helper Classes and Functions

In [3]:
class boundaryLine:
    """
    Virtual fence line with one crossing counter per direction.

    Attributes:
        id: Identifier supplied by the caller.
        p0, p1: Endpoints of the line, taken from ``line[0]`` and ``line[1]``.
        color, lineThinkness: Colour tuple and thickness used to draw the line.
        textColor, textSize, textThinkness: Style used to draw the counters.
        count1 (int): Number of crossings counted in one direction.
        count2 (int): Number of crossings counted in the opposite direction.
    """
    # The default is an immutable tuple so it can never be shared and mutated
    # across instances (a list default is created only once per function).
    def __init__(self, line_id, line=((0, 0), (0, 0))):
        self.id = line_id
        self.p0 = line[0]
        self.p1 = line[1]
        # Drawing style
        self.color = (0, 255, 255)
        self.lineThinkness = 4
        self.textColor = (0, 255, 255)
        self.textSize = 4
        self.textThinkness = 2
        # Crossing counts, both starting at zero
        self.count1 = 0
        self.count2 = 0

def line_vectorize(point1, point2):
    """Return the 2D displacement ``[dx, dy]`` leading from point1 to point2."""
    start_x, start_y = point1[0], point1[1]
    end_x, end_y = point2[0], point2[1]
    return [end_x - start_x, end_y - start_y]

def calcVectorAngle(point1, point2, point3, point4):
    """
    Angle in degrees between vector point1->point2 and vector point3->point4.

    The unsigned angle (0..180, from the clipped cosine) is returned unchanged
    when the 2D cross product of the two vectors is negative; otherwise the
    result is ``360 - angle``.
    """
    first = np.array(line_vectorize(point1, point2))
    second = np.array(line_vectorize(point3, point4))

    cosine = np.inner(first, second) / (LA.norm(first) * LA.norm(second))
    # Clip guards arccos against values slightly outside [-1, 1] due to rounding.
    unsigned = np.rad2deg(np.arccos(np.clip(cosine, -1.0, 1.0)))

    cross_z = first[0] * second[1] - first[1] * second[0]
    if cross_z < 0:
        return unsigned
    return 360 - unsigned

def checkIntersect(p1, p2, p3, p4):
    """
    Tell whether segment p1-p2 strictly crosses segment p3-p4.

    Each segment's endpoints must lie on strictly opposite sides of the other
    segment's supporting line; touching or collinear configurations give a
    zero product and therefore count as "no intersection".
    """
    def side(a, b, q):
        # Signed value whose sign tells on which side of line a-b point q lies.
        return (a[0] - b[0]) * (q[1] - a[1]) + (a[1] - b[1]) * (a[0] - q[0])

    p3_side = side(p1, p2, p3)
    p4_side = side(p1, p2, p4)
    p1_side = side(p3, p4, p1)
    p2_side = side(p3, p4, p2)
    return p3_side * p4_side < 0 and p1_side * p2_side < 0

def checkLineCross(boundary_line, trajectory, track_id):
    """
    Test one trajectory segment against one boundary line and update its counts.

    Args:
        boundary_line: Object exposing ``p0``, ``p1``, ``id``, ``count1`` and
            ``count2``.
        trajectory: Sequence ``[x_prev, y_prev, x_now, y_now]``.
        track_id: Identifier inserted into the returned message.

    Returns:
        A message string when the segment intersects the line, else None.
        ``count1`` is incremented for angles strictly between 15 and 165
        degrees, ``count2`` for angles strictly between 195 and 345 degrees;
        for any other angle the message is still returned but no counter
        changes.
    """
    move_start = (trajectory[0], trajectory[1])
    move_end = (trajectory[2], trajectory[3])
    fence_start = (boundary_line.p0[0], boundary_line.p0[1])
    fence_end = (boundary_line.p1[0], boundary_line.p1[1])

    # Guard clause: nothing to update when the segments do not cross.
    if not checkIntersect(move_start, move_end, fence_start, fence_end):
        return None

    crossing_angle = calcVectorAngle(move_start, move_end, fence_start, fence_end)
    if 15 < crossing_angle < 165:
        boundary_line.count1 += 1
    elif 195 < crossing_angle < 345:
        boundary_line.count2 += 1
    return f'ID {track_id} crossed line {boundary_line.id}!'

def drawBoundaryLine(img, line):
    """
    Draw a single boundary line on the frame.

    Renders the segment p0-p1, ``count1`` as text at p0 and ``count2`` as text
    at p1, then an upward-triangle marker at p0 and a tilted-cross marker at p1.
    """
    start_x, start_y = line.p0
    end_x, end_y = line.p1
    start = (start_x, start_y)
    end = (end_x, end_y)

    cv2.line(img, start, end, line.color, line.lineThinkness)

    # Counters first, markers second, matching the layering of the drawing calls.
    for count, anchor in ((line.count1, start), (line.count2, end)):
        cv2.putText(img, str(count), anchor, cv2.FONT_HERSHEY_PLAIN,
                    line.textSize, line.textColor, line.textThinkness)

    for anchor, marker in ((start, cv2.MARKER_TRIANGLE_UP), (end, cv2.MARKER_TILTED_CROSS)):
        cv2.drawMarker(img, anchor, line.color, marker, 16, 4)

def drawBoundaryLines(img, boundaryLines):
    """Draw every boundary line in ``boundaryLines`` onto ``img``, in order."""
    for fence in boundaryLines:
        drawBoundaryLine(img, fence)

#### 3. Load Video and Draw Boundaries

Interactive step: draw up to 3 boundary lines on the first frame of the video.


In [4]:
# Exactly one input video is expected; VIDEO_FILES is collected from "*.avi".
if len(VIDEO_FILES) != 1:
    print(VIDEO_FILES)
    raise ValueError("There should be exactly one .avi file in the folder.")
video_path = VIDEO_FILES[0]

# Grab the first frame; it is the canvas on which the lines are drawn.
cap = cv2.VideoCapture(video_path)
ret, frame = cap.read()
if not ret:
    # Do not leave the capture handle open when the video cannot be read.
    cap.release()
    raise RuntimeError("Failed to read first frame.")

# State shared with the mouse callback
boundary_points = []  # finished lines, each stored as a (start, end) tuple
drawing = False       # True while the left mouse button is held down
current_line = []     # points of the line currently being drawn

def draw_line(event, x, y, flags, param):
    """
    Mouse callback that lets the user draw boundary lines by click-and-drag.

    Left button down starts a line at (x, y); moving the mouse while the button
    is held shows a preview on a copy of the frame; left button up finishes the
    line, stores it in ``boundary_points`` and draws it onto ``frame``.

    Args:
        event: OpenCV mouse event code.
        x, y: Cursor position reported with the event.
        flags, param: Unused; present to match the callback signature.
    """
    global drawing, current_line, boundary_points
    if event == cv2.EVENT_LBUTTONDOWN:
        drawing = True
        current_line = [(x, y)]
    elif event == cv2.EVENT_MOUSEMOVE and drawing:
        # Preview on a copy so the rubber-band line never sticks to the frame.
        temp_img = frame.copy()
        cv2.line(temp_img, current_line[0], (x, y), (0, 255, 255), 2)
        cv2.imshow("Draw Lines (up to 3)", temp_img)
    elif event == cv2.EVENT_LBUTTONUP and drawing:
        # Only finish a line that was actually started: a stray button-up
        # (e.g. press outside the window, release inside) would otherwise
        # index a missing start point or re-append a stale line.
        drawing = False
        current_line.append((x, y))
        boundary_points.append(tuple(current_line))
        cv2.line(frame, current_line[0], current_line[1], (0, 255, 255), 2)
        cv2.imshow("Draw Lines (up to 3)", frame)

# Open the drawing window and route its mouse events to draw_line.
window_title = "Draw Lines (up to 3)"
cv2.namedWindow(window_title)
cv2.setMouseCallback(window_title, draw_line)

print("Draw up to 3 boundary lines. Press 'q' to confirm.")
# Keep refreshing the window until 'q' is pressed or three lines exist.
while True:
    cv2.imshow(window_title, frame)
    pressed_key = cv2.waitKey(1) & 0xFF
    if pressed_key == ord('q') or len(boundary_points) >= 3:
        break
cv2.destroyWindow(window_title)

# Wrap every drawn segment in a boundaryLine, numbering them from 1.
bLines = [boundaryLine(line_no, list(pts))
          for line_no, pts in enumerate(boundary_points, start=1)]
# Rewind so processing starts again from the first frame.
cap.set(cv2.CAP_PROP_POS_FRAMES, 0)

Draw up to 3 boundary lines. Press 'q' to confirm.


True

#### 4. Main Tracking Loop


In [5]:
model = YOLO(MODEL_PATH, 'detect')

# Video properties read from the already opened capture
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# Keep the exact (possibly fractional, e.g. 29.97) rate for the writer so the
# output plays at the source speed; the truncated value is used for timeouts.
source_fps = cap.get(cv2.CAP_PROP_FPS)
frame_rate = int(source_fps)
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

# Tracker parameters (adjust as needed)
tracker_arguments = {
    "dynamic_tuning": True,
    "high_th": 0.5,
    "low_th": 0.2,
    "match_th_first": 0.4,
    "match_th_first_m": 0.05,
    "match_th_second": 0.4,
    "marginal_timeout": int(1.1*frame_rate),    # frame count of ~1.1 s of video
    "central_timeout": int(1.4*frame_rate),     # frame count of ~1.4 s of video
    "horizontal_margin": int(0.1*frame_width),  # 10% of the frame width
    "vertical_margin": int(0.1*frame_height),   # 10% of the frame height
    "frame_width": frame_width,
    "frame_height": frame_height
}


tracker = SFSORT(tracker_arguments)

# Setup video writer for output
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(OUTPUT_VIDEO_PATH, fourcc, source_fps, (frame_width, frame_height))
if not out.isOpened():
    # Fail before the long tracking loop instead of silently writing nothing.
    raise RuntimeError(f"Could not open video writer for {OUTPUT_VIDEO_PATH}")

# Per-track state, keyed by track id
colors = {}     # drawing colour assigned to each track
intruders = {}  # centre-point trail of tracks that crossed a line
tracks = {}     # last known bounding box of each track


# Detect, track and annotate frame by frame. The try/finally guarantees that
# the capture and the writer are released even if an error or a keyboard
# interrupt stops the loop half-way.
try:
    for frame_no in tqdm(range(frame_count)):
        ret, frame = cap.read()
        if not ret:
            print(f"End of video reached or error reading frame {frame_no}.")
            break
        prediction = model.predict(frame, imgsz=(800,1440),
                                   conf=0.1, iou=0.45, half=False,
                                   max_det=60, classes=0, verbose=False)

        # Keep only the boxes and their confidences, as NumPy arrays
        predictionResults = prediction[0].boxes.cpu().numpy()
        box_list = predictionResults.xyxy
        score_list = predictionResults.conf

        if box_list.shape[0] == 0:
            # NOTE(review): the tracker is not updated on frames without
            # detections — confirm this is the intended behaviour.
            print(f'no detection found in {frame_no+1}')
            drawBoundaryLines(frame, bLines)
            out.write(frame)
            continue

        # Update tracker with detections
        track_list = tracker.update(box_list, score_list)

        # Annotate frame with tracking results
        for track in track_list:
            bbox, track_id = track[0], int(track[1])
            # On first sight the previous box is the current one, so the
            # trajectory has zero length and cannot cross any line.
            prev_box = tracks.setdefault(track_id, bbox)

            x0, y0, x1, y1 = map(int, bbox)
            cx, cy = (x0+x1)/2, (y0+y1)/2

            prev_x0, prev_y0, prev_x1, prev_y1 = map(int, prev_box)
            prev_cx, prev_cy = (prev_x0+prev_x1)/2, (prev_y0+prev_y1)/2

            # One entry per boundary line: a message string or None
            crossed = [checkLineCross(line, [prev_cx, prev_cy, cx, cy], track_id)
                       for line in bLines]

            if track_id not in colors:
                colors[track_id] = (randrange(255), randrange(255), randrange(255))

            color = colors[track_id]
            track_thickness = 1

            messages = [m for m in crossed if m is not None]
            if messages:
                if track_id not in intruders:
                    intruders[track_id] = [[cx, cy]]
                # Only the first crossing message of this track is reported
                cv2.putText(frame, str(messages[0]), (20,50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,0,255), 2)
                print(frame_no+1)
                print(str(messages[0]))

            if track_id in intruders:
                # Tracks that crossed a line are drawn in red with their trail
                color = (0,0,255)
                track_thickness = 2
                intruders[track_id].append([cx, cy])
                cv2.polylines(frame, np.array([intruders[track_id]], np.int32), False, (255,255,255), 4)

            frame = cv2.rectangle(frame, (x0, y0), (x1, y1), color, track_thickness)
            cv2.putText(frame, str(track_id), (x0, y0-5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, track_thickness)
            tracks[track_id] = bbox

        drawBoundaryLines(frame, bLines)
        out.write(frame)
finally:
    # Release resources for this video
    cap.release()
    out.release()
print(f"Tracked video saved at: {OUTPUT_VIDEO_PATH}")


 20%|█▉        | 206/1050 [00:47<02:59,  4.71it/s]

206
ID 37 crossed line 1!


 24%|██▍       | 251/1050 [00:57<02:53,  4.62it/s]

251
ID 39 crossed line 1!


 27%|██▋       | 284/1050 [01:04<02:51,  4.47it/s]

284
ID 15 crossed line 2!


 27%|██▋       | 288/1050 [01:05<02:52,  4.43it/s]

288
ID 19 crossed line 2!


 32%|███▏      | 334/1050 [01:15<02:39,  4.50it/s]

334
ID 38 crossed line 2!


 39%|███▊      | 405/1050 [01:32<02:24,  4.46it/s]

405
ID 43 crossed line 3!


 44%|████▍     | 463/1050 [01:44<02:08,  4.56it/s]

463
ID 50 crossed line 3!


 67%|██████▋   | 703/1050 [02:39<01:16,  4.52it/s]

703
ID 48 crossed line 3!


 68%|██████▊   | 719/1050 [02:43<01:19,  4.15it/s]

719
ID 60 crossed line 3!


 76%|███████▌  | 797/1050 [03:01<00:56,  4.46it/s]

797
ID 40 crossed line 1!


 89%|████████▉ | 934/1050 [03:32<00:25,  4.46it/s]

934
ID 68 crossed line 2!


 89%|████████▉ | 937/1050 [03:33<00:26,  4.32it/s]

937
ID 76 crossed line 3!


 90%|████████▉ | 940/1050 [03:33<00:25,  4.28it/s]

940
ID 69 crossed line 2!


 92%|█████████▏| 961/1050 [03:38<00:21,  4.22it/s]

961
ID 56 crossed line 3!


 92%|█████████▏| 971/1050 [03:41<00:18,  4.29it/s]

971
ID 72 crossed line 3!


100%|██████████| 1050/1050 [03:59<00:00,  4.39it/s]

Tracked video saved at: ./outputs\tracked_output_MOT.mp4



