### Delta calculation algo (for now start from (3))
1. On first iteration of trajectory - calculate p, q vectors using identify
2. For each trajectory try to use the p, q vectors calculated before, if converges good using these - good! else run identify again.
3. Given the correct p, q vectors and limit (i.e. theoretical limit - e.g. pi), does our limit converges to the theoretical one. how do we do that? try one step before and after (999, 1000, 1001) and compare limits. difference should be very very small (e.g. 1/2^1000 - should be tested) - this tests general convegence. To test convergence to the constant test the same with the constant.

(read about walk for a matrix)

In [2]:
from ramanujantools.cmf.pfq import pFq, Position, CMF
from ramanujantools import Limit, Matrix
import sympy as sp
from LIReC.db.access import db

In [3]:
x0, x1, y0, n = sp.symbols('x0 x1 y0 n')


# This is pi 2F1 CMF
pi = pFq(2, 1, sp.Rational(1, 2))


def calc_delta(traj, strt, constant):
    # Do walk
    traj_m = pi.trajectory_matrix(traj, strt)
    walked = traj_m.walk({n: 1}, 1000, {n: 0})
    walked = walked.inv().T
    t1_col = (walked / walked[0, 0]).col(0)
    values = t1_col[1:]

    # Identify and calculate delta
    res = db.identify([constant.evalf(300)] + values)
    """
    Use a dictionary to check each time if
    """

    if res:
        # convert result to sympy expression
        res = res[0]
        print(res)
        print(res.to_json()['coeffs'])
        res.include_isolated = 0
        res = '('.join(str(res).split('(')[:-1])
        print(res)
        simpified = sp.sympify(res)

        # estimate pi with the rational
        estimated = simpified.subs({sym: val for sym, val in zip(sp.symbols(f'c:{len(values) + 1}')[1:], values)})

        # compute delta
        error_v = sp.Abs(estimated - sp.pi.evalf(30000))
        denom = sp.denom(estimated)

        if denom == 1:
          raise ZeroDivisionError('Denominator 1 caused zero division in delta calculation')
        if denom < 1e6:
          raise Exception("Probably still rational")

        delta = -1 - sp.log(error_v) / sp.log(denom)

        return delta.evalf(10), estimated.evalf(15)
    return None, None

In [11]:
start1 = {x0: sp.Rational(9,2), x1: sp.Rational(11, 2), y0: sp.Rational(15, 2)}
start2 = {x0: sp.Rational(-9, 2), x1: sp.Rational(-3, 2), y0: sp.Rational(5, 2)}
trajectory = {x0: -4, x1: -3, y0: 2}

try:
    print(calc_delta(trajectory, start1, sp.pi))
except Exception as e:
    print(e)
try:
    print(calc_delta(trajectory, start2, sp.pi))
except Exception as e:
    print(e)

calc_delta() missing 1 required positional argument: 'constant'
calc_delta() missing 1 required positional argument: 'constant'


In [12]:
from typing import List, Tuple

def does_converge(t_mat: Matrix, p, q) -> Tuple[bool, Tuple[Limit, Limit, Limit]]:
    l1, l2, l3 = t_mat.limit({n: 1}, [950, 1000, 1050], {n: 0}, initial_values=Matrix([p, q]))
    diff1 = abs(l2.as_float() - l1.as_float())
    diff2 = abs(l3.as_float() - l2.as_float())
    if diff1 < 1e-10 and diff2 < 1e-10:
        return True, (l1, l2, l3)
    return False, (l1, l2, l3)

In [13]:
cache = list()

def calc_delta(cmf: CMF, traj: Position, start: Position, constant: sp.NumberSymbol):
    """
    Computes delta for a given trajectory, start point and constant.
    :param cmf: the CMF to compute in.
    :param traj: trajectory from the start point
    :param start: start point to compute walk from.
    :param constant: the constant to compute delta for.
    :return: the pair: delta, estimated_value.
    """
    # Do walk
    traj_m = cmf.trajectory_matrix(traj, start)
    walked = traj_m.walk({n: 1}, 1000, {n: 0})
    walked = walked.inv().T
    t1_col = (walked / walked[0, 0]).col(0)

    # Cache lookup
    p, q = None, None
    values = [item for item in t1_col]
    pi_30000 = constant.evalf(30000)
    cache_hit = False
    values_vec = Matrix(values)

    numerator, denom = None, None
    for v1, v2 in cache:
        v1 = Matrix(v1).T
        v2 = Matrix(v2).T
        numerator = v1.dot(values_vec)
        denom = v2.dot(values_vec)
        estimated = sp.Abs(sp.Rational(numerator, denom))
        err = sp.Abs(estimated - pi_30000)
        if sp.N(err, 25) < 1e-12:
            p, q = v1, v2
            cache_hit = True
            break

    # If cache miss - use LIReC
    if not cache_hit:
        res = db.identify([constant.evalf(300)] + t1_col[1:])
        if not res:
            return None, None

        coeffs = res[0].to_json()['coeffs']
        p, q = coeffs[0::2], coeffs[1::2]

    # Check convergence
    if not does_converge(traj_m, p, q)[0]:
        return None, None

    # Add to cache
    if not cache_hit:
        cache.append((tuple(p),tuple(q)))

    # estimate constant
    p = Matrix(p).T
    q = Matrix(q).T
    if not cache_hit:
        numerator = p.dot(values_vec)
        denom = q.dot(values_vec)
    estimated = sp.Abs(sp.Rational(numerator, denom))

    # check abnormal denominator and compute delta
    err = sp.Abs(estimated - pi_30000)
    denom = sp.denom(estimated)
    if denom == 1:
          raise ZeroDivisionError('Denominator 1 caused zero division in delta calculation')
    if denom < 1e6:
        raise Exception(f"Probably still rational as denominator is quite small: {denom}")

    delta = -1 - sp.log(err) / sp.log(denom)
    return delta.evalf(10), estimated.evalf(15)

In [14]:
start1 = {x0: sp.Rational(9,2), x1: sp.Rational(11, 2), y0: sp.Rational(15, 2)}
start2 = {x0: sp.Rational(-9, 2), x1: sp.Rational(-3, 2), y0: sp.Rational(5, 2)}
trajectory = {x0: -4, x1: -3, y0: 2}

try:
    print(calc_delta(pi, trajectory, start1, sp.pi))
except Exception as e:
    print(e)
try:
    print(calc_delta(pi, trajectory, start2, sp.pi))
except Exception as e:
    print(e)

(-0.1077682817, 3.14159265358979)
(-0.1063254637, 3.14159265358979)


In [20]:
pi.matrices

{x0: Matrix([
 [   1,                          x1],
 [1/x0, 1 + (x0 + x1 - 2*y0 + 2)/x0]]),
 x1: Matrix([
 [   1,                          x0],
 [1/x1, 1 + (x0 + x1 - 2*y0 + 2)/x1]]),
 y0: Matrix([
 [-y0*(x0 + x1 - y0)/((x0 - y0)*(x1 - y0)), x0*x1*y0/((x0 - y0)*(x1 - y0))],
 [                y0/((x0 - y0)*(x1 - y0)),   -y0**2/((x0 - y0)*(x1 - y0))]])}

In [7]:
sp.solve(pi.matrices[x1].det())

[{x1: y0 - 1}]

In [24]:
from sympy.matrices import Matrix
from sympy import S, Abs, log, Rational, N
import sympy as sp # Renaming to sp for consistency
# Assuming cmf, traj, strt, constant, db, and does_converge are available.
# Assuming 'n' is a defined variable or a placeholder for a node/state.
# Assuming the necessary structures (e.g., cmf) are compatible or adapted.

cache = list()

def calc_delta_sympy(cmf, traj, strt, constant):
    # Do walk
    # SymPy's Matrix needs to be used for high-precision matrix operations
    # Assuming cmf.trajectory_matrix returns a SymPy Matrix
    traj_m = cmf.trajectory_matrix(traj, strt)
    # The result of 'walk' should be a SymPy object (e.g., a dictionary or Matrix)
    walked_obj = traj_m.walk({n: 1}, 1000, {n: 0})

    # 1. Convert walked_obj to a SymPy Matrix 'walked' if it's not already
    # (Adapt this based on the actual type returned by 'walk')
    if isinstance(walked_obj, dict):
        # Example conversion if walk returns a dict {node: value}
        # This part might need custom implementation based on your 'cmf' structure
        # For simplicity, let's assume 'walked' is a Matrix
        pass # Actual conversion logic goes here

    walked = walked_obj.inv().T

    # SymPy Matrices use .col() and are indexed (0-based)
    t1_col = (walked / walked[0, 0]).col(0)

    # SymPy matrices/vectors are lists of lists, extract elements
    values = t1_col[1:] # A column vector/matrix of the values below the first element

    # 2. Extract values into a flat list of SymPy expressions/numbers
    # SymPy's Matrix elements are SymPy expressions (like S.One, Rational, etc.)
    # We'll use this flat list for dot products later
    values_sp_list = t1_col.tolist()
    values_flat = [item for sublist in values_sp_list for item in sublist]

    # Identify and calculate delta
    p, q = None, None

    # 3. Cache Check: Use SymPy's precise comparison
    # Use SymPy's N() or evalf() for high-precision comparison
    constant_30000 = constant.evalf(30000)

    for p_tuple, q_tuple in cache:
        # Convert tuples back to SymPy vectors/lists for dot product
        p_sp = Matrix(p_tuple).T # Row vector
        q_sp = Matrix(q_tuple).T # Row vector
        values_sp_matrix = Matrix(values_flat) # Column vector

        # Use SymPy's dot product for precise multiplication
        numerator_check = p_sp.dot(values_sp_matrix)
        denominator_check = q_sp.dot(values_sp_matrix)

        # estimated is a SymPy expression (Rational or a number)
        estimated_check = Abs(Rational(numerator_check, denominator_check))

        # Calculate error with high precision
        err_check = Abs(estimated_check - constant_30000)

        # SymPy's N() is like evalf(), but often cleaner for comparison
        if N(err_check, 25) < 1e-20: # Comparing with a high-precision N()
            p, q = p_tuple, q_tuple
            break

    if not p or not q:
        # db.identify should return a SymPy-compatible structure or list of numbers
        # The list passed to identify should contain SymPy numbers
        res = db.identify([constant.evalf(300)] + values_flat)
        if not res:
            return None, None

        # 4. Extracting coefficients from result
        # Assuming res[0].to_json()['coeffs'] returns a list of SymPy-compatible numbers
        coeffs = res[0].to_json()['coeffs']

        # Split the list of coefficients
        mid = len(coeffs) // 2
        p_list, q_list = coeffs[:mid], coeffs[mid:]

        # Convert to tuple for cache storage
        p, q = tuple(p_list), tuple(q_list)
        cache.append((p, q))

    # Convert the p and q tuples (which hold SymPy numbers) back to SymPy Matrices
    p_sp = Matrix(p).T
    q_sp = Matrix(q).T
    values_sp_matrix = Matrix(values_flat)

    # Use SymPy's `does_converge` which must also handle SymPy objects
    if not does_converge(traj_m, p, q)[0]:
        return None, None

    # Final delta calculation
    # Use SymPy's dot product for precise numerator and denominator
    numerator_final = p_sp.dot(values_sp_matrix)
    # print(f'numerator={numerator_final}')
    denom_final = q_sp.dot(values_sp_matrix)

    # Estimated is a SymPy expression
    estimated_sym = Abs(Rational(numerator_final, denom_final))

    # Calculate error with high precision
    err_sym = Abs(estimated_sym - constant_30000)

    # Find the denominator of the SymPy estimated value
    # We need to evaluate the SymPy expression to check its rational components
    # Use sp.simplify to ensure it is in its most reduced form for denom()
    denom = sp.denom(sp.simplify(estimated_sym))

    if denom == 1:
          raise ZeroDivisionError('Denominator 1 caused zero division in delta calculation')

    # Convert denominator to a float for comparison against 1e6
    if N(denom) < 1e6:
        raise Exception(f"Probably still rational: denom={denom}")

    # SymPy's log is the natural logarithm by default
    delta_sym = -S.One - log(err_sym) / log(denom)

    # Return the result evaluated to 10 and 15 digits of precision
    return delta_sym.evalf(10), estimated_sym.evalf(15)

In [25]:
start1 = {x0: sp.Rational(9,2), x1: sp.Rational(11, 2), y0: sp.Rational(15, 2)}
start2 = {x0: sp.Rational(-9, 2), x1: sp.Rational(-3, 2), y0: sp.Rational(5, 2)}
trajectory = {x0: -4, x1: -3, y0: 2}

# try:
print(calc_delta_sympy(pi, trajectory, start1, sp.pi))
# except Exception as e:
#     print(e)
# try:
print(calc_delta_sympy(pi, trajectory, start2, sp.pi))
# except Exception as e:
#     print(e)

ShapeError: Matrix size mismatch: (1, 1) * (2, 2).