**This is a research notebook for the functionalities listed in the report.**

In [None]:
# !pip install -q ultralytics torch torchvision matplotlib albumentations optuna
# ! pip install numpy==1.26.4

In [None]:
import os
import cv2
import yaml
import optuna
import pandas as pd
import matplotlib.pyplot as plt

from tqdm import tqdm
from ultralytics import YOLO

In [None]:
# Root directory for everything this notebook writes (runs, weights, pseudo labels).
# On Colab this was '/content/drive/MyDrive/data/vision_in_surgery/hw1_2'.
MODEL_PATH = 'models'
os.makedirs(MODEL_PATH, exist_ok=True)  # no-op when the directory already exists
# NOTE(review): this is set after `ultralytics` has been imported; confirm the
# library really reads the variable at this point.
os.environ['ULTRALYTICS_HUB_DIR'] = MODEL_PATH

In [None]:
# Show what the shared HW1 directory contains.
! ls /datashare/HW1

In [None]:
# Labeled images live on the shared volume.
# On Colab: "/content/drive/MyDrive/data/vision_in_surgery/hw1_2/HW1/labeled_image_data"
DATA_ROOT = "/datashare/HW1/labeled_image_data"
classes_path = os.path.join(DATA_ROOT, "classes.txt")

# One class name per line; blank lines are skipped and surrounding whitespace removed.
with open(classes_path, "r") as f:
    class_names = [stripped for stripped in (raw.strip() for raw in f) if stripped]

In [None]:
# Dataset description handed to `model.train(data=...)`: image folders for the
# two splits plus the class list read from classes.txt.
data_dict = {
    "train": os.path.join(DATA_ROOT, "images", "train"),
    "val": os.path.join(DATA_ROOT, "images", "val"),
    "nc": len(class_names),
    "names": class_names
}

# Keep data.yaml in the working directory rather than inside DATA_ROOT
# (presumably the shared volume is not writable -- confirm).
data_yaml_path = "data.yaml"

# Write the file so a fresh "Restart & Run All" does not depend on a data.yaml
# left over from an earlier session. An existing file is never overwritten, so
# a hand-edited data.yaml keeps working.
if not os.path.exists(data_yaml_path):
    with open(data_yaml_path, "w") as f:
        yaml.safe_dump(data_dict, f, sort_keys=False)

# Fine-Tune on Labeled Images

In [None]:
# Load the "yolov8n.pt" checkpoint that the fine-tuning run below starts from.
model = YOLO("yolov8n.pt")

In [None]:
# Baseline fine-tuning run on the labeled images with fixed hyper-parameters.
model.train(
    data=data_yaml_path,                    # dataset description (data.yaml)
    project=f"{MODEL_PATH}/runs/finetune",  # parent directory of the run
    name="yolov8_finetuned",                # run name inside `project`
    epochs=50,
    batch=16,                               # adjust to the available GPU memory
    imgsz=640,                              # training image size in pixels
    lr0=1e-3,                               # initial learning rate
    verbose=True,
)

#### Hyperparameter search with Optuna

In [None]:
def objective(trial):
    """Optuna objective: train YOLOv8n briefly and score it by mAP50.

    Samples a learning rate, momentum, weight decay and batch size from the
    trial, fine-tunes a fresh "yolov8n.pt" for 10 epochs on ``data_yaml_path``
    and reads the resulting metric from the run's ``results.csv``.

    Args:
        trial: the Optuna trial used to sample hyper-parameters; its
            ``number`` is used in the run name.

    Returns:
        float: the "metrics/mAP50(B)" value of the last epoch logged in
        ``results.csv``.
    """
    # suggest_float replaces the deprecated suggest_loguniform / suggest_uniform;
    # parameter names and ranges are unchanged.
    lr0 = trial.suggest_float("lr0", 1e-4, 1e-2, log=True)
    momentum = trial.suggest_float("momentum", 0.6, 0.95)
    weight_decay = trial.suggest_float("weight_decay", 1e-5, 1e-2, log=True)
    batch = trial.suggest_categorical("batch", [8, 16, 32])
    epochs = 10  # short runs for search

    project_dir = f"{MODEL_PATH}/runs/optuna"
    run_name = f"yolov8_finetuned_trial{trial.number}"

    # Train a fresh model for every trial so trials do not influence each other.
    model = YOLO("yolov8n.pt")
    model.train(
        data=data_yaml_path,        # path to data.yaml
        epochs=epochs,              # number of epochs
        imgsz=640,                  # training image size (pixels)
        batch=batch,                # sampled batch size
        lr0=lr0,
        momentum=momentum,
        weight_decay=weight_decay,
        project=project_dir,        # where to save runs
        name=run_name,              # run name
        verbose=True
    )

    # Ask the trainer where it actually wrote the run: when the requested
    # directory already exists (e.g. the study is run a second time) the run is
    # saved under an incremented name, and a hand-built path would silently
    # read the results of an older run. Fall back to the requested path if the
    # attribute is not available.
    trainer = getattr(model, "trainer", None)
    save_dir = getattr(trainer, "save_dir", None) or os.path.join(project_dir, run_name)
    results_path = os.path.join(str(save_dir), "results.csv")

    df = pd.read_csv(results_path)
    # Some Ultralytics versions pad the CSV header with spaces.
    df.columns = df.columns.str.strip()
    # mAP50 of the last logged epoch, returned as a plain Python float.
    last_epoch_rows = df.loc[df["epoch"] == df["epoch"].max(), "metrics/mAP50(B)"]
    return float(last_epoch_rows.iloc[-1])

In [None]:
# Run the hyper-parameter search: maximise the value returned by `objective`.
study = optuna.create_study(direction="maximize")
# Stop after 20 trials or 2 hours (timeout is given in seconds).
study.optimize(objective, n_trials=20, timeout=2*60*60)
print("Best trial:")
# `trial` is reused by the next cell to retrain with the best hyper-parameters.
trial = study.best_trial
for key, val in trial.params.items():
    print(f"  {key}: {val}")
print(f"  mAP50: {trial.value:.4f}")

In [None]:
# Retrain for longer with the best hyper-parameters found by the search.
# A new dict is built so that `trial.params` itself is left untouched and
# still shows only the searched hyper-parameters when inspected later.
best_params = {
    **trial.params,
    "epochs": 75,
    "project": f"{MODEL_PATH}/runs/optuna",
    "name": "best_full",
}
# NOTE(review): `model` is whatever the notebook-level variable holds at this
# point (the model created and trained in the baseline cells), not a fresh
# "yolov8n.pt" like the one each Optuna trial started from -- confirm this is
# intended.
model.train(data=data_yaml_path,
            imgsz=640,
            verbose=True,
            **best_params)

In [None]:
# Copy the Optuna run directory (trial runs and the final training run) into the working directory.
!cp -r {MODEL_PATH}/runs/optuna/ .

# Pseudo Labels on In-Distribution Video

In [None]:
# Frame sampling stride: only frames whose index is a multiple of this value
# are run through the model when pseudo labels are created.
SKIP_FRAMES = 5 # For speed skip some frames

In [None]:
# Default output location for pseudo-labeled frames and their label files.
PSUDO_PATH = f"{MODEL_PATH}/pseudo_labels"
FRAME_SAVE_DIR = f"{PSUDO_PATH}/images"
LABEL_SAVE_DIR = f"{PSUDO_PATH}/labels"

# Create both directories up front; exist_ok makes the cell safe to re-run.
os.makedirs(FRAME_SAVE_DIR, exist_ok=True)
os.makedirs(LABEL_SAVE_DIR, exist_ok=True)

### Utility functions

In [None]:
def filter_bboxes(results, conf_threshold):
    """Return the detections of one frame, but only if every one is confident.

    The filter is all-or-nothing: as soon as a single detection has a
    confidence that is not strictly above ``conf_threshold`` the whole frame is
    rejected and three empty lists are returned. A frame without detections
    also yields three empty lists.
    NOTE(review): presumably partially labeled frames are unwanted as pseudo
    labels -- confirm that rejecting the whole frame is the intended policy.

    Args:
        results: detection result for a single image; ``results.boxes`` is
            iterated and each box must expose ``conf``, ``cls`` and ``xyxy``
            supporting ``.cpu().numpy()``.
        conf_threshold: minimum confidence (exclusive) every box must exceed.

    Returns:
        tuple: ``(boxes, classes, scores)`` where ``boxes`` is a list of
        ``(x1, y1, x2, y2)`` float tuples, ``classes`` a list of int class
        indices and ``scores`` a list of float confidences.
    """
    high_conf_boxes = []
    high_conf_classes = []
    high_conf_scores = []
    for box in results.boxes:
        # `.item()` extracts the single value explicitly; calling float()/int()
        # directly on a 1-element array is deprecated since NumPy 1.25.
        conf = float(box.conf.cpu().numpy().item())
        if not conf > conf_threshold:
            # One uncertain detection is enough to reject the whole frame.
            return [], [], []
        x1, y1, x2, y2 = map(float, box.xyxy.cpu().numpy().reshape(-1).tolist())
        high_conf_boxes.append((x1, y1, x2, y2))
        high_conf_classes.append(int(box.cls.cpu().numpy().item()))
        high_conf_scores.append(conf)
    return high_conf_boxes, high_conf_classes, high_conf_scores


def save_frame(frame, frame_idx, frame_save_dir, names_prefix):
    """Write ``frame`` as a JPEG and return the file name (not the full path).

    The file is named ``<names_prefix>_<frame_idx zero-padded to 6 digits>.jpg``
    and placed in ``frame_save_dir``.
    """
    img_filename = "{}_{:06d}.jpg".format(names_prefix, frame_idx)
    cv2.imwrite(os.path.join(frame_save_dir, img_filename), frame)
    return img_filename

def save_label(frame_idx,
               high_conf_boxes,
               high_conf_classes,
               high_conf_scores,
               W,
               H,
               label_save_dir,
               names_prefix):
    """Write the boxes of one frame as a YOLO-style label file.

    Every box becomes one line ``"<class> <x_center> <y_center> <w> <h>"``,
    with the four geometry values divided by the image width ``W`` or height
    ``H`` and printed with six decimals. The file is named
    ``<names_prefix>_<frame_idx zero-padded to 6 digits>.txt`` inside
    ``label_save_dir``; it is created (empty) even when there are no boxes.
    ``high_conf_scores`` is accepted but not written.

    Returns:
        str: the label file name (not the full path).
    """
    label_filename = "{}_{:06d}.txt".format(names_prefix, frame_idx)
    with open(os.path.join(label_save_dir, label_filename), "w") as f_label:
        for (left, top, right, bottom), class_id in zip(high_conf_boxes, high_conf_classes):
            # Corner format -> centre/size format, still in pixels.
            width = right - left
            height = bottom - top
            centre_x = left + width / 2
            centre_y = top + height / 2

            # Scale to [0, 1] relative to the image size.
            normalized = (centre_x / W, centre_y / H, width / W, height / H)
            fields = [str(class_id)] + [f"{value:.6f}" for value in normalized]
            f_label.write(" ".join(fields) + "\n")

    return label_filename

## Fine tuning

In [None]:
# Load the fine-tuned detector.
# NOTE(review): "fine_tune_best.pt" must already exist in the working directory;
# presumably it is a copy of the best_full weights referenced below -- confirm.
# model = YOLO(f"{MODEL_PATH}/runs/optuna/best_full/weights/best.pt")
model = YOLO("fine_tune_best.pt")

In [None]:
# Sanity check: how many frames does the first in-distribution video have?
# The videos are read from the shared drive (the same file as `video_path1`
# further down), not from under MODEL_PATH.
cap = cv2.VideoCapture("/datashare/HW1/id_video_data/20_2_24_1.mp4")
if not cap.isOpened():
    print("Warning: could not open the video; the frame count below is not meaningful")
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
cap.release()  # only the metadata was needed; do not keep the file open
total_frames

In [None]:
def cluster_feature_extraction(frame):
    """Describe a frame by a flattened 8x8x8 HSV colour histogram.

    The frame is resized to 224x224, converted with ``cv2.COLOR_BGR2HSV``
    (so it is treated as a BGR image), binned into 8 bins per channel and
    normalized with ``cv2.normalize``'s default settings.

    Returns:
        A flat array with 8 * 8 * 8 = 512 entries.
    """
    thumbnail = cv2.resize(frame, (224, 224), interpolation=cv2.INTER_AREA)
    hsv_thumbnail = cv2.cvtColor(thumbnail, cv2.COLOR_BGR2HSV)

    bins_per_channel = (8, 8, 8)
    channel_ranges = [0, 180, 0, 256, 0, 256]
    histogram = cv2.calcHist([hsv_thumbnail], [0, 1, 2], None,
                             bins_per_channel, channel_ranges)

    # Normalizes into `histogram` itself and returns that same array.
    normalized = cv2.normalize(histogram, histogram)
    return normalized.flatten()

In [None]:
def create_pseudo_labels(model_path, video_path, conf_threshold, output_path=PSUDO_PATH):
    """Run a detector over a video and save confident frames as a pseudo-labeled dataset.

    Every ``SKIP_FRAMES``-th frame is passed through the model. Frames accepted
    by ``filter_bboxes`` are written to ``<output_path>/images`` and their
    labels to ``<output_path>/labels``; file names start with the video's base
    name (the text before the first dot).

    Args:
        model_path: weights file loaded with ``YOLO(model_path)``.
        video_path: path of the video to label.
        conf_threshold: confidence threshold forwarded to ``filter_bboxes``.
        output_path: dataset root; ``images`` and ``labels`` sub-directories
            are created if missing.

    Returns:
        list: one ``(image_filename, label_filename, feature_vector)`` tuple
        per saved frame, the feature coming from ``cluster_feature_extraction``.

    Raises:
        IOError: if the video cannot be opened.
    """
    names_prefix = video_path.split('/')[-1].split('.')[0]
    frame_output_dir = f"{output_path}/images"
    label_output_dir = f"{output_path}/labels"
    os.makedirs(frame_output_dir, exist_ok=True)
    os.makedirs(label_output_dir, exist_ok=True)

    model = YOLO(model_path)
    clustering_featuers = []
    cap = cv2.VideoCapture(video_path)
    try:
        # Fail loudly instead of silently producing an empty dataset.
        if not cap.isOpened():
            raise IOError(f"Could not open video: {video_path}")

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        print(f"Total frames: {total_frames}")

        # Number of frames whose index is a multiple of SKIP_FRAMES (ceil division).
        sampled_frames = (total_frames + SKIP_FRAMES - 1) // SKIP_FRAMES
        with tqdm(total=sampled_frames, desc="Frames processed") as pbar:
            frame_idx = -1
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                # Single place where the index advances: 0 for the first frame.
                frame_idx += 1
                if frame_idx % SKIP_FRAMES != 0:
                    continue
                pbar.update(1)

                H, W = frame.shape[:2]

                results = model(frame, verbose=False)[0]
                high_conf_boxes, high_conf_classes, high_conf_scores = filter_bboxes(
                    results, conf_threshold)

                # Nothing usable in this frame: do not save image or label.
                if len(high_conf_boxes) == 0:
                    continue

                img_filename = save_frame(frame, frame_idx, frame_output_dir, names_prefix)
                label_filename = save_label(frame_idx,
                                            high_conf_boxes,
                                            high_conf_classes,
                                            high_conf_scores,
                                            W,
                                            H,
                                            label_output_dir,
                                            names_prefix)

                clustering_featuers.append((img_filename,
                                            label_filename,
                                            cluster_feature_extraction(frame)))
    finally:
        # Release the video even when inference or file writing raises.
        cap.release()
    return clustering_featuers

### Fine-tune on pseudo labels

In [None]:
def fine_tune_on_psudo_labels(model_path, psudo_labels_path, name):
    """Fine-tune a model on one pseudo-labeled dataset and return the training result.

    A ``data.yaml`` is written into ``psudo_labels_path`` describing the
    dataset (train images in ``images``, a fixed validation folder, the class
    list from ``class_names``) and the model loaded from ``model_path`` is
    trained on it for 100 epochs under ``{MODEL_PATH}/runs/psudo/<name>``.

    NOTE(review): both image paths in the yaml are relative; how they are
    resolved is decided by Ultralytics -- confirm they point at the intended
    folders on the machine this runs on.

    Args:
        model_path: weights to start from.
        psudo_labels_path: dataset root that receives ``data.yaml``.
        name: run name passed to ``model.train``.

    Returns:
        Whatever ``model.train`` returns.
    """
    detector = YOLO(model_path)

    dataset_spec = {
        "train": "images",
        "val": "../../../models/pseudo_labels/validation_labeled/images",
        "nc": len(class_names),
        "names": class_names,
    }
    psudo_data_yaml_path = os.path.join(psudo_labels_path, "data.yaml")
    with open(psudo_data_yaml_path, "w") as yaml_file:
        yaml.safe_dump(dataset_spec, yaml_file, sort_keys=False)

    return detector.train(
        data=psudo_data_yaml_path,              # dataset description written above
        project=f"{MODEL_PATH}/runs/psudo",     # parent directory of the run
        name=name,                              # run name
        epochs=100,
        batch=16,
        imgsz=640,
        lr0=1e-3,                               # initial learning rate
        pretrained=True,
        verbose=False,
    )

In [None]:
# model = fine_tune_on_psudo_labels(f"{MODEL_PATH}/runs/optuna/best_full/weights/best.pt", f'{PSUDO_PATH}/testing')

## Iterative Pseudo Labeling

In [None]:

# One pseudo-labeled dataset and one fine-tuning run is produced per threshold.
conf_thresholds = [0.5, 0.6, 0.7, 0.8]

# Inputs for pseudo-labelling: the fine-tuned checkpoint in the working
# directory and the two in-distribution videos on the shared drive.
# (An earlier setup kept the checkpoint at
# {MODEL_PATH}/runs/optuna/best_full/weights/best.pt and the videos under
# {MODEL_PATH}/HW1/id_video_data/.)
model_path = "fine_tune_best.pt"
video_path1 = "/datashare/HW1/id_video_data/20_2_24_1.mp4"
video_path2 = "/datashare/HW1/id_video_data/4_2_24_B_2.mp4"

# Location of the original labeled training split.
original_images_path = f"{DATA_ROOT}/images/train"
original_labels_path = f"{DATA_ROOT}/labels/train"

In [None]:
# Create one pseudo-labeled dataset per confidence threshold, using both videos.
for i, conf_threshold in enumerate(conf_thresholds):
  print(f"conf_threshold: {conf_threshold}")
  name = f"run_{i}_conf_threshold_{conf_threshold}"
  output_path = f"{PSUDO_PATH}/{name}"
  for video_path in [video_path1, video_path2]:
    # Both videos write into the same output_path; file names are prefixed with the video name.
    # The returned features are overwritten on every call and not used in this cell.
    clustering_featuers = create_pseudo_labels(model_path,
                                              video_path,
                                              conf_threshold,
                                              output_path)

In [None]:
# Fine-tune one model per confidence threshold on its pseudo-labeled dataset.
results_history = []

for i, conf_threshold in enumerate(conf_thresholds):
    print(f"conf_threshold: {conf_threshold}")
    name = f"run_{i}_conf_threshold_{conf_threshold}"
    output_path = f"{PSUDO_PATH}/{name}"
    # Kept in its own variable so the notebook-level `model_path` (the
    # checkpoint used by the pseudo-labelling cell) is not overwritten and
    # that cell can be re-run with the same weights.
    run_weights_path = f"{MODEL_PATH}/runs/psudo/{name}/weights/best.pt"
    # Continue from this run's previous best weights when they exist,
    # otherwise start from the fine-tuned base checkpoint.
    best_model_path = run_weights_path if os.path.exists(run_weights_path) else "fine_tune_best.pt"
    results = fine_tune_on_psudo_labels(best_model_path, output_path, name)
    results_history.append((name, results))
  

In [None]:
# Validate the best weights of every run; `results_history` is rebuilt here and
# from now on holds the validation metrics used by the aggregation/plot cells.
results_history = []
for i, conf_threshold in enumerate(conf_thresholds):
    print(f"conf_threshold: {conf_threshold}")
    name = f"run_{i}_conf_threshold_{conf_threshold}"
    output_path = f"{PSUDO_PATH}/{name}"
    model = YOLO(f"{MODEL_PATH}/runs/psudo/{name}/weights/best.pt")
    # Kept in its own variable so the notebook-level `data_yaml_path` (the
    # labeled dataset used by the training cells) is not overwritten.
    run_data_yaml_path = f'{output_path}/data.yaml'
    results = model.val(data=run_data_yaml_path, imgsz=640, verbose=True)
    results_history.append((name, results))

In [None]:
# Aggregate results: unpack (run name, metrics) pairs into parallel lists.
names = [run_name for run_name, _ in results_history]
aps = [metrics.box.map50 for _, metrics in results_history]
mrs = [metrics.box.mr for _, metrics in results_history]

# Run names look like "run_<i>_conf_threshold_<t>": the second field is the
# run index, the last one the confidence threshold.
run_num = [float(run_name.split('_')[1]) for run_name in names]
conf_thresholds = [float(run_name.split('_')[-1]) for run_name in names]

In [None]:
# Plot mAP50 and mean recall of every run, each point tagged with its threshold.
plt.figure(figsize=(10, 6))

# mAP50 curve; labels sit above the markers.
plt.plot(run_num, aps, label='mAP50', marker='o')
for x_pos, y_pos, conf in zip(run_num, aps, conf_thresholds):
    plt.annotate(f'conf={conf}', (x_pos, y_pos),
                 textcoords="offset points", xytext=(0,10), ha='center')

# Mean recall curve; labels sit below the markers.
plt.plot(run_num, mrs, label='Mean Recall', marker='s')
for x_pos, y_pos, conf in zip(run_num, mrs, conf_thresholds):
    plt.annotate(f'conf={conf}', (x_pos, y_pos),
                 textcoords="offset points", xytext=(0,-15), ha='center')

# Axes, title, legend
plt.title('mAP50 and Mean Recall vs Run Number')
plt.xlabel('Run Number')
plt.ylabel('Score')
plt.legend()
plt.grid(True)

# Save to disk, then display
plt.savefig('mean_recall_map50_vs_run_num.png')
plt.show()
