In [1]:
# import necessary libraries
from sklearn.metrics import silhouette_score
from sklearn.preprocessing import normalize
from scipy.sparse.linalg import eigsh
from scipy.signal import find_peaks
from scipy.spatial import Delaunay
from scipy.spatial import distance
from sklearn.cluster import DBSCAN
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt
import multiprocessing as mp
from sklearn import metrics
import networkx as nx
import numpy as np
import random
import csv
import pandas as pd

In [2]:
# Execute the companion notebook in this kernel's namespace.
# NOTE(review): the cells below call uniform_cluster, gaussian_cluster,
# elliptical_cluster, plot_points and save_fig_pdf, none of which are defined in
# this notebook -- presumably they come from this %run; confirm.
%run ./functions_for_GrapHiC.ipynb

In [13]:
class Point():
    """A 2D point with a ground-truth position and a collected (perturbed) position.

    Attributes
    ----------
    x, y : ground-truth coordinates.
    x_collected, y_collected : coordinates returned by ``GetCoord``; equal to the
        ground truth until ``Scramble`` is called.
    label : list of labels attached with ``add_label``.
    cov_matrix : 2x2 covariance used by ``Scramble`` to draw the perturbation.
    N, sigma : stored as given; callers in this notebook pass the photon count
        and the localization uncertainty respectively.
    """

    def __init__(self,x,y, cov_matrix = None, N = 150, sigma = 20):
        # A list literal as default argument would be shared by every instance
        # (mutating one point's cov_matrix would change all the others), so the
        # historical default [[400, 0], [0, 400]] is rebuilt per instance.
        if cov_matrix is None:
            cov_matrix = [[400, 0], [0, 400]]

        self.x = x
        self.y = y
        self.label = []
        self.cov_matrix = cov_matrix
        self.N = N
        self.sigma = sigma

        # By default (no perturbation applied yet) the collected coordinates
        # are the ground-truth coordinates.
        self.x_collected, self.y_collected = x, y

    def add_label(self, label_to_add):
        """Append ``label_to_add`` to this point's list of labels."""
        self.label.append(label_to_add)

    def Scramble(self):
        """Set the collected coordinates to the ground truth plus zero-mean Gaussian noise.

        The noise is drawn from ``np.random.multivariate_normal`` with
        covariance ``self.cov_matrix``.
        """
        offset = np.random.multivariate_normal([0, 0], self.cov_matrix)
        scrambled = np.array([self.x, self.y]) + offset
        self.x_collected = scrambled[0]
        self.y_collected = scrambled[1]

    def GetCoord(self):
        """Return the collected coordinates as a length-2 numpy array."""
        return np.array([self.x_collected, self.y_collected])

    def GetCoord_GT(self):
        """Return the ground-truth coordinates as a length-2 numpy array."""
        return np.array([self.x, self.y])

    def Isin(self, cluster, delta = 10):
        """Return True if the ground-truth position is within ``cluster.radius + delta`` of ``cluster.center``.

        The comparison is strict: a point exactly at that distance is outside.
        """
        distance_to_center = np.linalg.norm(cluster.center - self.GetCoord_GT())
        return bool(distance_to_center < (cluster.radius + delta))
        

In [4]:
class Cluster():
    """A cluster described by a center, a radius and an integer label.

    ``points`` starts empty and is populated by ``Fill``.
    """

    def __init__(self, center, radius, label):
        self.center = center
        self.radius = radius
        self.points = []
        self.label  = label

    def Fill(self, NumberOfPoints, cluster_shape, mean_uncertainty, dev_uncertainty, N_photons):
        """Generate points for this cluster and append them to ``self.points``.

        Parameters
        ----------
        NumberOfPoints : passed to the shape helper as the number of points.
        cluster_shape : 'uniform' (calls ``uniform_cluster`` with the radius) or
            'gaussian' (calls ``gaussian_cluster`` with half the radius).
        mean_uncertainty, dev_uncertainty : mean and standard deviation of the
            normal distribution from which each point's uncertainty is drawn.
        N_photons : stored on every created point as ``N``.

        Raises
        ------
        ValueError
            If ``cluster_shape`` is neither 'uniform' nor 'gaussian'
            (previously this surfaced as an UnboundLocalError).
        """
        if cluster_shape == 'uniform':
            coords = uniform_cluster(self.radius, NumberOfPoints, self.center)
        elif cluster_shape == 'gaussian':
            coords = gaussian_cluster((self.radius / 2), NumberOfPoints, self.center)
        else:
            raise ValueError(
                'cluster_shape must be "uniform" or "gaussian", got %r' % (cluster_shape,))

        for coord in coords:
            # Each point gets its own isotropic covariance uncert**2 * I.
            uncert = np.random.normal(mean_uncertainty, dev_uncertainty)
            cov_matrix = [[uncert**2, 0], [0, uncert**2]]
            new_point = Point(x = coord[0], y = coord[1], cov_matrix = cov_matrix,
                              N = N_photons, sigma = uncert)
            new_point.add_label(self.label)
            self.points.append(new_point)

In [None]:
class ElongCluster(Cluster):
    """Cluster subclass describing an elongated (elliptical) cluster.

    Attributes (in addition to those of ``Cluster``)
    ------------------------------------------------
    elongation : factor applied to the radius to obtain ``b``.
    a, b : ``radius`` and ``elongation * radius``; both are passed to
        ``elliptical_cluster`` by ``Fill``.
    orientation : angle drawn uniformly in [0, pi).
    R_matrix : 2x2 rotation matrix built from ``orientation``.
    """

    def __init__(self, center, radius, elongation, label):
        super().__init__(center, radius, label)
        self.elongation = elongation
        self.a = self.radius
        self.b = self.elongation * self.radius

        # numpy is used for pi/cos/sin: the `math` module is not imported by
        # this notebook's import cell.
        self.orientation = np.random.uniform(low = 0, high = np.pi)
        cos, sin = np.cos(self.orientation), np.sin(self.orientation)
        self.R_matrix = np.array([[cos, -sin], [sin, cos]])

    def Isin(self, point, delta):
        """Placeholder membership test: always returns False.

        A circular test (distance to the center below ``radius + delta``) was
        sketched here previously but was disabled.
        """
        return False

    def Fill(self, NumberOfPoints, cluster_shape, mean_uncertainty, dev_uncertainty, N_photons,):
        """Generate points with ``elliptical_cluster`` and append them to ``self.points``.

        ``cluster_shape`` is accepted for signature compatibility with
        ``Cluster.Fill`` but is not used. The rotation matrix is handed to
        ``elliptical_cluster``; no further rotation is applied here.
        """
        coords = elliptical_cluster(self.a, self.b, self.R_matrix, NumberOfPoints, self.center)

        for coord in coords:
            # Each point gets its own isotropic covariance uncert**2 * I.
            uncert = np.random.normal(mean_uncertainty, dev_uncertainty)
            cov_matrix = [[uncert**2, 0], [0, uncert**2]]
            new_point = Point(x = coord[0], y = coord[1], cov_matrix = cov_matrix,
                              N = N_photons, sigma = uncert)
            new_point.add_label(self.label)
            self.points.append(new_point)
        

In [5]:
class DataGenerator():
    """Generate a synthetic 2D point pattern made of clusters plus isolated (noise) points.

    Construction runs the whole pipeline: cluster centers are placed, cluster
    objects are created and filled, every cluster point is scrambled, and
    isolated points are added outside the clusters.

    Parameters
    ----------
    NumberOfClusters : number of clusters to place.
    LocPerCluster : sequence giving the number of points of each cluster.
    r : sequence giving the radius of each cluster.
    NoisePercentage : fraction of isolated points in the final data, in [0, 1).
        The number of isolated points is
        ``int(sum(LocPerCluster) * p / (1 - p))``.
    NumberOfScales : accepted but currently unused.
    x_lim, y_lim : frame limits; multiplied by 1000 (micrometer to nanometer).
    cluster_shape : 'uniform' or 'gaussian'.
    UniformNoise : if True, isolated points are uniform over the frame;
        otherwise ``AddIsolatedLoc_NonUniformFinal`` is used.
    mean_delta, dev_delta : mean and standard deviation of the per-point
        uncertainty; ``mean_delta`` is also the margin kept around clusters.
    N_photons : stored on each generated point.
    elongation : 1 creates ``Cluster`` objects, any other value creates
        ``ElongCluster`` objects with that elongation.

    Raises
    ------
    ValueError
        If ``NoisePercentage`` is not in [0, 1).
    """

    def __init__(self,
                 NumberOfClusters, LocPerCluster, r, NoisePercentage = 0.5,
                 NumberOfScales = 1,
                 x_lim = [-5, 5], y_lim = [-5, 5],
                 cluster_shape = 'uniform',
                 UniformNoise = True,
                 mean_delta = 20, dev_delta = 3, N_photons = 150,
                 elongation = 1):

        # p == 1 used to raise ZeroDivisionError and values outside [0, 1)
        # silently produced no noise at all; fail early with a clear message.
        if not (0 <= NoisePercentage < 1):
            raise ValueError('NoisePercentage must be in [0, 1), got %r' % (NoisePercentage,))

        # frame shape
        self.xmin, self.xmax = 1000 * x_lim[0], 1000 * x_lim[1] # from micrometer to nanometer
        self.ymin, self.ymax = 1000 * y_lim[0], 1000 * y_lim[1]

        # parameters
        self.NumberOfClusters = NumberOfClusters
        self.numbers          = LocPerCluster
        self.r                = r
        self.cluster_shape    = cluster_shape
        self.NoisePercentage  = NoisePercentage
        self.IsolatedLoc      = int((sum(LocPerCluster) * NoisePercentage) / (1-NoisePercentage))

        self.mean_delta       = mean_delta
        self.dev_delta        = dev_delta
        self.N_photons        = N_photons

        # clusters and isolated points are filled by the generation steps below
        self.Clusters = []
        self.IsolatedPoints = []

        self.elongation = elongation

        # data generation steps
        self.CreateClusters()
        if (UniformNoise == True):
            self.AddIsolatedLoc()
        else:
            self.AddIsolatedLoc_NonUniformFinal()

    # ------------------------------------------------------------------ clusters

    def PlaceCenters(self):
        """Draw cluster centers uniformly in the frame, rejecting overlapping candidates.

        A candidate for cluster ``n`` is rejected when its distance to an
        already placed center ``i`` is below ``r[i] + r[n] + mean_delta``.
        At most ``10 * NumberOfClusters`` candidates are drawn (arbitrary
        bound to avoid an endless loop); if some clusters could not be placed
        a message is printed and ``self.centers`` holds fewer centers.
        """
        first_radius = self.r[0]
        self.centers = np.random.uniform(low=[self.xmin + first_radius, self.ymin + first_radius],
                                         high=[self.xmax - first_radius, self.ymax - first_radius],
                                         size=(1,2))

        n = 1                                      # index of the cluster being placed
        max_attempts = self.NumberOfClusters * 10  # 10 is arbitrarily chosen
        attempts = 0
        while (n < self.NumberOfClusters) and (attempts < max_attempts):
            attempts += 1
            new_center = np.random.uniform(low=[self.xmin + self.r[n], self.ymin + self.r[n]],
                                           high=[self.xmax - self.r[n], self.ymax - self.r[n]],
                                           size=(1,2))
            overlaps = any(
                np.linalg.norm(c - new_center) < (self.r[i] + self.r[n] + self.mean_delta)
                for i, c in enumerate(self.centers))

            if not overlaps:
                self.centers = np.vstack([self.centers, new_center])
                n += 1

        # Report only a real failure (the message used to be printed two
        # attempts before the limit, even if placement then succeeded).
        if n < self.NumberOfClusters:
            print('cannot place any more clusters')

    def InitializeClusters(self):
        """Create one cluster object per placed center.

        The first cluster has label 1; label 0 is reserved for noise.
        """
        for i, center in enumerate(self.centers):
            if (self.elongation == 1):
                new_cluster = Cluster(center, self.r[i], label = i+1)
            else:
                new_cluster = ElongCluster(center, self.r[i], self.elongation, label = i+1)
            self.Clusters.append(new_cluster)

    def FillClusters(self):
        """Call ``Fill`` on each cluster with its own number of points.

        If ``cluster_shape`` is not 'uniform' or 'gaussian', a message is
        printed for each cluster and nothing is filled.
        """
        for i, cluster in enumerate(self.Clusters):
            if self.cluster_shape not in ('uniform', 'gaussian'):
                print('cluster_shape must be "uniform" or "gaussian"')
            else:
                cluster.Fill(self.numbers[i], self.cluster_shape, self.mean_delta, self.dev_delta, self.N_photons)

    def Scramble(self):
        """Perturb the collected coordinates of every cluster point."""
        for cluster in self.Clusters:
            for point in cluster.points:
                point.Scramble()

    def CreateClusters(self):
        """Run the cluster pipeline: place centers, create, fill, then scramble."""
        self.PlaceCenters()
        self.InitializeClusters()
        self.FillClusters()
        self.Scramble()

    # --------------------------------------------------------------------- noise

    def _make_noise_point(self, x, y):
        """Return a Point at (x, y) labelled 0 (noise) with a freshly drawn uncertainty."""
        uncert = np.random.normal(self.mean_delta, self.dev_delta)
        cov_matrix = [[uncert**2, 0], [0, uncert**2]]
        noise_point = Point(x, y, cov_matrix, self.N_photons, uncert)
        noise_point.add_label(0)  # 0 label is for noise
        return noise_point

    def _overlaps_any_cluster(self, point):
        """Return True if ``point`` lies within ``mean_delta`` of any cluster area."""
        return any(point.Isin(cluster, delta = self.mean_delta) for cluster in self.Clusters)

    def AddIsolatedLoc(self):
        """Add ``IsolatedLoc`` points drawn uniformly in the frame, avoiding cluster areas."""
        n = 0  # number of isolated points accepted so far
        while n < self.IsolatedLoc:
            xy = np.random.uniform(low=[self.xmin, self.ymin], high=[self.xmax, self.ymax], size=(1,2))
            candidate = self._make_noise_point(xy[0][0], xy[0][1])

            # If the new point is in a cluster, discard it.
            if not self._overlaps_any_cluster(candidate):
                self.IsolatedPoints.append(candidate)
                n += 1

    def AddIsolatedLoc_NonUniformFinal(self):
        """Add ``IsolatedLoc`` noise points whose density decreases along x.

        y is uniform over the frame. x is ``xmin`` plus the absolute value of
        a zero-mean Gaussian (a half-normal anchored on the left edge), so the
        density decreases continuously from ``xmin``. Anchoring at ``xmin``
        matches the previous ``abs(normal(xmin, sigma))`` when ``xmin == 0``
        and keeps the decreasing profile for any other frame. Candidates
        outside the frame or inside a cluster area are discarded.
        """
        print('creating non uniform noise')

        n = 0  # number of isolated points accepted so far
        sigma_x = (self.xmax - self.xmin)/2.0

        while n < self.IsolatedLoc:
            new_y = np.random.uniform(low=self.ymin, high= self.ymax)
            new_x = self.xmin + abs(np.random.normal(0, sigma_x))
            candidate = self._make_noise_point(new_x, new_y)

            inside_frame = (self.xmin <= candidate.x <= self.xmax)
            if inside_frame and not self._overlaps_any_cluster(candidate):
                self.IsolatedPoints.append(candidate)
                n += 1

    def AddIsolatedLoc_NonUniform_test(self):
        """Experimental two-layer noise (TODO: delete this method).

        Adds ``IsolatedLoc`` uniform points over the whole frame, then another
        ``IsolatedLoc`` points restricted to x <= xmax / 2. Points are created
        with the default ``Point`` covariance, N and sigma.
        """
        per_layer = int(self.IsolatedLoc)

        # First layer covers the whole frame, the second one stops at xmax / 2.
        for x_high in (self.xmax, self.xmax/2):
            n = 0
            while n < per_layer:
                xy = np.random.uniform(low=[self.xmin, self.ymin], high=[x_high, self.ymax], size=(1,2))
                candidate = Point(xy[0][0], xy[0][1])
                candidate.add_label(0)  # 0 label is for noise

                # If the new point is in a cluster, discard it.
                if not self._overlaps_any_cluster(candidate):
                    self.IsolatedPoints.append(candidate)
                    n += 1

    # ------------------------------------------------------------------- getters

    @staticmethod
    def _stack_rows(rows):
        """Stack length-2 rows into an (n, 2) array; an empty list gives shape (0, 2)."""
        if not rows:
            return np.empty((0, 2), dtype=int)
        return np.vstack(rows)  # single allocation instead of growing row by row

    def GetPoints(self):
        """Return all points as a list: cluster points first (cluster order), then isolated points."""
        all_points = [p for cluster in self.Clusters for p in cluster.points]
        all_points.extend(self.IsolatedPoints)
        return all_points

    def GetPointsCoord(self):
        """Return the collected coordinates of all points as an (n, 2) array."""
        return self._stack_rows([p.GetCoord() for p in self.GetPoints()])

    def GetPointsLabels(self):
        """Return the first label of every point, in the same order as ``GetPoints``."""
        return [p.label[0] for p in self.GetPoints()]

    def GetPointsCovMatrix(self):
        """Return the covariance matrix of every point, in the same order as ``GetPoints``."""
        return [p.cov_matrix for p in self.GetPoints()]

    def GetPointsDistribution(self):
        """Return an (n, 2) array with columns (sigma, N) for every point."""
        return self._stack_rows([np.array([p.sigma, p.N]) for p in self.GetPoints()])

    # -------------------------------------------------------------------- output

    def plot_points(self, dot_size = 1):
        """Plot the data twice via the module-level ``plot_points``: with labels, then with all labels set to 0."""
        P1              = self.GetPointsCoord()
        labels          = self.GetPointsLabels()
        plot_points(P1, labels, 'Input Data: GT', dot_size)
        plot_points(P1, len(labels) * [0], 'Input Data', dot_size)

    def save_fig_pdf(self, path, filename = 'generated_SMLM_untitled', dot_size = 0.01):
        """Forward coordinates and labels to the module-level ``save_fig_pdf``."""
        P1              = self.GetPointsCoord()
        labels          = self.GetPointsLabels()
        save_fig_pdf(P1, labels, path, filename, dot_size)

    def GetAllData(self):
        """Return a DataFrame with columns x, y, labels_1, sigma, N_photons, cov_matrix (one row per point)."""
        df_xy    = pd.DataFrame(self.GetPointsCoord(), columns = ['x','y'])
        df_l     = pd.DataFrame(self.GetPointsLabels(), columns= ['labels_1'])
        df_d     = pd.DataFrame(self.GetPointsDistribution(), columns = ['sigma', 'N_photons'])
        df_cov   = pd.DataFrame({'cov_matrix': self.GetPointsCovMatrix()})
        return pd.concat([df_xy, df_l, df_d, df_cov], axis = 1)

    def save_to_csv(self, path, filename):
        """Write ``GetAllData()`` to ``path/filename.csv`` without the index.

        Give ``filename`` without '.csv' and ``path`` without a trailing '/'.
        TODO: a column will need to be added for each additional scale.
        """
        df_final = self.GetAllData()
        df_final.to_csv(path + '/' + filename + '.csv', index = False)
                

# NOTE(review): scratch cells. `datagen` is not created anywhere in the visible
# notebook, so these cells fail on a fresh kernel unless it is defined elsewhere.
# The absolute path below is machine-specific.
datagen.save_to_csv('/Users/Eliana/Documents/PDM', 'essai_datagen2')

# Read the file written above back and display it.
test = pd.read_csv('/Users/Eliana/Documents/PDM/essai_datagen2.csv')
test

# Print each cluster center, then the number of placed centers.
for cluster in datagen.Clusters: 
    print(cluster.center)
print(len(datagen.centers))

# Print, for every cluster point: ground-truth coordinates, first label, collected coordinates.
for cluster in datagen.Clusters: 
    for p in cluster.points:
        print(p.x, p.y)
        print('label:', p.label[0])
        print(p.x_collected, p.y_collected)

# NOTE(review): leftover fragment -- `self`, `mean` and `cov_scramble` are not
# defined at this level in the visible notebook; it looks like an earlier
# version of the scrambling step and would raise NameError if executed as is.
[x,y] = np.array([np.sum([p, np.random.multivariate_normal(mean, cov_scramble)],
                                        axis = 0) for p in self.cluster_points])

In [2]:
class CsrGenerator():
    """Generate ``N`` points drawn uniformly in a rectangular frame, all labelled 0.

    Parameters
    ----------
    N : number of points to generate.
    x_lim, y_lim : frame limits; multiplied by 1000 (micrometer to nanometer).

    Every point is created with a zero covariance matrix, ``N = 1`` and
    ``sigma = 0``, and is never scrambled, so its collected coordinates equal
    the drawn coordinates.
    """

    def __init__(self, N, x_lim = [0,4], y_lim = [0,4]):
        self.N = N
        self.xmin, self.xmax = 1000 * x_lim[0], 1000 * x_lim[1] # from micrometer to nanometer
        self.ymin, self.ymax = 1000 * y_lim[0], 1000 * y_lim[1]

        self.Points = []

        self.AddPoints()

    def AddPoints(self):
        """Draw ``N`` uniform coordinates and append one noise-labelled Point per coordinate."""
        coords = np.random.uniform(low = [self.xmin, self.ymin], high = [self.xmax, self.ymax], size= (self.N, 2))

        n_photons = 1

        for c in coords:
            # A fresh covariance list per point: sharing one list would let a
            # change on one point's cov_matrix leak into all the others.
            new_point = Point(c[0], c[1], [[0, 0], [0, 0]], n_photons, 0)
            new_point.add_label(0)  # 0 label is for noise
            self.Points.append(new_point)

    @staticmethod
    def _stack_rows(rows):
        """Stack length-2 rows into an (n, 2) array; an empty list gives shape (0, 2)."""
        if not rows:
            return np.empty((0, 2), dtype=int)
        return np.vstack(rows)  # single allocation instead of growing row by row

    def GetPointsCoord(self):
        """Return the collected coordinates of all points as an (n, 2) array."""
        return self._stack_rows([p.GetCoord() for p in self.Points])

    def GetPointsLabels(self):
        """Return the first label of every point."""
        return [p.label[0] for p in self.Points]

    def GetPointsCovMatrix(self):
        """Return the covariance matrix of every point."""
        return [p.cov_matrix for p in self.Points]

    def GetPointsDistribution(self):
        """Return an (n, 2) array with columns (sigma, N) for every point."""
        return self._stack_rows([np.array([p.sigma, p.N]) for p in self.Points])

    def GetAllData(self):
        """Return a DataFrame with columns x, y, labels_1, sigma, N_photons, cov_matrix (one row per point)."""
        df_xy    = pd.DataFrame(self.GetPointsCoord(), columns = ['x','y'])
        df_l     = pd.DataFrame(self.GetPointsLabels(), columns= ['labels_1'])
        df_d     = pd.DataFrame(self.GetPointsDistribution(), columns = ['sigma', 'N_photons'])
        df_cov   = pd.DataFrame({'cov_matrix': self.GetPointsCovMatrix()})
        return pd.concat([df_xy, df_l, df_d, df_cov], axis = 1)

    def plot_points(self, dot_size = 1):
        """Plot the points with the module-level ``plot_points`` under the title 'CSR'."""
        P1              = self.GetPointsCoord()
        labels          = self.GetPointsLabels()
        plot_points(P1, labels, 'CSR', dot_size)
        

17.859189838012774
