## Training YOLOv8 on UA-DETRAC Dataset

We used the YOLOv8n model (nano) for training on a custom vehicle detection dataset formatted in YOLO format.

Key training configuration:
- Epochs: 50  
- Image size: 640  
- Batch size: 24  
- Dataset: UA-DETRAC (from Roboflow)  


In [11]:
import os
import shutil
from ultralytics import YOLO

# === Paths ===
final_model_path   = "models/best.pt"  # This is your existing trained model
# Where best.pt lands when the run directory name is still free (fallback only).
trained_model_path = "runs/detect/ua_detrac_yolov8/weights/best.pt"

# === Check for existing model ===
# The presence of the exported weights is the only signal used to skip training.
if os.path.exists(final_model_path):
    print(f"✅ YOLOv8 model already exists at: {final_model_path}")
    print("🛑 Skipping training.")
else:
    print("🚀 No trained model found. Starting training...")

    # Load base YOLOv8 model
    model = YOLO("yolov8n.pt")

    # Train model
    model.train(
        data="data/yolo_format/data.yaml",
        epochs=50,
        imgsz=640,
        batch=24,
        name="ua_detrac_yolov8",
        device="cuda"
    )

    # Ultralytics appends a numeric suffix to the run directory when a run
    # called "ua_detrac_yolov8" already exists, so the hard-coded path can be
    # missing or point at an older run. Prefer the path the trainer reports
    # and only fall back to the conventional location.
    trainer = getattr(model, "trainer", None)
    best_weights = getattr(trainer, "best", None)
    if best_weights is None or not os.path.exists(best_weights):
        best_weights = trained_model_path

    # After training, copy the best.pt to models/
    if os.path.exists(best_weights):
        os.makedirs("models", exist_ok=True)
        shutil.copy(best_weights, final_model_path)
        print(f"✅ Trained model copied to: {final_model_path}")
    else:
        print("❌ Training completed, but no best.pt found in runs/")


✅ YOLOv8 model already exists at: models/best.pt
🛑 Skipping training.


## 🚀 Training Faster R-CNN on UA-DETRAC (Pascal VOC)

This cell will fine-tune a pretrained Faster R-CNN (ResNet-50 + FPN) on your Pascal VOC–formatted UA-DETRAC data.  

- **Train / Val data:** `data/voc_format/train` and `data/voc_format/valid`  
- **Classes:** car, bus, truck, motorcycle  
- **Batch accumulation:** target 30 images via micro-batches of 2 (15 accumulation steps)  
- **AMP mixed precision** for faster training  
- **Checkpoints** written to `models/checkpoints/checkpoint_last.pth` each epoch  
- **Final weights** saved as `models/faster_rcnn_final.pth`  

> **Skip Logic:** If `models/faster_rcnn_final.pth` already exists, training is skipped.


In [12]:
import os
import xml.etree.ElementTree as ET
import torch
from PIL import Image
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from tqdm import tqdm
from torch.amp import autocast, GradScaler

# ─── Paths & Skip Logic ─────────────────────────────────────────────────────────
# Where the finished weights and the rolling checkpoint are stored, and where
# the VOC-style train/validation folders are read from.
final_model_path = "models/faster_rcnn_final.pth"
checkpoint_dir   = "models/checkpoints"
ckpt_path        = os.path.join(checkpoint_dir, "checkpoint_last.pth")
train_dir        = "data/voc_format/train"
val_dir          = "data/voc_format/valid"

# Skip logic: the existence of the final weights file is the only signal used
# to decide that training already happened. Everything under `else:` is the
# training pipeline (dataset, loaders, model, loop, final save).
if os.path.exists(final_model_path):
    print(f"✅ Final model exists at `{final_model_path}` – skipping training.")
else:
    print("🚀 No existing model found. Starting Faster R-CNN training...")

    # ensure checkpoint folder exists before the first epoch writes into it
    os.makedirs(checkpoint_dir, exist_ok=True)

    # ─── Dataset Definition ─────────────────────────────────────────────────────
    class PascalVOCDataset(Dataset):
        """Detection dataset over a flat folder of images with VOC XML sidecars.

        Every image ``<stem>.<ext>`` is expected to have its annotation in
        ``<stem>.xml`` inside the same directory.

        Args:
            root: Directory that holds both the images and the XML files.
            classes: Foreground class names; the class at position ``i`` gets
                label ``i + 1`` (0 is left free for the background).
            transforms: Optional callable applied to the PIL image only.
                Because the boxes never pass through it, they are rescaled
                afterwards to the size of the transformed image. Transforms
                that crop or flip are NOT reflected in the boxes.
        """

        IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')

        def __init__(self, root, classes, transforms=None):
            self.root = root
            self.transforms = transforms
            self.class_to_idx = {c: i + 1 for i, c in enumerate(classes)}
            # os.listdir order is arbitrary; sort so idx -> image (and hence
            # image_id) is reproducible between runs.
            self.imgs = sorted(
                f for f in os.listdir(root)
                if f.lower().endswith(self.IMAGE_EXTENSIONS)
            )

        def __len__(self):
            return len(self.imgs)

        def _parse_annotation(self, xml_path):
            """Read one VOC XML file and return ``(boxes, labels)`` tensors.

            Objects whose class is not in ``class_to_idx`` are skipped, as are
            boxes without a positive width and height (torchvision detection
            models are documented to reject such boxes during training).
            ``boxes`` is always shaped ``(N, 4)``, including ``N == 0``.
            """
            boxes, labels = [], []
            for obj in ET.parse(xml_path).getroot().findall("object"):
                cls = obj.find("name").text
                if cls not in self.class_to_idx:
                    continue
                bnd = obj.find("bndbox")
                xmin, ymin, xmax, ymax = (
                    float(bnd.find(tag).text)
                    for tag in ("xmin", "ymin", "xmax", "ymax")
                )
                if xmax <= xmin or ymax <= ymin:
                    continue
                boxes.append([xmin, ymin, xmax, ymax])
                labels.append(self.class_to_idx[cls])
            # reshape keeps the (N, 4) layout when no object survived; a bare
            # torch.tensor([]) would be 1-D and break the column indexing below.
            boxes = torch.tensor(boxes, dtype=torch.float32).reshape(-1, 4)
            labels = torch.tensor(labels, dtype=torch.int64)
            return boxes, labels

        @staticmethod
        def _image_size(image):
            """Return ``(width, height)`` for a tensor or PIL image, else None."""
            if isinstance(image, torch.Tensor):
                return int(image.shape[-1]), int(image.shape[-2])
            if isinstance(image, Image.Image):
                return image.size
            return None

        def __getitem__(self, idx):
            img_path = os.path.join(self.root, self.imgs[idx])
            image = Image.open(img_path).convert("RGB")
            src_w, src_h = image.size
            boxes, labels = self._parse_annotation(
                os.path.splitext(img_path)[0] + ".xml"
            )

            if self.transforms:
                image = self.transforms(image)
                new_size = self._image_size(image)
                # Keep the annotations aligned with a resized image.
                if new_size is not None and new_size != (src_w, src_h):
                    sx, sy = new_size[0] / src_w, new_size[1] / src_h
                    boxes = boxes * torch.tensor([sx, sy, sx, sy],
                                                 dtype=torch.float32)

            area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])
            target = {
                "boxes": boxes,
                "labels": labels,
                "image_id": torch.tensor([idx]),
                "area": area,
                "iscrowd": torch.zeros((len(boxes),), dtype=torch.int64)
            }
            return image, target

    # ─── Transforms & DataLoaders ────────────────────────────────────────────────
    def get_transform(train):
        """Build the image-only preprocessing pipeline.

        Args:
            train: Kept so existing call sites keep working; currently unused.
                The previous train-only RandomHorizontalFlip was removed
                because these transforms receive the image alone: a flip
                mirrored the pixels while the box targets stayed where they
                were, corrupting the ground truth for about half the samples.

        Returns:
            A ``transforms.Compose`` that resizes to 600x600 and converts the
            image to a tensor.
        """
        # NOTE(review): Resize changes the image geometry too; whoever consumes
        # this pipeline has to scale the box targets by the same factors.
        return transforms.Compose([
            transforms.Resize((600, 600)),          # ↓ smaller images
            transforms.ToTensor(),
        ])

    def collate_fn(batch):
        """Transpose a list of samples into one tuple per sample field.

        Detection targets differ in size per image, so the samples are only
        regrouped (e.g. into ``(images, targets)``), never stacked.
        """
        per_field = zip(*batch)
        return tuple(per_field)

    # Foreground classes; the dataset assigns label i+1 to the class at index i.
    classes     = ['car', 'bus', 'truck', 'motorcycle']
    # Images per forward/backward pass, and how many passes are accumulated so
    # that one optimizer step covers roughly 30 images.
    micro_bs    = 2
    accum_steps = max(1, 30 // micro_bs)

    # Settings shared by both loaders; only the shuffling differs.
    loader_options = dict(batch_size=micro_bs, collate_fn=collate_fn)

    train_set = PascalVOCDataset(train_dir, classes, transforms=get_transform(True))
    train_loader = DataLoader(train_set, shuffle=True, **loader_options)

    val_set = PascalVOCDataset(val_dir, classes, transforms=get_transform(False))
    val_loader = DataLoader(val_set, shuffle=False, **loader_options)

    # ─── Model & Optimizer Setup ────────────────────────────────────────────────
    device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

    # Start from the COCO-pretrained detector ...
    detection = torchvision.models.detection
    model = detection.fasterrcnn_resnet50_fpn(
        weights=detection.FasterRCNN_ResNet50_FPN_Weights.COCO_V1
    )
    # ... and swap its box head for one sized to our classes plus background.
    num_outputs = len(classes) + 1
    head_in = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(head_in, num_outputs)
    model.to(device)

    # Base rate 0.005 scaled by 30/4, as written in the original expression.
    sgd_options = dict(lr=0.005 * (30/4), momentum=0.9, weight_decay=5e-4)
    optimizer = torch.optim.SGD(model.parameters(), **sgd_options)
    # Multiply the learning rate by 0.1 every 3 epochs.
    lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)
    scaler = GradScaler()

    # ─── Resume from Checkpoint ──────────────────────────────────────────────────
    # Default to a fresh run; a rolling checkpoint, if present, overrides this.
    start_epoch = 0
    if os.path.exists(ckpt_path):
        resume_state = torch.load(ckpt_path, map_location=device)
        # Restore every stateful component from its entry in the checkpoint.
        restorable = (
            (model, 'model_state_dict'),
            (optimizer, 'optimizer_state_dict'),
            (lr_scheduler, 'scheduler_state_dict'),
        )
        for component, key in restorable:
            component.load_state_dict(resume_state[key])
        # The stored value is the number of completed epochs, i.e. the
        # zero-based index of the next epoch to run.
        start_epoch = resume_state['epoch']
        print(f"🔄 Resuming from epoch {start_epoch}")

    # ─── Training Loop ───────────────────────────────────────────────────────────
    num_epochs = 50
    num_train_batches = len(train_loader)

    for epoch in range(start_epoch, num_epochs):
        model.train()
        running_loss = 0.0
        optimizer.zero_grad()

        for i, (imgs, targs) in enumerate(tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs} [Train]")):
            imgs = [img.to(device) for img in imgs]
            targs = [{k: v.to(device) for k, v in t.items()} for t in targs]

            with autocast(device_type=device.type):
                loss_dict = model(imgs, targs)
                # Divide so the accumulated gradient averages over the window.
                loss = sum(loss_dict.values()) / accum_steps

            scaler.scale(loss).backward()
            # Step on every full accumulation window AND on the last batch of
            # the epoch; otherwise the gradients of a trailing partial window
            # would be thrown away by the zero_grad() at the next epoch start.
            if (i + 1) % accum_steps == 0 or (i + 1) == num_train_batches:
                scaler.step(optimizer)
                scaler.update()
                optimizer.zero_grad()

            running_loss += loss.item() * accum_steps

        lr_scheduler.step()

        # Validation
        # torchvision detection models return the loss dict only in training
        # mode; in eval mode they return predictions, so calling .values() on
        # the output fails. Stay in train mode and just disable gradients.
        # NOTE(review): presumably safe because the pretrained backbone uses
        # frozen batch-norm and the model has no dropout — confirm.
        val_loss = 0.0
        with torch.no_grad():
            for imgs, targs in tqdm(val_loader, desc=f"Epoch {epoch+1}/{num_epochs} [Val]"):
                imgs = [img.to(device) for img in imgs]
                targs = [{k: v.to(device) for k, v in t.items()} for t in targs]
                with autocast(device_type=device.type):
                    loss_dict = model(imgs, targs)
                val_loss += sum(loss_dict.values()).item()

        # max(1, ...) keeps the report from dividing by zero on an empty split.
        train_avg = running_loss / max(1, num_train_batches)
        val_avg = val_loss / max(1, len(val_loader))
        print(f"Epoch {epoch+1}: Train {train_avg:.4f} | Val {val_avg:.4f}")

        # Save checkpoint & clear cache
        # 'epoch' stores the number of completed epochs, which is what the
        # resume logic uses as the next epoch index.
        torch.save({
            'epoch': epoch+1,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'scheduler_state_dict': lr_scheduler.state_dict()
        }, ckpt_path)
        torch.save(model.state_dict(),
                   os.path.join(checkpoint_dir, f"checkpoint_epoch_{epoch+1}.pth"))
        torch.cuda.empty_cache()

    # ─── Final Save ───────────────────────────────────────────────────────────────
    torch.save(model.state_dict(), final_model_path)
    print(f"✅ Training complete. Model saved to `{final_model_path}`")


✅ Final model exists at `models/faster_rcnn_final.pth` – skipping training.


## 🔗 Hybrid Detector: YOLOv8 + Faster R-CNN

In this section we build a **hybrid detector** that combines:

1. **YOLOv8** for fast, coarse proposals  
2. **Faster R-CNN** to refine low-confidence boxes  
3. A final **NMS** pass to merge overlapping detections

**Key points**:  
- We load your trained weights from `models/best.pt` (YOLO) and `models/faster_rcnn_final.pth` (RCNN).  
- Any YOLO box with confidence ≥ 0.5 is accepted immediately; lower-confidence boxes are sent to RCNN (threshold 0.7).  
- We crop uncertain regions and run them through the RCNN model, then merge all detections with an NMS IoU of 0.5.  


In [22]:
import cv2
import torch
import numpy as np
from ultralytics import YOLO
import torchvision
import torchvision.transforms as T
from torchvision.ops import nms

# ─── Device & Model Paths ──────────────────────────────────────────────────────
DEVICE           = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
YOLO_MODEL_PATH  = 'models/best.pt'
RCNN_MODEL_PATH  = 'models/faster_rcnn_final.pth'

# ─── Class Names ───────────────────────────────────────────────────────────────
# Index = YOLO class id.
# NOTE(review): presumably mirrors `names` in data/yolo_format/data.yaml — confirm.
YOLO_NAMES       = ['bus', 'car', 'truck', 'van']
# Index = label id emitted by the Faster R-CNN. The training cell assigns
# label position+1 over ['car', 'bus', 'truck', 'motorcycle'], so this list has
# to follow that order rather than YOLO's. Because the two orders differ, an
# RCNN label must be translated to a YOLO id by class NAME, not by `label - 1`.
# NOTE(review): verify against the checkpoint actually stored on disk.
RCNN_CLASSES     = ['background', 'car', 'bus', 'truck', 'motorcycle']

# ─── Load YOLOv8 ───────────────────────────────────────────────────────────────
yolo_model = YOLO(YOLO_MODEL_PATH).to(DEVICE)
# NOTE(review): Ultralytics takes conf/iou as arguments of the predict call;
# these attributes only have an effect if the caller forwards them, e.g.
# yolo_model(frame, conf=yolo_model.conf, iou=yolo_model.iou).
yolo_model.conf = 0.25  # min confidence for YOLO
yolo_model.iou  = 0.45  # YOLO NMS IoU threshold

# ─── Load Faster R-CNN ─────────────────────────────────────────────────────────
def load_rcnn():
    """Rebuild the Faster R-CNN architecture and load the fine-tuned weights.

    Returns:
        The model on ``DEVICE``, switched to eval mode.
    """
    # `weights=None` replaces the deprecated `pretrained=False` and matches
    # the `weights=` API used when the model was trained.
    # NOTE(review): `weights_backbone` is deliberately left at its default so
    # the layer layout stays what the previous call produced — confirm before
    # changing it, as it may alter the state-dict keys.
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(
        weights=None,
        num_classes=len(RCNN_CLASSES)
    )
    # The file holds a plain state dict, so restrict unpickling to tensors.
    state_dict = torch.load(RCNN_MODEL_PATH, map_location=DEVICE, weights_only=True)
    model.load_state_dict(state_dict)
    return model.to(DEVICE).eval()

torchvision_rcnn = load_rcnn()
# ToTensor only. The training pipeline fed un-normalised [0, 1] tensors, and
# torchvision's Faster R-CNN expects inputs in that range because it applies
# its own mean/std normalisation internally. An extra Normalize here would
# normalise twice and no longer match what the model was trained on.
rcnn_transform   = T.Compose([
    T.ToTensor(),
])

POST_NMS_IOU = 0.5  # final NMS threshold

# ─── Hybrid Detection Function ────────────────────────────────────────────────
def hybrid_detect(frame: np.ndarray,
                  yolo_confirm_thresh: float = 0.5,
                  rcnn_conf_thresh:  float = 0.7
                 ) -> list:
    """Detect objects with YOLO and re-check its uncertain boxes with Faster R-CNN.

    Args:
        frame: Image in OpenCV layout (H x W x 3, BGR channel order).
        yolo_confirm_thresh: YOLO boxes scoring at least this are accepted as is.
        rcnn_conf_thresh: Minimum Faster R-CNN score for a re-checked box.

    Returns:
        A list of ``[x1, y1, x2, y2, score, class_id]`` entries in frame pixel
        coordinates, where ``class_id`` indexes ``YOLO_NAMES``. Overlapping
        entries are merged by a class-agnostic NMS at ``POST_NMS_IOU``.
    """
    # 1) YOLO inference
    # Ultralytics reads conf/iou from the call arguments, so forward the values
    # stored on the model object (falling back to the notebook's defaults).
    results = yolo_model(
        frame,
        conf=getattr(yolo_model, 'conf', 0.25),
        iou=getattr(yolo_model, 'iou', 0.45),
    )[0]
    yb = results.boxes.xyxy.cpu().numpy()
    ys = results.boxes.conf.cpu().numpy()
    yc = results.boxes.cls.cpu().numpy().astype(int)
    h, w = frame.shape[:2]
    detections = []

    # 2) Process each YOLO box
    for (x1, y1, x2, y2), score, clsid in zip(yb, ys, yc):
        x1, y1, x2, y2 = map(int, (x1, y1, x2, y2))
        # clamp to image
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(w, x2), min(h, y2)
        if x2 <= x1 or y2 <= y1:
            continue  # nothing left of the box after clamping

        if score >= yolo_confirm_thresh:
            detections.append([x1, y1, x2, y2, float(score), int(clsid)])
            continue

        # skip tiny boxes
        if (x2 - x1) < 20 or (y2 - y1) < 20:
            continue

        # OpenCV frames are BGR while the R-CNN was trained on RGB images;
        # cvtColor also returns a contiguous array, which ToTensor requires.
        crop = cv2.cvtColor(frame[y1:y2, x1:x2], cv2.COLOR_BGR2RGB)
        img_t = rcnn_transform(crop).to(DEVICE)
        with torch.no_grad():
            out = torchvision_rcnn([img_t])[0]

        rb = out['boxes'].cpu().numpy()
        rs = out['scores'].cpu().numpy()
        rl = out['labels'].cpu().numpy().astype(int)
        if rs.size == 0:
            continue
        # Keep only the single best R-CNN hit inside the crop.
        idx = int(rs.argmax())
        if rs[idx] < rcnn_conf_thresh:
            continue

        # Translate the R-CNN label to a YOLO class id by NAME so the two
        # class lists may be ordered differently; drop background, unknown
        # labels and classes YOLO does not have.
        label = int(rl[idx])
        if not 0 < label < len(RCNN_CLASSES):
            continue
        name = RCNN_CLASSES[label]
        if name not in YOLO_NAMES:
            continue

        # map back to original coords
        bx1, by1, bx2, by2 = rb[idx][:4]
        bx1 = int(bx1 + x1); by1 = int(by1 + y1)
        bx2 = int(bx2 + x1); by2 = int(by2 + y1)

        detections.append([bx1, by1, bx2, by2, float(rs[idx]), YOLO_NAMES.index(name)])

    # 3) Final NMS
    if detections:
        boxes  = torch.tensor([d[:4] for d in detections], dtype=torch.float32)
        scores = torch.tensor([d[4]  for d in detections], dtype=torch.float32)
        keep   = nms(boxes, scores, POST_NMS_IOU).tolist()
        detections = [detections[i] for i in keep]

    return detections




In [None]:
# Quick visual check: run the hybrid detector on a local clip and show the
# annotated frames until the clip ends or 'q' is pressed.
cap = cv2.VideoCapture('inputs/2252223-sd_960_540_30fps.mp4')
try:
    ret, frame = cap.read()
    if not ret:
        # Without this check a missing or unreadable file would pass None
        # into the detector and fail with an unrelated error.
        raise RuntimeError("Cannot read first frame.")
    _ = hybrid_detect(frame)  # warm-up (this first frame is not displayed)

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        dets = hybrid_detect(frame)
        for x1, y1, x2, y2, sc, clsid in dets:
            lbl = YOLO_NAMES[clsid]
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
            cv2.putText(frame, f'{lbl} {sc:.2f}', (x1, y1 - 6),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
        cv2.imshow('Hybrid Detection', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Release the capture and close the window even if detection raised.
    cap.release()
    cv2.destroyAllWindows()

## 📊 Hybrid Tracking & Extended Logging

We’ll enhance our DeepSORT tracking loop to log:

- **Active counts** per frame (already displayed)  
- **Total unique objects** seen per class over the entire video  
- **Per-track lifespans** (number of frames each object was tracked)  
- **Per-frame log entries**, which you can save to CSV for later analysis  

At the end we write a summary of total unique counts and average lifespan per class to `outputs/tracking_report.txt`.


In [None]:
# ─── Hybrid Tracking & Reporting with Video Output ─────────────────────────────
import cv2
import os
from deep_sort_realtime.deepsort_tracker import DeepSort

# ─── Parameters ────────────────────────────────────────────────────────────────
# Source to track (an HLS playlist URL) and the files this cell writes.
INPUT_VIDEO    = 'https://hls.ibb.gov.tr/tkm4/hls/19.stream/playlist.m3u8'
OUTPUT_VIDEO   = 'outputs/tracked_output.mp4'
REPORT_PATH    = 'outputs/tracking_report.txt'
# Passed to hybrid_detect as yolo_confirm_thresh / rcnn_conf_thresh.
YOLO_THRESH    = 0.75
RCNN_THRESH    = 0.9
# Passed to DeepSort as max_age / n_init.
TRACK_MAX_AGE  = 40
TRACK_N_INIT   = 2
# Detections whose box area falls outside this fraction range of the frame
# area are dropped before tracking.
MIN_AREA_FRAC  = 0.001
MAX_AREA_FRAC  = 0.4

# ─── Prepare output dirs & video writer ───────────────────────────────────────
# Create the folders the outputs are actually written to, instead of assuming
# they all live under 'outputs'.
for _out_path in (OUTPUT_VIDEO, REPORT_PATH):
    os.makedirs(os.path.dirname(_out_path) or '.', exist_ok=True)

cap = cv2.VideoCapture(INPUT_VIDEO)
if not cap.isOpened():
    raise RuntimeError(f"Cannot open video {INPUT_VIDEO}")

# Some sources report 0 or NaN as FPS; `not fps > 0` catches both.
fps      = cap.get(cv2.CAP_PROP_FPS)
if not fps > 0:
    fps = 30.0

# Probe one frame to learn the frame size for the writer.
ret, frame = cap.read()
if not ret:
    cap.release()
    raise RuntimeError("Cannot read first frame.")
h, w     = frame.shape[:2]
fourcc   = cv2.VideoWriter_fourcc(*'mp4v')
writer   = cv2.VideoWriter(OUTPUT_VIDEO, fourcc, fps, (w, h))
if not writer.isOpened():
    # A writer that failed to open would otherwise drop every frame silently.
    cap.release()
    raise RuntimeError(f"Cannot open video writer for {OUTPUT_VIDEO}")

# Rewind so the probe frame is processed too.
# NOTE(review): seeking is presumably unsupported on live streams, in which
# case the probe frame is simply skipped — confirm.
cap.set(cv2.CAP_PROP_POS_FRAMES, 0)

# ─── Logging structures ───────────────────────────────────────────────────────
total_seen    = {name: set() for name in YOLO_NAMES}   # class_name → track ids seen
track_classes = {}       # track_id → class_name (fixed at first confirmed sighting)
track_lengths = {}       # track_id → total frames tracked
frame_idx     = 0

# ─── Initialize tracker ───────────────────────────────────────────────────────
tracker = DeepSort(max_age=TRACK_MAX_AGE, n_init=TRACK_N_INIT)

# The frame size is fixed, so the admissible box-area range is computed once.
frame_area = h * w
min_area = frame_area * MIN_AREA_FRAC
max_area = frame_area * MAX_AREA_FRAC

# ─── Process video ────────────────────────────────────────────────────────────
try:
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame_idx += 1

        # 1) Hybrid detection
        dets = hybrid_detect(
            frame,
            yolo_confirm_thresh=YOLO_THRESH,
            rcnn_conf_thresh=RCNN_THRESH
        )

        # 2) Filter & format for DeepSORT: ([left, top, width, height], score, class)
        raw = [
            ([x1, y1, x2 - x1, y2 - y1], score, YOLO_NAMES[clsid])
            for x1, y1, x2, y2, score, clsid in dets
            if min_area <= (x2 - x1) * (y2 - y1) <= max_area
        ]

        # 3) Update tracks
        tracks = tracker.update_tracks(raw, frame=frame)

        # 4) Count active & update logs
        active_counts = {name: 0 for name in YOLO_NAMES}
        for tr in tracks:
            if not tr.is_confirmed():
                continue
            tid = tr.track_id

            # Pin each track to the class of its first confirmed sighting.
            # Using the current det_class everywhere would count one track in
            # several classes whenever the detector's label flickers, while
            # its lifespan is still attributed to the first class only.
            cid = track_classes.get(tid)
            if cid is None:
                cid = tr.det_class
                if cid not in total_seen:
                    continue  # no usable class on this track yet
                track_classes[tid] = cid

            active_counts[cid] += 1
            total_seen[cid].add(tid)
            track_lengths[tid] = track_lengths.get(tid, 0) + 1

            # draw box & label
            x1, y1, x2, y2 = map(int, tr.to_ltrb())
            cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 0, 0), 2)
            cv2.putText(frame,
                        f"{cid}-{tid}",
                        (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5,
                        (255, 0, 0), 2)

        # 5) Overlay counts (ASCII only, no unicode arrow)
        y0 = 30
        for cls in YOLO_NAMES:
            text = f"{cls}: active {active_counts[cls]} | total {len(total_seen[cls])}"
            cv2.putText(frame, text, (10, y0),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6,
                        (0, 255, 255), 2)
            y0 += 25

        # 6) Write frame to output video & display
        writer.write(frame)
        cv2.imshow('Hybrid Tracking', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Release on every exit path so an error mid-stream still closes the
    # output file and the stream.
    cap.release()
    writer.release()
    cv2.destroyAllWindows()

# ─── Generate Text Report ──────────────────────────────────────────────────────
# Unique track ids per class, and the per-class list of track lifespans (in
# frames) from which the averages are derived.
summary_counts = {cls: len(total_seen[cls]) for cls in YOLO_NAMES}
lifespans      = {cls: [] for cls in YOLO_NAMES}
for tid, length in track_lengths.items():
    lifespans[track_classes[tid]].append(length)
avg_lifespan = {
    cls: (sum(lifespans[cls]) / len(lifespans[cls])) if lifespans[cls] else 0
    for cls in YOLO_NAMES
}

# The report contains a non-ASCII bullet ('•'), so pin the encoding instead of
# relying on the platform default, which may be unable to encode it.
with open(REPORT_PATH, 'w', encoding='utf-8') as f:
    f.write("=== Tracking Report ===\n\n")
    f.write(f"Video processed: {INPUT_VIDEO}\n")
    f.write(f"Total frames: {frame_idx}\n\n")
    for cls in YOLO_NAMES:
        f.write(f"{cls.upper()}:\n")
        f.write(f"  • Total unique seen : {summary_counts[cls]}\n")
        f.write(f"  • Avg lifespan      : {avg_lifespan[cls]:.1f} frames\n\n")

print(f"✅ Overlay video saved to {OUTPUT_VIDEO}")
print(f"✅ Text report saved to {REPORT_PATH}")
