## Pipeline Overview
The **compute_AgNOR_score** function processes an image by first detecting cells using a detection model. These detected cells are then classified into AgNOR categories using a classification model. Finally, the function aggregates the classification results to generate a comprehensive AgNOR score for the entire image.

In [None]:
# Platform-specific environment setup and path configuration

import sys
from pathlib import Path


def pretty_print_title(title='', length=50):
    if len(title.strip()) > 0:
        length = length - len(title.strip()) - 2  # Calculate remaining space for padding
        print(f'{"*" * round(length / 2)} {title} {"*" * round(length / 2)}')
    else:
        print(f'{"*" * length}')


if sys.platform == 'darwin':
    base_path = f'{str(Path.home())}/Projects/computer-vision-thi/dataset/AgNOR_Project'
    !set VIRTUAL_ENV f"{str(Path.home())}}/Projects/computer-vision-thi/.venv"
    device = 'mps'
else:
    from google.colab import drive
    drive.mount('/content/gdrive')
    # !pip install 'pandas<2.0.0'
    # !pip install pillow
    # !pip install tqdm
    # !pip install albumentations
    # !pip install tensorflow
    # !pip install matplotlib
    # !pip install torchmetrics
    # !pip install imbalanced-learn
    # !pip install opencv-python
    import torch
    base_path = '/content/gdrive/MyDrive/AgNORs'
    device = 'cuda' if torch.cuda.is_available() else 'cpu'

path_to_images = f'{base_path}/'
path_to_annotations = f'{base_path}/annotation_frame.p'

print(f'Path to images: {path_to_images}')
print(f'Path to annotations: {path_to_annotations}')
print(f'Device is: {device}')


Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).
Path to images: /content/gdrive/MyDrive/AgNORs/
Path to annotations: /content/gdrive/MyDrive/AgNORs/annotation_frame.p
Device is: cuda


In [None]:
# GPU memory management and cleanup utilities
from torch import cuda


def get_less_used_gpu(gpus=None, debug=False):
    """Inspect cached/reserved and allocated memory on specified gpus and return the id of the less used device"""
    if gpus is None:
        warn = 'Falling back to default: all gpus'
        gpus = range(cuda.device_count())
    elif isinstance(gpus, str):
        gpus = [int(el) for el in gpus.split(',')]

    # check gpus arg VS available gpus
    sys_gpus = list(range(cuda.device_count()))
    if len(gpus) > len(sys_gpus):
        gpus = sys_gpus
        warn = f'WARNING: Specified {len(gpus)} gpus, but only {cuda.device_count()} available. Falling back to default: all gpus.\nIDs:\t{list(gpus)}'
    elif set(gpus).difference(sys_gpus):
        # take correctly specified and add as much bad specifications as unused system gpus
        available_gpus = set(gpus).intersection(sys_gpus)
        unavailable_gpus = set(gpus).difference(sys_gpus)
        unused_gpus = set(sys_gpus).difference(gpus)
        gpus = list(available_gpus) + list(unused_gpus)[:len(unavailable_gpus)]
        warn = f'GPU ids {unavailable_gpus} not available. Falling back to {len(gpus)} device(s).\nIDs:\t{list(gpus)}'

    cur_allocated_mem = {}
    cur_cached_mem = {}
    max_allocated_mem = {}
    max_cached_mem = {}
    for i in gpus:
        cur_allocated_mem[i] = cuda.memory_allocated(i)
        cur_cached_mem[i] = cuda.memory_reserved(i)
        max_allocated_mem[i] = cuda.max_memory_allocated(i)
        max_cached_mem[i] = cuda.max_memory_reserved(i)
    min_allocated = min(cur_allocated_mem, key=cur_allocated_mem.get)
    if debug:
        print(warn)
        print('Current allocated memory:', {f'cuda:{k}': v for k, v in cur_allocated_mem.items()})
        print('Current reserved memory:', {f'cuda:{k}': v for k, v in cur_cached_mem.items()})
        print('Maximum allocated memory:', {f'cuda:{k}': v for k, v in max_allocated_mem.items()})
        print('Maximum reserved memory:', {f'cuda:{k}': v for k, v in max_cached_mem.items()})
        print('Suggested GPU:', min_allocated)
    return min_allocated


def free_memory(to_delete: list, debug=False):
    import gc
    import inspect
    calling_namespace = inspect.currentframe().f_back
    if debug:
        print('Before:')
        get_less_used_gpu(debug=True)

    for _var in to_delete:
        calling_namespace.f_locals.pop(_var, None)
        gc.collect()
        cuda.empty_cache()
    if debug:
        print('After:')
        get_less_used_gpu(debug=True)

def wipe_memory(debug=True):
    get_less_used_gpu(debug=debug)
    free_memory(['cuda:0'], debug=debug)
    gc.collect()
    torch.cuda.empty_cache()
    # print(torch.cuda.set_per_process_memory_fraction(0.6)) release memory cashe

In [None]:
import os
import numpy as np
import pandas as pd
import torch
from PIL import Image
from torchvision.transforms import ToTensor
from torchvision.ops import boxes as box_ops
from torchvision.transforms import functional as F
import torchmetrics
import albumentations as A
import torchvision.transforms as transforms
import pickle
import random
import math
import torchvision
from math import ceil

# Function **process_image**

The **process_image** function runs object detection on an image by tiling it into overlapping crops, applying the detection model, and returning the detected coordinates and scores.

Parameters:
- image: Input image.

- crop_size: Size of the crops.

- overlap: Overlap between crops.

- model: Object detection model.

- detection_threshold: Threshold to filter low-confidence detections.

The function processes each crop, transforms the coordinates to the global image, applies non-maximum suppression to remove duplicates, and returns the valid detections. torch.no_grad() and set the model used in evaluation mode for efficiency.

In [None]:
import numpy as np
import torch
import torchvision
from torchvision.ops import nms

def process_image(image, crop_size, overlap, model, detection_threshold):
    """
    Runs the detection model on an image with overlapping crops.

    Parameters:
    - image: The image on which to run inference (NumPy array).
    - crop_size: The size of the crops (tuple of (height, width)).
    - overlap: Percentage or number of pixels the crops should overlap (float or int).
    - model: The object detection model.
    - detection_threshold: Threshold to apply to the detections to reject false positives (float).

    """
    # Ensure model is in evaluation mode
    model.eval()

    # Calculate stride based on overlap
    stride_y = int(crop_size * (1 - overlap)) if isinstance(overlap, float) else crop_size - overlap
    stride_x = int(crop_size * (1 - overlap)) if isinstance(overlap, float) else crop_size - overlap

    height, width = image.shape[:2]
    detections = []

    # Convert image to tensor # normalize the image according to previous
    image_tensor = (image - np.min(image)) / (np.max(image) - np.min(image))
    image_tensor = torch.from_numpy(image_tensor).float().permute(2, 0, 1)


    with torch.no_grad():
        # Loop over the image with overlapping crops
        for y in range(0, height, stride_y):
            for x in range(0, width, stride_x):
                # Calculate crop boundaries
                y1 = y
                y2 = min(y + crop_size, height)
                x1 = x
                x2 = min(x + crop_size, width)

                crop = image_tensor[:, y1:y2, x1:x2]
                crop = crop.unsqueeze(0)  # Add batch dimension

                # Run detection model on the crop
                outputs = model(crop)[0]

                # Filter out detections below the threshold
                scores = outputs['scores']
                boxes = outputs['boxes']
                valid_indices = scores > detection_threshold
                scores = scores[valid_indices]
                boxes = boxes[valid_indices]

                # Transform coordinates to the global image coordinate system
                for box, score in zip(boxes, scores):
                    x1_global = box[0] + x1
                    y1_global = box[1] + y1
                    x2_global = box[2] + x1
                    y2_global = box[3] + y1
                    detections.append((x1_global, y1_global, x2_global, y2_global, score.item()))

    if not detections:
        return []

    # Convert detections to tensor for NMS
    boxes = torch.tensor([det[:4] for det in detections])
    scores = torch.tensor([det[4] for det in detections])

    # Apply non-maximal suppression
    nms_indices = nms(boxes, scores, iou_threshold=0.5)

    final_detections = [detections[idx] for idx in nms_indices]

    return final_detections

#  Implement a function "process_cells" to classify cells based on provided coordinates

**process_cells** function classifies cells from an image using the given coordinates.

Inputs:

- Image containing the cells

- Detected cell coordinates

- Trained classification model

- Crop size (matching the model's training input size)

The function:

1. Extracts and resizes each cell

2. Classifies cells using the trained model

3. Saves predictions for each cell

4. Calculates the final AgNOR score

Returns:

- Labels of the cells

- Final AgNOR score

In [None]:

def process_cells(image, coords, model, crop_size):
    """
    Classifies the cells from the given coordinates using the classification model.

    Parameters:
    - image: The image from which to load the cells (NumPy array).
    - coords: Coordinates of the cells which were found with the detection algorithm.
    - model: The trained classification model.
    - crop_size: A size to resize the crops to (tuple of (height, width)).

    Returns:
    - labels: List of labels for the respective cells.
    - agnor_score: The aggregated AgNOR score based on the classifications.
    Normalization added.
    """
    model.eval()
    model.to(device)
    transform = transforms.Compose([
        transforms.Resize(crop_size),
        transforms.ToTensor(),
    ])

    labels = []


    with torch.no_grad():
        for coord in coords:
            x1, y1, x2, y2, _ = coord
            cell_image = image[int(y1):int(y2), int(x1):int(x2)]
            cell_image = Image.fromarray(cell_image)
            cell_image = transform(cell_image).unsqueeze(0)
            cell_image = cell_image.to(device)

            output = model(cell_image)
            _, predicted = torch.max(output, 1)
            label = predicted.item()
            labels.append(label)

    return labels, np.mean(labels)

# Refactor both functions into a single function called **compute_AgNOR_score**

Function receive the image as a parameter and also require all parameters to execute the subfunctions. In the end, this function return the overall AgNOR score of the image.

In [None]:
def compute_AgNOR_score(image, crop_size, overlap,resize, detection_model, detection_threshold, classification_model):
    data = process_image(image, crop_size, overlap, detection_model, detection_threshold)

    wipe_memory(False)

    labels, AgNOR_score = process_cells(image, data, classification_model, resize)
    return labels, AgNOR_score

# Testing of pipeline

5 images were taken and processed through the pipeline. The error between the predicted AgNOR score and the AgNOR score, as defined by the labels in the annotation file, was then calculated. To obtain this label, the mean of the labels for the respective image was computed.

In [None]:
import gc

wipe_memory(False)

IMAGES_COUNT = 5

annotation = pickle.load(open(path_to_annotations, 'rb'))
print(annotation.head())

annotation_file_names = annotation.filename.unique().tolist()

test_images = random.sample(annotation_file_names, IMAGES_COUNT)


print("Device", device)
detection_model = torch.load("/content/gdrive/MyDrive/models/detection_model.pth", map_location=device)
classification_model = torch.load("/content/gdrive/MyDrive/models/classification_model.pth", map_location=device)


detection_model.eval()
classification_model.eval()
detection_model.to(device)
classification_model.to(device)

crop_size = 250
overlap = 0.2
detection_threshold = 0.5
resize = 100


from PIL import Image

def load_image(image_name):
        img = Image.open(os.path.join(path_to_images, image_name)).convert('RGB')
        image = np.array(img)
        return image

for i, image_name in enumerate(test_images):

    image = load_image(image_name)
    # Run the image through the pipeline
    labels, score =  compute_AgNOR_score(image, crop_size, overlap, resize, detection_model, detection_threshold, classification_model)
    actual_score = annotation[annotation['filename'] == image_name]['label'].mean()
    # Calculate the error
    error = abs(score - actual_score)

    print(f"Image {i+1} ({image_name}): Predicted Score = {score}, Actual Score = {actual_score}, Error = {error}")

          filename  max_x  max_y  min_x  min_y  label
0  AgNOR_0495.tiff     26     41      4     15      1
1  AgNOR_0495.tiff     71     23     42      0      2
2  AgNOR_0495.tiff    133     61    104     37      1
3  AgNOR_0495.tiff    143    117    121     88      2
4  AgNOR_0495.tiff    224     37    199     12      1
Device cuda
Image 1 (AgNOR_2999.tiff): Predicted Score = 1.4814814814814814, Actual Score = 1.337573385518591, Error = 0.14390809596289045
Image 2 (AgNOR_0495.tiff): Predicted Score = 1.736842105263158, Actual Score = 1.3578643578643579, Error = 0.3789777473988001
Image 3 (AgNOR_2862.tiff): Predicted Score = 1.7246376811594204, Actual Score = 1.3300536672629697, Error = 0.3945840138964507
Image 4 (AgNOR_0677.tiff): Predicted Score = 1.7272727272727273, Actual Score = 2.112208892025406, Error = 0.38493616475267856
Image 5 (AgNOR_0531.tiff): Predicted Score = 1.5714285714285714, Actual Score = 1.0815286624203821, Error = 0.48989990900818925
