In [1]:
import numpy as np
from scipy.stats import linregress
from tqdm.notebook import tqdm, trange
import matplotlib.pyplot as plt
import json

In [2]:
def exist_cp(seq, confid, permut):
    """
    Determines whether a change point exists in the given sequence using a permutation test.

    Parameters:
    seq (list or np.array): The time series sequence.
    confid (float): Confidence level for statistical testing (e.g., 95 for 95% confidence).
    permut (int): Number of permutations to perform.

    Returns:
    bool: True if a change point is detected, False otherwise.
    """
    T = np.arange(len(seq))
    l0 = linregress(T, seq)
    slope0, intercept0 = l0.slope, l0.intercept

    # Compute residuals and cumulative sum
    d = np.array(seq) - slope0*T - intercept0
    cumsum = np.cumsum(d)
    D_data = cumsum.max() - cumsum.min()
    
    # Generate null distribution by permuting residuals
    D_permute = []
    for _ in range(permut): 
        np.random.shuffle(d)
        cusum_permute = np.cumsum(d)
        D_permute.append(np.max(cusum_permute) - np.min(cusum_permute))
    
    # Compute significance threshold and compare with observed statistic
    D_cutoff = np.percentile(D_permute, confid)
    if D_data >= D_cutoff:
        return True
    else:
        return False

def find_cp(seq, minSeg=3):
    """
    Finds a single change point in a sequence by minimizing the sum of squared errors.

    Parameters:
    seq (list or np.array): The time series sequence.
    minSeg (int): Minimum segment length on either side of the change point.

    Returns:
    int or None: The index of the detected change point or None if no valid change point is found.
    """
    if len(seq) < 2 * minSeg:  # sequence too short to segment 
        return None
    ls_se = []
    
    for i in range(minSeg, len(seq)-minSeg):
        seg1, seg2 = np.array(seq[:i]), np.array(seq[i:])
        T1, T2 = np.arange(i), np.arange(i, len(seq))

        # Fit linear regression to each segment
        l1, l2 = linregress(T1, seg1), linregress(T2, seg2)
        slope1, intercept1 = l1.slope, l1.intercept
        slope2, intercept2 = l2.slope, l2.intercept

        # Fit linear regression to each segment
        se = sum([(seg1[j]-slope1*T1[j]-intercept1)**2 for j in range(i)]) + \
            sum([(seg2[j-i]-slope2*T2[j-i]-intercept2)**2 for j in range(i, len(seq))])
        ls_se.append(se)
    
    if len(ls_se) > 0:
        cp = np.argmin(np.array(ls_se)) + minSeg  
        return cp


def find_cp_iterative(seq, minSeg=3,confid=95, permut=10000):
    """
    Iteratively finds multiple change points in a sequence using a binary segmentation approach.

    Parameters:
    seq (list or np.array): The time series sequence.
    minSeg (int): Minimum segment length on either side of a change point.
    confid (float): Confidence level for statistical testing.

    Returns:
    list: A sorted list of detected change points, including the end of the sequence.
    """
    # Initialize the stack to store the index range of the fragments to be processed, initially the entire sequence
    stack = [(0, len(seq))]
    change_points = []

    while stack:
        start, end = stack.pop()
        sub_seq = seq[start:end]
        # Check whether the current segment has a change point or is long enough
        if not exist_cp(sub_seq, confid=confid, permut=permut) or (end - start) <= minSeg:
            continue

        # Find the change point in the current segment
        cp = find_cp(sub_seq, minSeg=minSeg)
        if cp is not None:
            cp_global = start + cp  # Global location of change points

            # Add the change point to the result list
            change_points.append(cp_global)

            # Push the left and right sub-segments onto the stack
            stack.append((start, cp_global))  # left sub segments
            stack.append((cp_global, end))    # right sub segments

    # Returns a list of change points sorted in order
    change_points.append(len(seq)+1)
    return sorted(change_points)

def check_cp(seq, cps, confid, permut=10000):
    """
    Verifies the detected change points using a secondary statistical test.

    Parameters:
    seq (list or np.array): The time series sequence.
    cps (list): List of initially detected change points.
    confid (float): Confidence level for statistical testing.

    Returns:
    list: A refined list of verified change points.
    """
    if not cps:
        return []
    if len(cps) == 1:
        return cps
    verified_cps = []
    ec = False
    new_cp = 0
    for i in range(len(cps)-1):
        # Determine the segment boundaries
        if i == 0:  
            start, end = 0, cps[i + 1] # first cp
        else: 
            start, end = new_cp, cps[i + 1] # Subsequent segments
    
        sub_seq = seq[start:end]
        # Check if the change point is statistically significant
        ec = exist_cp(sub_seq, confid=confid, permut=permut)
        if ec:
            tmp_cp = find_cp(sub_seq)
            if tmp_cp:
                new_cp = find_cp(sub_seq) + start # Convert local index to global index
                verified_cps.append(new_cp)
            
    verified_cps.append(len(seq)+1)

    return verified_cps

In [20]:
res = []
confidence = 95.0
permutation = 1000

In [21]:
# load continuous alpha(t) predictions
with open('alphaMeansPred_startingkit.json') as file:
    alphaMeansPred = json.load(file)

In [22]:
for e in trange(len(alphaMeansPred)):
    for i in range(len(alphaMeansPred[e])):
        cps = find_cp_iterative(alphaMeansPred[e][i], minSeg=3, confid=confidence, permut=permutation)
        res.append(check_cp(alphaMeansPred[e][i], cps, confid=confidence, permut=permutation))

  0%|          | 0/1 [00:00<?, ?it/s]