In [1]:
from generate_data import generate_data
import numpy as np
from tqdm import trange
from collections import Counter
from ripser import ripser

In [2]:
def center_point(point: np.ndarray, bin_size: float) -> tuple[float]:
    # returns the center of the bin
    # bins are evenly spaced from 0 to 1, so bin_size is the fraction of the parameter space
    # for example, with bin_size = 0.01, 0.4783 centers to 0.475 since it is between 0.47 and 0.48
    # bin size should be a unit fraction
    return tuple((param//bin_size+0.5)*bin_size for param in point)

def bin_points(data: np.ndarray, bin_size: float) -> Counter[tuple[float]]:
    # Counts the number of points in each bin
    centered_points = map(lambda point: center_point(point, bin_size), data)
    return Counter(centered_points)

def filter_by_threshold(counter: Counter, threshold: int) -> np.ndarray:
    return np.array([point for point, count in counter.items() if count >= threshold])

def bin_data(data: np.ndarray, bin_size: float = 0.01, threshold: int = 1, verbose: bool = True) -> np.ndarray:
    n_TIs = data.shape[0]
    n = data.shape[1]

    binned_data = np.zeros((n_TIs, n), dtype = object) # object since each sample will have a different number of points after binning, so a Counter is used

    range_func = trange if verbose else range
    for i in range_func(n_TIs):
        for j in range(n):
            binned_counter = bin_points(data[i, j], bin_size=bin_size)
            binned_data[i, j] = filter_by_threshold(binned_counter, threshold=threshold)

    return binned_data


In [3]:
def get_separation(sample: np.ndarray) -> float:

    if len(sample) < 2: # cannot calculate if only one point left after binning
        return np.nan
    
    # ["dgms"][0][-2][1] gets the persistence data from Ripser, selects H_0,
    # the penultimate bar, and the endpoint
    return ripser(sample, maxdim = 0)["dgms"][0][-2][1]


def calculate_cluster_separation(data: np.ndarray, verbose: bool = True) -> np.ndarray:
    n_TIs = data.shape[0]
    n = data.shape[1]

    cluster_separations = np.zeros(n_TIs, dtype = object) # some separations may be nan (and thus excluded), so size varies by TI

    range_func = trange if verbose else range
    for i in range_func(n_TIs):
        cluster_separations[i] = []

        for j in range(n):
            if not np.isnan(cluster_separation := get_separation(data[i, j])): # only include if separation isn't nan
                cluster_separations[i].append(cluster_separation)

        cluster_separations[i] = np.array(cluster_separations[i])

    return cluster_separations

In [4]:
TIs = range(208, 1000, 16)
n = 1000
m = 10000

In [5]:
data = generate_data(TIs, n, m, flip = True, normalize_data = True)

  model = d1*np.exp(-TE/T21)+d2*np.exp(-TE/T22)
  residual = (data-model)**2
100%|██████████| 50/50 [3:22:57<00:00, 243.56s/it]  


In [6]:
binned_data = bin_data(data)

100%|██████████| 50/50 [16:44<00:00, 20.08s/it]


In [7]:
cluster_separations = calculate_cluster_separation(binned_data)

100%|██████████| 50/50 [24:43:08<00:00, 1779.78s/it]   
