In [1]:
import numpy as __np
from scipy.optimize import minimize as __minimize
from scipy import stats as __stats

In [2]:
import numpy as __np
from scipy.optimize import minimize as __minimize
from scipy import stats as __stats

def calculate_segment_distances(trajectory):
    """Calculate distances between consecutive points in the trajectory."""
    return __np.sqrt(__np.sum(__np.diff(trajectory, axis=0)**2, axis=1))

def calculate_total_distance(trajectory):
    """Calculate the total distance of the trajectory."""
    return __np.sum(calculate_segment_distances(trajectory))

In [3]:
def __EmpiricalCDF(bins, data):
    empiricalHistogram = __np.histogram(data, bins=bins)[0]
    empiricalCH = __np.cumsum(empiricalHistogram)
    if len(empiricalCH) == 0:
        return __np.array([1.0]) 
    return empiricalCH / empiricalCH[-1]

def __MLE(distLike, distParams, data):
    return __minimize(distLike, distParams, args=(data),
                      method="nelder-mead").x

In [4]:
def __JSD(data, empiricalDist, theorDist, returnParams=True):
    distParams = __MLE(theorDist["likelihood"], theorDist["params"], data)
    cdfBins = __np.linspace(empiricalDist["start"],
                            empiricalDist["stop"],
                            num=empiricalDist["bins"])
    cdf1 = __EmpiricalCDF(cdfBins, data)
    cdf2 = theorDist["cdf"](distParams, cdfBins[1:])
    mcdf = 0.5*(cdf1+cdf2)
    with __np.errstate(divide="ignore", invalid="ignore"):
        term1 = cdf1*__np.log(cdf1/mcdf)
        term2 = cdf2*__np.log(cdf2/mcdf)
        term1[cdf1==0] = 0
        term2[cdf2==0] = 0
    normalization = __np.sum(cdf1)+__np.sum(cdf2)
    jsd = __np.sqrt(__np.sum(term1+term2)/(normalization*__np.log(2)))
    if returnParams:
        return jsd, distParams
    return jsd

def JSD(data, empiricalDist, theorDist, bootstrap):
    data = __np.ravel(data)
    
    jsdEstimate, distParams = __JSD(data, empiricalDist, theorDist)
    
    jsdConfidence = None
    if bootstrap["iterations"] > 0:
        if bootstrap["blockSize"] <= 1:
            tmpJSD = []
            for rep in range(bootstrap["iterations"]):
                resample = __np.random.choice(data, size=len(data))
                tmpJSD.append(__JSD(resample, empiricalDist, theorDist, returnParams=False))
            jsdConfidence = __np.percentile(tmpJSD, bootstrap["percentiles"])
        else:
            origLen = len(data)
            data = __np.append(data, data[:bootstrap["blockSize"]-1])
            getBlocks = origLen // bootstrap["blockSize"] + 1
            tmpJSD = []
            for rep in range(bootstrap["iterations"]):
                selectedBlocks = __np.random.choice(range(origLen), size=getBlocks)
                resample = __np.concatenate([data[sb:sb+bootstrap["blockSize"]] for sb in selectedBlocks])
                resample = resample[:origLen]
                tmpJSD.append(__JSD(resample, empiricalDist, theorDist, returnParams=False))
            jsdConfidence = __np.percentile(tmpJSD, bootstrap["percentiles"])
    
    return {
        "parameterEstimates": distParams,
        "jsdEstimate": jsdEstimate,
        "jsdConfidence": jsdConfidence,
    }


In [5]:
def TrajectoryJSD(trajectory, empiricalDist_total, empiricalDist_segment, theorDist, bootstrap):
    total_distance = calculate_total_distance(trajectory)
    
    segment_distances = calculate_segment_distances(trajectory)
    
    jsd_td = JSD(__np.array([total_distance]), empiricalDist_total, theorDist, bootstrap)
    
    jsd_sd = JSD(segment_distances, empiricalDist_segment, theorDist, bootstrap)
    
    return {
        "JSD-TD": jsd_td,
        "JSD-SD": jsd_sd
    }
    
def calculate_average_jsd(trajectories, empiricalDist_total, empiricalDist_segment, theorDist, bootstrap):
    jsd_results = [TrajectoryJSD(traj, empiricalDist_total, empiricalDist_segment, theorDist, bootstrap) 
                   for traj in trajectories]
    
    avg_jsd_td = __np.mean([result["JSD-TD"]["jsdEstimate"] for result in jsd_results])
    avg_jsd_sd = __np.mean([result["JSD-SD"]["jsdEstimate"] for result in jsd_results])
    
    return {
        "Average JSD-TD": avg_jsd_td,
        "Average JSD-SD": avg_jsd_sd
    }  

def example_usage():
    try:
        trajectory = __np.load("data_1x.npy")
    except FileNotFoundError:
        print("Error: File 'generated_samples1.npy' not found. Please ensure the file exists in the correct location.")
        return


    total_dist = calculate_total_distance(trajectory)
    segment_dists = calculate_segment_distances(trajectory)
    
    empiricalDist_total = {
        "start": total_dist - 1e-10,  
        "stop": total_dist + 1e-10,  
        "bins": 3  
    }
    
    empiricalDist_segment = {
        "start": __np.min(segment_dists),
        "stop": __np.max(segment_dists),
        "bins": 100 
    }

    def normal_pdf(params, x):
        return __stats.norm.pdf(x, loc=params[0], scale=params[1])

    def normal_cdf(params, x):
        return __stats.norm.cdf(x, loc=params[0], scale=params[1])

    def normal_likelihood(params, data):
        return -__np.sum(__np.log(normal_pdf(params, data)))

    theorDist = {
        "pdf": normal_pdf,
        "cdf": normal_cdf,
        "likelihood": normal_likelihood,
        "params": [__np.mean(segment_dists), __np.std(segment_dists)] 
    }


    bootstrap = {
        "iterations": 100,  
        "percentiles": [2.5, 97.5],  
        "blockSize": 1
    }

    results = TrajectoryJSD(trajectory, empiricalDist_total, empiricalDist_segment, theorDist, bootstrap)
    ave_results = calculate_average_jsd([trajectory], empiricalDist_total, empiricalDist_segment, theorDist, bootstrap)

    print("JSD-TD (Total Distance):", results["JSD-TD"])
    print("JSD-SD (Segment Distance):", results["JSD-SD"])
    print("Average JSD-TD:", ave_results["Average JSD-TD"])
    print("Average JSD-SD:", ave_results["Average JSD-SD"])

    num_trajectories = 20
    all_results = []
    all_trajectories = []

    for i in range(num_trajectories):
        try:
            com_trajectory = __np.load("data_train_766.npy")
            all_trajectories.append(com_trajectory)
        except FileNotFoundError:
            print(f"Error: File 'data_train_766.npy' not found for trajectory {i+1}. Skipping this trajectory.")
            continue
        
        new_total_dist = calculate_total_distance(com_trajectory)
        new_segment_dists = calculate_segment_distances(com_trajectory)
        
        new_empiricalDist_total = {
            "start": new_total_dist - 1e-10,
            "stop": new_total_dist + 1e-10,
            "bins": 3
        }
        
        new_empiricalDist_segment = {
            "start": __np.min(new_segment_dists),
            "stop": __np.max(new_segment_dists),
            "bins": 100
        }
        
        new_results = TrajectoryJSD(com_trajectory, new_empiricalDist_total, new_empiricalDist_segment, theorDist, bootstrap)
        all_results.append(new_results)
        
        print(f"Trajectory {i+1} results:")
        print("JSD-TD (Total Distance):", new_results["JSD-TD"])
        print("JSD-SD (Segment Distance):", new_results["JSD-SD"])

    sampleave_results = calculate_average_jsd(all_trajectories, new_empiricalDist_total, new_empiricalDist_segment, theorDist, bootstrap)
    print("Average JSD-TD for all trajectories:", sampleave_results["Average JSD-TD"])
    print("Average JSD-SD for all trajectories:", sampleave_results["Average JSD-SD"])
    
    jsd_td_values = [result["JSD-TD"]["jsdEstimate"] for result in all_results]
    jsd_sd_values = [result["JSD-SD"]["jsdEstimate"] for result in all_results]
    
    print("Summary Statistics:")
    print("JSD-TD: Mean =", __np.mean(jsd_td_values), "Std =", __np.std(jsd_td_values))
    print("JSD-SD: Mean =", __np.mean(jsd_sd_values), "Std =", __np.std(jsd_sd_values))

if __name__ == "__main__":
    example_usage()

Error: File 'generated_samples1.npy' not found. Please ensure the file exists in the correct location.
