In [None]:
import numpy as np
import matplotlib.pyplot as plt
import scipy.stats as stats
import concurrent.futures
import pandas as pd
import itertools

# Exercises 1 & 2: Simulate different scenarios (Gaussian & Student)

In [None]:
def simulate_amoc_data(n, tau, mu1, mu2, sigma1=1.0, sigma2=1.0,
                       distribution='gaussian', df=None, random_seed=None):
    """
    Simulate a series of length n with a single changepoint at 'tau'.

    The first tau observations have mean mu1 and noise scale sigma1; the
    remaining n - tau observations have mean mu2 and noise scale sigma2.

    Parameters
    ----------
    n : int
        Total number of observations.
    tau : int
        Number of observations before the change; must be in {1, ..., n-1}.
    mu1, mu2 : float
        Segment means before / after the changepoint.
    sigma1, sigma2 : float
        Value passed as `scale` to the noise sampler of each segment.
    distribution : {'gaussian', 't'}
        Noise family: np.random.normal or scipy.stats.t.
    df : float or None
        Degrees of freedom; required when distribution == 't'.
    random_seed : int or None
        When not None, np.random.seed(random_seed) is called first
        (this resets NumPy's global random state).

    Returns
    -------
    mu : ndarray
        The piecewise-constant mean vector.
    data : ndarray
        mu plus the simulated noise.

    Raises
    ------
    ValueError
        If tau is outside {1, ..., n-1}, if distribution is unknown, or if
        distribution == 't' and df is None.
    """
    # Seeding comes before validation, exactly as callers have relied on so far.
    if random_seed is not None:
        np.random.seed(random_seed)
    if tau < 1 or tau >= n:
        raise ValueError("tau must be in {1, 2, ..., n-1}.")

    # (mean, scale, length) of the segment before and after the changepoint
    segments = ((mu1, sigma1, tau), (mu2, sigma2, n - tau))

    # True mean vector
    mu = np.concatenate([np.repeat(level, length) for level, _, length in segments])

    # Pick the centred noise sampler for the requested distribution
    if distribution == 'gaussian':
        def draw(scale, size):
            return np.random.normal(loc=0, scale=scale, size=size)
    elif distribution == 't':
        if df is None:
            raise ValueError("df must be provided for Student-t distribution.")

        def draw(scale, size):
            return stats.t.rvs(df, loc=0, scale=scale, size=size)
    else:
        raise ValueError("Unknown distribution type.")

    # Segments are sampled in order (left first) so a given seed yields the same draws
    noise = np.concatenate([draw(scale, length) for _, scale, length in segments])

    return mu, mu + noise

In [None]:
# Simulate the three Gaussian scenarios here so the cell runs on a fresh kernel
# (x, tau and the *_case* arrays were otherwise undefined at this point).
# NOTE(review): n=100, tau=50 and seed=2 mirror the Student-t example further
# down in the notebook — confirm these are the intended Gaussian settings.
tau = 50
x   = np.arange(1, 101)
# Case 1: Change in Mean Only (variance constant)
mu_case1, y_case1 = simulate_amoc_data(100, tau, 0.0, 3.0, 1, 1, 'gaussian', random_seed=2)
# Case 2: Change in Variance Only (mean constant)
mu_case2, y_case2 = simulate_amoc_data(100, tau, 0.0, 0.0, 1, 2, 'gaussian', random_seed=2)
# Case 3: Change in Both Mean and Variance
mu_case3, y_case3 = simulate_amoc_data(100, tau, 0.0, 3.0, 1, 2, 'gaussian', random_seed=2)

################################################################################
# Plot all cases: one panel per scenario, drawn by a single loop

panels = [
    ("A. Change in Mean Only",         mu_case1, y_case1),
    ("B. Change in Variance Only",     mu_case2, y_case2),
    ("C. Change in Mean and Variance", mu_case3, y_case3),
]

fig, axes = plt.subplots(len(panels), 1, figsize=(8, 12))
for ax, (title, mu_case, y_case) in zip(axes, panels):
    ax.step(x, mu_case, label="Mean vector",    color="blue", linewidth=1)
    ax.plot(x, y_case,  label="Simulated data", color="grey", alpha=0.7)
    ax.axvline(x=tau, color="red", linestyle="--", label=f"Changepoint (τ={tau})")
    ax.set_title(title)
    ax.set_ylabel("y")

# The legend is shown once (top panel); the x label only on the bottom panel.
axes[0].legend()
axes[-1].set_xlabel("t")

fig.tight_layout()
plt.show()

# Exercise 3: Bonferroni Testing (Gaussian)

In [None]:
def amoc_t_test_bonferroni(y, alpha):
    """
    Detect at most one changepoint with two-sample t-tests and a Bonferroni correction.

    A two-sample t-test (scipy.stats.ttest_ind) is run for every split
    y[:i] / y[i:], i = 1, ..., n-1. The split with the smallest p-value is
    reported if that p-value is below alpha / (n - 1).

    Parameters
    ----------
    y : sequence or ndarray
        Observed series.
    alpha : float
        Overall significance level, divided by the n - 1 candidate splits.

    Returns
    -------
    int or float
        The 1-indexed changepoint (number of observations in the left
        segment), or np.nan when nothing is significant or no test can be
        computed.
    """
    n = len(y)
    # A pooled two-sample t-test has n - 2 degrees of freedom, so fewer than
    # three observations leave nothing to test. Returning NaN ("no change")
    # also avoids np.argmin failing on an empty p-value array.
    if n < 3:
        return np.nan

    # One p-value per candidate split
    pvals = np.array([stats.ttest_ind(y[:i], y[i:]).pvalue for i in range(1, n)])

    # Undefined tests (NaN p-values, e.g. constant data) can never be significant
    if np.all(np.isnan(pvals)):
        return np.nan
    best = np.nanargmin(pvals)

    if pvals[best] < alpha / (n - 1):
        return best + 1  # changepoint index (1-indexed)
    return np.nan

def one_simu_H0_bonf(param):
    """
    Simulate under H0 (no change) and return the false positive rate for Bonferroni.

    Parameters
    ----------
    param : dict
        Must contain 'n' (series length), 'rep' (number of simulated series),
        'alpha' (significance level) and 'distribution'; 'df' is optional and
        defaults to None.

    Returns
    -------
    float
        Fraction of the rep simulated series for which
        amoc_t_test_bonferroni reports a changepoint (a non-NaN result).
    """
    n, rep, alpha, distribution = param['n'], param['rep'], param['alpha'], param['distribution']
    df = param.get('df', None)

    # tau=1 with identical means (0, 0) and scales (1, 1): the series has no change.
    # Results are collected in one pass instead of growing an array with np.append.
    tau_hats = np.array(
        [
            amoc_t_test_bonferroni(
                simulate_amoc_data(n, 1, 0, 0, 1, 1, distribution, df)[1], alpha
            )
            for _ in range(rep)
        ],
        dtype=float,
    )
    # A non-NaN estimate is a (false) detection
    return np.mean(~np.isnan(tau_hats))

def one_simu_H1_bonf(param):
    """
    Simulate under H1 (change in mean) and return the power for Bonferroni.
    The mean changes from 0 to delta at tau.

    Parameters
    ----------
    param : dict
        Must contain 'n' (series length), 'tau' (changepoint position),
        'rep' (number of simulated series), 'alpha' (significance level),
        'delta' (mean after the change) and 'distribution'; 'df' is optional
        and defaults to None.

    Returns
    -------
    float
        Fraction of the rep simulated series for which
        amoc_t_test_bonferroni reports a changepoint (a non-NaN result),
        whatever its estimated position.
    """
    n, tau, rep, alpha, delta, distribution = (
        param['n'], param['tau'], param['rep'],
        param['alpha'], param['delta'], param['distribution'],
    )
    df = param.get('df', None)

    # Results are collected in one pass instead of growing an array with np.append.
    tau_hats = np.array(
        [
            amoc_t_test_bonferroni(
                simulate_amoc_data(n, tau, 0, delta, 1, 1, distribution, df)[1], alpha
            )
            for _ in range(rep)
        ],
        dtype=float,
    )
    # A non-NaN estimate counts as a detection
    return np.mean(~np.isnan(tau_hats))

In [None]:
# Simulation settings shared by the cells below:
#   nb_thr : number of parallel workers
#   rep    : simulated series per parameter combination
#   n      : series length
#   alpha  : significance level
# NOTE(review): no seed is set, so results vary between runs; if the pool forks
# its workers they presumably all start from the same NumPy global RNG state —
# confirm that replicates across tasks are independent.
nb_thr, rep, n, alpha = 10, 100, 2**10, 0.05

################################################################################
# Simulation under H0 (run in the main process)

param_H0_bonf = dict(n=n, rep=rep, alpha=alpha, distribution="gaussian")
fpr_bonf      = one_simu_H0_bonf(param_H0_bonf)
print("Bonferroni false positive rate (H0):", fpr_bonf)

################################################################################
# Simulation under H1: one task per changepoint position tau = 2, 4, ..., 512

tau_list = [2**k for k in range(1, 10)]
# Mean after the change for the Gaussian case: sqrt(70 / n)
delta    = np.sqrt(70 / n)

bonf_columns = ['n', 'alpha', 'rep', 'tau', 'delta', 'distribution']
bonf_grid    = list(itertools.product([n], [alpha], [rep], tau_list, [delta], ['gaussian']))
params_bonf  = pd.DataFrame(bonf_grid, columns=bonf_columns)

# Each row of the grid is handed to one_simu_H1_bonf as a dict
with concurrent.futures.ProcessPoolExecutor(max_workers=nb_thr) as pool:
    tpr_bonf = list(pool.map(one_simu_H1_bonf, params_bonf.to_dict(orient='records')))
params_bonf['TPR'] = np.array(tpr_bonf)
print(params_bonf)

In [None]:
# Gaussian case: TPR of the Bonferroni test vs. the Monte Carlo calibrated test
# NOTE(review): params_MC is not defined in the cells visible here — presumably
# it comes from a Monte Carlo simulation cell; confirm it runs before this one.

fig, ax = plt.subplots(figsize=(8, 6))
curves = (
    (params_bonf, "red",  "Bonferroni"),
    (params_MC,   "blue", "Monte Carlo"),
)
for results, colour, method in curves:
    ax.plot(results['tau'], results['TPR'], marker='o', linestyle="-", color=colour, label=method)
ax.set_xlabel('τ (Change-Point Position)')
ax.set_ylabel('True Positive Rate (TPR)')
ax.set_title("Comparison of TPR vs. τ (Gaussian)")
# tau values are powers of two, hence the base-2 log axis
ax.set_xscale('log', base=2)
ax.grid(True)
ax.legend()
plt.show()

# Exercise 5: Extend to Student Data

In [None]:
# Changepoint position of the three examples. It is defined here (instead of
# relying on a `tau` left over from another cell) so that the red line drawn
# below always matches the simulated data.
tau = 50
x   = np.arange(1, 101)
# Case 1: Change in Mean Only (variance constant)
mu_case1, y_case1 = simulate_amoc_data(100, tau, 0.0, 3.0, 1, 1, 't', df=3, random_seed=2)
# Case 2: Change in Variance Only (mean constant)
mu_case2, y_case2 = simulate_amoc_data(100, tau, 0.0, 0.0, 1, 2, 't', df=3, random_seed=2)
# Case 3: Change in Both Mean and Variance
mu_case3, y_case3 = simulate_amoc_data(100, tau, 0.0, 3.0, 1, 2, 't', df=3, random_seed=2)

################################################################################
# Plot all cases: one panel per scenario, drawn by a single loop

panels = [
    ("A. Change in Mean Only (Student-t (df=3))",         mu_case1, y_case1),
    ("B. Change in Variance Only (Student-t (df=3))",     mu_case2, y_case2),
    ("C. Change in Mean and Variance (Student-t (df=3))", mu_case3, y_case3),
]

fig, axes = plt.subplots(len(panels), 1, figsize=(8, 12))
for ax, (title, mu_case, y_case) in zip(axes, panels):
    ax.step(x, mu_case, label="Mean vector",    color="blue", linewidth=1)
    ax.plot(x, y_case,  label="Simulated data", color="grey", alpha=0.7)
    ax.axvline(x=tau, color="red", linestyle="--", label=f"Changepoint (τ={tau})")
    ax.set_title(title)
    ax.set_ylabel("y")

# The legend is shown once (top panel); the x label only on the bottom panel.
axes[0].legend()
axes[-1].set_xlabel("t")

fig.tight_layout()
plt.show()

In [None]:
################################################################################
# Bonferroni Testing (Student-t noise)
# Reuses n, alpha, rep, nb_thr, tau_list and delta from the Gaussian simulation cell.

# Degrees of freedom of the Student-t noise to compare
df_list = [3, 5, 10]

################################################################################
# Simulation under H0: one task per value of df

h0_columns = ['n', 'alpha', 'rep', 'df', 'distribution']
h0_grid    = list(itertools.product([n], [alpha], [rep], df_list, ['t']))
params_H0_bonf_t = pd.DataFrame(h0_grid, columns=h0_columns)

with concurrent.futures.ProcessPoolExecutor(max_workers=nb_thr) as pool:
    fpr_bonf_t = list(pool.map(one_simu_H0_bonf, params_H0_bonf_t.to_dict(orient='records')))
params_H0_bonf_t['FPR'] = np.array(fpr_bonf_t)
print(params_H0_bonf_t)

################################################################################
# Simulation under H1: one task per (tau, df) pair

h1_columns = ['n', 'alpha', 'rep', 'tau', 'delta', 'df', 'distribution']
h1_grid    = list(itertools.product([n], [alpha], [rep], tau_list, [delta], df_list, ['t']))
params_bonf_t = pd.DataFrame(h1_grid, columns=h1_columns)

with concurrent.futures.ProcessPoolExecutor(max_workers=nb_thr) as pool:
    tpr_bonf_t = list(pool.map(one_simu_H1_bonf, params_bonf_t.to_dict(orient='records')))
params_bonf_t['TPR'] = np.array(tpr_bonf_t)
print(params_bonf_t)

In [None]:
################################################################################
# Monte Carlo beta calibration (Student-t noise)
# NOTE(review): beta_MC and one_simu_H1_MC are not defined in the cells visible
# here — presumably they come from the Monte Carlo exercise; confirm they are
# defined before this cell runs.

mc_h0_columns = ['n', 'rep', 'df', 'distribution', 'alpha']
mc_h0_grid    = list(itertools.product([n], [rep], df_list, ['t'], [alpha]))
params_H0_MC_t = pd.DataFrame(mc_h0_grid, columns=mc_h0_columns)

# One calibration task per value of df
with concurrent.futures.ProcessPoolExecutor(max_workers=nb_thr) as pool:
    beta_values = list(pool.map(beta_MC, params_H0_MC_t.to_dict(orient='records')))
params_H0_MC_t['beta'] = np.array(beta_values)

print("Monte Carlo calibrated beta (Student-t):")
print(params_H0_MC_t)

################################################################################
# Simulation under H1: one task per (tau, df) pair

mc_h1_columns = ['n', 'rep', 'tau', 'delta', 'df', 'distribution']
mc_h1_grid    = list(itertools.product([n], [rep], tau_list, [delta], df_list, ['t']))
params_MC_t   = pd.DataFrame(mc_h1_grid, columns=mc_h1_columns)

# Attach alpha and the calibrated beta to every row; with no explicit key the
# merge joins on the columns the two frames share (n, rep, df, distribution).
params_MC_t = params_MC_t.merge(params_H0_MC_t)
with concurrent.futures.ProcessPoolExecutor(max_workers=nb_thr) as pool:
    tpr_MC_t = list(pool.map(one_simu_H1_MC, params_MC_t.to_dict(orient='records')))
params_MC_t['TPR'] = np.array(tpr_MC_t)
print(params_MC_t)

In [None]:
# Plot comparison: Bonferroni vs Monte Carlo for Student-t data
# (one curve per method and per degrees of freedom)

linestyles = {3: '-', 5: ':', 10: '--'}                  # keyed by degrees of freedom
colors = {'Bonferroni': 'red', 'Monte Carlo': 'blue'}    # keyed by method

plt.figure(figsize=(8, 6))
# `results` is the per-method DataFrame; it is deliberately not called `df`,
# which in this notebook denotes the Student-t degrees of freedom.
for method, results in (('Bonferroni', params_bonf_t), ('Monte Carlo', params_MC_t)):
    for dof in sorted(results['df'].unique()):
        subset = results[results['df'] == dof]
        plt.plot(subset['tau'], subset['TPR'], marker='o', linestyle=linestyles[dof],
                 color=colors[method], label=f'{method} (df={dof})')
plt.xlabel('τ (Change-Point Position)')
plt.ylabel('True Positive Rate (TPR)')
# The curves come from the Student-t simulations, so the title says so
plt.title("Comparison of TPR vs. τ (Student-t)")
plt.xscale('log', base=2)
plt.grid(True)
plt.legend()
plt.show()

# Exercise 9: Implementation of the CUSUM algorithm

In [None]:
def cusum(y, sigma, beta):
    """
    CUSUM test for at most one change in mean.

    For every split position tau = 1, ..., n-1 the statistic
        C2[tau] = tau * (n - tau) / n * (mean(y[:tau]) - mean(y[tau:]))**2
    is computed; a changepoint is declared at the maximiser when
    max(C2) / sigma**2 > beta.

    Parameters
    ----------
    y : sequence or ndarray
        Observed series.
    sigma : float
        Noise scale used to normalise the maximum statistic.
    beta : float
        Detection threshold.

    Returns
    -------
    tau_hat : int or float
        1-indexed estimated changepoint, or np.nan if the threshold is not
        exceeded or if len(y) < 2.
    C2_max : float
        Maximum of the (unnormalised) statistic; 0.0 if len(y) < 2.
    C2_seq : ndarray
        The statistic at every split (length n - 1; empty if len(y) < 2).
    """
    n = len(y)
    if n < 2:
        # Not enough data to split the series
        return np.nan, 0.0, np.array([])

    left_sizes  = np.arange(1, n)         # tau     = 1, ..., n-1
    right_sizes = np.arange(n - 1, 0, -1) # n - tau = n-1, ..., 1

    # Running means of y[:tau] (forward cumulative sums) ...
    mean_left  = np.cumsum(y)[:-1] / left_sizes
    # ... and of y[tau:] (cumulative sums of the reversed series, flipped back)
    mean_right = np.cumsum(y[::-1])[:-1][::-1] / right_sizes

    # Weight tau * (n - tau) / n applied to the squared difference of means
    weights = left_sizes * right_sizes / n
    C2_seq  = weights * (mean_left - mean_right)**2

    best   = np.argmax(C2_seq)
    C2_max = C2_seq[best]

    # Report a changepoint only when the normalised maximum exceeds beta
    detected = C2_max / sigma**2 > beta
    return (best + 1 if detected else np.nan), C2_max, C2_seq

In [None]:
################################################################################
# Example on a synthetic dataset

n         = 100
sigma     = 1
tau       = 20
mu1, mu2  = 0.0, 3.0
# NOTE(review): with df=2 the Student-t noise has infinite variance, so sigma is
# only the scale parameter of the noise — confirm this is the intended example.
mu_vec, y = simulate_amoc_data(n, tau, mu1, mu2, sigma, sigma, 't', df=2, random_seed=2)
beta      = 2 * np.log(n)  # Detection threshold
tau_hat, C2_max, C2_seq = cusum(y, sigma, beta)
print("Estimated changepoint position:", tau_hat)

# cusum signals "no changepoint" with np.nan (never None), so test for NaN
# before drawing the estimated position.
changepoint_found = not np.isnan(tau_hat)

################################################################################
# Plot observed data and true mean function

x = np.arange(1,n+1)
plt.figure(figsize=(8, 10))
plt.subplot(2, 1, 1)
plt.step(x, mu_vec, label="Mean vector (mu)", color="blue", linewidth=1)
plt.plot(x, y, label="Simulated data", color="grey", alpha=0.7)
if changepoint_found:
    plt.axvline(x=tau_hat, color="red", linestyle="--", label=f"tau_hat={tau_hat}")
plt.title("i.i.d. Student-t (df=2) Data with a Single Changepoint")
plt.xlabel("t")
plt.ylabel("y")
plt.legend()

################################################################################
# Plot the CUSUM statistic sequence (one value per split position 1, ..., n-1)

plt.subplot(2, 1, 2)
plt.plot(range(1, n), C2_seq, label=r"$C_\tau^2$", color="grey")
if changepoint_found:
    plt.axvline(x=tau_hat, color="red", linestyle="--", label=f"tau_hat={tau_hat}")
plt.title("CUSUM Statistic Sequence")
plt.xlabel("t")
plt.ylabel(r"$C_\tau^2$")
plt.legend()

plt.tight_layout()
plt.show()