In [1]:
import sympy as sp
import minterpy as mp
import numpy as np
from minterpy.pointcloud_utils import *

from mpl_toolkits import mplot3d
%matplotlib inline

from mpl_toolkits.mplot3d import axes3d

import torch
import torchvision
from torchvision import transforms, datasets

import random
import numpy as np
import matplotlib.pyplot as plt

import os
from operator import itemgetter

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

import ot

from sklearn.neighbors import NearestNeighbors

import ripser
import persim
from persim import plot_diagrams

from operator import itemgetter


#do pip installs as follows to use vedo for plotting point clouds
#pip install vedo
#pip install ipyvtklink

import numpy as np
from vedo import *

import matplotlib.pyplot
from mpl_toolkits.mplot3d import Axes3D

import time
torch.set_printoptions(precision=8)


In [2]:
def _compute_distance_matrix(x, p=2):
    x_flat = x.view(x.size(0), -1)

    distances = torch.norm(x_flat[:, None] - x_flat, dim=2, p=p)

    return distances

def get_persistence_diagram(point_cloud, maximum_dim):

    point_cloud = torch.tensor(point_cloud)

    dist_matrix = _compute_distance_matrix(point_cloud, p=2)
    diagrams = ripser.ripser(dist_matrix.cpu().detach().numpy(), distance_matrix=True, maxdim=maximum_dim)['dgms']
    return diagrams, plot_diagrams(diagrams, show=True)


def random_sampling_from_pt_cloud(point_cloud, req_sample_size):
    
    point_cloud = torch.tensor(point_cloud)

    idx = np.random.randint(point_cloud.shape[0], size=req_sample_size)

    random_Sampled_point_cloud = point_cloud[idx,:]

    return random_Sampled_point_cloud

In [3]:
x, y, z = sp.symbols('x y z')

expr = x**2 + y**2 + z**2 - 1
poly = sp.Poly(expr, x, y, z)

# convert sympy polynomial to minyterpy polynomial
newt_poly = sympy_to_mp(poly, mp.NewtonPolynomial)


#sample points
point_data = sample_points_on_poly(80,        # Number of points to be sampled
                                   newt_poly,  # Polynomial in Newton basis
                                   bounds=1, # Boundary of the Cubic domain to be sampled
                                   tol=1e-15)  # Tolerance in solution

In [4]:
point_data.shape

(80, 3)

# Persistent homology

In [5]:
# function to check whether the selected edge is going to close a potential loop

def expecting_a_cycle(actual_new_test, my_edge):

    left_ind = my_edge[0][0]
    right_ind = my_edge[0][1]
    found_right_ind = False
    going_nowhere= False

    new_test = actual_new_test

    tracker = 0
    no_branches_formed = True
    while (not(found_right_ind) or not(going_nowhere)):

        positions1 = (new_test == left_ind).nonzero(as_tuple=False)

        if(positions1.shape[0]>1):
            edge_to_delete = new_test[positions1[0][0]]
            no_branches_formed = False
        
        branches_rising = positions1.shape[0]

        if(positions1.shape[0]==0):
            going_nowhere= True
            if(no_branches_formed):
                break
            
            left_ind = my_edge[0][0]

            deletable_edge_position1 = (actual_new_test == edge_to_delete[0]).nonzero(as_tuple=False)
            deletable_edge_position2 = (actual_new_test == edge_to_delete[1]).nonzero(as_tuple=False)

            deletable_edge_position1 = deletable_edge_position1[:,0]

            deletable_edge_position2 = deletable_edge_position2[:,0]

            a_cat_b1, counts1 = torch.cat([deletable_edge_position1, deletable_edge_position2]).unique(return_counts=True)
            deletable_row_position = a_cat_b1[torch.where(counts1.gt(1))]

            if(deletable_row_position.shape[0]==0):
                going_nowhere = True
                break

            deletable_row_position = deletable_row_position[0]

            actual_new_test = torch.cat((actual_new_test[:deletable_row_position], actual_new_test[deletable_row_position+1:]))
            new_test = actual_new_test

            positions1 = (new_test == left_ind).nonzero(as_tuple=False)

            if(tracker ==0):
                break

        if(positions1.shape[0]>1):
            edge_to_delete = new_test[positions1[0][0]]
            no_branches_formed = False
                
        first_position = positions1[0][0]
        adj_edge1 = new_test[positions1[0][0]]
        other_end1 = abs(positions1 - torch.tensor([[0, 1]]))


        consec_pt1 = new_test[other_end1[0][0]][other_end1[0][1]]
        consec_pt1 = int(consec_pt1)

        if(consec_pt1 == right_ind):
            found_right_ind = True
            break

        else:
            left_ind = consec_pt1
            new_test = torch.cat((new_test[:first_position], new_test[first_position+1:]))
            tracker = tracker+1
    
    return found_right_ind



# important functions

In [6]:
# function to get unbranched edges from the other side till there is branch

def right_side_pts_before_branching(actual_new_test, my_edge):

    left_ind = my_edge[0][1]
    right_ind = my_edge[0][0]
    found_right_ind = False
    going_nowhere= False

    new_test = actual_new_test
    actual_new_test_an = actual_new_test
    
    #tracker = 0
    no_branches_formed = True
    loop_tracker = 0
    positions1 = (new_test == left_ind).nonzero(as_tuple=False)
    loops_collec = []
    current_loop = torch.tensor([])
    consec_pt_tracker = torch.tensor([])
    while (not(found_right_ind) or not(going_nowhere)):

        positions1 = (new_test == left_ind).nonzero(as_tuple=False)
        
        if(positions1.shape[0]>1):
            break

        branches_rising = positions1.shape[0]

        if(positions1.shape[0]==0):
            #lets see
            break

        else:
            first_position = positions1[0][0]

            adj_edge1 = new_test[positions1[0][0]]
            other_end1 = abs(positions1 - torch.tensor([[0, 1]]))


            consec_pt1 = new_test[other_end1[0][0]][other_end1[0][1]]
            consec_pt1s = torch.unsqueeze(consec_pt1,0)
            consec_pt_tracker = torch.cat((consec_pt_tracker, consec_pt1s),0)
            consec_pt1 = int(consec_pt1)

            current_loop = torch.cat((current_loop,adj_edge1),0)
            current_loop1 = current_loop.reshape(int(current_loop.shape[0]/2),2)
            
            if(consec_pt1 == my_edge[0][0]):
                current_loop = torch.tensor([])
                
            if(consec_pt1 == right_ind):
                my_edge1 = torch.squeeze(my_edge,0)
                current_loop = torch.cat((current_loop,my_edge1),0)
                current_loop1 = current_loop.reshape(int(current_loop.shape[0]/2),2)                

                loops_collec.append(current_loop1)

                loop_tracker = loop_tracker + 1
            left_ind = consec_pt1
            new_test = torch.cat((new_test[:first_position], new_test[first_position+1:]))
            #tracker = tracker+1
    
    return consec_pt_tracker



In [7]:
# function to check whether the selected edge is going to close a potential loop

def expecting_a_connection(actual_new_test, my_edge):

    other_side_unbranched_pts = right_side_pts_before_branching(actual_new_test, my_edge)
    #print('other_side_unbranched_pts.shape',other_side_unbranched_pts.shape[0])
    left_ind = my_edge[0][0]
    right_ind = my_edge[0][1]
    found_right_ind = False
    going_nowhere= False
    a_connection_is_found = False

    new_test = actual_new_test
    actual_new_test_an = actual_new_test
    
    tracker = 0
    no_branches_formed = True
    loop_tracker = 0
    positions1 = (new_test == left_ind).nonzero(as_tuple=False)
    loops_collec = []
    current_loop = torch.tensor([])
    consec_pt_tracker = torch.tensor([])
    while (not(found_right_ind) or not(going_nowhere)):

        positions1 = (new_test == left_ind).nonzero(as_tuple=False)
        
        #lets see
        #edge_to_delete = new_test[positions1[0][0]]
        #print('positions1',positions1.shape)

        #print(new_test)
        #print()
        #print('positions1.shape[0]',positions1.shape[0])
        #print()
        
        if(positions1.shape[0]>0):
            #edg_q_del = new_test[positions1[0][0]]
            other_end_con = abs(positions1 - torch.tensor([[0, 1]]))
            consec_pt_con = new_test[other_end_con[0][0]][other_end_con[0][1]]
            #print('did i get consec_pt_con ', consec_pt_con)
            #print('now check if it works', not(consec_pt_con in consec_pt_tracker))
            #print('other_side_unbranched_pts.shape',other_side_unbranched_pts.shape)
            if(not(other_side_unbranched_pts.shape[0] == 0) ):
                #if(not(consec_pt_con in consec_pt_tracker) and not(consec_pt_con==other_side_unbranched_pts[-2])):
                if(not(consec_pt_con in consec_pt_tracker) ):
                    edge_to_delete = new_test[positions1[0][0]]
            else:
                if(not(consec_pt_con in consec_pt_tracker)):
                    edge_to_delete = new_test[positions1[0][0]]                
            no_branches_formed = False
            tracker = tracker+1
            #print('edge_to_delete first',edge_to_delete)
        branches_rising = positions1.shape[0]
        
        #print('tracker',tracker)
        if(positions1.shape[0]==0 and tracker ==0):
            #lets see     
            going_nowhere = True
            found_right_ind = True
            return a_connection_is_found, loops_collec
        
        if(positions1.shape[0]==0):
            current_loop = torch.tensor([])
            consec_pt_tracker = torch.tensor([])
            #going_nowhere= True
            '''if(no_branches_formed):
                break'''
            
            left_ind = my_edge[0][0]

            
            deletable_edge_position1 = (actual_new_test == edge_to_delete[0]).nonzero(as_tuple=False)
            deletable_edge_position2 = (actual_new_test == edge_to_delete[1]).nonzero(as_tuple=False)

            deletable_edge_position1 = deletable_edge_position1[:,0]

            deletable_edge_position2 = deletable_edge_position2[:,0]

            a_cat_b1, counts1 = torch.cat([deletable_edge_position1, deletable_edge_position2]).unique(return_counts=True)
            deletable_row_position = a_cat_b1[torch.where(counts1.gt(1))]
            #print()
            #print('deletable_row_position',deletable_row_position)
            
            if(deletable_row_position.shape[0]==0):
                #going_nowhere = True
                current_loop = torch.tensor([])
                break

            deletable_row_position = deletable_row_position[0]
            
            #print('Does my edge to delete contain my edge left index ? ', my_edge[0][0] in edge_to_delete)
            #print()
            actual_new_test = torch.cat((actual_new_test[:deletable_row_position], actual_new_test[deletable_row_position+1:]))
            if(my_edge[0][0] in edge_to_delete):

                deletable_edge_position1 = (actual_new_test_an == edge_to_delete[0]).nonzero(as_tuple=False)
                deletable_edge_position2 = (actual_new_test_an == edge_to_delete[1]).nonzero(as_tuple=False)

                deletable_edge_position1 = deletable_edge_position1[:,0]

                deletable_edge_position2 = deletable_edge_position2[:,0]

                a_cat_b1, counts1 = torch.cat([deletable_edge_position1, deletable_edge_position2]).unique(return_counts=True)
                deletable_row_position = a_cat_b1[torch.where(counts1.gt(1))]
                #print()
                #print('deletable_row_position',deletable_row_position)

                if(deletable_row_position.shape[0]==0):
                    #going_nowhere = True
                    current_loop = torch.tensor([])
                    break

                deletable_row_position = deletable_row_position[0]
                
                actual_new_test_an = torch.cat((actual_new_test_an[:deletable_row_position], actual_new_test_an[deletable_row_position+1:]))    
                actual_new_test = actual_new_test_an
                
            #actual_new_test = torch.cat((actual_new_test[:deletable_row_position], actual_new_test[deletable_row_position+1:]))
            #print('what is this', actual_new_test)
            new_test = actual_new_test

            positions1 = (new_test == left_ind).nonzero(as_tuple=False)
            #print('whats happening here',positions1.shape )
            #print('is the same edge still to delete', edge_to_delete)
            if(tracker ==0):
                break

            '''if(positions1.shape[0]>1):
            edge_to_delete = new_test[positions1[0][0]]
            no_branches_formed = False'''
        else:
            first_position = positions1[0][0]
            #print('first_position',first_position)
            adj_edge1 = new_test[positions1[0][0]]
            other_end1 = abs(positions1 - torch.tensor([[0, 1]]))


            consec_pt1 = new_test[other_end1[0][0]][other_end1[0][1]]
            consec_pt1s = torch.unsqueeze(consec_pt1,0)
            consec_pt_tracker = torch.cat((consec_pt_tracker, consec_pt1s),0)
            consec_pt1 = int(consec_pt1)

                
            #print('consec_pt1',consec_pt1)
            #print('adj_edge1',adj_edge1)
            current_loop = torch.cat((current_loop,adj_edge1),0)
            current_loop1 = current_loop.reshape(int(current_loop.shape[0]/2),2)
            #print('consec_pt_tracker',consec_pt_tracker)
            
            if(consec_pt1 == my_edge[0][0]):
                current_loop = torch.tensor([])
                
            if(consec_pt1 == right_ind):
                my_edge1 = torch.squeeze(my_edge,0)
                current_loop = torch.cat((current_loop,my_edge1),0)
                current_loop1 = current_loop.reshape(int(current_loop.shape[0]/2),2)                
                #found_right_ind = True
                #print('current_loop',current_loop1)
                #current_loop1 = torch.unsqueeze(current_loop1,0)
                #print('current_loop shape now',current_loop1.shape)
                #print('loop_tracker', loop_tracker)
                loops_collec.append(current_loop1)
                #loops_collec[loop_tracker] = current_loop1
                loop_tracker = loop_tracker + 1
                #print()
                #print("Wow! Found a loop here")
                #print()
                a_connection_is_found = True
                #break

            #else:

            left_ind = consec_pt1
            new_test = torch.cat((new_test[:first_position], new_test[first_position+1:]))
            #print('new_test',new_test)
            tracker = tracker+1
    tracker=0
    #loops_collec = torch.FloatTensor(loops_collec)
    return a_connection_is_found, loops_collec



In [17]:
def get_zero_dim_persistent_homology(dist_matrix):
    
    dist_matrix = torch.unique(dist_matrix, dim=0)
    dist_matrix = torch.unique(dist_matrix, dim=1)

    upp_diag = torch.triu(dist_matrix, diagonal=1)
    ff = upp_diag.sort()

    sorted_upper_diag_edges = ff[0]

    sorted_upper_diag_indices = ff[1]

    flattened_uppdg_edges = torch.flatten(sorted_upper_diag_edges)

    non_zero_flattened_uppdg_edges = flattened_uppdg_edges[flattened_uppdg_edges.nonzero()]

    non_zero_flattened_uppdg_edges = non_zero_flattened_uppdg_edges.reshape(non_zero_flattened_uppdg_edges.shape[0])
    increasing_edges = non_zero_flattened_uppdg_edges.sort()[0]
    increasing_edges = torch.unique(increasing_edges, dim=0)
    selected_edges = torch.tensor([])
    all_edges = torch.tensor([])
    dead_indices = torch.tensor([])
    potential_triangles = torch.tensor([])
    edge_leads_to_loop = False

    for i in range(increasing_edges.shape[0]):
        a = (upp_diag == increasing_edges[i]).nonzero(as_tuple=False)
        alle = (upp_diag == increasing_edges[i]).nonzero(as_tuple=False)
        all_edges = torch.cat(((all_edges, alle)), 0)
        
        if(selected_edges.shape[0] > 1):
            edge_leads_to_loop = expecting_a_cycle(selected_edges, a)

        if(not(edge_leads_to_loop)):
            selected_edges = torch.cat(((selected_edges, a)), 0)


    zeroD_PH = torch.tensor([])
    print('all_edges',all_edges[0:10])
    print('all_edges.shape',all_edges[0:10].shape)
    
    dept_arr = torch.tensor([[ 60, 72]])

    for i in range(all_edges.shape[0]):
        #print('i',i)
        connected, connections_found = expecting_a_connection(all_edges[0:i+1], dept_arr)
        if((connected)):
            print('it got connected at ', i)
            print('connections_found', connections_found)
            break
        
    for i in range(selected_edges.shape[0]):    
        death = dist_matrix[int(selected_edges[i][0])][int(selected_edges[i][1])]
        death = death.reshape(1,1)    
        zeroD_PH = torch.cat(((zeroD_PH, death)), 0)

    births = torch.zeros(zeroD_PH.shape[0], 1)
    zeroD_PH_births_deaths = torch.cat((births, zeroD_PH ),1)

    return connections_found

    

# end important functions

In [18]:
#random_pts = random_sampling_from_pt_cloud(point_data, 80)

In [19]:
distance_matrix = _compute_distance_matrix(torch.tensor(point_data), p=2)


In [20]:
start = time.time()

zero_d_PH = get_zero_dim_persistent_homology(distance_matrix)

end = time.time()

print(f"Runtime of the program is {end - start}")

all_edges tensor([[20., 23.],
        [36., 38.],
        [60., 61.],
        [47., 48.],
        [35., 40.],
        [10., 11.],
        [25., 30.],
        [71., 72.],
        [58., 63.],
        [ 0.,  1.]])
all_edges.shape torch.Size([10, 2])
it got connected at  115
connections_found [tensor([[60., 67.],
        [66., 67.],
        [66., 73.],
        [73., 75.],
        [70., 75.],
        [70., 71.],
        [71., 72.],
        [60., 72.]]), tensor([[60., 67.],
        [66., 67.],
        [66., 73.],
        [73., 75.],
        [70., 75.],
        [70., 71.],
        [71., 72.],
        [60., 72.]]), tensor([[60., 67.],
        [66., 67.],
        [66., 73.],
        [73., 75.],
        [70., 75.],
        [70., 72.],
        [60., 72.]]), tensor([[60., 61.],
        [61., 69.],
        [67., 69.],
        [66., 67.],
        [66., 73.],
        [73., 75.],
        [70., 75.],
        [70., 71.],
        [71., 72.],
        [60., 72.]]), tensor([[60., 61.],
        [61., 69.],
 

In [21]:
zero_d_PH

[tensor([[60., 67.],
         [66., 67.],
         [66., 73.],
         [73., 75.],
         [70., 75.],
         [70., 71.],
         [71., 72.],
         [60., 72.]]),
 tensor([[60., 67.],
         [66., 67.],
         [66., 73.],
         [73., 75.],
         [70., 75.],
         [70., 71.],
         [71., 72.],
         [60., 72.]]),
 tensor([[60., 67.],
         [66., 67.],
         [66., 73.],
         [73., 75.],
         [70., 75.],
         [70., 72.],
         [60., 72.]]),
 tensor([[60., 61.],
         [61., 69.],
         [67., 69.],
         [66., 67.],
         [66., 73.],
         [73., 75.],
         [70., 75.],
         [70., 71.],
         [71., 72.],
         [60., 72.]]),
 tensor([[60., 61.],
         [61., 69.],
         [67., 69.],
         [66., 67.],
         [66., 73.],
         [73., 75.],
         [70., 75.],
         [70., 71.],
         [71., 72.],
         [60., 72.]]),
 tensor([[60., 61.],
         [61., 69.],
         [67., 69.],
         [66., 67.],
   