In [1]:
import os
import sys
import numpy as np
sys.path.append('../')
import scipy.stats as stats
from utils import get_distance, haversineDistance
from placecell import PlaceNetwork, loadNetwork

In [2]:
PATH_DIR = 'paths'

In [3]:
def one_tailed_t_test(array1, array2, alternative='greater'):
    """
    Perform a one-tailed t-test on two arrays.

    Parameters:
    - array1, array2: The two arrays for which the t-test will be performed.
    - alternative: The direction of the test. It can be 'greater' or 'less'.
    
    Returns:
    - t_statistic: The t-statistic of the test.
    - p_value: The p-value of the test.
    """
    _, p_value = stats.ttest_ind(array1, array2)
    
    if alternative == 'greater':
        t_statistic = p_value / 2  # Divide p-value by 2 for a one-tailed test
    elif alternative == 'less':
        t_statistic = 1 - p_value / 2  # Subtract p-value divided by 2 from 1
    else:
        raise ValueError("Invalid alternative. Use 'greater' or 'less'.")

    result = {
        'statistic': t_statistic,
        'p_value': p_value,
    }
    
    return result

def wilcoxon_ranksum_test(data1, data2):
    statistic, p_value = stats.ranksums(data1, data2)

    result = {
        'statistic': statistic,
        'p_value': p_value,
    }
    return result

In [4]:
def stringToTuple(input_string):
    try:
        tuples_list = eval(input_string)
        
        if isinstance(tuples_list, list) and all(isinstance(t, tuple) and len(t) == 2 for t in tuples_list):
            return tuples_list
        else:
            raise ValueError("Invalid input format. Must be a list of tuples.")
    
    except Exception as e:
        raise ValueError("Error converting string to tuples: {}".format(str(e)))

def evalCost(input_string):
    try:
        # Using eval to parse the string into a Python expression
        list_of_lists = eval(input_string)
        
        # Check if the result is a list of lists
        if isinstance(list_of_lists, list) and all(isinstance(sublist, list) for sublist in list_of_lists):
            # Check if each element in the sublists is a float
            flattened_list = [elem for sublist in list_of_lists for elem in sublist]
            return list_of_lists
        else:
            raise ValueError("Invalid input format. Must be a list of lists.")
    
    except Exception as e:
        return None
        raise ValueError("Error converting string to list of lists: {}".format(str(e)))

In [5]:
dir_list = os.listdir(PATH_DIR)

paths = {'RRT' : {}, 'spike' : {}, 'astar' : {}}
start_ends = []

for file in dir_list:
    if os.path.isdir(os.path.join(PATH_DIR, file)):
        continue
    comps = file[0:-4].split("_")
    
    pathtype = comps[0]
    
    start = comps[1].split("-")
    start = (int(start[0]), int(start[1]))

    end = comps[2].split("-")
    end = (int(end[0]), int(end[1]))

    point = ((start, end))

    with open(os.path.join(PATH_DIR, file)) as f:
        path = stringToTuple(f.readline())
        costs = evalCost(f.readline())
        if costs != None:
            paths[pathtype][point] = {'trajectory' : None, 'cost' : None}
            paths[pathtype][point]['trajectory'] = path
            paths[pathtype][point]['cost'] = costs

    if point not in start_ends and costs != None:
        start_ends.append(point)

In [6]:
def calculateCost(data):
    current_total = 0
    current_time = 0

    obstacle_total = 0
    obstacle_time = 0

    slope_total = 0
    slope_time = 0

    time_total = 0

    for wp in data:
        current_total += wp[1]
        current_time += wp[2]
        obstacle_total += wp[3]
        obstacle_time += wp[4]
        slope_total += wp[5]
        slope_time += wp[6]

        time_total += wp[0] 

    return [current_total, current_time, obstacle_total, obstacle_time, slope_total, slope_time, time_total]

In [7]:
rrts = []
swps = []
astars = []

rrt_plens = []
swp_plens = []
astar_plens = []

for pts in start_ends:
    rrt_costs = paths['RRT'][pts]['cost']
    swp_costs = paths['spike'][pts]['cost']
    astar_costs = paths['astar'][pts]['cost']

    rrts.append(calculateCost(rrt_costs))
    swps.append(calculateCost(swp_costs))
    astars.append(calculateCost(astar_costs))

    rrt_plens.append(len(paths['RRT'][pts]['trajectory']))
    swp_plens.append(len(paths['spike'][pts]['trajectory']))
    astar_plens.append(len(paths['astar'][pts]['trajectory']))

if len(rrts) != len(swps):
    print("Number of data not the same")

In [8]:
n = PlaceNetwork()
n.initAldritch(numcosts=6)
n.initConnections()

In [9]:
rrt_dists = []
swp_dists = []
astar_dists = []

for pts in start_ends:
    rrt = paths['RRT'][pts]['trajectory']
    swp = paths['spike'][pts]['trajectory']
    astar = paths['astar'][pts]['trajectory']

    rrt_d = 0
    swp_d = 0
    astar_d = 0

    for i in range(len(rrt) - 1):
        d1 = n.cells[n.points[rrt[i]]].origin
        d2 = n.cells[n.points[rrt[i+1]]].origin

        rrt_d += haversineDistance(d1[0], d1[1], d2[0], d2[1])

    rrt_dists.append(rrt_d)

    for i in range(len(swp) - 1):
        d1 = n.cells[n.points[swp[i]]].origin
        d2 = n.cells[n.points[swp[i+1]]].origin
    
        swp_d += haversineDistance(d1[0], d1[1], d2[0], d2[1])

    swp_dists.append(swp_d)

    for i in range(len(astar) - 1):
        d1 = n.cells[n.points[astar[i]]].origin
        d2 = n.cells[n.points[astar[i+1]]].origin
    
        astar_d += haversineDistance(d1[0], d1[1], d2[0], d2[1])

    astar_dists.append(astar_d)
        

In [10]:
distance_rrt = one_tailed_t_test(swp_dists, rrt_dists, alternative='less')
print("Spiking wavefront versus rrt")
print(f"Mean swp distance: {np.mean(swp_dists)}")
print(f"Mean rrt distance: {np.mean(rrt_dists)}")
print(f"p-value: {distance_rrt['p_value']}")

Spiking wavefront versus rrt
Mean swp distance: 46.60400973848205
Mean rrt distance: 60.24742640545722
p-value: 0.01125063503687932


In [12]:
distance_astar = one_tailed_t_test(swp_dists, astar_dists, alternative='less')
print("Spiking wavefront versus astar")
print(f"Mean swp distance: {np.mean(swp_dists)}")
print(f"Mean astar distance: {np.mean(astar_dists)}")
print(f"p-value: {distance_astar['p_value']}")

Spiking wavefront versus astar
Mean swp distance: 46.60400973848205
Mean astar distance: 46.981456232882294
p-value: 0.9141730835079851


In [13]:
swp_curr = [swps[i][0] / swps[i][1] * (swps[i][6]) for i in range(len(swps))]
rrt_curr = [rrts[i][0] / rrts[i][1] * (rrts[i][6]) for i in range(len(rrts))]

swp_obs = [swps[i][2] / swps[i][3] * 100 for i in range(len(swps))]
rrt_obs = [rrts[i][2] / rrts[i][3] * 100 for i in range(len(rrts))]

swp_slope = [np.rad2deg(swps[i][4] / swps[i][5]) for i in range(len(swps))]
rrt_slope = [np.rad2deg(rrts[i][4] / rrts[i][5]) for i in range(len(rrts))]

curr_result_rrt = one_tailed_t_test(swp_curr, rrt_curr, alternative='less')
print(f"Mean swp current: {np.mean(swp_curr)}")
print(f"Mean rrt current: {np.mean(rrt_curr)}")
print(f"p-value: {curr_result_rrt['p_value']}")

print("___________________________________________")

obs_result_rrt = one_tailed_t_test(swp_obs, rrt_obs, alternative='less')
print(f"Mean swp obstacle: {np.mean(swp_obs)}")
print(f"Mean rrt obstacle: {np.mean(rrt_obs)}")
print(f"p-value: {obs_result_rrt['p_value']}")

print("___________________________________________")

slope_result_rrt = one_tailed_t_test(swp_slope, rrt_slope, alternative='less')
print(f"Mean swp slope: {np.mean(swp_slope)}")
print(f"Mean rrt slope: {np.mean(rrt_slope)}")
print(f"p-value: {slope_result_rrt['p_value']}")

Mean swp current: 415.8916010531817
Mean rrt current: 468.7360005084618
p-value: 0.33805456112219867
___________________________________________
Mean swp obstacle: 2.0644825549312027
Mean rrt obstacle: 4.446772594980196
p-value: 0.00683539044199596
___________________________________________
Mean swp slope: 7.570660604121381
Mean rrt slope: 7.522386438607012
p-value: 0.9236900389913347


In [14]:
swp_curr = [swps[i][0] / swps[i][1] * (swps[i][6]) for i in range(len(swps))]
astar_curr = [astars[i][0] / astars[i][1] * (astars[i][6]) for i in range(len(astars))]

swp_obs = [swps[i][2] / swps[i][3] * 100 for i in range(len(swps))]
astar_obs = [astars[i][2] / astars[i][3] * 100 for i in range(len(astars))]

swp_slope = [np.rad2deg(swps[i][4] / swps[i][5]) for i in range(len(swps))]
astar_slope = [np.rad2deg(astars[i][4] / astars[i][5]) for i in range(len(astars))]

curr_result_astar = one_tailed_t_test(swp_curr, astar_curr, alternative='less')
print(f"Mean swp total current: {np.mean(swp_curr)}")
print(f"Mean astar total current: {np.mean(astar_curr)}")
print(f"p-value: {curr_result_astar['p_value']}")

print("___________________________________________")

obs_result_astar = one_tailed_t_test(swp_obs, astar_obs, alternative='less')
print(f"Mean swp total obstacle: {np.mean(swp_obs)}")
print(f"Mean astar total obstacle: {np.mean(astar_obs)}")
print(f"p-value: {obs_result_astar['p_value']}")

print("___________________________________________")

slope_result_astar = one_tailed_t_test(swp_slope, astar_slope, alternative='less')
print(f"Mean swp total slope: {np.mean(swp_slope)}")
print(f"Mean astar total slope: {np.mean(astar_slope)}")
print(f"p-value: {slope_result_astar['p_value']}")

Mean swp total current: 415.8916010531817
Mean astar total current: 382.82136479279393
p-value: 0.42934023512360453
___________________________________________
Mean swp total obstacle: 2.0644825549312027
Mean astar total obstacle: 3.9517479700044476
p-value: 0.029877626817600384
___________________________________________
Mean swp total slope: 7.570660604121381
Mean astar total slope: 7.723805713953298
p-value: 0.7919961492454977


In [26]:
from statsmodels.stats.multitest import multipletests
all_curr = [i['p_value'] for i in [curr_result_astar, curr_result_rrt]]
all_obs = [i['p_value'] for i in [obs_result_astar, obs_result_rrt]]
all_slope = [i['p_value'] for i in [slope_result_astar, slope_result_rrt]]
all_distance = [i['p_value'] for i in [distance_astar, distance_rrt]]

curr_corr = multipletests(all_curr, alpha=0.05, method='bonferroni')
print("CURRENT (atar, rrt)")
print(curr_corr)
print("___________________________________________")
obs_corr = multipletests(all_obs, alpha=0.05, method='bonferroni')
print("OBSTACLE (atar, rrt)")
print(obs_corr)
print("___________________________________________")
slope_corr = multipletests(all_slope, alpha=0.05, method='bonferroni')
print("SLOPE (atar, rrt)")
print(slope_corr)
print("___________________________________________")
distance_corr = multipletests(all_distance, alpha=0.05, method='bonferroni')
print("DISTANCE (atar, rrt)")
print(distance_corr)
print("___________________________________________")

CURRENT (atar, rrt)
(array([False, False]), array([0.85868047, 0.67610912]), 0.025320565519103666, 0.025)
___________________________________________
OBSTACLE (atar, rrt)
(array([False,  True]), array([0.05975525, 0.01367078]), 0.025320565519103666, 0.025)
___________________________________________
SLOPE (atar, rrt)
(array([False, False]), array([1., 1.]), 0.025320565519103666, 0.025)
___________________________________________
DISTANCE (atar, rrt)
(array([False,  True]), array([1.        , 0.02250127]), 0.025320565519103666, 0.025)
___________________________________________


In [None]:
same = 0
same_distance = 0
for i in start_ends:
    asp = paths['astar'][i]['trajectory']
    swp = paths['spike'][i]['trajectory']
    if asp == swp:
        same += 1
    if len(asp) == len(swp):
        same_distance += 1

print(f"Number of identical paths: {same} of {len(start_ends)}. {same/len(start_ends) * 100:0.2f}%")
print(f"Number of paths with identical distance: {same_distance} of {len(start_ends)}. {same_distance/len(start_ends) * 100:0.2f}%")