In [6]:
# Cell 2: configuration (EDIT THESE PATHS AS NEEDED)
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "7" 
from pathlib import Path
import json
import math
import cv2
import yaml
import numpy as np
import pandas as pd
from ultralytics import YOLO
from tqdm import tqdm

# === EDIT THESE ===
# Trained weights; the default path was inferred from an earlier OBBMetrics save_dir.
MODEL_PATH = "/home/23ucc611/SWE/Poaching/poacher_obb/exp1/weights/best.pt"  # change if needed
# Dataset YAML; only its `names` entry is read (to map class ids to names).
DATA_YAML = "/home/23ucc611/SWE/Poaching/dataset/data.yaml"   # adjust if not present

# Inputs (change per-run)
IMAGE_PATH = "/home/23ucc611/SWE/Poaching/dataset/test/images/0b1a3af197_jpg.rf.09fdfff449166b18d17638a9cb8aaf22.jpg"
IMAGE_FOLDER = "/home/23ucc611/SWE/Poaching/dataset/test/images"
VIDEO_PATH = "/path/to/input/video.mp4"  # placeholder; set a real path before calling infer_video
WEBCAM_SOURCE = 0   # 0 or 1 or RTSP URL

# Output dir (created immediately if it does not exist)
OUT_DIR = "/home/23ucc611/SWE/Poaching/inference_outputs"
os.makedirs(OUT_DIR, exist_ok=True)

# Confidence threshold passed to model.predict and reused when annotating / filtering threats.
# NOTE(review): 0.001 keeps almost every candidate box; that is typical for validation runs
# but probably too permissive for annotated outputs and alerts — confirm the intended value.
CONF_THRES = 0.001
# Device index is relative to CUDA_VISIBLE_DEVICES (set to "7" at the top of this cell),
# so 0 here refers to the single GPU that variable exposes.
DEVICE = 0  # 0 = first GPU, "cpu" to force CPU


In [7]:
# Cell 3: load classes from data.yaml (if available) and load model
# Class names are optional: downstream code falls back to numeric ids when CLASS_NAMES is None.
CLASS_NAMES = None
if os.path.exists(DATA_YAML):
    with open(DATA_YAML, 'r', encoding='utf-8') as f:
        # safe_load returns None for an empty file; normalise to {} so .get() cannot fail
        data = yaml.safe_load(f) or {}
    # `names` is used as an id -> name lookup (list or dict); an empty/missing entry means "no names"
    CLASS_NAMES = data.get('names') or None
    if CLASS_NAMES:
        print("Loaded classes:", len(CLASS_NAMES))
    else:
        print("Loaded classes:", "data.yaml has no 'names' entry; using indices only")
else:
    print("Loaded classes:", "no data.yaml found; using indices only")

# Load model
print("Loading model:", MODEL_PATH)
model = YOLO(MODEL_PATH)
print("Model loaded.")


Loaded classes: 72
Loading model: /home/23ucc611/SWE/Poaching/poacher_obb/exp1/weights/best.pt
Model loaded.


In [8]:
# Cell 4: helpers — extract detections, draw rotated boxes when possible, save outputs
def angle_to_degrees(angle):
    """Return `angle` as a float in degrees, guessing its unit from its magnitude.

    Heuristic: a magnitude above 2*pi cannot be a single-turn radian value, so it
    is assumed to be degrees already and returned unchanged; every other value is
    treated as radians and converted.
    """
    exceeds_full_turn = abs(angle) > 2 * math.pi
    value = float(angle)
    if exceeds_full_turn:
        return value
    return value * 180.0 / math.pi

def polygon_from_cxcywh_angle(cx, cy, w, h, angle_deg):
    """Build the four corner points of a rotated box.

    The box is described by its centre (cx, cy), size (w, h) and rotation in
    degrees, packed into the ((cx, cy), (w, h), angle) tuple that
    cv2.boxPoints consumes. The float corners it returns are cast to int
    before being handed back.
    """
    center = (float(cx), float(cy))
    size = (float(w), float(h))
    rotated_rect = (center, size, float(angle_deg))
    corners = cv2.boxPoints(rotated_rect)
    return corners.astype(int)

def draw_polygon(img, pts, color=(0,255,0), thickness=2):
    """Draw the closed outline described by `pts` onto `img` (modified in place).

    `pts` is converted to an int32 array and reshaped to (-1, 1, 2) before being
    passed to cv2.polylines.
    """
    vertices = np.array(pts, dtype=np.int32)
    contour = vertices.reshape((-1, 1, 2))
    cv2.polylines(img, [contour], isClosed=True, color=color, thickness=thickness)

def color_for_class(clsid):
    """Map a class id to a deterministic 3-tuple of ints usable as a drawing colour.

    Each channel is the class id times a fixed multiplier, taken modulo 255, so
    the same id always yields the same colour.
    """
    channel_multipliers = (37, 17, 97)
    return tuple(int((clsid * factor) % 255) for factor in channel_multipliers)

def _tensor_to_numpy(container, attr):
    """Return `container.<attr>` as a numpy array, or None when the attribute is absent."""
    value = getattr(container, attr, None)
    if value is None:
        return None
    if hasattr(value, 'cpu'):       # torch tensor, possibly living on the GPU
        value = value.cpu()
    if hasattr(value, 'numpy'):
        value = value.numpy()
    return np.asarray(value)


def _resolve_class_name(class_id, *name_sources):
    """Return the first name found for `class_id` in the given list/dict sources, else str(class_id)."""
    if class_id is not None:
        for source in name_sources:
            if not source:
                continue
            if isinstance(source, dict):
                name = source.get(class_id)
            else:
                name = source[class_id] if 0 <= class_id < len(source) else None
            if name is not None:
                return name
    return str(class_id)


def extract_detections_from_result(res, names=None):
    """
    Convert a single ultralytics Result (one image/frame) into plain dicts.

    Oriented-box models publish their detections on `res.obb` and leave
    `res.boxes` as None, so `res.obb` is consulted first and `res.boxes` is the
    fallback for axis-aligned models.

    Args:
        res: one ultralytics Result.
        names: optional id -> name lookup (list or dict). When omitted, the
            notebook-level CLASS_NAMES is used if it is defined; `res.names`
            is tried next; otherwise the id is rendered as a string.

    Returns:
        list of detection dicts:
          { 'conf', 'class_id', 'class_name', 'xyxy': [x1,y1,x2,y2],
            optional 'cxcywh': [cx,cy,w,h],
            optional 'cxcywh_angle': [cx,cy,w,h,angle_deg],
            optional 'polygon': [[x1,y1],[x2,y2],[x3,y3],[x4,y4]] }
        An empty list when the result carries no detections.
    """
    if names is None:
        try:
            names = CLASS_NAMES
        except NameError:
            # the class-loading cell has not been run; fall back to res.names / ids
            names = None
    model_names = getattr(res, 'names', None)

    obb = getattr(res, 'obb', None)
    container = obb if obb is not None else getattr(res, 'boxes', None)
    if container is None:
        return []

    confs = _tensor_to_numpy(container, 'conf')
    clsids = _tensor_to_numpy(container, 'cls')
    if confs is None or clsids is None:
        return []
    confs = confs.reshape(-1)
    clsids = clsids.reshape(-1).astype(int)

    xyxy = _tensor_to_numpy(container, 'xyxy')
    xywhr = _tensor_to_numpy(container, 'xywhr')        # (N,5): cx, cy, w, h, angle in radians
    corners = _tensor_to_numpy(container, 'xyxyxyxy')   # (N,4,2): rotated corner points
    # plain `xywh` only matters for containers that do not expose rotated boxes
    xywh = _tensor_to_numpy(container, 'xywh') if xywhr is None else None
    has_rotation = xywhr is not None and xywhr.ndim == 2 and xywhr.shape[1] >= 5

    dets = []
    for i in range(len(confs)):
        class_id = int(clsids[i]) if len(clsids) > i else None
        d = {
            'conf': float(confs[i]),
            'class_id': class_id,
            'class_name': _resolve_class_name(class_id, names, model_names),
        }

        if xyxy is not None and len(xyxy) > i:
            d['xyxy'] = [float(v) for v in xyxy[i][:4]]

        if has_rotation and len(xywhr) > i:
            cx, cy, w, h, theta = (float(v) for v in xywhr[i][:5])
            angle_deg = math.degrees(theta)
            d['cxcywh_angle'] = [cx, cy, w, h, angle_deg]
            if corners is not None and len(corners) > i:
                # prefer the corner points computed by the library itself
                d['polygon'] = np.asarray(corners[i], dtype=float).reshape(-1, 2).tolist()
            else:
                d['polygon'] = polygon_from_cxcywh_angle(cx, cy, w, h, angle_deg).reshape(-1, 2).tolist()
        elif xywh is not None and len(xywh) > i:
            row = np.atleast_1d(xywh[i])
            if row.shape[0] >= 4:
                cx, cy, w, h = (float(v) for v in row[:4])
                if row.shape[0] >= 5:
                    # a 5th column, if a container ever provides one, is treated as the angle
                    angle_deg = angle_to_degrees(row[4])
                    d['cxcywh_angle'] = [cx, cy, w, h, float(angle_deg)]
                    d['polygon'] = polygon_from_cxcywh_angle(cx, cy, w, h, angle_deg).reshape(-1, 2).tolist()
                else:
                    d['cxcywh'] = [cx, cy, w, h]

        dets.append(d)
    return dets

def annotate_image_with_detections(img_bgr, detections, conf_thres=CONF_THRES, names=CLASS_NAMES):
    """Return a copy of `img_bgr` with every detection at or above `conf_thres` drawn on it.

    Drawing preference per detection: the 'polygon' outline if present (and it
    draws without error), otherwise the 'xyxy' rectangle with a filled label
    background, otherwise a rectangle derived from 'cxcywh'. The label reads
    "<class_name>:<conf>". `names` is accepted but not used by this function.
    """
    font = cv2.FONT_HERSHEY_SIMPLEX
    text_color = (255, 255, 255)
    canvas = img_bgr.copy()

    for det in detections:
        if det['conf'] < conf_thres:
            continue

        class_id = det.get('class_id', -1)
        caption = f"{det.get('class_name', class_id)}:{det['conf']:.2f}"
        box_color = color_for_class(0 if class_id is None else class_id)

        # rotated outline takes priority when the detection carries one
        if 'polygon' in det:
            corners = np.array(det['polygon'], dtype=np.int32)
            # coordinates are assumed to be absolute pixels (no de-normalisation is attempted)
            try:
                draw_polygon(canvas, corners, color=box_color, thickness=2)
                # anchor the caption at the first corner
                anchor_x = int(corners[0][0])
                anchor_y = int(corners[0][1])
                cv2.putText(canvas, caption, (anchor_x, max(0, anchor_y - 8)),
                            font, 0.5, text_color, 1, cv2.LINE_AA)
                continue
            except Exception:
                # best effort: fall through to the axis-aligned drawing below
                pass

        if 'xyxy' in det:
            left, top, right, bottom = (int(v) for v in det['xyxy'])
            cv2.rectangle(canvas, (left, top), (right, bottom), box_color, 2)
            (text_w, text_h), _ = cv2.getTextSize(caption, font, 0.5, 1)
            cv2.rectangle(canvas, (left, top - text_h - 6), (left + text_w, top), box_color, -1)
            cv2.putText(canvas, caption, (left, top - 4), font, 0.5, text_color, 1, cv2.LINE_AA)
        elif 'cxcywh' in det:
            # only centre/size known: derive the axis-aligned corners
            cx, cy, w, h = det['cxcywh']
            left = int(cx - w/2)
            top = int(cy - h/2)
            right = int(cx + w/2)
            bottom = int(cy + h/2)
            cv2.rectangle(canvas, (left, top), (right, bottom), box_color, 2)
            cv2.putText(canvas, caption, (left, max(0, top - 4)), font, 0.5, text_color, 1)

    return canvas


In [9]:
# Cell 5: single-image inference (runs model, extracts detections, saves annotated image + CSV/JSON)
def infer_image(image_path, save_dir=OUT_DIR, conf_thres=CONF_THRES, device=DEVICE):
    """Run the model on one image and save the annotated image plus detection tables.

    Outputs go to `save_dir` (created if missing) as `<stem>_pred.jpg`, and —
    only when at least one detection exists — `<stem>_pred.csv` and
    `<stem>_pred.json`.

    Args:
        image_path: path of the image to process.
        save_dir: directory receiving the outputs.
        conf_thres: confidence threshold for prediction and drawing.
        device: device argument forwarded to `model.predict`.

    Returns:
        The list of detection dicts produced by `extract_detections_from_result`.

    Raises:
        FileNotFoundError: the image cannot be read by OpenCV.
        OSError: the annotated image cannot be written.
    """
    src = Path(image_path)
    out_dir = Path(save_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    out_img_path = out_dir / (src.stem + "_pred.jpg")
    out_csv_path = out_dir / (src.stem + "_pred.csv")
    out_json_path = out_dir / (src.stem + "_pred.json")

    # cv2.imread signals failure by returning None rather than raising
    img_bgr = cv2.imread(str(image_path))
    if img_bgr is None:
        raise FileNotFoundError(f"Cannot read image: {image_path}")

    results = model.predict(source=str(image_path), conf=conf_thres, device=device, verbose=False)
    detections = extract_detections_from_result(results[0])

    # flatten detections into table rows with a fixed column set
    columns = ("class_id", "class_name", "conf", "xyxy", "cxcywh", "cxcywh_angle", "polygon")
    df_rows = [{col: d.get(col) for col in columns} for d in detections]
    if df_rows:
        pd.DataFrame(df_rows).to_csv(out_csv_path, index=False)
        with open(out_json_path, 'w') as f:
            json.dump(df_rows, f, indent=2)

    # annotate & save
    annotated = annotate_image_with_detections(img_bgr, detections, conf_thres=conf_thres)
    if not cv2.imwrite(str(out_img_path), annotated):
        raise OSError(f"Failed to write annotated image: {out_img_path}")
    print("Saved annotated image:", out_img_path)
    if df_rows:
        print("Saved detections CSV/JSON:", out_csv_path, out_json_path)
    else:
        print("No detections; CSV/JSON not written.")
    return detections

# Example usage: run single-image inference on IMAGE_PATH and keep the returned
# detection dicts in `detections`.
detections = infer_image(IMAGE_PATH)


Saved annotated image: /home/23ucc611/SWE/Poaching/inference_outputs/0b1a3af197_jpg.rf.09fdfff449166b18d17638a9cb8aaf22_pred.jpg
Saved detections CSV/JSON: /home/23ucc611/SWE/Poaching/inference_outputs/0b1a3af197_jpg.rf.09fdfff449166b18d17638a9cb8aaf22_pred.csv /home/23ucc611/SWE/Poaching/inference_outputs/0b1a3af197_jpg.rf.09fdfff449166b18d17638a9cb8aaf22_pred.json


In [10]:
# Cell 6: folder batch inference (saves annotated images + single CSV with all detections)
def infer_folder(folder_path, save_dir=OUT_DIR, conf_thres=CONF_THRES, device=DEVICE):
    """Run the model on every image under `folder_path` (searched recursively).

    Annotated images are written below `<save_dir>/batch`, mirroring the
    sub-folder layout of the input so that images sharing a file stem in
    different sub-folders do not overwrite each other (two images with the same
    stem but different extensions in one folder still share an output name).
    All detections are collected into `<save_dir>/batch_detections.csv` and
    `<save_dir>/batch_detections.json`.

    Images that OpenCV cannot read are reported and skipped instead of aborting
    the whole batch. Files already inside the output folder are ignored, so
    pointing `save_dir` inside `folder_path` does not re-process old outputs.

    Args:
        folder_path: root folder to search for images.
        save_dir: directory receiving the outputs.
        conf_thres: confidence threshold for prediction and drawing.
        device: device argument forwarded to `model.predict`.
    """
    folder = Path(folder_path)
    out_root = Path(save_dir)
    out_folder = out_root / "batch"
    out_folder.mkdir(parents=True, exist_ok=True)
    out_folder_resolved = out_folder.resolve()

    # match extensions case-insensitively (e.g. .JPG) in a single directory walk
    image_exts = {".jpg", ".jpeg", ".png", ".bmp", ".tif", ".tiff"}
    files = sorted(
        p for p in folder.rglob("*")
        if p.is_file()
        and p.suffix.lower() in image_exts
        and out_folder_resolved not in p.resolve().parents
    )
    print("Images found:", len(files))

    columns = ["image", "class_id", "class_name", "conf", "xyxy", "cxcywh", "cxcywh_angle", "polygon"]
    all_rows = []
    skipped = 0
    for img_path in tqdm(files):
        # cv2.imread returns None on failure; check before spending a forward pass
        img = cv2.imread(str(img_path))
        if img is None:
            skipped += 1
            tqdm.write(f"Skipping unreadable image: {img_path}")
            continue

        results = model.predict(source=str(img_path), conf=conf_thres, device=device, verbose=False)
        detections = extract_detections_from_result(results[0])

        # annotate, keeping the input's relative sub-folder in the output path
        annotated = annotate_image_with_detections(img, detections, conf_thres=conf_thres)
        target_dir = out_folder / img_path.relative_to(folder).parent
        target_dir.mkdir(parents=True, exist_ok=True)
        out_img_path = target_dir / (img_path.stem + "_pred.jpg")
        if not cv2.imwrite(str(out_img_path), annotated):
            tqdm.write(f"Failed to write annotated image: {out_img_path}")

        # collect rows
        for d in detections:
            row = {col: d.get(col) for col in columns if col != "image"}
            all_rows.append({"image": str(img_path), **row})

    # save master csv/json (explicit columns keep the CSV header even with zero detections)
    pd.DataFrame(all_rows, columns=columns).to_csv(out_root / "batch_detections.csv", index=False)
    with open(out_root / "batch_detections.json", 'w') as f:
        json.dump(all_rows, f, indent=2)
    if skipped:
        print("Unreadable images skipped:", skipped)
    print("Batch inference finished. Annotated images:", out_folder)
    print("Saved batch_detections.csv/json in", save_dir)

# Example usage: batch-process every image found (recursively) under IMAGE_FOLDER.
infer_folder(IMAGE_FOLDER)


Images found: 1796


100%|███████████████████████████████████████████████████████████████████████████████| 1796/1796 [01:39<00:00, 18.08it/s]

Batch inference finished. Annotated images: /home/23ucc611/SWE/Poaching/inference_outputs/batch
Saved batch_detections.csv/json in /home/23ucc611/SWE/Poaching/inference_outputs





In [13]:
# Cell 7: video inference (process frames in batches -> annotate -> save output video)
def _annotate_and_write_frames(frames, writer, conf_thres, device):
    """Predict on a batch of frames, annotate each one and append it to `writer`."""
    results = model.predict(frames, conf=conf_thres, device=device, verbose=False)
    for frame, res in zip(frames, results):
        dets = extract_detections_from_result(res)
        writer.write(annotate_image_with_detections(frame, dets, conf_thres=conf_thres))


def infer_video(video_path, save_dir=OUT_DIR, conf_thres=CONF_THRES, device=DEVICE, batch_frames=8):
    """Annotate a video file frame by frame and save the result as `<stem>_pred.mp4`.

    Frames are read into batches of `batch_frames`, run through the model
    together, annotated and written to the output video in `save_dir`
    (created if missing). The capture and writer are released even when an
    error interrupts processing.

    Args:
        video_path: path of the input video.
        save_dir: directory receiving the output video.
        conf_thres: confidence threshold for prediction and drawing.
        device: device argument forwarded to `model.predict`.
        batch_frames: number of frames per prediction call (values below 1 are treated as 1).

    Raises:
        FileNotFoundError: the input video cannot be opened.
        RuntimeError: the output video writer cannot be opened.
    """
    cap = cv2.VideoCapture(str(video_path))
    if not cap.isOpened():
        cap.release()
        raise FileNotFoundError(f"Cannot open video: {video_path}")

    writer = None
    try:
        # a reported FPS of 0 is falsy, so fall back to 25
        fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
        w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        out_dir = Path(save_dir)
        out_dir.mkdir(parents=True, exist_ok=True)
        out_video_path = out_dir / (Path(video_path).stem + "_pred.mp4")
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        writer = cv2.VideoWriter(str(out_video_path), fourcc, fps, (w, h))
        # an unopened writer drops every frame silently; surface that up front
        if not writer.isOpened():
            raise RuntimeError(f"Cannot open video writer: {out_video_path}")

        batch_size = max(1, int(batch_frames))
        frames = []
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frames.append(frame)
            if len(frames) >= batch_size:
                _annotate_and_write_frames(frames, writer, conf_thres, device)
                frames = []
        # remaining frames that did not fill a whole batch
        if frames:
            _annotate_and_write_frames(frames, writer, conf_thres, device)
    finally:
        cap.release()
        if writer is not None:
            writer.release()
    print("Saved annotated video to:", out_video_path)

# Example usage:
# infer_video(VIDEO_PATH)


In [14]:
# Cell 8: webcam / RTSP stream (display; optional saving)
def infer_stream(source=WEBCAM_SOURCE, save=False, save_dir=OUT_DIR, conf_thres=CONF_THRES, device=DEVICE):
    """Run live inference on a capture source and show annotated frames in a window.

    Each frame is predicted, annotated and displayed; with `save=True` the
    annotated frames are also appended to `stream_<source stem>_pred.mp4` in
    `save_dir`. The loop ends when the source stops delivering frames or the
    'q' key is pressed; the capture, the writer (if any) and all OpenCV windows
    are released afterwards.

    Raises:
        RuntimeError: the source cannot be opened.
    """
    capture = cv2.VideoCapture(source)
    if not capture.isOpened():
        raise RuntimeError(f"Unable to open stream: {source}")

    # a reported FPS of 0 is falsy, so fall back to 20
    frame_rate = capture.get(cv2.CAP_PROP_FPS) or 20.0
    frame_size = (
        int(capture.get(cv2.CAP_PROP_FRAME_WIDTH)),
        int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT)),
    )

    recorder = None
    if save:
        target = Path(save_dir) / f"stream_{Path(str(source)).stem}_pred.mp4"
        codec = cv2.VideoWriter_fourcc(*"mp4v")
        recorder = cv2.VideoWriter(str(target), codec, frame_rate, frame_size)

    print("Press 'q' to stop.")
    try:
        grabbed, frame = capture.read()
        while grabbed:
            prediction = model.predict(frame, conf=conf_thres, device=device, verbose=False)[0]
            found = extract_detections_from_result(prediction)
            annotated = annotate_image_with_detections(frame, found, conf_thres=conf_thres)
            cv2.imshow("YOLOv8-OBB Stream", annotated)
            if recorder:
                recorder.write(annotated)
            # only the low byte of the key code is compared
            if (cv2.waitKey(1) & 0xFF) == ord('q'):
                break
            grabbed, frame = capture.read()
    finally:
        capture.release()
        if recorder:
            recorder.release()
        cv2.destroyAllWindows()


In [15]:
# Cell 9: simple post-processing to raise Poacher/Threat alerts (map several classes -> threat)
# Class names that count as a threat; edit to match your dataset's actual class names.
THREAT_CLASS_NAMES = {"Hunter","Rifle","Pistol","Knife","X-Bow","Rope"}

def threats_in_detections(detections, conf_thres=CONF_THRES):
    """Return (class_name, conf) pairs for detections that are threats.

    A detection qualifies when its confidence is not below `conf_thres` and its
    'class_name' is a member of THREAT_CLASS_NAMES. Input order is preserved.
    """
    return [
        (det.get('class_name'), det['conf'])
        for det in detections
        if not (det['conf'] < conf_thres) and det.get('class_name') in THREAT_CLASS_NAMES
    ]

# Example: run image inference and print alerts
# dets = infer_image(IMAGE_PATH)
# t = threats_in_detections(dets)
# if t:
#     print("ALERT! threats detected:", t)
