# Clustering Experiment

We have put all of our methods together for direct comparison in a single notebook.

Problem: Currently we are determining only the indiviudal digits, but we need to recognize these as coherent numbers and be able to assign entries to numbers.


### Register Images to Start

To start, we need to register images using the `utilities/conversion/apply_homography_to_labels.ipynb` notebook. This should be run before running this notebook. This notebook is built on the assumption that the `data/registered_images` directory has been created and populated. Additionally it assumes that the `data/yolo_data.json` file is created. Both of these are created in the referenced notebook.


#### Install Packages

These are the necessary packages to run the functions and scripts below.


In [1]:
# Standard libraries
import os
import re
import json
import random
from pathlib import Path
from typing import List, Tuple, Dict, Literal

# Third-party libraries
import cv2
import numpy as np
from PIL import Image, ImageDraw
from sklearn.cluster import KMeans, DBSCAN, AgglomerativeClustering
from sklearn.metrics import silhouette_score
from scipy.stats import gaussian_kde
from itertools import compress

# Local libraries
from utils.annotations import BoundingBox

#### Start By Loading YOLO Data


To start I want to bring in the YOLO formatted data for each sheet and I can additionally load the respective images. As mentioned above you must have ran the `utilities/conversion/apply_homography_to_labels.ipynb` notebook to generate this YOLO data.


In [2]:
# Load yolo_data.json
PATH_TO_YOLO_DATA = "../../data/yolo_data.json"
PATH_TO_REGISTERED_IMAGES = "../../data/registered_images"
UNIFIED_IMAGE_PATH = (
    "../../data/unified_intraoperative_preoperative_flowsheet_v1_1_front.png"
)

# Load yolo_data.json
with open(PATH_TO_YOLO_DATA) as json_file:
    yolo_data = json.load(json_file)

# See how many intraoperative images are registered
print(f"Found {len(yolo_data)} sheets in yolo_data.json")

# Load the json for bp and hr cluster locations
PATH_TO_CLUSTER_LOCATIONS = "../../data/bp_and_hr_cluster_locations.json"
with open(PATH_TO_CLUSTER_LOCATIONS) as json_file:
    bp_hr_cluster_locations = json.load(json_file)
    print(
        f"Found {len(bp_hr_cluster_locations)} items in bp_and_hr_cluster_locations.json"
    )

Found 19 sheets in yolo_data.json
Found 19 items in bp_and_hr_cluster_locations.json


#### Define Constants Used In Notebook


In [3]:
DESIRED_IMAGE_WIDTH = 800
DESIRED_IMAGE_HEIGHT = 600

Now let's select relevant bounding boxes from the blood pressure and HR zone.

Start by defining functions to convert YOLO bounding box format to pixels (to see if the bounding box is within region of interest). Then create a function that allows you to select ROI and returns a list of bounding boxes within this ROI.


In [4]:
def find_density_max(values: List[int], search_area: int) -> int:
    """
    Given a list of values and a search area, find the index of where the highest density is.
    The list of values correspond to identifying points for the bounding boxes and the search area corresponds to the images height or width.

    Args:
        values: List of identifying points for the bounding boxes
        search_area: height/width of the image dependent on whether x or y axis is being search.

    Returns:
        The axis value that has the highest density of bounding boxes.
    """
    kde = gaussian_kde(values, bw_method=0.2)

    x_values = np.linspace(0, search_area, 10000)

    kde_vals = kde(x_values)

    max_index = np.argmax(kde_vals)
    return x_values[max_index]


def remove_bb_outliers(boxes: List[BoundingBox]) -> List[BoundingBox]:
    """
    Given a list of bounding boxes, remove the outliers from the x axis, then remove the outliers from the y axis

    Args:
        boxes: List of Bounding Boxes to filter

    Returns:
        Filtered list of Bounding Boxes
    """
    x_vals = [bb.left for bb in boxes]
    # find the 25th percentile
    x_Q1 = np.percentile(x_vals, 25)
    # find the 75th percentile
    x_Q3 = np.percentile(x_vals, 75)
    # find the IQR
    x_IQR = x_Q3 - x_Q1
    # determine lower and upper bounds
    x_lower = x_Q1 - 1.5 * x_IQR
    x_upper = x_Q3 + 1.5 * x_IQR
    # remove outliers via the x axis
    x_filtered = [bb for bb in boxes if x_lower <= bb.left <= x_upper]

    y_vals = [bb.top for bb in x_filtered]
    # find the 25th percentile
    y_Q1 = np.percentile(y_vals, 25)
    # find the 75th percentile
    y_Q3 = np.percentile(y_vals, 75)
    # find the IQR
    y_IQR = y_Q3 - y_Q1
    # determine the lower and upper bounds
    y_lower = y_Q1 - 1.5 * y_IQR
    y_upper = y_Q3 + 1.5 * y_IQR
    # remove outliers via the y axis
    filtered = [bb for bb in x_filtered if y_lower <= bb.top <= y_upper]

    return x_filtered


def select_relevant_bounding_boxes(
    sheet_data: List[str],
    path_to_image: Path,
    show_images: bool = False,
    desired_img_width: int = DESIRED_IMAGE_WIDTH,
    desired_img_height: int = DESIRED_IMAGE_HEIGHT,
) -> Tuple[List[str], List[str]]:
    """
    Given sheet data for bounding boxes in YOLO format, find the bounding boxes corresponding to the number and time on the BP chart.
    Return the bounding boxes that are within the selected region split into two lists: time labels and numerical values.

    Args:
        sheet_data: List of bounding boxes in YOLO format.
        path_to_image: Path to the image file.

    Returns:
        Tuple of Lists of string representations of bounding boxes that are within the selected region, in YOLO format.
        The first list contains bounding boxes in the top-right region -- representing time labels.
        The second list contains bounding boxes in the bottom-left region -- representing numerical values for mmHg and bpm.
            (bounding_boxes_time, bounding_boxes_numbers)
    """

    # Load the image
    image = cv2.imread(path_to_image)

    # Display the image and allow the user to select a ROI
    resized_image = cv2.resize(image, (desired_img_width, desired_img_height))

    # convert the YOLO data to Bounding Boxes
    bboxes: List[BoundingBox] = [
        BoundingBox.from_yolo(yolo_bb, desired_img_width, desired_img_height)
        for yolo_bb in sheet_data
    ]

    # generate a list of the digit categories
    digit_categories: List[str] = [str(i) for i in range(10)]

    # filter out non bounding boxes and those whose category is not a digit
    bboxes: List[BoundingBox] = list(
        filter(
            lambda bb: isinstance(bb, BoundingBox) and bb.category in digit_categories,
            bboxes,
        )
    )

    # find the point with the maximum density of bounding boxes
    bboxes_right: List[int] = [bb.right for bb in bboxes]
    # x_loc is the vertical line to the left of the time axis and right of the numbers axis
    x_loc: int = find_density_max(bboxes_right, desired_img_width)

    bboxes_bottom: List[int] = [bb.bottom for bb in bboxes]
    # y_loc is the horizontal line undert the time axis and above the number axis
    y_loc: int = find_density_max(bboxes_bottom, desired_img_height)

    bounding_boxes_time = []
    bounding_boxes_numbers = []

    # Process the bounding boxes
    for bounding_box in bboxes:
        # get the center point of the bounding box for comparison
        x_center_bb, y_center_bb = bounding_box.center

        # check if the bounding box is a number on the BP chart by comparing to the KDE index + a threshold
        if x_center_bb > x_loc - 15 and x_center_bb < x_loc + 2:
            bounding_boxes_numbers.append(bounding_box)
        # check if the bounding box is a time on the BP chart by comparing to the KDE index + a threshold
        elif y_center_bb > y_loc - 10 and y_center_bb < y_loc + 2:
            bounding_boxes_time.append(bounding_box)

    bounding_boxes_numbers = remove_bb_outliers(bounding_boxes_numbers)
    bounding_boxes_time = remove_bb_outliers(bounding_boxes_time)

    for bounding_box in bounding_boxes_numbers:
        x_min = int(bounding_box.left)
        x_max = int(bounding_box.right)
        y_min = int(bounding_box.top)
        y_max = int(bounding_box.bottom)

        # Bounding box is in the top-right region
        cv2.rectangle(resized_image, (x_min, y_min), (x_max, y_max), (255, 255, 0), 1)

    for bounding_box in bounding_boxes_time:
        x_min = int(bounding_box.left)
        x_max = int(bounding_box.right)
        y_min = int(bounding_box.top)
        y_max = int(bounding_box.bottom)

        cv2.rectangle(resized_image, (x_min, y_min), (x_max, y_max), (255, 0, 255), 1)

    # plot the lines of the KDE index found for debugging
    # numbers_start = (int(x_loc), 0)
    # numbers_end = (int(x_loc), desired_img_height)

    # time_start = (0, int(y_loc))
    # time_end = (desired_img_width, int(y_loc))

    # cv2.line(resized_image, numbers_start, numbers_end, (255,255,0), 1)
    # cv2.line(resized_image, time_start, time_end, (255,0,255), 1)

    # Close all OpenCV windows, always do this or it will annoyingly not go away
    # You can also manually quit out with ESC key.
    cv2.destroyAllWindows()

    # If we are showing the images, display the image with the selected region and bounding boxes
    # Bounding boxes in the top-right region (time) are in one color while those in the bottom left (numerical) are in another
    if show_images:
        # Display the image with the selected region and bounding boxes
        resized_image = cv2.cvtColor(resized_image, cv2.COLOR_BGR2RGB)
        resized_image = Image.fromarray(resized_image)
        resized_image.show()

    # Return a tuple of bounding boxes in the top-right and bottom-left regions
    return (bounding_boxes_time, bounding_boxes_numbers)

Create functions for K-means clustering, dbscan clustering, and agglomerative clustering


In [5]:
def cluster_kmeans(
    bounding_boxes: List[BoundingBox], possible_nclusters: List[int]
) -> List[int]:
    """
    Cluster bounding boxes using K-Means clustering algorithm.

    Args:
        bounding_boxes: List of bounding boxes in YOLO format.
        possible_nclusters: List of possible number of clusters to try.

    Returns:
        List of cluster labels.
    """
    # Convert to a NumPy array (using only x_center and y_center)
    data = np.array([box.center for box in bounding_boxes])

    cluster_performance_map = {}
    for number_of_clusters in possible_nclusters:
        if number_of_clusters > len(data):
            raise (
                f"Number of clusters {number_of_clusters} is greater than number of bounding boxes {len(data)}."
            )
        if number_of_clusters < 1:
            raise (f"Number of clusters {number_of_clusters} must be greater than 0.")
        # Apply K-Means
        kmeans = KMeans(
            n_clusters=number_of_clusters,
            init="k-means++",
            n_init=20,
            max_iter=500,
            tol=1e-8,
            random_state=42,
        )
        kmeans.fit(data)

        # Get cluster labels
        labels = kmeans.predict(data)
        silhouette_avg = silhouette_score(data, labels)

        # print(
        #     f"Number of clusters: {number_of_clusters}, Silhouette score: {silhouette_avg}"
        # )

        cluster_performance_map[number_of_clusters] = {
            "score": silhouette_avg,
            "labels": labels,
        }

    # Evaluate the performance of each number of clusters and select the one with the highest silhouette score
    # if it is 0.003 greater than what should be the number of clusters otherwise go with proper_nclusters
    n_clusters_max_silhouette = max(
        cluster_performance_map, key=lambda x: cluster_performance_map[x]["score"]
    )
    best_n_clusters = (
        n_clusters_max_silhouette
        if (
            (
                cluster_performance_map[n_clusters_max_silhouette]["score"]
                - cluster_performance_map[max(possible_nclusters)]["score"]
            )
            >= 0.005
        )
        else max(possible_nclusters)
    )
    return cluster_performance_map[best_n_clusters]["labels"]


def dbscan_clustering(
    bounding_boxes: List[BoundingBox], defined_eps: float, min_samples: int
) -> List[int]:
    """
    Cluster bounding boxes density based spatial clustering algorithm.

    Args:
        bounding_boxes: List of bounding boxes.
        defined_eps: Maximum distance between two samples to be in the neighborhood of one another (center of BB).
        min_samples: The number of samples (or total weight) for a point to be considered as core

    Returns:
        List of cluster labels.
    """
    # Convert to a NumPy array (using only x_center and y_center)
    data = np.array([box.center for box in bounding_boxes])

    # DBSCAN
    scan = DBSCAN(eps=defined_eps, min_samples=min_samples)
    labels = scan.fit_predict(data)

    return labels


def agglomerative_clustering(
    bounding_boxes: List[BoundingBox], possible_nclusters: List[int]
) -> List[int]:
    # make the bonding box data into a Numpy array
    data = np.array([box.center for box in bounding_boxes])

    # follow suit of the cluster_kmeans algorithm to measure accuracy through silhoutte scores
    cluster_performance_map = {}
    for number_of_clusters in possible_nclusters:
        if number_of_clusters > len(data):
            raise (
                f"Number of clusters {number_of_clusters} is greater than number of bounding boxes {len(data)}."
            )
        if number_of_clusters < 1:
            raise (f"Number of clusters {number_of_clusters} must be greater than 0.")
        # use agglomerative clustering
        agg = AgglomerativeClustering(n_clusters=number_of_clusters, linkage="single")
        # get labels
        labels = agg.fit_predict(data)
        # compute the silhoutte scores
        silhouette_avg = silhouette_score(data, labels)

        cluster_performance_map[number_of_clusters] = {
            "score": silhouette_avg,
            "labels": labels,
        }

    # get the number of clusters with the best silhoutte score
    n_clusters_max_silhouette = max(
        cluster_performance_map, key=lambda x: cluster_performance_map[x]["score"]
    )

    best_n_clusters = (
        n_clusters_max_silhouette
        if (
            (
                cluster_performance_map[n_clusters_max_silhouette]["score"]
                - cluster_performance_map[max(possible_nclusters)]["score"]
            )
            >= 0.003
        )
        else max(possible_nclusters)
    )
    return cluster_performance_map[best_n_clusters]["labels"]


Code block to get the average X and Y of the expected clusters. We get the average of the location of each label across sheets.


In [6]:
cluster_locations_dict = {}  # Dictionary containing the cluster_name as the key to another dictionary with 'x' and 'y' as keys for lists of x and y coordinates
for sheet_num, data in enumerate(yolo_data.items()):
    # For each sheet, get the cluster center X and Y coordinates and add them to the cluster_locations_dict dictionary
    expected_clusters = bp_hr_cluster_locations[sheet_num]
    for cluster in expected_clusters["annotations"][0]["result"]:
        x_expected_perc, y_expected_perc = (
            cluster["value"]["x"],
            cluster["value"]["y"],
        )  # Get the expected cluster location (percent x and y in the original image space)
        x_expected, y_expected = (
            (x_expected_perc / 100) * DESIRED_IMAGE_WIDTH,
            (y_expected_perc / 100) * DESIRED_IMAGE_HEIGHT,
        )  # Convert the expected cluster location to pixel space
        cluster_name = cluster["value"]["rectanglelabels"][0]
        if cluster_name not in cluster_locations_dict:
            cluster_locations_dict[cluster_name] = {"x": [], "y": []}
        cluster_locations_dict[cluster_name]["x"].append(x_expected)
        cluster_locations_dict[cluster_name]["y"].append(y_expected)

# Average the cluster locations
for cluster_name, cluster_data in cluster_locations_dict.items():
    cluster_locations_dict[cluster_name]["x"] = float(np.mean(cluster_data["x"]))
    cluster_locations_dict[cluster_name]["y"] = float(np.mean(cluster_data["y"]))

In [18]:
def get_cluster_bbs(
    labels: List[str], bounding_boxes: List[BoundingBox]
) -> Dict[str, BoundingBox]:
    """
    Create a dictionary with cluster labels as keys and a BoundingBox for the cluster.

    Args:
        labels: List of cluster labels.
        bounding_boxes: List of bounding boxes.

    Returns:
        Dictionary with cluster labels as keys and a bounding box value as values.
    """
    # Create a dictionary to store labelled elements
    label_dict = {}

    # Iterate over both lists for labels and the bounding boxes found
    for label, box in zip(labels, bounding_boxes):
        label = int(label)
        if label not in label_dict:
            # Create a new list for this label if it doesn't exist
            label_dict[label] = []
        # Append the element to the corresponding label list
        label_dict[label].append(box)

    # Create dictionary that will hold the cluster label and bounding box
    cluster_dict = {}
    # Iterate over the label_dict to find the overall coordinates for the cluster
    for key in label_dict:
        # calculate the coordinates of the cluster bounding box
        x_left = min([bb.left for bb in label_dict[key]])
        x_right = max([bb.right for bb in label_dict[key]])
        y_top = min([bb.top for bb in label_dict[key]])
        y_bottom = max([bb.bottom for bb in label_dict[key]])
        # get the category based off of digit detections
        sorted_boxes = sorted(label_dict[key], key=lambda x: float(x.left))
        sorted_categories = [bb.category for bb in sorted_boxes]
        # Turn list of strings into a string
        cluster_category = f"{''.join(sorted_categories)}"
        # store the bounding box into the dictionary
        cluster_dict[key] = BoundingBox(
            category=cluster_category, left=x_left, right=x_right, top=y_top, bottom=y_bottom
        )
    return cluster_dict


Function to create a result dictionary that we can save as a JSON file to analyze performance.


In [19]:
def create_result_dictionary(
    labels: List[str],
    bounding_boxes: List[BoundingBox],
    cluster_bbs: Dict[str, BoundingBox],
    unit: Literal["mmhg", "mins"],
) -> Dict[int, int]:
    """
    Create a dictionary with cluster labels as keys and cluster bounding boxes as values.

    Args:
        labels: List of cluster labels.
        bounding_boxes: List of bounding boxes.
        cluster_bbs: Dictionary with cluster labels as keys and a bounding box value as values.
        unit: Suffix to add to the cluster label.

    Returns:
        Dictionary with cluster labels as keys and cluster bounding box values as value.
    """
    # Create a dictionary to store labelled elements
    label_dict = {}

    # Iterate over both lists for labels and the bounding boxes found
    for label, box in zip(labels, bounding_boxes):
        label = int(label)
        if label not in label_dict:
            # Create a new list for this label if it doesn't exist
            label_dict[label] = []
        # Append the element to the corresponding label list
        label_dict[label].append(box)

    # Create dictionary that will hold the cluster label and bounding box
    results = []

    # So now we have a dictionary with the clusters as keys and a list of bounding box objects as strings as values

    # Sort the lists in the dictionary by x_center
    for key in label_dict:
        label_dict[key] = sorted(label_dict[key], key=lambda x: float(x.left))
        label_dict[key] = [element.category for element in label_dict[key]]
        # Turn list of strings into a string
        label_dict[key] = f"{''.join(label_dict[key])}_{unit}"
        # Get the bounding box for the cluster
        cluster_bb = cluster_bbs[key]
        # Add the cluster label and bounding box to the result dictionary
        results.append((label_dict[key], cluster_bb.to_yolo(
            DESIRED_IMAGE_WIDTH, DESIRED_IMAGE_HEIGHT
        )))

    results_dict = {}
    # Now if unit is mins, turn repeats into a new value depending on it's X position
    # Meaning if you have two "0's", or 5's etc on is truly 0 and the other is 60
    # Since our axis goes 0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55
    # We can determine which one is 60 by looking at it's X position
    if unit == "mins":
        # See if any repeats and identify them
        count_dict = {}
        for label, bb in results:
            if label in count_dict:
                count_dict[label].append(bb)
            else:
                count_dict[label] = [bb]
        # Now iterate over the dictionary and find the labels with many bounding boxes. Lets change the labels of these.
        for label, bbs in count_dict.items():
            if len(bbs) > 1:
                # Sort by x
                sorted_bbs = sorted(bbs, key=lambda x: float(x.split(" ")[1]))
                # The one furthest to the left is the true one for the label.
                # For the rest add 60 to them depending on their index.
                for i, bb in enumerate(sorted_bbs):
                    results_dict[f"{str(int(re.findall(r'\d+', label)[0]) + (i * 60))}_{unit}"] = bb
            else:
                # Add the label to the results dictionary
                results_dict[label] = bbs[0]
    else:
        # Add the label to the results dictionary
        results_dict = {label: bb for label, bb in results}

    return results_dict

Function to generate colors!


In [20]:
# Draw bounding boxes on the image
def generate_color():
    return "#%06x" % random.randint(0, 0xFFFFFF)

Function to generate random Bounding Box formatted occurances.


In [21]:
def random_time_generate(x):
    # erroneous bounding boxes for time ROI
    category_int = random.randint(0, 9)
    left_int = random.randint(127, 715)

    # slope and random points constrained to time (x) axis
    top_int = random.uniform(222, 234)

    # input generated integers to bounding box
    box = BoundingBox(
        category=f"{category_int}",
        left=left_int,
        top=top_int,
        right=left_int + 4,
        bottom=top_int + 6,
    )
    return box


def random_number_generate(x):
    # erroneous bounding boxes for number ROI
    category_int = random.randint(0, 9)
    left_int = random.randint(108, 117)

    # slope and random points constrained to number (y) axis
    top_int = random.uniform(235, 411)

    # input generated integers to bounding box
    box = BoundingBox(
        category=f"{category_int}",
        left=left_int,
        top=top_int,
        right=left_int + 4,
        bottom=top_int + 6,
    )
    return box

Function to remove 5% bounding boxes and create 5% erroneous.


In [22]:
def erroneous_bounding_boxes(
    time_BB: List[str], number_BB: List[str], percent_erroneous: float
) -> Tuple[List[str], List[str]]:
    """
    Create 5% erroneous bounding boxes by simultaneously removing and generating time and number BB.

    Args:
        time_BB: list of time bounding boxes in BoundingBox format
        number_BB: list of number bounding boxes in BoundingBox format
        percent_erroneous: number between 0 and 1 for percent of erroneous Bounding Boxes

    Returns:
        Tuple: lists of time and number bounding boxes in BoundingBox format
    """
    # make copies of input bounding box lists to avoid unwanted manipulation
    time_BB_copy = time_BB.copy()
    number_BB_copy = number_BB.copy()

    # convert percent input
    time_BB_count = round(percent_erroneous * 76)  # 76 time bounding boxes
    number_BB_count = round(percent_erroneous * 53)  # 53 number bounding boxes

    # subset and remove 5% of bounding boxes from time/number_bounding_boxes lists
    ## sample
    time_BB_sample = list(random.sample(time_BB_copy, time_BB_count))
    number_BB_sample = list(random.sample(number_BB_copy, number_BB_count))

    ## remove
    _ = [time_BB_copy.remove(line) for line in time_BB_sample]
    _ = [number_BB_copy.remove(line) for line in number_BB_sample]

    # use random bounding box generation to refill removed BBs with erroneous boxes
    time_BB_generate = list(map(random_time_generate, range(len(time_BB_sample))))
    number_BB_generate = list(map(random_number_generate, range(len(number_BB_sample))))

    # append BB generated list back to copy with 5% removal
    time_BB_erroneous = time_BB_copy + time_BB_generate
    number_BB_erroneous = number_BB_copy + number_BB_generate

    return (time_BB_erroneous, number_BB_erroneous)

Function to generate random yolo data


In [23]:
def generate_random_yolo(x):
    x_rand = random.uniform(0, 1)
    y_rand = random.uniform(0, 1)
    return f"0 {x_rand} {y_rand} 0.0048989405776515005 0.009852199180453436"

Function to test preprocessing effectiveness


In [24]:
def test_preprocess(yolo_data_sheet, percent_erroneous) -> Dict:
    total_yolo = {}
    # iterate through sheets in yolo json file
    for sheet in range(1, len(yolo_data) + 1):
        if sheet < 10:
            # for ease of replacement, select bounding boxes with 0 label
            select_sheet = yolo_data_sheet[f"RC_000{sheet}_intraoperative.JPG"]
            boolean_list = [
                x.startswith("0")
                for x in yolo_data[f"RC_000{sheet}_intraoperative.JPG"]
            ]
            zero_list = list(compress(select_sheet, boolean_list))
            # replace percent of the 64 possible zeros in the sheet
            count_remove = round(percent_erroneous * 64)  # 64 zeros
            # subset and remove % of yolo lines from json file
            lines_remove = list(random.sample(zero_list, count_remove))
            _ = [select_sheet.remove(line) for line in lines_remove]
            # use random yolo generation to refill removed lines
            lines_gen = list(map(generate_random_yolo, range(len(lines_remove))))
            # append yolo generated list back to copy
            yolo_shuffle = select_sheet + lines_gen
            total_yolo[f"RC_000{sheet}_intraoperative.JPG"] = yolo_shuffle

        else:
            select_sheet = yolo_data_sheet[f"RC_00{sheet}_intraoperative.JPG"]
            boolean_list = [
                x.startswith("0") for x in yolo_data[f"RC_00{sheet}_intraoperative.JPG"]
            ]
            zero_list = list(compress(select_sheet, boolean_list))
            # replace percent of the 64 possible zeros in the sheet
            count_remove = round(percent_erroneous * 64)  # 64 zeros
            # subset and remove % of yolo lines from json file
            lines_remove = list(random.sample(zero_list, count_remove))
            _ = [select_sheet.remove(line) for line in lines_remove]
            # use random yolo generation to refill removed lines
            lines_gen = list(map(generate_random_yolo, range(len(lines_remove))))
            # append yolo generated list back to copy
            yolo_shuffle = select_sheet + lines_gen
            total_yolo[f"RC_00{sheet}_intraoperative.JPG"] = yolo_shuffle

    return total_yolo

In [25]:
# preprocessing test by manipulating yolo data
# yolo_data = test_preprocess(yolo_data, 0.1)

### Function that tests the clustering methods with our without erroneous boxes

Now lets use these functions to get the relevant bounding boxes for clustering.


In [None]:
def test_clustering_methods(percent_erroneous_BB: float, add_erroneous=True) -> None:
    """
    Test the clustering methods on the YOLO data.
    Saves the clustered images and the clustered bounding boxes to JSON files.

    Args:
        add_erroneous: Boolean flag to add erroneous bounding boxes to the data.

    Returns:
        None
    """
    sheet_num = 0
    # Iterate over all images and their bounding boxes
    for sheet, yolo_bbs in yolo_data.items():
        # print(f"Sheet: {sheet}")
        full_image_path = os.path.join(PATH_TO_REGISTERED_IMAGES, sheet)
        # print(f"Full image path: {full_image_path}")

        # Call the analyze_sheet function with data from the loop
        time_bounding_boxes, number_bounding_boxes = select_relevant_bounding_boxes(
            yolo_bbs, full_image_path
        )

        if add_erroneous:
            # make erroneous bounding boxes -- simultaneously add and remove %5 of boxes
            time_bounding_boxes, number_bounding_boxes = erroneous_bounding_boxes(
                time_bounding_boxes, number_bounding_boxes, percent_erroneous_BB
            )

        for method in ["kmeans", "dbscan", "agglomerative"]:
            # Now we need to cluster the bounding boxes that pertain to the same multi-digit number
            if method == "kmeans":
                time_labels = cluster_kmeans(time_bounding_boxes, [40, 41, 42])
                number_labels = cluster_kmeans(number_bounding_boxes, [18, 19, 20])
            elif method == "dbscan":
                time_labels = dbscan_clustering(
                    time_bounding_boxes, defined_eps=5, min_samples=1
                )
                number_labels = dbscan_clustering(
                    number_bounding_boxes, defined_eps=5, min_samples=2
                )
            elif method == "agglomerative":
                time_labels = agglomerative_clustering(
                    time_bounding_boxes, [40, 41, 42]
                )
                number_labels = agglomerative_clustering(
                    number_bounding_boxes, [18, 19, 20]
                )
            else:
                raise ValueError(f"Invalid clustering method: {method}")

            # Create an image object
            image: Image = Image.open(full_image_path)
            image_width, image_height = image.size

            # get time clusters
            time_cluster_bbs = get_cluster_bbs(time_labels, time_bounding_boxes)

            label_color_map = {}
            for label, bounding_box in time_cluster_bbs.items():
                x_min, y_min, x_max, y_max = [
                    (coor / 800) * image_width
                    if i % 2 == 0
                    else (coor / 600) * image_height
                    for i, coor in enumerate(bounding_box.box)
                ]

                # If the label is not in the color map, generate a new color
                if label not in label_color_map:
                    label_color_map[label] = generate_color()

                # Open the image
                draw = ImageDraw.Draw(image)

                draw.rectangle(
                    [
                        x_min,
                        y_min,
                        x_max,
                        y_max,
                    ],
                    outline=label_color_map[label],
                    width=3,
                )

            # Save the image with the bounding boxes to the kmeans_clustered_images folder
            image.save(f"../../data/{method}_clustered_images/time/{sheet}")

            # Save the clustered bounding boxes to a JSON file
            with open(
                f"../../data/{method}_clustered_images/results/time/{sheet.split('.')[0]}.json",
                "w",
            ) as f:
                json.dump(
                    create_result_dictionary(
                        time_labels, time_bounding_boxes, time_cluster_bbs, "mins"
                    ),
                    f,
                )

            # Create an image object
            image: Image = Image.open(full_image_path)
            image_width, image_height = image.size

            # get number clusters
            number_cluster_bbs = get_cluster_bbs(number_labels, number_bounding_boxes)

            label_color_map = {}
            for label, bounding_box in number_cluster_bbs.items():
                x_min, y_min, x_max, y_max = [
                    (coor / 800) * image_width
                    if i % 2 == 0
                    else (coor / 600) * image_height
                    for i, coor in enumerate(bounding_box.box)
                ]

                # If the label is not in the color map, generate a new color
                if label not in label_color_map:
                    label_color_map[label] = generate_color()

                # Open the image
                draw = ImageDraw.Draw(image)

                draw.rectangle(
                    [
                        x_min,
                        y_min,
                        x_max,
                        y_max,
                    ],
                    outline=label_color_map[label],
                    width=3,
                )

            # Save the image with the bounding boxes to the kmeans_clustered_images folder
            image.save(f"../../data/{method}_clustered_images/number/{sheet}")

            # Save the clustered bounding boxes to a JSON file
            with open(
                f"../../data/{method}_clustered_images/results/number/{sheet.split('.')[0]}.json",
                "w",
            ) as f:
                json.dump(
                    create_result_dictionary(
                        number_labels, number_bounding_boxes, number_cluster_bbs, "mmhg"
                    ),
                    f,
                )

        sheet_num += 1

#### Analyze accuracy

Below we write a function that analyzes the accuracy of our clustering methods.


In [27]:
def analyze_accuracy():
    # Since this work is done above, we can simply read in from the JSON files created in the previous step and work from there.
    for method in ["kmeans", "dbscan", "agglomerative"]:
        print(f"Method: {method}")
        # Paths to the JSON files
        PATH_TO_RESULTS = f"../../data/{method}_clustered_images/results"
        TIME_JSON = os.path.join(PATH_TO_RESULTS, "time")
        NUMBER_JSON = os.path.join(PATH_TO_RESULTS, "number")

        time_wrong_clusters_count = 0
        time_correct_clusters_count = 0
        number_wrong_clusters_count = 0
        number_correct_clusters_count = 0

        # Undetected clusters
        undetected_time_clusters = []
        undetected_number_clusters = []
        for sheet, yolo_bb in yolo_data.items():
            full_image_path = os.path.join(PATH_TO_REGISTERED_IMAGES, sheet)
            time_bounding_boxes, number_bounding_boxes = select_relevant_bounding_boxes(
                yolo_bb,
                full_image_path,
            )
            # Convert the bounding boxes to a list of strings with proper suffixes
            expected_time_values = [
                "0_mins",
                "5_mins",
                "10_mins",
                "15_mins",
                "20_mins",
                "25_mins",
                "30_mins",
                "35_mins",
                "40_mins",
                "45_mins",
                "50_mins",
                "55_mins",
                "60_mins",
                "65_mins",
                "70_mins",
                "75_mins",
                "80_mins",
                "85_mins",
                "90_mins",
                "95_mins",
                "100_mins",
                "105_mins",
                "110_mins",
                "115_mins",
                "120_mins",
                "125_mins",
                "130_mins",
                "135_mins",
                "140_mins",
                "145_mins",
                "150_mins",
                "155_mins",
                "160_mins",
                "165_mins",
                "170_mins",
                "175_mins",
                "180_mins",
                "185_mins",
                "190_mins",
                "195_mins",
                "200_mins",
                "205_mins",
            ]

            expected_number_values = [
                "30_mmhg",
                "40_mmhg",
                "50_mmhg",
                "60_mmhg",
                "70_mmhg",
                "80_mmhg",
                "90_mmhg",
                "100_mmhg",
                "110_mmhg",
                "120_mmhg",
                "130_mmhg",
                "140_mmhg",
                "150_mmhg",
                "160_mmhg",
                "170_mmhg",
                "180_mmhg",
                "190_mmhg",
                "200_mmhg",
                "210_mmhg",
                "220_mmhg",
            ]

            # Load JSON
            with open(os.path.join(TIME_JSON, f"{sheet.split('.')[0]}.json")) as f:
                time_clusters = json.load(f)

            # Each cluster contains the number (integer) that the cluster represents
            # We know what integers should be represented in the time labels, lets check that they are all there.
            # Keep track of any false positives (new clusters that don't exist) or negatives (missing clusters)
            for cluster, bounding_box in time_clusters.items():
                if cluster not in expected_time_values:
                    # Print the sheet, value that is not in the expected values
                    #print(f"Time -> Sheet: {sheet}, Value: {cluster}.")
                    # We have an erroneous cluster
                    time_wrong_clusters_count += 1
                else:
                    # We have a correct cluster
                    expected_time_values.remove(cluster)
                    time_correct_clusters_count += 1

            undetected_time_clusters += expected_time_values

            # Load JSON
            with open(os.path.join(NUMBER_JSON, f"{sheet.split('.')[0]}.json")) as f:
                number_clusters = json.load(f)

            # Each cluster contains the number (integer) that the cluster represents
            # We know what integers should be represented in the time labels, lets check that they are all there.
            # Keep track of any false positives (new clusters that don't exist) or negatives (missing clusters)
            for cluster, bounding_box in number_clusters.items():
                if cluster not in expected_number_values:
                
                    # print(f"Number -> Sheet: {sheet}, Value: {value}")
                    # We have an erroneous cluster
                    number_wrong_clusters_count += 1
                else:
                    # We have a correct cluster
                    expected_number_values.remove(cluster)
                    number_correct_clusters_count += 1

            undetected_number_clusters += expected_number_values

        print(
            f"Time labels: {time_correct_clusters_count} correct clusters, {time_wrong_clusters_count} incorrect clusters. There were {len(undetected_time_clusters)} undetected clusters. The accuracy is {(time_correct_clusters_count - (time_wrong_clusters_count + len(undetected_time_clusters))) / (42 * 19) * 100:.2f}%."
        )
        print("\n")
        print(
            f"Number labels: {number_correct_clusters_count} correct clusters, {number_wrong_clusters_count} incorrect clusters. There were {len(undetected_number_clusters)} undetected clusters. The accuracy is {(number_correct_clusters_count - (number_wrong_clusters_count + len(undetected_number_clusters))) / (20 * 19) * 100:.2f}%."
        )
        print("\n\n")


### Test without Erroneous bounding boxes


In [28]:
# Test the clustering methods with errouneous bounding boxes
test_clustering_methods(0.05, add_erroneous=False)
analyze_accuracy()

{6: BoundingBox(category='25', left=721.6904000946971, top=225.8097282858456, right=729.7973484848485, bottom=231.88060087316177), 2: BoundingBox(category='0', left=130.80656664299244, top=226.08968577665442, right=134.58732836174244, bottom=232.14639820772058), 26: BoundingBox(category='5', left=145.64833392518938, top=226.05847886029412, right=149.66429924242425, bottom=232.09637810202204), 33: BoundingBox(category='10', left=158.1322502367424, top=226.14414349724265, right=165.96851325757575, bottom=232.03193933823528), 12: BoundingBox(category='15', left=172.51596531723484, top=226.10875746783088, right=180.61334043560606, bottom=232.27738683363967), 7: BoundingBox(category='20', left=187.13372987689397, top=226.0351993336397, right=195.53330669981062, bottom=232.1578440946691), 37: BoundingBox(category='25', left=201.60980409564397, top=226.14908375459558, right=210.12683475378785, bottom=232.33648322610293), 14: BoundingBox(category='30', left=216.2979403409091, top=226.114688648

KeyboardInterrupt: 

### Test with Erroneous bounding boxes


In [None]:
# Test the clustering methods with errouneous bounding boxes
test_clustering_methods(0.05, add_erroneous=True)
analyze_accuracy()

Method: kmeans
Time labels: 668 correct clusters, 98 incorrect clusters. There were 130 undetected clusters. The accuracy is 55.14%.


Number labels: 274 correct clusters, 96 incorrect clusters. There were 106 undetected clusters. The accuracy is 18.95%.



Method: dbscan
Time labels: 697 correct clusters, 115 incorrect clusters. There were 101 undetected clusters. The accuracy is 60.28%.


Number labels: 281 correct clusters, 83 incorrect clusters. There were 99 undetected clusters. The accuracy is 26.05%.



Method: agglomerative
Time labels: 676 correct clusters, 98 incorrect clusters. There were 122 undetected clusters. The accuracy is 57.14%.


Number labels: 271 correct clusters, 94 incorrect clusters. There were 109 undetected clusters. The accuracy is 17.89%.



