In [None]:
import os
os.environ['OMP_NUM_THREADS']='1'
os.environ['OPENBLAS_NUM_THREADS']='1'
os.environ['MKL_NUM_THREADS']='1'

In [None]:
import math
import xfacpy as xfac


In [None]:
def price_trinomial_call_ttcross(
    N,
    S0,
    K,
    r,
    T,
    u, m, d,
    pu, pm, pd,
    bond_dim=50,
    reltol=1e-8,
    max_sweeps=20,
    verbose=False,
):
    """
    European call pricing via a trinomial tree represented as a tensor
    and evaluated using TT-cross.
    """

    # Basic consistency check on probabilities
    if abs((pu + pm + pd) - 1.0) > 1e-10:
        raise ValueError("Probabilities pu, pm, pd must sum to 1.")

    # The all-up path is used as initial pivot for TT-cross;
    # it must give a non-zero payoff
    ST_all_up = S0 * (u ** N)
    if ST_all_up <= K:
        raise ValueError(
            f"All-up path is out of the money (S_T={ST_all_up} <= K={K})."
        )

    # Discount factor
    disc = math.exp(-r * T)

    # Each time step has 3 possible states: down, mid, up
    localDim = [3] * N

    # Function evaluated by TT-cross:
    # given a path (sequence of moves), return discounted payoff
    # multiplied by the probability of that path
    def f(indices):
        S = S0
        prob = 1.0
        for move in indices:
            if move == 0:        # down
                S *= d
                prob *= pd
            elif move == 1:      # middle
                S *= m
                prob *= pm
            else:                # up
                S *= u
                prob *= pu
        return disc * prob * max(S - K, 0.0)

    # TT-cross parameters
    param = xfac.TensorCI2Param()
    param.bondDim = bond_dim    # maximum TT rank
    param.reltol = reltol       # relative accuracy target

    # Initial pivot: all-up path (2,2,...,2)
    # Handle both single-index and multi-index pivot conventions
    pivot = [2] * N
    p1 = param.pivot1
    if isinstance(p1, list) and len(p1) > 0 and isinstance(p1[0], list):
        param.pivot1 = [pivot]
    else:
        param.pivot1 = pivot

    # Build TT approximation of f over the full path space
    ci = xfac.TensorCI2(f, localDim, param)

    if verbose:
        print("Initial trueError:", ci.trueError())

    # Alternating TT-cross sweeps
    for k in range(max_sweeps):
        ci.iterate(1)
        if verbose:
            print(f"Iter {k+1}, trueError =", ci.trueError())
        if ci.isDone():
            if verbose:
                print("Convergence reached.")
            break

    # The option price is the sum of f over all paths,
    # which reduces to a contraction of the TT
    tt = ci.tt
    price = tt.sum1()

    return price


In [None]:
import math
import itertools

In [None]:


def price_trinomial_call_bruteforce(
    N,
    S0,
    K,
    r,
    T,
    u, m, d,
    pu, pm, pd,
):
    disc = math.exp(-r * T)
    total = 0.0
    for path in itertools.product([0, 1, 2], repeat=N):
        S = S0
        prob = 1.0
        for move in path:
            if move == 0:
                S *= d
                prob *= pd
            elif move == 1:
                S *= m
                prob *= pm
            else:
                S *= u
                prob *= pu
        payoff = max(S - K, 0.0)
        total += disc * prob * payoff
    return total


In [None]:
N = 5
S0 = 100.0
K  = 100.0
r  = 0.05
T  = 1.0

u  = 1.1
m  = 1.0
d  = 0.9
pu = 0.3
pm = 0.4
pd = 0.3

# exact = price_trinomial_call_bruteforce(N, S0, K, r, T, u, m, d, pu, pm, pd)
tt_price = price_trinomial_call_ttcross(
    N, S0, K, r, T, u, m, d, pu, pm, pd,
    bond_dim=10, reltol=1e-6, max_sweeps=10,
    verbose=True,
)

# print("Brute force price =", exact)
print("Price TT-cross    =", tt_price)


In [None]:
import time
import numpy as np
import matplotlib.pyplot as plt

In [None]:
def benchmark_ttcross_pricing_error(
    S0, K, r, T,
    u, m, d, pu, pm, pd,
    P_ref,
    N_list=(25, 30),
    D_min=30,
    D_max=100,
    n_D=10,
    n_runs=10,
    reltol=1e-6,
    max_sweeps=10,
    base_seed=1234,
):
    """
    TTcross benchmark:
    - for each N in N_list;
    - for D values between D_min and D_max;
    - for each (N, D), run n_runs independent runs;
    - measure walltime and error vs P_ref[N].
    """

    D_values = np.linspace(D_min, D_max, n_D, dtype=int)

    results = {
        N: {
            'D': D_values.copy(),
            'time_mean': [],
            'err_median': [],
            'err_std': [],
        }
        for N in N_list
    }

    for iN, N in enumerate(N_list):
        print(f"\n=== N = {N} ===")

        price_ref = P_ref[N]

        for jD, D in enumerate(D_values):
            print(f"  -> D = {D} ({jD+1}/{len(D_values)})")

            run_times = []
            errors = []

            for run in range(n_runs):
                seed = base_seed + run + 100 * jD + 10_000 * iN

                t0 = time.perf_counter()
                price = price_trinomial_call_ttcross(
                    N,
                    S0=S0, K=K, r=r, T=T,
                    u=u, m=m, d=d,
                    pu=pu, pm=pm, pd=pd,
                    bond_dim=D,
                    reltol=reltol,
                    max_sweeps=max_sweeps,
                    verbose=False,
                )
                t1 = time.perf_counter()

                run_times.append(t1 - t0)
                errors.append(price - price_ref)

            run_times = np.array(run_times, dtype=float)
            errors = np.array(errors, dtype=float)

            results[N]['time_mean'].append(run_times.mean())
            results[N]['err_median'].append(np.median(errors))
            results[N]['err_std'].append(errors.std(ddof=1))

        for key in ('time_mean', 'err_median', 'err_std'):
            results[N][key] = np.array(results[N][key], dtype=float)

    return results


def plot_ttcross_pricing_error(results):
    """
    Pricing error vs walltime plot (median Â± std).
    """

    fig, ax = plt.subplots(figsize=(8, 5))

    colors = {
        25: 'tab:blue',
        30: 'tab:orange',
        50: 'tab:green',
    }

    for N, data in results.items():
        t_mean  = data['time_mean']
        err_med = data['err_median']
        err_std = data['err_std']
        color   = colors.get(N, None)

        ax.plot(
            t_mean, err_med + err_std,
            label=f"N={N} median+std",
            color=color, linestyle='-',
            marker='o', linewidth=1.5,
        )
        ax.plot(
            t_mean, err_med - err_std,
            label=f"N={N} median-std",
            color=color, linestyle='--',
            marker='o', linewidth=1.5,
        )

    ax.axhline(0.0, color='black', linewidth=1.0)
    ax.set_xscale('log')

    ax.set_xlabel("Mean walltime per run [s]")
    ax.set_ylabel("Pricing error (TTcross - reference)")
    ax.set_title("TTcross pricing error vs walltime")

    ax.legend()
    ax.grid(True, which='both', linestyle=':', alpha=0.7)
    plt.tight_layout()
    plt.show()


if __name__ == "__main__":

    S0 = 100.0
    K  = 100.0
    r  = 0.05
    T  = 1.0

    u  = 1.1
    m  = 1.0
    d  = 0.9
    pu = 0.25
    pm = 0.50
    pd = 0.25

    P_ref = {
        25: 7.98985,
        30: 8.69942,
        50: 11.0844,
    }

    results = benchmark_ttcross_pricing_error(
        S0, K, r, T,
        u, m, d, pu, pm, pd,
        P_ref,
        N_list=(25, 30),
        D_min=20,
        D_max=50,
        n_D=10,
        n_runs=10,
        reltol=1e-8,
        max_sweeps=20,
    )

    plot_ttcross_pricing_error(results)
