In [1]:
from quant_rotor.core.dense.hamiltonian_big import hamiltonian_general_dense, hamiltonian_big_dense
from quant_rotor.core.sparse.hamiltonian_big import hamiltonian_general_sparse, hamiltonian_big_sparse
from quant_rotor.core.dense.hamiltonian import hamiltonian_dense
from quant_rotor.core.sparse.hamiltonian import hamiltonian_sparse
from quant_rotor.models.sparse.support_ham import build_V_in_p, m_to_p, vector_in_p
from quant_rotor.models.dense.support_ham import write_matrix_elements, basis_m_to_p_matrix_conversion
import quant_rotor.models.dense.support_ham as dn
import opt_einsum as oe
import numpy as np
import thermofield_boltz_funcs as bz
import scipy.sparse as sp
import traceback
import time
from datetime import datetime
import itertools

Stress testing the hamiltonian generator.

In [2]:
# # ---------- CONFIG ----------
# max_state = 11   # must be odd; try increasing this cautiously
# max_site = 61     # must be odd; try up to your system's memory
# g = 1            # coupling strength

# log_file = "hamiltonian_stress_test_log.txt"

# # ---------- Logging ----------
# def log(msg):
#     timestamp = datetime.now().strftime("[%Y-%m-%d %H:%M:%S]")
#     with open(log_file, "a") as f:
#         f.write(f"{timestamp} {msg}\n")

# # ---------- Header ----------
# log("\n===================== NEW STRESS TEST =====================")
# log(f"Testing odd state/site values up to state={max_state}, site={max_site}\n")

# last_success = None
# start_time = time.time()

# try:
#     for site in range(3, max_site + 1, 2):  # only odd sites
#         for state in range(3, max_state + 1, 2):  # only odd states
#             try:
#                 log(f"Trying state={state}, site={site}...")
#                 t0 = time.time()
#                 H, _, _ = hamiltonian_sparse(state, site, g)
#                 t1 = time.time()
#                 dim = H.shape[0]
#                 nnz = H.nnz if sp.issparse(H) else np.count_nonzero(H)
#                 log(f"✓ Success: dim={dim}, nnz={nnz}, time={t1 - t0:.2f}s")
#                 last_success = (state, site, dim, nnz)
#             except MemoryError:
#                 log(f"🧨 MemoryError at state={state}, site={site}. Halting test.")
#                 raise
#             except Exception as e:
#                 log(f"⚠️ Exception at state={state}, site={site}")
#                 log(traceback.format_exc())
#                 raise
# except Exception:
#     pass

# end_time = time.time()
# log(f"\n⏱️ Test completed in {end_time - start_time:.2f} seconds.")

# if last_success:
#     s, L, d, n = last_success
#     log(f"\n✅ Last successful configuration:")
#     log(f"  state = {s}")
#     log(f"  site  = {L}")
#     log(f"  dim   = {d}")
#     log(f"  nnz   = {n}")
# else:
#     log("\n❌ No successful Hamiltonians generated.")

# Testing the hamiltonian big and general by comparing sparse to dense.

In [3]:
from scipy.sparse.linalg import norm

def sparse_allclose(A, B, atol=1e-8, rtol=1e-5):
    if A.shape != B.shape:
        return False
    diff = A - B
    return norm(diff, ord='fro') <= atol + rtol * max(norm(A, ord='fro'), norm(B, ord='fro'))

In [4]:
def compare_ground_states(H_dense, H_sparse, atol=1e-6):
    from scipy.sparse.linalg import eigsh
    eigval_d, eigvec_d = np.linalg.eigh(H_dense)
    eigval_s, eigvec_s = eigsh(H_sparse, k=1, which='SA')
    
    vec_d = eigvec_d[:, 0]
    vec_s = eigvec_s[:, 0]

    vec_d /= np.linalg.norm(vec_d)
    vec_s /= np.linalg.norm(vec_s)
    
    overlap = np.vdot(vec_d, vec_s)
    fidelity = np.abs(overlap)**2
    energy_match = np.abs(eigval_d[0] - eigval_s[0]) < atol

    return fidelity, energy_match

In [5]:
state = 11
site = 3
g = 0.1

H_K_V_s = hamiltonian_sparse(state, site, g)
H_K_V_d = hamiltonian_dense(state, site, g)

print(sparse_allclose(sp.csr_matrix(H_K_V_d[0]), H_K_V_s[0]))
print(sparse_allclose(sp.csr_matrix(H_K_V_d[1]), H_K_V_s[1]))
print(sparse_allclose(sp.csr_matrix(H_K_V_d[2]), H_K_V_s[2]))

True
True
True


In [None]:
state = 5
site = 5

H_K_V_sp = hamiltonian_big_sparse(state, site, g, H_K_V_s)

H_K_V_de = hamiltonian_big_dense(state, site, g, H_K_V_d)

print(sparse_allclose(sp.csr_matrix(H_K_V_de[0]), H_K_V_sp[0]))
print(sparse_allclose(sp.csr_matrix(H_K_V_de[1]), H_K_V_sp[1]))
print(sparse_allclose(sp.csr_matrix(H_K_V_de[2]), H_K_V_sp[2]))
print(np.allclose(H_K_V_de[3], H_K_V_sp[3], atol=1e-7))

This gives us the idea wether the eigenvalues and eigenvalues match for the ground states bewteen the sparse and dense.

In [None]:
print(compare_ground_states(H_K_V_de[0], H_K_V_sp[0]))

## Testing the degenaret eigenvectors.

The sparse smallest eigenvalue diagonalization does not give consistent eigenvalues but eigenvectors in the same degenerate space as dense.

In [None]:
def check_eigenvec(H_dense: np.ndarray, H_sparse: sp.csr_matrix):

    eigval_dense, eigvecs_dense = np.linalg.eigh(H_dense)
    eigval_sparse, eigvecs_sparse = sp.linalg.eigsh(H_sparse, k=1, which='SA')

    psi_sparse = eigvecs_sparse[:, 0]

    # Identify degeneracy threshold
    ground_energy = eigval_dense[0]
    degeneracy_tol = 1e-10  # you can adjust this
    degenerate_indices = np.where(np.abs(eigval_dense - ground_energy) < degeneracy_tol)[0]

    # Extract dense ground subspace
    ground_subspace = eigvecs_dense[:, degenerate_indices]  # shape: (dim, d)

    # Project sparse vector into dense ground subspace
    projection_coeffs = ground_subspace.conj().T @ psi_sparse  # shape: (d,)
    psi_projected = ground_subspace @ projection_coeffs        # back to shape (dim,)

    # Compute fidelity between original and projected
    fidelity_to_subspace = np.linalg.norm(projection_coeffs)**2
    print("Fidelity to dense ground state subspace:", fidelity_to_subspace)

##

In [None]:
state = 5
site = 5

H_K_V_sparse = hamiltonian_general_sparse(state, site, g)
H_K_V_dense = hamiltonian_general_dense(state, site, g)

print(sparse_allclose(sp.csr_matrix(H_K_V_dense[0]), H_K_V_sparse[0]))
print(sparse_allclose(sp.csr_matrix(H_K_V_dense[1]), H_K_V_sparse[1]))
print(sparse_allclose(sp.csr_matrix(H_K_V_dense[2]), H_K_V_sparse[2]))

False
True
False


In [None]:
check_eigenvec(H_K_V_de[0], H_K_V_sp[0])
check_eigenvec(H_K_V_dense[0], H_K_V_sparse[0])

Fidelity to dense ground state subspace: 0.8508381599953562
Fidelity to dense ground state subspace: 0.9568448395709543


In [None]:
state = 7
site = 5
g = 0.1

In [None]:
H_K_V_sparse = hamiltonian_general_sparse(state, site, g)

In [None]:
H_K_V_s = hamiltonian_sparse(state, site, g)

In [None]:
H_K_V_dense = hamiltonian_general_dense(state, site, g)

In [None]:
H_K_V_d = hamiltonian_dense(state, site, g)

Stress testing hamiltonian_genreal sparse.

In [None]:
# ---------- CONFIG ----------
max_state = 11   # must be odd; try increasing this cautiously
max_site = 61     # must be odd; try up to your system's memory
g = 1            # coupling strength

log_file = "hamiltonian_general_stress_test_log.txt"

# ---------- Logging ----------
def log(msg):
    timestamp = datetime.now().strftime("[%Y-%m-%d %H:%M:%S]")
    with open(log_file, "a") as f:
        f.write(f"{timestamp} {msg}\n")

# ---------- Header ----------
log("\n===================== NEW STRESS TEST =====================")
log(f"Testing odd state/site values up to state={max_state}, site={max_site}\n")

last_success = None
start_time = time.time()

try:
    for site in range(3, max_site + 1, 2):  # only odd sites
        state = 11
        try:
            log(f"Trying state={state}, site={site}...")
            t0 = time.time()
            H, _, _ = hamiltonian_general_sparse(state, site, g)
            t1 = time.time()
            dim = H.shape[0]
            nnz = H.nnz if sp.issparse(H) else np.count_nonzero(H)
            log(f"✓ Success: dim={dim}, nnz={nnz}, time={t1 - t0:.2f}s")
            last_success = (state, site, dim, nnz)
        except MemoryError:
            log(f"🧨 MemoryError at state={state}, site={site}. Halting test.")
            raise
        except Exception as e:
            log(f"⚠️ Exception at state={state}, site={site}")
            log(traceback.format_exc())
            raise
except Exception:
    pass

end_time = time.time()
log(f"\n⏱️ Test completed in {end_time - start_time:.2f} seconds.")

if last_success:
    s, L, d, n = last_success
    log(f"\n✅ Last successful configuration:")
    log(f"  state = {s}")
    log(f"  site  = {L}")
    log(f"  dim   = {d}")
    log(f"  nnz   = {n}")
else:
    log("\n❌ No successful Hamiltonians generated.")

# Creating direct sparse kinetic and potential hamiltonian.

In [None]:
state = 3

K, V = build_V_in_p(state)

In [None]:
def grouped_pairs_permutations(n: int) -> np.ndarray:
    """
    Produce all sequences of length 4 built from adjacent pairs (i, i+1),
    allowing repeats, and including internal pair swaps.
    Each output row is (u1, u2, l1, l2).
    """

    pairs = [(i, i+1) for i in range(n-1)]
    seqs = set()

    # choose two pairs with replacement (order matters): (Prow, Pcol)
    for prow, pcol in itertools.product(pairs, repeat=2):
        
        # internal permutations of each pair
        seqs.add((prow[0], prow[1], pcol[0], pcol[1], 0.75))

        seqs.add((prow[1], prow[0], pcol[1], pcol[0], 0.75))

        seqs.add((prow[0], prow[1], pcol[1], pcol[0], -0.25))

        seqs.add((prow[1], prow[0], pcol[0], pcol[1], -0.25))

    return sorted(seqs)

In [None]:
seqs = grouped_pairs_permutations(n=3)  # array of shape (N, 4)
# Each row is (u1, u2, l1, l2)

In [None]:
seqs[0]

(0, 1, 0, 1, 0.75)

In [None]:
_, V_old = write_matrix_elements((state-1)//2)

0, 1, 0, 1 --> 0.75
0, 1, 1, 0 --> -0.25
0, 1, 1, 2 --> 0.75
0, 1, 2, 1 --> -0.25
1, 0, 0, 1 --> -0.25
1, 2, 0, 1 --> 0.75
1, 0, 1, 0 --> 0.75
1, 0, 1, 2 --> -0.25
1, 2, 1, 0 --> -0.25
1, 2, 1, 2 --> 0.75
1, 0, 2, 1 --> 0.75
1, 2, 2, 1 --> -0.25
2, 1, 0, 1 --> -0.25
2, 1, 1, 0 --> 0.75
2, 1, 1, 2 --> -0.25
2, 1, 2, 1 --> 0.75


In [None]:
for i in seqs:
    print(V_old.reshape(state, state, state, state)[i[0], i[2], i[1], i[3]],"->", i[4])

0.75 -> 0.75
-0.25 -> -0.25
0.75 -> 0.75
-0.25 -> -0.25
-0.25 -> -0.25
0.75 -> 0.75
-0.25 -> -0.25
0.75 -> 0.75
0.75 -> 0.75
-0.25 -> -0.25
0.75 -> 0.75
-0.25 -> -0.25
-0.25 -> -0.25
0.75 -> 0.75
-0.25 -> -0.25
0.75 -> 0.75


In [None]:
def build_V_in_p_new(state: int) -> tuple[sp.csr_matrix, sp.csr_matrix]:
    """
    Constructs:
    - K: a diagonal kinetic energy operator in the 'p' basis
    - V: a sparse potential energy operator directly in the 'p' basis

    Avoids constructing in the m basis and transforming later.
    """

    dim_V = state * state

    # Construct a diagonal of Kinetic energy matrix in m basis.
    m_vals = np.arange(-(state - 1) // 2, (state - 1) // 2 + 1)

    # Construct index map m -> p.
    perm = np.vectorize(m_to_p)(m_vals)  # Maps m-index → p-index

    data_V = []
    rows = []
    cols = []

    def index(p1: int, p2: int) -> int:
        return p1 * state + p2
    
    indecies = grouped_pairs_permutations(n=state)

    # Loop over (m1, m2) — this preserves physics
    for indx in indecies:
            p1 = perm[indx[0]]
            p2 = perm[indx[2]]
            p1p = perm[indx[1]]
            p2p = perm[indx[3]]

            i = index(p1, p2)
            j = index(p1p, p2p)
            
            rows.append(i)
            cols.append(j)
            data_V.append(indx[4])

    V = sp.csr_matrix((data_V, (rows, cols)), shape=(dim_V, dim_V), dtype=np.float64)

    # Diagonal kinetic operator in p basis
    p = vector_in_p(state)
    K = sp.diags(p**2, offsets=0, format='csr')

    return K, V*2

In [None]:
K_new, V_new = build_V_in_p_new(state)

In [None]:
for st in range(3, 111, 2):
    K_new, V_new = build_V_in_p_new(st)
    K, V = build_V_in_p(st)
    print(np.array_equal(K_new.toarray(), K.toarray()))
    print(np.array_equal(V_new.toarray(), V.toarray()))
    

True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True


In [None]:
st = 10001

In [None]:
K, V = build_V_in_p(st)

# Hamiltonian thermofield sparse.

In [None]:
state = 3

In [None]:
K, V = build_V_in_p(state)
K_old, V = K.toarray(), V.toarray()

U, _ = bz.thermofield_change_of_basis(K_old)

K_tilda_old = bz.H_tilde_maker(K_old)

V_tensor = V.reshape(state, state, state, state)

I = np.eye(state)
V_prim = oe.contract('pqrs,mw,nv->pmqnrwsv', V_tensor, I, I,  optimize='optimal')

V_grouped = V_prim.reshape(state**2, state**2, state**2, state**2)

V_tilda_old = oe.contract('Mi,Wj,ijab,aN,bV->MWNV', U, U, V_grouped, U, U, optimize='optimal')

In [None]:
U.shape

(9, 9)

In [None]:
def build_V_prime_in_p(state: int) -> tuple[sp.csr_matrix, sp.csr_matrix]:
    """
    Constructs:
    - K: a diagonal kinetic energy operator in the 'p' basis
    - V: a sparse potential energy operator directly in the 'p' basis

    Avoids constructing in the m basis and transforming later.
    """

    dim_V = state **4

    # Construct a diagonal of Kinetic energy matrix in m basis.
    m_vals = np.arange(-(state - 1) // 2, (state - 1) // 2 + 1)

    # Construct index map m -> p.
    perm = np.vectorize(m_to_p)(m_vals)  # Maps m-index → p-index

    data_V = []
    rows = []
    cols = []

    def index(p1: int, p2: int) -> int:
        return p1 * state**3 + p2 * state

    # Loop over (m1, m2) — this preserves physics
    for m1 in range(state):
        for m2 in range(state):
            p1 = perm[m1]
            p2 = perm[m2]
            i = index(p1, p2)
            for dm1, dm2, coef in [
                (1, 1, 1.5),
                (-1, -1, 1.5),
                (1, -1, -0.5),
                (-1, 1, -0.5),]:

                m1p = m1 + dm1
                m2p = m2 + dm2
                if 0 <= m1p < state and 0 <= m2p < state:
                    p1p = perm[m1p]
                    p2p = perm[m2p]
                    j = index(p1p, p2p)

                    for shift_state in range(0, state):
                        for shift_index in range(0, state):
                            rows.append(i + shift_index + state**2 * shift_state)
                            cols.append(j + shift_index + state**2 * shift_state)
                            data_V.append(coef)

    V = sp.csr_matrix((data_V, (rows, cols)), shape=(dim_V, dim_V), dtype=np.float64)

    # Diagonal kinetic operator in p basis
    p = vector_in_p(state)
    p_prime = np.array([[i]*state for i in p]).flatten()
    K = sp.diags(p_prime**2, offsets=0, format='csr')

    return K, V

In [None]:
K, V = build_V_prime_in_p(state)
I = np.eye(state)

U, _ = bz.thermofield_change_of_basis(I)

U_sparse = sp.csr_matrix(U)

K_tilda = U_sparse.T @ K @ U_sparse

V_tilda = oe.contract('Mi,Wj,ijab,aN,bV->MWNV', U, U, V.toarray().reshape(state**2, state**2, state**2, state**2), U, U, optimize='optimal')

In [None]:
np.array_equal(K_tilda_old, K_tilda.toarray())

True

In [None]:
np.array_equal(V_prim.reshape(state**4, state**4), V.toarray())

True

In [None]:
np.array_equal(V_tilda_old, V_tilda)

True

In [None]:
# Small state size so we can see numbers
S = 2

# Random U and V4
np.random.seed(0)
U = np.random.randn(S, S)     # (i->M)
V4 = np.random.randn(S, S, S, S)  # (i, j, a, b)

# ---- Method 1: einsum ----
V4_tilda = np.einsum('Mi,Wj,ijab,aN,bV->MWNV', U, U, V4, U, U)

# ---- Method 2: kron sandwich ----
# Flatten V4 to (ij, ab) in C-order: i*S + j, a*S + b
V_flat = V4.reshape(S*S, S*S, order='C')

UU = np.kron(U, U)     # (S*S, S*S)
V_flat_tilda = UU @ V_flat @ UU.T
V4_tilda_from_kron = V_flat_tilda#.reshape(S, S, S, S, order='C')

# ---- Compare ----
# print("Are they equal?", np.allclose(V4_tilda, V4_tilda_from_kron))

# Show both
print("\nEinsum result:\n", V4_tilda)
print("\nKronecker result:\n", V4_tilda_from_kron)


Einsum result:
 [[[[22.15035832 -3.29218103]
   [18.50356323  4.97777493]]

  [[22.76305421 14.0373807 ]
   [24.24031076 27.2889174 ]]]


 [[[28.58531808  3.80767682]
   [21.4779879   7.77583267]]

  [[38.04571936  4.77211149]
   [21.04993689  1.81086932]]]]

Kronecker result:
 [[20.88998566  2.50917078 24.30491504 11.11354333]
 [16.37716441 12.2923606  22.49529066 34.03695821]
 [25.34377535 10.37853398 28.04884506 17.02979526]
 [35.42055723 16.39264346 32.67046885 14.2289836 ]]


In [None]:
UU

array([[3.11188068, 0.70589826, 0.70589826, 0.16012579],
       [1.72654504, 3.95305291, 0.39164906, 0.89670957],
       [1.72654504, 0.39164906, 3.95305291, 0.89670957],
       [0.95792804, 2.19324729, 2.19324729, 5.02160233]])

In [None]:
U

array([[1.76405235, 0.40015721],
       [0.97873798, 2.2408932 ]])

In [None]:
2.2408932*1.76405235

3.95305291555902