# Setting up the environment

## Mounting drive and Installing SAM 2 on Colab

In [None]:
from google.colab import drive

# Mount Google Drive at /content/drive; the dataset, checkpoint and result paths
# used later in this notebook all live under /content/drive/MyDrive.
drive.mount('/content/drive')

In [None]:
# Clone the SAM 2 repository, switch into it and install it in editable mode.
# The %cd persists for later cells, so relative paths below resolve against /content/sam2.
!git clone https://github.com/facebookresearch/sam2.git
%cd /content/sam2
!pip install -q -e .

## Downloading checkpoints (use the first cell for the baseline models and the second for the fine-tuned models)

In [None]:
!wget -O sam2.1_hiera_large.pt "https://dl.fbaipublicfiles.com/segment_anything_2/092824/sam2.1_hiera_large.pt"
!wget -O sam2.1_hiera_base_plus.pt "https://dl.fbaipublicfiles.com/segment_anything_2/092824/sam2.1_hiera_base_plus.pt"

In [None]:
# Run the repository's own download script from inside ./checkpoints
# (relative to the current working directory set by the earlier %cd).
!cd ./checkpoints && ./download_ckpts.sh

## Device and importing model

In [5]:
import torch

# Prefer the GPU when PyTorch can see one, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

## SAM2 model initiation

### For Hiera base plus

In [6]:
from sam2.build_sam import build_sam2
from sam2.sam2_image_predictor import SAM2ImagePredictor

## For hiera base plus
# NOTE(review): `checkpoint` and `model_cfg` are also used as default argument values
# by functions defined in later cells, so they must exist before those cells run.
checkpoint = "/content/sam2/checkpoints/sam2.1_hiera_base_plus.pt"
model_cfg = "configs/sam2.1/sam2.1_hiera_b+.yaml"

# Build the model on the selected device and wrap it in an image predictor
sam2_model = build_sam2(model_cfg, checkpoint, device=device)
predictor = SAM2ImagePredictor(sam2_model)

### For Hiera large

In [None]:
from sam2.build_sam import build_sam2
from sam2.sam2_image_predictor import SAM2ImagePredictor

## For hiera large
# Running this cell overwrites `checkpoint`, `model_cfg`, `sam2_model` and `predictor`
# from the base-plus cell above.
checkpoint = "/content/sam2/checkpoints/sam2.1_hiera_large.pt"
model_cfg = "configs/sam2.1/sam2.1_hiera_l.yaml"

# Build the model on the selected device and wrap it in an image predictor
sam2_model = build_sam2(model_cfg, checkpoint, device=device)
predictor = SAM2ImagePredictor(sam2_model)

# Utility functions

## Utility functions for loading data and processing images and masks

In [7]:
import os
import cv2
import time
import torch
import shutil
import numpy as np
import pandas as pd
from PIL import Image
from threading import Lock
from google.colab import files
from concurrent.futures import ThreadPoolExecutor

# Utility Functions
def load_csv(csv_path, base_dir="/content/drive/MyDrive/CV/SAM2/Datasets"):
    """
    Load the frame and mask paths listed in a CSV file and turn them into full paths.

    Args:
        csv_path (str): CSV file with a 'frame' and a 'mask' column holding paths
            relative to the dataset root.
        base_dir (str): Dataset root that is prepended to every path. Defaults to
            the Drive location used throughout this notebook.

    Returns:
        tuple: (mask_paths, frame_paths) - note the order: masks first, frames second.

    Raises:
        KeyError: If the CSV lacks the 'frame' or 'mask' column.
    """
    df = pd.read_csv(csv_path)

    # Fail with a message that names the file instead of a bare column KeyError
    missing = [column for column in ("frame", "mask") if column not in df.columns]
    if missing:
        raise KeyError(f"CSV {csv_path} is missing required column(s): {missing}")

    # Add base_dir to the paths in the CSV
    frame_paths = [os.path.join(base_dir, path) for path in df['frame']]
    mask_paths = [os.path.join(base_dir, path) for path in df['mask']]

    return mask_paths, frame_paths

def load_image(image_path):
    """Read the file at image_path as an RGB array; return None if the path does not exist."""
    if not os.path.exists(image_path):
        return None
    rgb_image = Image.open(image_path).convert("RGB")
    return np.array(rgb_image)

def load_mask(mask_path):
    """Read the file at mask_path as a single-channel ("L") array; return None if the path does not exist."""
    if not os.path.exists(mask_path):
        return None
    gray_mask = Image.open(mask_path).convert("L")
    return np.array(gray_mask)

def select_point_prompt(mask, num_points=1, rng=None):
    """
    Sample positive point prompts from the pixels of `mask` whose value is exactly 255.

    Args:
        mask (np.ndarray): 2-D mask array.
        num_points (int): Number of distinct pixels to sample.
        rng: Optional NumPy random generator (e.g. ``np.random.default_rng(seed)``)
            used for sampling. When None, the global ``np.random`` state is used,
            which makes the sampled prompts differ between runs.

    Returns:
        tuple: (points, labels) where points is a list of (x, y) tuples and labels is
        a list of 1s of the same length, or (None, None) when the mask has fewer than
        `num_points` pixels equal to 255.
    """
    coordinates = np.argwhere(mask == 255)
    if len(coordinates) < num_points:
        return None, None

    sampler = np.random if rng is None else rng
    selected_indices = sampler.choice(len(coordinates), num_points, replace=False)

    # np.argwhere yields (row, col) = (y, x); the predictor is given (x, y)
    points = [tuple(coordinates[i][::-1]) for i in selected_indices]
    labels = [1] * num_points
    return points, labels

# Create a global lock for shared resources
predictor_lock = Lock()
output_lock = Lock()

def process_single_image(predictor, image_path, mask_path, num_points, output_dir):
    """Process a single image-mask pair to generate and save the predicted mask.

    The output file mirrors the part of `mask_path` that follows the last "Masks/"
    component, placed under `output_dir`. Failures are reported with a printed
    message and the function returns without writing anything.

    Args:
    - predictor: The SAM2 predictor instance (shared between worker threads).
    - image_path: Path to the frame image.
    - mask_path: Path to the mask image; must contain "Masks/".
    - num_points: Number of points to sample for prediction.
    - output_dir: Directory to save predicted masks.
    """
    print(f"Processing frame: {image_path}, mask: {mask_path}")

    # Without the marker, split() returns the whole (absolute) mask path and
    # os.path.join() would then discard output_dir, so the prediction would be
    # written over the ground-truth mask. Refuse to continue in that case.
    marker = "Masks/"
    if marker not in mask_path:
        print(f"Error: mask path does not contain '{marker}', cannot derive output path: {mask_path}")
        return
    masks_relative_path = mask_path.split(marker)[-1]
    output_path = os.path.join(output_dir, masks_relative_path)

    image = load_image(image_path)
    mask = load_mask(mask_path)
    if image is None or mask is None:
        print(f"Error: Could not load image or mask for {image_path} or {mask_path}")
        return

    points, labels = select_point_prompt(mask, num_points=num_points)
    if points is None:
        print(f"No valid points found in mask: {mask_path}")
        return

    # set_image() followed by predict() must not interleave between threads,
    # because both calls operate on the one shared predictor object.
    with predictor_lock:
        predictor.set_image(image)
        with torch.no_grad():
            masks, _scores, _logits = predictor.predict(
                point_coords=np.array(points),
                point_labels=np.array(labels),
                multimask_output=False  # Ensures a single mask is returned
            )

    predicted_mask = (masks[0] > 0.5).astype(np.uint8) * 255  # Convert to binary mask

    # Create the target folder and save the mask under one lock acquisition
    with output_lock:
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        if not cv2.imwrite(output_path, predicted_mask):
            print(f"Error: failed to write predicted mask to {output_path}")

def process_image_wrapper(args):
    """
    Adapter that lets an executor pass one packed argument to process_single_image.

    `args` must be a 5-tuple, in this order:
    - predictor: The SAM2 predictor instance.
    - image_path: Path to the frame image.
    - mask_path: Path to the mask image.
    - num_points: Number of points to sample for prediction.
    - output_dir: Directory to save predicted masks.
    """
    sam_predictor, frame_file, mask_file, point_count, target_dir = args
    process_single_image(sam_predictor, frame_file, mask_file, point_count, target_dir)


def process_dataset_parallel(predictor, dataset_name, dataset_type="val", num_points=1, max_workers=4):
    """
    Process all images and masks in a dataset to generate predicted masks using a thread pool.

    The frame/mask list is read from
    ``Paths/{dataset_name}_{dataset_type}.csv`` and predictions are written below
    ``{dataset_name}/{dataset_type}/PredictedMasks`` in the Drive dataset folder.

    Args:
        predictor: Predictor instance shared by all worker threads.
        dataset_name (str): Name of the dataset to process.
        dataset_type (str): Subset to process, e.g. "val" or "test".
        num_points (int): Number of prompt points sampled per mask.
        max_workers (int): Size of the thread pool.

    Returns:
        None. A failure in one image is printed and does not stop the others.
    """
    # Define paths
    csv_path = f"/content/drive/MyDrive/CV/SAM2/Datasets/Paths/{dataset_name}_{dataset_type}.csv"
    output_dir = f"/content/drive/MyDrive/CV/SAM2/Datasets/{dataset_name}/{dataset_type}/PredictedMasks"

    # Load CSV; report counts only, printing every path floods the cell output
    mask_paths, frame_paths = load_csv(csv_path)
    print(f"Loaded {len(frame_paths)} frame paths and {len(mask_paths)} mask paths from {csv_path}")

    # Prepare arguments for parallel processing
    args_list = [
        (predictor, frame_path, mask_path, num_points, output_dir)
        for frame_path, mask_path in zip(frame_paths, mask_paths)
    ]
    print(f"Output dir: {output_dir}")

    print(f"Starting parallel processing for dataset: {dataset_name} with {max_workers} workers")
    failures = 0
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        submitted = [(executor.submit(process_image_wrapper, args), args) for args in args_list]
        # An exception raised in a worker only surfaces when its result is requested;
        # ask for every result so that no failure goes unnoticed.
        for future, args in submitted:
            try:
                future.result()
            except Exception as exc:
                failures += 1
                print(f"Error while processing frame {args[1]}: {exc}")
    if failures:
        print(f"{failures} of {len(args_list)} images failed for dataset: {dataset_name}")
    print(f"Completed processing dataset: {dataset_name}")

# Main Function
def main_with_image_predictor_parallel(dataset_name=None, dataset_type="val", num_points=1, device=device, model_cfg=model_cfg, checkpoint=checkpoint, max_workers=4):
    """
    Build a SAM2 image predictor from a checkpoint and run it over one or all datasets.

    The defaults of `device`, `model_cfg` and `checkpoint` are the values those
    notebook variables had when this cell was executed, not when the function is called.

    Args:
        dataset_name (str): Name of the dataset to process; when empty/None the
            built-in list of five datasets is processed.
        dataset_type (str): Subset to process ("train", "val", "test"). Default is "val".
        num_points (int): Number of points to sample from the mask.
        device: Device to use for model inference.
        model_cfg (str): Model configuration passed to build_sam2.
        checkpoint (str): Path to the SAM2 model checkpoint.
        max_workers (int): Number of workers for parallel processing.

    Raises:
        ValueError: If `checkpoint` is None.
    """
    if checkpoint is None:
        raise ValueError("Checkpoint path must be provided.")

    # Build the model once and share the resulting predictor across datasets
    predictor = SAM2ImagePredictor(build_sam2(model_cfg, checkpoint, device=device))

    if dataset_name:
        targets = [dataset_name]
    else:
        targets = ["Endoscapes", "UD Ureter-Uterine Artery-Nerve Dataset", "CholecSeg8k", "m2caiSeg", "Dresden"]

    for target in targets:
        process_dataset_parallel(predictor, target, dataset_type, num_points, max_workers=max_workers)

## Utility functions for evaluation

In [8]:
# Utility Functions for Metrics Calculation
def calculate_metrics(predicted_mask, ground_truth_mask):
    """
    Calculate IoU, Dice, Precision, and Recall metrics.

    Both masks are binarised with a ``> 127`` threshold before comparison.

    Parameters:
    - predicted_mask: Array holding the predicted segmentation (2-D, or with a trailing axis of size 1).
    - ground_truth_mask: Array holding the ground truth segmentation (same layout).

    Returns:
    - (iou, dice, precision, recall) as Python floats. A metric whose denominator
      is zero is reported as 0.0.

    Raises:
    - ValueError: If either mask is None.
    """
    # Ensure the masks are valid
    if predicted_mask is None or ground_truth_mask is None:
        raise ValueError("One of the masks is None.")

    # Handle extra dimensions
    if len(predicted_mask.shape) > 2 and predicted_mask.shape[-1] == 1:
        predicted_mask = predicted_mask.squeeze(-1)
    if len(ground_truth_mask.shape) > 2 and ground_truth_mask.shape[-1] == 1:
        ground_truth_mask = ground_truth_mask.squeeze(-1)

    # Resize if dimensions mismatch. Nearest-neighbour keeps the mask values
    # unchanged; the default bilinear mode would blend values along the edges.
    if predicted_mask.shape != ground_truth_mask.shape:
        predicted_mask = cv2.resize(
            predicted_mask,
            (ground_truth_mask.shape[1], ground_truth_mask.shape[0]),
            interpolation=cv2.INTER_NEAREST,
        )

    # Convert masks to binary
    predicted_mask_bin = (predicted_mask > 127).astype(np.uint8)
    ground_truth_bin = (ground_truth_mask > 127).astype(np.uint8)

    # Pixel counts needed by all four metrics
    intersection = np.logical_and(predicted_mask_bin, ground_truth_bin).sum()
    union = np.logical_or(predicted_mask_bin, ground_truth_bin).sum()
    predicted_sum = predicted_mask_bin.sum()
    ground_truth_sum = ground_truth_bin.sum()
    total = predicted_sum + ground_truth_sum

    # Always return floats so downstream tables have a uniform dtype
    iou = float(intersection / union) if union > 0 else 0.0
    dice = float((2 * intersection) / total) if total > 0 else 0.0
    precision = float(intersection / predicted_sum) if predicted_sum > 0 else 0.0
    recall = float(intersection / ground_truth_sum) if ground_truth_sum > 0 else 0.0

    return iou, dice, precision, recall


def evaluate_dataset(predicted_masks_path, ground_truth_masks_path):
    """
    Score every predicted mask under one folder against the ground-truth mask
    found at the same relative location in another folder.

    Parameters:
    - predicted_masks_path: Path to the folder containing predicted masks
    - ground_truth_masks_path: Path to the folder containing ground truth masks

    Returns:
    - A list of dictionaries (keys "File", "IoU", "Dice", "Precision", "Recall"),
      one per predicted mask that had a readable ground-truth counterpart
    """
    mask_suffixes = ('.png', '.jpg', '.jpeg')
    metrics_rows = []

    for current_dir, _, filenames in os.walk(predicted_masks_path):
        # Location of this folder relative to the prediction root; the ground
        # truth is expected at the same relative location.
        sub_dir = os.path.relpath(current_dir, predicted_masks_path)

        for filename in filenames:
            if not filename.lower().endswith(mask_suffixes):
                continue

            relative_name = os.path.join(sub_dir, filename)
            gt_file = os.path.join(ground_truth_masks_path, sub_dir, filename)
            pred_file = os.path.join(current_dir, filename)

            if not os.path.exists(gt_file):
                print(f"Missing ground truth for: {relative_name}")
                continue

            pred_array = cv2.imread(pred_file, cv2.IMREAD_GRAYSCALE)
            gt_array = cv2.imread(gt_file, cv2.IMREAD_GRAYSCALE)

            if pred_array is None or gt_array is None:
                print(f"Error loading masks for: {relative_name}")
                continue

            iou, dice, precision, recall = calculate_metrics(pred_array, gt_array)
            metrics_rows.append({
                "File": relative_name,  # path relative to the prediction root
                "IoU": iou,
                "Dice": dice,
                "Precision": precision,
                "Recall": recall
            })

    return metrics_rows


def evaluate_all_datasets(base_dir, subset="val", datasets=None):
    """
    Evaluate specified or all datasets for IoU, Dice, Precision, and Recall,
    preserving the exact folder structure matching between predicted and ground truth masks.

    For every dataset with results, three CSV files are written to
    ``{base_dir}/Eval_Results``: per-file metrics, a per-class summary and an
    overall summary.

    Parameters:
    - base_dir: Base directory containing datasets
    - subset: Subset to evaluate (e.g., 'val')
    - datasets: List of dataset names or None to evaluate all datasets in base_dir

    Returns:
    - A DataFrame with one row per evaluated mask across all datasets and classes
      (empty when nothing could be evaluated)
    """
    if datasets is None:
        datasets = [d for d in os.listdir(base_dir) if os.path.isdir(os.path.join(base_dir, d))]

    metric_names = ["IoU", "Dice", "Precision", "Recall"]
    all_results = []
    for dataset in datasets:
        print(f"\nEvaluating dataset: {dataset}")

        predicted_masks_base = os.path.join(base_dir, dataset, subset, "PredictedMasks")
        ground_truth_masks_base = os.path.join(base_dir, dataset, subset, "Masks")

        if not os.path.exists(predicted_masks_base):
            print(f"Predicted masks folder not found for {dataset}: {predicted_masks_base}")
            continue
        if not os.path.exists(ground_truth_masks_base):
            print(f"Ground truth masks folder not found for {dataset}: {ground_truth_masks_base}")
            continue

        # Get classes (top-level folders in ground truth masks)
        classes = [
            d for d in os.listdir(ground_truth_masks_base)
            if os.path.isdir(os.path.join(ground_truth_masks_base, d))
        ]

        dataset_results = []
        for class_name in classes:
            print(f"  Evaluating class: {class_name}")

            predicted_masks_path = os.path.join(predicted_masks_base, class_name)
            ground_truth_masks_path = os.path.join(ground_truth_masks_base, class_name)

            if not os.path.exists(predicted_masks_path):
                print(f"    Predicted masks for class '{class_name}' not found.")
                continue
            if not os.path.exists(ground_truth_masks_path):
                print(f"    Ground truth masks for class '{class_name}' not found.")
                continue

            results = evaluate_dataset(predicted_masks_path, ground_truth_masks_path)
            for result in results:
                result["Dataset"] = dataset
                result["Class"] = class_name
            dataset_results.extend(results)

        if not dataset_results:
            print(f"No results found for dataset: {dataset}")
            continue

        # Collect the rows for the return value; previously this list was never
        # filled, so the function always returned an empty DataFrame.
        all_results.extend(dataset_results)

        dataset_results_df = pd.DataFrame(dataset_results)

        dataset_summary_per_class = dataset_results_df.groupby("Class").agg(
            mean_IoU=("IoU", "mean"),
            sd_IoU=("IoU", "std"),
            mean_Dice=("Dice", "mean"),
            sd_Dice=("Dice", "std"),
            mean_Precision=("Precision", "mean"),
            sd_Precision=("Precision", "std"),
            mean_Recall=("Recall", "mean"),
            sd_Recall=("Recall", "std"),
        ).reset_index()

        dataset_overall_summary = pd.DataFrame({
            "Metric": metric_names,
            "Mean": [dataset_results_df[name].mean() for name in metric_names],
            "SD": [dataset_results_df[name].std() for name in metric_names],
        })

        # Save results
        eval_results_dir = os.path.join(base_dir, "Eval_Results")
        os.makedirs(eval_results_dir, exist_ok=True)

        results_csv_path = os.path.join(eval_results_dir, f"{dataset}_evaluation_metrics_all_datasets_classes.csv")
        summary_csv_path = os.path.join(eval_results_dir, f"{dataset}_evaluation_summary_per_class.csv")
        overall_csv_path = os.path.join(eval_results_dir, f"{dataset}_evaluation_overall_summary.csv")

        dataset_results_df.to_csv(results_csv_path, index=False)
        dataset_summary_per_class.to_csv(summary_csv_path, index=False)
        dataset_overall_summary.to_csv(overall_csv_path, index=False)

        print(f"\nSaved evaluation results for dataset '{dataset}' to {results_csv_path}")
        print(f"Saved per-class summary for dataset '{dataset}' to {summary_csv_path}")
        print(f"Saved overall summary for dataset '{dataset}' to {overall_csv_path}")

    return pd.DataFrame(all_results)

## Utility functions for zipping, downloading, and deleting the PredictedMasks folders and evaluation results of all datasets

In [9]:
def zip_folder(folder_path, zip_path):
    """
    Compress a folder into a zip file and save it to the specified path.

    Args:
        folder_path (str): Path to the folder to compress.
        zip_path (str): Path where the zip file will be saved (including filename).
            If it does not end in ".zip", that suffix is appended.

    Returns:
        str: Path to the created zip file or None if an error occurs
        (including when `folder_path` is not an existing directory).
    """
    suffix = ".zip"
    try:
        if not os.path.isdir(folder_path):
            raise FileNotFoundError(f"not a directory: {folder_path}")

        # make_archive appends ".zip" itself, so hand it the path without the
        # trailing suffix. Only the suffix is removed: str.replace would also
        # strip ".zip" from any directory name inside the path.
        archive_base = zip_path[:-len(suffix)] if zip_path.endswith(suffix) else zip_path
        shutil.make_archive(archive_base, 'zip', folder_path)

        archive_path = archive_base + suffix
        print(f"Compressed folder: {folder_path} into zip: {archive_path}")
        return archive_path
    except Exception as e:
        print(f"Error while zipping folder {folder_path}: {e}")
        return None

def download_file_colab(file_path):
    """
    Hand a file to Colab's `files.download` so the browser saves it locally.

    Args:
        file_path (str): Path to the file in Colab to be downloaded.

    Returns:
        None. A missing file or a failing download is reported with a printed message.
    """
    if not os.path.exists(file_path):
        print(f"File not found: {file_path}")
        return

    try:
        files.download(file_path)
        print(f"File downloaded: {file_path}")
    except Exception as e:
        print(f"Error during download: {e}")


def delete_zip_files(file_paths):
    """
    Remove each listed file, reporting the outcome per path.

    Args:
        file_paths (list): List of file paths to delete.

    Returns:
        None. Missing files and removal errors are printed, not raised.
    """
    for target in file_paths:
        if not os.path.exists(target):
            print(f"File not found: {target}")
            continue
        try:
            os.remove(target)
            print(f"Deleted file: {target}")
        except Exception as e:
            print(f"Error while deleting file {target}: {e}")


def compress_and_download_predicted_masks(base_dir, datasets, subset="val"):
    """
    Compress and download the PredictedMasks folders into zip files.

    Each archive is written to `base_dir` as ``{dataset}_PredictedMasks.zip``
    (spaces replaced by underscores; the UD Ureter dataset uses a short fixed name).

    Args:
        base_dir (str): Base directory containing the datasets.
        datasets (list): List of dataset names to process.
        subset (str): Dataset subset to process (e.g., "val").

    Returns:
        None. Datasets without a PredictedMasks folder, or whose archive could
        not be created, are skipped with a printed message.
    """
    for dataset in datasets:
        # Path to the PredictedMasks folder
        predicted_masks_dir = os.path.join(base_dir, dataset, subset, "PredictedMasks")

        if not os.path.exists(predicted_masks_dir):
            print(f"PredictedMasks folder not found: {predicted_masks_dir} for dataset: {dataset}")
            continue

        # Set the zip filename with special handling for "UD Ureter-Uterine Artery-Nerve Dataset"
        if dataset == "UD Ureter-Uterine Artery-Nerve Dataset":
            zip_filename = "UDUreter_PredictedMasks.zip"
        else:
            zip_filename = f"{dataset.replace(' ', '_')}_PredictedMasks.zip"

        zip_path = os.path.join(base_dir, zip_filename)

        # Compress the folder; zip_folder returns None when it fails
        archive_path = zip_folder(predicted_masks_dir, zip_path)
        if archive_path is None:
            print(f"Skipping download for dataset {dataset}: could not create {zip_path}")
            continue

        # Trigger download (download_file_colab reports its own success or failure)
        download_file_colab(archive_path)
        print(f"Requested download of zip for dataset: {dataset}")


def download_all_eval_results(eval_results_dir):
    """
    Compress and download the evaluation results from the specified directory.

    The archive "Eval_Results.zip" is created next to `eval_results_dir`
    (in its parent directory).

    Args:
        eval_results_dir (str): Path to the directory containing all evaluation results.

    Returns:
        None. A missing directory or a failed compression is reported with a
        printed message and no download is attempted.
    """
    if not os.path.exists(eval_results_dir):
        print(f"Eval_Results directory not found: {eval_results_dir}")
        return

    # Define the output zip file path
    zip_filename = "Eval_Results.zip"
    zip_path = os.path.join(eval_results_dir, "..", zip_filename)

    # Compress the folder; zip_folder returns None when it fails
    archive_path = zip_folder(eval_results_dir, zip_path)
    if archive_path is None:
        print(f"Skipping download: could not create {zip_path}")
        return

    # Trigger download (download_file_colab reports its own success or failure)
    download_file_colab(archive_path)
    print("Requested download of evaluation results zip file.")


# Function for deleting folders

def delete_predicted_masks(base_dir, datasets, subset="val"):
    """
    Remove the ``{dataset}/{subset}/PredictedMasks`` folder of every listed dataset.

    Args:
        base_dir (str): Base directory containing the datasets.
        datasets (list): List of dataset names.
        subset (str): Subset folder where PredictedMasks is located (e.g., "val").

    Returns:
        None
    """
    for name in datasets:
        target_dir = os.path.join(base_dir, name, subset, "PredictedMasks")

        if not os.path.exists(target_dir):
            print(f"PredictedMasks folder not found for dataset: {name}")
            continue

        shutil.rmtree(target_dir)
        print(f"Deleted PredictedMasks folder for dataset: {name}")


def delete_eval_results(eval_results_dir):
    """
    Remove the evaluation results directory and everything inside it.

    Args:
        eval_results_dir (str): Path to the Eval_Results directory.

    Returns:
        None
    """
    if not os.path.exists(eval_results_dir):
        print(f"Eval_Results folder not found: {eval_results_dir}")
        return

    shutil.rmtree(eval_results_dir)
    print(f"Deleted Eval_Results folder: {eval_results_dir}")

# Baseline model Evaluation

### Function for automating the eval pipeline

In [10]:
def run_pipeline(num_points=1, checkpoint=checkpoint, dataset_type="val", device=device, threads=4, model_cfg=None):
    """
    Run the full pipeline for processing datasets, evaluating results,
    and compressing and downloading the results.

    Each step is wrapped so that a failure is logged and the remaining steps still run.

    Args:
        num_points (int): Number of points for prediction.
        checkpoint (str): Checkpoint for the predictor.
        dataset_type (str): Subset to process and evaluate (e.g. "val", "test").
        device: Device to use for prediction.
        threads (int): Number of worker threads used for prediction.
        model_cfg (str): Model configuration matching `checkpoint`. When None, the
            notebook's current global `model_cfg` is used, so that a checkpoint and
            config set together in the same cell are also used together.

    Returns:
        None
    """
    # Dataset names
    datasets = ["UD Ureter-Uterine Artery-Nerve Dataset", "m2caiSeg", "Endoscapes", "Dresden", "CholecSeg8k"]

    base_dir = "/content/drive/MyDrive/CV/SAM2/Datasets"
    eval_results_dir = os.path.join(base_dir, "Eval_Results")

    # Without an explicit config, the model builder would fall back to the config
    # that was current when it was defined, which may not match `checkpoint`.
    if model_cfg is None:
        model_cfg = globals().get("model_cfg")
    predictor_kwargs = {} if model_cfg is None else {"model_cfg": model_cfg}

    # Error log
    error_log = []

    # Step 1: Run main_with_image_predictor for all datasets
    for dataset_name in datasets:
        try:
            print(f"Processing dataset: {dataset_name}")
            main_with_image_predictor_parallel(
                dataset_name=dataset_name,
                dataset_type=dataset_type,
                num_points=num_points,
                device=device,
                checkpoint=checkpoint,
                max_workers=threads,
                **predictor_kwargs
            )
        except Exception as e:
            error_message = f"Error during main_with_image_predictor for dataset {dataset_name}: {str(e)}"
            print(error_message)
            error_log.append(error_message)

    # Step 2: Evaluate results for all datasets
    for dataset_name in datasets:
        try:
            print(f"Evaluating dataset: {dataset_name}")
            evaluate_all_datasets(
                base_dir=base_dir,
                subset=dataset_type,
                datasets=[dataset_name]
            )
            print(f"Evaluation complete for dataset: {dataset_name}")
        except Exception as e:
            error_message = f"Error during evaluation for dataset {dataset_name}: {str(e)}"
            print(error_message)
            error_log.append(error_message)

    # Step 3: Compress and download evaluation results
    try:
        print("Compressing and downloading evaluation results...")
        download_all_eval_results(eval_results_dir)
    except Exception as e:
        error_message = f"Error during compressing and downloading evaluation results: {str(e)}"
        print(error_message)
        error_log.append(error_message)

    # Step 4: Compress predicted masks (optional if required)
    try:
        print("Compressing predicted masks...")
        compress_and_download_predicted_masks(base_dir, datasets, subset=dataset_type)
    except Exception as e:
        error_message = f"Error during compressing predicted masks: {str(e)}"
        print(error_message)
        error_log.append(error_message)

    # Print the error log at the end
    if error_log:
        print("\n=== Error Log ===")
        for error in error_log:
            print(error)
    else:
        print("Pipeline execution complete without errors.")


### Running the eval pipeline with baseline Hiera Large and Hiera Base Plus

For Hiera Large

In [None]:
## For hiera large
# `sam2_checkpoint` is the value handed to run_pipeline in the cell further down.
# NOTE(review): run only one of the "Hiera Large" / "Hiera Base Plus" cells before the
# pipeline call; both assign the same variable names, so the last one executed wins.
sam2_checkpoint = "/content/sam2/checkpoints/sam2.1_hiera_large.pt"
model_cfg = "configs/sam2.1/sam2.1_hiera_l.yaml"

sam2_model = build_sam2(model_cfg, sam2_checkpoint, device=device)
predictor = SAM2ImagePredictor(sam2_model)

For Hiera Base Plus

In [None]:
## For hiera base plus
# `sam2_checkpoint` is the value handed to run_pipeline in the next cell.
# NOTE(review): run only one of the "Hiera Large" / "Hiera Base Plus" cells before the
# pipeline call; both assign the same variable names, so the last one executed wins.
sam2_checkpoint = "/content/sam2/checkpoints/sam2.1_hiera_base_plus.pt"
model_cfg = "configs/sam2.1/sam2.1_hiera_b+.yaml"

sam2_model = build_sam2(model_cfg, sam2_checkpoint, device=device)
predictor = SAM2ImagePredictor(sam2_model)

In [None]:
# Run the pipeline on the "test" split with 10 prompt points per mask and 16 worker threads.
# run_pipeline receives the checkpoint path (not the `predictor` object built above).
run_pipeline(num_points=10, checkpoint=sam2_checkpoint, dataset_type="test", device=device, threads=16)


## Delete all the PredictedMasks folders and zip files

In [None]:
base_dir = "/content/drive/MyDrive/CV/SAM2/Datasets"
datasets = ["UD Ureter-Uterine Artery-Nerve Dataset", "Dresden", "Endoscapes", "m2caiSeg", "CholecSeg8k"]
eval_results_dir = "/content/drive/MyDrive/CV/SAM2/Datasets/Eval_Results"


# List of specific zip files to delete
zip_files_to_delete_masks = [
    "/content/drive/MyDrive/CV/SAM2/Datasets/CholecSeg8k_PredictedMasks.zip",
    "/content/drive/MyDrive/CV/SAM2/Datasets/Dresden_PredictedMasks.zip",
    "/content/drive/MyDrive/CV/SAM2/Datasets/Endoscapes_PredictedMasks.zip",
    "/content/drive/MyDrive/CV/SAM2/Datasets/UDUreter_PredictedMasks.zip",
    "/content/drive/MyDrive/CV/SAM2/Datasets/m2caiSeg_PredictedMasks.zip",
]

zip_files_to_delete_eval = [
    "/content/drive/MyDrive/CV/SAM2/Datasets/Eval_Results.zip"
]

print("Deleting predicted masks zip files...")
delete_zip_files(zip_files_to_delete_masks)

print("Deleting PredictedMasks folders...")
# The baseline pipeline above is run on the "test" split, so clean that split too.
# Predictions left behind would be picked up again by the next evaluation.
for subset in ("val", "test"):
    delete_predicted_masks(base_dir, datasets, subset=subset)

print("Deleting Eval_Results folder...")
delete_eval_results(eval_results_dir)

# Delete the zip file
delete_zip_files(zip_files_to_delete_eval)

# Fine-tuned model evaluation

## Eval pipeline with chosen fine-tuned checkpoints

In [11]:
def main_with_finetuned_image_predictor_parallel(
    predictor,
    dataset_name=None,
    dataset_type="val",
    num_points=1,
    max_workers=16
):
    """
    Run an already-built predictor over one dataset, or over the built-in list of
    five datasets when no name is given.

    Args:
        predictor (SAM2ImagePredictor): Preloaded SAM2ImagePredictor with fine-tuned weights.
        dataset_name (str): Name of the dataset to process; empty/None selects all datasets.
        dataset_type (str): Subset to process ("train", "val", "test"). Default is "val".
        num_points (int): Number of points to sample from the mask.
        max_workers (int): Number of workers for parallel processing.
    """
    if dataset_name:
        targets = [dataset_name]
    else:
        targets = ["Endoscapes", "UD Ureter-Uterine Artery-Nerve Dataset", "CholecSeg8k", "m2caiSeg", "Dresden"]

    for target in targets:
        process_dataset_parallel(predictor, target, dataset_type, num_points, max_workers=max_workers)

def run_pipeline_finetuned_model(num_points=1, dataset_type="val", predictor=None, threads=16):
    """
    Predict, evaluate, and then zip and download results for all five datasets
    using an already-built predictor, timing every step.

    A failing step is logged and the remaining steps still run; the collected
    errors and timings are printed at the end.

    Args:
        num_points (int): Number of points for prediction.
        dataset_type (str): Subset to process and evaluate (e.g. "val", "test").
        predictor (SAM2ImagePredictor): Preloaded predictor with fine-tuned weights.
        threads (int): Number of worker threads used for prediction.

    Returns:
        None
    """
    import time
    import os

    datasets = [
        "UD Ureter-Uterine Artery-Nerve Dataset",
        "m2caiSeg",
        "Endoscapes",
        "Dresden",
        "CholecSeg8k"
    ]

    base_dir = "/content/drive/MyDrive/CV/SAM2/Datasets"
    eval_results_dir = os.path.join(base_dir, "Eval_Results")

    error_log = []   # messages of steps that raised
    timings = {}     # label -> elapsed seconds, in insertion order

    # Step 1: run the predictor over every dataset
    step_began = time.time()
    for name in datasets:
        try:
            print(f"Processing dataset: {name}")
            began = time.time()
            main_with_finetuned_image_predictor_parallel(
                predictor=predictor,
                dataset_name=name,
                dataset_type=dataset_type,
                num_points=num_points,
                max_workers=threads
            )
            elapsed = time.time() - began
            timings[f"Processing {name}"] = elapsed
            print(f"Time taken for processing {name}: {elapsed:.2f} seconds")
        except Exception as exc:
            failure = f"Error during main_with_finetuned_image_predictor for dataset {name}: {str(exc)}"
            print(failure)
            error_log.append(failure)
    step_elapsed = time.time() - step_began
    timings["Step 1: All Datasets Processing"] = step_elapsed
    print(f"Total time for Step 1 (All Dataset Processing): {step_elapsed:.2f} seconds")

    # Step 2: evaluate every dataset
    step_began = time.time()
    for name in datasets:
        try:
            print(f"Evaluating dataset: {name}")
            began = time.time()
            evaluate_all_datasets(
                base_dir=base_dir,
                subset=dataset_type,
                datasets=[name]
            )
            elapsed = time.time() - began
            timings[f"Evaluating {name}"] = elapsed
            print(f"Time taken for evaluating {name}: {elapsed:.2f} seconds")
            print(f"Evaluation complete for dataset: {name}")
        except Exception as exc:
            failure = f"Error during evaluation for dataset {name}: {str(exc)}"
            print(failure)
            error_log.append(failure)
    step_elapsed = time.time() - step_began
    timings["Step 2: Evaluation"] = step_elapsed
    print(f"Total time for Step 2 (Evaluation): {step_elapsed:.2f} seconds")

    # Step 3: zip and download the evaluation CSVs
    step_began = time.time()
    try:
        print("Compressing and downloading evaluation results...")
        download_all_eval_results(eval_results_dir)
    except Exception as exc:
        failure = f"Error during compressing and downloading evaluation results: {str(exc)}"
        print(failure)
        error_log.append(failure)
    step_elapsed = time.time() - step_began
    timings["Step 3: Compress and Download Eval Results into csv"] = step_elapsed
    print(f"Total time for Step 3 (Compress and Download Eval Results into csv): {step_elapsed:.2f} seconds")

    # Step 4: zip and download the predicted masks
    step_began = time.time()
    try:
        print("Compressing predicted masks...")
        compress_and_download_predicted_masks(base_dir, datasets, subset=dataset_type)
    except Exception as exc:
        failure = f"Error during compressing predicted masks: {str(exc)}"
        print(failure)
        error_log.append(failure)
    step_elapsed = time.time() - step_began
    timings["Step 4: Compress Predicted Masks"] = step_elapsed
    print(f"Total time for Step 4 (Compress Predicted Masks): {step_elapsed:.2f} seconds")

    # Report collected errors
    if not error_log:
        print("Pipeline execution complete without errors.")
    else:
        print("\n=== Error Log ===")
        for entry in error_log:
            print(entry)

    # Report collected timings
    print("\n=== Timing Log ===")
    for label, seconds in timings.items():
        print(f"{label}: {seconds:.2f} seconds")

## Loading model checkpoints and running pipeline

In [None]:
## Loading fine-tuned checkpoint weights onto SAM 2
# The fine-tuned weights are loaded with the Hiera base-plus configuration.
# This rebinds `checkpoint`, `model_cfg`, `sam2_model` and `predictor`.
checkpoint = "/content/drive/MyDrive/CV/SAM2/FineTuned_Checkpoints/100/Curated100_checkpoint_20.pt"
model_cfg = "configs/sam2.1/sam2.1_hiera_b+.yaml"

sam2_model = build_sam2(model_cfg, checkpoint, device=device)
predictor = SAM2ImagePredictor(sam2_model)

In [None]:
# Evaluate the fine-tuned predictor built above on the "val" split
# (10 prompt points per mask, 32 worker threads).
run_pipeline_finetuned_model(num_points=10, dataset_type="val", predictor=predictor, threads=32)

## Clearing masks and eval results zip folders

In [None]:
base_dir = "/content/drive/MyDrive/CV/SAM2/Datasets"
datasets = ["UD Ureter-Uterine Artery-Nerve Dataset", "Dresden", "Endoscapes", "m2caiSeg", "CholecSeg8k"]
eval_results_dir = "/content/drive/MyDrive/CV/SAM2/Datasets/Eval_Results"

# Archives that the pipeline leaves in the dataset root: one per dataset ...
zip_files_to_delete_masks = [
    f"{base_dir}/{prefix}_PredictedMasks.zip"
    for prefix in ("CholecSeg8k", "Dresden", "Endoscapes", "UDUreter", "m2caiSeg")
]

# ... and one for the evaluation results
zip_files_to_delete_eval = [f"{base_dir}/Eval_Results.zip"]

print("Deleting predicted masks zip files...")
delete_zip_files(zip_files_to_delete_masks)

print("Deleting PredictedMasks folders...")
delete_predicted_masks(base_dir, datasets, subset="val")

print("Deleting Eval_Results folder...")
delete_eval_results(eval_results_dir)

# Finally remove the evaluation results archive
delete_zip_files(zip_files_to_delete_eval)

# MedSAM inference

https://github.com/bowang-lab/MedSAM

Finetuned model checkpoint: https://drive.google.com/drive/folders/1ETWmi4AiniJeWOt6HAsYgTjYv_fkgzoN

Download `medsam_vit_b` from the link above (folder `work_dir/MedSAM/medsam_vit_b`) and upload it to `/content/medsam_vit_b.pth`.

In [None]:
!git clone https://github.com/facebookresearch/segment-anything.git /content/segment-anything


Cloning into '/content/segment-anything'...
remote: Enumerating objects: 304, done.[K
remote: Counting objects: 100% (5/5), done.[K
remote: Compressing objects: 100% (4/4), done.[K
remote: Total 304 (delta 2), reused 1 (delta 1), pack-reused 299 (from 2)[K
Receiving objects: 100% (304/304), 18.31 MiB | 19.86 MiB/s, done.
Resolving deltas: 100% (159/159), done.


Download

In [None]:
from segment_anything import SamPredictor
from segment_anything import sam_model_registry

# Initialize MedSAM model
# Two candidate checkpoint locations: a file uploaded to the runtime, or a copy on Drive.
# (`os`, `device` and `torch` come from earlier cells.)
path_1 = "/content/medsam_vit_b.pth"
path_2 = "/content/drive/MyDrive/CV/SAM2/FineTuned_Checkpoints/MedSAM2_pretrainedCheckpoint/medsam_vit_b"

# Check if path_1 exists, otherwise use path_2
# NOTE(review): path_2 is used without checking that it exists — confirm it is present.
MedSAM_CKPT_PATH = path_1 if os.path.exists(path_1) else path_2

print(f"Using checkpoint from: {MedSAM_CKPT_PATH}")

# Load the checkpoint into the 'vit_b' SAM architecture and move it to the selected device
medsam_model = sam_model_registry['vit_b'](checkpoint=MedSAM_CKPT_PATH)
medsam_model = medsam_model.to(device)

# Create predictor; this rebinds `predictor` from a SAM 2 predictor to a SamPredictor
predictor = SamPredictor(medsam_model)

  state_dict = torch.load(f, map_location=torch.device('cpu'))


In [None]:
# Evaluate the MedSAM predictor on the "test" split with the same pipeline used for the
# fine-tuned SAM 2 model (10 prompt points per mask, 32 worker threads).
run_pipeline_finetuned_model(num_points=10, dataset_type="test", predictor=predictor, threads=32)

# Medical SAM 2

https://github.com/bowang-lab/MedSAM/tree/MedSAM2?tab=readme-ov-file#fine-tune-sam2-on-the-abdomen-ct-dataset