### Libraries

In [1]:
import numpy as np
from numpy.random import default_rng
import pandas as pd
import scipy
from scipy.stats import normal_inverse_gamma
from tqdm import trange

$$
\alpha, \beta, \lambda , \mu  \\
$$
$$
\alpha > 0  \\
$$
$$
\lambda > 0  \\
$$
$$
\mu \in \mathbb{R}  \\
$$
$$
abs(\beta) < \alpha \\
$$



In [2]:
import zarr

N=4
n_samples=2
dataset_size= 10000
def gen_zarr(N, n_samples, dataset_size):
    dataset_zarr = zarr.create_array(
       store="data/x_train.zarr",
       shape=(dataset_size, n_samples+ 1, N),
       chunks=(10, n_samples+ 1, N),
       dtype="float32",
        overwrite=True
    )
    return dataset_zarr
dataset_zarr = gen_zarr(N, n_samples, dataset_size)

### X_train Generation

In [6]:
from numpy.random import default_rng

def sigmoid(z):
    return 1 / (1 + np.exp(-z))
    
def my_sample_from_gamma(x):
    inv_gamma = normal_inverse_gamma(*x)
    x_ = inv_gamma.rvs()[0]
    return x_
    
sample_from_gamma = np.vectorize(my_sample_from_gamma, signature="(n)->()")

def gen_X_train(N, dataset_size, rng, rng2, rng3, rng4, dataset_zarr):
    nu_seed = rng.random((N, 1))
    alpha_seed = 13 * rng.beta(rng2.random(), rng2.random(), size=(N, 1))
    mu_loc = [
        np.random.choice(np.array([-1, -0.5,0.6,0.3, 1]))
        * np.random.beta(np.random.random(), np.random.random())
        for _ in range(N)
    ]
    mu_scale = [np.random.beta(np.random.random(), np.random.random()) for _ in range(N)]
    
    mu_seed = rng.normal(loc=mu_loc, scale=mu_scale, size=(N, N))
    mu_seed = np.diagonal(mu_seed)[None].T
    lambda_seed = rng2.random((N, 1))
    beta_seed = rng3.random((N, 1))
    
    params = np.hstack([mu_seed, lambda_seed, alpha_seed, beta_seed])
    for _ in trange(dataset_size):
        params_ = params.copy()
        x = sample_from_gamma(params_)
        dataset_zarr[_, 0] = x
    return dataset_zarr

In [7]:
rng = default_rng(34)
rng2 = default_rng(np.random.randint(1, 3090))
rng3 = default_rng(np.random.randint(1, 3090))
rng4 = default_rng(np.random.randint(1, 3090))

dataset_zarr = gen_X_train(N, dataset_size, rng, rng2, rng3, rng4, dataset_zarr)


100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 10000/10000 [00:23<00:00, 424.35it/s]


### Kernels

In [8]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import invgamma


def mixture_kernel_sampler(x):
    """
    Maps input x to a bimodal distribution where the two modes
    diverge and the noise oscillates.
    """
    # --- 1. Define Dynamic Parameters based on x ---

    # Mode 1: Curves upward
    mu_1 = 2 * x**2 + 1
    # Mode 2: Curves downward
    mu_2 = -2 * x**2 - 1

    # Variance: Oscillates sinusoidally (Heteroscedasticity)
    # High noise when sin is 1, low noise when sin is -1
    sigma = 0.3 * np.abs(np.sin(4 * x)) + 0.05

    # Mixing Probability: Transitions from Mode 1 to Mode 2 as x increases
    # sigmoid function centered at 0
    prob_mode_1 = 1 / (1 + np.exp(-5 * x))

    # --- 2. Sampling Logic ---

    # Step A: Choose which mode (gaussian) to sample from
    # random choice based on prob_mode_1
    mode_choice = np.random.rand() < prob_mode_1

    # Step B: Sample from the chosen Gaussian
    if mode_choice:
        y = np.random.normal(loc=mu_1, scale=sigma)
    else:
        y = np.random.normal(loc=mu_2, scale=sigma)

    return y


def hierarchical_nig_sampler(x):
    """
    Implements the BELLE Hierarchical Sampling mechanism [cite: 165-167].
    The 'probabilistic space' itself is uncertain.
    """
    # --- 1. Map x to Hyper-parameters ---
    # This emulates the neural network projection layer

    # Gamma (The 'expected' mean): Linear trend
    gamma = 3 * x

    # Nu (Evidence count): Higher x -> Higher confidence in the mean
    nu = np.abs(x) + 1.0

    # Alpha (Shape of variance dist):
    # Low alpha = Heavy tails (high uncertainty about variance)
    alpha = 2.0 + np.exp(-0.5 * x**3)  # High uncertainty near 0

    # Beta (Scale of variance dist): Constant scale
    beta = 0.5

    # --- 2. Hierarchical Sampling Steps [cite: 166-167] ---

    # Step A: Sample the Variance (Sigma^2) from Inverse Gamma
    # Note: scipy invgamma takes 'a' (alpha) and 'scale' (beta)
    sigma_sq = invgamma.rvs(a=alpha, scale=beta)

    # Step B: Sample the Mean (Mu) conditioned on Sigma^2
    # Variance of the mean is sigma^2 / nu
    mean_variance = sigma_sq / nu
    sampled_mu = np.random.normal(loc=gamma, scale=np.sqrt(mean_variance)) * x

    # Step C: Sample the final data point y
    y = np.random.normal(loc=sampled_mu, scale=np.sqrt(sigma_sq))

    return y


def jump_diffusion_sampler(x):
    """
    Samples from a process with continuous noise and rare, discrete jumps.
    Useful for modeling systems with 'shocks' or 'glitches'.
    """
    # 1. Continuous Drift Component (The 'Normal' behavior)
    drift = np.sin(3 * x)
    diffusion_noise = np.random.normal(0, 0.2)

    # 2. Jump Component (The 'Rare Event')
    # Probability of a jump increases dramatically as x gets far from 0
    jump_prob = 0.05 + 0.4 * (np.abs(x) > 1.5)

    is_jump = np.random.rand() < jump_prob

    if is_jump:
        # Jumps are large, discrete shifts
        jump_magnitude = np.random.choice([-2, 2])
        # Add some jitter to the jump
        jump_val = jump_magnitude + np.random.normal(0, 0.5)
    else:
        jump_val = 0

    y = drift + diffusion_noise + jump_val
    return y


def fractal_weierstrass_sampler(x, K=10):
    """
    Samples from a distribution defined by a randomized fractal function.
    x controls the 'roughness' and amplitude of the summation.
    """
    # 1. Dynamic Fractal Parameters
    # b: Frequency multiplier (must be > 1).
    # We vary b slightly with x to create 'spectral shifting'
    b = 2.0 + 0.5 * np.sin(x)

    # a: Amplitude decay (0 < a < 1).
    # x controls how fast high frequencies decay.
    # x near 0 -> a near 1 -> Very rough/noisy (High fractal dimension)
    # x far from 0 -> a near 0 -> Very smooth (Low fractal dimension)
    a = 0.5 / (1 + np.abs(x))

    y_sum = 0

    # 2. Summation (The Weierstrass Function approximation)
    for k in range(K):
        # Randomized phase shift per component creates the 'kernel' variance
        phi = np.random.uniform(0, 2 * np.pi)

        # Term: amplitude * cos(frequency * x + phase)
        term = (a**k) * np.cos((b**k) * np.pi * x + phi)
        y_sum += term

    # Add base noise
    return y_sum + np.random.normal(0, 0.05)


def stochastic_volatility_jump_sampler(x):
    """
    Simulates a process with latent volatility and rare jumps.
    x influences the 'stability' of the market regime.
    """
    # 1. Regime Determination based on x
    # if x > 0: 'Bull Market' (Drift up, low noise)
    # if x < 0: 'Bear Market' (Drift down, high noise)

    # Base parameters
    if x > 0:
        mu = 0.5 * x
        base_vol = 0.1
        jump_prob = 0.4  # Rare jumps
    else:
        mu = 0.5 * x
        base_vol = 0.4  # High volatility
        jump_prob = 0.2  # Frequent jumps

    # 2. Stochastic Volatility Component
    # Volatility itself is random (log-normal)
    # This creates "fat tails" (Kurtosis) in the distribution
    realized_vol = base_vol * np.exp(np.random.normal(0, 0.2))

    # 3. Jump Component (Poisson Process)
    is_jump = np.random.rand() < jump_prob

    if is_jump:
        # Jumps are usually negative panic events in this model
        jump_size = np.random.normal(-2.0, 0.5)
    else:
        jump_size = 0

    # 4. Final Sample
    # y = Trend + Stochastic_Vol_Noise + Jump
    y = mu + np.random.normal(0, realized_vol) + jump_size

    return y

### Target Generation

In [9]:
kernel_funcs = [mixture_kernel_sampler, hierarchical_nig_sampler, jump_diffusion_sampler, stochastic_volatility_jump_sampler, fractal_weierstrass_sampler]
replace_ = True if N > 5 else False
kernel_pos = np.random.choice(kernel_funcs, size=(N,), replace=replace_)

def gen_targets(x, n_samples):
    y_targets = []
    for _ in range(n_samples):
        target = [kernel_pos[i](x[i]) for i in range(4)]
        y_targets.append(target)
    return y_targets

# with tqdm(total=dataset_size) as pbar:
for i in trange(dataset_size):
    _x = dataset_zarr[i, 0]
    targets = gen_targets(_x, n_samples)
    for j in range(n_samples):
        dataset_zarr[i, j+1]= targets[j]
            

100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 10000/10000 [00:25<00:00, 394.58it/s]


### Dataset

In [12]:
import torch
from torch.utils.data import Dataset, DataLoader
import numpy as np

class TripletSplitDataset(Dataset):
    """
    A custom PyTorch Dataset that takes an (N, 3, 4) array.
    
    Mapping strategy:
    - The sample at index `idx` has shape (3, 4).
    - Row 0 is treated as the Input (x).
    - Row 1 is treated as Target 1 (y1).
    - Row 2 is treated as Target 2 (y2).
    
    All outputs retain the shape (1, 4).
    """
    def __init__(self, data_source):
        """
        Args:
            data_source (numpy.ndarray or torch.Tensor): Data of shape (N, 3, 4).
        """
        self.data= data_source

    def __len__(self):
        return self.data.shape[0]

    def __getitem__(self, idx):
        # We select the sample: shape (3, 4)
        sample = self.data[idx]
        
        # Note on Slicing:
        # Using sample[0] would return shape (4,).
        # Using sample[0:1] keeps the dimension, returning shape (1, 4).
        
        # Input: First row (1, 4)
        x = sample[0:1, :]
        
        # Output 1: Second row (1, 4)
        y1 = sample[1:2, :]
        
        # Output 2: Third row (1, 4)
        y2 = sample[2:3, :]
        
        return x, y1, y2

dataset = TripletSplitDataset(data_source=dataset_zarr)
dataloader = DataLoader(dataset, batch_size=4, shuffle=True)

In [13]:
next(iter(dataloader))

[tensor([[[ 1.0975, -1.6968, -0.4248, -0.9426]],
 
         [[ 0.8702, -1.2522,  0.2274, -1.0835]],
 
         [[ 0.9938, -1.2330, -0.4124, -1.0743]],
 
         [[ 0.8595, -0.4764, -0.4192, -0.9798]]]),
 tensor([[[-0.6127,  1.0152, -1.0313, -2.0852]],
 
         [[-0.4455,  0.8198,  1.0105, -0.5656]],
 
         [[-0.3874,  0.7546, -1.0485, -0.6213]],
 
         [[-0.7256, -2.4891, -1.7670, -1.9850]]]),
 tensor([[[ 3.1658e-05,  1.1560e+00, -2.1931e+00, -9.7359e-01]],
 
         [[ 9.2789e-01,  5.4331e-01,  8.6473e-01, -2.1324e+00]],
 
         [[ 1.0649e+00,  5.4007e-01, -1.0082e+00, -1.5628e-01]],
 
         [[-6.2773e-01, -1.1013e+00, -2.1031e+00, -1.3680e-01]]])]

### Modeling

In [None]:
def generate_kernel_data(
    alpha1, beta1, alpha2, beta2, num_samples=100, samples_per_kernel=4
):
    data_list = []

    for i in range(num_samples):
        #  Sample Seeds (x1 and x2)
        x1 = np.random.normal(loc=alpha1, scale=beta1)
        x2 = np.random.normal(loc=alpha2, scale=beta2)

        # Process x1 Kernel

        # Parameters for the N(mu, sigma) distribution for x1
        mu_1 = 2 * x1
        sigma_1 = abs(3 * x1)

        # Sample 4 values (y) from the distribution
        y_samples_1 = np.random.normal(loc=mu_1, scale=sigma_1, size=samples_per_kernel)

        # Apply the kernel function
        k_values_1 = sigmoid(y_samples_1) + 0.1 * x1

        # Process x2 Kernel

        # Parameters for the N(mu, sigma) distribution for x2
        mu_2 = x2**2
        sigma_2 = abs(3 * x2)

        # Sample 4 values (y) from the distribution
        y_samples_2 = np.random.normal(loc=mu_2, scale=sigma_2, size=samples_per_kernel)

        # This replaces any non-positive y with a small positive number (1e-6)
        y_samples_2[y_samples_2 <= 0] = 1e-6

        # Apply the kernel function: (1 / sqrt(y)) * abs(x2)
        k_values_2 = (1 / np.sqrt(y_samples_2)) * abs(x2)

        row_data = {
            "x1_seed": x1,
            "x2_seed": x2,
        }

        for j in range(samples_per_kernel):
            row_data[f"k1_output_{j + 1}"] = k_values_1[j]
            row_data[f"k2_output_{j + 1}"] = k_values_2[j]

        data_list.append(row_data)

    df = pd.DataFrame(data_list)
    return df

In [15]:
final_data = generate_kernel_data(
    alpha1=5, beta1=10, alpha2=15, beta2=20, num_samples=100, samples_per_kernel=4
)

In [19]:
final_data.head()

Unnamed: 0,x1_seed,x2_seed,k1_output_1,k2_output_1,k1_output_2,k2_output_2,k1_output_3,k2_output_3,k1_output_4,k2_output_4
0,16.974569,-8.726524,2.697454,1.042234,2.697457,0.861392,1.697462,1.066583,2.697457,0.81843
1,-8.55055,15.657805,0.140813,0.856635,-0.855055,0.959655,-0.855055,0.906347,-0.855055,0.887822
2,7.493067,15.674365,0.76073,0.982754,1.749302,1.141796,0.74992,0.887147,1.749302,0.964876
3,1.156695,17.911992,1.102111,0.847279,1.112339,0.993034,1.091787,0.855107,0.751819,1.01522
4,-9.550308,-5.750766,-0.955024,1.023174,0.044969,0.951915,-0.949947,0.882685,-0.948986,0.688759


In [20]:
# use different distribution