In [None]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl

import scipy.spatial.distance as dist
import itertools

import tdqual.topological_data_quality_0 as tdqual

import os 
plots_dir = "plots/example_computation/"
os.makedirs(plots_dir, exist_ok=True)

To make some plots from this notebook, we need to install GUDHI for working with simplicial complexes in an easy way.

In [None]:
# pip install gudhi
import gudhi

# Computation of Block Function in dimension 0

Consider the following example, with points taken from a sample.

We consider 7 points and a sample of three points. 

In [None]:
RandGen = np.random.default_rng(2)
# # Generate Random Sample
Z = tdqual.sampled_circle(0,2,6, RandGen)
X_indices = RandGen.choice(Z.shape[0],3, replace=False)
# Sort Z so that the first # X points are from X, also, modify some points and save 
X_compl = np.ones(Z.shape[0], dtype="bool")
X_compl[X_indices] = False
Z = np.vstack((Z[X_indices], Z[X_compl]))
Z[3] = [-0.1,0]
Z[4] = [0.4,0]
Z[5] = [0.15,np.sqrt(0.5**2 - 0.25**2)]
X_indices = range(len(X_indices))
np.savetxt("Z_example_new.txt", Z, fmt="%.4f")
np.savetxt("X_idx_example_new.txt", X_indices, fmt="%d")
# Load Z and X_indices from files 
Z = np.loadtxt("Z_example_new.txt")
X_indices = np.loadtxt("X_idx_example_new.txt", dtype="int")
X = Z[X_indices]
# Plot point cloud
fig, ax = plt.subplots(ncols=1, figsize=(3,3))
ax.scatter(X[:,0], X[:,1], color=mpl.colormaps["RdBu"](0.3/1.3), s=60, marker="o", zorder=2)
ax.scatter(Z[:,0], Z[:,1], color=mpl.colormaps["RdBu"](1/1.3), s=40, marker="x", zorder=1)
ax.set_axis_off()
plt.savefig(plots_dir + "points_0.png")

We plot, for illustration, the Vietoris-Rips complex at a sequence of values

In [None]:
### Geometric Matching 
def compute_components(edgelist, num_points):
    components = np.array(range(num_points))
    for edge in edgelist:
        max_idx = np.max(components[edge])
        min_idx = np.min(components[edge])
        indices = np.nonzero(components == components[max_idx])[0]
        components[indices]=np.ones(len(indices))*components[min_idx]
    
    return components

In [None]:
def plot_Vietoris_Rips_subset(Z, X_indices, filt_val, ax, labels=False, fontsize=15):
    X = Z[X_indices]
    # Plot point cloud
    if labels:
        ax.scatter(X[:,0], X[:,1], color=mpl.colormaps["RdBu"](0.3/1.3), s=230, marker="o", zorder=2)
        ax.scatter(Z[:,0], Z[:,1], color=mpl.colormaps["RdBu"](1/1.3), s=230, marker="o", zorder=1)
    else:
        ax.scatter(X[:,0], X[:,1], color=mpl.colormaps["RdBu"](0.3/1.3), s=60, marker="o", zorder=2)
        ax.scatter(Z[:,0], Z[:,1], color=mpl.colormaps["RdBu"](1/1.3), s=40, marker="o", zorder=1)
    # Plot simplicial complex 
    rips_complex = gudhi.RipsComplex(points=Z, max_edge_length=filt_val)
    simplex_tree = rips_complex.create_simplex_tree(max_dimension=1)
    simplex_tree.expansion(2)
    edgelist = []
    for filtered_value in simplex_tree.get_filtration():
        simplex = filtered_value[0]
        if len(simplex)==2:
            edgelist.append(simplex)
            if len(np.intersect1d(simplex, X_indices))==2:
                ax.plot(Z[simplex][:,0], Z[simplex][:,1], linewidth=2, c=mpl.colormaps["RdBu"](0.3/1.3), zorder=0.5)
            else:
                ax.plot(Z[simplex][:,0], Z[simplex][:,1], linewidth=2, c=mpl.colormaps["RdBu"](1/1.3), zorder=0.5)
        # if len(simplex)==3:
        #     ax.add_patch(mpl.patches.Polygon(Z[simplex], closed=True, facecolor="black", alpha=0.3, zorder=0.2))
    ax.set_aspect("equal")
    # Adjust margins
    xscale = ax.get_xlim()[1]-ax.get_xlim()[0]
    yscale = ax.get_ylim()[1]-ax.get_ylim()[0]
    xlim = ax.get_xlim()
    xlim = (xlim[0]-xscale*0.1, xlim[1]+xscale*0.1)
    ax.set_xlim(xlim)
    ylim = ax.get_ylim()
    ylim = (ylim[0]-yscale*0.1, ylim[1]+yscale*0.1)
    ax.set_ylim(ylim)
    # Plot labels
    if labels:
        components = compute_components(edgelist, Z.shape[0])
        # Point Labels 
        for i in range(Z.shape[0]):
            ax.text(Z[i,0]-0.035*xscale, Z[i,1]-0.035*yscale, f"{components[i]}", fontsize=fontsize, color="white", fontweight="bold")
    # Finish with aspect details 
    ax.set_xticks([])
    ax.set_yticks([])

In [None]:
filtrations = [0.0, 1, 2, 3]
fig, ax = plt.subplots(ncols=len(filtrations), figsize=(3*len(filtrations),3))
for j, filt_val in enumerate(filtrations):
    plot_Vietoris_Rips_subset(Z, X_indices, filt_val, ax[j])
    # Set title 
    ax[j].set_title(f"{filt_val:1.1f}") 
    #ax[j].set_title(f"VR_{filt_val:1.1f}(Z) \subseteq VR_{filt_val:1.1f}(Z)") 
plt.savefig(plots_dir + "VR_filtration.png")


Repeat computation of Vietoris-Rips complex with labels on vertices and components.

In [None]:
filtrations = [0,1,1.1,1.2, 2.5]
fig, ax = plt.subplots(nrows=2, ncols=len(filtrations), figsize=(2.5*len(filtrations),6))
for i, filt_val in enumerate(filtrations):
    plot_Vietoris_Rips_subset(Z, X_indices, filt_val, ax[0, i], labels=True)
    X = Z[X_indices]
    plot_Vietoris_Rips_subset(X, [], filt_val, ax[1, i], labels=True)
    # Plot point cloud extra large
    ax[0,i].set_title(f"{filt_val:.1f}")

ax[0,0].set_title("Dataset")
ax[1,0].set_title("Subset")
plt.tight_layout()
plt.savefig(plots_dir + "VR_components.png")

Next, we compute the block function induced by the inclusion $S\hookrightarrow X$

In [None]:
from importlib import reload
reload(tdqual)

In [None]:
# filtration_list_Z, pairs_arr_Z = tdqual.filtration_pairs(Z)
# filtration_list_X, pairs_arr_X = tdqual.filtration_pairs(X)
# F = tdqual.get_inclusion_matrix(pairs_arr_S, pairs_arr_Z, X_indices)
# matching = tdqual.get_inclusion_matrix_pivots(F, Z.shape[0])
filt_X, filt_Z, matching = tdqual.compute_Mf_0(X, Z, X_indices)

We now print some of these values.

In [None]:
np.set_printoptions(precision=3)
print("Endpoints from X")
print(np.array(filt_X))
print()
print("Endpoints from Z")
print(np.array(filt_Z))
print()
print("Induced Matching Mf")
print(matching)

Now, we describe the $0$-persistence barcodes in terms of evolution of components.

In [None]:
def plot_merge_tree(endpoints_0, reps_0, ax):
    max_x = np.max(endpoints_0)*1.1
    num_points = len(endpoints_0)+1
    y= np.linspace(0, 0.3*num_points, num_points)
    idx_death = []
    merging_into= []
    death_val = []
    for idx, (end, rep) in enumerate(zip(endpoints_0, reps_0)):
        ax.plot([0,end], [y[idx], y[idx]], c=mpl.colormaps["RdBu"](1/1.3), linewidth=3, zorder=0.5)
        idx_death.append(np.max(rep))
        merging_into.append(np.min(rep))
        death_val.append(end)
    
    # merge lines in red
    idx_death.append(0)
    for idx, (j, death) in enumerate(zip( merging_into, death_val)):
        death_merging = idx_death.index(j)
        ax.plot([death, death], [y[idx],y[death_merging]], linewidth=3, c=mpl.colormaps["RdBu"](0.3/1.3), zorder=0.5)

    xscale = ax.get_xlim()[1]-ax.get_xlim()[0]
    yscale = ax.get_ylim()[1]-ax.get_ylim()[0]
    for i, idx in enumerate(idx_death):
        ax.text(-0.015*xscale, y[i]-0.04*yscale, f"{idx}", zorder=0.7, fontsize=10, color="white", fontweight="bold")
        if i < len(idx_death)-1:
            death_x = endpoints_0[i]
            ax.text(death_x-0.015*xscale, y[i]-0.04*yscale, f"{merging_into[i]}", zorder=0.7, fontsize=10, color="white", fontweight="bold")

    ax.scatter(np.zeros(len(y)),y, s=100, marker="o", color=mpl.colormaps["RdBu"](1/1.3), zorder=0.6)
    ax.scatter(endpoints_0, y[:-1], s=100, marker="o", color=mpl.colormaps["RdBu"](0.3/1.3), zorder=0.6)
    ax.set_xlim(ax.get_xlim()[0]-0.1*xscale, ax.get_xlim()[1]+0.1*xscale)
    ax.set_ylim(ax.get_ylim()[0]-0.1*yscale, ax.get_ylim()[1]+0.1*yscale)
    # Top horizontal interval
    ax.plot([0,max_x*2], [y[-1],y[-1]], linewidth=3, c=mpl.colormaps["RdBu"](1/1.3), zorder=0.5)
    ax.set_yticks([])

In [None]:
filt_X, pairs_arr_X = tdqual.mst_edge_filtration(X)
TMT_X_pairs = tdqual.compute_tmt_pairs(filt_X, pairs_arr_X)
filt_Z, pairs_arr_Z = tdqual.mst_edge_filtration(Z)
TMT_Z_pairs = tdqual.compute_tmt_pairs(filt_Z, pairs_arr_Z)

In [None]:
fig, ax = plt.subplots(ncols=2, figsize=(9,2.4))
plot_merge_tree(filt_X, TMT_X_pairs, ax[0])
plot_merge_tree(filt_Z, TMT_Z_pairs, ax[1])
ax[0].set_title("Merge tree from Z")
ax[1].set_title("Merge tree from Z")
ylim = ax[1].get_ylim()
ax[1].plot([0.8,0.8], ylim, linewidth=1, color="gray", linestyle="--", zorder=0)
plt.tight_layout()
plt.savefig(plots_dir + "merge_trees_X_Z.png")

Next, we plot the barcode matching $\mathcal{M}^0_f$

In [None]:
fig, ax = plt.subplots(figsize=(7,2.5))
tdqual.plot_matching_0(filt_X, filt_Z, matching, ax)
plt.tight_layout()
plt.savefig(plots_dir + "block_matching_0.png")

Next, we plot the persistence diagram.

In [None]:
fig, ax = plt.subplots(figsize=(3,3))
D_f, multiplicities = tdqual.compute_matching_diagram(filt_X, filt_Z, matching, _tol=1e-5)
tdqual.plot_matching_diagram(D_f, ax)
plt.tight_layout()
plt.savefig(plots_dir + "matching_diagram_0.png")

In [None]:
D_f.tolist()

In [None]:
D_f_rep = []
for i, pair in enumerate(D_f):
    for j in range(multiplicities[i]):
        D_f_rep += list(pair)

D_f_rep = np.array(D_f_rep).reshape(-1,2)

In [None]:
D_f_rep

In [None]:
fin_D_f = D_f_rep[D_f_rep[:,0]<np.inf]
print(fin_D_f)

In [None]:
coker_f = D_f[D_f[:,0]==np.inf][:,1]

In [None]:
tdqual.plot_density_matching_diagram(D_f_rep, coker_f, plots_dir + "density_matrix_0.png", nbins=5, show_colorbar=True)