In [38]:
import torch
from torch.utils.data import Dataset, DataLoader
import pandas as pd  # Example, if using tabular data
import os
import numpy as np
import cv2 as cv
from PIL import Image
import imageio
import matplotlib.pyplot as plt
import sys
import import_ipynb
from sklearn.cluster import DBSCAN
from scipy.spatial import ConvexHull
from matplotlib.path import Path
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from skimage.transform import resize

#import image_utils as iu
%run ./image_utils.ipynb

In [47]:
class ClusteringAlgorithm:
    def __init__(self, eps=13, min_samples=3, metric='euclidean', algorithm='auto', leaf_size=30, p=None, n_jobs=-1):
        """
        Initialize the clustering algorithm with the specified parameters.
        
        :param eps: Maximum distance between two points to be considered neighbors.
        :param min_samples: Minimum number of points to form a dense region (core point).
        :param metric: Distance metric (default is 'euclidean').
        :param algorithm: Algorithm to compute the nearest neighbors ('auto', 'ball_tree', 'kd_tree', 'brute').
        :param leaf_size: Leaf size for BallTree or KDTree, affects speed and memory.
        :param p: Parameter for Minkowski metric, ignored for 'euclidean'.
        :param n_jobs: The number of parallel jobs to run. -1 means using all processors.
        """
        # Params
        self.eps = eps
        self.min_samples = min_samples
        self.metric = metric
        self.algorithm = algorithm
        self.leaf_size = leaf_size
        self.p = p
        self.n_jobs = n_jobs

        # Output
        self.n_clusters = any
        self.n_noise = any
        self.labels = any
        self.core_samples = any

        #Preprocessed Output
        #self.clusters = any
        self.point_density = any
        self.grid_density = any
        self.X = any

    def __set_dataset__(self, X):
        self.X = X
        return self

    def __reset_clustering_params__(self, X, eps=13, min_samples=3, metric='euclidean', algorithm='auto', leaf_size=30, p=None, n_jobs=-1):
        self.eps = eps
        self.min_samples = min_samples
        self.metric = metric
        self.algorithm = algorithm
        self.leaf_size = leaf_size
        self.p = p
        self.n_jobs = n_jobs
        # Output
        self.n_clusters = any
        self.n_noise = any
        self.labels = any
        self.core_samples = any

        #Preprocessed Output
        self.point_density = any
        self.grid_density = any
        self.X = X

        return self
        
    def __fit__(self):
        # Check if the array has the correct shape
        self.sanity_check(self.X)
        # define the fit params for the DBSCAN algorithm 
        dbscan = DBSCAN(
            eps=self.eps,                 # Maximum distance between two points to be considered neighbors
            min_samples=self.min_samples,        # Minimum number of points to form a dense region (core point)
            metric= self.metric,    # Distance metric (default is 'euclidean')
            algorithm= self.algorithm,      # Algorithm to compute the nearest neighbors ('auto', 'ball_tree', 'kd_tree', 'brute')
            leaf_size=self.leaf_size,          # Leaf size for BallTree or KDTree, affects speed and memory
            p= self.p,                # Parameter for Minkowski metric, ignored for 'euclidean'
            n_jobs= self.n_jobs            # Number of parallel jobs to run (-1 means using all processors)
        )
        
        # Fit the DBSCAN model to the data
        dbscan.fit(self.X)
        
        # Extract the cluster labels and core sample indices
        labels = dbscan.labels_
        core_samples = dbscan.core_sample_indices_
        
        # Output the results
        #print("Cluster labels for each point:", labels)
        #print("Indices of core samples:", core_samples)
        
        # Optional: You can also access the number of clusters found (excluding noise points)
        n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
        n_noise = list(labels).count(-1)

        self.__set_params__(n_clusters, n_noise, labels, core_samples)
        return self

    def sanity_check(self,trpos, expected_shape=(3664, 2)):
        # Check if the input is a numpy array
        if not isinstance(trpos, np.ndarray):
            raise ValueError(f"Input is not a numpy array. Got {type(trpos)} instead.")
    
        #print("Sanity check passed.")

    def __set_params__(self, n_clusters, n_noise, labels, core_samples):
        self.n_clusters = n_clusters
        self.n_noise = n_noise
        self.labels = labels
        self.core_samples = core_samples

    def __get_params__(self):
        return self.labels, self.core_samples, self.n_clusters, self.n_noise 

    def __visualize__(self,arr):
        plt.figure(figsize=(10, 6))

        # Unique labels (excluding noise)
        unique_labels = set(self.labels)
        colors = plt.cm.Spectral(np.linspace(0, 1, len(unique_labels)))
        
        
        #clusters = []
        
        for k, col in zip(unique_labels, colors):
            if k == -1:
                # Black used for noise.
                col = 'k'
        
            class_member_mask = (labels == k)
            xy = arr[class_member_mask & ~np.isnan(arr).any(axis=1)]
        
            #cluster = dict(points=xy, label=col, importance=0)
            #clusters.append(cluster) 
                
            plt.scatter(xy[:, 0], xy[:, 1], s=2, c=[col], label=f'Cluster {k}' if k != -1 else 'Noise')
        
        plt.title('DBSCAN Clustering')
        plt.xlabel('Feature 1')
        plt.ylabel('Feature 2')
        plt.xlim(0, 428)
        plt.ylim(0, 684)
        # plt.legend(loc='best')
        plt.gca().invert_yaxis() 
        plt.gca().set_aspect('equal', adjustable='box')
        plt.show()

    def __get_clusters__(self, arr):
        # Unique labels (excluding noise)
        unique_labels = set(self.labels)
        colors = plt.cm.Spectral(np.linspace(0, 1, len(unique_labels)))
        # define the cluster array
        clusters = []
        
        for k, col in zip(unique_labels, colors):
            if k == -1:
                # Black used for noise.
                col = 'k'
        
            class_member_mask = (self.labels == k)
            xy = arr[class_member_mask & ~np.isnan(arr).any(axis=1)]

            # save each cluster as dict entry with all point array and the colors
            cluster = dict(points=xy, label=col, importance=0)
            clusters.append(cluster) 

        return clusters

    def __clear_clusters__(self, clusters):
        X_sizes = []
        for cluster in clusters:
            #print("Points:",cluster["points"].shape, " Label:", cluster["label"])
            X_sizes.append(cluster["points"].shape[0])
            
        total_sum =sum(X_sizes)
        importance_scores = [value / total_sum for value in X_sizes]
        
        # Display the importance scores
        # for value, score in zip(X_sizes, importance_scores):
        #     print(f"Value: {value}, Importance Score: {score:.4f}")
        
        filtered_clusters= [cluster for cluster,score in zip(clusters, importance_scores) if score >=0.03]
        # for cluster in filtered_clusters:
        #     print("Points:",cluster["points"].shape, " Label:", cluster["label"])

        return filtered_clusters

   
    # def __create_mask__(self):
    #     clusters = self.__get_clusters__(self.X)    
    #     filtered_clusters = self.__clear_clusters__(clusters)

    #     combined_mask = []
    #     combined_grid_x = []
    #     combined_grid_y = []
    #     combined_points = []
    #     combined = []
    #     for cluster in filtered_clusters:
            
    #         mask, grid_x, grid_y = self.create_convex_hull_mask(cluster["points"], 200)
    #         combined.append(dict(mask=mask, grid_x=grid_x, grid_y=grid_y, points = cluster["points"]))

    #     return combined

    def __create_mask__(self):
        clusters = self.__get_clusters__(self.X)
        filtered_clusters = self.__clear_clusters__(clusters)
    
        combined_mask = np.zeros((684, 428))  # Create an empty mask for the entire image
    
        combined = []
        for cluster in filtered_clusters:
            # Generate the individual mask for the cluster
            mask, grid_x, grid_y = self.create_convex_hull_mask(cluster["points"], 200)
    
            # Convert min and max of grid_x and grid_y to integers
            x_min, x_max = int(grid_x.min()), int(grid_x.max())
            y_min, y_max = int(grid_y.min()), int(grid_y.max())
    
            # Determine the region in combined_mask
            region_height = y_max - y_min
            region_width = x_max - x_min
    
            # Resize the mask to fit into the region
            resized_mask = resize(mask, (region_height, region_width))
    
            # Combine this resized mask into the overall combined mask
            combined_mask[y_min:y_max, x_min:x_max] = np.maximum(
                combined_mask[y_min:y_max, x_min:x_max], resized_mask
            )
    
            # Append the individual cluster info for visualization if needed
            combined.append(dict(mask=mask, grid_x=grid_x, grid_y=grid_y, points=cluster["points"]))
    
        return combined_mask, combined
            
    def fill_space_with_points(points, grid_density=50):
        """
        Fills the space bounded by the given points with a grid of points and displays them.
        
        Parameters:
        - points (numpy.ndarray): Array of shape (n, 2) containing the input points.
        - grid_density (int): Number of grid points along each dimension.
        
        Returns:
        - grid_points (numpy.ndarray): Array of shape (m, 2) containing the grid points within the bounded space.
        """
        
        # Ensure points is a numpy array
        points = np.asarray(points)
        
        # Compute the convex hull of the points
        hull = ConvexHull(points)
        hull_path = Path(points[hull.vertices])
        
        # Get the bounding box of the convex hull
        min_x, min_y = np.min(points, axis=0)
        max_x, max_y = np.max(points, axis=0)
        
        # Generate grid points within the bounding box
        x = np.linspace(min_x, max_x, grid_density)
        y = np.linspace(min_y, max_y, grid_density)
        grid_x, grid_y = np.meshgrid(x, y)
        grid_points = np.vstack([grid_x.ravel(), grid_y.ravel()]).T
        
        # Filter out points that are outside the convex hull
        inside_hull = hull_path.contains_points(grid_points)
        grid_points = grid_points[inside_hull]
        
        # Plotting
        plt.figure(figsize=(10, 10))
        
        # Plot original points
        plt.scatter(points[:, 0], points[:, 1], color='red', label='Original Points', edgecolor='k', s=10)
        
        # Plot grid points within the convex hull
        plt.scatter(grid_points[:, 0], grid_points[:, 1], color='blue', label='Grid Points', s=1)
        
        # Plot convex hull boundary
        hull_points = np.concatenate([points[hull.vertices], points[hull.vertices][:1]])  # Close the hull
        plt.plot(hull_points[:, 0], hull_points[:, 1], 'g--', label='Convex Hull')
        
        plt.xlabel('X')
        plt.ylabel('Y')
        plt.title('Points and Filled Grid within Convex Hull')
        plt.legend()
        plt.gca().set_aspect('equal', adjustable='box')
        plt.show()
        
        return grid_points

    def create_convex_hull_mask(self,points, grid_density=100):
        """
        Creates a binary mask defined by the convex hull of the given points.
        
        Parameters:
        - points (numpy.ndarray): Array of shape (n, 2) containing the input points.
        - grid_density (int): Number of grid points along each dimension (controls mask resolution).
        
        Returns:
        - mask (numpy.ndarray): Binary mask of shape (grid_density, grid_density).
        - grid_x, grid_y (numpy.ndarray): Coordinates of the grid for visualization.
        """
        # Ensure points is a numpy array
        points = np.asarray(points)
        
        # Compute the convex hull of the points
        hull = ConvexHull(points)
        hull_path = Path(points[hull.vertices])
        
        # Get the bounding box of the convex hull
        min_x, min_y = np.min(points, axis=0)
        max_x, max_y = np.max(points, axis=0)
        
        # Generate grid points within the bounding box
        x = np.linspace(min_x, max_x, grid_density)
        y = np.linspace(min_y, max_y, grid_density)
        grid_x, grid_y = np.meshgrid(x, y)
        grid_points = np.vstack([grid_x.ravel(), grid_y.ravel()]).T
        
        # Create mask for points inside the convex hull
        inside_hull = hull_path.contains_points(grid_points)
        mask = inside_hull.reshape(grid_density, grid_density)
        
        return mask, grid_x, grid_y
    
    def display_mask(mask, grid_x, grid_y, points):
        """
        Displays the binary mask along with the original points.
        
        Parameters:
        - mask (numpy.ndarray): Binary mask to be displayed.
        - grid_x, grid_y (numpy.ndarray): Coordinates of the grid for visualization.
        - points (numpy.ndarray): Original points to be displayed on top of the mask.
        """
        
        plt.figure(figsize=(10, 10))
        
        # Display the mask
        plt.imshow(mask, extent=(grid_x.min(), grid_x.max(), grid_y.min(), grid_y.max()), origin='lower', cmap='Greys', alpha=1)
        
        # Plot original points
        plt.scatter(points[:, 0], points[:, 1], color='red', label='Original Points', edgecolor='k', s=2)
        
        plt.xlabel('X')
        plt.ylabel('Y')
        plt.title('Binary Mask Defined by Convex Hull')
        plt.legend()
        plt.gca().set_aspect('equal', adjustable='box')
        plt.show()

    def display_mask_combined(self,combined):
        """
        Displays the binary mask along with the original points.
        
        Parameters:
        - mask (numpy.ndarray): Binary mask to be displayed.
        - grid_x, grid_y (numpy.ndarray): Coordinates of the grid for visualization.
        - points (numpy.ndarray): Original points to be displayed on top of the mask.
        """
        
        plt.figure(figsize=(10, 10))
    
        for value in combined:
            # Display the mask
            mask, grid_x, grid_y, points = value["mask"], value["grid_x"], value["grid_y"], value["points"]
            plt.imshow(mask, extent=(grid_x.min(), grid_x.max(), grid_y.min(), grid_y.max()), origin='lower', cmap='Greys', alpha=1)
            
            # Plot original points
            plt.scatter(points[:, 0], points[:, 1], color='red', label='Original Points', edgecolor='k', s=2)
        
        plt.xlabel('X')
        plt.ylabel('Y')
        plt.xlim(0, 428)
        plt.ylim(0, 684)
        plt.title('Binary Mask Defined by Convex Hull')
        plt.legend()
        plt.gca().invert_yaxis()
        plt.gca().set_aspect('equal', adjustable='box')
        plt.show()

    def __check_filled_points__(self,filtered_clusters):
        for cluster in filtered_clusters:
            print("Points:",cluster["points"].shape, " Label:", cluster["label"])
            fill_space_with_points(cluster["points"], grid_density=100)

    def __check_masked_points__(self,filtered_clusters):
        for cluster in filtered_clusters:
            print("Points:",cluster["points"].shape, " Label:", cluster["label"])
            mask, grid_x, grid_y = create_convex_hull_mask(cluster["points"], grid_density=200)
            display_mask(mask, grid_x, grid_y, sample_points)

In [51]:
# BaseFolder = '../../datasets/'
# pos = 6
# filename = BaseFolder + f'positions/EGF/EGFR_Steps_{pos+1}.csv'
# Brightfield = BaseFolder + f'brightfields/EGF/MDA-MB-468_4-colour_EGF_B10_1nM_posXY{pos}_channels_t0_posZ0_colour0.tif'


# image= crop_png(Brightfield)
# trpos = trpos_conversion(filename)
# np_image = np.array(image)
# print("Type of Image,", type(image))
# print("Type of Image,", type(np_image))
# print("Shape of image array",np_image.shape)
# print("Tr pos shape", trpos.shape)
# display_plots(image,trpos)
# X = trpos.copy()
# transformer = ClusteringAlgorithm().__set_dataset__(X).__fit__()

# combined, mask = transformer.__create_mask__()
# flipped_mask = np.flipud(mask)
# transformer.display_mask_combined(mask)

# plt.figure(figsize=(10, 10))

# # Display the combined mask
# plt.imshow(combined, extent=(0, 428, 0, 684), origin='lower', cmap='Greys', alpha=1)