In [3]:
import numpy as np
from scipy import stats

def reduce_mi_rectangles(mi_rectangles):
    """
    Reduce MI rectangles to (r_u, z_u) for proper rectangles and (r_u, w_u) for improper rectangles.

    Parameters
    ----------
    mi_rectangles : iterable of (l, r, w, z, p) tuples
        Rectangles to split. Any iterable is accepted, including one-shot
        iterators and generators, because the input is traversed only once.

    Returns
    -------
    proper : list of (r, z, p) tuples
        Rectangles whose ``z`` is finite.
    improper : list of (r, w, p) tuples
        Rectangles whose ``z`` is infinite.

    Notes
    -----
    A rectangle whose ``z`` is NaN is neither finite nor infinite and is
    therefore left out of both lists.
    """
    proper = []
    improper = []

    # Single pass: iterating the input twice would silently produce an empty
    # ``improper`` list when the caller hands in a generator.
    for _l, r, w, z, p in mi_rectangles:
        if np.isfinite(z):
            proper.append((r, z, p))
        elif np.isinf(z):
            improper.append((r, w, p))

    return proper, improper

def impute_rectangles(rectangles, num_imputations, max_time):
    """
    Impute rectangles.

    For every imputation, each ``(r, i, p)`` rectangle gets a value drawn
    uniformly between its lower bound ``i`` and ``max_time`` in place of ``i``.

    Parameters
    ----------
    rectangles : sequence of (r, i, p) tuples
        Rectangles to impute; ``i`` is the lower bound of the drawn value.
    num_imputations : int
        Number of independent imputed copies to generate.
    max_time : number
        Upper bound of the drawn value. When it lies below a rectangle's
        lower bound, the lower bound itself is used for that rectangle.

    Returns
    -------
    list of list of (r, value, p) tuples
        One inner list per imputation, in the order of ``rectangles``.
    """
    imputed = []
    for _ in range(num_imputations):
        draws = []
        for r, lower, p in rectangles:
            # np.random.uniform is documented as undefined for high < low, and
            # such a draw would fall below the known lower bound. Clamp the
            # upper bound so the draw is always >= lower (high == low returns
            # low).
            upper = max(lower, max_time)
            draws.append((r, np.random.uniform(lower, upper), p))
        imputed.append(draws)

    return imputed

def compute_test_statistic(proper, imputed, s, time_points):
    """
    Compute the Titman-Putter test statistic.

    For each imputation the statistic is the sum over ``time_points`` of
    ``n_1_to_3 - y_1 * (n_total_3 / y_total)``; time points with no weight at
    risk (``y_total == 0``) contribute nothing. The returned value is the
    mean over all imputations.

    Parameters
    ----------
    proper : list of (r, z, p) tuples
        Proper rectangles; they supply the event counts and are at risk at
        ``t`` when ``r < t <= z``.
    imputed : sequence of lists of (r, z, p) tuples
        One list of imputed rectangles per imputation; they only add to the
        at-risk weights.
    s : number
        Rectangles with ``s <= r`` form the group whose events are compared
        with their expected share.
    time_points : iterable of numbers
        Times at which events are counted.

    Returns
    -------
    float
        Mean statistic across imputations (NaN, with a NumPy warning, when
        ``imputed`` is empty).
    """
    # Event counts depend only on the proper rectangles, so compute them once
    # per time point instead of once per imputation.
    events = []
    for t in time_points:
        n_total_3 = sum((p for r, z, p in proper if z == t), 0.0)
        n_1_to_3 = sum((p for r, z, p in proper if z == t and s <= r < t), 0.0)
        events.append((t, n_1_to_3, n_total_3))

    test_stats = []
    for imputation in imputed:
        at_risk = proper + imputation  # build the combined risk set once
        stat = 0.0
        for t, n_1_to_3, n_total_3 in events:
            y_total = sum((p for r, z, p in at_risk if r < t <= z), 0.0)
            y_1 = sum((p for r, z, p in at_risk if s <= r < t <= z), 0.0)

            if y_total > 0:
                stat += n_1_to_3 - y_1 * (n_total_3 / y_total)

        test_stats.append(stat)

    return np.mean(test_stats)

def compute_variance(proper, imputed_improper, s, time_points):
    """
    Compute the variance of the test statistic.

    For each imputation the variance is the sum over ``time_points`` of
    ``y_1 * y_2 * n_total_3 / y_total ** 2``, where ``y_2 = y_total - y_1``;
    time points with ``y_total == 0`` contribute nothing. The returned value
    is the mean over all imputations.

    Parameters
    ----------
    proper : list of (r, z, p) tuples
        Proper rectangles; they supply the event counts and are at risk at
        ``t`` when ``r < t <= z``.
    imputed_improper : sequence of lists of (r, z, p) tuples
        One list of imputed rectangles per imputation; they only add to the
        at-risk weights.
    s : number
        Rectangles with ``s <= r`` make up the ``y_1`` group.
    time_points : iterable of numbers
        Times at which events are counted.

    Returns
    -------
    float
        Mean variance across imputations (NaN, with a NumPy warning, when
        ``imputed_improper`` is empty).
    """
    # The event weight at each time point depends only on the proper
    # rectangles, so compute it once rather than once per imputation.
    events = [
        (t, sum((p for r, z, p in proper if z == t), 0.0))
        for t in time_points
    ]

    variances = []
    for imputation in imputed_improper:
        at_risk = proper + imputation  # build the combined risk set once
        var = 0.0
        for t, n_total_3 in events:
            y_total = sum((p for r, z, p in at_risk if r < t <= z), 0.0)
            y_1 = sum((p for r, z, p in at_risk if s <= r < t <= z), 0.0)
            y_2 = y_total - y_1

            if y_total > 0:
                var += (y_1 * y_2 * n_total_3) / (y_total ** 2)

        variances.append(var)

    return np.mean(variances)

def I(condition):
    """
    Indicator function: 1.0 when ``condition`` is truthy, 0.0 otherwise.
    """
    if condition:
        return 1.0
    return 0.0

def test_markov_property(mi_rectangles, s, num_imputations=1000):
    """
    Test the Markov property using the Titman-Putter log-rank statistic.

    The MI rectangles are split into proper and improper ones, the improper
    ones are imputed ``num_imputations`` times up to the largest proper ``z``,
    and the imputation-averaged statistic and variance are combined into a
    z-score with a two-sided normal p-value. Diagnostic information and a
    per-time-point breakdown (based on the first imputation only) are printed
    to stdout.

    Parameters
    ----------
    mi_rectangles : iterable of (l, r, w, z, p) tuples
        Input rectangles; a rectangle is proper when ``z`` is finite.
    s : number
        Rectangles with ``s <= r`` form the group under comparison.
    num_imputations : int, optional
        Number of imputations of the improper rectangles (default 1000).

    Returns
    -------
    tuple
        ``(test_statistic, z_score, p_value)``. ``z_score`` and ``p_value``
        are ``None`` when the variance is exactly zero.

    Raises
    ------
    ValueError
        If ``num_imputations`` is smaller than 1 or no proper rectangles are
        present.
    """
    # Without at least one imputation the averages are NaN and the diagnostic
    # breakdown below has no imputation to index into.
    if num_imputations < 1:
        raise ValueError("num_imputations must be at least 1.")

    proper, improper = reduce_mi_rectangles(mi_rectangles)

    if not proper:
        raise ValueError("No proper rectangles found in the data.")

    max_time = max(z for r, z, p in proper)
    # Use the imputation helper defined in this file; the previously
    # referenced name ``impute_improper_rectangles`` is not defined here.
    imputed_improper = impute_rectangles(improper, num_imputations, max_time)

    time_points = sorted(set(z for r, z, p in proper))

    test_statistic = compute_test_statistic(proper, imputed_improper, s, time_points)
    variance = compute_variance(proper, imputed_improper, s, time_points)

    # Diagnostic information
    n_proper = len(proper)
    n_improper = len(improper)
    n_time_points = len(time_points)

    print("\nDiagnostic Information:")
    print(f"Number of proper rectangles: {n_proper}")
    print(f"Number of improper rectangles: {n_improper}")
    print(f"Number of unique time points: {n_time_points}")
    print(f"s value: {s}")
    print(f"Time points: {time_points}")
    print(f"Test statistic: {test_statistic}")
    print(f"Variance: {variance}")

    # Detailed computation breakdown, shown for the first imputation only.
    print("\nDetailed Computation:")
    first_risk_set = proper + imputed_improper[0]
    for t in time_points:
        n_1_to_3 = sum(I(s <= r < t) * I(t == z) * p for r, z, p in proper)
        n_total_3 = sum(I(t == z) * p for r, z, p in proper)
        y_total = sum(I(r < t <= z) * p for r, z, p in first_risk_set)
        y_1 = sum(I(s <= r < t <= z) * p for r, z, p in first_risk_set)
        y_2 = y_total - y_1

        print(f"  Time point {t}:")
        print(f"    n_1_to_3: {n_1_to_3}")
        print(f"    n_total_3: {n_total_3}")
        print(f"    y_total: {y_total}")
        print(f"    y_1: {y_1}")
        print(f"    y_2: {y_2}")

    if variance == 0:
        return test_statistic, None, None

    z_score = test_statistic / np.sqrt(variance)
    # The survival function keeps precision in the far tail, where
    # 1 - cdf(x) rounds to exactly 0.
    p_value = 2 * stats.norm.sf(abs(z_score))

    return test_statistic, z_score, p_value

# Example usage: run the test on a small hand-written data set for several
# values of s and print the outcome of each run.
mi_rectangles = [
    # Each row is (l, r, w, z, p); z = inf marks an improper rectangle.
    (5, 7, 23, np.inf, 0.0437),
    (5, 7, 13, 13, 0.0208),
    (3, 7, 17, 17, 0.0104),
    (1, 7, 16, 16, 0.0104),
    (5, 7, 11, 12, 0.0208),
]

# Candidate s values to try.
s_values = [6, 8, 10, 12]

for s in s_values:
    print(f"\nTesting with s = {s}")
    result = test_markov_property(mi_rectangles, s)
    print(f"Test Statistic: {result[0]}")
    if result[1] is None:
        # A zero variance leaves the z-score and p-value undefined.
        print("Variance is zero. Unable to compute z-score and p-value.")
        continue
    print(f"Z-Score: {result[1]}")
    print(f"P-value: {result[2]}")


Testing with s = 6

Diagnostic Information:
Number of proper rectangles: 4
Number of improper rectangles: 1
Number of unique time points: 4
s value: 6
Time points: [12, 13, 16, 17]
Test statistic: 0.0
Variance: 0.0

Detailed Computation:
  Time point 12:
    n_1_to_3: 0.0208
    n_total_3: 0.0208
    y_total: 0.1061
    y_1: 0.1061
    y_2: 0.0
  Time point 13:
    n_1_to_3: 0.0208
    n_total_3: 0.0208
    y_total: 0.0853
    y_1: 0.0853
    y_2: 0.0
  Time point 16:
    n_1_to_3: 0.0104
    n_total_3: 0.0104
    y_total: 0.0645
    y_1: 0.0645
    y_2: 0.0
  Time point 17:
    n_1_to_3: 0.0104
    n_total_3: 0.0104
    y_total: 0.0541
    y_1: 0.0541
    y_2: 0.0
Test Statistic: 0.0
Variance is zero. Unable to compute z-score and p-value.

Testing with s = 8

Diagnostic Information:
Number of proper rectangles: 4
Number of improper rectangles: 1
Number of unique time points: 4
s value: 8
Time points: [12, 13, 16, 17]
Test statistic: 0.0
Variance: 0.0

Detailed Computation:
  Time po