In [4]:
import os,sys

currentdir = os.getcwd()
parentdir = os.path.dirname(currentdir)
sys.path.insert(0, parentdir) 

from util import *

from DetermineRidges.RidgeAnalysis import PolylineGraphs
from Classes.BasicClasses import manualEdges

from Functions.BasicMSII1D import angle_between_vectors

# from Functions.main import labelledmesh_procedures,graph_procedures,MSII_chaineoperatoire_procedures

# compare two operational sequences 
from Functions.EvaluateGraph import evaluate_directed_edges

from Functions.EssentialEdgesFunctions import get_manual_edges


class DiGraphEvaluation (PolylineGraphs):

    def __init__(self, graphname):
        super().__init__()

        self.graphname = graphname

    def manual_operational_sequences_edges (self):

        edge_df = pd.read_csv(''.join([self.path,self.id,'_links','.csv']),
                                sep=',',header=0)

        self.manual_edges  = {(int(edge[0]),int(edge[1])) for _,edge in edge_df.iterrows()}

    def manual_operational_sequences_nodes_edges (self):

        nodes_df = pd.read_csv(''.join([self.path,self.id,'_nodes','.csv']),
                                sep=',',header=0)

        self.manual_nodes = {node[-1]: {nodes_df.columns[n]:para for n,para in enumerate(node[:-1])} 
                    
                        for _,node in nodes_df.iterrows()
                        }
        
        edge_df = pd.read_csv(''.join([self.path,self.id,'_links','.csv']),
                                sep=',',header=0)

        
        self.manual_edges = {(int(nodes_df[nodes_df['node'] == edge['source']]['gt_label']),
                        int(nodes_df[nodes_df['node'] == edge['target']]['gt_label']))
                    
                        for _,edge in edge_df.iterrows()}

    def export_max_angle (obj,label_arr_mean):


        origin = np.array([ 0.00000000001,
                            0.00000000001,
                            0.00000000001])

        max_edge_angle = {vert: max([abs(angle_between_vectors(vals[obj.dict_label[vert]] + origin, 
                                        origin,
                                        normal+origin) [0])

                                    for nei, normal in vals.items() 
                                        if math.isnan(angle_between_vectors(   vals[obj.dict_label[vert]] + origin, 
                                        origin,
                                        normal+origin)[0]) != True
                                    ], key=abs)
                                for vert,vals in label_arr_mean.items()
                        }

        write_labels_txt_file ( max_edge_angle, 
                                ''.join ([  obj.path, 
                                            obj.id,
                                            '_'.join([  '',
                                                        'edge-angle',
                                                        'labels'])
                                        ])
                                )
    
    def get_max_func_val (self,label_arr_mean):    

        self.max_func_val = {vert:   max([func_val
                                            for func_val in values.values() 
                                        ], key=abs)

                                    for vert,values in label_arr_mean.items()
                            }
        
    def export_max_func_val (self,func_val_name):    

        write_labels_txt_file ( self.max_func_val, 
                                ''.join ([  self.path, 
                                            self.id,
                                            '_'.join([  '',
                                                        func_val_name,
                                                        'labels'])
                                        ])
                                )    
        
    def segment_to_ridgegraph (self):
        self.G_ridges = nx.Graph()

        self.mean_segments_funv_node = {}

        self.mean_segments_funv = {}        
        self.mean_maxsegments_funv = {}
        self.mean_minsegments_funv = {}                

        for edge,nodes in self.segments.items():
            if nodes != {'vertices': []}:

                self.G_ridges.add_nodes_from(edge)
                self.G_ridges.add_edge(*edge,
                                        nodes = nodes['vertices'],
                                        length = len(nodes['vertices']),
                                        
                                        funct_vals = self.segments_funv[edge]['funct_vals'],
                                        max = np.round(np.max(self.segments_funv[edge]['funct_vals'])),
                                        mean = np.mean(self.segments_funv[edge]['funct_vals']),
                                        med = np.median(self.segments_funv[edge]['funct_vals']),
                                        std = np.std(self.segments_funv[edge]['funct_vals']),
                                        var = np.var(self.segments_funv[edge]['funct_vals']))
        
                for node in nodes['vertices']:
                    
                    self.mean_segments_funv_node [node] = {}
                    
                    self.mean_segments_funv_node [node] = np.mean(self.segments_funv[edge]['funct_vals'])

                # self.mean_angle_edge[edge] = {}
                # self.mean_maxangle_edge[edge] = {}
                # self.mean_minangle_edge[edge] = {}                

                self.mean_segments_funv[edge] = np.mean(self.segments_funv[edge]['funct_vals'])
                self.mean_maxsegments_funv[edge] = np.max(self.segments_funv[edge]['funct_vals'])
                self.mean_minsegments_funv[edge] = np.max(self.segments_funv[edge]['funct_vals'])

            else:
                # self.mean_angle_edge[edge] = {}
                # self.mean_maxangle_edge[edge] = {}
                # self.mean_minangle_edge[edge] = {}                

                self.mean_segments_funv[edge] = 0
                self.mean_maxsegments_funv[edge] = 0
                self.mean_minsegments_funv[edge] = 0

    def evaluate_label_arr_mean (self):

        edges = get_manual_edges(self.path, self.id) #evaluate_label_arr_mean

        self.get_max_func_val (self.label_arr_mean)  

        self.segment_pline_parameter(self.max_func_val)

        self.segment_to_ridgegraph() # self.segment_to_ridgegraph()

        self.ridge_pairs()

        self.direct_ridgegraph()

        self.get_G_ridge_properties()

        self.get_DiG_ridge_properties(self.graphname)

        ridgepairs = {  ridge:values['bigger_smaller'] * values['difference'] 
                        
                        for ridge,values in self.ridges_pairs.items() 
                            if values ['bigger_smaller'] != 0.0
                    }
        

        edges_turned = {(edge[1],edge[0]) for edge in edges}

        evaluate_directed_edges(ridgepairs,edges)
        evaluate_directed_edges(ridgepairs,edges_turned)    


In [3]:
def get_vertices_in_radius (mesh:object,
                            label_verts:dict,
                            neighs: dict,
                            dict_label: dict,
                            radius:float,
                            metadata: np.ndarray) -> dict:

    kdtree = mesh.kdtree
    vertices = mesh.vertices

    label_arr = {}

    for label,values in label_verts.items():

        label_arr [label] = {}

        for n,val in enumerate(values):

            label_arr [label] [val] = {}

            if neighs [val] == None:
                neighs [val] = {label}

            neighs [val].add(label)

            # get nearest points 
            indices_within_radius = kdtree.query_ball_point(vertices[val], radius)
            # vertices_within_radius = vertices[indices_within_radius]        

            for nei in neighs [val]:

                arr = np.empty((metadata.shape[1],),float) 

                for ind in indices_within_radius:

                    if dict_label[ind] == nei:

                        arr = np.vstack((arr, metadata[ind]))

                label_arr [label] [val] [nei] = arr

    return label_arr

def get_label_mean( label_verts:dict,
                    metadata: np.ndarray) -> dict:

    label_dict = {}

    for label,values in label_verts.items():

        arr = np.empty((metadata.shape[1],),float) 

        for val in values:

            arr = np.vstack((arr, metadata[val]))

        label_dict [label] = arr

    return label_dict

def filter_metadata (imp_metadata,parameters):

    if len(parameters) > 1:
        
        metadata = np.column_stack((imp_metadata [para] for para in parameters))
        
        return metadata
            
    elif len(parameters) == 1:

        metadata = np.array((imp_metadata [parameters[0]]))
        metadata = metadata.reshape(metadata.shape[0],1)

        return metadata
    
    else:
        return
    
def ridge_inside_mean_curv(path,id,preprocessed,radius,dict_label,label_arr):

    label_arr_mean = {  vert:{nei: float(np.mean(params,axis=0))
                                for nei,params in neighs.items()
                                    if dict_label[vert] == nei
                                } 
                          
                            for values in label_arr.values() 
                                for vert,neighs in values.items()

                        }
    pd.DataFrame.from_dict(label_arr_mean, orient='index').to_csv(  ''.join([path,
                                                                    id,
                                                                    preprocessed,
                                                                    '_r{}'.format(radius),
                                                                    '_inside_mean_curv',
                                                                    '.txt']), 
                                                            mode='w',
                                                            header=False, 
                                                            index=True,
                                                            sep=' ') 
    
    return label_arr_mean
    
def ridge_outside_mean_curv(path,id,preprocessed,radius,dict_label,label_arr):

    label_arr_mean = {  vert:{nei: float(np.mean(params,axis=0))
                                for nei,params in neighs.items()
                                    if dict_label[vert] != nei
                                } 
                          
                            for values in label_arr.values() 
                                for vert,neighs in values.items()

                        }
    pd.DataFrame.from_dict(label_arr_mean, orient='index').to_csv(  ''.join([path,
                                                                    id,
                                                                    preprocessed,
                                                                    '_r{}'.format(radius),
                                                                    '_outside_mean_curv',
                                                                    '.txt']), 
                                                            mode='w',
                                                            header=False, 
                                                            index=True,
                                                            sep=' ') 



    # mean_curv = {vert:np.mean([val for val in vals.values()]) for vert,vals in label_arr_mean.items()}

    # mean_curv_data = pd.DataFrame.from_dict(mean_curv, orient='index')

    # mean_curv_data.to_csv(  ''.join([path,
    #                                 id,
    #                                 preprocessed,
    #                                 'r{}'.format(radius),
    #                                 '_mean_curv',
    #                                 '.txt']), 
    #                         mode='w',
    #                         header=False, 
    #                         index=True,
    #                         sep=' ')        

def CO_concavity_procedure (obj,**kwargs):

    path = kwargs ['path'] 
    id = kwargs ['id']
    preprocessed = kwargs ['preprocessed']
    labelname = kwargs ['labelname']
    exp_path = kwargs ['exp_path'] 
    diameter = kwargs ['diameter'] 
    parameters = kwargs ['parameters'] 
    n_rad = kwargs ['n_rad'] 
    graphname = kwargs ['graphname']

    # Data import and data preparation 
    obj.prep_polygraphs(path,id,preprocessed,labelname,exp_path)

    # Create 
    obj.prep_ridges()
    obj.extract_ridges()    

    # create node coordinates
    # obj.get_centroids()

    # get polylines
    obj.polyline_to_nx()
    obj.polygraphs_to_polylines()


    # prepare for creating MSII1D_Pline object 
    obj.create_normals_vertices()
    obj.create_dict_mesh_info()
    obj.prepare_polyline()

    #create MSII1D_Pline object and calculate the MSII-1D  
    # obj.polygraphs_to_nx()

    # obj.calc_II_new_sphere (diameter,n_rad)
    # obj.get_feature_vectors()

    # extract parameters, which are important to calculate CO concavity  
    mesh = obj.tri_mesh
    dict_label = obj.dict_label
    label_outline_vertices = obj.label_outline_vertices
    neighs = obj.ridge_neighbour_notshared_label  

    metadata = filter_metadata (mesh.metadata ['ply_raw']['vertex']['data'],
                                parameters)
    
    
    for r in range(1,2**n_rad+1):
        print(r)    
        obj.label_arr = get_vertices_in_radius (mesh,
                                                label_outline_vertices,
                                                neighs,
                                                dict_label,
                                                diameter*r/2**n_rad,
                                                metadata)      
              
        obj.label_arr_mean = ridge_inside_mean_curv(path,id,preprocessed,r,dict_label,obj.label_arr)

        obj.max_func_val = obj.label_arr_mean
        # obj.export_max_func_val(obj.func_val_name)

        obj.evaluate_label_arr_mean()

def chaineoperatoire_procedures (method,**kwargs) -> object:
   
    CO = DiGraphEvaluation()

    procedures = {
                #     'MSII':               MSII_procedure,
                #   'MSII_feature_vector':MSII_feature_vector_procedure,
                  'CO_concavity':       CO_concavity_procedure}

    func = procedures.get(method)

    func(CO,**kwargs)

    return CO

# Chaine operatoire obejct

exp_path =              'exp_path/'
id =                    '207'  # 
# id = 'RF.c_49'
preprocessed =          '_GMOCF_PC2' #   '_GMOCF_r1.00_n4_v256.volume'#'_GMOCF_gt_labels' #'_GMOCF'
labelname =             '_gt_labels'#'_Kmeans_labels'#
graphname =             'max'#'normal' # 'min' # 'max'# 'normal'
diameter =              1
n_rad =                 4    
radius_scale =          0.5
circumference_scale =   1
parameters =            ['quality']

#path = '/home/linsel/Documents/PhD/Data/Fumane/test_orientated/operational_sequences/{}/data/'.format(id)
path = '/home/linsel/Documents/PhD/Data/Fumane/V1.0.0_Linsel_et_al_2023/Graph/CO/{}/PCA_quality/'.format(id)

kwargs = {'path':           path,
          'id':             id,
          'preprocessed':   preprocessed,
          'labelname':      labelname,
          'exp_path':       exp_path,
          'graphname':      graphname,
          'diameter':       diameter,
          'parameters':     parameters,
          'n_rad':          n_rad
          }

print('CO3:')
CO3 = chaineoperatoire_procedures('CO_concavity',**kwargs)

CO3:
1


NameError: name 'get_manual_edges' is not defined

In [None]:
# Chaine operatoire obejct

exp_path =              'exp_path/'
id =                    '207'  # 
# id = 'RF.c_49'
preprocessed =          '_GMOCF_PC1' #   '_GMOCF_r1.00_n4_v256.volume'#'_GMOCF_gt_labels' #'_GMOCF'
labelname =             '_gt_labels'#'_Kmeans_labels'#
graphname =             'max'#'normal' # 'min' # 'max'# 'normal'
diameter =              1
n_rad =                 4    
radius_scale =          0.5
circumference_scale =   1
parameters =            ['quality']


#path = '/home/linsel/Documents/PhD/Data/Fumane/test_orientated/operational_sequences/{}/data/'.format(id)
path = '/home/linsel/Documents/PhD/Data/Fumane/V1.0.0_Linsel_et_al_2023/Graph/CO/{}/PCA_quality/'.format(id)


kwargs = {'path':           path,
          'id':             id,
          'preprocessed':   preprocessed,
          'labelname':      labelname,
          'exp_path':       exp_path,
          'graphname':      graphname,
          'diameter':       diameter,
          'parameters':     parameters,
          'n_rad':          n_rad
          }



for file in os.listdir(path):
    if file.endswith('_mean_curv.txt'):
        print(file[len(id) + len(preprocessed):])


        para_name = kwargs ['para_name'] 
        parameter = kwargs ['parameter'] 

In [None]:
from Functions.EssentialLabelAlteration import get_unique_labels,get_uniquelabel_vertlist,get_labels_IoU_max,get_labels_IoU,label_vertices

def label_connections_nodes (self):

    label_vertices( self,
                    0,
                    self.connections_mesh.vertices,
                    # [''.join([f"{edges[0]:02}",'0',f"{edges[1]:02}"]) for edges in self.G_ridges.edges],
                    [num for num,_ in enumerate(self.G_ridges.edges)], 
                    '_connections')

    label_vertices( self, 
                    len(self.connections_mesh.vertices),
                    self.nodes_mesh.vertices,
                    self.G_ridges.nodes,
                    '_nodes')


GE1.label_connections_nodes ()