In [1]:
import numpy as np
from sklearn.utils import resample
import statsmodels.api as sm


def generate_data(n, break_point, beta, delta):
    index = np.arange(n).reshape(-1, 1)  # Creating an index column
    c2 = np.random.normal(size=n)  # Making c2 a column vector
    c2_with_index = np.column_stack((index, c2))  # Including index as a column
    e = np.random.normal(size=n)
    c1 = beta * c2 + e
    c1[break_point:] += delta * c2[break_point:]
    return c1, c2_with_index


def compute_f_stat_and_delta(c1, c2_with_index, theta):
    index = c2_with_index[:, 0]
    c2 = c2_with_index[:, 1]
    X_unrestricted = sm.add_constant(np.column_stack((c2, (index >= theta) * c2)))
    X_restricted = sm.add_constant(c2[:, np.newaxis])

    # Solve for unrestricted coefficients
    unrestricted_model = np.linalg.lstsq(X_unrestricted, c1, rcond=None)
    unrestricted_coeffs = unrestricted_model[0]
    ssr_unrestricted = ((c1 - X_unrestricted @ unrestricted_coeffs) ** 2).sum()

    restricted_model = np.linalg.lstsq(X_restricted, c1, rcond=None)
    restricted_coeffs = restricted_model[0]
    ssr_restricted = ((c1 - X_restricted @ restricted_coeffs) ** 2).sum()

    delta = unrestricted_coeffs[2]
    k = 2  # number of unrestricted coefficients
    f_stat = ((ssr_restricted - ssr_unrestricted) / (k-1)) / (ssr_unrestricted / (len(c2) - k))
    return f_stat, delta


def bootstrap_covariance(c1, c2_with_index, theta_tilde, theta_i, n_bootstraps=1000):
    results = []

    for _ in range(n_bootstraps):
        c1_b, c2_b_with_index = resample(c1, c2_with_index)

        f_stat_tilde, delta_tilde = compute_f_stat_and_delta(c1_b, c2_b_with_index, theta_tilde)
        f_stat_i, delta_i = compute_f_stat_and_delta(c1_b, c2_b_with_index, theta_i)

        results.append((f_stat_i, delta_i, f_stat_tilde, delta_tilde))

    return np.cov(list(zip(*results)))


def compute_matrices(cov_matrix, f_stat_tilde, delta_tilde,print_stuff=False):
    ΣYX_true, ΣY_tilde = cov_matrix[1, 0], cov_matrix[3, 3]
    ΣXY_tilde = cov_matrix[2, 3]
    ΣYX = cov_matrix[0, 3]# really need the covariance between x and y_tilde
    # y and x_tilde would be cov_matrix[1,2]
    ΣX, ΣX_tilde = cov_matrix[0, 0], cov_matrix[2, 2]

    # Projection calculations using the provided formulation
    Z = f_stat_tilde - (ΣYX / ΣY_tilde) * delta_tilde
    Z_tilde = f_stat_tilde - (ΣXY_tilde / ΣY_tilde) * delta_tilde
    A = ΣY_tilde**(-2) * (ΣXY_tilde**2 - ΣYX**2)
    B = 2 * ΣY_tilde**(-1) * (ΣXY_tilde * Z_tilde - ΣYX * Z)
    C = Z_tilde**2 - Z**2

    D = B ** 2 - 4 * A * C
    H = -C / B if B != 0 else np.nan
    G = (-B - np.sqrt(D)) / (2 * A) if D >= 0 and A!=0 else np.nan
    K = (-B + np.sqrt(D)) / (2 * A) if D >= 0 and A!=0 else np.nan
    if print_stuff:
        print('------------------')
        print(cov_matrix)
        print(ΣYX,'//',cov_matrix[0, 3],  cov_matrix[3, 0],'//',cov_matrix[1,2],cov_matrix[1, 2],'//',ΣYX_true,cov_matrix[0, 1])
        
    return A, B, C, D, H, G, K



def compute_confidence_intervals(theta_tilde, candidate_breaks, c1, c2,a_cutoff=1,print_stuff=False):
    # Cache the intensive computations for reuse in other functions
    cached_results = {}
    f_stat_tilde, delta_tilde = compute_f_stat_and_delta(c1, c2, theta_tilde)
    
    # Perform all necessary computations and cache results
    for theta in candidate_breaks:
        cov_matrix = bootstrap_covariance(c1, c2, theta_tilde, theta)
        if theta==theta_tilde:
            bootstrap_covariance(c1, c2, theta_tilde, theta)
        cached_results[theta] = (cov_matrix, f_stat_tilde, delta_tilde)
    
    sigma = cached_results[theta][0][3,3] #save self covariance... for the confidence intervals later...
    
    # Pre-calculate fixed intervals for each side
    l1Z = calc_l1Z(candidate_breaks, cached_results,a_cutoff=a_cutoff)
    u2Z = calc_u2Z(candidate_breaks, cached_results,a_cutoff=a_cutoff)
    if print_stuff:
        print('lz1 u2z',l1Z,u2Z)
        print('----------')
    
    
    left_intervals = {theta: [l1Z, np.inf] for theta in candidate_breaks}
    right_intervals = {theta: [-np.inf, u2Z] for theta in candidate_breaks}
    # Loop through candidate breaks to compute intervals
    #print(left_intervals)
    for theta in candidate_breaks:
        cov_matrix, f_stat_tilde, delta_tilde = cached_results[theta]
        A, B, C, D, H, G, K = compute_matrices(cov_matrix, f_stat_tilde, delta_tilde,print_stuff=print_stuff)
            
        left_intervals[theta][1] = min(u2Z,G) #l2Z 
        right_intervals[theta][0] = max(l1Z,K) #u1Z
        
        #only consider inervals where A>0 and D>0...
        if A < a_cutoff or D < 0: #real condition im checking is A<0... but, even slightly bigger than 0 is a problem...
            left_intervals[theta] = np.array([-np.inf,np.inf])
            right_intervals[theta] = np.array([-np.inf,np.inf])
            
        if print_stuff:
            print(theta,'//',A,B,C,D,"//HGK",H,G,K)
            print(left_intervals[theta])
            print('------------------')
    #among the 40 by 2 array, find the interesection... e.g.,
    left_interval_array = np.array([left_intervals[theta] for theta in candidate_breaks])
    right_interval_array = np.array([right_intervals[theta] for theta in candidate_breaks])
    #print(right_interval_array)
    left_final = [left_interval_array[:,0].max(),left_interval_array[:,1].min()]
    right_final = [right_interval_array[:,0].max(),right_interval_array[:,1].min()]

    return np.array(left_final), np.array(right_final),delta_tilde,sigma


def calc_l1Z(candidate_breaks, cached_results,a_cutoff=1):
    max_G = -np.inf
    max_H = -np.inf
    
    for theta in candidate_breaks:
        cov_matrix, f_stat_tilde, delta_tilde = cached_results[theta]
        A, B, C, D, H, G, K = compute_matrices(cov_matrix, f_stat_tilde, delta_tilde)
        if A < -a_cutoff and D >= 0: #should really be A<0...
            max_G = max(max_G, G)
        if np.abs(A) == 0 and B > 0:  #technically should be a==0
            max_H = max(max_H, H)

    return max(max_G,max_H)


def calc_u2Z(candidate_breaks, cached_results,a_cutoff=1):
    min_K = np.inf
    min_H = np.inf
    
    for theta in candidate_breaks:
        cov_matrix, f_stat_tilde, delta_tilde = cached_results[theta]
        A, B, C, D, H, G, K = compute_matrices(cov_matrix, f_stat_tilde, delta_tilde)
        if A < -a_cutoff and D >= 0: #should really be A<0...
            min_K = min(min_K, K)
        if np.abs(A) == 0  and B<0: #technically should be a==0
            min_H = min(min_H, H)
    return min(min_K,min_H)


n = 100
true_break_point = 50
beta = 2
delta = 3
A_CUTOFF = 5000 # this is like a mangitude of 1/100 of A...
    
for i in range(10):
    # Generate data with an index column in c2
    c1, c2_with_index = generate_data(n, true_break_point, beta, delta)

    # Define the range of candidate break points
    candidate_breaks = np.arange(35, 65)

    # Find the theta where the maximum F-statistic occurs
    theta_hat, max_f_stat = max(
        ((theta, compute_f_stat_and_delta(c1, c2_with_index, theta)[0]) for theta in candidate_breaks),
        key=lambda x: x[1]
    )

    print('F,delta',compute_f_stat_and_delta(c1, c2_with_index, theta_hat))
    interval_left, interval_right,mean, variance = compute_confidence_intervals(theta_hat, candidate_breaks, c1,
                                                                                 c2_with_index ,a_cutoff=A_CUTOFF)



    print('results',interval_left, interval_right,mean, variance )
    print('-----')

F,delta (246.10445897458297, 3.1504933708783485)
results [-inf  inf] [-inf  inf] 3.1504933708783485 0.04650789424181546
-----
F,delta (204.99906575154617, 3.149003237445142)
results [       -inf -0.33930986] [3.14900324        inf] 3.149003237445142 0.03840888925855603
-----
F,delta (187.95439463185383, 2.8207949527443104)
results [       -inf -2.84149548] [2.82079495        inf] 2.8207949527443104 0.04269918138622003
-----
F,delta (179.92140768539005, 2.906312162709424)
results [      -inf 0.68170332] [2.90631216        inf] 2.906312162709424 0.06459174624203326
-----
F,delta (246.00017775343053, 2.922840021253212)
results [      -inf 0.13811265] [2.92284002        inf] 2.922840021253212 0.03711895339898981
-----
F,delta (212.31523758174367, 2.973536484066029)
results [      -inf 0.39929735] [2.97353648        inf] 2.973536484066029 0.03924339019656052
-----
F,delta (199.05052597746771, 3.0410368386813804)
results [      -inf 0.58124562] [3.04103684        inf] 3.0410368386813804 0.04

In [2]:
import numpy as np
from scipy.stats import norm

def truncated_normal_quantile(quantile, interval_left, interval_right, mean, variance):
    # Unpack intervals
    l1, u1 = interval_left
    l2, u2 = interval_right
    
    # Standardize the intervals with the mean and variance
    l1_std, u1_std = (l1 - mean) / np.sqrt(variance), (u1 - mean) / np.sqrt(variance)
    l2_std, u2_std = (l2 - mean) / np.sqrt(variance), (u2 - mean) / np.sqrt(variance)
    
    # Calculate the probabilities over the two intervals
    P1 = norm.cdf(u1_std) - norm.cdf(l1_std)
    P2 = norm.cdf(u2_std) - norm.cdf(l2_std)
    
    # Total probability considering the truncated intervals
    P_total = P1 + P2
    
    # Target cumulative probability adjusted for truncation
    adjusted_quantile = quantile * P_total
    
    # Determine in which interval the quantile lies
    if adjusted_quantile <= P1:
        # Quantile lies in the first interval
        quantile_value_std = norm.ppf(norm.cdf(l1_std) + adjusted_quantile)
    else:
        # Quantile lies in the second interval
        adjusted_quantile -= P1
        quantile_value_std = norm.ppf(norm.cdf(l2_std) + adjusted_quantile)

    # Convert back to the original scale
    quantile_value = quantile_value_std * np.sqrt(variance) + mean
    
    return quantile_value

# Example usage


quantile_upper = truncated_normal_quantile(.95, interval_left, interval_right, mean, variance)
quantile_lower = truncated_normal_quantile(.05, interval_left, interval_right, mean, variance)
print(quantile_lower,quantile_upper)

3.198680839639377 3.671646146397083
