# Preparation for Milestone Four

Today, we will begin preparing for the final milestone. Here, we will assemble all the pieces of the pipeline you've created. You will need to write a function **compute_AgNOR_score**. This function first utilizes the detection model to locate cells within a given image and then feeds those cells into a classification model to classify them into one of the AgNOR classes. Finally, you will aggregate all predictions into a final AgNOR score for the entire image.

# 1. Write a function "process_image" which receives an image and runs the detection model on it.

The function should have the following parameters:

1. image: The image on which you want to run inference.
2. crop_size: The size of the crops you want to load from the image.
3. overlap: Percentage or number of pixels the crops should overlap.
4. model: The object detection model. This function should generally be able to run with any detection model.
5. detection_threshold: A threshold to apply to the detections to reject false positives.
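Since `overlap` may be given either as a percentage or as a number of pixels, it helps to convert it into a stride (the step between neighbouring crops) once, up front. The helper below is a minimal sketch, not part of the assignment: the name `overlap_to_stride` and the convention that values below 1 are fractions of the crop size are assumptions made for illustration.

```python
def overlap_to_stride(crop_size: int, overlap: float) -> int:
    """Return the step between neighbouring crops along one axis.

    Assumed convention (hypothetical): values in [0, 1) are a fraction of
    crop_size, values >= 1 are a number of pixels.
    """
    if 0 <= overlap < 1:
        overlap_px = int(round(crop_size * overlap))
    else:
        overlap_px = int(overlap)
    if not 0 <= overlap_px < crop_size:
        raise ValueError("overlap must be non-negative and smaller than the crop size")
    return crop_size - overlap_px


stride_from_fraction = overlap_to_stride(512, 0.25)  # 25 % of 512 px overlap
stride_from_pixels = overlap_to_stride(512, 100)     # 100 px overlap
```

A larger overlap means more crops and more duplicate detections to remove later, but it lowers the chance that a cell is cut in half in every crop that contains it.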

The function has to tile the image into **overlapping crops** and feed each crop to the model. Because each detection is expressed in the coordinate system of its crop, all detections must then be shifted into the global coordinate system of the image by adding the crop's top-left offset. Next, apply [non-maximum suppression](https://pytorch.org/vision/stable/generated/torchvision.ops.nms.html) to the merged detections to remove duplicates of cells that were found in more than one overlapping crop. Finally, the function returns the coordinates and scores of the detected cells that exceed the given threshold. Wrap inference in **torch.no_grad()** to save computation time, and make sure your **model is in evaluation mode** before feeding the crops to it.

In [3]:
import os
from PIL import Image
import numpy as np
import pandas as pd
import torchvision
import torch
import albumentations as A
from albumentations.pytorch import ToTensorV2
from torchvision.models.detection import RetinaNet
from torchvision.models.detection.anchor_utils import AnchorGenerator
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models import MobileNet_V2_Weights
from torchvision.ops import nms
from torch.utils.data import Dataset, DataLoader, random_split
import torch.optim as optim
import torchmetrics

# Use Apple's MPS backend when it is available, otherwise fall back to the CPU.
device = torch.device('mps' if torch.backends.mps.is_available() else 'cpu')

In [None]:
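def make_retinanet(path_to_weights: str):
    # MobileNetV2 feature extractor as backbone; its last layer has 1280 channels.
    backbone = torchvision.models.mobilenet_v2(weights=MobileNet_V2_Weights.DEFAULT).features
    backbone.out_channels = 1280

    # One feature map, so one tuple of sizes and one tuple of aspect ratios.
    anchor_generator = AnchorGenerator(
        sizes=((32, 64, 128, 256, 512),),
        aspect_ratios=((0.5, 1.0, 2.0),)
    )
    model = RetinaNet(backbone,
                      num_classes=2,
                      anchor_generator=anchor_generator)

    model.load_state_dict(torch.load(path_to_weights, map_location=device))
    # Move the model to the same device the image crops will be on.
    model.to(device)
    model.eval()
    return model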

In [None]:
path = 'retinanet/best_model_0.743_map.pth'
detection_model = make_retinanet(path)

In [1]:
detection_model


In [None]:
def process_image(image, crop_size, overlap, model, detection_threshold):
    # crop_size and overlap are (height, width) pairs in pixels.
    crop_h, crop_w = crop_size
    stride_h, stride_w = crop_h - overlap[0], crop_w - overlap[1]

    image = torchvision.transforms.functional.to_tensor(image).to(device)
    _, height, width = image.shape

    # Tile the image and remember the top-left corner of every crop.
    # Note: a strip at the right/bottom border is skipped when the image
    # size is not covered exactly by the stride.
    crops, offsets = [], []
    for top in range(0, height - crop_h + 1, stride_h):
        for left in range(0, width - crop_w + 1, stride_w):
            crops.append(image[:, top:top + crop_h, left:left + crop_w])
            offsets.append((top, left))

    # Image smaller than one crop: nothing to detect.
    if not crops:
        return np.zeros((0, 4), dtype=np.float32), np.zeros((0,), dtype=np.float32)

    model.eval()
    with torch.no_grad():
        # torchvision detection models take a list of image tensors
        outputs = model(crops)

    all_boxes, all_scores = [], []
    for (top, left), output in zip(offsets, outputs):
        # Reject low-confidence detections
        mask = output['scores'] > detection_threshold
        boxes, scores = output['boxes'][mask], output['scores'][mask]

        # Shift boxes (x1, y1, x2, y2) from crop to global image coordinates:
        # x moves by the column offset, y by the row offset.
        boxes[:, [0, 2]] += left
        boxes[:, [1, 3]] += top

        all_boxes.append(boxes)
        all_scores.append(scores)

    all_boxes = torch.cat(all_boxes, dim=0)
    all_scores = torch.cat(all_scores, dim=0)

    # Non-maximum suppression removes duplicates from overlapping crops
    keep = nms(all_boxes, all_scores, 0.1)  # IoU threshold; adjust as needed

    return all_boxes[keep].cpu().numpy(), all_scores[keep].cpu().numpy()

# img = Image.open('path_to_your_image.jpg').convert("RGB")
# boxes, scores = process_image(img, crop_size=(512, 512), overlap=(100, 100), model=detection_model, detection_threshold=0.5)

# 2. Write a function "process_cells" which classifies the cells at the coordinates found by the detection model.

The function should have the following parameters:

1. image: The image from which to load the cells.
2. coords: Coordinates of the cells which you found with the detection algorithm.
3. model: The trained classification model.
4. crop_size: A size to resize the crops to. It should be equal to the size with which you trained the classification network.

The function should load each cell from the respective image and feed them to the classification model. Save the prediction and, in the end, aggregate the classifications of all cells into a final AgNOR score. The function should return the labels of the respective cells as well as the final AgNOR score.
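The two building blocks of this step, turning one bounding box into a model input and averaging the predicted labels, can be checked in isolation. The following sketch uses a synthetic image; `crop_to_tensor` and `aggregate_agnor_score` are hypothetical names, and scaling pixel values to [0, 1] without further normalization is an assumption that must match whatever preprocessing was used when training the classifier.

```python
import numpy as np
import torch
from PIL import Image


def crop_to_tensor(image, box, size):
    """Cut box = (x1, y1, x2, y2) out of a PIL image and return a
    1 x 3 x H x W float tensor with values in [0, 1]."""
    x1, y1, x2, y2 = (int(round(float(v))) for v in box)
    patch = image.crop((x1, y1, x2, y2)).resize(size)  # size is (width, height)
    array = np.asarray(patch, dtype=np.float32) / 255.0  # H x W x 3
    return torch.from_numpy(array).permute(2, 0, 1).unsqueeze(0)


def aggregate_agnor_score(labels):
    """Mean of the per-cell labels; 0.0 when no cell was detected."""
    return float(np.mean(labels)) if len(labels) > 0 else 0.0


# Synthetic stand-in for a microscopy image
image = Image.new("RGB", (200, 100), color=(255, 0, 0))
cell_batch = crop_to_tensor(image, (10.2, 10.7, 50.0, 50.0), (128, 128))
score = aggregate_agnor_score([1, 2, 3])
```

The resulting batch has the shape a classification model expects for a single image, and the score is simply the average label over all classified cells.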

In [2]:
import numpy as np
import torch
from PIL import Image

def process_cells(image, coords, model, crop_size):
    model.eval()
    # Run on whichever device the classification model lives on.
    model_device = next(model.parameters()).device
    cell_labels = []

    with torch.no_grad():
        for box in coords:
            # Boxes are (x1, y1, x2, y2) in image coordinates, rounded to whole pixels.
            x1, y1, x2, y2 = (int(round(float(v))) for v in box)
            cell_image = image.crop((x1, y1, x2, y2)).resize(crop_size)

            # H x W x C uint8 -> 1 x C x H x W float in [0, 1]
            cell_image = torch.from_numpy(np.array(cell_image)).permute(2, 0, 1).float() / 255.0
            cell_image = cell_image.unsqueeze(0).to(model_device)

            # Classification model: the predicted label is the class with the highest logit
            output = model(cell_image)
            cell_labels.append(output.argmax(1).item())

    # Final AgNOR score: mean of the predicted per-cell labels (0 if no cells were given)
    final_agnor_score = sum(cell_labels) / len(cell_labels) if cell_labels else 0

    return cell_labels, final_agnor_score


# img = Image.open('path_to_image.jpg').convert("RGB")
# detected_coords = torch.tensor([[10, 10, 50, 50], [60, 60, 100, 100]])  # Example coordinates
# classification_model = your_trained_classification_model
# labels, agnor_score = process_cells(img, detected_coords, classification_model, (128, 128))

# 3. Combine both functions into the function **compute_AgNOR_score**.

This function should receive the image as a parameter, together with all parameters needed to run the two subfunctions. In the end, it should return the overall AgNOR score of the image.

In [3]:
def compute_AgNOR_score(image, detection_model, classification_model, crop_size_detection, crop_size_classification, overlap, detection_threshold):
    detected_boxes, _ = process_image(image, crop_size_detection, overlap, detection_model, detection_threshold)

    if len(detected_boxes) > 0:
        cell_labels, agnor_score = process_cells(image, detected_boxes, classification_model, crop_size_classification)
    else:
        cell_labels = []
        agnor_score = 0  # No cells detected, hence no score

    return agnor_score

# detection_model = your_pretrained_detection_model
# classification_model = your_pretrained_classification_model
# image_path = 'path_to_your_image.jpg'
# img = Image.open(image_path).convert("RGB")
# agnor_score = compute_AgNOR_score(img, detection_model, classification_model, (256, 256), (128, 128), (50, 50), 0.5)
# print(f"The AgNOR score of the image is {agnor_score}")


In [4]:
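import torch
import torch.nn as nn
from torchvision.models import efficientnet_b0

def make_imagenet(weights_path):
    # EfficientNet-B0 pretrained on ImageNet, with a 12-way classification head.
    model = efficientnet_b0(weights='IMAGENET1K_V1')
    model.classifier[1] = nn.Linear(1280, 12)

    # Freeze the backbone; only the classifier head stays trainable.
    for name, param in model.named_parameters():
        param.requires_grad = 'classifier' in name

    # strict=False silently skips keys that do not match, so report them:
    # a non-empty list means part of the checkpoint was not loaded.
    state_dict = torch.load(weights_path, map_location=device)
    result = model.load_state_dict(state_dict, strict=False)
    print("Missing keys:", result.missing_keys)
    print("Unexpected keys:", result.unexpected_keys)

    # The model is only used for inference in this notebook.
    model.to(device)
    model.eval()
    return model


classification_model = make_imagenet('best_model.pth')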


# 4. Test your pipeline.

Take several images (approximately 5) and run them through your pipeline. Then, calculate the error between the predicted AgNOR score and the AgNOR score defined by the labels of the cells in the annotation file. To obtain this label, simply calculate the mean of the labels of the respective image.
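As a small illustration of the comparison described above, the reference score per image can be obtained with a pandas `groupby` and compared with the predictions. The sketch below runs on made-up rows: only the column names `filename` and `label` come from the annotation file used in this notebook, while the file names, labels, and predicted values are invented for the example.

```python
import pandas as pd

# Hypothetical annotation rows (one row per annotated cell)
annotations = pd.DataFrame({
    "filename": ["a.tiff", "a.tiff", "a.tiff", "b.tiff", "b.tiff"],
    "label": [0, 1, 2, 3, 1],
})

# Reference AgNOR score per image: mean label of its cells
reference_scores = annotations.groupby("filename")["label"].mean()

# Made-up pipeline outputs, only to demonstrate the error computation
predicted_scores = pd.Series({"a.tiff": 1.5, "b.tiff": 2.5})

# Series arithmetic aligns on the file name, so the order does not matter
errors = predicted_scores - reference_scores
mean_absolute_error = errors.abs().mean()
mean_squared_error = (errors ** 2).mean()
mean_signed_error = errors.mean()  # > 0 means the pipeline overestimates
```

Looking at the signed error in addition to the squared error shows whether the pipeline is consistently too high or too low, which a single MSE value hides.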

In [8]:
import os
import pandas as pd
import numpy as np
from PIL import Image
from sklearn.metrics import mean_squared_error

def test_pipeline(annotation_file, img_dir, detection_model, classification_model, crop_size_detection, crop_size_classification, overlap, detection_threshold):

    annotations = pd.read_csv(annotation_file)
    unique_images = annotations['filename'].unique()

    predicted_scores = []
    actual_scores = []

    for image_name in unique_images:

        image_path = os.path.join(img_dir, image_name)
        if os.path.exists(image_path) and os.path.getsize(image_path) > 0:
            image = Image.open(image_path).convert("RGB")
            
            # predict AgNOR score 
            predicted_score = compute_AgNOR_score(image, detection_model, classification_model, crop_size_detection, crop_size_classification, overlap, detection_threshold)
            predicted_scores.append(predicted_score)
            
            # get actual score
            labels = annotations[annotations['filename'] == image_name]['label']
            actual_score = labels.mean()
            actual_scores.append(actual_score)
        else:
            continue
    # mean squared error
    mse = mean_squared_error(actual_scores, predicted_scores)
    return mse, actual_scores, predicted_scores


mse, actual_scores, predicted_scores = test_pipeline(
    'annotation_frame.csv',
    'Dataset/',
    detection_model,
    classification_model,
    (256, 256),
    (128, 128),
    (50, 50),
    0.5
)
print(f"Mean Squared Error: {mse}")
print("Actual Scores:", actual_scores)
print("Predicted Scores:", predicted_scores)


Mean Squared Error: 13.975855341858315
Actual Scores: [1.3578643578643579, 1.0689149560117301, 4.3623529411764705, 1.6595492289442468, 1.4829268292682927, 1.08364312267658, 1.051643192488263, 0.9709465791940018, 1.4591439688715953, 1.0815286624203821, 1.325179856115108, 1.3300536672629697, 1.0875912408759123, 1.1523972602739727, 0.6959677419354838, 1.567956795679568, 1.2726008344923505, 1.1495327102803738, 1.8744460856720828, 1.4244482173174873, 1.2068095838587642, 3.779874213836478, 2.112208892025406]
Predicted Scores: [5.337209302325581, 4.989247311827957, 5.405063291139241, 5.08421052631579, 4.542124542124542, 6.2727272727272725, 5.553571428571429, 5.462837837837838, 4.362745098039215, 4.678571428571429, 5.537383177570093, 5.775462962962963, 6.487012987012987, 4.340425531914893, 5.772334293948127, 4.512820512820513, 5.8768115942028984, 5.037037037037037, 5.567164179104478, 3.5128205128205128, 4.644628099173554, 4.378378378378378, 3.367816091954023]
