<a href="https://colab.research.google.com/github/Far-ch/Signals-and-Systems-Project/blob/main/Signal_and_Systems_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#YOLO
from ultralytics import YOLO
import cv2, torch, numpy as np, matplotlib.pyplot as plt

# Input clip and the file the annotated first frame is written to.
VIDEO_IN  = "person1.mp4"
SNAPSHOT  = "first_detected.jpg"

# Run on the GPU when one is available, otherwise fall back to the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
# Checkpoint name handed to Ultralytics; the loaded model is moved to DEVICE once here.
MODEL  = "yolov8m.pt"
model  = YOLO(MODEL).to(DEVICE)
# Class-id -> display name table. The ids are the ones passed to the detector
# as its `classes` filter and the ones looked up for every returned box.
CLASSES = dict(enumerate([
    "person", "bicycle", "car", "motorcycle", "airplane",
    "bus", "train", "truck", "boat", "traffic light",
    "fire hydrant", "stop sign", "parking meter", "bench", "bird",
    "cat", "dog", "horse", "sheep", "cow",
    "elephant", "bear", "zebra", "giraffe", "backpack",
    "umbrella", "handbag", "tie", "suitcase", "frisbee",
    "skis", "snowboard", "sports ball", "kite", "baseball bat",
    "baseball glove", "skateboard", "surfboard", "tennis racket", "bottle",
    "wine glass", "cup", "fork", "knife", "spoon",
    "bowl", "banana", "apple", "sandwich", "orange",
    "broccoli", "carrot", "hot dog", "pizza", "donut",
    "cake", "chair", "couch", "potted plant", "bed",
    "dining table", "toilet", "tv", "laptop", "mouse",
    "remote", "keyboard", "cell phone", "microwave", "oven",
    "toaster", "sink", "refrigerator", "book", "clock",
    "vase", "scissors", "teddy bear", "hair drier", "toothbrush",
]))

CLASS_IDS = list(CLASSES)

# One fixed pseudo-random colour per class; the seed keeps colours stable
# between runs, and the 64..255 range avoids very dark box colours.
rng = np.random.default_rng(42)
COLORS = {}
for cls_id in CLASS_IDS:
    COLORS[cls_id] = tuple(rng.integers(64, 256, 3).tolist())

# Drawing style shared by every box and label.
BOX_THICK = 3
FONT_SCALE = 1.0
FONT_THICK = 2


def detect(img_rgb, imgsz=960, conf=0.3):
    """Run the global YOLO model on one RGB image.

    Args:
        img_rgb: image array in RGB channel order.
        imgsz: inference size forwarded to ``model.predict``.
        conf: minimum confidence forwarded to ``model.predict``.

    Returns:
        A list of ``(x1, y1, x2, y2, class_id, conf)`` tuples, with integer
        pixel coordinates, integer class id and float confidence.
    """
    prediction = model.predict(
        img_rgb,
        imgsz=imgsz,
        conf=conf,
        classes=CLASS_IDS,
        device=DEVICE,
        verbose=False,
    )[0]  # a single image was passed, so only the first result is relevant
    return [
        (*(int(v) for v in det.xyxy[0]), int(det.cls[0]), float(det.conf[0]))
        for det in prediction.boxes
    ]


def _box_iou(a, b):
    """Return the intersection-over-union of two ``(x1, y1, x2, y2, ...)`` boxes."""
    iw = min(a[2], b[2]) - max(a[0], b[0])
    ih = min(a[3], b[3]) - max(a[1], b[1])
    if iw <= 0 or ih <= 0:
        return 0.0
    inter = iw * ih
    union = (a[2] - a[0]) * (a[3] - a[1]) + (b[2] - b[0]) * (b[3] - b[1]) - inter
    return inter / union if union > 0 else 0.0


def _suppress_duplicates(boxes, iou_thresh=0.5):
    """Greedy per-class non-maximum suppression, keeping the most confident box."""
    kept = []
    for cand in sorted(boxes, key=lambda b: b[5], reverse=True):
        if all(cand[4] != k[4] or _box_iou(cand, k) <= iou_thresh for k in kept):
            kept.append(cand)
    return kept


def robust_detect(frame_bgr):
    """Detect objects in a BGR frame, escalating through three strategies.

    1. Full frame at 1280 px.
    2. Centre crop (60 % width, 70 % height) stretched to 1280x1280.
    3. Overlapping 640 px tiles with a 320 px stride.

    The first stage that yields any detection wins. All returned boxes are
    expressed in the coordinates of ``frame_bgr``.

    Args:
        frame_bgr: image array in BGR channel order (as read by OpenCV).

    Returns:
        A list of ``(x1, y1, x2, y2, class_id, conf)`` tuples; empty when no
        stage finds anything.
    """
    H, W = frame_bgr.shape[:2]

    # Stage 1: full frame
    boxes = detect(cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB), 1280, 0.3)
    if boxes:
        return boxes

    # Stage 2: centre crop + upscale
    cw, ch = int(W * 0.6), int(H * 0.7)
    x0, y0 = (W - cw) // 2, (H - ch) // 2
    crop = frame_bgr[y0:y0+ch, x0:x0+cw]
    if crop.size:  # cv2.resize raises on an empty crop (degenerate frame size)
        crop_up = cv2.resize(crop, (1280, 1280))
        boxes_crop = detect(cv2.cvtColor(crop_up, cv2.COLOR_BGR2RGB), 1280, 0.25)
        if boxes_crop:
            # The crop was stretched independently in x and y, so undo each scale
            # separately before shifting back by the crop origin.
            sx, sy = cw / 1280, ch / 1280
            return [(int(x1*sx + x0), int(y1*sy + y0),
                     int(x2*sx + x0), int(y2*sy + y0), cls, conf)
                    for x1, y1, x2, y2, cls, conf in boxes_crop]

    # Stage 3: sliding tiles
    out = []
    tile, stride = 640, 320
    for y in range(0, H, stride):
        for x in range(0, W, stride):
            tile_bgr = frame_bgr[y:y+tile, x:x+tile]
            boxes_tile = detect(cv2.cvtColor(tile_bgr, cv2.COLOR_BGR2RGB), 640, 0.25)
            for x1, y1, x2, y2, cls, conf in boxes_tile:
                out.append((x1+x, y1+y, x2+x, y2+y, cls, conf))
    # Neighbouring tiles overlap by half, so one object is typically reported by
    # several tiles; collapse those repeats into the most confident box.
    return _suppress_duplicates(out)


# Grab only the first frame of the clip; the capture is released immediately.
cap = cv2.VideoCapture(VIDEO_IN)
ok, frame = cap.read()
cap.release()
if not ok:
    # An explicit raise (rather than assert) still fires under `python -O`.
    raise RuntimeError(f" Couldn't read first frame of {VIDEO_IN}")

# robust_detect only reads the frame, so no defensive copy is needed.
boxes = robust_detect(frame)
for x1, y1, x2, y2, cls, conf in boxes:
    label_name = CLASSES.get(cls, "unknown")
    label_text = f"{label_name} {conf:.2f}"

    color = COLORS.get(cls, (0, 255, 255))
    cv2.rectangle(frame, (x1, y1), (x2, y2), color, BOX_THICK)

    (tw, th), _ = cv2.getTextSize(label_text, cv2.FONT_HERSHEY_SIMPLEX,
                                  FONT_SCALE, FONT_THICK)
    # The label banner normally sits just above the box; when the box touches
    # the top of the image it is clamped so the text stays visible.
    banner_top = max(y1 - th - 8, 0)
    banner_bottom = banner_top + th + 8
    cv2.rectangle(frame, (x1, banner_top), (x1 + tw, banner_bottom), color, -1)
    cv2.putText(frame, label_text, (x1, banner_bottom - 5),
                cv2.FONT_HERSHEY_SIMPLEX, FONT_SCALE,
                (255, 255, 255), FONT_THICK, cv2.LINE_AA)


# Save the annotated frame (BGR, as OpenCV expects) and show it inline (RGB for matplotlib).
cv2.imwrite(SNAPSHOT, frame)
plt.figure(figsize=(12,6))
plt.imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
plt.axis('off')
plt.show()

In [None]:
#Faster R-CNN (torchvision fasterrcnn_resnet50_fpn)
!pip -q install --upgrade torch torchvision torchaudio
import cv2, torch, matplotlib.pyplot as plt
import numpy as np
from torchvision import models, transforms

# COCO category names indexed by the label id that torchvision's
# COCO-pretrained detection models return. That numbering has 91 slots:
# index 0 is the background and several ids are unused ("N/A"), so it must
# not be confused with the contiguous 0-79 numbering used by YOLO.
CLASSES = dict(enumerate([
    "__background__", "person", "bicycle", "car", "motorcycle", "airplane", "bus",
    "train", "truck", "boat", "traffic light", "fire hydrant", "N/A", "stop sign",
    "parking meter", "bench", "bird", "cat", "dog", "horse", "sheep", "cow",
    "elephant", "bear", "zebra", "giraffe", "N/A", "backpack", "umbrella", "N/A", "N/A",
    "handbag", "tie", "suitcase", "frisbee", "skis", "snowboard", "sports ball",
    "kite", "baseball bat", "baseball glove", "skateboard", "surfboard",
    "tennis racket", "bottle", "N/A", "wine glass", "cup", "fork", "knife",
    "spoon", "bowl", "banana", "apple", "sandwich", "orange", "broccoli",
    "carrot", "hot dog", "pizza", "donut", "cake", "chair", "couch",
    "potted plant", "bed", "N/A", "dining table", "N/A", "N/A", "toilet", "N/A", "tv",
    "laptop", "mouse", "remote", "keyboard", "cell phone", "microwave",
    "oven", "toaster", "sink", "refrigerator", "N/A", "book", "clock", "vase",
    "scissors", "teddy bear", "hair drier", "toothbrush",
]))

# Read only the first frame of the clip.
VIDEO_PATH = "person1.mp4"
cap = cv2.VideoCapture(VIDEO_PATH)
ok, frame_bgr = cap.read()
cap.release()
assert ok, "Could not read first frame"

# The torchvision detector is fed RGB; OpenCV delivers BGR.
frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)

device = "cuda" if torch.cuda.is_available() else "cpu"
model  = models.detection.fasterrcnn_resnet50_fpn(weights="DEFAULT")
model.to(device).eval()

transform = transforms.Compose([transforms.ToTensor()])   # converts to [0,1] + CHW
img_tensor = transform(frame_rgb).to(device)


# Inference only: no gradients are needed.
with torch.no_grad():
    preds = model([img_tensor])[0]   # dict with boxes, labels, scores

boxes   = preds["boxes"].cpu().numpy()
labels  = preds["labels"].cpu().numpy()
scores  = preds["scores"].cpu().numpy()

# Keep only detections at or above the confidence threshold.
CONF_TH = 0.50
keep = scores >= CONF_TH

boxes, labels, scores = boxes[keep], labels[keep], scores[keep]

# Annotate a copy so the untouched first frame stays available.
OUT = frame_bgr.copy()
for (x1,y1,x2,y2), cls_id, conf in zip(boxes, labels, scores):
    cls_id = int(cls_id)
    name   = CLASSES.get(cls_id, f"id{cls_id}")
    label  = f"{name} {conf:.2f}"

    color = (0,255,0)   # green boxes; change if you like
    cv2.rectangle(OUT, (int(x1),int(y1)), (int(x2),int(y2)), color, 2)
    (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.7, 2)
    cv2.rectangle(OUT, (int(x1), int(y1)-th-8), (int(x1)+tw, int(y1)), color, -1)
    cv2.putText(OUT, label, (int(x1), int(y1)-4),
                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0,0,0), 2)

# OUT is already BGR, which is the channel order cv2.imwrite expects; reversing
# the channels here would store the image with red and blue swapped.
cv2.imwrite("fast_rcnn_detect.jpg", OUT)

# matplotlib expects RGB, so convert only for display.
plt.figure(figsize=(12,6))
plt.imshow(cv2.cvtColor(OUT, cv2.COLOR_BGR2RGB))
plt.axis("off")
plt.title("Fast-R-CNN detection ")
plt.show()


In [None]:
#MOSSE_Tracker
import time
import uuid
import cv2
import numpy as np
import torch
from torchvision import models, transforms
from pathlib import Path

#PATHS
# Input clip and annotated output both live under the user's Downloads folder.
downloads = Path.home().joinpath("Downloads")
VIDEO_PATH = downloads.joinpath("videos", "car1.mp4")
OUTPUT_PATH = downloads.joinpath("car1_mosse_final_output.mp4")

#PARAMETERS
CONF_TH = 0.5          # minimum detector score for a box to seed a track
MAX_LOST = 250         # a track is dropped once its lost counter exceeds this
HIST_THRESH = 0.6      # minimum histogram correlation to accept a tracker update
HIST_BINS = (16,) * 3  # bins per HSV channel

#COCO LABELS
# Label id -> name, by list position; empty strings mark ids without a name.
COCO = dict(enumerate((
    '__background__', 'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus',
    'train', 'truck', 'boat', 'traffic light', 'fire hydrant', '', 'stop sign',
    'parking meter', 'bench', 'bird', 'cat', 'dog', 'horse', 'sheep', 'cow',
    'elephant', 'bear', 'zebra', 'giraffe', '', 'backpack', 'umbrella', '', '',
    'handbag', 'tie', 'suitcase', 'frisbee', 'skis', 'snowboard', 'sports ball',
    'kite', 'baseball bat', 'baseball glove', 'skateboard', 'surfboard',
    'tennis racket', 'bottle', '', 'wine glass', 'cup', 'fork', 'knife',
    'spoon', 'bowl', 'banana', 'apple', 'sandwich', 'orange', 'broccoli',
    'carrot', 'hot dog', 'pizza', 'donut', 'cake', 'chair', 'couch',
    'potted plant', 'bed', '', 'dining table', '', '', 'toilet', '', 'tv',
    'laptop', 'mouse', 'remote', 'keyboard', 'cell phone', 'microwave',
    'oven', 'toaster', 'sink', 'refrigerator', '', 'book', 'clock', 'vase',
    'scissors', 'teddy bear', 'hair drier', 'toothbrush',
)))

#Histogram functions
def get_histogram(patch):
    """Return the normalised 3-D HSV colour histogram of a BGR patch as a flat vector.

    The histogram uses ``HIST_BINS`` bins per channel.
    """
    hsv_patch = cv2.cvtColor(patch, cv2.COLOR_BGR2HSV)
    channels = [0, 1, 2]
    value_ranges = [0, 180, 0, 256, 0, 256]  # H spans 0..179 in OpenCV, S and V 0..255
    histogram = cv2.calcHist([hsv_patch], channels, None, HIST_BINS, value_ranges)
    cv2.normalize(histogram, histogram)  # normalised in place
    return histogram.flatten()

def compare_hist(hist1, hist2):
    """Return the correlation score between two histograms (cast to float32 first)."""
    first = hist1.astype(np.float32)
    second = hist2.astype(np.float32)
    return cv2.compareHist(first, second, cv2.HISTCMP_CORREL)

#Kalman Filter
class KalmanFilter:
    """Thin wrapper around ``cv2.KalmanFilter`` for smoothing a track centre.

    The filter has 8 state slots and 4 measurement slots. Each step adds state
    slot ``i + 4`` to slot ``i`` (for ``i`` in 0..3), and the measurement
    observes slots 0..3. Only the centre ``(x, y)`` is ever supplied; the other
    two measurement slots are always fed zeros.
    """

    def __init__(self, x, y):
        """Create a filter whose initial position estimate is ``(x, y)``."""
        self.kf = cv2.KalmanFilter(8, 4)

        # Build each matrix completely before assigning it, so the setup does
        # not depend on the property getter handing back a writable view.
        transition = np.eye(8, dtype=np.float32)
        for i in range(4):
            transition[i, i + 4] = 1
        self.kf.transitionMatrix = transition

        measurement = np.zeros((4, 8), np.float32)
        for i in range(4):
            measurement[i, i] = 1
        self.kf.measurementMatrix = measurement

        self.kf.processNoiseCov = np.eye(8, dtype=np.float32) * 1e-3
        self.kf.measurementNoiseCov = np.eye(4, dtype=np.float32) * 1e-2
        self.kf.statePost = np.array([[x], [y], [0], [0], [0], [0], [0], [0]], np.float32)

        # True when predict() has run since the last correct().
        self._predicted = False

    def predict(self):
        """Advance the filter one step and return the predicted ``(x, y)`` as ints."""
        pred = self.kf.predict()
        self._predicted = True
        return int(pred[0, 0]), int(pred[1, 0])

    def correct(self, x, y):
        """Fold the measured centre ``(x, y)`` into the state estimate.

        OpenCV's ``correct`` updates from the *predicted* state and covariance,
        which only ``predict`` refreshes. Callers here correct on frames where
        they did not predict, so a prediction step is run first when one is
        missing; otherwise the update would start from a stale prediction.
        """
        if not self._predicted:
            self.kf.predict()
        self.kf.correct(np.array([[x], [y], [0], [0]], np.float32))
        self._predicted = False

#Model
# Faster R-CNN is used once, on the first frame, to seed the trackers.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = models.detection.fasterrcnn_resnet50_fpn(weights="DEFAULT").to(device).eval()
transform = transforms.Compose([transforms.ToTensor()])

#Load video
cap = cv2.VideoCapture(str(VIDEO_PATH))
assert cap.isOpened(), f"Cannot open video: {VIDEO_PATH}"
W, H = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps_in = cap.get(cv2.CAP_PROP_FPS) or 25  # fall back when the container reports 0
out = cv2.VideoWriter(str(OUTPUT_PATH), cv2.VideoWriter_fourcc(*'mp4v'), fps_in, (W, H))

#Initialize
tracks = []
frame_idx = 0
start_time = time.time()
ret, frame = cap.read()
frame_idx += 1
assert ret, "Can't read the first frame"

# Trackers and histograms are initialised from an unannotated copy, so boxes
# and labels drawn for earlier detections never leak into later ones.
clean = frame.copy()
img_tensor = transform(cv2.cvtColor(clean, cv2.COLOR_BGR2RGB)).to(device)

with torch.no_grad():
    preds = model([img_tensor])[0]

boxes = preds["boxes"].cpu().numpy()
labels = preds["labels"].cpu().numpy()
scores = preds["scores"].cpu().numpy()

# One MOSSE tracker + Kalman filter + reference histogram per confident detection.
for box, cls, score in zip(boxes, labels, scores):
    if score < CONF_TH:
        continue
    x1, y1, x2, y2 = map(int, box)
    w, h = x2 - x1, y2 - y1
    patch = clean[y1:y2, x1:x2]
    if patch.size == 0:
        continue
    tracker = cv2.legacy.TrackerMOSSE_create()
    tracker.init(clean, (x1, y1, w, h))
    kalman = KalmanFilter(x1 + w // 2, y1 + h // 2)
    hist = get_histogram(patch)
    label = COCO.get(int(cls), f"class{int(cls)}")
    tracks.append({
        "id": str(uuid.uuid4())[:8],
        "tracker": tracker,
        "label": label,
        "kalman": kalman,
        "hist": hist,
        "lost": 0
    })
    cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
    cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)

out.write(frame)

#Track across video
while True:
    ret, frame = cap.read()
    if not ret:
        break
    frame_idx += 1
    # Trackers and histograms read this unannotated copy; `frame` is only drawn
    # on. Otherwise the boxes drawn for earlier tracks would be seen by the
    # tracker updates and colour histograms of later, overlapping tracks.
    clean = frame.copy()
    new_tracks = []

    for tr in tracks:
        ok, bbox = tr["tracker"].update(clean)
        x, y, w, h = map(int, bbox)
        x2, y2 = x + w, y + h

        # Tracker failure: keep the track alive on the Kalman prediction until
        # it has been lost for more than MAX_LOST frames.
        if not ok or w <= 0 or h <= 0:
            tr["lost"] += 1
            pred_x, pred_y = tr["kalman"].predict()
            if tr["lost"] <= MAX_LOST:
                cv2.circle(frame, (pred_x, pred_y), 4, (255, 255, 0), -1)
                new_tracks.append(tr)
            continue

        # Clamp the slice start at 0: a negative start would make numpy index
        # from the far edge and return the wrong region.
        patch = clean[max(y, 0):max(y2, 0), max(x, 0):max(x2, 0)]
        if patch.size > 0:
            hist_new = get_histogram(patch)
            sim = compare_hist(tr["hist"], hist_new)
            if sim >= HIST_THRESH:
                # Appearance still matches: accept the box and slowly adapt
                # the reference histogram.
                tr["kalman"].correct(x + w // 2, y + h // 2)
                tr["hist"] = 0.8 * tr["hist"] + 0.2 * hist_new
                tr["lost"] = 0
            else:
                # Appearance drifted: treat the update as a miss.
                tr["lost"] += 1
                if tr["lost"] <= MAX_LOST:
                    pred_x, pred_y = tr["kalman"].predict()
                    cv2.circle(frame, (pred_x, pred_y), 4, (255, 0, 0), -1)
                    new_tracks.append(tr)
                continue

        cv2.rectangle(frame, (x, y), (x2, y2), (255, 0, 0), 2)
        cv2.putText(frame, tr["label"], (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 0), 2)
        new_tracks.append(tr)

    tracks = new_tracks

    # Running average since start_time (which was taken before the first-frame detection).
    elapsed = time.time() - start_time
    fps = frame_idx / elapsed
    cv2.putText(frame, f"FPS: {fps:.1f}", (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2)

    out.write(frame)


cap.release()
out.release()
print(f"✅ Done. Output saved to: {OUTPUT_PATH}")
print(f"📈 Average FPS: {frame_idx / (time.time() - start_time):.2f}")

In [None]:
#CSRT_Tracker
from pathlib import Path
import time

import cv2
import numpy as np
from ultralytics import YOLO

#PATHS
# Input clip and annotated output both live under the user's Downloads folder.
downloads = Path.home().joinpath("Downloads")
VIDEO_PATH = downloads.joinpath("videos", "car1.mp4")
OUTPUT_PATH = downloads.joinpath("car1_CSRT_final_output.mp4")

#HYPER-PARAMETERS
CONF_THRESHOLD = 0.05       # detector confidence floor used on the first frame
IOU_THRESH = 0.50           # skip duplicate tracks if IoU > this
HIST_BINS = (16,) * 3       # bins per HSV channel
MAX_LOST_FRAMES = 60        # keep predicting this many missed frames

def extract_hsv_hist(patch, bins=HIST_BINS):
    """Return the normalised 3-D HSV histogram of a BGR patch, flattened to 1-D.

    Args:
        patch: BGR image region.
        bins: number of bins per HSV channel.
    """
    hsv_patch = cv2.cvtColor(patch, cv2.COLOR_BGR2HSV)
    channels = [0, 1, 2]
    value_ranges = [0, 180, 0, 256, 0, 256]  # H spans 0..179 in OpenCV, S and V 0..255
    histogram = cv2.calcHist([hsv_patch], channels, None, bins, value_ranges)
    cv2.normalize(histogram, histogram)  # normalised in place
    return histogram.flatten()

def iou(a, b):
    """Return the intersection-over-union of two ``(x1, y1, x2, y2)`` boxes.

    Boxes that do not overlap (or only touch) score ``0.0``.
    """
    left, top = max(a[0], b[0]), max(a[1], b[1])
    right, bottom = min(a[2], b[2]), min(a[3], b[3])
    overlap = max(0, right - left) * max(0, bottom - top)
    if not overlap:
        return 0.0
    combined = sum((r[2] - r[0]) * (r[3] - r[1]) for r in (a, b))
    return overlap / (combined - overlap)
class Track:
    """State for one tracked object.

    Holds a CSRT tracker, the last known ``[x, y, w, h]`` box, a label, a
    lost-frame counter, an HSV histogram of the initial patch, and a 4-state
    Kalman filter over the box centre.
    """

    def __init__(self, bbox_xywh, label, frame):
        """Start tracking ``bbox_xywh`` (x, y, width, height) in ``frame``."""
        csrt = cv2.TrackerCSRT_create()
        csrt.init(frame, bbox_xywh)
        self.tracker = csrt

        left, top, width, height = (int(v) for v in bbox_xywh)
        self.bbox = [left, top, width, height]
        self.label = label
        self.lost = 0

        # Appearance signature of the initial patch.
        self.feature = extract_hsv_hist(frame[top:top + height, left:left + width])

        # Kalman state is (cx, cy, vx, vy): each step adds the velocity to the
        # position, and only the position is measured.
        center_x = left + width / 2
        center_y = top + height / 2
        transition = np.eye(4, dtype=np.float32)
        transition[0, 2] = 1
        transition[1, 3] = 1
        initial_state = np.array([[center_x], [center_y], [0], [0]], np.float32)

        kf = cv2.KalmanFilter(4, 2)
        kf.transitionMatrix = transition
        kf.measurementMatrix = np.eye(2, 4, dtype=np.float32)
        kf.processNoiseCov = 1e-2 * np.eye(4, dtype=np.float32)
        kf.measurementNoiseCov = 1e-1 * np.eye(2, dtype=np.float32)
        kf.statePre = initial_state
        kf.statePost = initial_state.copy()
        self.kalman = kf

    def predict_center(self):
        """Advance the Kalman filter one step and return the predicted centre as floats."""
        predicted = self.kalman.predict()
        return float(predicted[0, 0]), float(predicted[1, 0])

    def correct(self, cx, cy):
        """Feed a measured centre ``(cx, cy)`` to the Kalman filter."""
        measurement = np.array([[cx], [cy]], np.float32)
        self.kalman.correct(measurement)

    def box_xyxy(self):
        """Return the current box as ``[x1, y1, x2, y2]``."""
        left, top, width, height = self.bbox
        return [left, top, left + width, top + height]

# Detector used once, on the first frame, to seed the CSRT trackers.
model = YOLO("yolov8n.pt")

cap = cv2.VideoCapture(str(VIDEO_PATH))
assert cap.isOpened(), f"Cannot open {VIDEO_PATH}"
W, H   = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# Some containers report 0 FPS; fall back to 25 so the writer gets a usable rate.
fps_in = cap.get(cv2.CAP_PROP_FPS) or 25

out = cv2.VideoWriter(str(OUTPUT_PATH),
                      cv2.VideoWriter_fourcc(*"mp4v"),
                      fps_in, (W,H))

tracks      = []
prev_gray   = None   # previous frame in grayscale, for optical flow
prev_pts    = None   # corners found in prev_gray
frame_idx   = 0
start_time  = time.perf_counter()

# Corner detector settings for cv2.goodFeaturesToTrack.
feature_params = dict(maxCorners=200, qualityLevel=0.3,
                      minDistance=7, blockSize=7)
# Pyramidal Lucas-Kanade settings for cv2.calcOpticalFlowPyrLK.
lk_params = dict(winSize=(15,15), maxLevel=2,
                 criteria=(cv2.TERM_CRITERIA_EPS|cv2.TERM_CRITERIA_COUNT,
                           10, 0.03))

#MAIN LOOP
while True:
    ret, frame = cap.read()
    if not ret:
        break
    frame_idx += 1
    t0 = time.perf_counter()
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    #First frame: detect & spawn CSRT trackers
    if frame_idx == 1:
        # NOTE(review): ids 0 and 3 are "person" and "motorcycle" in the usual
        # YOLO COCO-80 numbering ("car" is 2) - confirm the intended classes.
        det = model.predict(frame, conf=CONF_THRESHOLD,
                            classes=[0,3], verbose=False)[0]
        for box, cls in zip(det.boxes.xyxy.cpu().numpy().astype(int),
                            det.boxes.cls.cpu().numpy().astype(int)):
            label = model.names[int(cls)]
            # Skip detections that duplicate an existing same-label track.
            if any(tr.label == label and
                   iou(box, tr.box_xyxy()) > IOU_THRESH for tr in tracks):
                continue
            x1,y1,x2,y2 = (int(v) for v in box)
            if x2 <= x1 or y2 <= y1:
                continue  # degenerate box: nothing to initialise a tracker on
            tracks.append(Track((x1,y1,x2-x1,y2-y1), label, frame))
        # prev_gray / prev_pts are refreshed at the end of every iteration,
        # including this one, so they are not computed here as well.
    #Subsequent frames
    else:
        # 1) global motion compensation
        # M stays the identity whenever the motion cannot be estimated (no
        # corners, flow failure, too few matches, or the RANSAC fit failing).
        M = np.eye(2,3, dtype=np.float32)
        if prev_pts is not None and len(prev_pts) > 0:
            curr_pts, st, _ = cv2.calcOpticalFlowPyrLK(prev_gray, gray,
                                                      prev_pts, None, **lk_params)
            if curr_pts is not None:
                tracked = st.flatten() == 1
                good_prev = prev_pts[tracked]
                good_curr = curr_pts[tracked]
                if len(good_prev) >= 6:
                    M_est,_ = cv2.estimateAffinePartial2D(good_prev, good_curr,
                                                          method=cv2.RANSAC)
                    if M_est is not None:
                        M = M_est
        # NOTE(review): M is estimated as previous->current motion and applied
        # forward to the current frame; confirm this is the intended
        # stabilisation direction.
        stab = cv2.warpAffine(frame, M, (W,H))
        # Same for every track in this frame, so compute it once.
        invM = cv2.invertAffineTransform(M)

        # 2) update each track
        new_tracks = []
        for tr in tracks:
            pcx, pcy = tr.predict_center()
            ok, bbox = tr.tracker.update(stab)
            if ok:
                tr.lost = 0
                tr.bbox = bbox
                x,y,w,h = map(int,bbox)
                tr.correct(x + w/2, y + h/2)
            else:
                tr.lost += 1
                if tr.lost > MAX_LOST_FRAMES:
                    continue  # give up on this track
                # fabricate a bbox around predicted center
                w,h = tr.bbox[2], tr.bbox[3]
                x,y = int(pcx - w/2), int(pcy - h/2)
                tr.bbox = [x,y,w,h]

            # draw back onto original (unstabilised) frame
            x,y,w,h = tr.bbox
            pts  = np.array([[x,y], [x+w,y+h]], np.float32).reshape(-1,1,2)
            ox1,oy1, ox2,oy2 = cv2.transform(pts, invM).reshape(-1,2).flatten()
            cv2.rectangle(frame, (int(ox1),int(oy1)),
                          (int(ox2),int(oy2)), (0,255,0), 2)
            cv2.putText(frame, tr.label, (int(ox1),int(oy1)-10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,0), 2)
            cv2.circle(frame, (int(pcx),int(pcy)), 4, (0,0,255), -1)
            new_tracks.append(tr)
        tracks = new_tracks

    #FPS overlay
    # Instantaneous rate for this frame's processing only.
    fps = 1.0 / (time.perf_counter() - t0)
    cv2.putText(frame, f"FPS: {fps:.1f}",
                (10,30), cv2.FONT_HERSHEY_SIMPLEX,
                0.9, (0,255,255), 2)

    out.write(frame)
    # Reference image and corners for the next frame's optical flow.
    prev_gray = gray.copy()
    prev_pts  = cv2.goodFeaturesToTrack(prev_gray, mask=None,
                                        **feature_params)

total = time.perf_counter() - start_time
print(f"Processed {frame_idx} frames in {total:.2f}s — "
      f"Avg FPS: {frame_idx/total:.2f}")
print(f"Output saved to: {OUTPUT_PATH}")

cap.release()
out.release()
cv2.destroyAllWindows()


In [None]:
#KCF_tracker
import cv2
import torch
import numpy as np
from torchvision import models, transforms
from pathlib import Path
import uuid
import time
#PATHS
# Input clip and annotated output both live under the user's Downloads folder.
downloads = Path.home().joinpath("Downloads")
VIDEO_PATH = downloads.joinpath("videos", "car1.mp4")
OUTPUT_PATH = downloads.joinpath("car1_kcf_final_output.mp4")

#CONSTANTS
CONF_TH = 0.6            # minimum detector score for a box to seed a track
MAX_LOST = 180           # a track is dropped once its lost counter exceeds this
HIST_THRESHOLD = 0.5     # minimum histogram correlation to accept a tracker update
HIST_BINS = (16,) * 3    # bins per HSV channel

#COCO LABELS
# Label id -> name, by list position; empty strings mark ids without a name.
COCO = dict(enumerate((
    '__background__', 'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus',
    'train', 'truck', 'boat', 'traffic light', 'fire hydrant', '', 'stop sign',
    'parking meter', 'bench', 'bird', 'cat', 'dog', 'horse', 'sheep', 'cow',
    'elephant', 'bear', 'zebra', 'giraffe', '', 'backpack', 'umbrella', '', '',
    'handbag', 'tie', 'suitcase', 'frisbee', 'skis', 'snowboard', 'sports ball',
    'kite', 'baseball bat', 'baseball glove', 'skateboard', 'surfboard',
    'tennis racket', 'bottle', '', 'wine glass', 'cup', 'fork', 'knife',
    'spoon', 'bowl', 'banana', 'apple', 'sandwich', 'orange', 'broccoli',
    'carrot', 'hot dog', 'pizza', 'donut', 'cake', 'chair', 'couch',
    'potted plant', 'bed', '', 'dining table', '', '', 'toilet', '', 'tv',
    'laptop', 'mouse', 'remote', 'keyboard', 'cell phone', 'microwave',
    'oven', 'toaster', 'sink', 'refrigerator', '', 'book', 'clock', 'vase',
    'scissors', 'teddy bear', 'hair drier', 'toothbrush',
)))

def get_histogram(patch, bins=HIST_BINS):
    """Return the normalised 3-D HSV histogram of a BGR patch, flattened to 1-D.

    Args:
        patch: BGR image region.
        bins: number of bins per HSV channel.
    """
    hsv_patch = cv2.cvtColor(patch, cv2.COLOR_BGR2HSV)
    channels = [0, 1, 2]
    value_ranges = [0, 180, 0, 256, 0, 256]  # H spans 0..179 in OpenCV, S and V 0..255
    histogram = cv2.calcHist([hsv_patch], channels, None, bins, value_ranges)
    cv2.normalize(histogram, histogram)  # normalised in place
    return histogram.flatten()

def compare_hist(hist1, hist2):
    """Return the correlation score between two histograms (cast to float32 first)."""
    first = hist1.astype(np.float32)
    second = hist2.astype(np.float32)
    return cv2.compareHist(first, second, cv2.HISTCMP_CORREL)

class StrongKalmanFilter:
    """Thin wrapper around ``cv2.KalmanFilter`` for smoothing a track centre.

    The filter has 8 state slots and 4 measurement slots. Each step adds state
    slot ``i + 4`` to slot ``i`` (for ``i`` in 0..3), and the measurement
    observes slots 0..3. Only the centre ``(x, y)`` is ever supplied; the other
    two measurement slots are always fed zeros.
    """

    def __init__(self, x, y):
        """Create a filter whose initial position estimate is ``(x, y)``."""
        self.kf = cv2.KalmanFilter(8, 4)

        # Build each matrix completely before assigning it, so the setup does
        # not depend on the property getter handing back a writable view.
        transition = np.eye(8, dtype=np.float32)
        for i in range(4):
            transition[i, i+4] = 1.0
        self.kf.transitionMatrix = transition

        measurement = np.zeros((4, 8), np.float32)
        for i in range(4):
            measurement[i, i] = 1.0
        self.kf.measurementMatrix = measurement

        self.kf.processNoiseCov = np.eye(8, dtype=np.float32) * 1e-2
        self.kf.measurementNoiseCov = np.eye(4, dtype=np.float32) * 1e-1
        self.kf.errorCovPost = np.eye(8, dtype=np.float32)
        self.kf.statePost = np.array([[x], [y], [0], [0], [0], [0], [0], [0]], dtype=np.float32)

        # True when predict() has run since the last correct().
        self._predicted = False

    def predict(self):
        """Advance the filter one step and return the predicted ``(x, y)`` as ints."""
        pred = self.kf.predict()
        self._predicted = True
        # Index down to scalars explicitly: int() on a 1-element array is
        # deprecated in recent NumPy releases.
        return int(pred[0, 0]), int(pred[1, 0])

    def correct(self, x, y):
        """Fold the measured centre ``(x, y)`` into the state estimate.

        OpenCV's ``correct`` updates from the *predicted* state and covariance,
        which only ``predict`` refreshes. Callers here correct on frames where
        they did not predict, so a prediction step is run first when one is
        missing; otherwise the update would start from a stale prediction.
        """
        if not self._predicted:
            self.kf.predict()
        self.kf.correct(np.array([[x], [y], [0], [0]], dtype=np.float32))
        self._predicted = False

#MODEL
# Faster R-CNN is used once, on the first frame, to seed the KCF trackers.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = models.detection.fasterrcnn_resnet50_fpn(weights="DEFAULT").to(device).eval()
transform = transforms.Compose([transforms.ToTensor()])

#VIDEO
cap = cv2.VideoCapture(str(VIDEO_PATH))
assert cap.isOpened(), f"Could not open {VIDEO_PATH}"
W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS) or 25  # fall back when the container reports 0
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(str(OUTPUT_PATH), fourcc, fps, (W, H))

# Loop state: live tracks, frames read so far, and timestamps for the total
# (t0) and per-frame (prev) FPS figures.
tracks = []
frame_idx = 0
t0 = time.time()
prev = t0

while True:
    ret, frame = cap.read()
    if not ret:
        break
    frame_idx += 1

    if frame_idx == 1:
        # First frame: run the detector once and seed one tracker per
        # confident detection. Nothing is drawn on this frame.
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        img_tensor = transform(rgb).to(device)
        with torch.no_grad():
            preds = model([img_tensor])[0]

        boxes = preds['boxes'].cpu().numpy()
        labels_raw = preds['labels'].cpu().numpy()
        scores = preds['scores'].cpu().numpy()
        keep = scores >= CONF_TH
        boxes, labels_raw = boxes[keep], labels_raw[keep]

        for box, cls_id in zip(boxes, labels_raw):
            x1, y1, x2, y2 = map(int, box)
            w, h = x2 - x1, y2 - y1
            patch = frame[y1:y2, x1:x2]
            if patch.size == 0:
                continue
            tracker = cv2.TrackerKCF_create()
            tracker.init(frame, (x1, y1, w, h))
            hist = get_histogram(patch)
            kalman = StrongKalmanFilter(x1 + w//2, y1 + h//2)
            tracks.append({
                "id": str(uuid.uuid4())[:8],
                "tracker": tracker,
                "label": COCO.get(int(cls_id), f"class{cls_id}"),
                "hist": hist,
                "lost": 0,
                "kalman": kalman
            })
    else:
        # Trackers and histograms read this unannotated copy; `frame` is only
        # drawn on. Otherwise the boxes drawn for earlier tracks would be seen
        # by the tracker updates and histograms of later, overlapping tracks.
        clean = frame.copy()
        new_tracks = []
        for tr in tracks:
            ok, bbox = tr["tracker"].update(clean)
            x, y, w, h = map(int, bbox)
            x2, y2 = x + w, y + h

            # Tracker failure or a box leaving the frame: keep the track alive
            # on the Kalman prediction until it has been lost too long.
            if not ok or x < 0 or y < 0 or x2 > W or y2 > H:
                tr["lost"] += 1
                pred_x, pred_y = tr["kalman"].predict()
                if tr["lost"] <= MAX_LOST:
                    cv2.circle(frame, (pred_x, pred_y), 4, (0, 0, 255), -1)
                    new_tracks.append(tr)
                continue

            patch = clean[y:y+h, x:x+w]
            if patch.size > 0:
                hist_new = get_histogram(patch)
                similarity = compare_hist(tr["hist"], hist_new)
                if similarity < HIST_THRESHOLD:
                    # Appearance drifted: treat the update as a miss.
                    tr["lost"] += 1
                    if tr["lost"] <= MAX_LOST:
                        new_tracks.append(tr)
                    continue
                else:
                    # Appearance still matches: accept the box and slowly
                    # adapt the reference histogram.
                    tr["lost"] = 0
                    tr["hist"] = 0.8 * tr["hist"] + 0.2 * hist_new
                    tr["kalman"].correct(x + w//2, y + h//2)

            cv2.rectangle(frame, (x, y), (x2, y2), (0, 255, 0), 2)
            cv2.putText(frame, tr["label"], (x, y - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
            new_tracks.append(tr)

        tracks = new_tracks

    #FPS Overlay
    # Instantaneous rate from the time since the previous frame was finished;
    # the small epsilon guards against a zero interval.
    now = time.time()
    fps_now = 1.0 / (now - prev + 1e-6)
    prev = now
    cv2.putText(frame, f"FPS: {fps_now:.1f}", (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2)

    out.write(frame)


cap.release()
out.release()
total_time = time.time() - t0
avg_fps = frame_idx / total_time
print(f"✅ Saved to: {OUTPUT_PATH}")
print(f"Average FPS: {avg_fps:.2f}")


In [None]:
#KJF_Tracker
import cv2
import numpy as np
import torch
from ultralytics import YOLO


def filter_points_far_from_cluster(points, threshold_ratio):
    """Flag points that sit unusually far from the centroid of the set.

    A point is rejected when its distance to the centroid exceeds
    ``threshold_ratio`` times the mean centroid distance of all *other* points.

    Args:
        points: array-like of shape ``(n, d)``.
        threshold_ratio: multiple of the others' mean distance above which a
            point counts as an outlier.

    Returns:
        Boolean array of length ``n``; ``True`` marks points to keep. An empty
        input gives an empty mask and a single point is always kept.
    """
    n = len(points)
    if n == 0:
        return np.array([], dtype=bool)
    if n == 1:
        return np.array([True])

    pts = np.asarray(points, dtype=np.float64)
    distances = np.linalg.norm(pts - pts.mean(axis=0), axis=1)

    # Leave-one-out mean in one vectorised step: subtract each distance from
    # the total instead of rebuilding the "others" array for every point.
    mean_others = (distances.sum() - distances) / (n - 1)
    return ~(distances > threshold_ratio * mean_others)


def filter_outliers_by_movement(new_points, old_points, max_movement=20):
    """Return a boolean mask of points that moved less than ``max_movement``.

    Args:
        new_points: ``(n, d)`` array of current positions.
        old_points: ``(n, d)`` array of previous positions, row-aligned with
            ``new_points``.
        max_movement: exclusive upper bound on the allowed displacement.

    Returns:
        Boolean array of length ``n``; an empty array when either input is empty.
    """
    if not len(new_points) or not len(old_points):
        return np.array([], dtype=bool)
    displacement = np.linalg.norm(new_points - old_points, axis=1)
    return displacement < max_movement


def get_distributed_keypoints(image_gray, max_corners=1000, quality_level=0.7, min_distance=7, grid_size=(4, 4)):
    """Detect corners cell by cell so they are spread over the whole image.

    The image is split into ``grid_size`` (rows, cols) cells and
    ``cv2.goodFeaturesToTrack`` is run in each with an equal share of
    ``max_corners``.

    Args:
        image_gray: single-channel image.
        max_corners: total corner budget across all cells.
        quality_level: forwarded to ``cv2.goodFeaturesToTrack``.
        min_distance: forwarded to ``cv2.goodFeaturesToTrack``.
        grid_size: ``(rows, cols)`` of the grid.

    Returns:
        ``float32`` array of shape ``(n, 1, 2)`` with ``(x, y)`` coordinates in
        the frame of ``image_gray``; ``n`` is 0 when nothing is found.
    """
    h, w = image_gray.shape
    keypoints = []

    gh, gw = grid_size
    step_x = w // gw
    step_y = h // gh
    # OpenCV treats maxCorners <= 0 as "no limit", so keep the share at 1 or more.
    per_cell = max(1, max_corners // (gh * gw))

    for i in range(gh):
        y_start = i * step_y
        # The last row/column absorbs the remainder so no border pixels are skipped.
        y_end = h if i == gh - 1 else y_start + step_y
        for j in range(gw):
            x_start = j * step_x
            x_end = w if j == gw - 1 else x_start + step_x

            roi = image_gray[y_start:y_end, x_start:x_end]
            if roi.size == 0:
                continue  # image smaller than the grid: nothing to search here

            corners = cv2.goodFeaturesToTrack(roi, maxCorners=per_cell,
                                              qualityLevel=quality_level, minDistance=min_distance)
            if corners is not None:
                for c in corners:
                    cx, cy = c.ravel()
                    # Shift from cell coordinates back to image coordinates.
                    keypoints.append([cx + x_start, cy + y_start])

    return np.array(keypoints, dtype=np.float32).reshape(-1, 1, 2)


# Load the YOLOv8n model
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = YOLO('yolov8n.pt').to(device)
model.fuse()

# Input video
cap = cv2.VideoCapture("person2.mp4")
ret, first_frame = cap.read()
if not ret:
    print("خطا در باز کردن ویدیو")
    cap.release()
    exit()

# Detect at most two objects on the first frame.
# NOTE(review): the original comment said "all humans (class 0 in COCO)", but
# the filter also passes id 3 - confirm which classes are intended.
results = model(first_frame, conf=0.2, classes=[0, 3], max_det=2)[0]
pred = results.boxes
if pred is None or pred.shape[0] == 0:
    print("هیچ جسمی شناسایی نشد")
    cap.release()
    exit()

boxes = pred.xyxy.cpu().numpy().astype(int)

# Extract keypoints for each box
all_points = []
for box in boxes:
    x1, y1, x2, y2 = box[:4]
    roi = first_frame[y1:y2, x1:x2]
    if roi.size == 0:
        # Degenerate box: keep an empty slot so indices still line up with boxes.
        all_points.append(np.array([], dtype=np.float32).reshape(-1, 1, 2))
        continue
    roi_gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
    features = get_distributed_keypoints(
        roi_gray, max_corners=2000, quality_level=0.1, min_distance=7, grid_size=(4, 4))

    if features is None or len(features) == 0:
        all_points.append(np.array([], dtype=np.float32).reshape(-1, 1, 2))
    else:
        # Keypoints are relative to the ROI; shift them into frame coordinates.
        features[:, 0, 0] += x1
        features[:, 0, 1] += y1
        all_points.append(features)

old_gray = cv2.cvtColor(first_frame, cv2.COLOR_BGR2GRAY)

# LK optical flow settings
lk_params = dict(winSize=(15, 15),
                 maxLevel=2,
                 criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

# Video output
# Write at the source frame rate so playback speed matches the input; fall back
# to 30 when the container does not report one.
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter('tracked_output.mp4', fourcc, cap.get(cv2.CAP_PROP_FPS) or 30.0,
                      (first_frame.shape[1], first_frame.shape[0]))

draw_keypoints = True  # set to False to disable keypoint drawing

# Processing loop: follow each object's keypoints from frame to frame with
# Lucas-Kanade optical flow and draw the bounding box of the surviving points.
while True:
    ret, frame = cap.read()
    if not ret:
        break

    frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    # all_points[i] holds the current keypoints of object i; an empty array
    # means the object has been dropped, and it is never re-detected.
    for i, p0 in enumerate(all_points):
        if len(p0) == 0:
            continue

        p1, st, err = cv2.calcOpticalFlowPyrLK(
            old_gray, frame_gray, p0, None, **lk_params)

        # Fewer than 5 successfully tracked points: drop the object.
        if p1 is None or len(p1[st == 1]) < 5:
            all_points[i] = np.array([], dtype=np.float32).reshape(-1, 1, 2)
            continue

        good_new = p1[st == 1]
        good_old = p0[st == 1]

        # Reject points that jumped too far or that sit far from the cluster.
        mask_movement = filter_outliers_by_movement(
            good_new, good_old, max_movement=30)
        mask_distance = filter_points_far_from_cluster(
            good_new, threshold_ratio=2.2)
        final_mask = mask_movement & mask_distance

        good_new = good_new[final_mask]

        # Fewer than 5 points left after filtering: drop the object.
        if len(good_new) < 5:
            all_points[i] = np.array([], dtype=np.float32).reshape(-1, 1, 2)
            continue

        # Draw keypoints (if enabled)
        if draw_keypoints:
            for new in good_new:
                a, b = new.ravel()
                cv2.circle(frame, (int(a), int(b)), 3, (0, 0, 255), -1)

        # Draw a box around the object (bounding box of the remaining keypoints)
        x_min, y_min = np.min(good_new, axis=0).astype(int)
        x_max, y_max = np.max(good_new, axis=0).astype(int)
        cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), (255, 0, 0), 2)

        # Surviving points become the starting points for the next frame.
        all_points[i] = good_new.reshape(-1, 1, 2)

    out.write(frame)
    old_gray = frame_gray.copy()

cap.release()
out.release()
cv2.destroyAllWindows()