# Lucas Henneaux : master thesis

## Imports

In [None]:
import os
import sys

import numpy as np
import matplotlib.pyplot as plt
import torchvision.transforms as transforms
import math



import large_image

# !pip install openslide-python tifffile
import torch
from timm import create_model
from sklearn.cluster import DBSCAN
import timm
import random
import json



In [None]:

if torch.cuda.is_available():
    print(f"GPU available : ({torch.cuda.get_device_name(0)})")
else:
    print("No GPU")

In [None]:
def save_dict(data, filename):
    """Save dict in json file"""
    with open(filename, 'w') as f:
        json.dump(data, f, indent=4)
    print(f"{filename} saved")

def load_dict(filename):
    """Load dict from json file"""
    with open(filename, 'r') as f:
        return json.load(f)
    
label_path = "labels_low_res/"
classes = ["1","2","3","41","42","51","52","54","57"]
res=2
if label_path == "labels_low_res/":
    res = 4
    classes = ["1","2","3","41","5"]

In [None]:
import xml.etree.ElementTree as ET
import openslide
import large_image
import cv2
def convert_ndpa_to_ndpi(x_ndpa, y_ndpa, slide):
    #Converts a point in .ndpa coords system to the .ndpi one
    
    # useful informations (I think only available with openSlide, but not sure)
    physical_width = float(slide.properties.get("hamamatsu.PhysicalWidth", 1))
    physical_height = float(slide.properties.get("hamamatsu.PhysicalHeight", 1))
    x_offset = float(slide.properties.get("hamamatsu.XOffsetFromSlideCentre", 0))
    y_offset = float(slide.properties.get("hamamatsu.YOffsetFromSlideCentre", 0))

    mpp_x = float(slide.properties["openslide.mpp-x"])
    mpp_y = float(slide.properties["openslide.mpp-y"])


    # Offsets
    x_offset_total = x_offset
    y_offset_total = y_offset

    
    # Conversion NDPA → NDPI
    x_ndpi = (x_ndpa - x_offset_total) / (mpp_x * 1000) + slide.dimensions[0]/2
    y_ndpi = (y_ndpa - y_offset_total) / (mpp_y * 1000)  + slide.dimensions[1]/2
    
    return int(x_ndpi), int(y_ndpi)

import xml.etree.ElementTree as ET
import random

def extract_annotations(ndpa_path, slide, low_res):
    tree = ET.parse(ndpa_path)
    root = tree.getroot()

    annotations = []
    titles = []

    for ndpviewstate in root.findall("ndpviewstate"): 
        annotation = ndpviewstate.find("annotation")

        specialtype = annotation.find("specialtype")
        title = ndpviewstate.find("title").text
        unity = ndpviewstate.find("coordformat").text

        if specialtype is not None and "rectangle" in specialtype.text.lower():
            points = []
            for point in annotation.findall(".//point"):
                x = float(point.find("x").text)
                y = float(point.find("y").text)

                px, py = convert_ndpa_to_ndpi(x, y, slide)
                points.append((px, py))

            if len(points) == 4:  # Rectangle
                x1, y1 = min(p[0] for p in points), min(p[1] for p in points)
                x2, y2 = max(p[0] for p in points), max(p[1] for p in points)

                width = x2 - x1
                height = y2 - y1
                
                min_size = 224

                if low_res: #In case of low res, we have to not take too small labels into account has it would be similar to blood
                    if width < min_size and height < min_size and title not in ["51","52"]:
                        continue

                    else: #No more need to differentiate 51/52
                        if title in ["51","52"]:
                            title = "5"

                if width < min_size: 
                    extra_width = min_size - width
                    shift_x = random.uniform(0, extra_width)
                    x1 = x1 - shift_x
                    x2 = x1 + min_size

                if height < min_size:
                    extra_height = min_size - height
                    shift_y = random.uniform(0, extra_height)
                    y1 = y1 - shift_y
                    y2 = y1 + min_size

                
                annotations.append((x1, y1, x2, y2))
                titles.append(title)

    return annotations, titles


def get_res_image(slide, res, x1, y1, x2, y2, scale_factor):
    """ Getting image in a certain res and adapting according to this res (scale factor) """
   
    return slide.read_region((int(x1), int(y1)), res, (int((x2-x1)/scale_factor), int((y2-y1)/scale_factor))).convert("RGB")



def process_ndpi_with_large_image(file_path, file_ndpa, output_folder_name, nth_image,classes, res=7):
    
    large_slide = large_image.open(file_path)
    slide = openslide.OpenSlide(file_path)

    low_res = False
    if label_path != "labels/":
        low_res = True
    
    annotations, titles = extract_annotations(file_ndpa, slide, low_res)

    scale_factor = (slide.level_dimensions[0][0]/ slide.level_dimensions[res][0])

    for i, (x1, y1, x2, y2) in enumerate(annotations):

        img_region = get_res_image(slide, res, x1, y1, x2, y2, scale_factor)
        try:
            output_folder = os.path.join(output_folder_name, titles[i])
        except:
            print(f"-----No annotation for {file_path} ------- ")
            return []
        if titles[i] not in classes:
            # print(f"----invalid label found : {titles[i]}-----")
            continue
        
        output_path = os.path.join(output_folder, f"{titles[i]}_{i}_{nth_image}.png")
        
        os.makedirs(output_folder, exist_ok=True)  # Creating folder if does not exist
        
        img_np = np.array(img_region)  # Convert PIL to NumPy
        cv2.imwrite(output_path, cv2.cvtColor(img_np, cv2.COLOR_RGB2BGR))

        print(f"Saved : {output_path}")
    
    return annotations


folder_path = "img/database"

# Folder of wi images and .ndpa files !
files = os.listdir(folder_path)

ndpi_files = [f for f in files if f.endswith(".ndpi")]
ndpa_files = [f for f in files if f.endswith(".ndpi.ndpa")]


for i, ndpi_file in enumerate(ndpi_files):
    ndpa_file = ndpi_file + ".ndpa" #Corresponding .ndpa file
    
    if ndpa_file in ndpa_files:

        ndpi_path = os.path.join(folder_path, ndpi_file)
        ndpa_path = os.path.join(folder_path, ndpa_file)
        print("processing : ", ndpa_file," ...")
        process_ndpi_with_large_image(ndpi_path, ndpa_path, label_path, i,classes, res=res)
    else:
        print(f"No .ndpa file found {ndpi_file}")

In [None]:
import os
from collections import defaultdict
from PIL import Image
import matplotlib.pyplot as plt


patch_size = (224, 224)
min_patch_area_ratio = 0.6

patches_count = {}

min_patch_area = patch_size[0] * patch_size[1] * min_patch_area_ratio

for class_folder in os.listdir(label_path):
    class_folder_path = os.path.join(label_path, class_folder)
    if os.path.isdir(class_folder_path):
        for image_name in os.listdir(class_folder_path):
            if image_name.lower().endswith(('png', 'jpg', 'jpeg')):
                image_id = image_name.split('_')[2].split('.')[0]
                image_path = os.path.join(class_folder_path, image_name)
                
                image = Image.open(image_path)
                width, height = image.size
               
                num_patches_x = max(1, (width + patch_size[0] - 1) // patch_size[0])
                num_patches_y = max(1, (height + patch_size[1] - 1) // patch_size[1])
                
                total_patches = 0
                for x in range(num_patches_x):
                    for y in range(num_patches_y):
                        actual_width = min(patch_size[0], width - x * patch_size[0])
                        actual_height = min(patch_size[1], height - y * patch_size[1])
                        
                        if actual_width * actual_height >= min_patch_area:
                            total_patches += 1
                try:
                    patches_count[class_folder_path + "/" + image_name] += max(1, total_patches) 
                except:
                    patches_count[class_folder_path + "/" + image_name] = max(1, total_patches)



for image_name, patche_count in patches_count.items():
    print(f"Image {image_name} : {patche_count}")

save_dict(patches_count, "patches_count.json")
   

In [None]:
import os
import numpy as np
from collections import defaultdict
from PIL import Image
import matplotlib.pyplot as plt
import cv2
from scipy.spatial.distance import jensenshannon

patch_size = (224, 224)
min_patch_area_ratio = 0.6

patches_count = load_dict("patches_count.json")
color_histograms = defaultdict(lambda: defaultdict(list))

min_patch_area = patch_size[0] * patch_size[1] * min_patch_area_ratio

for class_folder in os.listdir(label_path):
    class_folder_path = os.path.join(label_path, class_folder)
    if os.path.isdir(class_folder_path):
        for image_name in os.listdir(class_folder_path):
            if image_name.lower().endswith(('png', 'jpg', 'jpeg')):
                image_id = image_name.split('_')[2].split('.')[0]
                image_path = os.path.join(class_folder_path, image_name)
                
                image = Image.open(image_path).convert("RGB")
                width, height = image.size
                
                num_patches_x = max(1, (width + patch_size[0] - 1) // patch_size[0])
                num_patches_y = max(1, (height + patch_size[1] - 1) // patch_size[1])
                
                total_patches = 0
                for x in range(num_patches_x):
                    for y in range(num_patches_y):
                        actual_width = min(patch_size[0], width - x * patch_size[0])
                        actual_height = min(patch_size[1], height - y * patch_size[1])
                        
                        if actual_width * actual_height >= min_patch_area:
                            total_patches += 1
                
                try:
                    patches_count[image_id][class_folder] += max(1, total_patches)
                except:
                    patches_count[image_id][class_folder] = max(1, total_patches)

                image_np = np.array(image)
                hsv_image = cv2.cvtColor(image_np, cv2.COLOR_RGB2HSV)
                
                hist = cv2.calcHist([hsv_image], [0], None, [50], [0, 180])
                hist = cv2.normalize(hist, hist).flatten()
                color_histograms[class_folder][image_id] = hist

print("Nb of patches by images by categories :")
for image_id, class_counts in patches_count.items():
    print(f"Image {image_id} :")
    for class_folder, count in class_counts.items():
        print(f"  - {class_folder} : {count} patches")


for class_folder in set(class_folder for counts in patches_count.values() for class_folder in counts):
    image_ids = sorted(patches_count.keys())
    patch_counts = [patches_count[image_id].get(class_folder, 0) for image_id in image_ids]
    
    plt.figure(figsize=(10, 5))
    plt.bar(range(len(image_ids)), patch_counts, tick_label=image_ids)
    plt.xlabel("Indice des images")
    plt.ylabel("Nombre de patches")
    plt.title(f"Nombre de patches par image pour la classe {class_folder}")
    plt.xticks(rotation=90)
    plt.tight_layout()
    plt.show()


# for class_folder, histograms in color_histograms.items():
#     image_ids = list(histograms.keys())
#     num_images = len(image_ids)
#     similarity_matrix = np.zeros((num_images, num_images))
    
#     for i in range(num_images):
#         for j in range(num_images):
#             similarity_matrix[i, j] = jensenshannon(histograms[image_ids[i]], histograms[image_ids[j]])
    
#     plt.figure(figsize=(8, 6))
#     plt.imshow(similarity_matrix, cmap='hot', interpolation='nearest')
#     plt.colorbar(label="Distance de Jensen-Shannon")
#     plt.xticks(range(num_images), image_ids, rotation=90)
#     plt.yticks(range(num_images), image_ids)
#     plt.title(f"Similarité des couleurs pour la classe {class_folder}")
#     plt.show()

In [None]:
import os
import json
import numpy as np
from collections import defaultdict
from PIL import Image
import cv2
from scipy.spatial.distance import jensenshannon
from sklearn.cluster import AgglomerativeClustering, KMeans, SpectralClustering
from sklearn.metrics import silhouette_score
import matplotlib.pyplot as plt
from itertools import combinations

# Fix for Windows MKL memory leak warning
os.environ["OMP_NUM_THREADS"] = "1"

patch_size = (224, 224)
min_patch_area_ratio = 0.6
possible_clusters = range(2, 10)
use_rgb = False

def clustering_methods_analysis(class_folder, save_clusters):
    image_id_mapping = defaultdict(list)
    image_patch_counts = defaultdict(int)
    color_histograms = {}
    image_paths = {}
    dic_clusters = {}
    dic_clusters_patches = {}

    for classe in classes:
        dic_clusters[classe] = {}
        dic_clusters_patches[classe] = {}

    class_folder_path = os.path.join(label_path, class_folder)
    if os.path.isdir(class_folder_path):
        for image_name in os.listdir(class_folder_path):
            if image_name.lower().endswith(('png', 'jpg', 'jpeg')):
                image_id = image_name.split('_')[2].split('.')[0]
                image_path = os.path.join(class_folder_path, image_name)

                image = Image.open(image_path).convert("RGB")
                image_np = np.array(image)
                height, width = image_np.shape[:2]
                patch_count = max(1,(width // patch_size[0])) * max(1,(height // patch_size[1]))
                image_patch_counts[image_id] += patch_count

                if use_rgb:
                    hist = cv2.calcHist([image_np], [0, 1, 2], None, [50, 50, 50], [0, 256, 0, 256, 0, 256])
                else:
                    hsv_image = cv2.cvtColor(image_np, cv2.COLOR_RGB2HSV)
                    hist = cv2.calcHist([hsv_image], [0], None, [50], [0, 180])

                hist = cv2.normalize(hist, hist).flatten()

                color_histograms[image_path] = hist
                image_paths[image_path] = image_path
                image_id_mapping[image_id].append(image_path)

    image_ids = list(color_histograms.keys())
    hist_matrix = np.array([color_histograms[i] for i in image_ids])

    def check_cluster_constraints(clusters, image_ids, min_image_id_count=3):
        cluster_image_ids = defaultdict(set)
        for img_idx, cluster in enumerate(clusters):
            img_path = image_ids[img_idx]
            image_id = [key for key, val in image_id_mapping.items() if img_path in val][0]
            cluster_image_ids[cluster].add(image_id)

        return all(len(image_ids) >= min_image_id_count for image_ids in cluster_image_ids.values())
    



    def save_clusters_result(clusters, image_paths, image_ids, class_folder, method, num_clusters, folder_path="cluster_analysis_results"):
    
        # results dic
        result = {
            "method": method,
            "num_clusters": num_clusters,
            "class": class_folder,
            "clusters": {}
        }

        
        unique_clusters = set(clusters)
        for cluster in unique_clusters:
            result["clusters"][str(cluster)] = [
                image_paths[image_ids[i]] for i in range(len(image_ids)) if clusters[i] == cluster
            ]
        os.makedirs(folder_path, exist_ok=True)

        file_name = f"{method}_{class_folder}_{num_clusters}.json"
        file_path = os.path.join(folder_path, file_name)

        with open(file_path, "w") as f:
            json.dump(result, f, indent=4)

    



    # Distance matrix
    distance_matrix = np.zeros((len(image_ids), len(image_ids)))
    for i, j in combinations(range(len(image_ids)), 2):
        dist = jensenshannon(hist_matrix[i], hist_matrix[j])
        distance_matrix[i, j] = distance_matrix[j, i] = dist

    best_scores_hist_matrix = {'AgglomerativeClustering': [], 'AgglomerativeClusteringBase': [],
                            'KMeans++': [], 'KMeans++Base': [], 
                            'SpectralClustering': [], 'SpectralClusteringBase': []}

    best_scores_distance_matrix = {'AgglomerativeClustering': [], 'AgglomerativeClusteringBase': [],
                                'KMeans++': [], 'KMeans++Base': [], 
                                'SpectralClustering': [], 'SpectralClusteringBase': []}

    for num_clusters in possible_clusters:
        print(f"Testing with {num_clusters} clusters")
        
        agglomerative = AgglomerativeClustering(n_clusters=num_clusters, metric="precomputed", linkage="average")
        agglomerative_clusters = agglomerative.fit_predict(distance_matrix)
        if save_clusters:
            save_clusters_result(agglomerative_clusters, image_paths, image_ids, class_folder, "AgglomerativeClustering",num_clusters)

        if check_cluster_constraints(agglomerative_clusters, image_ids):
            best_scores_distance_matrix['AgglomerativeClustering'].append(silhouette_score(distance_matrix, agglomerative_clusters, metric="precomputed"))
            best_scores_hist_matrix['AgglomerativeClustering'].append(silhouette_score(hist_matrix, agglomerative_clusters))
        else:
            best_scores_distance_matrix['AgglomerativeClustering'].append(np.NaN)
            best_scores_hist_matrix['AgglomerativeClustering'].append(np.NaN)
        

        agglomerative_base = AgglomerativeClustering(n_clusters=num_clusters, linkage="average")
        agglomerative_base_clusters = agglomerative_base.fit_predict(hist_matrix)
        if save_clusters:
            save_clusters_result(agglomerative_base_clusters, image_paths, image_ids, class_folder, "AgglomerativeClusteringBase",num_clusters)

        if check_cluster_constraints(agglomerative_base_clusters, image_ids):
            best_scores_hist_matrix['AgglomerativeClusteringBase'].append(silhouette_score(hist_matrix, agglomerative_base_clusters))
            best_scores_distance_matrix['AgglomerativeClusteringBase'].append(silhouette_score(distance_matrix, agglomerative_base_clusters, metric="precomputed"))
        else:
            best_scores_hist_matrix['AgglomerativeClusteringBase'].append(np.NaN)
            best_scores_distance_matrix['AgglomerativeClusteringBase'].append(np.NaN)


        # KMeans++ with histograms
        kmeans = KMeans(n_clusters=num_clusters, init='k-means++', random_state=42)
        kmeans.fit(hist_matrix)
        kmeans_clusters = kmeans.labels_
        if save_clusters:
            save_clusters_result(kmeans_clusters, image_paths, image_ids, class_folder, "kmeansBase",num_clusters)

        if check_cluster_constraints(kmeans_clusters, image_ids):
            best_scores_hist_matrix['KMeans++Base'].append(silhouette_score(hist_matrix, kmeans_clusters))
            best_scores_distance_matrix['KMeans++Base'].append(silhouette_score(distance_matrix, kmeans_clusters, metric="precomputed"))
        else:
            best_scores_hist_matrix['KMeans++Base'].append(np.NaN)
            best_scores_distance_matrix['KMeans++Base'].append(np.NaN)

        # KMeans++ matrix distances
        kmeans_distance = KMeans(n_clusters=num_clusters, init='k-means++', random_state=42)
        kmeans_distance.fit(distance_matrix)
        kmeans_distance_clusters = kmeans_distance.labels_
        if save_clusters:
            save_clusters_result(kmeans_distance_clusters, image_paths, image_ids, class_folder, "kmeans",num_clusters)

        if check_cluster_constraints(kmeans_distance_clusters, image_ids):
            best_scores_hist_matrix['KMeans++'].append(silhouette_score(hist_matrix, kmeans_distance_clusters))
            best_scores_distance_matrix['KMeans++'].append(silhouette_score(distance_matrix, kmeans_distance_clusters, metric="precomputed"))
        else:
            best_scores_hist_matrix['KMeans++'].append(np.NaN)
            best_scores_distance_matrix['KMeans++'].append(np.NaN)

        # Spectral Clustering matrix distances
        spectral = SpectralClustering(n_clusters=num_clusters, affinity="precomputed", random_state=42)
        spectral_clusters = spectral.fit_predict(distance_matrix)
        if save_clusters:
            save_clusters_result(spectral_clusters, image_paths, image_ids, class_folder, "spectral",num_clusters)

        if check_cluster_constraints(spectral_clusters, image_ids):
            best_scores_hist_matrix['SpectralClustering'].append(silhouette_score(hist_matrix, spectral_clusters))
            best_scores_distance_matrix['SpectralClustering'].append(silhouette_score(distance_matrix, spectral_clusters, metric="precomputed"))
        else:
            best_scores_hist_matrix['SpectralClustering'].append(np.NaN)
            best_scores_distance_matrix['SpectralClustering'].append(np.NaN)

        # Spectral Clustering with histograms
        spectral_base = SpectralClustering(n_clusters=num_clusters, affinity="nearest_neighbors", random_state=42)
        spectral_base_clusters = spectral_base.fit_predict(hist_matrix)
        if save_clusters:
            save_clusters_result(spectral_base_clusters, image_paths, image_ids, class_folder, "spectralBase",num_clusters)


        if check_cluster_constraints(spectral_base_clusters, image_ids):
            best_scores_hist_matrix['SpectralClusteringBase'].append(silhouette_score(hist_matrix, spectral_base_clusters))
            best_scores_distance_matrix['SpectralClusteringBase'].append(silhouette_score(distance_matrix, spectral_base_clusters, metric="precomputed"))
        else:
            best_scores_hist_matrix['SpectralClusteringBase'].append(np.NaN)
            best_scores_distance_matrix['SpectralClusteringBase'].append(np.NaN)


    methods = best_scores_hist_matrix.keys()

    plt.figure(figsize=(12, 5))

    # Graphique 1 : Silouhette score with histogram matrix
    plt.subplot(1, 2, 1)
    for method in methods:
        plt.plot(possible_clusters, best_scores_hist_matrix[method], label=method)
    plt.title(f"Silhouette Scores {class_folder} (Hist Matrix)")
    plt.xlabel("Number of Clusters")
    plt.ylabel("Silhouette Score")
    plt.legend()
    plt.grid(True)

    # Graphique 2 : Silhouette Score distance matrix
    plt.subplot(1, 2, 2)
    for method in methods:
        plt.plot(possible_clusters, best_scores_distance_matrix[method], label=method)
    plt.title(f"Silhouette Scores class {class_folder} (Distance Matrix)")
    plt.xlabel("Number of Clusters")
    plt.ylabel("Silhouette Score")
    plt.legend()
    plt.grid(True)

    plt.tight_layout()
    plt.savefig(f"Silouhette_scores_{class_folder}_HSV.png")
    plt.show()


for class_folder in classes:
    print(f"-----Processing for {class_folder} class ... ------")
    clustering_methods_analysis(class_folder, True)


def compute_variance(cluster_sizes):
    return np.var(cluster_sizes)


def compute_entropy(cluster_sizes):
    total = sum(cluster_sizes)
    probabilities = np.array(cluster_sizes) / total
    return -np.sum(probabilities * np.log2(probabilities + 1e-10))

def compute_entropy(cluster_sizes):
    total = sum(cluster_sizes)
    probabilities = np.array(cluster_sizes) / total
    return -np.sum(probabilities * np.log2(probabilities + 1e-10))

def analyze_cluster_variance(results_folder, patches_count_file):

    with open(patches_count_file, 'r') as f:
        patches_count = json.load(f)
    
    data = defaultdict(lambda: defaultdict(lambda: defaultdict(list)))
    min_values = defaultdict(lambda: defaultdict(lambda: defaultdict(list)))
    

    for file in os.listdir(results_folder):
        if file.endswith(".json") and file != "patches_count.json":
            file_path = os.path.join(results_folder, file)
            with open(file_path, 'r') as f:
                cluster_data = json.load(f)
            
            method = cluster_data["method"]
            num_clusters = cluster_data["num_clusters"]
            class_label = cluster_data["class"]
            
            cluster_sizes = []
            for cluster in cluster_data["clusters"].values():
                cluster_size = sum(patches_count.get(img_path.replace('\\','/'), 0) for img_path in cluster)
                cluster_sizes.append(cluster_size)
            
            min_cluster_size = min(cluster_sizes)
            print(min_cluster_size, method, num_clusters, class_label)
            variance = compute_entropy(cluster_sizes)
     
            data[class_label][method][num_clusters].append(variance)
            min_values[class_label][method][num_clusters].append(min_cluster_size)
    

    for class_label in data:
        for method in data[class_label]:
            for num_clusters in data[class_label][method]:
                data[class_label][method][num_clusters] = np.mean(data[class_label][method][num_clusters])
                min_values[class_label][method][num_clusters] = np.mean(min_values[class_label][method][num_clusters])
    
    for class_label, methods in data.items():
        plt.figure(figsize=(10, 6))
        for method, variances in methods.items():
            sorted_clusters = sorted(variances.keys())
            sorted_mins = [min_values[class_label][method][k] for k in sorted_clusters]
            for i in range(len(sorted_mins)):
                sorted_mins[i]*=(i+2)
            
            plt.plot(sorted_clusters, sorted_mins, marker='o', label=method)
            
          
        plt.xlabel("Number of clusters")
        plt.ylabel("Variance du nombre de patches")
        plt.title(f"Variance des clusters pour la classe {class_label}")
        plt.legend()
        plt.grid()
        plt.savefig(f"entropy_by_classes_{class_label}.pdf")
        plt.show()

analyze_cluster_variance("cluster_analysis_results", "patches_count.json")

In [None]:
import dash
from dash import dcc, html, Input, Output, State
import dash_bootstrap_components as dbc
import json
import os
from PIL import Image
import base64
import io

app = dash.Dash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP])

clustering_methods = ["AgglomerativeClustering", "AgglomerativeClusteringBase", "kmeans", "kmeansBase", "spectral", "spectralBase"]
n_clusters_options = list(range(2, 10))

folder_path = "cluster_analysis_results"

app.layout = dbc.Container([
    html.H1("Cluster visualization"),
    dbc.Row([
        dbc.Col([html.Label("Clustering method"),
                 dcc.Dropdown(id="method-dropdown", options=[{"label": m, "value": m} for m in clustering_methods], value=clustering_methods[0])], width=4),
        dbc.Col([html.Label("Class"),
                 dcc.Dropdown(id="class-dropdown", options=[{"label": c, "value": c} for c in classes], value=classes[0])], width=4),
        dbc.Col([html.Label("number of clusters"),
                 dcc.Dropdown(id="n-clusters-dropdown", options=[{"label": n, "value": n} for n in n_clusters_options], value=n_clusters_options[0])], width=4)
    ]),
    html.Br(),
    dbc.Button("Afficher les images", id="load-button", color="primary", className="mb-3"),
    html.Div(id="images-container")
])

# Callback loading and showing images
@app.callback(
    Output("images-container", "children"),
    Input("load-button", "n_clicks"),
    State("method-dropdown", "value"),
    State("class-dropdown", "value"),
    State("n-clusters-dropdown", "value")
)
def update_images(n_clicks, method, class_folder, num_clusters):
    if not n_clicks:
        return "Select options and click on the button to show images"
    
    file_name = f"{method}_{class_folder}_{num_clusters}.json"
    file_path = os.path.join(folder_path, file_name)
    
    if not os.path.exists(file_path):
        return f"No file for {method}, class {class_folder}, {num_clusters} clusters."
    
    with open(file_path, "r") as f:
        data = json.load(f)
    
    images = []
    for cluster_id, image_paths in data["clusters"].items():
        images.append(html.H3(f"Cluster {cluster_id}"))
        row_images = []
        for img_path in image_paths[:25]:
            if os.path.exists(img_path):
                with open(img_path, "rb") as img_file:
                    encoded_img = base64.b64encode(img_file.read()).decode()
                    img_element = html.Img(src=f"data:image/png;base64,{encoded_img}", style={"width": "150px", "margin": "5px"})
                    row_images.append(img_element)
        images.append(html.Div(row_images, style={"display": "flex", "flex-wrap": "wrap"}))
    
    return images

if __name__ == "__main__":
    app.run_server(debug=True)

In [None]:
import os
import numpy as np
from collections import defaultdict
from PIL import Image
import cv2
from scipy.spatial.distance import jensenshannon
from sklearn.cluster import AgglomerativeClustering, KMeans, SpectralClustering
from sklearn.metrics import silhouette_score

from itertools import combinations


def find_optimal_repartition(dic_clusters, dic_clusters_patches, class_folder, dic_clusters_patches_opti, nb_clusters, dic_clusters_opti, dic_methods):
    print(f"----------running {class_folder} class ...... -----------------")


    color_histograms = {}
    image_paths = {}
    image_id_mapping = defaultdict(list)
    image_patch_counts = defaultdict(int)

    class_folder_path = os.path.join(label_path, class_folder)
    if os.path.isdir(class_folder_path):
        for image_name in os.listdir(class_folder_path):
            if image_name.lower().endswith(('png', 'jpg', 'jpeg')):
                image_id = image_name.split('_')[2].split('.')[0]
                image_path = os.path.join(class_folder_path, image_name)

                
                
                image = Image.open(image_path).convert("RGB")
                image_np = np.array(image)
                height, width = image_np.shape[:2]
                patch_count = max(1,(width // patch_size[0])) * max(1,(height // patch_size[1]))
                image_patch_counts[image_id] += patch_count
                dic_clusters_patches_opti[image_path] = {"image_id":image_id, "patches":patch_count,"cluster":np.NaN}
                
                if use_rgb:
                    hist = cv2.calcHist([image_np], [0, 1, 2], None, [50, 50, 50], [0, 256, 0, 256, 0, 256])
                else:
                    hsv_image = cv2.cvtColor(image_np, cv2.COLOR_RGB2HSV)
                    hist = cv2.calcHist([hsv_image], [0], None, [50], [0, 180])
                
                hist = cv2.normalize(hist, hist).flatten()
                
                color_histograms[image_path] = hist
                image_paths[image_path] = image_path
                image_id_mapping[image_id].append(image_path)

    image_ids = list(color_histograms.keys())
    hist_matrix = np.array([color_histograms[i] for i in image_ids])

    def check_cluster_constraints(clusters, image_ids, min_image_id_count=3):
        cluster_image_ids = defaultdict(set)
        for img_idx, cluster in enumerate(clusters):
            img_path = image_ids[img_idx]
            image_id = [key for key, val in image_id_mapping.items() if img_path in val][0]
            cluster_image_ids[cluster].add(image_id)
        
        return all(len(image_ids) >= min_image_id_count for image_ids in cluster_image_ids.values())
    

    distance_matrix = np.zeros((len(image_ids), len(image_ids)))
    for i, j in combinations(range(len(image_ids)), 2):
        dist = jensenshannon(hist_matrix[i], hist_matrix[j])
        distance_matrix[i, j] = distance_matrix[j, i] = dist


    method =  dic_methods[class_folder]["method"]
    num_clusters = dic_methods[class_folder]["num_clusters"]
    print(f" using {method} for {class_folder} class for {num_clusters} clusters")


    if method == "AgglomerativeClustering":
    
        agglomerative = AgglomerativeClustering(n_clusters=num_clusters, metric="precomputed", linkage="average")
        clusters = agglomerative.fit_predict(distance_matrix)

    elif method == "AgglomerativeClusteringBase":
        agglomerative_base = AgglomerativeClustering(n_clusters=num_clusters, linkage="average")
        clusters = agglomerative_base.fit_predict(hist_matrix)

    elif method =="kmeansBase":
        kmeans = KMeans(n_clusters=num_clusters, init='k-means++', random_state=42)
        kmeans.fit(hist_matrix)
        clusters = kmeans.labels_

    elif method == "kmeans":
        kmeans_distance = KMeans(n_clusters=num_clusters, init='k-means++', random_state=42)
        kmeans = kmeans_distance.fit(distance_matrix)
        clusters = kmeans.labels_

    elif method == "spectral":
        spectral = SpectralClustering(n_clusters=num_clusters, affinity="precomputed", random_state=42)
        clusters = spectral.fit_predict(distance_matrix)
    
    else:
        spectral_base = SpectralClustering(n_clusters=num_clusters, affinity="nearest_neighbors", random_state=42)
        clusters = spectral_base.fit_predict(hist_matrix)


    for i in range(len(image_ids)):
         dic_clusters_patches_opti[image_paths[image_ids[i]]]["cluster"] = str(clusters[i] + nb_clusters)



    for cluster in clusters:

        filtered_images = [image_paths[image_ids[i]] for i in range(len(image_ids)) if clusters[i] == cluster]
        dic_clusters_opti[str(cluster + nb_clusters)] = filtered_images
     
        cluster_image_ids = set(
            [key for img in filtered_images for key, val in image_id_mapping.items() if img in val]
        )
        l = []
        for img_id in cluster_image_ids:
            l.append(image_patch_counts[img_id])

        dic_clusters[class_folder][cluster] = list(cluster_image_ids)
        dic_clusters_patches[class_folder][cluster] = l.copy()

    return num_clusters


# Fix for Windows MKL memory leak warning
os.environ["OMP_NUM_THREADS"] = "1"


patch_size = (224, 224)
min_patch_area_ratio = 0.6
possible_clusters = range(2, 6)
use_rgb = False


dic_clusters = {}
dic_clusters_patches = {}
dic_clusters_opti = {}
dic_clusters_patches_opti = {}
dic_methods = {"1" : {"method":"kmeansBase", "num_clusters": 2},
               "2":{"method":"kmeans", "num_clusters": 4},
               "3":{"method":"kmeans", "num_clusters": 5},
               "41":{"method":"spectralBase", "num_clusters": 3},
               "42":{"method":"kmeans", "num_clusters": 3},
               "51":{"method":"kmeansBase", "num_clusters": 3},
               "52":{"method":"kmeansBase", "num_clusters": 3},
               "54":{"method":"kmeansBase", "num_clusters": 3},
               "57":{"method":"kmeansBase", "num_clusters": 3}}


if label_path == "labels_low_res/":
    dic_methods = {"1" : {"method":"kmeansBase", "num_clusters": 2},
               "2":{"method":"kmeans", "num_clusters": 4},
               "3":{"method":"kmeans", "num_clusters": 5},
               "41":{"method":"spectralBase", "num_clusters": 3},
               "5":{"method":"kmeansBase", "num_clusters": 3}}


for classe in classes:
    dic_clusters[classe] = {}
    dic_clusters_patches[classe] = {}
   
nb_clusters = 0
for class_folder in classes:
    nb_clusters += find_optimal_repartition(dic_clusters, dic_clusters_patches, class_folder, dic_clusters_patches_opti, nb_clusters, dic_clusters_opti, dic_methods)
    

def normalize_path(path):
    return os.path.normpath(path).replace("\\", "/")




images = {normalize_path(k): v for k, v in dic_clusters_patches_opti.items()}
clusters = {k: [normalize_path(p) for p in v] for k, v in dic_clusters_opti.items()}

save_dict(images,"images.json")
save_dict(clusters, "clusters.json")

In [None]:
import json
import pulp as lp
import os
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd


def repartition_optimization(images, clusters, random_opti = ''):
    splits = ['train', 'val', 'test']
    model = lp.LpProblem("Image_Split_Optimization", lp.LpMinimize)

    x = {(i, s): lp.LpVariable(f"x_{i}_{s}", cat=lp.LpBinary) for i in images for s in splits}
    P_cs = {(c, s): lp.LpVariable(f"P_{c}_{s}", cat=lp.LpContinuous) for c in clusters for s in splits}

    for i in images:
        model += lp.lpSum(x[i, s] for s in splits) == 1, f"assign_{i}"
    
    if random_opti =="random_":
        print("==== adding random constraint in optimization    ")
    
        rdm_constraint = load_dict(f"repartition_labels.json")
        
        for i in images:
            
            ori_img = i.split('/')[-1].split('_')[-1][:-4]
        

            split = rdm_constraint[ori_img]
            if split == "test":
                model += x[i,'test'] == 1, f"assign_random_{i}_test"
                model += x[i,'train'] == 0, f"assign_random_{i}_train"
                model += x[i,'val'] == 0,  f"assign_random_{i}_val"
            else:
                model += x[i,'test'] == 0, f"assign_random_{i}_test"

    image_ids = {}
    for i, img in images.items():
        if img['image_id'] not in image_ids:
            image_ids[img['image_id']] = []
        image_ids[img['image_id']].append(i)

    for id_images in image_ids.values():
        for s in splits:
            for i in id_images:
                for j in id_images:
                    model += x[i, s] == x[j, s], f"same_id_{i}_{j}_{s}"

    for c, image_list in clusters.items():
        for s in splits:
            model += P_cs[c, s] == lp.lpSum(images[str(i)]['patches'] * x[i, s] for i in image_list), f"patches_{c}_{s}"
            model += lp.lpSum(x[i, s] for i in image_list) >= 1, f"min_one_image_{c}_{s}"

    split_ratios = {'train': 0.7, 'val': 0.15, 'test': 0.15}

    P_target_cs = {c: {s: sum(images[i]['patches'] for i in clusters[c]) * split_ratios[s] for s in splits} for c in clusters}


    error = {(c, s): lp.LpVariable(f"error_{c}_{s}", lowBound=0) for c in clusters for s in splits}

    for c in clusters:
        for s in splits:
            model += error[c, s] >= P_cs[c, s] - P_target_cs[c][s]
            model += error[c, s] >= -(P_cs[c, s] - P_target_cs[c][s])

    objective = lp.lpSum(error[c, s] for c in clusters for s in splits)

    model += objective
    model.solve(lp.PULP_CBC_CMD(msg=True))


    res = {}
    results = {}
    patch_counts = {s: 0 for s in splits}

    images_reparted = {}

    if lp.LpStatus[model.status] == "Optimal":
        for i in images:
            class_id = i.split("/")[1]
            cluster = images[i]["cluster"]
            patches = images[i]["patches"]
            for s in splits:
                if x[i, s].varValue > 0.5:
                    print(f"Image {i} -> {s}")
                    images_reparted[i] = {"cluster": cluster, "patches": patches, "class_id": class_id, "split": s}
                    results.setdefault(class_id, {}).setdefault(cluster, {}).setdefault(s, 0)
                    results[class_id][cluster][s] += patches
                    
                    num_patches = images[i]['patches']
                    patch_counts[s] += num_patches
                    
                    final_img_id = i.split('_')[2].split('.')[0]
                    if label_path == "labels_low_res/":
                        final_img_id = i.split('/')[2].split('.')[0].split('_')[2]
                
                    if final_img_id in res and res[final_img_id] != s:
                        print('PROBLEME')
                    else:
                        res[final_img_id] = s

    
    save_dict(res, f"{random_opti}repartition_labels.json")
    save_dict(images_reparted, f"{random_opti}image_reparted.json")
    save_dict(results, f"{random_opti}results.json")

    print("Objective value :", model.objective.value())

    total_patches = sum(patch_counts.values())
    if total_patches > 0:
        patch_proportions = {s: patch_counts[s] / total_patches * 100 for s in splits}
    else:
        patch_proportions = {s: 0 for s in splits}

    print("\n Nb patches by split :")
    for s in splits:
        print(f"{s}: {patch_counts[s]} patches ({patch_proportions[s]:.2f}%)")


    for class_id, clusters in results.items():
        plt.figure(figsize=(10, 6))
        df = []
        
        for cluster, splits in clusters.items():
            for split, count in splits.items():
                df.append({"Split": split, "Patches": count, "Cluster": cluster})
        
        df = pd.DataFrame(df)
        
        ax = sns.barplot(x="Split", y="Patches", hue="Cluster", data=df)
        
        for p in ax.patches:
            ax.annotate(f'{int(p.get_height())}', 
                        (p.get_x() + p.get_width() / 2., p.get_height()), 
                        ha='center', va='bottom', fontsize=10, color='black')

        plt.title(f"Distribution des patches pour la classe {class_id}")
        plt.xlabel("Split")
        plt.ylabel("Nombre de patches")
        plt.legend(title="Cluster")
        plt.savefig(f"{random_opti}repartition_patches_class_{class_id}.png")
        plt.show()
    



true_opti ="random_"
# true_opti = ""

images = load_dict("images.json")
clusters = load_dict("clusters.json")


if true_opti == "random_":
    print("Performing random optimization repartition ...")
    ###Random optimization
    clusters = {}

   
    for image_path, data in images.items():
        # for image_id, patches, cluster in data.values():
        cluster = str(classes.index(image_path.split('/')[1]))
        images[image_path]["cluster"] = cluster
        if cluster in clusters:
            clusters[cluster] += [image_path]
        else:
            clusters[cluster] = [image_path]


print(images)
print(clusters)
repartition_optimization(images, clusters, true_opti)




In [None]:
from collections import defaultdict
import matplotlib.pyplot as plt

def redistribute_patches(results, images_reparted, num_shots_train, num_shots_val, num_shots_test, random_opti=''):
    patches_per_image = {}
    clusters_distribution = defaultdict(lambda: {'train': {}, 'val': {}, 'test': {}})
    
    num_shots_dict = {'train': num_shots_train, 'val': num_shots_val, 'test': num_shots_test}
    
    for class_id, clusters in results.items():
        for split in ['train', 'val', 'test']:
            num_shots = num_shots_dict[split]
            total_patches = sum(cluster_info.get(split, 0) for cluster_info in clusters.values())

            cluster_patches = {k: v.get(split, 0) for k, v in clusters.items()}
            clusters_distribution[class_id][split] = cluster_patches.copy()

            if total_patches == 0:
                continue
            
            
            for image_path, image_info in images_reparted.items():
                if image_info['class_id'] == class_id and image_info['split'] == split:
                    patches_per_image[image_path] = {'patches': image_info['patches'], 'split': split, 'cluster':image_info['cluster']}

            if total_patches <= num_shots:
                continue
            
            

            
            while sum(cluster_patches.values()) > num_shots:
            
                max_cluster = max(cluster_patches, key=cluster_patches.get)

                images_with_patches = [
                    img for img, img_info in images_reparted.items()
                    if img_info['class_id'] == class_id and img_info['split'] == split and patches_per_image.get(img, {}).get('patches', 0) > 0
                ]

                if all(patches_per_image[img]['patches'] == 1 for img in images_with_patches):
                    max_image = random.choice(images_with_patches)
                else:
                    max_image = max(
                        (img for img in images_with_patches if images_reparted[img]['cluster'] == max_cluster),
                        key=lambda img: patches_per_image[img]['patches'],
                        default=None
                    )

                if not max_image or patches_per_image[max_image]['patches'] == 0:
                    print(f"Impossible de réduire davantage les patches pour la classe {class_id}, split {split}.")
                    break

            
                patches_per_image[max_image]['patches'] -= 1
                cluster_patches[max_cluster] -= 1

            clusters_distribution[class_id][split] = cluster_patches.copy()
    
    plot_clusters_distribution(clusters_distribution,random_opti)
    plot_before_after_repartition(results, clusters_distribution, num_shots_dict,random_opti)
    # plot_patches_per_image(images_reparted, patches_per_image_before=images,patches_per_image_after=patches_per_image,random_opti=random_opti)
    return patches_per_image

def plot_clusters_distribution(clusters_distribution, random_opti):
    for class_id, splits in clusters_distribution.items():
        fig, axes = plt.subplots(1, 3, figsize=(15, 5))
        
        for ax, (split, clusters) in zip(axes, splits.items()):
            print(sum(clusters.values()))
            if clusters and sum(clusters.values()) > 0:
                ax.bar(clusters.keys(), clusters.values())

            
            ax.set_xlabel("Clusters")
            ax.set_ylabel("Number of attributed patches")
            ax.set_title(f"Patches repartition ({split}) - Class {class_id}")
            ax.set_xticks(list(clusters.keys()))
            ax.set_xticklabels(list(clusters.keys()), rotation=45)
        plt.savefig(f"{random_opti}Repartition_des_patches")
        plt.tight_layout()
        plt.show()

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

def plot_before_after_repartition(results, clusters_distribution, num_shots_dict, random_opti):
    """
    Show patches repartition before and after equalization
    """
    for class_id, clusters in results.items():
        plt.figure(figsize=(12, 6))
        df = []

        num_clusters = len(clusters)

        for cluster, splits in clusters.items():
            for split, count in splits.items():
                df.append({"Split": split, "Patches": count, "Cluster": cluster, "Status": "Avant"})

        for split, splits in clusters_distribution[class_id].items():
            for cluster, count in splits.items():
                df.append({"Split": split, "Patches": count, "Cluster": cluster, "Status": "Après"})


        df = pd.DataFrame(df)
        
        df["Split"], df["Cluster"] = df["Split"].astype(str), df["Cluster"].astype(str) 
        splits = df["Split"].unique()

        fig, axes = plt.subplots(1, len(splits), figsize=(15, 5), sharey=True)

        for ax, split in zip(axes, splits):
            num_shots = num_shots_dict[split]
            df_split = df[df["Split"] == split]
            sns.barplot(x="Cluster", y="Patches", hue="Status", data=df_split, ax=ax, palette={"Avant": "blue", "Après": "orange"})
            
            ax.axhline(num_shots / df_split["Cluster"].nunique(), color="red", linestyle="dashed", label="Rééquilibrage idéal")
            
            ax.set_title(f"Patches repartition ({split})")
            ax.set_xlabel("Clusters")
            ax.set_ylabel("Number of patches")
            ax.legend()

        plt.tight_layout()
        plt.savefig(f"{random_opti}repartition_patches_avant_apres_class_{class_id}.pdf")
        plt.show()

def plot_patches_per_image(images_reparted, patches_per_image_before, patches_per_image_after, random_opti):
    df = []
    
    for image_path, image_info in images_reparted.items():
        cluster = image_info.get("cluster")
        split = image_info.get("split")
        
        before = patches_per_image_before.get(image_path, {}).get("patches", 0)
        after = patches_per_image_after.get(image_path, {}).get("patches", 0)
        
        if before > 0:
            df.append({"Image": image_path, "Patches": before, "Cluster": cluster, "Split": split, "Status": "Avant"})
            df.append({"Image": image_path, "Patches": after, "Cluster": cluster, "Split": split, "Status": "Après"})
    
    df = pd.DataFrame(df)
    
    clusters = sorted(df["Cluster"].unique())
    
    for cluster in clusters:
        df_cluster = df[df["Cluster"] == cluster]
        if df_cluster.empty:
            continue
        
        fig, axes = plt.subplots(1, 3, figsize=(18, 5), sharey=True)
        
        for j, split in enumerate(["train", "val", "test"]):
            ax = axes[j]
            df_cluster_split = df_cluster[df_cluster["Split"] == split]
            
            if not df_cluster_split.empty:
                sns.barplot(x="Image", y="Patches", hue="Status", data=df_cluster_split, ax=ax, palette={"Avant": "blue", "Après": "orange"})
                ax.set_yscale("log") 
                ax.set_title(f"Cluster {cluster} - {split}")
                ax.set_xlabel("Image Path")
                ax.set_ylabel("Number of patches (log scale)")
                ax.set_xticklabels(ax.get_xticklabels(), rotation=90)
                ax.legend(title="State")
            else:
                ax.set_visible(False)
        
        plt.tight_layout()
        plt.savefig(f"{random_opti}patches_per_image_cluster_{cluster}.png")
        plt.show()


random_opti = 'random_'
# random_opti = ''
images_reparted = load_dict(f"{random_opti}image_reparted.json")
results = load_dict(f"{random_opti}results.json")

# Patches Redistribution
patches_per_image = redistribute_patches(results, images_reparted, 60000, 10000, 10000, random_opti)

save_dict(patches_per_image, "patches_per_image.json")


In [None]:
import os
import random
import shutil
from collections import defaultdict
from PIL import Image, ImageOps

def setup_folders():
    for folder in ["train", "val", "test"]:
        if label_path == "labels_low_res/":
            folder +=  "_low_res" 
        if os.path.exists(folder):
            shutil.rmtree(folder)   
        os.makedirs(folder)

def extract_patches(image_path, patch_size=(224, 224), min_patch_area_ratio=0.6):

    image = Image.open(image_path)
    if image.size[0] < patch_size[0] or image.size[1] < patch_size[1]:
        image = ImageOps.pad(image, patch_size, color=(0, 0, 0))
    
    min_patch_area = patch_size[0] * patch_size[1] * min_patch_area_ratio
    patches = []
    
    for x in range(0, image.size[0], patch_size[0]):
        for y in range(0, image.size[1], patch_size[1]):
            actual_width = min(patch_size[0], image.size[0] - x)
            actual_height = min(patch_size[1], image.size[1] - y)
            
            if actual_width * actual_height >= min_patch_area:
                patch = image.crop((x, y, x + actual_width, y + actual_height))
                patches.append(patch)
    
    return patches

def save_patches(patches_per_image, patch_size=(224, 224)):
    for image_path, info in patches_per_image.items():
        num_patches = info["patches"]
        split = info["split"]
        cluster = info["cluster"]
        
        patches = extract_patches(image_path, patch_size)
       
        selected_patches = random.sample(patches, min(num_patches, len(patches)))
        
        if label_path == "labels_low_res/":
          
            split +=  "_low_res" 
        for idx, patch in enumerate(selected_patches):
            patch_filename = f"{os.path.basename(image_path).split('.')[0]}_patch{idx}.png"
            patch_path = os.path.join(split, patch_filename)
            patch.save(patch_path)

def main(patches_per_image):
    setup_folders()
    save_patches(patches_per_image)
    print("Patches generation over")


main(load_dict("patches_per_image.json"))




In [None]:
import os
import numpy as np
import pandas as pd
from sklearn.cluster import DBSCAN
from sklearn.semi_supervised import LabelPropagation
import matplotlib.pyplot as plt

class Patch:
    def __init__(self, x, y, prediction, embedding, probabilities):
        self.x = x
        self.y = y
        self.prediction = prediction 
        self.embedding = embedding
        self.probabilities = probabilities 
def create_patches_from_parquet(file_path):
    patches = []
    df = pd.read_parquet(file_path)
    
    for _, row in df.iterrows():
        patch = Patch(
            x=row['x'],
            y=row['y'],
            prediction=row['prediction'],
            embedding=row['embedding'],  
            probabilities=row['probabilities']
        )
        patches.append(patch)
    
    return patches

def load_patches_from_folder(folder_path):
    all_patches = []
    
    for file_name in os.listdir(folder_path):
    
        if file_name.endswith('.parquet'):
            file_path = os.path.join(folder_path, file_name)
            patches = create_patches_from_parquet(file_path)
            all_patches.extend(patches)
            print(f"{len(patches)} patches loaded from {file_name}")
    
    print(f"Nb total of loaded patches : {len(all_patches)}")
    return all_patches

folder_path = "../../Cytology-fine-tuning/wsi_results_high_res_cluster/11C01217/"  # ➡️ REMPLACER par le chemin réel

patches = load_patches_from_folder(folder_path)

In [None]:
from sklearn.decomposition import KernelPCA
import numpy as np
import matplotlib.pyplot as plt
from scipy.spatial import KDTree

classes = ["1","2","3","41","42","51","52","54","57"]

def flatten_embeddings(patches):
    print(patches[0].embedding.shape)
    return np.array([np.array(patch.embedding) for patch in patches])

def entropy(probs):
    return -np.sum(probs * np.log(probs + 1e-9), axis=-1)

def visualize_propagated_labels(patches, labels, filename):
    plt.figure(figsize=(16, 10))
    unique_labels = np.unique(labels)
    n_classes = len(unique_labels)
    cmap = plt.get_cmap('tab20', n_classes)
    uncertainties = np.array([entropy(patch.probabilities) for patch in patches])
    probabilities_array = np.array([patch.probabilities for patch in patches])

    if uncertainties.max() - uncertainties.min() == 0:
        alpha_values = np.ones_like(uncertainties)
    else:
        alpha_values = 1 - (uncertainties - uncertainties.min()) / (uncertainties.max() - uncertainties.min() + 1e-6)

    scatter = plt.scatter(
        [patch.x for patch in patches], 
        [patch.y for patch in patches], 
        c=labels, cmap=cmap, s=10
    )

    plt.colorbar(scatter, label='Propagated label', ticks=range(len(classes))).ax.set_yticklabels(classes)
    plt.title("Label propagation")
    plt.xlabel("X position")
    plt.ylabel("Y position")
    plt.gca().invert_yaxis()
    plt.show()

    plt.figure(figsize=(16, 10))
    scatter_entropy = plt.scatter(
        [patch.x for patch in patches], 
        [patch.y for patch in patches], 
        c=alpha_values, cmap='viridis', s=10
    )

    plt.colorbar(scatter_entropy, label='Prediction entropy')
    plt.title("Entropy map of patch predictions")
    plt.xlabel("X position")
    plt.ylabel("Y position")
    plt.gca().invert_yaxis()
    plt.show()

    height = 191
    width = 412

    if len(labels) != height * width:
        raise ValueError(f"Mismatch in number of labels: {height}x{width}")

    max_prob_indices = np.argmax(probabilities_array.reshape((78692,9)), axis=-1) 
    label_grid = probabilities_array.reshape((width, height, 9)) 
    grad_x = np.gradient(label_grid, axis=1)
    grad_y = np.gradient(label_grid, axis=0)  
    gradient_magnitude = np.sqrt(np.sum(abs(grad_x), axis=-1) + np.sum(abs(grad_y), axis=-1))

    plt.figure(figsize=(16, 10))
    scatter_grad = plt.scatter(
        [patch.x for patch in patches], 
        [patch.y for patch in patches], 
        c=gradient_magnitude, cmap='viridis', s=10
    )

    plt.colorbar(scatter_grad, label='Prediction entropy error')
    plt.title("Entropy map of patches")
    plt.xlabel("X position")
    plt.ylabel("Y position")
    plt.gca().invert_yaxis()
    plt.show()

    label_grid = probabilities_array.reshape((width, height, 9))
    label_grid = label_grid[:, ::-1, :]

    grad_x = np.gradient(label_grid, axis=1)
    grad_y = np.gradient(label_grid, axis=0) 
    gradient_magnitude = np.sqrt(np.sum(grad_x**2, axis=-1) + np.sum(grad_y**2, axis=-1))

    plt.figure(figsize=(16, 10))
    plt.imshow(gradient_magnitude, cmap='coolwarm', interpolation='nearest')
    plt.colorbar(label="Gradient magnitude")
    plt.title("Gradient map of propagated labels")
    plt.xlabel("X position")
    plt.ylabel("Y position")
    plt.gca().invert_yaxis()
    plt.show()

def plot_gradient_map_per_class(patches, class_index, class_name, threshold=0.1, radius=225):
    probabilities_array = np.array([patch.probabilities[class_index] for patch in patches])
    threshold = [0.9,0.92,0.92,0.82,0.5,0.5,0.5,0.5,0.5]
    x_coords = np.array([patch.x for patch in patches])
    y_coords = np.array([patch.y for patch in patches])
    grad_x = np.gradient(probabilities_array)
    maxima_indices = np.where((grad_x[:-1] > 0) & (grad_x[1:] < 0) & (probabilities_array[1:] > threshold[class_index]))[0] + 1
    non_maxima_indices = np.setdiff1d(np.where(probabilities_array > threshold[class_index])[0], maxima_indices)

    plt.figure(figsize=(16, 10))
    scatter = plt.scatter(x_coords, y_coords, c=grad_x, cmap='viridis', s=10, label='Gradients')
    plt.scatter(x_coords[maxima_indices], y_coords[maxima_indices], c='red', s=50, marker='*', label='Local maximas')
    plt.colorbar(scatter, label=f'Probability Gradient - Class {class_name}')
    plt.title(f'Gradient Map - Class {class_name}')
    plt.xlabel("X position")
    plt.ylabel("Y position")
    plt.gca().invert_yaxis()
    plt.legend()
    plt.savefig(f"grad_map_{class_index}.png")
    plt.show()

    plt.figure(figsize=(8, 6))
    plt.scatter(np.zeros(len(maxima_indices)), probabilities_array[maxima_indices], color='red', label='Local maximas', alpha=0.7)
    plt.scatter(np.ones(len(non_maxima_indices)), probabilities_array[non_maxima_indices], color='blue', label='Non-maximas', alpha=0.7)
    plt.xticks([0, 1], ['Maximas', 'Non-maximas'])
    plt.ylabel("Prediction probability")
    plt.title(f'Probability comparison - Class {class_name}')
    plt.legend()
    plt.show()

def plot_updated_predictions(patches, class_index, class_name):
    probabilities_array = np.array([patch.probabilities[0][class_index] for patch in patches])
    x_coords = np.array([patch.x for patch in patches])
    y_coords = np.array([patch.y for patch in patches])

    plt.figure(figsize=(16, 10))
    scatter = plt.scatter(x_coords, y_coords, c=probabilities_array, cmap='coolwarm', s=10)
    plt.colorbar(scatter, label=f'Probability after propagation - Class {class_name}')
    plt.title(f'Post-propagation Probability Map - Class {class_name}')
    plt.xlabel("X position")
    plt.ylabel("Y position")
    plt.gca().invert_yaxis()
    plt.show()

def compute_multiclass_gradients(patches, num_classes):
    gradients = np.zeros((len(patches), num_classes))
    for c in range(num_classes):
        probabilities = np.array([patch.probabilities[c] for patch in patches])
        gradients[:, c] = np.gradient(probabilities)
    return gradients

def find_multiclass_maxima(gradients, probabilities, threshold=0.0):
    maxima_indices = []
    for c in range(gradients.shape[1]):
        for i in range(1, len(probabilities) - 1):
            if gradients[i - 1, c] > 0 and gradients[i + 1, c] < 0 and probabilities[i, c] > threshold[c]:
                maxima_indices.append(i)
    return np.array(maxima_indices, dtype=int)

def propagate_labels_multiclass(patches, maxima_indices, num_classes, radius, lambda_factor=0.7):
    interactions = {
        0: [1, 2],
        1: [0, 1, 2, 5],
        2: [0, 1, 2, 5],
        3: [2, 3],
        4: [3],
        5: [0],
        6: [5],
        7: [0],
        8: [0]
    }

    x_coords = np.array([patch.x for patch in patches])
    y_coords = np.array([patch.y for patch in patches])
    probabilities = np.array([patch.probabilities for patch in patches])
    tree = KDTree(np.column_stack((x_coords, y_coords)))

    for idx in maxima_indices:
        neighbors = tree.query_ball_point([x_coords[idx], y_coords[idx]], r=radius)
        for neighbor in neighbors:
            pred_idx = np.argmax(probabilities[idx])
            pred_neighbor_idx = np.argmax(probabilities[neighbor])
            if pred_neighbor_idx in interactions.get(pred_idx, []):
                if np.max(probabilities[neighbor]) < np.max(probabilities[idx]):
                    probabilities[neighbor] += lambda_factor * (probabilities[idx])
                    patches[neighbor].probabilities = probabilities[neighbor]
    return patches

def plot_results(patches, maxima_indices, class_names):
    x_coords = np.array([patch.x for patch in patches])
    y_coords = np.array([patch.y for patch in patches])
    probabilities = np.array([patch.probabilities for patch in patches])

    for c, class_name in enumerate(class_names):
        plt.figure(figsize=(16, 10))
        plt.scatter(x_coords, y_coords, c=probabilities[:, c], cmap='viridis', s=10, label=f'Class {class_name}')
        plt.scatter(x_coords[maxima_indices], y_coords[maxima_indices], c='red', s=50, marker='*', label='Maximas')
        plt.colorbar(label=f'Probability - Class {class_name}')
        plt.title(f'Post-propagation Map - Class {class_name}')
        plt.xlabel("X position")
        plt.ylabel("Y position")
        plt.gca().invert_yaxis()
        plt.legend()
        plt.show()



num_classes = len(classes)
radius = 600

threshold = 0.5
threshold = [0.9,0.92,0.92,0.82,0.85,0.78,0.5,0.8,0.75]
threshold = [0.9,0.92,0.92,0.82,0.5,0.5,0.5,0.5,0.5]
labels = np.array([patch.prediction for patch in patches])

visualize_propagated_labels(patches, labels, "_non_propagated_")
gradients = compute_multiclass_gradients(patches, num_classes)
probabilities = np.array([patch.probabilities for patch in patches])
maxima_indices = find_multiclass_maxima(gradients, probabilities, threshold)

patches = propagate_labels_multiclass(patches, maxima_indices, num_classes, radius)

labels = np.array([np.argmax(patch.probabilities) for patch in patches])

unique_labels, counts = np.unique(labels, return_counts=True)
percentages = (counts / len(labels)) * 100

fig, ax = plt.subplots(figsize=(8, 6))
bars = ax.bar(classes, percentages, color='skyblue')


for bar in bars:
    yval = bar.get_height()
    ax.text(bar.get_x() + bar.get_width() / 2, yval + 1, f'{yval:.1f}%', ha='center', va='bottom')


ax.set_xlabel('Classes')
ax.set_ylabel('Pourcentage')
ax.set_title('Labels repartition in pourcentages')
ax.set_xticks(np.arange(9))
ax.set_xticklabels([str(i) for i in range(9)])

plt.tight_layout()
plt.show()

import json

changed_labels_list = [
    {"x": patche.x, "y": patche.y, "pred": patche.prediction, "label": int(label)}
    for patche, label in zip(patches, labels) if label != patche.prediction
]

json_path = "changed_labels.json"
with open(json_path, "w") as f:
    json.dump(changed_labels_list, f)



visualize_propagated_labels(patches, labels, "_propagated_")


# # Propagate labels according to confidence of predictions
# labels_propagated, labels = propagate_labels(patches, threshold=0.0)
#     
# labels, mask_confident, probabilities  = identify_confident_and_uncertain_labels(patches, 0.0)
# # sampled_embeddings, sampled_labels = sample_balanced_embeddings(patches, labels, sample_size=1500)
# # print(sampled_embeddings.shape, sampled_labels.shape)

# # visualize_propagated_labels(patches, labels_propagated,"wsi_labels_propagated.png")
# visualize_propagated_labels(patches, labels, "wsi_labels_expe5.png")

# visualize_tsne_on_embeddings(sampled_embeddings,sampled_labels, "kpca_labels_plot_expe5.png")

# intra_distances, inter_distances = calculate_approx_distances_in_embeddings(patches, labels)




In [None]:
import json
import dash
from dash import dcc, html
from dash.dependencies import Input, Output
import openslide
from PIL import Image
import io
import base64
import numpy as np
import matplotlib.pyplot as plt
from io import BytesIO


json_path = "changed_labels.json"
changed_labels = load_dict(json_path)

def get_patch_image(slide_path, x, y, patch_size=224):
    slide = openslide.OpenSlide(slide_path)
    tile = slide.read_region((x, y), 0, (patch_size, patch_size)).convert("RGB")

    buffered = io.BytesIO()
    tile.save(buffered, format="PNG")
    return base64.b64encode(buffered.getvalue()).decode()

def create_probability_bar_plot(probabilities):
    fig, ax = plt.subplots(figsize=(5, 3))
    ax.bar(np.arange(9), probabilities)
    ax.set_xticks(np.arange(9))
    ax.set_xlabel('Classes')
    ax.set_ylabel('Probabilities')
    ax.set_title('Distribution of probabilities')

    buf = BytesIO()
    plt.savefig(buf, format="png")
    buf.seek(0)
    return base64.b64encode(buf.getvalue()).decode()

app = dash.Dash(__name__)

app.layout = html.Div([
    html.H1("Modified labels visualization"),
    dcc.Dropdown(
        id="category-filter",
        options=[{"label": f"Class {i}", "value": i} for i in range(9)],
        value=0,
        clearable=False
    ),

    html.Div(id="image-container")
])

@app.callback(
    Output("image-container", "children"),
    [Input("category-filter", "value")]
)
def update_images(selected_category):
    slide_path = "C:/Users/lucas/AAA_MEMOIRE/Code_Memoire/img/database/13C08641.ndpi"
    filtered_labels = [entry for entry in changed_labels if entry["label"] == selected_category]

    images = []
    for entry in filtered_labels[:20]:
        x, y, pred, label = entry["x"], entry["y"], entry["pred"], entry["label"]
        probabilities = np.random.rand(9)
        img_base64 = get_patch_image(slide_path, x, y)
        prob_plot_base64 = create_probability_bar_plot(probabilities)

        images.append(html.Div([
            html.Img(src=f"data:image/png;base64,{img_base64}", style={"width": "150px"}),
            html.P(f"Prédiction Initiale: {classes[pred]} → Nouvelle: {classes[label]}"),
            dcc.Graph(
                id=f"prob-bar-{x}-{y}",
                figure={
                    "data": [
                        {"x": np.arange(9), "y": probabilities, "type": "bar"}
                    ],
                    "layout": {
                        "title": f"",
                        "xaxis": {"title": "Classes"},
                        "yaxis": {"title": "Probabilité"}
                    }
                }
            )
        ], style={"display": "inline-block", "margin": "10px"}))
    
    return images

if __name__ == '__main__':
    app.run_server(debug=True)


In [None]:
import xml.etree.ElementTree as ET
from openslide import OpenSlide
from PIL import Image, ImageDraw
from reportlab.pdfgen import canvas

slide_path = "C:/Users/lucas/AAA_MEMOIRE/Code_Memoire/img/database/13C08641.ndpi"
ndpa_path = "C:/Users/lucas/AAA_MEMOIRE/Code_Memoire/img/database/13C08641.ndpi.ndpa"
output_pdf = "output2.pdf"


slide = OpenSlide(slide_path)

if 7 >= len(slide.level_dimensions):
    raise ValueError("La résolution 7 n'est pas disponible pour cette image.")

image = slide.read_region((0, 0), 6, slide.level_dimensions[6]).convert("RGB")

def parse_ndpa(ndpa_path):
    """Parses the NDPA file to extract annotations."""
    tree = ET.parse(ndpa_path)
    root = tree.getroot()
    annotations = []
    for annotation in root.findall("Annotation"):
        coords = []
        for point in annotation.findall("Coordinates/Coordinate"):
            x = int(float(point.get("X")))
            y = int(float(point.get("Y")))
            coords.append((x, y))
        if coords:
            annotations.append(coords)
    return annotations

annotations = parse_ndpa(ndpa_path)

def draw_annotations(image, annotations):
    """Draws the extracted annotations onto the image."""
    draw = ImageDraw.Draw(image)
    for poly in annotations:
        draw.polygon(poly, outline="red", width=3)
    return image

image = draw_annotations(image, annotations)

image.save(output_pdf, "PDF", resolution=100.0)


In [None]:
import dash
import dash_bootstrap_components as dbc
from dash import dcc, html, Input, Output
import plotly.express as px
import cv2
import numpy as np
import base64
import io
from PIL import Image
from sklearn.cluster import KMeans

def parse_contents(contents):
    content_type, content_string = contents.split(',')
    decoded = base64.b64decode(content_string)
    img = Image.open(io.BytesIO(decoded))
    return np.array(img)

def histogram_equalization(img, apply_clahe):
    if not apply_clahe:
        return img

    img_gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
    
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    img_clahe = clahe.apply(img_gray)
    
    img_clahe_rgb = cv2.cvtColor(img_clahe, cv2.COLOR_GRAY2RGB)
    return img_clahe_rgb

def kmeans_clustering(img, k):
    img_gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
    img_reshaped = img_gray.reshape((-1, 1))
    kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
    kmeans.fit(img_reshaped)
    labels = kmeans.labels_
    sorted_labels = np.argsort(kmeans.cluster_centers_.flatten())
    sorted_img = np.zeros_like(labels)
    for i, lbl in enumerate(sorted_labels):
        sorted_img[labels == lbl] = i
    return sorted_img.reshape(img_gray.shape)


def morphological_operation(mask, operation, kernel_size):
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (kernel_size, kernel_size))

    if operation == 'dilate':
        return cv2.dilate(mask, kernel, iterations=1)
    elif operation == 'erode':
        return cv2.erode(mask, kernel, iterations=1)
    elif operation == 'open':
        return cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
    elif operation == 'close':
        return cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
    return mask

def mask_original_image(img, clustered_img, selected_classes):
    mask = np.isin(clustered_img, selected_classes).astype(np.uint8) * 255
    masked_img = cv2.bitwise_and(img, img, mask=mask)
    return masked_img

app = dash.Dash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP])

app.layout = dbc.Container([
    html.H1("Segmentation d'Image et Opérations Morphologiques"),
    dbc.Row([
        dbc.Col(dcc.Upload(id='upload-image', children=html.Button('Charger une Image'), multiple=False), width=3),
        dbc.Col(dcc.Checklist(id='apply-hist-eq', options=[{'label': 'Appliquer Equalization Histogram', 'value': 'yes'}], value=[]), width=3),
        dbc.Col(dcc.Slider(2, 10, step=1, id='kmeans-k', value=5, marks={i: str(i) for i in range(2, 11)}), width=6)
    ]),
    dbc.Row([
        dbc.Col(dcc.Graph(id='original-image'), width=4),
        dbc.Col(dcc.Graph(id='equalized-image'), width=4),
        dbc.Col(dcc.Graph(id='clustered-image'), width=4)
    ]),
    dbc.Row([
        dbc.Col(dcc.Checklist(id='select-classes', options=[{'label': f'Classe {i}', 'value': i} for i in range(10)], value=[]), width=4),
        dbc.Col(dcc.Dropdown(id='morph-operation', options=[
            {'label': 'Dilate', 'value': 'dilate'},
            {'label': 'Erode', 'value': 'erode'},
            {'label': 'Open', 'value': 'open'},
            {'label': 'Close', 'value': 'close'}
        ], placeholder='Choisir une opération'), width=4),
        dbc.Col(dcc.Slider(3, 15, step=2, id='kernel-size', value=5, marks={i: str(i) for i in range(3, 16, 2)}), width=4),
        dbc.Col(dcc.Checklist(id='morph-classes', options=[{'label': f'Classe {i}', 'value': i} for i in range(10)], value=[]), width=4)

    ]),
    dbc.Row([
        dbc.Col(dcc.Graph(id='filtered-image'), width=4),
        dbc.Col(dcc.Graph(id='morph-image'), width=4),
        dbc.Col(dcc.Graph(id='morph-filtered-image'), width=4)
    ])
])

@app.callback(
    [Output('original-image', 'figure'),
     Output('equalized-image', 'figure'),
     Output('clustered-image', 'figure'),
     Output('filtered-image', 'figure'),
     Output('morph-image', 'figure'),
     Output('morph-filtered-image', 'figure')],
    [Input('upload-image', 'contents'),
     Input('apply-hist-eq', 'value'),
     Input('kmeans-k', 'value'),
     Input('morph-operation', 'value'),
     Input('kernel-size', 'value'),
     Input('morph-classes', 'value'),
     Input('select-classes', 'value')]
)
def update_image(contents, hist_eq, k, morph_operation, kernel_size, morph_classes, selected_classes):
    if contents is None:
        return [px.imshow(np.zeros((100, 100, 3), dtype=np.uint8))] * 6
    
    img = parse_contents(contents)
    img_eq = histogram_equalization(img, 'yes' in hist_eq)
    clustered_img = kmeans_clustering(img_eq, k)
    
    masked_img = mask_original_image(img, clustered_img, selected_classes)
    morph_img = clustered_img.copy()

    if morph_operation:
        for morph_class in morph_classes:
            class_mask = (clustered_img == morph_class).astype(np.uint8) * 255
            processed_mask = morphological_operation(class_mask, morph_operation, kernel_size)
            morph_img = np.where(processed_mask == 255, morph_class, morph_img)
    
    morph_filtered_img = mask_original_image(img, morph_img, selected_classes)

    return [px.imshow(img), px.imshow(img_eq), px.imshow(clustered_img), px.imshow(masked_img), px.imshow(morph_img), px.imshow(morph_filtered_img)]

if __name__ == '__main__':
    app.run_server(debug=True)


In [None]:
import dash
import dash_bootstrap_components as dbc
from dash import dcc, html, Input, Output
import plotly.express as px
import cv2
import numpy as np
from PIL import Image

img_path = "cells_morph.png"
img = cv2.imread(img_path)
img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

def morphological_operation(img, operation, kernel_size):
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (kernel_size, kernel_size))

    if operation == 'dilate':
        return cv2.dilate(img, kernel, iterations=1)
    elif operation == 'erode':
        return cv2.erode(img, kernel, iterations=1)
    elif operation == 'open':
        return cv2.morphologyEx(img, cv2.MORPH_OPEN, kernel)
    elif operation == 'close':
        return cv2.morphologyEx(img, cv2.MORPH_CLOSE, kernel)
    return img

def watershed_segmentation(img):
    
    gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)

    _, thresh = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
    
    # Noise removal - dilate et erode
    kernel = np.ones((3, 3), np.uint8)
    sure_bg = cv2.dilate(thresh, kernel, iterations=3)
  
    sure_fg = cv2.erode(thresh, kernel, iterations=3)
    
    dist_transform = cv2.distanceTransform(thresh, cv2.DIST_L2, 5)
    _, sure_fg = cv2.threshold(dist_transform, 0.7 * dist_transform.max(), 255, 0)
    
    sure_fg = np.uint8(sure_fg)
    unknown = cv2.subtract(sure_bg, sure_fg)
    
    _, markers = cv2.connectedComponents(sure_fg)
    markers = markers + 1
    markers[unknown == 255] = 0
    cv2.watershed(img, markers)
    
    img[markers == -1] = [255, 0, 0]
    
    contours, _ = cv2.findContours(markers.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    areas = [cv2.contourArea(c) for c in contours]
    
    areas.sort(reverse=True)
    max_area = areas[1] if len(areas) > 1 else 0
    
    return img, max_area

app = dash.Dash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP])

app.layout = dbc.Container([
    html.H1("Opérations Morphologiques et Watershed sur Image"),
    dbc.Row([
        dbc.Col(dcc.Dropdown(
            id='morph-operation',
            options=[
                {'label': 'Erode', 'value': 'erode'},
                {'label': 'Dilate', 'value': 'dilate'},
                {'label': 'Close', 'value': 'close'},
                {'label': 'Open', 'value': 'open'}
            ],
            value='dilate',
            placeholder="Choisir une opération morphologique"
        ), width=4),
        dbc.Col(dcc.Slider(
            id='kernel-size',
            min=3,
            max=15,
            step=2,
            value=5,
            marks={i: str(i) for i in range(3, 16, 2)},
            tooltip={"placement": "bottom", "always_visible": True}
        ), width=8),
    ]),
    dbc.Row([
        dbc.Col(dcc.Graph(id='original-image'), width=6),
        dbc.Col(dcc.Graph(id='morph-image'), width=6),
    ]),
    dbc.Row([
        dbc.Col(html.Div(id='largest-area'), width=12),
    ])
])

@app.callback(
    [Output('original-image', 'figure'),
     Output('morph-image', 'figure'),
     Output('largest-area', 'children')],
    [Input('morph-operation', 'value'),
     Input('kernel-size', 'value')]
)
def update_image(operation, kernel_size):
    morph_img = morphological_operation(img_rgb, operation, kernel_size)
   
    watershed_img, max_area = watershed_segmentation(morph_img)

    original_fig = px.imshow(img_rgb, title="Image Originale")
    morph_fig = px.imshow(watershed_img, title=f"Image après {operation.capitalize()} et Watershed")
    
    return original_fig, morph_fig, f"Area of biggest object detected is : {max_area:.2f} pixels squared"

if __name__ == '__main__':
    app.run_server(debug=True)


In [None]:
import cv2
import numpy as np
from sklearn.cluster import KMeans
from matplotlib import pyplot as plt
from skimage.segmentation import watershed
from skimage.feature import peak_local_max

def histogram_equalization(img):
    img_gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    img_clahe = clahe.apply(img_gray)
    
    return img_clahe

def kmeans_clustering(img_gray, n_clusters=4):
    img_reshaped = img_gray.reshape((-1, 1))
    kmeans = KMeans(n_clusters=n_clusters, random_state=42)
    kmeans.fit(img_reshaped)
    labels = kmeans.labels_.reshape(img_gray.shape)
    return labels, kmeans.cluster_centers_

def morphological_operations(mask, kernel_size=9):
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (kernel_size, kernel_size))
    opened = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
    closed = cv2.morphologyEx(opened, cv2.MORPH_CLOSE, kernel)
    return closed

def watershed_segmentation(mask):
    sure_bg = cv2.dilate(mask, None, iterations=1)
    sure_fg = cv2.erode(mask, None, iterations=1)
    unknown = cv2.subtract(sure_bg, sure_fg)

    img_watershed = np.zeros((mask.shape[0], mask.shape[1], 3), dtype=np.uint8)

    markers = np.zeros_like(mask, dtype=np.int32)
    markers[sure_fg == 255] = 1
    markers[unknown == 255] = 0

    cv2.watershed(img_watershed, markers)

    img_watershed[markers == -1] = [255, 0, 0]

    contours, _ = cv2.findContours(mask.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    areas = [cv2.contourArea(c) for c in contours]
    max_area = max(areas) if areas else 0
    return img_watershed, max_area


img = cv2.imread("labels/41/41_1.png")

img_eq = histogram_equalization(img)

plt.figure(figsize=(12, 6))
plt.subplot(2, 3, 1)
plt.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
plt.title("Image Originale")
plt.axis("off")

plt.subplot(2, 3, 2)
plt.imshow(img_eq, cmap='gray')
plt.title("Image après Histogram Equalization")
plt.axis("off")

labels, cluster_centers = kmeans_clustering(img_eq, n_clusters=4)

plt.subplot(2, 3, 3)
plt.imshow(labels, cmap='viridis')
plt.title("Labels après K-Means")
plt.axis("off")

sorted_labels = np.argsort(cluster_centers.flatten())
sorted_img = np.zeros_like(labels)
for i, lbl in enumerate(sorted_labels):
    sorted_img[labels == lbl] = i

mask = np.isin(sorted_img, [1, 2]).astype(np.uint8) * 255 
plt.subplot(2, 3, 4)
plt.imshow(mask, cmap='gray')
plt.title("Masque des Catégories 2 et 3")
plt.axis("off")

morph_img = morphological_operations(mask)

plt.subplot(2, 3, 5)
plt.imshow(morph_img, cmap='gray')
plt.title("Image après Opening et Closing")
plt.axis("off")

watershed_img, max_area = watershed_segmentation(morph_img)
print(img.shape[0], img.shape[1])

plt.subplot(2, 3, 6)
plt.imshow(cv2.cvtColor(watershed_img, cv2.COLOR_BGR2RGB))
plt.title(f"Image après Watershed (Aire max : {(100*max_area)/(img.shape[0]*img.shape[1])} %)")
plt.axis("off")

plt.tight_layout()
plt.show()


In [None]:
import os
import cv2
import numpy as np
from sklearn.cluster import KMeans
from skimage.segmentation import watershed
import large_image

def histogram_equalization(img):
    img_gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    img_clahe = clahe.apply(img_gray)
    
    return img_clahe

def kmeans_clustering(img_gray, n_clusters=4):
    img_reshaped = img_gray.reshape((-1, 1))
    kmeans = KMeans(n_clusters=n_clusters, random_state=42)
    kmeans.fit(img_reshaped)
    labels = kmeans.labels_.reshape(img_gray.shape)
    return labels, kmeans.cluster_centers_

def morphological_operations(mask, kernel_size=9):
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (kernel_size, kernel_size))
    opened = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
    closed = cv2.morphologyEx(opened, cv2.MORPH_CLOSE, kernel)
    return closed

def watershed_segmentation(mask):
    sure_bg = cv2.dilate(mask, None, iterations=3)
    sure_fg = cv2.erode(mask, None, iterations=3)
    unknown = cv2.subtract(sure_bg, sure_fg)

    img_watershed = np.zeros((mask.shape[0], mask.shape[1], 3), dtype=np.uint8)

    markers = np.zeros_like(mask, dtype=np.int32)
    markers[sure_fg == 255] = 1
    markers[unknown == 255] = 0

    cv2.watershed(img_watershed, markers)
    img_watershed[markers == -1] = [255, 0, 0]

    contours, _ = cv2.findContours(mask.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    areas = [cv2.contourArea(c) for c in contours]
    max_area = max(areas) if areas else 0
    return img_watershed, max_area

def get_area_directory(area_percentage):
    if area_percentage < 1:
        return "output_tiles/less_than_1_percent"
    elif 1 <= area_percentage < 3:
        return "output_tiles/1_to_3_percent"
    elif 3 <= area_percentage < 5:
        return "output_tiles/3_to_5_percent"
    else:
        return "output_tiles/greater_than_5_percent"

def process_tiles(image_path, magnification, patch_w, patch_h):
    image_slide = large_image.getTileSource(image_path)
    
    os.makedirs("output_tiles/less_than_1_percent", exist_ok=True)
    os.makedirs("output_tiles/1_to_3_percent", exist_ok=True)
    os.makedirs("output_tiles/3_to_5_percent", exist_ok=True)
    os.makedirs("output_tiles/greater_than_5_percent", exist_ok=True)

    for slide_info in image_slide.tileIterator(
            scale=image_slide.getMagnificationForLevel(magnification),
            tile_size=dict(width=patch_w, height=patch_h),
            tile_overlap=dict(x=0, y=0),
            format=large_image.tilesource.TILE_FORMAT_NUMPY):
        
        x, y = slide_info['x'], slide_info['y']
        if x < 25000 or y < 25000:
            continue
        print(f"Processing tile at ({x}, {y})")
        tile = np.array(slide_info['tile'])

        img_eq = histogram_equalization(tile)

        labels, cluster_centers = kmeans_clustering(img_eq, n_clusters=4)

        sorted_labels = np.argsort(cluster_centers.flatten())
        sorted_img = np.zeros_like(labels)
        for i, lbl in enumerate(sorted_labels):
            sorted_img[labels == lbl] = i

        mask = np.isin(sorted_img, [1, 2]).astype(np.uint8) * 255

        morph_img = morphological_operations(mask)
        watershed_img, max_area = watershed_segmentation(morph_img)
        total_area = tile.shape[0] * tile.shape[1]
        area_percentage = (100 * max_area) / total_area

        output_dir = get_area_directory(area_percentage)
        os.makedirs(output_dir, exist_ok=True)

        tile_rgb = cv2.cvtColor(tile, cv2.COLOR_BGR2RGB)
        output_filename = f"{output_dir}/{int(area_percentage)}_{x}_{y}.png"
        cv2.imwrite(output_filename, tile_rgb)

        print(f"Saved tile at ({x}, {y}) with area percentage: {area_percentage:.2f}%")

process_tiles(
    image_path="large_image_source_tifffile/DataBase/22C06901.ndpi",
    magnification=10.0,
    patch_w=1000,
    patch_h=1000
)


In [None]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans

def plot_rgb_histogram(img):
    """Display the histogram for Red, Green, and Blue channels."""
    colors = ('b', 'g', 'r')
    plt.figure(figsize=(12, 6))
    
    for i, color in enumerate(colors):
        hist = cv2.calcHist([img], [i], None, [256], [0, 256])
        plt.plot(hist, color=color, label=f'{color.upper()} Channel')
    
    plt.title("RGB Channel Histogram")
    plt.xlabel("Pixel Intensity")
    plt.ylabel("Number of Pixels")
    plt.legend()
    plt.show()

def kmeans_color_quantization(img, k=5):
    """Apply K-Means to detect dominant colors."""
    img_reshaped = img.reshape((-1, 3))  # Convert to 2D array
    kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
    kmeans.fit(img_reshaped)
    
    # Get dominant colors
    dominant_colors = kmeans.cluster_centers_.astype(np.uint8)
    
    return dominant_colors

def watershed_segmentation(image_path):
    """Perform Watershed segmentation with preprocessing and K-Means."""
    
    #Load the image
    img = cv2.imread(image_path)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    
    #Display histogram
    plot_rgb_histogram(img)

    #Convert to grayscale
    gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
    
    #Apply histogram equalization to improve contrast
    gray_eq = cv2.equalizeHist(gray)

    #Apply Gaussian blur to reduce noise
    blurred = cv2.GaussianBlur(gray_eq, (5, 5), 0)

    #Otsu thresholding
    _, binary = cv2.threshold(blurred, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

    #Distance transform for Watershed
    dist_transform = cv2.distanceTransform(binary, cv2.DIST_L2, 5)
    _, sure_fg = cv2.threshold(dist_transform, 0.5 * dist_transform.max(), 255, 0)

    sure_fg = np.uint8(sure_fg)
    unknown = cv2.subtract(binary, sure_fg)

    #Connected component labeling
    markers = cv2.connectedComponents(sure_fg)[1]
    markers += 1
    markers[unknown == 255] = 0

    #Apply Watershed
    cv2.watershed(img, markers)
    img[markers == -1] = [255, 0, 0]  # Red boundaries

    #Detect dominant colors with K-Means
    k = 4
    dominant_colors = kmeans_color_quantization(img, k=k)

    #Display results
    fig, axes = plt.subplots(1, 3, figsize=(15, 5))
    
    axes[0].imshow(gray_eq, cmap="gray")
    axes[0].set_title("Equalized Grayscale Image")
    axes[0].axis("off")

    axes[1].imshow(img)
    axes[1].set_title("Watershed Boundaries")
    axes[1].axis("off")

    axes[2].bar(range(k), [1]*k, color=[dominant_colors[i] / 255 for i in range(k)])
    axes[2].set_title("Dominant Colors")
    
    plt.show()

#Run segmentation on test image
watershed_segmentation("test.png")
