<a href="https://www.kaggle.com/code/anushreeu04/notebook5bd861a1b4?scriptVersionId=297005937" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

In [1]:
pip install ultralytics supervision opencv-python pandas


Collecting ultralytics
  Downloading ultralytics-8.4.14-py3-none-any.whl.metadata (39 kB)
Collecting supervision
  Downloading supervision-0.27.0.post1-py3-none-any.whl.metadata (13 kB)
Collecting ultralytics-thop>=2.0.18 (from ultralytics)
  Downloading ultralytics_thop-2.0.18-py3-none-any.whl.metadata (14 kB)
Collecting pyDeprecate>=0.4.0 (from supervision)
  Downloading pydeprecate-0.4.0-py3-none-any.whl.metadata (21 kB)
Downloading ultralytics-8.4.14-py3-none-any.whl (1.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m19.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading supervision-0.27.0.post1-py3-none-any.whl (217 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m217.4/217.4 kB[0m [31m11.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pydeprecate-0.4.0-py3-none-any.whl (21 kB)
Downloading ultralytics_thop-2.0.18-py3-none-any.whl (28 kB)
Installing collected packages: pyDeprecate, supervision, ultralytics-tho

In [2]:
import cv2
import pandas as pd
import numpy as np
from ultralytics import YOLO
import supervision as sv
import warnings

# Silence all Python warnings for the rest of the session so the cell output
# stays readable.
# NOTE(review): this also hides deprecation warnings raised by the libraries.
warnings.filterwarnings('ignore')

# ==============================
# INPUT / OUTPUT PATHS
# ==============================
VIDEO_PATH = "/kaggle/input/cctv-video3/video1.mp4"
OUTPUT_CSV = "/kaggle/working/tracking_with_snatching_label.csv"

# Detector class index that is kept for tracking (the "person" class).
PERSON_CLASS_ID = 0

# ==============================
# LOAD MODELS
# ==============================
model = YOLO("yolov8n.pt")
tracker = sv.ByteTrack()

# ==============================
# OPEN VIDEO
# ==============================
cap = cv2.VideoCapture(VIDEO_PATH)
if not cap.isOpened():
    raise RuntimeError("❌ Cannot open video")

frame_id = 0   # 1-based index of the most recently decoded frame
log_data = []  # one row per tracked box: [frame, track_id, x1, y1, x2, y2]

# ==============================
# STEP 1: TRACKING DATA LOGGER
# ==============================
# The capture is released in `finally` so the handle is not leaked into the
# kernel if detection or tracking raises part-way through the video.
try:
    while True:
        ret, frame = cap.read()
        if not ret:
            # No more frames could be read (end of stream or read failure).
            break

        frame_id += 1

        # verbose=False suppresses the per-frame YOLO log line.
        results = model(frame, verbose=False)[0]
        detections = sv.Detections.from_ultralytics(results)

        # Keep only PERSON class
        detections = detections[detections.class_id == PERSON_CLASS_ID]

        # Apply tracking
        detections = tracker.update_with_detections(detections)

        for bbox, track_id in zip(detections.xyxy, detections.tracker_id):
            if track_id is None:
                continue

            # Box corners are truncated to whole pixels before logging.
            x1, y1, x2, y2 = map(int, bbox)
            log_data.append([frame_id, int(track_id), x1, y1, x2, y2])
finally:
    cap.release()

# ==============================
# CREATE DATAFRAME
# ==============================
df = pd.DataFrame(
    log_data,
    columns=["frame", "track_id", "x1", "y1", "x2", "y2"]
)

# ==============================
# STEP 2: POSITION (CENTER)
# ==============================
df["cx"] = (df["x1"] + df["x2"]) / 2
df["cy"] = (df["y1"] + df["y2"]) / 2

# ==============================
# STEP 3: SPEED CALCULATION
# ==============================
# Speed is the centre displacement in pixels PER FRAME. The displacement
# between two consecutive observations of a track is divided by the number of
# frames separating them, so a track that skips some frames does not show an
# artificial speed spike when it reappears. The first observation of every
# track has no predecessor and gets speed 0.
df["speed"] = 0.0

if not df.empty:
    _by_track = df.sort_values("frame", kind="stable").groupby("track_id", sort=False)
    _dx = _by_track["cx"].diff()
    _dy = _by_track["cy"].diff()
    _frame_gap = _by_track["frame"].diff()
    _displacement = np.sqrt(_dx**2 + _dy**2)
    # Non-positive gaps are masked to NaN (-> speed 0) instead of dividing by 0.
    # The assignment aligns on the row index, so the row order of df is kept.
    df["speed"] = (_displacement / _frame_gap.where(_frame_gap > 0)).fillna(0.0)

# ==============================
# STEP 4: ADAPTIVE NORMAL BEHAVIOR LEARNING
# ==============================
# Use percentile-based thresholding instead of mean+std (more robust)
NORMAL_SPEED_THRESHOLD = df["speed"].quantile(0.95)

# ==============================
# STEP 5: IMPROVED CHAIN SNATCHING LOGIC (ADAPTIVE)
# ==============================
df["chain_snatching_detected"] = 0

# Adaptive parameters based on video statistics.
# The time windows below are expressed in seconds, so they need the real frame
# rate of the video. (Rows-per-frame, i.e. the average number of tracked people,
# is not a frame rate.) The capture used for tracking has already been released,
# so a short-lived handle is opened just to read the metadata.
DEFAULT_FPS = 30.0  # assumed frame rate when the container does not report one
_meta_cap = cv2.VideoCapture(VIDEO_PATH)
try:
    fps_estimate = _meta_cap.get(cv2.CAP_PROP_FPS)
finally:
    _meta_cap.release()
if not np.isfinite(fps_estimate) or fps_estimate <= 0:
    fps_estimate = DEFAULT_FPS

WINDOW = max(5, int(fps_estimate * 0.5))  # ~0.5 seconds window
PROXIMITY_WINDOW = max(2, int(fps_estimate * 0.15))  # Very brief contact time
SPEED_RATIO_THRESHOLD = 2.5  # Speed difference during/after encounter
IOU_THRESHOLD = 0.1  # Proximity threshold
MIN_SPEED_ABSOLUTE = df["speed"].quantile(0.80)  # High speed required

def compute_iou(b1, b2):
    """Return the intersection-over-union of two axis-aligned boxes.

    Each box is an indexable of four numbers ``(x1, y1, x2, y2)``.
    The result is ``0`` when the union of the two boxes has zero area.
    """
    # Corners of the overlap rectangle (may be "inverted" if the boxes are disjoint).
    left, top = max(b1[0], b2[0]), max(b1[1], b2[1])
    right, bottom = min(b1[2], b2[2]), min(b1[3], b2[3])

    # Clamp each side to zero so disjoint boxes contribute no overlap.
    overlap = max(0, right - left) * max(0, bottom - top)

    first_area = (b1[2] - b1[0]) * (b1[3] - b1[1])
    second_area = (b2[2] - b2[0]) * (b2[3] - b2[1])
    union = first_area + second_area - overlap

    return overlap / union if union != 0 else 0

frames = sorted(df["frame"].unique())

# Index the rows once, by frame and by track, so the nested loops below work on
# small per-frame / per-track slices instead of rescanning the whole DataFrame.
rows_by_frame = {frame_key: rows for frame_key, rows in df.groupby("frame")}
rows_by_track = {track_key: rows for track_key, rows in df.groupby("track_id")}
last_frame = frames[-1] if frames else 0  # frames is sorted, so this is the max


def _box_of(frame_rows, pid):
    """Return [x1, y1, x2, y2] of track `pid` within `frame_rows`, or None if absent."""
    match = frame_rows[frame_rows["track_id"] == pid]
    if len(match) == 0:
        return None
    row = match.iloc[0]
    return [row["x1"], row["y1"], row["x2"], row["y2"]]


def _rows_in_window(track_rows, first_frame, last_frame_inclusive):
    """Return the rows of one track whose frame lies in [first_frame, last_frame_inclusive]."""
    in_window = (track_rows["frame"] >= first_frame) & (track_rows["frame"] <= last_frame_inclusive)
    return track_rows[in_window]


# Detect chain snatching: brief contact + one person speeds away
for f in frames:
    current_frame_data = rows_by_frame[f]
    track_ids = current_frame_data["track_id"].unique()

    for pid1 in track_ids:
        for pid2 in track_ids:
            # Visit each unordered pair exactly once.
            if pid1 >= pid2:
                continue

            # Check if they are close NOW (in current frame)
            curr_iou = compute_iou(
                _box_of(current_frame_data, pid1),
                _box_of(current_frame_data, pid2)
            )

            # If not close in current frame, skip
            if curr_iou < IOU_THRESHOLD:
                continue

            # They are close NOW - check what happens AFTER (frames f+1 .. f+WINDOW)
            track_p1 = rows_by_track[pid1]
            track_p2 = rows_by_track[pid2]
            future_p1 = _rows_in_window(track_p1, f + 1, f + WINDOW)
            future_p2 = _rows_in_window(track_p2, f + 1, f + WINDOW)

            # Need data after the encounter
            if len(future_p1) < 2 or len(future_p2) < 2:
                continue

            # Calculate speeds AFTER the encounter
            speed_after_1 = future_p1["speed"].mean()
            speed_after_2 = future_p2["speed"].mean()
            max_speed = max(future_p1["speed"].max(), future_p2["speed"].max())

            # Check if one person speeds away (high speed after contact)
            if max_speed < MIN_SPEED_ABSOLUTE:
                continue

            # Calculate speed ratio AFTER encounter
            avg_max = max(speed_after_1, speed_after_2)
            avg_min = min(speed_after_1, speed_after_2)

            if avg_min > 0:
                speed_ratio = avg_max / avg_min
            else:
                # The slower person did not move at all: treat the ratio as
                # unbounded only if the faster one is genuinely fast.
                speed_ratio = float('inf') if avg_max > MIN_SPEED_ABSOLUTE else 0

            # Check that they DON'T stay together after contact (brief contact only)
            # Count the frames in f .. f+PROXIMITY_WINDOW where both are present
            # and still close. "Close" uses the same >= IOU_THRESHOLD rule as the
            # current-frame check above.
            close_frames = 0
            for check_f in range(f, min(f + PROXIMITY_WINDOW + 1, last_frame + 1)):
                check_data = rows_by_frame.get(check_f)
                if check_data is None:
                    # No tracked person at all in this frame.
                    continue

                box_p1 = _box_of(check_data, pid1)
                box_p2 = _box_of(check_data, pid2)
                if box_p1 is None or box_p2 is None:
                    continue

                if compute_iou(box_p1, box_p2) >= IOU_THRESHOLD:
                    close_frames += 1

            # Chain snatching = brief contact (not prolonged)
            if close_frames > PROXIMITY_WINDOW:
                continue

            # DETECTION: Brief contact + one person speeds away + significant speed difference
            if speed_ratio >= SPEED_RATIO_THRESHOLD:
                # Mark frames around the encounter (f .. f+WINDOW) for both people
                for track_rows in (track_p1, track_p2):
                    mark_rows = _rows_in_window(track_rows, f, f + WINDOW)
                    df.loc[mark_rows.index, "chain_snatching_detected"] = 1

# ==============================
# SAVE FINAL CSV
# ==============================
df.to_csv(OUTPUT_CSV, index=False)
print("Output is generated")

Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.
[KDownloading https://github.com/ultralytics/assets/releases/download/v8.4.0/yolov8n.pt to 'yolov8n.pt': 100% ━━━━━━━━━━━━ 6.2MB 24.0MB/s 0.3s
Output is generated
