In [None]:
import os
from google.colab import drive

drive.mount('/content/drive')

PROJECT_ROOT = "/content/drive/MyDrive/Kidney_Stone_YOLOv10_Attention"
os.makedirs(PROJECT_ROOT, exist_ok=True)
os.chdir(PROJECT_ROOT)
print(f"Project directory set to: {PROJECT_ROOT}")

!pip install ultralytics -q

DATASET_PATH = "/content/drive/MyDrive/kidney-dataset"
YOLO_DATA_YAML = os.path.join(DATASET_PATH, 'data.yaml')

if not os.path.exists(YOLO_DATA_YAML):
    print(f"ERROR: 'data.yaml' not found at {YOLO_DATA_YAML}. Please check the path.")
else:
    print(f"✅ Successfully located dataset at: {DATASET_PATH}")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Project directory set to: /content/drive/MyDrive/Kidney_Stone_YOLOv10_Attention
✅ Successfully located dataset at: /content/drive/MyDrive/kidney-dataset


In [None]:
from ultralytics import YOLO

YOLO_PROJECT_NAME = "YOLOv10l_Localizer"
YOLO_BEST_WEIGHTS = os.path.join(PROJECT_ROOT, YOLO_PROJECT_NAME, 'weights/best.pt')

if os.path.exists(YOLO_BEST_WEIGHTS):
    print(f"YOLOv10l model already trained. Weights found at: {YOLO_BEST_WEIGHTS}")
else:
    print("Starting YOLOv10l training...")
    model_yolo = YOLO('yolov10l.pt')
    model_yolo.train(
        data=YOLO_DATA_YAML,
        epochs=75,
        imgsz=640,
        batch=8, 
        project=PROJECT_ROOT,
        name=YOLO_PROJECT_NAME,
        exist_ok=True
    )
    print("\nYOLOv10l training complete.")

In [None]:
import cv2
import numpy as np
from tqdm.notebook import tqdm
from PIL import Image

# --- Segmentation Mask Generation ---
print("Generating segmentation masks from YOLO labels...")
SEG_DATA_PATH = os.path.join(PROJECT_ROOT, "segmentation_data")

def create_masks_for_split(split_name):
    print(f"Processing '{split_name}' split...")
    image_dir = os.path.join(DATASET_PATH, split_name, 'images')
    label_dir = os.path.join(DATASET_PATH, split_name, 'labels')

    dest_img_dir = os.path.join(SEG_DATA_PATH, split_name, 'images')
    dest_mask_dir = os.path.join(SEG_DATA_PATH, split_name, 'masks')
    os.makedirs(dest_img_dir, exist_ok=True)
    os.makedirs(dest_mask_dir, exist_ok=True)

    for img_filename in tqdm(os.listdir(image_dir)):
        img_path = os.path.join(image_dir, img_filename)
        label_path = os.path.join(label_dir, os.path.splitext(img_filename)[0] + '.txt')

        img = Image.open(img_path)
        img_w, img_h = img.size

        mask = np.zeros((img_h, img_w), dtype=np.uint8)

        if os.path.exists(label_path):
            with open(label_path, 'r') as f:
                for line in f:
                    _, x_center, y_center, w, h = map(float, line.split())
                    x1 = int((x_center - w/2) * img_w)
                    y1 = int((y_center - h/2) * img_h)
                    x2 = int((x_center + w/2) * img_w)
                    y2 = int((y_center + h/2) * img_h)
                    cv2.rectangle(mask, (x1, y1), (x2, y2), (255), -1)

        img.save(os.path.join(dest_img_dir, img_filename))
        cv2.imwrite(os.path.join(dest_mask_dir, img_filename), mask)

create_masks_for_split('train')
create_masks_for_split('valid')

print("\nSegmentation mask generation complete.")

In [2]:
!pip install ultralytics segmentation-models-pytorch timm albumentations -q

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/154.8 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m154.8/154.8 kB[0m [31m7.2 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
import torch
import segmentation_models_pytorch as smp
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import albumentations as A
from albumentations.pytorch import ToTensorV2
from tqdm.notebook import tqdm
from PIL import Image
import numpy as np
import os
import time

class KidneyStoneSegDataset(Dataset):
    def __init__(self, img_dir, mask_dir, transform=None):
        self.img_dir = img_dir
        self.mask_dir = mask_dir
        self.images = os.listdir(img_dir)
        self.transform = transform

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        img_path = os.path.join(self.img_dir, self.images[idx])
        mask_path = os.path.join(self.mask_dir, self.images[idx])
        image = np.array(Image.open(img_path).convert("RGB"))
        mask = np.array(Image.open(mask_path).convert("L"), dtype=np.float32)
        mask[mask == 255.0] = 1.0 

        if self.transform:
            augmentations = self.transform(image=image, mask=mask)
            image = augmentations["image"]
            mask = augmentations["mask"]
        return image, mask

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
SEG_PROJECT_NAME = "SegFormer_Analyst"
SEG_BEST_WEIGHTS = os.path.join(PROJECT_ROOT, SEG_PROJECT_NAME, 'segformer_best.pth')

if os.path.exists(SEG_BEST_WEIGHTS):
    print(f"SegFormer model already trained. Weights found at: {SEG_BEST_WEIGHTS}")
else:
    print("Setting up SegFormer training...")
    train_transform = A.Compose([
        A.Resize(height=512, width=512), A.Rotate(limit=35, p=1.0),
        A.HorizontalFlip(p=0.5), A.VerticalFlip(p=0.1),
        A.Normalize(mean=[0.0, 0.0, 0.0], std=[1.0, 1.0, 1.0], max_pixel_value=255.0),
        ToTensorV2(),
    ])
    val_transform = A.Compose([
        A.Resize(height=512, width=512),
        A.Normalize(mean=[0.0, 0.0, 0.0], std=[1.0, 1.0, 1.0], max_pixel_value=255.0),
        ToTensorV2(),
    ])

    train_ds = KidneyStoneSegDataset(os.path.join(SEG_DATA_PATH, 'train/images'), os.path.join(SEG_DATA_PATH, 'train/masks'), transform=train_transform)
    val_ds = KidneyStoneSegDataset(os.path.join(SEG_DATA_PATH, 'valid/images'), os.path.join(SEG_DATA_PATH, 'valid/masks'), transform=val_transform)
    train_loader = DataLoader(train_ds, batch_size=8, shuffle=True, num_workers=2)
    val_loader = DataLoader(val_ds, batch_size=8, shuffle=False, num_workers=2)

    model_seg = smp.Segformer(encoder_name="mit_b3", encoder_weights="imagenet", in_channels=3, classes=1).to(DEVICE)
    loss_fn = smp.losses.DiceLoss(mode='binary')
    optimizer = torch.optim.AdamW(model_seg.parameters(), lr=1e-4)
    scaler = torch.amp.GradScaler(device=DEVICE)


    print("Starting SegFormer training...")
    best_val_score = float('-inf')
    for epoch in range(25):
        model_seg.train()
        loop = tqdm(train_loader, desc=f"Epoch {epoch+1}")
        for image, mask in loop:
            image, mask = image.to(DEVICE), mask.float().unsqueeze(1).to(DEVICE)
            with torch.amp.autocast(device_type=DEVICE):
                preds = model_seg(image)
                loss = loss_fn(preds, mask)
            optimizer.zero_grad()
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
            loop.set_postfix(loss=loss.item())

        model_seg.eval()
        total_tp, total_fp, total_fn, total_tn = 0, 0, 0, 0
        with torch.no_grad():
            for image, mask in val_loader:
                image, mask = image.to(DEVICE), mask.float().unsqueeze(1).to(DEVICE)
                preds = torch.sigmoid(model_seg(image))

                tp, fp, fn, tn = smp.metrics.get_stats(
                    preds, mask.long(), mode='binary', threshold=0.5
                )


                total_tp += tp.sum()
                total_fp += fp.sum()
                total_fn += fn.sum()
                total_tn += tn.sum()

        epoch_f1_score = smp.metrics.f1_score(
            total_tp, total_fp, total_fn, total_tn, reduction='micro'
        )

        print(f"Validation F1 Score (Dice): {epoch_f1_score:.4f}")
        if epoch_f1_score > best_val_score:
            best_val_score = epoch_f1_score
            os.makedirs(os.path.dirname(SEG_BEST_WEIGHTS), exist_ok=True)
            torch.save(model_seg.state_dict(), SEG_BEST_WEIGHTS)
            print("=> New best model saved!")

    print("\nSegFormer training complete.")

Setting up SegFormer training...
Starting SegFormer training...


Epoch 1:   0%|          | 0/132 [00:00<?, ?it/s]

Validation F1 Score (Dice): 0.2956
=> New best model saved!


Epoch 2:   0%|          | 0/132 [00:00<?, ?it/s]

Validation F1 Score (Dice): 0.4745
=> New best model saved!


Epoch 3:   0%|          | 0/132 [00:00<?, ?it/s]

Validation F1 Score (Dice): 0.4689


Epoch 4:   0%|          | 0/132 [00:00<?, ?it/s]

Validation F1 Score (Dice): 0.5051
=> New best model saved!


Epoch 5:   0%|          | 0/132 [00:00<?, ?it/s]

Validation F1 Score (Dice): 0.5403
=> New best model saved!


Epoch 6:   0%|          | 0/132 [00:00<?, ?it/s]

Validation F1 Score (Dice): 0.5385


Epoch 7:   0%|          | 0/132 [00:00<?, ?it/s]

Validation F1 Score (Dice): 0.6147
=> New best model saved!


Epoch 8:   0%|          | 0/132 [00:00<?, ?it/s]

Validation F1 Score (Dice): 0.5448


Epoch 9:   0%|          | 0/132 [00:00<?, ?it/s]

Validation F1 Score (Dice): 0.5839


Epoch 10:   0%|          | 0/132 [00:00<?, ?it/s]

Validation F1 Score (Dice): 0.5638


Epoch 11:   0%|          | 0/132 [00:00<?, ?it/s]

Validation F1 Score (Dice): 0.5637


Epoch 12:   0%|          | 0/132 [00:00<?, ?it/s]

Validation F1 Score (Dice): 0.5559


Epoch 13:   0%|          | 0/132 [00:00<?, ?it/s]

Validation F1 Score (Dice): 0.5627


Epoch 14:   0%|          | 0/132 [00:00<?, ?it/s]

Validation F1 Score (Dice): 0.5104


Epoch 15:   0%|          | 0/132 [00:00<?, ?it/s]

Validation F1 Score (Dice): 0.5953


Epoch 16:   0%|          | 0/132 [00:00<?, ?it/s]

Validation F1 Score (Dice): 0.5593


Epoch 17:   0%|          | 0/132 [00:00<?, ?it/s]

Validation F1 Score (Dice): 0.5866


Epoch 18:   0%|          | 0/132 [00:00<?, ?it/s]

Validation F1 Score (Dice): 0.5724


Epoch 19:   0%|          | 0/132 [00:00<?, ?it/s]

Validation F1 Score (Dice): 0.5881


Epoch 20:   0%|          | 0/132 [00:00<?, ?it/s]

Validation F1 Score (Dice): 0.6059


Epoch 21:   0%|          | 0/132 [00:00<?, ?it/s]

Validation F1 Score (Dice): 0.6058


Epoch 22:   0%|          | 0/132 [00:00<?, ?it/s]

Validation F1 Score (Dice): 0.5582


Epoch 23:   0%|          | 0/132 [00:00<?, ?it/s]

Validation F1 Score (Dice): 0.6060


Epoch 24:   0%|          | 0/132 [00:00<?, ?it/s]

Validation F1 Score (Dice): 0.6206
=> New best model saved!


Epoch 25:   0%|          | 0/132 [00:00<?, ?it/s]

Validation F1 Score (Dice): 0.6081

SegFormer training complete.


In [None]:
from ultralytics import YOLO
import torch
import segmentation_models_pytorch as smp
import albumentations as A
from albumentations.pytorch import ToTensorV2
from PIL import Image
import numpy as np
from tqdm.notebook import tqdm
import cv2
import os

print("Loading expert models for final evaluation...")
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

yolo_model = YOLO(YOLO_BEST_WEIGHTS)

seg_model = smp.Segformer(encoder_name="mit_b3", in_channels=3, classes=1).to(DEVICE)
seg_model.load_state_dict(torch.load(SEG_BEST_WEIGHTS))
seg_model.eval()

seg_transform = A.Compose([
    A.Resize(height=512, width=512),
    A.Normalize(mean=[0.0, 0.0, 0.0], std=[1.0, 1.0, 1.0], max_pixel_value=255.0),
    ToTensorV2(),
])

from ultralytics.utils.metrics import ap_per_class
from ultralytics.utils.ops import xywh2xyxy
from ultralytics.utils.metrics import box_iou


def evaluate_fused_performance(split_name, yolo_conf_thresh=0.05, final_score_thresh=0.25):
    print(f"\n--- Fused Evaluation on '{split_name}' set ---")
    image_dir = os.path.join(DATASET_PATH, split_name, 'images')
    label_dir = os.path.join(DATASET_PATH, split_name, 'labels')

    stats = [] 

    for img_filename in tqdm(sorted(os.listdir(image_dir)), desc=f"Processing {split_name}"):
        img_path = os.path.join(image_dir, img_filename)
        img_pil = Image.open(img_path).convert("RGB")
        img_np = np.array(img_pil)
        h, w = img_np.shape[:2]

        targets_list = []
        label_path = os.path.join(label_dir, os.path.splitext(img_filename)[0] + '.txt')
        if os.path.exists(label_path):
            with open(label_path, 'r') as f:
                for line in f:
                    cls, x, y, w_norm, h_norm = map(float, line.split())
                    targets_list.append([cls, x, y, w_norm, h_norm])

        if len(targets_list) > 0:
            targets = torch.tensor(targets_list, device=DEVICE)
        else:
            targets = torch.empty((0, 5), device=DEVICE)

        yolo_preds = yolo_model(img_pil, conf=yolo_conf_thresh, verbose=False)[0]

        if len(yolo_preds.boxes) == 0:
            if targets.shape[0] > 0:
                stats.append((
                    torch.zeros((0, 10), dtype=torch.bool, device=DEVICE),
                    torch.zeros((0,), device=DEVICE),
                    torch.zeros((0,), device=DEVICE),
                    targets[:, 0]  
                ))
            continue

        with torch.no_grad():
            input_tensor = seg_transform(image=img_np)["image"].unsqueeze(0).to(DEVICE)
            heatmap = torch.sigmoid(seg_model(input_tensor)).squeeze().cpu().numpy()

        final_preds_list = []
        heatmap_resized = cv2.resize(heatmap, (w, h))
        for box in yolo_preds.boxes:
            x1, y1, x2, y2 = box.xyxy[0].int().cpu().numpy()
            x1, y1, x2, y2 = max(0, x1), max(0, y1), min(w, x2), min(h, y2)
            box_heatmap = heatmap_resized[y1:y2, x1:x2]
            avg_heatmap_score = float(np.mean(box_heatmap)) if box_heatmap.size > 0 else 0.0
            final_score = float(box.conf[0].item()) * avg_heatmap_score

            if final_score >= final_score_thresh:
                cls_id = int(box.cls[0].item())
                final_preds_list.append([
                    float(box.xywhn[0][0].item()), float(box.xywhn[0][1].item()),
                    float(box.xywhn[0][2].item()), float(box.xywhn[0][3].item()),
                    float(final_score),
                    cls_id
                ])

        if len(final_preds_list) == 0:
            if targets.shape[0] > 0:
                stats.append((
                    torch.zeros((0, 10), dtype=torch.bool, device=DEVICE),
                    torch.zeros((0,), device=DEVICE),
                    torch.zeros((0,), device=DEVICE),
                    targets[:, 0]
                ))
            continue

        final_preds = torch.tensor(final_preds_list, device=DEVICE)

        correct = torch.zeros(final_preds.shape[0], 10, dtype=torch.bool, device=DEVICE)
        if targets.shape[0] > 0:
            tbox = xywh2xyxy(targets[:, 1:5]) * torch.tensor([w, h, w, h], device=DEVICE)
            pbox = xywh2xyxy(final_preds[:, :4]) * torch.tensor([w, h, w, h], device=DEVICE)
            iou = box_iou(pbox, tbox) 
            correct_class = final_preds[:, 5:6] == targets[:, 0]

            for i in range(len(final_preds)):
                matches = iou[i] * correct_class[i]  
                if matches.any():
                    match_idx = matches.argmax()
                    iou_val = iou[i, match_idx]
                    iou_thresholds = torch.linspace(0.5, 0.95, 10, device=DEVICE)
                    correct[i] = iou_val > iou_thresholds

        stats.append((
            correct,                   
            final_preds[:, 4],         
            final_preds[:, 5].long(), 
            targets[:, 0]             
        ))

    if len(stats) == 0:
        print("No stats collected; nothing to evaluate.")
        return

    tp_list, conf_list, pred_cls_list, target_cls_list = zip(*stats)

    # concatenate into single arrays
    tp = torch.cat(tp_list, 0).cpu().numpy() if len(tp_list) > 0 else np.zeros((0, 10), dtype=bool)
    conf = torch.cat(conf_list, 0).cpu().numpy() if len(conf_list) > 0 else np.zeros((0,), dtype=float)
    pred_cls = torch.cat(pred_cls_list, 0).cpu().numpy().astype(np.int64) if len(pred_cls_list) > 0 else np.zeros((0,), dtype=np.int64)
    target_cls = torch.cat(target_cls_list, 0).cpu().numpy().astype(np.int64) if len(target_cls_list) > 0 else np.zeros((0,), dtype=np.int64)

    results = ap_per_class(tp, conf, pred_cls, target_cls, plot=False)

    if len(results) >= 6:
        tp_count, fp_count, p, r, f1, ap = results[:6]
    else:
        p, r, ap, f1 = results[:4]

    ap50 = ap[:, 0] if hasattr(ap, "ndim") and ap.ndim == 2 else ap

    p_mean = float(np.mean(p)) if getattr(p, "size", 0) else 0.0
    r_mean = float(np.mean(r)) if getattr(r, "size", 0) else 0.0
    ap50_mean = float(np.mean(ap50)) if getattr(ap50, "size", 0) else 0.0

    print(f"\n--- Results for '{split_name}' ---")
    print(f"Precision: {p_mean:.4f}")
    print(f"Recall:    {r_mean:.4f}")
    print(f"mAP@0.5:   {ap50_mean:.4f}")
    print("--------------------------------")


evaluate_fused_performance('valid')
evaluate_fused_performance('test')


Loading expert models for final evaluation...

--- Fused Evaluation on 'valid' set ---


Processing valid:   0%|          | 0/123 [00:00<?, ?it/s]


--- Results for 'valid' ---
Precision: 0.8530
Recall:    0.8215
mAP@0.5:   0.8359
--------------------------------

--- Fused Evaluation on 'test' set ---


Processing test:   0%|          | 0/123 [00:00<?, ?it/s]


--- Results for 'test' ---
Precision: 0.8619
Recall:    0.8080
mAP@0.5:   0.8275
--------------------------------
