In [None]:
import numpy as np
from scipy.stats import norm

In [2]:
samples = np.random.beta(0.95, 1.1, size=2000)

In [24]:
def calculate_bandwidth_gaussian(data, bw_mehtod='silverman', sigma=None):
    if bw_mehtod == 'silverman':
        assert sigma is not None
        h = ((4 * sigma**5) / (3 * data.size))**(1/5)
    else:
        raise NotImplementedError
    return h

def kernel_pdf_raw(x, data, sigma, h):
    x = np.atleast_1d(x)  # Ensure x is an array
    pdf_values = np.mean(norm.pdf(x[:, None], loc=data, scale=h * sigma), axis=1)
    if pdf_values.size == 1:  # Check if the result is a single value
        return pdf_values.item()  # Convert single-element array to scalar
    return pdf_values


def estimate_density(data):
    sigma = np.array([data, 2-data, -data]).flatten().std()
    if sigma == 0:
        return lambda x: 1.0 # No variability => No bet
    h = calculate_bandwidth_gaussian(data, sigma=sigma)

    kernel_pdf_raw_fitted = lambda x: kernel_pdf_raw(x, data=data, sigma=sigma, h=h)

    betting_function = lambda x: kernel_pdf_raw_fitted(x) + kernel_pdf_raw_fitted(-x) + kernel_pdf_raw_fitted(2 - x) # Reflection to handle bounded support

    return betting_function




In [None]:
# Dependencies
import numpy as np
from scipy.stats import norm

### Helper functions. Can likely be improved for speed

def calculate_bandwidth_gaussian(data, bw_mehtod='silverman', sigma=None):
    if bw_mehtod == 'silverman':
        assert sigma is not None
        h = ((4 * sigma**5) / (3 * data.size))**(1/5)
    else:
        raise NotImplementedError
    return h

def kernel_pdf_raw(x, data, sigma, h):
    x = np.atleast_1d(x)  # Ensure x is an array
    pdf_values = np.mean(norm.pdf(x[:, None], loc=data, scale=h * sigma), axis=1)
    if pdf_values.size == 1:  # Check if the result is a single value
        return pdf_values.item()  # Convert single-element array to scalar
    return pdf_values


def estimate_density(data):
    if data.size < 1:
        return lambda x: 1.0
    sigma = np.array([data, 2-data, -data]).flatten().std()
    if sigma == 0:
        return lambda x: 1.0 # No variability => No bet
    h = calculate_bandwidth_gaussian(data, sigma=sigma)

    kernel_pdf_raw_fitted = lambda x: kernel_pdf_raw(x, data=data, sigma=sigma, h=h)

    betting_function = lambda x: kernel_pdf_raw_fitted(x) + kernel_pdf_raw_fitted(-x) + kernel_pdf_raw_fitted(2 - x) # Reflection to handle bounded support

    return betting_function

# The main function, computing martingale values
def plugin_gaussian_reflect(samples, threshold=None):
    """
    ### Description:
    Plugin martingale 
    
    ### Parameters:
    * samples: The p-values to test (assumed to be an np.array)
    * threshold: The threshold to break the test of combinations. Default value is set to None
    
    ### Returns: 
    * S_list: The martingale values
    """
    S_list = []
    log_S = 0.0 # Equivalent to starting at 1.0

    for i, p in enumerate(samples):
        # Get betting function (KDE Gaussian with reflection method)
        betting_function = estimate_density(samples[: i])
        # Update log martingale value (for numerical stability)
        log_S += np.log(betting_function(p))
        # Add martingale value to the list
        S = np.exp(log_S)
        S_list.append(S)

    # Early stop if the threshold is exceeded
    if threshold is not None and S >= threshold:
        return S_list

    return S_list

KeyboardInterrupt: 

In [None]:
@njit
def simple_jumper(samples, J=0.01, threshold=None):
    """
    ### Description:
    Optimized Simple Jumper function for calculating martingale values.
    With Numba @njit decorator for aggressive optimizations and possibilities for adding threshold.
    
    ### Parameters:
    * samples: The p-values to test
    * J: The jump parameter. Default value is set to 0.01
    * threshold: The threshold to break the test of combinations. Default value is set to None
    
    ### Returns: 
    * S_list: The martingale values

    """

    S_list = []
    
    # Initialize the C_list (weights) and eps_list (shifts)
    C_list = np.array([1/3, 1/3, 1/3])
    eps_list = np.array([-1, 0, 1])
    C = 1.0
    
    for p in samples:
        # Update C_list once instead of twice
        C_list = (1 - J) * C_list + (J / 3) * C
        C_list *= (1 + eps_list * (p - 0.5))
        
        # Update martingale value C
        C = C_list.sum()
        S_list.append(C)
        
        # Early stop if the threshold is exceeded
        if threshold is not None and C >= threshold:
            return S_list
    
    return S_list  # Return the martingale values