# Import and Loads

In [1]:
from ultralytics import YOLO
import os 
import sys
sys.path.append("/mnt/RAID/projects/FjordVision")
from models.probability_tree import ProbabilityTree
import torch
from anytree.importer import JsonImporter
from preprocessing.preprocessing import load_ground_truth_mask_xyn, convert_polygon_to_mask, calculate_binary_mask_iou
import numpy as np
import matplotlib.pyplot as plt
from itertools import islice

# Function to divide the data into chunks of size n
def chunks(lst, n):
    """Yield successive n-sized chunks from lst."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

# Define the file path
weights_path = '/mnt/RAID/projects/FjordVision/runs/segment/Yolov8n-seg-train/weights/best.pt'

# Load the YOLO model weights
model = YOLO(weights_path)

importer = JsonImporter()
with open('data/ontology.json', 'r') as f:
    root = importer.read(f)

classes_file = '/mnt/RAID/datasets/label-studio/fjord/classes.txt'

species_names = []
with open(classes_file, 'r') as file:
    species_names = [line.strip() for line in file]

genus_names, class_names, binary_names = [], [], []
for node in root.descendants:
    if node.rank == 'genus':
        genus_names.append(node.name)
    elif node.rank == 'class':
        class_names.append(node.name)
    elif node.rank == 'binary':
        binary_names.append(node.name)

# Plot Labels

In [2]:
def plot_labels(image_path, label_path, class_index):
    # Load the image
    image = plt.imread(image_path)
    
    # Create a figure and axes
    fig, ax = plt.subplots()
    
    # Plot the image
    ax.imshow(image)
    
    # Read the label file
    with open(label_path, 'r') as file:
        # Loop through the lines in the label file
        for line in file:
            # Split the line into class_id and coordinates
            class_id, *coordinates = line.strip().split(' ')
            
            # Convert the coordinates to float and normalize them
            coordinates = [float(coord) for coord in coordinates]
            coordinates = [coord * image.shape[1] if i % 2 == 0 else coord * image.shape[0] for i, coord in enumerate(coordinates)]
            
            # Reshape the coordinates into an array of points for the polygon
            points = np.array(coordinates).reshape(-1, 2)
            
            # Get the class label from the class_index
            class_label = class_index[int(class_id)]
            
            # Get a unique color for each class
            color = plt.cm.tab10(int(class_id) % 10)
            
            # Plot the polygon with the class label and color
            polygon = plt.Polygon(points, edgecolor=color, facecolor='none')
            ax.add_patch(polygon)
            ax.text(points[0, 0], points[0, 1], class_label, color=color, fontsize=8, verticalalignment='top')
    
    # Show the plot
    plt.show()


def plot_masks(image, masks, classes):
    # Create a figure and axes
    fig, ax = plt.subplots()
    
    # Plot the image
    ax.imshow(image)
    
    # Loop through the masks and coordinates
    for mask, cls in zip(masks, classes):    
        mask[:, 0] = mask[:, 0] * image.shape[1]
        mask[:, 1] = mask[:, 1] * image.shape[0]    
        points = mask

        # Get the class label from the class_index
        class_label = species_names[int(cls)]
        
        # Get a unique color for each class
        color = plt.cm.tab10(int(cls) % 10)
        
        # Plot the polygon with the class label and color
        polygon = plt.Polygon(points, edgecolor=color, facecolor='none')
        ax.add_patch(polygon)
        ax.text(points[0, 0], points[0, 1], class_label, color=color, fontsize=8, verticalalignment='top')
    
    # Show the plot
    plt.show()

# Construct Probability Tree

In [3]:
# Usage example
ontology_path = 'data/ontology.json'  # Update this path as necessary
prob_tree = ProbabilityTree(ontology_path)

In [4]:
# Define the image folder path
image_folder_path = '/mnt/RAID/datasets/The Fjord Dataset/fjord/images/test/'
frames = os.listdir(image_folder_path)
image_files_full_path = [image_folder_path + f for f in frames]

# Define the label folder path
label_folder_path = '/mnt/RAID/datasets/The Fjord Dataset/fjord/labels/test/'

classes = '/mnt/RAID/datasets/The Fjord Dataset/fjord/classes.txt'

class_index = []
with open(classes, 'r') as file:
    for line_number, line in enumerate(file, start=1):
        class_name = line.strip()
        class_index.append(class_name)


Y = []
Yhat = []
batch_size = 50

# Loop through batches of images
for image_batch in chunks(image_files_full_path, batch_size):

    with torch.no_grad():
        predictions = model(image_batch, stream=True)

    # Loop through the files in the image folder
    for file_name, prediction in zip(image_batch, predictions):
        # Check if the file is an image file
        if file_name.endswith('.jpg') or file_name.endswith('.png'):
            # Construct the corresponding label file name
            shape = prediction.orig_img.shape[:2]
            base_file_name = file_name.split('/')[-1].replace('.jpg', '.txt')
            label_file_path = label_folder_path + base_file_name
        
            # check if predictions are empty
            if len(prediction.boxes.cls) == 0:
                continue

            GT = load_ground_truth_mask_xyn(label_file_path)
            visited = len(GT)*[None]

            for cls, mask in zip(prediction.boxes.cls, prediction.masks.xyn):
                m = convert_polygon_to_mask(mask, shape)
                best_iou = 0

                # calculate iou and find the best mask
                for idx, (gcls, gmsk) in enumerate(GT):
                    g = convert_polygon_to_mask(gmsk, shape)
                    iou = calculate_binary_mask_iou(m, g)

                    if iou > best_iou and iou > 0.5:
                        best_iou = iou
                        best_g = g
                        best_gcls = gcls
                        visited[idx] = True
                        best_idx = idx

                if best_idx is not None and best_iou > 0.5:
                    visited[best_idx] = True

                if best_g is None:
                    Y.append(None)
                    Yhat.append(int(cls.item()))
                else:
                    Y.append(best_gcls)
                    Yhat.append(int(cls.item()))

            for vis in visited:
                if vis is None:
                    Y.append(GT[idx][0])
                    Yhat.append(None)

    # After processing each batch, clear unused memory from CUDA
    torch.cuda.empty_cache()







































In [9]:
from anytree.search import find
from anytree.walker import Walker

def hierarchical_similarity(node1, node2, tree):
    walker = Walker()
    # Assuming node1 and node2 are already the correct nodes from the tree
    upwards, _, down = walker.walk(node1, node2)
    distance = len(upwards) + len(down)
    return 1 / (1 + distance)

def calculate_hierarchical_precision_recall(Y, Yhat, tree, species_names):
    weighted_true_positives = 0
    weighted_false_positives = 0
    weighted_false_negatives = 0

    for true_label, predicted_label in zip(Y, Yhat):

        if predicted_label is None:  # Handle negative prediction as complete miss
            weighted_false_negatives += 1  # Might need to adjust based on how you want to treat negative predictions
            continue
        if true_label is None:  # Handle missing ground truth as complete miss
            weighted_false_positives += 1
            continue

        node1 = find(tree, lambda node: node.name == species_names[true_label])
        node2 = find(tree, lambda node: node.name == species_names[predicted_label])
        similarity_weight = hierarchical_similarity(node1, node2, tree)

        if true_label == predicted_label:
            weighted_true_positives += similarity_weight
        else:
            weighted_false_positives += (1 - similarity_weight)  # This assumes you want to penalize based on dissimilarity

    precision = weighted_true_positives / (weighted_true_positives + weighted_false_positives) if (weighted_true_positives + weighted_false_positives) > 0 else 0
    recall = weighted_true_positives / (weighted_true_positives + weighted_false_negatives) if (weighted_true_positives + weighted_false_negatives) > 0 else 0
    
    return precision, recall

def calculate_weighted_f1_score(precision, recall):
    if precision + recall == 0:
        return 0
    return 2 * (precision * recall) / (precision + recall)

# Calculate weighted precision, recall, and F1
precision, recall = calculate_hierarchical_precision_recall(Y, Yhat, root, species_names)
weighted_f1_score = calculate_weighted_f1_score(precision, recall)

In [10]:
precision

0.9588657803141181

In [11]:
recall

0.8972809667673716

In [12]:
weighted_f1_score

0.9270517169110761