# **Phase 1: Setting up and Testing Environment**

The goal in this stage is to download all the necessary libraries and pre-trained models, ready for testing.

In [None]:
!pip install ultralytics opencv-python

In [None]:
import cv2
from ultralytics import YOLO
import torch # just to be safe, idk if it comes with ultralytics
import os

# Report the installed library versions so environment problems surface early.
for label, version in (
    ("PyTorch Version:", torch.__version__),
    ("OpenCV version:", cv2.__version__),
):
    print(label, version)

In [None]:
# Build the detector from the YOLOv8 "nano" checkpoint (the 'n' in the file name):
# the lightweight, fast variant, which is all this notebook needs.
# NOTE(review): presumably ultralytics fetches yolov8n.pt on first use when it is
# not already on disk - confirm before running in an offline environment.
model = YOLO("yolov8n.pt")

# **Phase 2: Counting People**

This phase is dedicated to detecting changes in the number of people in a video and the time at which a change occurs.

In [None]:
# Input clip, set ONCE per session; any .mp4 of your choice can go here.
video_path = "INSERTFILENAMEHERE.mp4"

# Detection settings: only class 0 (person) with a 0.5 confidence setting.
# stream=True enables frame-by-frame processing of the results.
# NOTE(review): with stream=True the result is presumably a one-shot generator,
# so re-running the counting cell means re-running this cell first - confirm.
predict_options = {
    "source": video_path,
    "conf": 0.5,
    "classes": [0],
    "stream": True,
}
results = model.predict(**predict_options)


In [None]:
# One record per frame: the frame index and the number of detections in it.
# A frame whose result has no boxes object is recorded as zero people.
people_counts = [
    {
        "frame": frame_number,
        "people": 0 if frame_result.boxes is None else len(frame_result.boxes),
    }
    for frame_number, frame_result in enumerate(results)
]

In [None]:
# Read the clip's frame rate so frame indices can be converted to timestamps.
cap = cv2.VideoCapture(video_path)
try:
    if not cap.isOpened():
        raise IOError(f"Could not open video: {video_path}")
    fps = cap.get(cv2.CAP_PROP_FPS)
finally:
    cap.release()  # always free the capture handle, even when opening failed

# A missing or unsupported FPS property would otherwise surface below as an
# opaque ZeroDivisionError (or as NaN timestamps); fail with a clear message.
# `not fps > 0` also rejects NaN, which `fps <= 0` would let through.
if not fps > 0:
    raise ValueError(f"Video reports an unusable frame rate ({fps!r}): {video_path}")

for entry in people_counts: # NOTE: can add [:5] after people_counts to JUST check that the logic works (doing it for all frames may lag/take too long for testing)
    entry["time_sec"] = entry["frame"] / fps # convert frames to time

people_counts[:5] # output only first 5 entries

In [None]:
import pandas as pd

# Tabulate the per-frame records (one row per entry of people_counts, one column
# per key: frame, people, and time_sec once the time-conversion cell has run).
df = pd.DataFrame(people_counts)
# Persist the table without the index column so later phases can reload it.
df.to_csv("people_counts.csv", index=False)
df.head() # sanity check: show the first few rows

# **Phase 3: Anomaly Logic**

This phase is dedicated to computing a robust baseline (the median people count and its median absolute deviation) and determining the threshold above which the number of people in frame counts as excessive.

In [None]:
# Reload the per-frame counts from the CSV written in Phase 2.
# NOTE(review): presumably this lets Phase 3 run without repeating detection,
# provided people_counts.csv already exists - confirm the intended workflow.
df = pd.read_csv("people_counts.csv")
df.head() # sanity check: show the first few rows

In [None]:
# Robust baseline: the median people count and the median absolute deviation
# (MAD) around it - both less sensitive to outliers than mean / std.
people_per_frame = df["people"]
baseline_median = people_per_frame.median()
baseline_mad = people_per_frame.sub(baseline_median).abs().median()

In [None]:
# unusual = median + K * MAD (K = 3 is where counts get very rare)
# NOTE(review): when baseline_mad is 0 the threshold equals the median, so every
# frame above the median is flagged - confirm that is acceptable.
K = 3
crowd_threshold = baseline_median + K * baseline_mad
df["is_crowd_anomaly"] = df["people"].gt(crowd_threshold)
df[["time_sec", "people", "is_crowd_anomaly"]].head(10)  # sanity check

In [None]:
# Keep only the frames flagged as crowd anomalies.
anomaly_mask = df["is_crowd_anomaly"]
anomalies = df[anomaly_mask]
anomalies.head()  # sanity check: frames with an unusually high number of people

# **Phase 4: Temporal Logic**

Tracking the sustained presence of crowds to distinguish short-term spikes from loitering; the latter is likelier to lead to crime. A timeframe of well under a minute sounds reasonable, especially for cases where CCTV only covers a small area (the thresholds below use 10 seconds for crowds and 30 seconds for loitering). Moreover, even if it seems short, an alert message only prompts further investigation, not an immediate police response. Also, due to the nature of the input, we expect videos to be quite long.

In [None]:
# Minimum length (seconds) a "crowd" event must last before it is reported.
MIN_DURATION_SEC = 10 # 10s reasonable for sustained large crowds in small area
# Minimum people count for a frame to be treated as a loitering candidate.
LOITERING_MIN_PEOPLE = 2 # small groups loitering can still be suspicious, especially at night
# Minimum length (seconds) a "loitering" event must last before it is reported.
LOITERING_DURATION = 30 # seconds for loitering when group sizes are not anomalous

In [None]:
# A frame is a loitering candidate when at least LOITERING_MIN_PEOPLE are in it.
df["is_loitering_candidate"] = df["people"].ge(LOITERING_MIN_PEOPLE)

# A frame is suspicious if it is a crowd anomaly OR a loitering candidate.
crowd_flags = df["is_crowd_anomaly"]
loitering_flags = df["is_loitering_candidate"]
df["is_suspicious"] = crowd_flags | loitering_flags

In [None]:
def _finalize_event(event):
    """Return ``event`` with ``duration`` filled in if it lasted long enough, else ``None``.

    A "crowd" event must span at least MIN_DURATION_SEC seconds and a
    "loitering" event at least LOITERING_DURATION seconds; shorter runs are
    treated as short-term spikes and dropped.
    """
    duration = event["end_time"] - event["start_time"]
    required = MIN_DURATION_SEC if event["type"] == "crowd" else LOITERING_DURATION
    if duration < required:
        return None
    event["duration"] = duration
    return event


# Group consecutive suspicious frames into events and keep the sustained ones.
events = []
current_event = None

for _, row in df.iterrows():
    if row["is_suspicious"]:
        if current_event is None:
            # Open a new event. Its type is decided by this first frame only and
            # is not revised if later frames in the same run differ.
            current_event = {
                "start_time": row["time_sec"],
                "end_time": row["time_sec"],
                "max_people": row["people"],
                "type": "crowd" if row["is_crowd_anomaly"] else "loitering"
            }
        else:
            # Extend the open event and track its peak people count.
            current_event["end_time"] = row["time_sec"]
            current_event["max_people"] = max(
                current_event["max_people"], row["people"]
            )
    elif current_event is not None:
        # First non-suspicious frame after a run: close the event out.
        finished = _finalize_event(current_event)
        if finished is not None:
            events.append(finished)
        current_event = None

# Handle case where video ends during an event. Going through the same helper
# as the in-loop path keeps both paths on the same thresholds (this tail
# previously referenced an undefined LOITERING_MIN_DURATION name).
if current_event is not None:
    finished = _finalize_event(current_event)
    if finished is not None:
        events.append(finished)

events # sanity check

# **Phase 5: Alert System**

This phase is dedicated to outputting an accurate and sufficiently detailed alert message containing the timestamp, observed number, baseline average, duration in-frame, and a frame with bounding boxes around suspicious people. It will also save the alert to a CSV file for further reference.

In [None]:
def format_alert(event, location_name="Camera 1"):
    """Render one detected event as a multi-line alert message.

    ``event`` must provide the keys ``type``, ``start_time``, ``duration`` and
    ``max_people``; ``location_name`` labels the camera in the message. Times
    are shown with one decimal place, and every line of the returned string,
    including the last, ends with a newline.
    """
    lines = [
        f"ALERT: {event['type'].upper()} DETECTED\n",
        f"Location: {location_name}\n",
        f"Start Time: {event['start_time']:.1f}s\n",
        f"Duration: {event['duration']:.1f}s\n",
        f"Max People Detected: {event['max_people']}\n",
    ]
    return "".join(lines)


In [None]:
# Print an alert message for every incident, each followed by a divider line
# (with live footage the messages would appear in real time instead).
divider = "-" * 40
for event in events:
    print(format_alert(event))
    print(divider)

In [None]:
# Log every reported event to CSV for later reference.
# The columns are pinned (in the same order the event dicts are built) so that a
# run with no events still writes a header row instead of a column-less file.
alerts_df = pd.DataFrame(
    events,
    columns=["start_time", "end_time", "max_people", "type", "duration"],
)
alerts_df.to_csv("alerts_log.csv", index=False)

alerts_df

In [None]:
def save_evidence_frame(event, model, video_path, fps, output_dir="evidence", conf=0.4):
    """Save an annotated frame from the middle of ``event`` as a JPEG.

    Args:
        event: Event dict providing ``start_time``, ``end_time`` (seconds) and
            ``type``.
        model: Detector whose ``predict`` result supports ``[0].plot()``.
        video_path: Path of the video the event was detected in.
        fps: Frame rate used to convert the event midpoint to a frame index.
        output_dir: Directory for the image; created if missing.
        conf: Confidence setting for the annotation pass. The default stays at
            0.4, which is lower than the 0.5 used for counting, so the saved
            image may show more boxes than were counted.

    Returns:
        The path of the written image, or ``None`` if the video could not be
        opened, the frame could not be read, or the image could not be written
        (a warning is printed in each case).
    """
    os.makedirs(output_dir, exist_ok=True)

    # Open video fresh; release it in `finally` so the handle is never leaked.
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print(f"[WARNING] Could not open video {video_path}")
            return None

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Compute midpoint frame between start and end of anomalous event
        mid_time = (event["start_time"] + event["end_time"]) / 2
        frame_idx = int(mid_time * fps)

        # Clamp frame index into the valid range
        frame_idx = max(0, min(frame_idx, total_frames - 1))

        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
        ret, frame = cap.read()
    finally:
        cap.release()

    if not ret:
        print(f"[WARNING] Could not read frame {frame_idx}")
        return None

    # Re-run detection on the single frame to draw boxes around people (class 0).
    results = model.predict(frame, conf=conf, classes=[0])
    annotated = results[0].plot()

    filename = os.path.join(output_dir, f"{event['type']}_{frame_idx}.jpg")
    # imwrite reports failure through its return value rather than raising.
    if not cv2.imwrite(filename, annotated):
        print(f"[WARNING] Could not write {filename}")
        return None

    return filename

In [None]:
# Save one annotated evidence image per reported event.
# Relies on `model`, `video_path` and `fps` defined in earlier cells.
for event in events:
    save_evidence_frame(event, model, video_path, fps)

In [None]:
!ls evidence # sanity check