In [None]:
# Put near top of your notebook
import time
import json
import os
import numpy as np
import pandas as pd
from ultralytics import YOLO
import cv2
from collections import defaultdict

# ---------- 1) Run ultralytics validation to get detection metrics ----------
def run_yolov8_val(model_path, data_yaml, imgsz=640, conf=0.25, iou=0.5, save_json=True, device=None):
    """
    Run Ultralytics YOLOv8 validation and collect the headline detection metrics.

    Args:
        model_path: Path to the weights file, handed to ``YOLO(...)``.
        data_yaml: Dataset YAML forwarded to ``model.val(data=...)``.
        imgsz: Validation image size forwarded to ``model.val``.
        conf: Confidence threshold forwarded to ``model.val``.
        iou: IoU threshold forwarded to ``model.val``.
        save_json: When true and metrics were extracted, write them to
            ``<basename(model_path)>_val_results.json`` in the working directory
            (together with per-class mAP@0.5:0.95 when the result exposes it).
        device: Optional device string (e.g. ``'cuda'``). When set, the model is
            moved there and the same device is forwarded to ``model.val``.

    Returns:
        ``(res, metrics)`` where ``res`` is the object returned by ``model.val``
        and ``metrics`` is a dict with ``mAP_50``, ``mAP_50_95``, ``precision``
        and ``recall``. If those attributes are unavailable, ``metrics`` falls
        back to ``res.results_dict`` (or ``res.metrics``) and may be empty.
    """
    model = YOLO(model_path)
    val_kwargs = {'data': data_yaml, 'imgsz': imgsz, 'conf': conf, 'iou': iou, 'verbose': False}
    if device:
        model.to(device)
        # The validator selects its own device from its arguments, so pass it explicitly too.
        val_kwargs['device'] = device
    print(f"Running val() for {model_path} ...")
    res = model.val(**val_kwargs)

    box = getattr(res, 'box', None)
    metrics = {}
    try:
        # The box metrics expose scalar properties; they are not indexable sequences.
        metrics['mAP_50'] = float(box.map50)       # mAP@0.5
        metrics['mAP_50_95'] = float(box.map)      # mAP@0.5:0.95
        metrics['precision'] = float(box.mp)       # mean precision over classes
        metrics['recall'] = float(box.mr)          # mean recall over classes
    except (AttributeError, TypeError, ValueError):
        # Different result layout: fall back to whatever summary dict is exposed.
        metrics = {}
        fallback = getattr(res, 'results_dict', None) or getattr(res, 'metrics', None)
        if fallback:
            metrics = dict(fallback)

    # Optionally save JSON (for later per-class AP parsing)
    if save_json and metrics:
        out_path = f"{os.path.basename(model_path)}_val_results.json"
        payload = {'metrics': metrics}
        per_class = getattr(box, 'maps', None)
        if per_class is not None:
            try:
                # Keyed by class index; map to names via the dataset YAML if needed.
                payload['per_class_mAP_50_95'] = {str(i): float(v) for i, v in enumerate(per_class)}
            except (TypeError, ValueError):
                pass  # per-class values are optional extras
        try:
            with open(out_path, "w") as f:
                json.dump(payload, f, indent=2)
        except (OSError, TypeError, ValueError) as err:
            # Saving is best-effort; report instead of silently swallowing.
            print(f"Could not save {out_path}: {err}")
    return res, metrics

# ---------- 2) Evaluate runtime (inference speed) and distance MAE ----------
def eval_runtime_and_distance(model_path, video_or_image_list, focal_length_px=1000,
                              known_heights_m=None, default_height_m=1.5, device=None, conf=0.25,
                              gt_distance_fn=None):
    """
    Measure average per-frame inference time and, optionally, distance-estimation error.

    Distance per detection is estimated from the box height as
    ``real_height_m * focal_length_px / box_height_px``.

    Args:
        model_path: Path to the weights file, handed to ``YOLO(...)``.
        video_or_image_list: Path to a video (``.mp4/.mov/.avi/.mkv``), a single
            image path, or an iterable of image paths.
        focal_length_px: Focal length in pixels used by the distance estimate.
        known_heights_m: Optional dict ``class_id -> real height (meters)``.
        default_height_m: Height used for classes missing from ``known_heights_m``.
        device: Optional device string; when set the model is moved there and
            the same device is forwarded to ``predict``.
        conf: Confidence threshold forwarded to ``predict``.
        gt_distance_fn: Optional callable ``(frame_key, class_id) -> float | None``
            returning the ground-truth distance in meters. ``frame_key`` is the
            1-based frame index for videos and the image path for images.
            Without it no distance errors are computed.

    Returns:
        dict with ``avg_infer_ms`` and ``fps`` (both ``None`` when nothing was
        processed), plus ``distance_MAE`` and ``distance_RMSE`` when at least one
        ground-truth distance was returned by ``gt_distance_fn``.
    """
    model = YOLO(model_path)
    predict_kwargs = {'conf': conf, 'verbose': False}
    if device:
        model.to(device)
        predict_kwargs['device'] = device

    video_exts = ('.mp4', '.mov', '.avi', '.mkv')
    is_video = isinstance(video_or_image_list, str) and video_or_image_list.lower().endswith(video_exts)
    infer_times = []
    distance_errors = []  # absolute errors in meters
    squared_errors = []

    def _process(frame, frame_key):
        """Time one prediction and record distance errors for its detections."""
        # NOTE: the first call typically includes model warm-up and is part of the average.
        t0 = time.perf_counter()
        results = model.predict(frame, **predict_kwargs)
        infer_times.append((time.perf_counter() - t0) * 1000)  # ms

        if gt_distance_fn is None or not results or results[0].boxes is None:
            return
        boxes = results[0].boxes.xyxy.cpu().numpy()  # [N,4]
        cls_ids = results[0].boxes.cls.cpu().numpy().astype(int)
        for (x1, y1, x2, y2), cls in zip(boxes, cls_ids):
            h_px = float(y2 - y1)
            if h_px <= 0:
                continue  # degenerate box: no meaningful distance
            obj_h_m = known_heights_m.get(int(cls), default_height_m) if known_heights_m else default_height_m
            est_dist = (obj_h_m * focal_length_px) / (h_px + 1e-6)
            gt_dist = gt_distance_fn(frame_key, int(cls))
            if gt_dist is None:
                continue
            err = est_dist - float(gt_dist)
            distance_errors.append(abs(err))
            squared_errors.append(err ** 2)

    if is_video:
        cap = cv2.VideoCapture(video_or_image_list)
        try:
            if not cap.isOpened():
                print(f"Warning: cannot open video {video_or_image_list}")
            frame_idx = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                frame_idx += 1
                _process(frame, frame_idx)
        finally:
            cap.release()  # release even if prediction raises
    else:
        # A bare string is one image path, not an iterable of characters.
        if isinstance(video_or_image_list, str):
            image_paths = [video_or_image_list]
        else:
            image_paths = video_or_image_list
        for img_path in image_paths:
            img = cv2.imread(img_path)
            if img is None:
                # imread returns None for missing/unreadable files; never feed that to predict.
                print(f"Warning: cannot read image {img_path}, skipping")
                continue
            _process(img, img_path)

    # summary:
    avg_ms = float(np.mean(infer_times)) if infer_times else None
    fps = 1000.0 / avg_ms if avg_ms and avg_ms > 0 else None
    out = {'avg_infer_ms': avg_ms, 'fps': fps}
    if distance_errors:
        out['distance_MAE'] = float(np.mean(distance_errors))
        out['distance_RMSE'] = float(np.sqrt(np.mean(squared_errors)))
    return out

# ---------- 3) Prepare a compact compare() to run both models and collect major metrics ----------
def compare_models(baseline_model_path, tuned_model_path, data_yaml, imgsz=640, conf=0.25, iou=0.5, device=None):
    """
    Validate the baseline and tuned models and tabulate their metrics side by side.

    Args:
        baseline_model_path: Weights path for the row labelled ``'baseline'``.
        tuned_model_path: Weights path for the row labelled ``'tuned'``.
        data_yaml, imgsz, conf, iou, device: Forwarded to ``run_yolov8_val``.

    Returns:
        ``(df, results)``: ``df`` has one row per model with a ``model`` column
        followed by one column per metric key (``None`` where a model lacks that
        key); ``results`` maps ``'baseline'``/``'tuned'`` to
        ``{'metrics': dict, 'val_result_obj': res}``.
    """
    results = {}
    for name, mp in [('baseline', baseline_model_path), ('tuned', tuned_model_path)]:
        res, met = run_yolov8_val(mp, data_yaml, imgsz=imgsz, conf=conf, iou=iou, device=device)
        results[name] = {'metrics': met, 'val_result_obj': res}

    # Collect metric keys in first-seen order. A set would make the column order
    # (and therefore any exported CSV) vary between interpreter runs.
    keys = list(dict.fromkeys(kk for entry in results.values() for kk in entry['metrics']))

    # Build a summary dataframe
    rows = []
    for name, entry in results.items():
        row = {'model': name}
        for kk in keys:
            row[kk] = entry['metrics'].get(kk, None)
        rows.append(row)
    df = pd.DataFrame(rows)
    return df, results

# ---------- 4) Example usage ----------
# BASELINE = '/content/drive/.../yolov8n_baseline.pt'
# TUNED = '/content/drive/.../yolov8n_custom_coco_best.pt'
# DATA_YAML = '/content/drive/.../data.yaml'  # must reference validation set and classes
# df_summary, raw = compare_models(BASELINE, TUNED, DATA_YAML, imgsz=640, conf=0.25, iou=0.5, device='cuda')
# print(df_summary)
# df_summary.to_csv("model_comparison_summary.csv", index=False)


In [None]:
import motmetrics as mm

def compute_tracking_metrics(gt_file, hyp_file):
    """
    Compute MOT tracking metrics for a hypothesis file against ground truth.

    Args:
        gt_file: Ground-truth file, loaded with ``fmt='mot15-2D'``.
        hyp_file: Tracker-output file, loaded with ``fmt='mot15-2D'``.

    Returns:
        The summary produced by ``mh.compute`` (also printed), with the metrics
        ``mota``, ``idf1``, ``precision``, ``recall``, ``num_frames``,
        ``num_objects``, ``mostly_tracked`` and ``num_switches``.
    """
    # Both files: frame, id, x, y, w, h leading columns (boxes as top-left + width/height).
    # NOTE(review): the mot15-2D loader appears to filter rows on a confidence
    # column; files with only six columns may load as empty - confirm the inputs
    # carry the full MOT column set.
    gt = mm.io.loadtxt(gt_file, fmt='mot15-2D')
    hyp = mm.io.loadtxt(hyp_file, fmt='mot15-2D')
    acc = mm.utils.compare_to_groundtruth(gt, hyp, 'iou', distth=0.5)
    mh = mm.metrics.create()
    # The identity-switch metric is registered as 'num_switches'; 'id_switches' is not a known metric name.
    wanted = ['mota', 'idf1', 'precision', 'recall', 'num_frames', 'num_objects', 'mostly_tracked', 'num_switches']
    summary = mh.compute(acc, metrics=wanted, name='summary')
    print(summary)
    return summary


In [None]:
# Sweep the confidence threshold, validating both models at each value, and
# stack the per-threshold summaries into one CSV.
confs = [0.2, 0.3, 0.4, 0.5, 0.6]
rows = []
for c in confs:
    df_summary, _ = compare_models(
        BASELINE, TUNED, DATA_YAML,
        imgsz=640, conf=c, iou=0.5, device='cuda',
    )
    # tag every row of this summary with the threshold that produced it
    df_summary = df_summary.assign(conf=c)
    rows.append(df_summary)
big = pd.concat(rows, ignore_index=True)
big.to_csv("threshold_sweep_results.csv", index=False)


# Full working pipeline (needs to be fixed)


In [None]:
# Full pipeline integrating YOLOv8 (ultralytics) + ByteTrack + gTTS audio alerts
import os, time, base64, subprocess, math
from collections import deque, defaultdict
from gtts import gTTS
from pydub import AudioSegment
import cv2
import numpy as np
import torch
from ultralytics import YOLO

# ByteTrack import (from installed ByteTrack package)
# Resolve BYTETracker from whichever package layout is installed: first the
# yolox-based module path, then the standalone bytetrack package. If neither
# import works, fail with an ImportError that tells the user what to do.
try:
    # ByteTrack repo provides BYTETracker under yolox/ or bytetrack package depending on install
    from yolox.tracker.byte_tracker import BYTETracker
except Exception as e:
    # Any failure of the first import (not only ImportError) triggers the second attempt.
    try:
        from bytetrack.byte_tracker import BYTETracker
    except Exception as e2:
        # Both layouts failed; the original errors remain attached as exception context.
        raise ImportError(
            "BYTETracker not found. Make sure you installed ByteTrack (see install cell). "
            "After installing, restart the runtime and re-run this cell."
        )

# ---------------------- User-configurable settings ----------------------
# Input model/video and the intermediate/final artifacts written by the pipeline.
CUSTOM_YOLO_MODEL_PATH = '/content/drive/My Drive/VisionAssist-Models/yolov8n_custom_coco_best.pt'
VIDEO_SOURCE = 'test.mp4'   # uploaded/placed in working dir
SILENT_VIDEO_OUTPUT = 'temp_silent_video_mp4v.mp4'   # annotated video without audio
FINAL_VIDEO_OUTPUT = 'final_video_with_audio.mp4'    # result of muxing video + alert audio
FINAL_AUDIO_FILE = 'final_audio.mp3'                 # alert timeline exported as MP3
TEMP_TTS_FILE = 'temp_alert.mp3'                     # scratch file for each synthesized alert

# Distance estimation / classes
# Distance is estimated as real_height * focal_length / box_height (see estimate_distance).
ESTIMATED_FOCAL_LENGTH_PIXELS = 1000
# class_id -> assumed real-world height in meters. The alert logic treats class 0 as
# "person"; the other ids presumably follow the model's class list - TODO confirm.
KNOWN_OBJECT_HEIGHTS_METERS = {0:1.7, 1:1.0, 2:1.5, 3:1.2, 5:3.0, 7:3.5}
DEFAULT_KNOWN_HEIGHT = 1.5   # fallback height (m) for classes not listed above

# Alert & tracking params
CONF_THRESHOLD = 0.35            # detection confidence passed to yolo_model.predict
IOU_THRESHOLD_FOR_TRACK = 0.3    # NOTE(review): not referenced anywhere in the visible code
ALERT_CLASS_COOLDOWN_SEC = 8.0   # minimum gap between alerts for the same class
ALERT_REPEAT_DELAY_SEC = 15.0    # minimum gap between alerts for the same track id
HISTORY_FRAMES = 15              # length of the per-track center history window
MOVEMENT_THRESHOLD_PIXELS = 5    # center displacement over the window that counts as "Moving"
DIRECTION_THRESHOLD_RATIO = 0.3  # normalized offset from frame center that counts as Left/Right

# ByteTrack tracker parameters (tune as needed)
BYTE_TRACKER_ARGS = {
    'track_thresh': 0.5,   # detection confidence threshold used inside ByteTrack
    'match_thresh': 0.8,
    'track_buffer': 30,
    'frame_rate': 30,
}
# -----------------------------------------------------------------------

# small helpers
def get_device():
    """Return 'cuda' when torch reports a usable CUDA device, otherwise 'cpu'."""
    if torch.cuda.is_available():
        return 'cuda'
    return 'cpu'

def estimate_distance(box_height_px, object_real_height_m, focal_length_px):
    """
    Estimate object distance in meters from its bounding-box height.

    Returns ``real_height * focal_length / (box_height + 1e-6)``, or infinity
    when the box height is zero or negative.
    """
    if box_height_px <= 0:
        return float('inf')
    scaled_height = object_real_height_m * focal_length_px
    padded_box_height = box_height_px + 1e-6  # epsilon keeps the division well-defined
    return scaled_height / padded_box_height

def get_direction_motion(track_id, frame_width, track_histories):
    """
    Describe where a track is relative to the frame center and whether it moved.

    Args:
        track_id: Key into ``track_histories``.
        frame_width: Frame width in pixels.
        track_histories: Mapping ``track_id -> sequence of (cx, cy)`` centers,
            oldest first.

    Returns:
        ``(direction_str, motion_str)``. ``direction_str`` is ``"Left"``,
        ``"Right"`` or ``"Ahead"``, based on the newest center's horizontal
        offset from the frame center compared with ``DIRECTION_THRESHOLD_RATIO``.
        ``motion_str`` is ``"Moving"`` only when the history holds exactly
        ``HISTORY_FRAMES`` points and the oldest-to-newest displacement exceeds
        ``MOVEMENT_THRESHOLD_PIXELS`` on either axis; otherwise ``"Static"``.
        Unknown tracks or empty histories yield ``("Ahead", "Static")``.
    """
    direction_str = "Ahead"
    motion_str = "Static"
    history = track_histories.get(track_id)
    if not history:
        return direction_str, motion_str

    newest_pos = history[-1]

    # Direction depends only on the latest position, so it is reported from the
    # first observation instead of defaulting to "Ahead" until the window fills.
    frame_center_x = frame_width / 2
    if frame_center_x > 0:  # guard against a zero/unknown frame width
        relative_pos = (newest_pos[0] - frame_center_x) / frame_center_x
        if relative_pos > DIRECTION_THRESHOLD_RATIO:
            direction_str = "Right"
        elif relative_pos < -DIRECTION_THRESHOLD_RATIO:
            direction_str = "Left"

    # Motion is judged only over a full window so short histories stay "Static".
    if len(history) == HISTORY_FRAMES:
        oldest_pos = history[0]
        dx = newest_pos[0] - oldest_pos[0]
        dy = newest_pos[1] - oldest_pos[1]
        if abs(dx) > MOVEMENT_THRESHOLD_PIXELS or abs(dy) > MOVEMENT_THRESHOLD_PIXELS:
            motion_str = "Moving"
    return direction_str, motion_str

# Resolve the compute device once, then load the detector onto it.
DEVICE = get_device()
print("Device:", DEVICE)

print("Loading YOLO model:", CUSTOM_YOLO_MODEL_PATH)
yolo_model = YOLO(CUSTOM_YOLO_MODEL_PATH)
yolo_model.to(DEVICE)

# Create ByteTracker
# ByteTracker's constructor signature (common): BYTETracker(opt) or BYTETracker(args) depending on install.
# We'll attempt common patterns:
def make_bytetrack(frame_rate=30):
    """
    Instantiate a BYTETracker, trying the constructor styles of known installs.

    The reference implementation's constructor is ``BYTETracker(args, frame_rate=30)``
    and reads ``args.track_thresh``, ``args.track_buffer``, ``args.match_thresh``
    and ``args.mot20`` as attributes, so the settings from ``BYTE_TRACKER_ARGS``
    are wrapped in a namespace first. The older call patterns are kept as
    fallbacks for other packagings.

    Args:
        frame_rate: Video frame rate handed to the tracker.

    Returns:
        The constructed tracker.

    Raises:
        RuntimeError: If every constructor style fails; the last underlying
            error is chained as the cause.
    """
    from types import SimpleNamespace

    # 'frame_rate' is a constructor keyword, not a field of the args object.
    tracker_opts = {k: v for k, v in BYTE_TRACKER_ARGS.items() if k != 'frame_rate'}
    tracker_opts.setdefault('mot20', False)  # read by the reference update(); absent from the config dict

    attempts = (
        lambda: BYTETracker(SimpleNamespace(**tracker_opts), frame_rate=frame_rate),
        lambda: BYTETracker(BYTE_TRACKER_ARGS),
        lambda: BYTETracker(frame_rate=frame_rate),
        lambda: BYTETracker(**BYTE_TRACKER_ARGS),
    )
    last_error = None
    for build in attempts:
        try:
            return build()
        except Exception as err:  # constructor styles differ per install; try the next one
            last_error = err
    raise RuntimeError("Failed to instantiate BYTETracker. See ByteTrack install instructions.") from last_error

# Build the tracker with the configured frame rate.
tracker = make_bytetrack(frame_rate=BYTE_TRACKER_ARGS.get('frame_rate',30))

# Video I/O
cap = cv2.VideoCapture(VIDEO_SOURCE)
if not cap.isOpened():
    raise FileNotFoundError(f"Cannot open video {VIDEO_SOURCE}")

frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# Some containers report 0 fps / 0 frames; fall back to the configured rate.
fps = cap.get(cv2.CAP_PROP_FPS) or BYTE_TRACKER_ARGS.get('frame_rate', 30)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0)
total_duration_ms = ((total_frames / fps) * 1000) if fps>0 else 0

fourcc = cv2.VideoWriter_fourcc(*'mp4v')
writer = cv2.VideoWriter(SILENT_VIDEO_OUTPUT, fourcc, fps, (frame_width, frame_height))
if not writer.isOpened():
    # A writer that failed to open drops every frame silently; fail early instead.
    cap.release()
    raise RuntimeError(f"Cannot open video writer for {SILENT_VIDEO_OUTPUT}")

# runtime state
track_histories = {}        # track_id -> deque of the most recent (cx, cy) box centers
alert_log = []              # list of (timestamp_ms, alert_text)
alerted_tracks = {}         # track_id -> last_alert_time_sec
last_alert_time_by_class = {} # class_id -> last alert time (sec)
alert_sound_cache = {}      # alert_text -> AudioSegment

import inspect


def iou(boxA, boxB):
    """Return the intersection-over-union of two (x1, y1, x2, y2) boxes."""
    xA = max(boxA[0], boxB[0])
    yA = max(boxA[1], boxB[1])
    xB = min(boxA[2], boxB[2])
    yB = min(boxA[3], boxB[3])
    interW = max(0, xB - xA)
    interH = max(0, yB - yA)
    interArea = interW * interH
    boxAArea = max(0, boxA[2]-boxA[0]) * max(0, boxA[3]-boxA[1])
    boxBArea = max(0, boxB[2]-boxB[0]) * max(0, boxB[3]-boxB[1])
    union = boxAArea + boxBArea - interArea + 1e-6  # epsilon avoids 0/0 for degenerate boxes
    return interArea / union


def _track_to_box(t):
    """Normalise one tracker output to {'tlbr', 'track_id', 'score'}; None if it has no usable box."""
    tlbr = getattr(t, 'tlbr', None)
    if tlbr is None:
        tlwh = getattr(t, 'tlwh', None)
        if tlwh is not None and len(tlwh) == 4:
            x, y, w, h = tlwh
            tlbr = [x, y, x + w, y + h]
    try:
        if tlbr is not None:
            track_id = int(getattr(t, 'track_id', getattr(t, 'id', -1)))
            score = float(getattr(t, 'score', 0.0))
        else:
            # Tuple/array-like output: [x1, y1, x2, y2, track_id, (score)].
            arr = np.asarray(t, dtype=float).ravel()
            tlbr = arr[:4].tolist()
            track_id = int(arr[4])
            score = float(arr[5]) if arr.shape[0] > 5 else 0.0
        box = [float(v) for v in tlbr]
    except (TypeError, ValueError, IndexError):
        return None
    if len(box) != 4:
        return None
    return {'tlbr': box, 'track_id': track_id, 'score': score}


# Decide once how to call tracker.update. The reference BYTETracker signature is
# update(output_results, img_info, img_size); its third argument is an image
# size, not a frame index.
try:
    _update_params = inspect.signature(tracker.update).parameters
except (TypeError, ValueError):
    _update_params = {}
_TRACKER_TAKES_IMG_SIZE = 'img_size' in _update_params


def _update_tracker(dets, frame_idx):
    """Feed one frame's detections (N x 5: x1, y1, x2, y2, score) to the tracker."""
    if _TRACKER_TAKES_IMG_SIZE:
        # Same size for img_info and img_size: detections are already in frame
        # pixels, so the tracker's internal rescale factor is 1.
        frame_hw = (frame_height, frame_width)
        return tracker.update(dets, frame_hw, frame_hw)
    # Unknown packaging: keep the original best-effort call chain.
    try:
        return tracker.update(dets, [frame_height, frame_width], frame_idx)
    except TypeError:
        try:
            return tracker.update(dets, frame_idx)
        except Exception:
            return tracker.update(dets)


frame_idx = 0
print("Starting processing...")
try:
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame_idx += 1
        current_msec = cap.get(cv2.CAP_PROP_POS_MSEC)
        current_sec = current_msec / 1000.0

        # 1) Run detection with YOLOv8 (single-image predict)
        results = yolo_model.predict(frame, conf=CONF_THRESHOLD, imgsz=640, verbose=False)
        detections_for_tracker = []  # rows of [x1, y1, x2, y2, score]
        detections_per_frame = []    # same detections, keeping class info

        if results and len(results) and hasattr(results[0], 'boxes') and results[0].boxes is not None:
            boxes = results[0].boxes.xyxy.cpu().numpy()  # Nx4
            scores = results[0].boxes.conf.cpu().numpy() # Nx
            cls_ids = results[0].boxes.cls.cpu().numpy().astype(int)  # Nx
            for (x1,y1,x2,y2), sc, cls in zip(boxes, scores, cls_ids):
                detections_for_tracker.append([float(x1), float(y1), float(x2), float(y2), float(sc)])
                detections_per_frame.append({'box':(int(x1),int(y1),int(x2),int(y2)), 'score':float(sc), 'class_id':int(cls)})

        # 2) Feed ByteTrack with detections as a float32 array (empty -> shape (0, 5))
        if detections_for_tracker:
            dets_np = np.asarray(detections_for_tracker, dtype=np.float32)
        else:
            dets_np = np.zeros((0,5), dtype=np.float32)
        online_targets = _update_tracker(dets_np, frame_idx)

        # Normalise tracker outputs; entries without a usable box are dropped.
        active_track_ids_this_frame = set()
        mapped_targets = []
        if hasattr(online_targets, '__iter__'):
            for t in online_targets:
                mt = _track_to_box(t)
                if mt is not None:
                    mapped_targets.append(mt)

        # The tracker outputs carry no class here, so recover it from the
        # detection that overlaps each tracked box the most.
        for mt in mapped_targets:
            best_iou = 0.0
            best_cls = None
            for det in detections_per_frame:
                i = iou(mt['tlbr'], det['box'])
                if i > best_iou:
                    best_iou = i
                    best_cls = det['class_id']
            # Even without a class match, the tracker box still drives position/history.
            x1,y1,x2,y2 = map(int, mt['tlbr'])
            cx = (x1 + x2)//2
            cy = (y1 + y2)//2
            tid = mt['track_id']
            active_track_ids_this_frame.add(tid)
            if tid not in track_histories:
                track_histories[tid] = deque(maxlen=HISTORY_FRAMES)
            track_histories[tid].append((cx, cy))

            # estimate distance & do alerting
            box_h_px = y2 - y1
            cls = best_cls if best_cls is not None else -1
            obj_real_h = KNOWN_OBJECT_HEIGHTS_METERS.get(int(cls), DEFAULT_KNOWN_HEIGHT)
            est_dist = estimate_distance(box_h_px, obj_real_h, ESTIMATED_FOCAL_LENGTH_PIXELS)
            direction_str, motion_str = get_direction_motion(tid, frame_width, track_histories)
            class_name = yolo_model.names.get(int(cls), f"Class{cls}") if cls != -1 else "Unknown"
            info_text = f"ID:{tid} {class_name} Est:{est_dist:.1f}m {direction_str}"
            if motion_str == "Moving":
                info_text += " Moving"
            # Draw box & label
            cv2.rectangle(frame, (x1,y1), (x2,y2), (0,255,0), 2)
            cv2.putText(frame, info_text, (x1, max(20,y1-10)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,0), 2)

            # Alert logic: person <2m, other objects <5m; the first 10 frames never alert.
            is_person = (cls == 0)
            is_close_person = is_person and (est_dist < 2.0)
            is_close_object = (not is_person) and (est_dist < 5.0)
            if (is_close_person or is_close_object) and frame_idx > 10:
                last_alerted_time_for_track = alerted_tracks.get(tid, -1e9)
                if current_sec - last_alerted_time_for_track > ALERT_REPEAT_DELAY_SEC:
                    # per-class cooldown
                    last_time_for_class = last_alert_time_by_class.get(int(cls), -1e9)
                    if current_sec - last_time_for_class > ALERT_CLASS_COOLDOWN_SEC:
                        dist_str = f"{est_dist:.0f}"
                        alert_base = f"{class_name}, about {dist_str} meters, {direction_str}."
                        alert_text_this_frame = f"Caution: {alert_base}"
                        alert_log.append((int(current_msec), alert_text_this_frame))
                        alerted_tracks[tid] = current_sec
                        last_alert_time_by_class[int(cls)] = current_sec
                        print(f"[ALERT @ {current_sec:.2f}s] {alert_text_this_frame}")

        # Drop state for tracks absent from this frame.
        # NOTE(review): this also clears the per-track alert time, so a track that
        # vanishes for one frame is only throttled by the per-class cooldown.
        for tid in list(track_histories.keys()):
            if tid not in active_track_ids_this_frame:
                del track_histories[tid]
                alerted_tracks.pop(tid, None)

        # write frame to silent video
        writer.write(frame)
finally:
    # Release the capture and finalize the output file even if a frame fails.
    cap.release()
    writer.release()
print("Video processing finished. Silent video saved:", SILENT_VIDEO_OUTPUT)

# ------------------- Generate audio from alerts using gTTS -------------------
# Build a silent track as long as the video, synthesize each distinct alert text
# once, then overlay the clip at every timestamp where that alert was logged.
print("Generating TTS audio for unique alerts...")
final_audio = AudioSegment.silent(duration=int(total_duration_ms) if total_duration_ms>0 else 1000)

# De-duplicate while keeping first-seen order so synthesis order is stable.
unique_alerts = list(dict.fromkeys(text for _, text in alert_log))
for text in unique_alerts:
    try:
        tts = gTTS(text=text, lang='en', slow=False)
        tts.save(TEMP_TTS_FILE)
        alert_sound_cache[text] = AudioSegment.from_mp3(TEMP_TTS_FILE)
    except Exception as e:
        # Synthesis is best-effort: a failed alert is skipped, not fatal.
        print("gTTS warning:", e)
    finally:
        # Remove the scratch file even when synthesis or decoding failed.
        try:
            if os.path.exists(TEMP_TTS_FILE):
                os.remove(TEMP_TTS_FILE)
        except OSError as e:
            print("gTTS warning:", e)

print("Overlaying alerts onto timeline...")
for timestamp_ms, text in alert_log:
    if text in alert_sound_cache:
        sound = alert_sound_cache[text]
        final_audio = final_audio.overlay(sound, position=int(timestamp_ms))

final_audio.export(FINAL_AUDIO_FILE, format='mp3')
print("Final audio saved:", FINAL_AUDIO_FILE)

# ------------------- Mux audio + video with ffmpeg -------------------
# Re-encode the silent video as H.264 and attach the alert audio as AAC;
# '-shortest' ends the output with the shorter of the two streams.
print("Muxing audio and video using ffmpeg...")
ffmpeg_cmd = [
    'ffmpeg', '-y', '-loglevel', 'error',
    '-i', SILENT_VIDEO_OUTPUT, '-i', FINAL_AUDIO_FILE,
    '-c:v', 'libx264', '-c:a', 'aac', '-shortest', FINAL_VIDEO_OUTPUT
]
mux_succeeded = False
try:
    subprocess.run(ffmpeg_cmd, check=True)
    mux_succeeded = True
    print("Final video with audio saved:", FINAL_VIDEO_OUTPUT)
except (subprocess.CalledProcessError, FileNotFoundError) as e:
    # FileNotFoundError: the ffmpeg executable itself is missing from PATH.
    print("FFmpeg failed:", e)

# Optionally display inline (if running in Colab/Notebook).
# Skipped after a failed mux so a stale file from an earlier run is not shown.
if mux_succeeded:
    try:
        from IPython.display import HTML, display
        with open(FINAL_VIDEO_OUTPUT, "rb") as video_file:
            data = video_file.read()
        video_url = "data:video/mp4;base64," + base64.b64encode(data).decode()
        display(HTML(f'<video width="640" height="480" controls><source src="{video_url}" type="video/mp4"></video>'))
    except Exception as e:
        print("Could not inline-display video:", e)

# Cleanup (optional)
# os.remove(SILENT_VIDEO_OUTPUT); os.remove(FINAL_AUDIO_FILE)
print("Done.")
