In [72]:
import numpy as np

def truncated_sprt_sim(
    p0 = 0.50,        # null hypothesis value
    p1 = 0.55,        # alternative hypothesis value
    alpha = 0.05,     # desired type-I error
    beta = 0.20,      # desired type-II error
    max_n = 40,       # truncation
    n_sims = 10_000,  # number of simulation replications
    seed = 123
):
    """
    Run a truncated SPRT with nominal alpha/beta, up to max_n.
    Return:
      - observed alpha
      - observed beta
      - average sample size under p0
      - average sample size under p1
    """
    rng = np.random.default_rng(seed)
    
    # 1) Compute Wald boundaries (log form)
    #    A = (1-beta)/alpha,  B = beta/(1-alpha)
    A = (1 - beta)/alpha
    B = beta/(1 - alpha)
    logA = np.log(A)
    logB = np.log(B)
    
    # The per-trial log-likelihood ratio for X in {0,1} is
    #   LLR( X=1 ) = log( p1 / p0 )
    #   LLR( X=0 ) = log( (1-p1)/(1-p0) )
    llr_1 = np.log(p1 / p0)
    llr_0 = np.log((1-p1) / (1-p0))
    
    # Container to store decisions and sample sizes
    # Under p0:
    decisions_0 = []  # True=reject H0, False=accept/fail to reject
    sample_sizes_0 = []
    # Under p1:
    decisions_1 = []
    sample_sizes_1 = []
    
    # --- Simulate under p0
    for _ in range(n_sims):
        llr_sum = 0.0
        decision_made = False
        for k in range(1, max_n + 1):
            # Simulate one Bernoulli trial from p0
            x = rng.random() < p0
            # Update LLR
            llr_sum += (llr_1 if x else llr_0)
            # Check boundaries
            if llr_sum >= logA:
                # reject H0
                decisions_0.append(True)
                sample_sizes_0.append(k)
                decision_made = True
                break
            elif llr_sum <= logB:
                # accept/fail to reject H0
                decisions_0.append(False)
                sample_sizes_0.append(k)
                decision_made = True
                break
        if not decision_made:
            # forced decision if we haven't stopped by max_n
            # e.g. accept H0 if we didn't cross upper boundary
            decisions_0.append(False)
            sample_sizes_0.append(max_n)
    
    # --- Simulate under p1
    for _ in range(n_sims):
        llr_sum = 0.0
        decision_made = False
        for k in range(1, max_n + 1):
            # Simulate one Bernoulli trial from p1
            x = rng.random() < p1
            # Update LLR
            llr_sum += (llr_1 if x else llr_0)
            # Check boundaries
            if llr_sum >= logA:
                # reject H0
                decisions_1.append(True)
                sample_sizes_1.append(k)
                decision_made = True
                break
            elif llr_sum <= logB:
                # accept/fail to reject H0
                decisions_1.append(False)
                sample_sizes_1.append(k)
                decision_made = True
                break
        if not decision_made:
            # forced decision after max_n
            decisions_1.append(False)
            sample_sizes_1.append(max_n)
    
    # compute realized alpha and beta
    # alpha = P(reject H0 | p=p0)
    # beta =  P(fail to reject H0 | p=p1)
    realized_alpha = np.mean(decisions_0)  # fraction that was True under p0
    realized_beta  = np.mean([not d for d in decisions_1])
    
    mean_n0 = np.mean(sample_sizes_0)
    mean_n1 = np.mean(sample_sizes_1)
    
    return realized_alpha, realized_beta, mean_n0, mean_n1

# Example usage:
alpha_hat, beta_hat, meanN0, meanN1 = truncated_sprt_sim(
    p0=0.50, p1=0.55, alpha=0.01, beta=0.9879, max_n=40, n_sims=200_000
)

print(f"Realized alpha = {alpha_hat:.3f}")
print(f"Realized beta  = {beta_hat:.3f}")
print(f"Avg sample size under p0 = {meanN0:.2f}")
print(f"Avg sample size under p1 = {meanN1:.2f}")


Realized alpha = 0.250
Realized beta  = 0.698
Avg sample size under p0 = 1.50
Avg sample size under p1 = 1.55


In [67]:
import math
from functools import lru_cache

# Parameters
p0 = 0.50          # Null hypothesis probability (for type-I error)
p1 = 0.55          # Alternative hypothesis probability (for power)
alpha_nom = 0.01   # Nominal type-I error rate
beta_nom = 0.9879    # Nominal type-II error rate (power = 1-beta)
n_max = 40         # Truncation sample size

# Wald boundaries (for the untruncated SPRT)
A = (1 - beta_nom) / alpha_nom   # Upper threshold ratio
B = beta_nom / (1 - alpha_nom)     # Lower threshold ratio
logA = math.log(A)
logB = math.log(B)

# Log-likelihood ratio increments:
d1 = math.log(p1 / p0)         # when X=1
d0 = math.log((1 - p1) / (1 - p0))   # when X=0

# -------------------------------
# Dynamic programming recurrences
# -------------------------------

# f(k, s, p) returns the probability to eventually reject H0 (i.e. cross upper boundary)
# starting at step k with LLR = s, under a hypothesis with success probability p.
@lru_cache(maxsize=None)
def f(k, s, p):
    # If already crossed boundaries, return corresponding decision
    if s >= logA:
        return 1.0  # reject H0
    if s <= logB:
        return 0.0  # accept H0
    
    # If reached maximum sample size, forced decision (here: accept H0 if not crossed)
    if k == n_max:
        # forced decision: if upper boundary not reached, we set decision = accept H0.
        return 0.0
    
    # Otherwise, take the expectation over the next sample:
    return p * f(k + 1, s + d1, p) + (1 - p) * f(k + 1, s + d0, p)

# g(k, s, p) returns the expected sample size (stopping time) starting at step k, with LLR = s.
@lru_cache(maxsize=None)
def g(k, s, p):
    # If boundary already hit, return current step count
    if s >= logA or s <= logB:
        return k
    if k == n_max:
        return n_max
    return 1 + p * g(k + 1, s + d1, p) + (1 - p) * g(k + 1, s + d0, p)

# Compute rejection probabilities and expected sample sizes starting at (k=0, s=0)
# Under H0:
rej_prob_H0 = f(0, 0.0, p0)   # This is the realized alpha.
exp_samples_H0 = g(0, 0.0, p0)

# Under H1:
rej_prob_H1 = f(0, 0.0, p1)   # Power = probability of rejecting H0 under H1.
exp_samples_H1 = g(0, 0.0, p1)
beta_realized = 1 - rej_prob_H1

print("Analytic (DP) Results for Truncated SPRT:")
print(f"Under H0 (p0 = {p0}):")
print(f"  Rejection probability (alpha) = {rej_prob_H0:.3f}")
print(f"  Expected sample size         = {exp_samples_H0:.2f}")
print("")
print(f"Under H1 (p1 = {p1}):")
print(f"  Power  = {rej_prob_H1:.3f}  (so beta = {beta_realized:.3f})")
print(f"  Expected sample size         = {exp_samples_H1:.2f}")


Analytic (DP) Results for Truncated SPRT:
Under H0 (p0 = 0.5):
  Rejection probability (alpha) = 0.250
  Expected sample size         = 3.00

Under H1 (p1 = 0.55):
  Power  = 0.303  (so beta = 0.698)
  Expected sample size         = 3.10


In [134]:
import math
import random

def classical_sprt_boundaries(alpha, beta):
    """
    Compute the classical SPRT boundaries.
    A: Upper boundary (evidence for the alternative)
    B: Lower boundary (evidence for the null)
    """
    A = math.log((1 - beta) / alpha)
    B = math.log(beta / (1 - alpha))
    return A, B

def simulate_truncated_sprt(p, p0, p1, alpha, beta, N, num_simulations=10000):
    """
    Simulate a truncated SPRT for binomial data.
    
    Parameters:
    - p: true probability of success (simulate under H0: p = p0 or under H1: p = p1)
    - p0: success probability under the null hypothesis
    - p1: success probability under the alternative hypothesis
    - alpha: target type I error rate
    - beta: target type II error rate
    - N: maximum sample size (truncation point)
    - num_simulations: number of simulation runs
    
    Returns:
    A dictionary with:
      - 'accept_H1_rate': proportion of simulations where H1 was accepted
      - 'accept_H0_rate': proportion of simulations where H0 was accepted
      - 'inconclusive_rate': proportion of simulations that reached N without a decision
      - 'average_sample_size': average number of samples used per simulation
    """
    # Compute classical SPRT boundaries
    A, B = classical_sprt_boundaries(alpha, beta)
    
    decisions = []       # 1: accept H1, -1: accept H0, 0: inconclusive
    sample_sizes = []    # Number of observations used in each simulation
    
    for sim in range(num_simulations):
        log_lr = 0.0  # cumulative log-likelihood ratio
        decision = None
        
        for n in range(1, N + 1):
            # Generate one Bernoulli observation (1 for success, 0 for failure)
            x = 1 if random.random() < p else 0
            
            # Compute the log-likelihood ratio increment:
            #   log( p1^x * (1-p1)^(1-x) / [p0^x * (1-p0)^(1-x)] )
            if x == 1:
                increment = math.log(p1 / p0)
            else:
                increment = math.log((1 - p1) / (1 - p0))
            log_lr += increment
            
            # Check if decision boundaries are crossed
            if log_lr >= A:
                decision = 1  # Accept H1
                sample_sizes.append(n)
                break
            elif log_lr <= B:
                decision = -1  # Accept H0
                sample_sizes.append(n)
                break
        
        # If no decision is reached by N, declare it inconclusive
        if decision is None:
            decision = 0
            sample_sizes.append(N)
        
        decisions.append(decision)
    
    # Calculate statistics from the simulation
    num_accept_H1 = sum(1 for d in decisions if d == 1)
    num_accept_H0 = sum(1 for d in decisions if d == -1)
    num_inconclusive = sum(1 for d in decisions if d == 0)
    avg_sample_size = sum(sample_sizes) / num_simulations
    
    return {
        'accept_H1_rate': num_accept_H1 / num_simulations,
        'accept_H0_rate': num_accept_H0 / num_simulations,
        'inconclusive_rate': num_inconclusive / num_simulations,
        'average_sample_size': avg_sample_size
    }

if __name__ == "__main__":
    # Define the error rates and maximum sample size
    alpha = 0.05  # Type I error rate
    beta = 0.91   # Type II error rate
    N = 40        # Maximum number of samples (truncation point)
    
    # Hypothesized probabilities under H0 and H1
    p0 = 0.5  # Null hypothesis success probability
    p1 = 0.55  # Alternative hypothesis success probability
    
    num_simulations = 20000  # Number of simulated experiments
    
    # Simulation under the alternative hypothesis (true p = p1)
    print("Simulating truncated SPRT under H1 (true p = p1):")
    stats_H1 = simulate_truncated_sprt(p1, p0, p1, alpha, beta, N, num_simulations)
    print(stats_H1)
    
    # Simulation under the null hypothesis (true p = p0)
    print("\nSimulating truncated SPRT under H0 (true p = p0):")
    stats_H0 = simulate_truncated_sprt(p0, p0, p1, alpha, beta, N, num_simulations)
    print(stats_H0)

Simulating truncated SPRT under H1 (true p = p1):
{'accept_H1_rate': 0.1868, 'accept_H0_rate': 0.8012, 'inconclusive_rate': 0.012, 'average_sample_size': 7.31645}

Simulating truncated SPRT under H0 (true p = p0):
{'accept_H1_rate': 0.10035, 'accept_H0_rate': 0.89005, 'inconclusive_rate': 0.0096, 'average_sample_size': 6.02275}
