In [None]:
# Install deps if not already installed (CUDA 12.8 build: the index URL below serves cu128 wheels, not the CPU-only ones)
# %pip uninstall -y torch torchvision torchaudio
# %pip install --upgrade torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu128

# %pip install --upgrade ultralytics 

In [None]:
import torch, ultralytics
from ultralytics import YOLO
# Report the installed library versions and choose where models will run:
# CUDA device index 0 when a GPU is usable, otherwise the string "cpu".
print("PyTorch:", torch.__version__)

if torch.cuda.is_available():
    DEVICE = 0
else:
    DEVICE = "cpu"
print("Device available:", DEVICE)

print("Ultralytics:", ultralytics.__version__)

In [None]:
# Fine-tune the pretrained "yolov8m.pt" checkpoint on the dataset described by data.yaml.
model = YOLO("yolov8m.pt")
results = model.train(
    data="data.yaml",
    epochs=300,
    patience=25,          # early stopping: epochs without validation improvement
    imgsz=512,
    # NOTE: `conf` is deliberately NOT passed to train(). It is a prediction /
    # validation confidence threshold, not a training hyper-parameter; with
    # conf=0.70 the per-epoch validation would discard every prediction below
    # 0.70, distorting the mAP that drives best-checkpoint selection and
    # early stopping. Apply the threshold at predict time instead.
    batch=-1,             # -1 lets Ultralytics pick the batch size automatically
    device=DEVICE,        # use the device selected above instead of assuming GPU 0 exists
    workers=4,
    lr0=5e-4,             # initial learning rate
    cos_lr=True,          # cosine learning-rate schedule
    # --- augmentation settings ---
    degrees=0.0,          # no random rotation
    shear=10.0,           # shear, in degrees
    translate=0.05,       # translation, as a fraction of image size
    scale=0.2,            # scale gain
    fliplr=0.5,           # probability of a horizontal flip
    flipud=0.0,           # no vertical flips
    mosaic=0.2,           # probability of mosaic augmentation
    mixup=0.0,            # mixup disabled
    close_mosaic=10,      # turn mosaic off for the final 10 epochs
    # NOTE(review): the run name says "yolov8s" although the weights loaded
    # above are yolov8m. The name is left unchanged so the output directory
    # (runs/detect/<name>) does not move; rename if this is unintended.
    name="yolov8s_the_final_run_wPSeudo",
)


# Run the cell below to run inference on a folder of images and save the detections as pseudo-labels

In [None]:
import output_format
import extract_frame
import glob
import os
import shutil
from ultralytics import YOLO

# images_cat1_path = "data_masked/cat1_test_validation_vid7"
# os.makedirs(images_cat1_path, exist_ok=True)

# # --------- extraction zone ----------- #
# video_path = "data/validation/video/7_fps1.mp4"
# # output_dir = "data_masked/cat1_test_validation_fixed_index"
# output_dir = images_cat1_path
# extract_frame.extract_frames(video_path, output_dir)
# ------------------------------------- #

# Load the fine-tuned weights and run inference over one folder of masked frames.
model = YOLO("runs/detect/yolov8s_god20/weights/best.pt")

# Inference settings, gathered in one place before the call.
predict_options = dict(
    source="/home/kuo/yolo/for_pseudo_ltr_after_mask/8",
    device=DEVICE,
    imgsz=512,
    max_det=4,        # keep at most 4 detections per image
    conf=0.75,        # confidence threshold (0.75)
    save=True,        # write annotated images to the run's save_dir
    visualize=False,
)
pred = model.predict(**predict_options)

# Hand the results to the project's JSON exporter (return value not used here).
output_format.output_format_as_json(pred)

# Report where Ultralytics saved its outputs; "n/a" when nothing was predicted.
if pred:
    saved_to = pred[0].save_dir
else:
    saved_to = "n/a"
print("Saved predictions to:", saved_to)

#####################

# Export the detections in `pred` as YOLO-format pseudo-labels and copy every
# image that received at least one confident detection next to its label file.

# Paths
output_labels = "/home/kuo/yolo/for_pseudo_ltr_b4_split/8/labels"
output_images = "/home/kuo/yolo/for_pseudo_ltr_b4_split/8/images"

# Minimum confidence a box needs to be written as a pseudo-label.
PSEUDO_LABEL_MIN_CONF = 0.75

# Make sure output dirs exist
os.makedirs(output_labels, exist_ok=True)
os.makedirs(output_images, exist_ok=True)

for r in pred:
    img_path = r.path
    name = os.path.splitext(os.path.basename(img_path))[0]
    label_path = os.path.join(output_labels, f"{name}.txt")

    # Build the label lines first so that a file is only written when there
    # is something to put in it.
    label_lines = []
    for box in r.boxes:
        # Check confidence before doing any coordinate work.
        if float(box.conf) <= PSEUDO_LABEL_MIN_CONF:
            continue
        cls = int(box.cls)  # class id
        # Boxes.xywhn is (x_center, y_center, width, height) normalised by the
        # original image size, i.e. exactly the YOLO label format; .tolist()
        # yields plain Python floats for formatting.
        x_center, y_center, width, height = box.xywhn[0].tolist()
        label_lines.append(
            f"{cls} {x_center:.6f} {y_center:.6f} {width:.6f} {height:.6f}\n"
        )

    if not label_lines:
        # No confident detections: leave no label behind (including a stale
        # one from an earlier run) and skip copying the image.
        if os.path.exists(label_path):
            os.remove(label_path)
        print(f"🗑️ Removed empty label: {label_path}")
        continue

    with open(label_path, "w") as f:
        f.writelines(label_lines)

    # Only copy the image when there was at least one detection.
    dst_img = os.path.join(output_images, os.path.basename(img_path))
    shutil.copy(img_path, dst_img)

print("✅ Pseudo-labels and images saved (empty ones removed).")



In [None]:
import os
import glob
import json
import shutil
from pathlib import Path

from ultralytics import YOLO

# Write one YOLO-format label file per prediction result into `output_labels`.
# Empty files are still created here; the ground-truth filtering step that
# follows in this cell removes them.

# Paths
output_labels = "Cat1_vid1_frames/pseudo_cat1_vid6/labels"
image_source = "data_masked/cat1_test_validation_vid6"
output_images = "Cat1_vid1_frames/pseudo_cat1_vid6/images"

# Minimum confidence a box needs to be written to a label file.
MIN_LABEL_CONF = 0.25

# Make sure output dirs exist
os.makedirs(output_labels, exist_ok=True)
os.makedirs(output_images, exist_ok=True)

# NOTE(review): `pred` is not created in this cell. It must already exist from
# an earlier prediction cell and should have been produced from `image_source`;
# confirm before running, or uncomment the lines below.
# model = YOLO("runs/detect/train/weights/best.pt")
# pred = model.predict(source=image_source, conf=0.5)

# Save predictions as YOLO-format txt
for r in pred:
    img_path = r.path
    name = os.path.splitext(os.path.basename(img_path))[0]
    label_path = os.path.join(output_labels, f"{name}.txt")

    with open(label_path, "w") as f:
        for box in r.boxes:
            # Check confidence before doing any coordinate work.
            if float(box.conf) <= MIN_LABEL_CONF:
                continue
            cls = int(box.cls)  # class id
            # Boxes.xywhn is (x_center, y_center, width, height) normalised by
            # the original image size, i.e. the YOLO label format.
            x_center, y_center, width, height = box.xywhn[0].tolist()
            f.write(f"{cls} {x_center:.6f} {y_center:.6f} {width:.6f} {height:.6f}\n")

# Keep only pseudo-label lines whose class is present in the ground truth for
# the same frame, then copy the image of every frame that still has labels.

# Paths
json_gt = "Cat1_vid1_frames/6_fps1_gc.json"   # JSON with GT boxes

# Load groundtruth JSON
with open(json_gt, "r") as f:
    gt_data = json.load(f)

# Build lookup: frame_number -> set of tool names present in that frame
gt_map = {}
for box in gt_data.get("boxes", []):
    name = box["name"]  # e.g., slice_nr_45_needle_driver
    parts = name.split("_")
    frame_num = int(parts[2])  # slice_nr_45_... -> 45
    tool_name = "_".join(parts[3:])
    gt_map.setdefault(frame_num, set()).add(tool_name)

# Class mapping (YOLO ID → tool name); the list index is the class id
id2name = [
    "bipolar_forceps",
    "cadiere_forceps",
    "clip_applier",
    "force_bipolar",
    "grasping_retractor",
    "monopolar_curved_scissors",
    "needle_driver",
    "permanent_cautery_hook_spatula",
    "prograsp_forceps",
    "stapler",
    "tip_up_fenestrated_grasper",
    "vessel_sealer",
]

for file in glob.glob(os.path.join(output_labels, "*.txt")):
    base_name = os.path.splitext(os.path.basename(file))[0]

    # 1. Drop empty label files (the source image is left untouched).
    if os.path.getsize(file) == 0:
        os.remove(file)
        continue

    # 2. Frame number = second "_"-separated token of the file name.
    # NOTE(review): ground-truth names carry the frame number in the THIRD
    # token ("slice_nr_45_..."). If the frame files are also named
    # "slice_nr_45", token 1 is "nr" and every file is skipped here.
    # Confirm the actual frame file naming.
    try:
        frame_num = int(base_name.split("_")[1])
    except (IndexError, ValueError):
        continue

    # Tools the ground truth lists for this frame (empty set if the frame is absent).
    allowed_tools = gt_map.get(frame_num, set())

    with open(file, "r") as f:
        lines = f.readlines()

    filtered_lines = []
    for line in lines:
        fields = line.split()
        if not fields:
            continue  # ignore blank lines
        cls_id = int(fields[0])
        # Explicit bounds check: a negative id would silently index from the
        # end of the list and a too-large id would raise IndexError.
        if 0 <= cls_id < len(id2name) and id2name[cls_id] in allowed_tools:
            filtered_lines.append(line)

    src_img = os.path.join(image_source, base_name + ".jpg")

    if not filtered_lines:
        # No prediction agrees with the ground truth → remove the label file.
        os.remove(file)
        # NOTE(review): this also deletes the frame from `image_source` itself
        # (not from the output folder), so the cell cannot be re-run on the
        # same data. The empty-label branch above does not delete the image;
        # confirm this asymmetry is intended.
        if os.path.exists(src_img):
            os.remove(src_img)
        continue

    # Overwrite the label file with the surviving lines.
    with open(file, "w") as f:
        f.writelines(filtered_lines)

    # Copy the corresponding image (only .jpg sources are looked up).
    if os.path.exists(src_img):
        dst_img = os.path.join(output_images, f"{base_name}.jpg")
        shutil.copy(src_img, dst_img)




# MASKING (clean up the black borders)

In [None]:
# Only mask the images (no train/validation split) -Ball
from pathlib import Path
from PIL import Image, ImageDraw



# Mask dimensions: default size (pixels) of the band blacked out along each edge
LEFT, TOP, RIGHT, BOTTOM = 190, 55, 190, 30

def mask_image(in_path: Path, out_path: Path, left=None, top=None, right=None, bottom=None):
    """Black out the four borders of an image and save the result.

    Args:
        in_path: Image file to read.
        out_path: Where to write the masked image; missing parent
            directories are created.
        left, top, right, bottom: Border sizes in pixels. Each one falls
            back to the module-level LEFT / TOP / RIGHT / BOTTOM constant
            when left as None, so existing two-argument calls are unchanged.

    Note:
        The rectangle bounds are passed to PIL unchanged; PIL treats them as
        inclusive, so the top and left bands cover one pixel row/column more
        than the nominal value.
    """
    left = LEFT if left is None else left
    top = TOP if top is None else top
    right = RIGHT if right is None else right
    bottom = BOTTOM if bottom is None else bottom

    # Close the source file deterministically; convert() returns an
    # independent in-memory copy that stays valid afterwards.
    with Image.open(in_path) as src:
        im = src.convert("RGB")
    W, H = im.size
    draw = ImageDraw.Draw(im)
    black = (0, 0, 0)

    # Mask top
    draw.rectangle([0, 0, W, top], fill=black)
    # Mask bottom
    draw.rectangle([0, H - bottom, W, H], fill=black)
    # Mask left
    draw.rectangle([0, 0, left, H], fill=black)
    # Mask right
    draw.rectangle([W - right, 0, W, H], fill=black)

    out_path.parent.mkdir(parents=True, exist_ok=True)
    im.save(out_path, quality=95)

# For each numbered folder 1..8, mask every .jpg/.jpeg/.png found anywhere
# below the source folder and write it to the same relative location under
# the matching "after_mask" folder.
for folder_index in range(1, 9):
    SRC = Path(f"/home/kuo/yolo/for_pseudo_ltr/{folder_index}")   # source folder
    DST = Path(f"/home/kuo/yolo/for_pseudo_ltr_after_mask/{folder_index}")  # destination folder

    for candidate in SRC.rglob("*.*"):
        # Skip anything that is not one of the supported image types.
        if candidate.suffix.lower() not in (".jpg", ".jpeg", ".png"):
            continue
        mask_image(candidate, DST / candidate.relative_to(SRC))

    print("✅ Masked dataset created at:", DST)



In [None]:
from pathlib import Path
from PIL import Image, ImageDraw 
import shutil 

# Source dataset (already split into train/validation) and destination root
SRC = Path("/home/kuo/yolo/new_data_sept8_split")
DST = Path("/home/kuo/yolo/new_data_sept8_after_mask")

# Default size (pixels) of the band blacked out along each image edge
LEFT, TOP, RIGHT, BOTTOM = 190, 55, 190, 30

def mask_image(in_path: Path, out_path: Path, left=None, top=None, right=None, bottom=None):
    """Black out the four borders of an image and save the result.

    Args:
        in_path: Image file to read.
        out_path: Where to write the masked image; missing parent
            directories are created.
        left, top, right, bottom: Border sizes in pixels. Each one falls
            back to the module-level LEFT / TOP / RIGHT / BOTTOM constant
            when left as None, so existing two-argument calls are unchanged.

    Note:
        The rectangle bounds are passed to PIL unchanged; PIL treats them as
        inclusive, so the top and left bands cover one pixel row/column more
        than the nominal value.
    """
    left = LEFT if left is None else left
    top = TOP if top is None else top
    right = RIGHT if right is None else right
    bottom = BOTTOM if bottom is None else bottom

    # Close the source file deterministically; convert() returns an
    # independent in-memory copy that stays valid afterwards.
    with Image.open(in_path) as src:
        im = src.convert("RGB")
    W, H = im.size
    draw = ImageDraw.Draw(im)
    black = (0, 0, 0)

    # mask top
    draw.rectangle([0, 0, W, top], fill=black)
    # mask bottom
    draw.rectangle([0, H - bottom, W, H], fill=black)
    # mask left
    draw.rectangle([0, 0, left, H], fill=black)
    # mask right
    draw.rectangle([W - right, 0, W, H], fill=black)

    out_path.parent.mkdir(parents=True, exist_ok=True)
    im.save(out_path, quality=95)

# Build the DST dataset tree (images + labels) from SRC for both splits.
#
# APPLY_MASK selects what happens to each image:
#   False (current behaviour) -> the image is copied unchanged
#   True                      -> the image is passed through mask_image()
# Labels are always copied unchanged.
APPLY_MASK = False

# Process train + validation sets
for split in ["train", "validation"]:
    img_dir = SRC / split / "images"
    lbl_dir = SRC / split / "labels"
    out_img_dir = DST / split / "images"
    out_lbl_dir = DST / split / "labels"
    out_img_dir.mkdir(parents=True, exist_ok=True)
    out_lbl_dir.mkdir(parents=True, exist_ok=True)

    for img_path in img_dir.glob("*.*"):
        out_img = out_img_dir / img_path.name
        if APPLY_MASK:
            mask_image(img_path, out_img)
        else:
            shutil.copy2(img_path, out_img)

        # copy label without change (images without a label file are still kept)
        lbl_path = lbl_dir / (img_path.stem + ".txt")
        if lbl_path.exists():
            shutil.copy2(lbl_path, out_lbl_dir / lbl_path.name)

# Report what was actually done: the original message claimed masking even
# though the images were only copied.
if APPLY_MASK:
    print("Masked dataset created at:", DST)
else:
    print("Dataset copied (no masking applied) to:", DST)
print("Update data.yaml path to:", DST)

# Comparing to Ground Truth

In [None]:
import json
import numpy as np
from collections import defaultdict

def corners_to_xyxy(corners):
    """Return the axis-aligned box [xmin, ymin, xmax, ymax] enclosing `corners`.

    Each corner is an indexable point; only its first two components
    (x at index 0, y at index 1) are used.
    """
    x_values = []
    y_values = []
    for point in corners:
        x_values.append(point[0])
        y_values.append(point[1])

    x_low, x_high = min(x_values), max(x_values)
    y_low, y_high = min(y_values), max(y_values)
    return [x_low, y_low, x_high, y_high]

def load_boxes(json_file):
    """Read a box-annotation JSON file and group its boxes by frame number.

    Every entry of the top-level "boxes" list has a "name" such as
    "slice_nr_45_needle_driver" and a "corners" list. The third
    "_"-separated token of the name is the frame number and the remaining
    tokens form the label.

    Returns:
        defaultdict(list) mapping frame number (int) to a list of
        {"label": str, "bbox": [xmin, ymin, xmax, ymax]} dicts.
    """
    with open(json_file, "r") as handle:
        payload = json.load(handle)

    by_frame = defaultdict(list)
    for entry in payload["boxes"]:
        tokens = entry["name"].split("_")  # e.g. ["slice", "nr", "45", "needle", "driver"]
        frame_number = int(tokens[2])
        tool_label = "_".join(tokens[3:])
        bbox = corners_to_xyxy(entry["corners"])
        by_frame[frame_number].append({"label": tool_label, "bbox": bbox})
    return by_frame

def iou(box1, box2):
    """Intersection-over-union of two [xmin, ymin, xmax, ymax] boxes.

    Returns 0 when the union area is not positive.
    """
    # Overlap extent along each axis, clamped at zero for disjoint boxes.
    overlap_w = max(0, min(box1[2], box2[2]) - max(box1[0], box2[0]))
    overlap_h = max(0, min(box1[3], box2[3]) - max(box1[1], box2[1]))
    inter = overlap_w * overlap_h

    first_area = (box1[2] - box1[0]) * (box1[3] - box1[1])
    second_area = (box2[2] - box2[0]) * (box2[3] - box2[1])
    union = first_area + second_area - inter

    if union > 0:
        return inter / union
    return 0

### Compare predictions

In [None]:
# For every ground-truth box, report the highest IoU reached by a prediction
# of the same class in the same frame.
ground_truth = load_boxes("ground_truth.json") # some ground_truth file
# NOTE(review): relies on `pred` from the prediction cell, and assumes
# output_format_as_json returns a mapping frame_number -> list of
# {"label": ..., "bbox": ...} dicts. Confirm against output_format.
compared_predictions = output_format.output_format_as_json(pred)

for frame in ground_truth:
    print(f"\nFrame {frame}")
    gt_boxes = ground_truth[frame]
    pred_boxes = compared_predictions.get(frame, [])

    for g in gt_boxes:
        best_iou = 0
        for p in pred_boxes:
            if g["label"] != p["label"]:
                continue  # only compare boxes of the same class
            # Single quotes inside the f-strings: reusing the enclosing double
            # quote is a SyntaxError on Python versions before 3.12.
            print(f"    class {g['label']} gt/p:")
            print(f"      {g['bbox']}/ {p['bbox']}")
            best_iou = max(best_iou, iou(g["bbox"], p["bbox"]))
        print(f"  GT: {g['label']} {g['bbox']} -> best IoU {best_iou:.2f}")