#!pip install geojson shapely

In [None]:
# version from inf_draft10_4.ipynb

In [1]:
import torch
import cv2
import openslide
import numpy as np
import os
from pathlib import Path
from tqdm import tqdm
from ultralytics import YOLO
import geojson
from geojson import Feature, FeatureCollection, Polygon
import json
#from shapely.geometry import Polygon
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches

In [2]:
# Configuration Parameters
# Module-level settings read by the extraction, inference and export cells below.
YOLO_MODEL_PATH = 'yolo11_eosinophil_seg5/weights/best.pt'    # Path to YOLO11 pretrained model
IMAGE_DIR = 'svs_inf'          # Input directory with SVS files
OUTPUT_DIR = 'inference_output10'  # Output directory

# Patch Extraction Parameters (pixel units; patches are read at slide level 0)
PATCH_SIZE = 448              # Patch dimensions (square, PATCH_SIZE x PATCH_SIZE)
PATCH_STRIDE = 424            # Stride between patches (448 - 424 = 24 px overlap between neighbouring patches)
HPF_SIZE = 2144               # High-power field size (= 448 + 4 * 424, i.e. exactly 5 patches per axis)
HPF_STRIDE = 500              # Stride between HPFs (smaller than HPF_SIZE, so consecutive HPFs overlap)

# Prediction Output Mode (choose 'bbox', 'polygon', or 'both')
# Controls which geometries are exported to GeoJSON and drawn on the debug images.
PREDICTION_MODE = 'both'      # Options: 'bbox', 'polygon', 'both'

# NOTE(review): CLASSES is not referenced by the code visible in this file; the
# export and debug functions read class names from model.names instead.
CLASSES = ['eos', 'eosg', 'Tissue']  # Class labels

In [3]:
def extract_patches(slide, base_filename, patch_size=None, patch_stride=None,
                    hpf_size=None, hpf_stride=None):
    """Enumerate the patch origins to run inference on for one slide.

    The slide is covered with square high-power fields (HPFs) placed every
    ``hpf_stride`` pixels, and every HPF is covered in turn with square
    patches placed every ``patch_stride`` pixels. No pixel data is read here;
    only coordinates and names are produced.

    HPFs overlap whenever ``hpf_stride`` is smaller than ``hpf_size``, so the
    same slide region can be listed more than once.

    Args:
        slide: Object exposing ``dimensions`` as a ``(width, height)`` pair.
        base_filename: Prefix used when building each patch name.
        patch_size: Patch edge length in pixels. Defaults to ``PATCH_SIZE``.
        patch_stride: Step between patch origins inside an HPF. Defaults to
            ``PATCH_STRIDE``.
        hpf_size: HPF edge length in pixels. Defaults to ``HPF_SIZE``.
        hpf_stride: Step between HPF origins. Defaults to ``HPF_STRIDE``.

    Returns:
        A list of dicts with the keys ``patch_left``, ``patch_top`` and
        ``patch_name``, ordered by HPF column, HPF row, patch column and
        patch row.
    """
    # Fall back to the notebook-level configuration for anything not supplied.
    patch_size = PATCH_SIZE if patch_size is None else patch_size
    patch_stride = PATCH_STRIDE if patch_stride is None else patch_stride
    hpf_size = HPF_SIZE if hpf_size is None else hpf_size
    hpf_stride = HPF_STRIDE if hpf_stride is None else hpf_stride

    width, height = slide.dimensions

    # At least one HPF per axis, even when the slide is smaller than an HPF.
    hpfs_x = max(1, (width - hpf_size) // hpf_stride + 1)
    hpfs_y = max(1, (height - hpf_size) // hpf_stride + 1)

    # Patch offsets inside an HPF are identical for every HPF and both axes.
    patch_offsets = range(0, hpf_size - patch_size + 1, patch_stride)

    patches_info = []
    for hpf_x in range(hpfs_x):
        # Clamp to 0: on a slide narrower than one HPF, `width - hpf_size` is
        # negative and would otherwise push the origin off the slide.
        hpf_left = max(0, min(hpf_x * hpf_stride, width - hpf_size))
        for hpf_y in range(hpfs_y):
            hpf_top = max(0, min(hpf_y * hpf_stride, height - hpf_size))

            for patch_x in patch_offsets:
                patch_left = hpf_left + patch_x
                if patch_left >= width:
                    # Only reachable on slides smaller than one HPF: the patch
                    # would start beyond the right edge of the slide.
                    continue
                for patch_y in patch_offsets:
                    patch_top = hpf_top + patch_y
                    if patch_top >= height:
                        continue
                    patches_info.append({
                        'patch_left': patch_left,
                        'patch_top': patch_top,
                        'patch_name': f"{base_filename}_hpf_{hpf_x}_{hpf_y}_patch_{patch_x}_{patch_y}"
                    })

    return patches_info


In [4]:
def get_patch(slide, patch_info):
    """Read one patch from the slide and return it as an RGB NumPy array.

    Args:
        slide: Slide object providing ``read_region(location, level, size)``.
        patch_info: Dict holding the patch origin under ``patch_left`` and
            ``patch_top``.

    Returns:
        The PATCH_SIZE x PATCH_SIZE region read at level 0 as a NumPy array,
        with the alpha channel dropped when the region has four channels.
    """
    origin = (patch_info['patch_left'], patch_info['patch_top'])

    # Level 0 is requested explicitly; the region is always a square patch.
    region = slide.read_region(origin, 0, (PATCH_SIZE, PATCH_SIZE))
    pixels = np.array(region)

    # Keep only the first three channels when a fourth (alpha) one is present.
    return pixels[:, :, :3] if pixels.shape[2] == 4 else pixels


In [5]:
# GeoJSON export of the YOLO results.
# The model is YOLO11 with task='segment', so polygon masks are available via `.masks.xy`.
#
# NOTE: `Polygon` must be imported from the geojson library, NOT from Shapely:
#     from geojson import Feature, FeatureCollection, Polygon
# Both packages define a `Polygon` class, but with incompatible expectations, so
# importing Shapely's version here is a likely source of confusion. save_patch_results()
# relies on geojson's `Polygon`, which keeps the shapes compatible with the GeoJSON export.

def _detection_feature(ring, class_name, confidence, source):
    """Build one GeoJSON Feature for a detection from a closed coordinate ring."""
    return Feature(
        geometry=Polygon([ring]),  # geojson.Polygon expects a list of rings
        properties={
            "classification": {"name": class_name},
            "measurements": [{"name": "confidence", "value": confidence}],
            "source": source
        }
    )


def save_patch_results(patch_img, results, patch_info, slide_output_dir, model):
    """Write the eos/eosg detections of one patch to a GeoJSON file.

    Detections are shifted by the patch origin before being written, so the
    stored coordinates are offset by ``patch_left`` / ``patch_top``. Depending
    on ``PREDICTION_MODE`` each detection contributes a bounding-box feature
    (``"source": "bbox"``), a mask-outline feature (``"source": "polygon"``),
    or both. Nothing is written when no eos/eosg detection remains.

    Args:
        patch_img: Patch image; unused here, kept for interface compatibility.
        results: Sequence whose first element is the prediction result for
            the patch (``boxes`` and, optionally, ``masks``).
        patch_info: Dict with ``patch_left`` and ``patch_top``; ``patch_id``
            names the output file, falling back to ``patch_name``.
        slide_output_dir: ``pathlib.Path`` of the directory to write into.
        model: Model whose ``names`` maps class ids to class names.
    """
    result = results[0]

    if result.boxes is None or len(result.boxes) == 0:
        return

    boxes = result.boxes.xyxy.cpu().numpy()
    class_ids = result.boxes.cls.cpu().numpy().astype(int)
    confidences = result.boxes.conf.cpu().numpy()
    mask_data = getattr(result, "masks", None)
    masks = mask_data.xy if mask_data is not None else None

    offset_x = patch_info['patch_left']
    offset_y = patch_info['patch_top']
    want_bbox = PREDICTION_MODE in ('bbox', 'both')
    want_polygon = PREDICTION_MODE in ('polygon', 'both')

    features = []

    for i, (box, class_id, score) in enumerate(zip(boxes, class_ids, confidences)):
        class_name = model.names[int(class_id)].lower()
        if class_name not in ("eos", "eosg"):
            continue

        # Convert to a Python float BEFORE rounding; rounding the float32
        # value and widening it afterwards serializes spurious digits.
        confidence = round(float(score), 4)

        if want_bbox:
            x1, y1, x2, y2 = (float(v) for v in box)
            left = round(x1 + offset_x, 2)
            top = round(y1 + offset_y, 2)
            right = round(x2 + offset_x, 2)
            bottom = round(y2 + offset_y, 2)
            # Closed ring: the first corner is repeated at the end.
            bbox_ring = [
                [left, top],
                [right, top],
                [right, bottom],
                [left, bottom],
                [left, top]
            ]
            features.append(_detection_feature(bbox_ring, class_name, confidence, "bbox"))

        if want_polygon and masks is not None and i < len(masks):
            # Widen to float64 before adding the patch offset: adding a large
            # slide coordinate to a float32 vertex loses sub-pixel precision.
            vertices = np.asarray(masks[i], dtype=np.float64)
            if vertices.shape[0] >= 3:
                mask_ring = [
                    [round(float(x) + offset_x, 2), round(float(y) + offset_y, 2)]
                    for x, y in vertices
                ]
                mask_ring.append(mask_ring[0])  # Ensure polygon is closed
                features.append(_detection_feature(mask_ring, class_name, confidence, "polygon"))

    if not features:
        return

    # 'patch_id' is assigned by the inference loop; fall back to the name
    # produced by extract_patches() so a direct call does not raise KeyError.
    patch_id = patch_info.get('patch_id')
    if patch_id is None:
        patch_id = patch_info['patch_name']

    feature_collection = FeatureCollection(features)
    geojson_path = slide_output_dir / f"{patch_id}_detections.geojson"
    with open(geojson_path, "w", encoding="utf-8") as f:
        geojson.dump(feature_collection, f, indent=2)

    print(f"Saved QuPath-compatible GeoJSON: {geojson_path}")


In [6]:
# new debug visualization

def create_debug_visualization(patch_img, patch_info, results, slide_output_dir, model):
    """Save a copy of the patch with its eos/eosg detections drawn on top.

    Boxes with a "<class> <score>" label are drawn in (0, 255, 0) and mask
    outlines in (255, 0, 0), each subject to ``PREDICTION_MODE``. The input
    image is treated as RGB and converted to BGR for ``cv2.imwrite``. Nothing
    is written when the result holds no boxes.

    Args:
        patch_img: Patch image as an RGB array; it is copied, not modified.
        patch_info: Dict describing the patch. ``patch_id`` names the output
            file, with ``patch_name`` and then the patch origin as fallbacks.
        results: Sequence whose first element is the prediction result for
            the patch.
        slide_output_dir: ``pathlib.Path`` of the directory to write into.
        model: Model whose ``names`` maps class ids to class names.
    """
    result = results[0]
    if result.boxes is None or len(result.boxes) == 0:
        return

    canvas = patch_img.copy()
    boxes = result.boxes.xyxy.cpu().numpy()
    confidences = result.boxes.conf.cpu().numpy()
    class_ids = result.boxes.cls.cpu().numpy().astype(int)
    mask_data = getattr(result, "masks", None)
    masks = mask_data.xy if mask_data is not None else None

    draw_boxes = PREDICTION_MODE in ('bbox', 'both')
    draw_polygons = PREDICTION_MODE in ('polygon', 'both')

    for i, (box, score, class_id) in enumerate(zip(boxes, confidences, class_ids)):
        class_name = model.names[int(class_id)].lower()
        if class_name not in ('eos', 'eosg'):
            continue

        if draw_boxes:
            x1, y1, x2, y2 = (int(v) for v in box)
            cv2.rectangle(canvas, (x1, y1), (x2, y2), (0, 255, 0), 2)
            cv2.putText(canvas, f"{class_name} {score:.2f}",
                        (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)

        if draw_polygons and masks is not None and i < len(masks):
            polygon = np.array(masks[i], dtype=np.int32)
            # Skip degenerate outlines, matching the GeoJSON export, which
            # also requires at least three vertices.
            if polygon.shape[0] >= 3:
                cv2.polylines(canvas, [polygon], isClosed=True, color=(255, 0, 0), thickness=2)

    # The previous fallback read the keys 'x' and 'y', which patch_info never
    # contains, so every patch without a 'patch_id' was written to the same
    # "0_0" file. Use the keys produced by extract_patches() instead.
    debug_id = patch_info.get('patch_id')
    if debug_id is None:
        debug_id = patch_info.get(
            'patch_name',
            f"{patch_info.get('patch_left', 0)}_{patch_info.get('patch_top', 0)}"
        )

    yolo_debug_path = slide_output_dir / f"{debug_id}_yolo_debug.jpg"
    cv2.imwrite(str(yolo_debug_path), cv2.cvtColor(canvas, cv2.COLOR_RGB2BGR))
    print(f"Saved debug image for patch {debug_id} to {yolo_debug_path}")


In [None]:
from ultralytics import YOLO
import os
from pathlib import Path
import cv2
from tqdm import tqdm

def run_yolo_inference_on_patches(model, batch_images, batch_ids, device='cuda', imgsz=448):
    """Run the model on a batch of patches by way of temporary JPEG files.

    Every image is converted from RGB to BGR, written to
    ``temp_inference_patches/<id>.jpg`` and the resulting file paths are
    passed to ``model.predict``.

    Args:
        model: Object exposing ``predict(source=..., imgsz=..., ...)``.
        batch_images: RGB image arrays, one per patch.
        batch_ids: Identifiers used as file stems, aligned with
            ``batch_images`` by position.
        device: Device argument forwarded to ``predict``.
        imgsz: Inference image size forwarded to ``predict``.

    Returns:
        A list holding everything ``predict`` yielded for the written files.
    """
    scratch_dir = Path("temp_inference_patches")
    scratch_dir.mkdir(exist_ok=True)

    written_paths = []
    for position, rgb_image in enumerate(batch_images):
        bgr_image = cv2.cvtColor(rgb_image, cv2.COLOR_RGB2BGR)
        target = str(scratch_dir / f"{batch_ids[position]}.jpg")
        cv2.imwrite(target, bgr_image)
        written_paths.append(target)

    predict_options = {
        "source": written_paths,
        "imgsz": imgsz,
        "show": False,
        "stream": True,
        "device": device,
        "verbose": False,
    }
    # stream=True makes predict() lazy; drain it into a list before returning.
    predictions = list(model.predict(**predict_options))

    # The temporary JPEGs are intentionally left in place here; they are
    # removed by the final cleanup cell of the notebook.
    return predictions


# Updated inference pipeline
# Runs the model over every .svs slide in IMAGE_DIR and writes per-patch
# GeoJSON detections and debug overlays into OUTPUT_DIR/<slide name>/.

# Prefer the GPU, but fall back to the CPU so the pipeline also runs on
# machines without CUDA.
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

model = YOLO(YOLO_MODEL_PATH, task='segment').to(DEVICE)
print("Model loaded successfully!")

# Create output directory
os.makedirs(OUTPUT_DIR, exist_ok=True)

batch_size = 16  # Number of patches handed to the model per inference call

# Process all SVS files (sorted so the processing order is reproducible)
svs_files = sorted(Path(IMAGE_DIR).glob("*.svs"))
if not svs_files:
    print(f"No .svs files found in: {IMAGE_DIR}")

for svs_path in tqdm(svs_files, desc="Processing slides"):
    slide_name = svs_path.stem
    slide = openslide.OpenSlide(str(svs_path))
    try:
        print(f"Processing slide: {slide_name}")

        slide_output_dir = Path(OUTPUT_DIR) / slide_name
        slide_output_dir.mkdir(parents=True, exist_ok=True)

        patches_info = extract_patches(slide, slide_name)

        # The progress bar advances once per batch of up to `batch_size`
        # patches, not once per patch.
        for batch_start in tqdm(range(0, len(patches_info), batch_size), desc=f"Processing batches for {slide_name}"):
            current_batch_info = patches_info[batch_start:batch_start + batch_size]

            batch_images = []
            batch_ids = []
            batch_info = []

            for idx, patch_info in enumerate(current_batch_info):
                # The id is the patch's position in the full patch list, so it
                # stays stable even when neighbouring patches are skipped.
                patch_info['patch_id'] = f"{slide_name}_patch_{batch_start + idx}"
                patch_img = get_patch(slide, patch_info)

                # A mean above 240 (on a 0-255 scale) means the patch is
                # almost entirely white, so it is skipped.
                if patch_img.mean() > 240:
                    continue

                batch_images.append(patch_img)
                batch_ids.append(patch_info['patch_id'])
                batch_info.append(patch_info)

            if not batch_images:
                continue

            results = run_yolo_inference_on_patches(model, batch_images, batch_ids, device=DEVICE, imgsz=448)

            # Results come back in the order the images were submitted.
            for result, patch_info, patch_img in zip(results, batch_info, batch_images):
                if result.boxes is not None and len(result.boxes) > 0:
                    print(f"Detection(s) in patch {patch_info['patch_id']}")
                    # export to GeoJSON
                    save_patch_results(patch_img, [result], patch_info, slide_output_dir, model)
                    create_debug_visualization(patch_img, patch_info, [result], slide_output_dir, model)
    finally:
        # Release the slide handle even when a patch fails part-way through.
        slide.close()

    print(f"Finished processing slide: {slide_name}")


In [None]:
# The cell below merges the individual per-patch GeoJSON detection files into a single merged GeoJSON for the entire SVS file (WSI)

In [38]:
from shapely.geometry import shape, mapping
from geojson import FeatureCollection, Feature
import geojson
import numpy as np
from tqdm import tqdm  # Progress bar

# Utility function that ensures objects (especially NumPy types) are compatible with JSON serialization 
# It is used for preparing complex Python data structures for output to files like .geojson
def to_serializable(obj):
    """
    Recursively convert an object to a JSON-serializable format.
    Primarily used to convert NumPy types and arrays to standard Python types
    so they can be saved using `json.dump()` or `geojson.dump()`.

    Lists, tuples and dict values are converted recursively; tuples come back
    as lists, which is how JSON represents them anyway. Dict keys that are
    NumPy scalars are converted as well, since the JSON encoder rejects them.

    Args:
        obj: The object to be converted (can be a NumPy type, list, tuple,
            dict, etc.)

    Returns:
        A version of `obj` that is safe for JSON serialization. Objects of
        any other type are returned unchanged.
    """
    # NumPy booleans are not np.integer subclasses, so they need their own case
    if isinstance(obj, np.bool_):
        return bool(obj)
    # Convert NumPy integer types to native Python int
    if isinstance(obj, np.integer):
        return int(obj)
    # Convert NumPy float types to native Python float
    if isinstance(obj, np.floating):
        return float(obj)
    # Convert NumPy arrays to native Python lists
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    # Recursively convert each item of a list or tuple
    if isinstance(obj, (list, tuple)):
        return [to_serializable(item) for item in obj]
    # Recursively convert each value (and NumPy-scalar key) of a dictionary
    if isinstance(obj, dict):
        return {
            (to_serializable(key) if isinstance(key, np.generic) else key): to_serializable(value)
            for key, value in obj.items()
        }
    # If already a basic type (e.g., str, int, float), return as-is
    return obj

def _apply_class_colors(classification, class_name, source_type):
    """Set the display colours for one feature on its classification dict, in place."""
    class_colors = {
        "eos": [1.0, 0.0, 0.0],
        "eosg": [0.0, 1.0, 0.0],
    }
    rgb = class_colors.get(class_name)
    if rgb is None:
        # Unknown class: keep an existing colour, otherwise use grey.
        classification.setdefault("color", [0.666, 0.666, 0.666])
        return
    # Known classes are only coloured for the two known feature sources.
    if source_type in ("bbox", "polygon"):
        classification["color"] = list(rgb)
    if source_type == "polygon":
        classification["fillColor"] = rgb + [0.3]


def merge_geojsons(slide_output_dir, slide_name):
    """Merge the per-patch ``*_detections.geojson`` files of one slide.

    Every feature is validated with Shapely (invalid geometries get one
    ``buffer(0)`` repair attempt), tagged with ``objectType``, class colours
    and the name of its source patch, and collected into a single
    FeatureCollection written to
    ``<slide_output_dir>/<slide_name>_detections_merged.geojson``. The
    per-patch files are left untouched. Unreadable files and unusable
    features are reported and skipped.

    Args:
        slide_output_dir: ``pathlib.Path`` of the directory holding the
            per-patch GeoJSON files; also receives the merged file.
        slide_name: Slide name used in messages and in the merged file name.
    """
    merged_features = []
    geojson_files = sorted(slide_output_dir.glob("*_detections.geojson"))

    print(f"Merging {len(geojson_files)} patch files for slide: {slide_name}")

    for geojson_file in tqdm(geojson_files, desc="🔄 Merging GeoJSONs"):
        try:
            with open(geojson_file, "r", encoding="utf-8") as f:
                data = geojson.load(f)

            if not isinstance(data, FeatureCollection):
                print(f"Skipping invalid GeoJSON file: {geojson_file.name}")
                continue

            for feature in data.features:
                try:
                    geometry = shape(feature.geometry)
                    if not geometry.is_valid:
                        geometry = geometry.buffer(0)  # Try to fix
                    if not geometry.is_valid:
                        print(f"Still invalid geometry in: {geojson_file.name}")
                        continue
                    if geometry.is_empty:
                        # buffer(0) can collapse a degenerate shape into an
                        # empty (but valid) geometry; do not export it.
                        print(f"Skipping empty geometry in: {geojson_file.name}")
                        continue
                    feature.geometry = mapping(geometry)  # Update to fixed

                except Exception as gex:
                    print(f"Geometry error in {geojson_file.name}: {gex}")
                    continue

                feature.properties.setdefault("objectType", "annotation")

                # Tolerate a null classification or name: otherwise one such
                # feature raises and the rest of this file is dropped.
                classification = feature.properties.get("classification") or {}
                class_name = str(classification.get("name") or "unknown").lower()
                source_type = feature.properties.get("source", "unknown")

                _apply_class_colors(classification, class_name, source_type)

                feature.properties["classification"] = classification
                feature.properties["source_patch"] = geojson_file.name.replace("_detections.geojson", "")

                merged_features.append(feature)

        except Exception as e:
            # Best effort: report the problem and continue with the next file.
            print(f"Error reading {geojson_file.name}: {e}")

    if not merged_features:
        print(f"No features found to merge for slide: {slide_name}")
        return

    merged_geojson = FeatureCollection(merged_features)
    merged_path = slide_output_dir / f"{slide_name}_detections_merged.geojson"

    with open(merged_path, "w", encoding="utf-8") as out_f:
        geojson.dump(to_serializable(merged_geojson), out_f, indent=2)

    print(f"\nMerged GeoJSON saved: {merged_path}")
    print("Individual patch GeoJSON files preserved.")


In [None]:
# slide.close()
# print(f"Finished processing slide: {slide_name}")

# Merge the per-patch GeoJSON files of the LAST processed SVS file only:
# slide_output_dir and slide_name still hold the values assigned in the final
# iteration of the inference loop.
# To merge the individual GeoJSON files of every slide subfolder inside the
# inference_output10 folder, use the cells further below instead.
merge_geojsons(slide_output_dir, slide_name)

In [None]:
# Merging GeoJSON in individual folder if needed

In [None]:
# from pathlib import Path

# # Replace with the actual folder path
# slide_output_dir = Path("inference_output10") / "1007439"
# slide_name = "1007439"

# merge_geojsons(slide_output_dir, slide_name)


In [None]:
# Merge All Slides in inference_output10 if needed

In [None]:
# # Batch Merge with Skipping Existing Merged Files

# from pathlib import Path

# # Path to a top-level inference output directory
# root_dir = Path("inference_output10")

# # Go through each subfolder (each slide)
# for slide_dir in sorted(root_dir.iterdir()):
#     if slide_dir.is_dir():
#         slide_name = slide_dir.name
#         merged_file = slide_dir / f"{slide_name}_detections_merged.geojson"

#         if merged_file.exists():
#             print(f"Skipping '{slide_name}': merged file already exists.")
#             continue

#         print(f"\n Processing slide: {slide_name}")
#         merge_geojsons(slide_dir, slide_name)


In [None]:
# Final cleanup: remove temporary images
# Comment out the lines below if you want to keep the temporary images for further processing

In [None]:
# Final cleanup: remove the temporary patch images written during inference
temp_dir = Path("temp_inference_patches")
removed_files = 0  # Counter for removed files
failed_files = 0   # Counter for files that could not be removed

for f in temp_dir.glob("*.jpg"):
    try:
        f.unlink()  # Attempt to delete the file
    except OSError as e:
        # Keep going: one locked or vanished file should not stop the cleanup.
        failed_files += 1
        print(f"Could not delete {f.name}: {e}")
    else:
        removed_files += 1

# Report success only when every file was actually removed
if failed_files:
    print(f"Cleanup incomplete. {removed_files} file(s) deleted, {failed_files} file(s) could not be deleted.")
else:
    print(f"All temporary files removed. {removed_files} file(s) deleted.")
