In [None]:
from perdiver.distances import *

import matplotlib as mpl

import tdqual.topological_data_quality_0 as tdqual

In [None]:
def get_matching_diagram(Dist_X, Dist_Y):
    # Compute matching from X to Y
    filt_X, filt_Y, matching = tdqual.compute_Mf_0(Dist_X, Dist_Y, is_dist=True)
    # Plot 0 persistence diagram of matching 
    match_diagram = []
    for idx, idx_match in enumerate(matching):
        match_diagram.append([filt_X[idx], filt_Y[idx_match]])
    # end for
    return np.array(match_diagram)

In [None]:
from navground import core, sim

import matplotlib.pyplot as plt

import os
plots_dir = "plots/corridor/"
os.makedirs(plots_dir, exist_ok=True)

In [None]:
length = 8.0
num_steps = 1000
# num_steps = 0
width=1.0
yaml = f"""
steps: {num_steps}
time_step: 0.1
save_directory: ''
record_pose: true
record_twist: true
scenario:
  type: Corridor
  length: {length}
  width: {width} 
  groups:
    -
      type: thymio
      number: 38
      radius: 0.08
      control_period: 0.1
      speed_tolerance: 0.02
      kinematics:
        type: 2WDiff
        wheel_axis: 0.094
        max_speed: 0.166
      behavior:
        type: HL
        optimal_speed: 0.12
        horizon: 5.0
        safety_margin: 0.034
      state_estimation:
        type: Bounded
        range: 5.0
"""
experiment = sim.load_experiment(yaml)
experiment.run()

In [None]:
run = experiment.runs[0]
ps = run.poses[:,:,[0,1]]
twists = run.twists[:,:,:2] # ignore angular speeds

In [None]:
weight = 4
shift_time = 30
steps_list = list(range(0, 800, 5))

In [None]:
start_step = 100
X = ps[start_step]
Y = ps[start_step + shift_time]
vel_X = twists[start_step]
vel_Y = twists[start_step + shift_time]
X_len = X.shape[0]-1
# Interpret diagram via points
Dist_X, Dist_Y, Dist_Z = compute_distance_matrices_timesteps_corridor(X, Y, vel_X, vel_Y, weight, length)
matching_diagram_inclusion = get_matching_diagram(Dist_X, Dist_Z)
matching_diagram_pair = get_matching_diagram(Dist_X, Dist_Y)
fig, ax = plt.subplots(ncols=2, figsize=(8,4))
ax[0].set_title("Matching diagram of inclusion")
tdqual.plot_matching_diagram(matching_diagram_inclusion, ax[0])
ax[1].set_title("Matching diagram of pair")
tdqual.plot_matching_diagram(matching_diagram_pair, ax[1])

In [None]:
def plot_sequence(X_list, ax, mark_points=[], color="red"):
    # Plot figure
    X_old = X_list[0]
    ax.scatter(X_old[:,0], X_old[:,1], s=20, marker="o", color=mpl.colormaps["RdBu"](1/(len(X_list)+1)), zorder=2)
    for idx, X in enumerate(X_list[1:]):
        ax.scatter(X[:,0], X[:,1], s=20, marker="o", color=mpl.colormaps["RdBu"]((idx+1)/(len(X_list)+1)), zorder=2, label="X")
        if len(mark_points)>0:
            mark_X = X[mark_points]
            ax.scatter(mark_X[:,0], mark_X[:,1], s=20, marker="+", color=color, zorder=3)
        X_old = X
    #end for 

def compute_divergence_vector(Dist_X, Dist_Y, Dist_Z):
    assert(Dist_X.shape[0]==Dist_Y.shape[0])
    assert(Dist_X.shape[0]==Dist_Z.shape[0])
    idx_S = list(range(int(Dist_X.shape[0])))
    # Compute matching
    ibfm_out = [
        ibfm.get_IBloFunMatch_subset(Dist_X, Dist_Z, idx_S, output_dir, max_rad=-1, num_it=1, store_0_pm=True, points=False, max_dim=1),
        ibfm.get_IBloFunMatch_subset(Dist_Y, Dist_Z, idx_S, output_dir, max_rad=-1, num_it=1, store_0_pm=True, points=False, max_dim=1)
    ]
    # Compute induced matchings
    matching_XZ = ibfm_out[0]["induced_matching_0"]
    matching_YZ = ibfm_out[1]["induced_matching_0"]
    composition_XY = [matching_YZ.index(i) for i in matching_XZ]
    endpoints_0 = np.array(ibfm_out[0]["S_barcode_0"][:,1])
    endpoints_1 = np.array(ibfm_out[1]["S_barcode_0"][:,1])
    endpoints_1 = endpoints_1[composition_XY]
    return  endpoints_1-endpoints_0, ibfm_out[0]["X_barcode_0"][:,1]

def compute_divergence_Z_arrays(positions, velocities, steps_list, shift_step, weight, side):
    divergence_list = []
    Z_barcodes_list = []
    for start_step in steps_list:
        Dist_X, Dist_Y, Dist_Z = compute_distance_matrices_trajectories_2D_torus(positions, velocities, start_step, shift_step, weight, side)
        divergence, Z_barcode = compute_divergence_vector(Dist_X, Dist_Y, Dist_Z)
        divergence_list.append(divergence)
        Z_barcodes_list.append(Z_barcode)
    # want x-coordinate: steps, and y-coordinate: interval idx
    divergence_arr = np.array(divergence_list).transpose()
    Z_barcodes_arr = np.array(Z_barcodes_list).transpose()
    return divergence_arr, Z_barcodes_arr

def compute_cumulative_array(div_arr):
    cumulative_list = []
    for j, divergence in enumerate(div_arr.transpose()):
        cumulative = []
        for i in range(div_arr.shape[0]):
            cumulative.append(np.sum(divergence[:i+1]))
        cumulative_list.append(cumulative)
    return np.array(cumulative_list).transpose()

def plot_two_timesteps(X, Y, ax, X_col="blue", Y_col="red"):
    # Plot figure
    ax.scatter(X[:,0], X[:,1], s=20, marker="s", c=X_col, zorder=2, label="X")
    ax.scatter(Y[:,0], Y[:,1], s=23, marker="x", c=Y_col, zorder=2, label="Y")
    for edge in zip(X, Y):
        edge_pts = np.array(edge)
        ax.plot(edge_pts[:,0], edge_pts[:,1], c="gray", zorder=1)