In [1]:
# rbm_train_4q_test.py
# Minimal, dependency-free (NumPy-only) test of training an RBM on bitstrings
# sampled from a 4-qubit quantum circuit.

import numpy as np

# ---------- Utilities ----------
def kron(*ops):
    out = np.array([[1.0+0j]])
    for op in ops:
        out = np.kron(out, op)
    return out

def normalize(psi):
    return psi / np.linalg.norm(psi)

def sample_bitstrings_from_state(psi, n_samples, rng):
    # psi is a 16-dim statevector for 4 qubits (|0000>..|1111> order)
    probs = np.abs(psi)**2
    idxs = rng.choice(len(probs), size=n_samples, p=probs)
    # Convert indices to 4-bit strings (0/1)
    bits = ((idxs[:, None] >> np.arange(4)[::-1]) & 1).astype(np.int64)
    return bits

def one_hot_counts(bits, n_qubits=4):
    idx = np.packbits(bits, axis=1, bitorder='big')  # 4 bits -> 1 byte
    # But packbits returns uint8 with all 8 bits; compute integer index manually:
    idx = (bits[:,0]<<3) + (bits[:,1]<<2) + (bits[:,2]<<1) + (bits[:,3]<<0)
    counts = np.bincount(idx, minlength=2**n_qubits)
    return counts

def total_variation_distance(p, q):
    return 0.5 * np.sum(np.abs(p - q))

# ---------- Simple 4-qubit circuit (statevector) ----------
def four_qubit_state(rng=None):
    # Gates
    I = np.eye(2, dtype=complex)
    X = np.array([[0,1],[1,0]], dtype=complex)
    H = (1/np.sqrt(2))*np.array([[1,1],[1,-1]], dtype=complex)
    Rz = lambda theta: np.array([[np.exp(-1j*theta/2),0],[0,np.exp(1j*theta/2)]], dtype=complex)
    Rx = lambda theta: np.array([[np.cos(theta/2), -1j*np.sin(theta/2)],
                                 [-1j*np.sin(theta/2), np.cos(theta/2)]], dtype=complex)
    # CNOT on control c, target t; 4-qubit space
    def CNOT(c, t):
        # Build 16x16 operator by projecting on control
        P0 = np.array([[1,0],[0,0]], dtype=complex)
        P1 = np.array([[0,0],[0,1]], dtype=complex)
        ops0 = [I,I,I,I]; ops1 = [I,I,I,I]
        ops0[c] = P0; ops1[c] = P1
        U0 = kron(*ops0)
        # X on target when control is 1
        opsx = [I,I,I,I]
        opsx[t] = X
        X_t = kron(*opsx)
        return U0 + kron(*[P1 if i==c else I for i in range(4)]) @ X_t

    # Start in |0000>
    psi = np.zeros(16, dtype=complex); psi[0] = 1.0

    # Layer 1: H on all qubits
    U_H_all = kron(H,H,H,H)
    psi = U_H_all @ psi

    # Layer 2: random single-qubit rotations (fixed seed for reproducibility)
    if rng is None: rng = np.random.default_rng(7)
    thetas_rz = rng.uniform(-0.9, 0.9, size=4)
    thetas_rx = rng.uniform(-0.9, 0.9, size=4)
    U1 = kron(Rz(thetas_rz[0])@Rx(thetas_rx[0]),
              Rz(thetas_rz[1])@Rx(thetas_rx[1]),
              Rz(thetas_rz[2])@Rx(thetas_rx[2]),
              Rz(thetas_rz[3])@Rx(thetas_rx[3]))
    psi = U1 @ psi

    # Layer 3: entanglers (CNOT 0->1 and 2->3)
    psi = CNOT(0,1) @ psi
    psi = CNOT(2,3) @ psi

    return normalize(psi)

# ---------- RBM (0/1 units) ----------
class RBM01:
    """
    RBM with 0/1 visible and hidden units.
    Energy:  E(v,h) = -a^T v - b^T h - v^T W h
    p(v,h) ∝ exp(-E). Conditionals:
      p(h=1|v) = sigmoid(b + W^T v)
      p(v=1|h) = sigmoid(a + W h)
    """
    def __init__(self, n_visible, n_hidden, rng=None, scale=0.01):
        self.nv = n_visible
        self.nh = n_hidden
        self.rng = np.random.default_rng() if rng is None else rng
        self.a = scale*self.rng.standard_normal(self.nv)
        self.b = scale*self.rng.standard_normal(self.nh)
        self.W = scale*self.rng.standard_normal((self.nv, self.nh))

    @staticmethod
    def sigmoid(x):
        return 1.0/(1.0 + np.exp(-x))

    def p_h_given_v(self, v):
        # v: (B, nv) or (nv,)
        x = self.b + v @ self.W
        return self.sigmoid(x)

    def p_v_given_h(self, h):
        x = self.a + h @ self.W.T
        return self.sigmoid(x)

    def sample_h(self, v):
        ph = self.p_h_given_v(v)
        return (self.rng.random(ph.shape) < ph).astype(np.float64), ph

    def sample_v(self, h):
        pv = self.p_v_given_h(h)
        return (self.rng.random(pv.shape) < pv).astype(np.float64), pv

    def cd_k(self, v0, k=1):
        v = v0.copy()
        h, ph = self.sample_h(v)
        for _ in range(k):
            v, pv = self.sample_v(h)
            h, ph = self.sample_h(v)
        return v, h  # vk, hk

    def train(self, data, epochs=2000, batch_size=128, lr=0.05, k=1, verbose_every=200):
        n = data.shape[0]
        for ep in range(1, epochs+1):
            # mini-batch SGD
            perm = self.rng.permutation(n)
            for i in range(0, n, batch_size):
                batch = data[perm[i:i+batch_size]]
                # Positive phase
                ph_pos = self.p_h_given_v(batch)
                # Negative phase via CD-k
                v_k, h_k = self.cd_k(batch, k=k)
                ph_neg = self.p_h_given_v(v_k)

                # Gradients (expected sufficient statistics diff)
                dW = (batch.T @ ph_pos - v_k.T @ ph_neg) / batch.shape[0]
                da = (batch - v_k).mean(axis=0)
                db = (ph_pos - ph_neg).mean(axis=0)

                # Update
                self.W += lr * dW
                self.a += lr * da
                self.b += lr * db

            if verbose_every and ep % verbose_every == 0:
                pll = self.pseudo_log_likelihood(data[:256])
                print(f"Epoch {ep:4d}  PLL≈ {pll:.4f}")

    def free_energy(self, v):
        # F(v) = -a^T v - sum_j log(1 + exp(b_j + W_j^T v))
        lin = self.a @ v.T  # shape: (B,)
        t = self.b + v @ self.W
        logsum = np.sum(np.log1p(np.exp(t)), axis=1)
        return -lin - logsum

    def pseudo_log_likelihood(self, data):
        # Standard RBM diagnostic: average PLL via single bit flip
        B = min(256, data.shape[0])
        idx = self.rng.integers(0, self.nv, size=B)
        v = data[:B].copy()
        fe_v = self.free_energy(v)
        # flip selected bit
        v_flipped = v.copy()
        rows = np.arange(B)
        v_flipped[rows, idx] = 1.0 - v_flipped[rows, idx]
        fe_vf = self.free_energy(v_flipped)
        pll = self.nv * np.mean(np.log(self.sigmoid(fe_vf - fe_v)))
        return pll

    def sample_model(self, n_samples=10000, k=20):
        # Start from random visibles; run k-step block Gibbs; return samples
        v = (self.rng.random((n_samples, self.nv)) < 0.5).astype(np.float64)
        for _ in range(k):
            h, _ = self.sample_h(v)
            v, _ = self.sample_v(h)
        return v.astype(int)

# ---------- Main test ----------
if __name__ == "__main__":
    rng = np.random.default_rng(123)

    # 1) Build 4-qubit state and sample training data
    psi = four_qubit_state(rng)
    n_train = 20000
    data_bits = sample_bitstrings_from_state(psi, n_train, rng)  # shape (n,4), 0/1
    print("Empirical distribution from circuit (first 10 rows):")
    print(data_bits[:10])

    # True distribution (for evaluation)
    true_probs = np.abs(psi)**2  # length 16

    # 2) Initialize and train RBM
    rbm = RBM01(n_visible=4, n_hidden=8, rng=rng)
    rbm.train(data_bits, epochs=1500, batch_size=128, lr=0.05, k=1, verbose_every=150)

    # 3) Evaluate: sample from RBM model and compare to true probs
    rbm_samples = rbm.sample_model(n_samples=200000, k=25)
    model_counts = one_hot_counts(rbm_samples, n_qubits=4).astype(np.float64)
    model_probs = model_counts / model_counts.sum()

    tvd = total_variation_distance(true_probs, model_probs)
    kl = np.sum(np.where(model_probs > 0, true_probs * (np.log(true_probs + 1e-12) - np.log(model_probs + 1e-12)), 0.0))
    print("\n--- Evaluation ---")
    print("True probs (first 8):  ", np.round(true_probs[:8], 4))
    print("Model probs (first 8): ", np.round(model_probs[:8], 4))
    print(f"TVD(true, model) = {tvd:.4f}")
    print(f"KL(true || model) = {kl:.4f}")

Empirical distribution from circuit (first 10 rows):
[[1 1 0 1]
 [1 1 1 0]
 [1 0 0 0]
 [0 0 1 1]
 [1 1 0 1]
 [0 0 1 1]
 [1 0 1 1]
 [1 0 1 0]
 [1 1 1 0]
 [0 0 1 1]]
Epoch  150  PLL≈ -2.7761
Epoch  300  PLL≈ -2.7625
Epoch  450  PLL≈ -2.7732
Epoch  600  PLL≈ -2.7745
Epoch  750  PLL≈ -2.7790
Epoch  900  PLL≈ -2.7716
Epoch 1050  PLL≈ -2.7687
Epoch 1200  PLL≈ -2.7794
Epoch 1350  PLL≈ -2.7743
Epoch 1500  PLL≈ -2.7753

--- Evaluation ---
True probs (first 8):   [0.0625 0.0625 0.0625 0.0625 0.0625 0.0625 0.0625 0.0625]
Model probs (first 8):  [0.0597 0.061  0.062  0.0619 0.0605 0.0609 0.0662 0.0626]
TVD(true, model) = 0.0111
KL(true || model) = 0.0004


In [1]:
1+1j

(1+1j)

In [14]:
# rbm_observable_estimator_test.py
# Estimate <O> for O = sum_k c_k P_k using a trained complex RBM psi_lambda.
# Demonstrates MC local-estimator + exact enumeration cross-check (small N).

import numpy as np

# ----------------------------
# Complex RBM wavefunction
# ----------------------------
class ComplexRBM:
    """
    RBM with spins sigma_i in {-1, +1}.
    psi(sigma) = exp(sum_i a_i sigma_i) * prod_j 2 cosh( b_j + sum_i W_ij sigma_i )
    where a, b, W are complex.
    """
    def __init__(self, n_visible, n_hidden, a=None, b=None, W=None, rng=None):
        self.N = n_visible
        self.M = n_hidden
        self.rng = np.random.default_rng() if rng is None else rng

        # If not supplied, set arbitrary (random) complex params for demo
        if a is None: a = 0.05*(self.rng.standard_normal(self.N) + 1j*self.rng.standard_normal(self.N))
        if b is None: b = 0.05*(self.rng.standard_normal(self.M) + 1j*self.rng.standard_normal(self.M))
        if W is None: W = 0.05*(self.rng.standard_normal((self.N,self.M)) + 1j*self.rng.standard_normal((self.N,self.M)))

        self.a = a.astype(np.complex128)
        self.b = b.astype(np.complex128)
        self.W = W.astype(np.complex128)

    def theta(self, sigma):
        # theta_j = b_j + sum_i W_ij sigma_i ; shape (M,)
        return self.b + (sigma @ self.W)

    def logpsi(self, sigma):
        # sigma: shape (N,) with entries in {-1, +1}
        th = self.theta(sigma)  # (M,)
        # log psi = sum_i a_i sigma_i + sum_j log(2 cosh(theta_j))
        # Use np.cosh for complex; add small epsilon to avoid branch issues if desired
        return np.sum(self.a * sigma) + np.sum(np.log(2.0*np.cosh(th)))

    def psi(self, sigma):
        return np.exp(self.logpsi(sigma))

    # ---- Metropolis sampling from |psi|^2 ----
    def metropolis_samples(self, n_samples=10000, burn_in=1000, thin=10):
        sigma = self.random_spin_config()  # start
        logpsi = self.logpsi(sigma)
        # cache theta for O(M) updates
        th = self.theta(sigma)

        samples = []
        total_steps = burn_in + n_samples*thin
        for step in range(total_steps):
            i = self.rng.integers(self.N)           # propose flip at i
            sigma_new = sigma.copy()
            sigma_new[i] *= -1

            # Efficient theta' update: theta'_j = theta_j + W_ij (sigma'_i - sigma_i) = theta_j - 2 W_ij sigma_i
            th_new = th - 2.0*self.W[i,:]*sigma[i]

            # log |psi|^2 = 2 * Re(logpsi)
            logpsi2 = 2.0*np.real(logpsi)
            logpsi_new = (np.sum(self.a * sigma_new) + np.sum(np.log(2.0*np.cosh(th_new))))
            logpsi2_new = 2.0*np.real(logpsi_new)

            # Accept/reject
            logR = logpsi2_new - logpsi2
            if logR >= 0 or np.log(self.rng.random()) < logR:
                sigma = sigma_new
                logpsi = logpsi_new
                th = th_new

            # collect
            if step >= burn_in and ((step - burn_in) % thin == 0):
                samples.append(sigma.copy())

        return np.array(samples, dtype=int)

    def random_spin_config(self):
        return 2*self.rng.integers(0,2,size=self.N)-1  # {-1,+1}

# -------------------------------------------
# Pauli words: apply in Z-basis & phase rules
# -------------------------------------------
# Represent a Pauli word as a list of ('I'/'X'/'Y'/'Z') of length N.
# Local estimator for a single sample sigma uses:
#   <sigma | P | psi> / <sigma | psi> = phase * psi(sigma_flipped)/psi(sigma)
# where sigma_flipped flips bits where X or Y act; phase accumulates Z and Y phases.

def apply_pauli_on_config(sigma, pauli_word):
    """
    Returns (sigma_prime, phase) such that
    <sigma | P | psi> = phase * psi(sigma_prime)
    in the computational Z basis with sigma_i in {-1, +1} mapping to |0>, |1> by sigma_i=+1->|0|, -1->|1|.
    Phase convention:
      Z on |0>,|1> gives +1,-1 respectively => factor sigma_i
      X flips the bit (no phase)
      Y flips the bit and contributes phase i*(sign) where sign = -sigma_i (since Y|0>=i|1>, Y|1>=-i|0>)
    """
    sigma_prime = sigma.copy()
    phase = 1.0 + 0.0j
    for i, P in enumerate(pauli_word):
        if P == 'I':
            continue
        elif P == 'Z':
            phase *= sigma[i]  # eigenvalue +1/-1
        elif P == 'X':
            sigma_prime[i] *= -1
        elif P == 'Y':
            # Y = i X Z ; acting on basis adds: flip and multiply by (i * sigma_i)
            phase *= (1j * sigma[i])
            sigma_prime[i] *= -1
        else:
            raise ValueError("Invalid Pauli letter")
    return sigma_prime, phase

def local_estimator_sample(rbm: ComplexRBM, sigma, pauli_terms):
    """
    pauli_terms: list of (coeff, pauli_word_list), e.g.
      [(0.7, ['Z','I','X','X']), (0.3, ['I','Z','Z','I'])]
    Returns O_loc(sigma) = sum_k c_k * <sigma|P_k|psi>/ <sigma|psi>
    """
    logpsi_sigma = rbm.logpsi(sigma)
    psi_sigma = np.exp(logpsi_sigma)
    Oloc = 0.0 + 0.0j
    for ck, Pk in pauli_terms:
        sigma_k, phase_k = apply_pauli_on_config(sigma, Pk)
        psi_sigma_k = rbm.psi(sigma_k)
        Oloc += ck * phase_k * (psi_sigma_k / psi_sigma)
    return Oloc

def estimate_observable_mc(rbm: ComplexRBM, pauli_terms, n_samples=20000, burn_in=1000, thin=10):
    samples = rbm.metropolis_samples(n_samples=n_samples, burn_in=burn_in, thin=thin)
    vals = []
    for s in samples:
        vals.append(local_estimator_sample(rbm, s, pauli_terms))
    vals = np.array(vals, dtype=np.complex128)
    mean = np.mean(vals)
    # Standard error from MC
    stderr = np.std(vals)/np.sqrt(len(vals))
    return mean, stderr

# -------------------------------------------
# Exact enumeration (small N) for validation
# -------------------------------------------
def all_spin_configs(N):
    # Returns array of shape (2^N, N) with entries in {-1,+1}
    arr = []
    for idx in range(2**N):
        bits = [((idx >> (N-1-i)) & 1) for i in range(N)]
        sig = np.array([1 if b==0 else -1 for b in bits], dtype=int)  # 0->+1, 1->-1
        arr.append(sig)
    return np.stack(arr, axis=0)

# def exact_expectation(rbm: ComplexRBM, pauli_terms):
#     N = rbm.N
#     cfgs = all_spin_configs(N)
#     # Build normalized psi over all configs
#     psi = np.array([rbm.psi(s) for s in cfgs])
#     norm2 = np.vdot(psi, psi).real
#     psi /= np.sqrt(norm2)

#     # Compute <O> = sum_sigma |psi(sigma)|^2 O_loc(sigma)
#     Oexp = 0.0 + 0.0j
#     for sigma, psi_s in zip(cfgs, psi):
#         Oloc = 0.0 + 0.0j
#         for ck, Pk in pauli_terms:
#             sigma_k, phase_k = apply_pauli_on_config(sigma, Pk)
#             # find index of sigma_k
#             idx_k = 0
#             for i, s_i in enumerate(sigma_k):
#                 bit = 0 if s_i==1 else 1
#                 idx_k = (idx_k<<1) | bit
#             Oloc += ck * phase_k * (psi[idx_k] / psi_s)
#         Oexp += (np.abs(psi_s)**2) * Oloc
#     return Oexp

def exact_expectation(rbm: ComplexRBM, pauli_terms):
    N = rbm.N
    cfgs = all_spin_configs(N)
    # Build normalized psi over all configs
    psi = np.array([rbm.psi(s) for s in cfgs])
    norm2 = np.vdot(psi, psi).real
    psi /= np.sqrt(norm2)
    
    # Create a dictionary for O(1) lookups
    cfg_to_idx = {tuple(sigma): i for i, sigma in enumerate(cfgs)}

    # Compute <O> = sum_sigma |psi(sigma)|^2 O_loc(sigma)
    Oexp = 0.0 + 0.0j
    for idx, (sigma, psi_s) in enumerate(zip(cfgs, psi)):
        Oloc = 0.0 + 0.0j
        for ck, Pk in pauli_terms:
            sigma_k, phase_k = apply_pauli_on_config(sigma, Pk)
            idx_k = cfg_to_idx[tuple(sigma_k)]
            Oloc += ck * phase_k * (psi[idx_k] / psi_s)
        Oexp += (np.abs(psi_s)**2) * Oloc
    return Oexp

# ----------------------------
# Demo / self-test
# ----------------------------
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    N = 3
    M = 4
    rbm = ComplexRBM(n_visible=N, n_hidden=M, rng=rng)  # <-- plug your trained params here

    # Define an observable O = 0.8 * (X Z I) + 0.5 * (Z Z Z) - 0.3 * (Y I X)
    pauli_terms = [
        (0.8, ['X','Z','I']),
        (0.5, ['Z','Z','Z']),
        (-0.3, ['Y','I','X']),
    ]

    # Monte Carlo estimate from RBM samples
    mc_mean, mc_se = estimate_observable_mc(
        rbm, pauli_terms, n_samples=30000, burn_in=2000, thin=5
    )
    print(f"MC estimate  <O> = {mc_mean:.6f}  ± {mc_se:.6f}  (standard error)")

    # Exact expectation from full enumeration (same RBM parameters)
    exact = exact_expectation(rbm, pauli_terms)
    print(f"Exact value  <O> = {exact:.6f}")

    print(f"Absolute error |MC - Exact| = {abs(mc_mean-exact):.6e}")


MC estimate  <O> = -0.023905+0.000677j  ± 0.005717  (standard error)
Exact value  <O> = -0.018401+0.000000j
Absolute error |MC - Exact| = 5.546242e-03


In [37]:
# complex_rbm_training.py
# Training code for ComplexRBM with spins in {-1, +1}
# Adapted to work with the ComplexRBM class from the observable estimator

import numpy as np

class ComplexRBM:
    """
    RBM with spins sigma_i in {-1, +1}.
    psi(sigma) = exp(sum_i a_i sigma_i) * prod_j 2 cosh( b_j + sum_i W_ij sigma_i )
    where a, b, W are complex.
    """
    def __init__(self, n_visible, n_hidden, a=None, b=None, W=None, rng=None):
        self.N = n_visible
        self.M = n_hidden
        self.rng = np.random.default_rng() if rng is None else rng

        # If not supplied, initialize with small random complex parameters
        if a is None: 
            a = 0.01*(self.rng.standard_normal(self.N) + 1j*self.rng.standard_normal(self.N))
        if b is None: 
            b = 0.01*(self.rng.standard_normal(self.M) + 1j*self.rng.standard_normal(self.M))
        if W is None: 
            W = 0.01*(self.rng.standard_normal((self.N,self.M)) + 1j*self.rng.standard_normal((self.N,self.M)))

        self.a = a.astype(np.complex128)
        self.b = b.astype(np.complex128)
        self.W = W.astype(np.complex128)

    def theta(self, sigma):
        """theta_j = b_j + sum_i W_ij sigma_i ; shape (..., M)"""
        return self.b + (sigma @ self.W)

    def logpsi(self, sigma):
        """Log wavefunction amplitude for configuration(s) sigma"""
        # sigma: shape (..., N) with entries in {-1, +1}
        th = self.theta(sigma)  # (..., M)
        return np.sum(self.a * sigma, axis=-1) + np.sum(np.log(2.0*np.cosh(th)), axis=-1)

    def psi(self, sigma):
        """Wavefunction amplitude"""
        return np.exp(self.logpsi(sigma))

    def prob(self, sigma):
        """Probability |psi(sigma)|^2 (unnormalized)"""
        logpsi_val = self.logpsi(sigma)
        return np.exp(2.0 * np.real(logpsi_val))

    def sample_h_given_v(self, sigma):
        """
        Sample hidden units given visible spins.
        For complex RBM: p(h_j = ±1 | sigma) ∝ exp(±Re[b_j + sum_i W_ij sigma_i])
        """
        th = self.theta(sigma)  # (..., M)
        # p(h=+1) / p(h=-1) = exp(2*Re(theta))
        # p(h=+1) = exp(Re(theta)) / (exp(Re(theta)) + exp(-Re(theta))) = 1/(1+exp(-2*Re(theta)))
        prob_plus = 1.0 / (1.0 + np.exp(-2.0 * np.real(th)))
        h = np.where(self.rng.random(th.shape) < prob_plus, 1, -1)
        return h

    def sample_v_given_h(self, h):
        """
        Sample visible spins given hidden units.
        For complex RBM: p(sigma_i = ±1 | h) ∝ exp(±Re[a_i + sum_j W_ij h_j])
        """
        field = self.a + (h @ self.W.T)  # (..., N)
        prob_plus = 1.0 / (1.0 + np.exp(-2.0 * np.real(field)))
        sigma = np.where(self.rng.random(field.shape) < prob_plus, 1, -1)
        return sigma

    def contrastive_divergence_k(self, sigma0, k=1):
        """
        CD-k: Start from data sigma0, run k steps of Gibbs sampling.
        Returns final visible and hidden samples.
        """
        sigma = sigma0.copy()
        for _ in range(k):
            h = self.sample_h_given_v(sigma)
            sigma = self.sample_v_given_h(h)
        h = self.sample_h_given_v(sigma)
        return sigma, h

    def compute_gradients(self, data_batch, k=1):
        """
        Compute gradients using contrastive divergence.
        Returns: grad_a, grad_b, grad_W
        """
        batch_size = data_batch.shape[0]
        
        # Positive phase: data statistics
        th_pos = self.theta(data_batch)  # (B, M)
        h_pos_mean = np.tanh(np.real(th_pos))  # <h_j>_data ≈ tanh(Re(theta))
        
        # Negative phase: model statistics via CD-k
        sigma_neg, h_neg = self.contrastive_divergence_k(data_batch, k=k)
        th_neg = self.theta(sigma_neg)
        h_neg_mean = np.tanh(np.real(th_neg))
        
        # Gradients for complex parameters (we optimize Re and Im separately via complex gradient)
        # For complex params, gradient is: ∂L/∂θ* (conjugate)
        # But for simplicity, we'll update using real gradients on log|psi|^2
        
        # Gradient of log|psi|^2 w.r.t. complex parameters
        # d log|psi|^2 / da* = 2 * sigma * psi/|psi|^2 ≈ 2 * sigma (in expectation)
        grad_a = np.mean(data_batch, axis=0) - np.mean(sigma_neg, axis=0)
        
        # d log|psi|^2 / db* involves tanh terms
        grad_b = np.mean(h_pos_mean, axis=0) - np.mean(h_neg_mean, axis=0)
        
        # d log|psi|^2 / dW*
        grad_W = (data_batch.T @ h_pos_mean - sigma_neg.T @ h_neg_mean) / batch_size
        
        return grad_a, grad_b, grad_W

    def train(self, data_01, epochs=2000, batch_size=128, lr=0.05, k=1, verbose_every=200):
        """
        Train the ComplexRBM on data.
        
        Args:
            data_01: Training data in {0, 1} format, shape (n_samples, n_visible)
            epochs: Number of training epochs
            batch_size: Mini-batch size
            lr: Learning rate
            k: Number of CD steps
            verbose_every: Print progress every N epochs (None to disable)
        """
        # Convert 0/1 data to -1/+1 spins
        data_spins = 2 * data_01 - 1  # 0 -> -1, 1 -> +1
        n = data_spins.shape[0]
        
        for ep in range(1, epochs + 1):
            # Shuffle data
            perm = self.rng.permutation(n)
            
            # Mini-batch SGD
            for i in range(0, n, batch_size):
                batch = data_spins[perm[i:i+batch_size]]
                
                # Compute gradients
                grad_a, grad_b, grad_W = self.compute_gradients(batch, k=k)
                
                # Update parameters (complex gradient descent)
                # We treat complex params as having real and imaginary parts
                # For simplicity, update both parts with the same gradient
                self.a += lr * grad_a.astype(np.complex128)
                self.b += lr * grad_b.astype(np.complex128)
                self.W += lr * grad_W.astype(np.complex128)
            
            # Verbose output
            if verbose_every and ep % verbose_every == 0:
                # Compute approximate log-likelihood on a subset
                sample_size = min(256, n)
                sample_idx = self.rng.choice(n, size=sample_size, replace=False)
                sample_data = data_spins[sample_idx]
                avg_logprob = np.mean(2.0 * np.real(self.logpsi(sample_data)))
                print(f"Epoch {ep:4d}  Avg log|psi|^2 ≈ {avg_logprob:.4f}")

    def metropolis_samples(self, n_samples=10000, burn_in=1000, thin=10, return_format='bits'):
        """
        Sample from |psi|^2 using Metropolis-Hastings
        
        Args:
            n_samples: Number of samples to return
            burn_in: Number of initial steps to discard
            thin: Keep every thin-th sample
            return_format: 'spins' for {-1,+1} or 'bits' for {0,1}
        
        Returns:
            Array of samples in requested format
        """
        sigma = self.random_spin_config()
        logpsi = self.logpsi(sigma)
        th = self.theta(sigma)

        samples = []
        total_steps = burn_in + n_samples * thin
        
        for step in range(total_steps):
            # Propose single spin flip
            i = self.rng.integers(self.N)
            sigma_new = sigma.copy()
            sigma_new[i] *= -1

            # Efficient theta update
            th_new = th - 2.0 * self.W[i, :] * sigma[i]

            # Metropolis acceptance
            logpsi_new = np.sum(self.a * sigma_new) + np.sum(np.log(2.0*np.cosh(th_new)))
            logpsi2 = 2.0 * np.real(logpsi)
            logpsi2_new = 2.0 * np.real(logpsi_new)
            
            logR = logpsi2_new - logpsi2
            if logR >= 0 or np.log(self.rng.random()) < logR:
                sigma = sigma_new
                logpsi = logpsi_new
                th = th_new

            # Collect sample
            if step >= burn_in and ((step - burn_in) % thin == 0):
                samples.append(sigma.copy())

        samples = np.array(samples, dtype=int)
        
        # Convert to bits if requested
        if return_format == 'bits':
            samples = ((samples + 1) // 2).astype(int)  # -1 -> 0, +1 -> 1
        
        return samples


    def random_spin_config(self):
        """Generate random spin configuration in {-1, +1}"""
        return 2 * self.rng.integers(0, 2, size=self.N) - 1


# Example usage
if __name__ == "__main__":
    # Example: Train on 6-qubit data
    rng = np.random.default_rng(42)
    
    # Generate some example training data (0/1 format)
    n_train = 5000
    n_qubits = 6
    training_data = rng.integers(0, 2, size=(n_train, n_qubits)).astype(float)
    
    # Create and train ComplexRBM
    rbm = ComplexRBM(n_visible=n_qubits, n_hidden=8, rng=rng)
    
    print("Training ComplexRBM...")
    rbm.train(training_data, epochs=1000, batch_size=128, lr=0.05, k=1, verbose_every=100)
    
    print("\nTraining complete!")
    print(f"Final parameters: a.shape={rbm.a.shape}, b.shape={rbm.b.shape}, W.shape={rbm.W.shape}")
    
    # Sample from trained model
    print("\nSampling from trained model...")
    samples = rbm.metropolis_samples(n_samples=1000, burn_in=500, thin=5)
    print(f"Generated {len(samples)} samples")
    print("First 5 samples (in {-1,+1} format):")
    print(samples[:5])

Training ComplexRBM...
Epoch  100  Avg log|psi|^2 ≈ 11.2453
Epoch  200  Avg log|psi|^2 ≈ 11.2665
Epoch  300  Avg log|psi|^2 ≈ 11.3106
Epoch  400  Avg log|psi|^2 ≈ 11.2942
Epoch  500  Avg log|psi|^2 ≈ 11.2984
Epoch  600  Avg log|psi|^2 ≈ 11.2769
Epoch  700  Avg log|psi|^2 ≈ 11.2859
Epoch  800  Avg log|psi|^2 ≈ 11.2851
Epoch  900  Avg log|psi|^2 ≈ 11.3377
Epoch 1000  Avg log|psi|^2 ≈ 11.3424

Training complete!
Final parameters: a.shape=(6,), b.shape=(8,), W.shape=(6, 8)

Sampling from trained model...
Generated 1000 samples
First 5 samples (in {-1,+1} format):
[[1 1 1 1 1 0]
 [1 0 0 1 0 0]
 [0 0 0 0 0 1]
 [0 0 1 1 0 1]
 [0 0 0 1 0 0]]


In [38]:
 # BEH2
 
pauli_path = "../data/tomography/BeH2/paulis.file"
interactions_path = "../data/tomography/BeH2/interactions.file"

pauli = np.load(pauli_path,allow_pickle=True)                                 
interactions = np.load(interactions_path,allow_pickle=True)                   



In [39]:
import nqs.BeH2_ptutorial as bpm
import netket as nk

hi = nk.hilbert.Qubit(N=6)
samples_path = "../data/tomography/BeH2/train_samples.txt"
bases_path = "../data/tomography/BeH2/train_bases.txt"
ns = 100000  
N_beh2=6

In [40]:

# Load training data
rotations, tr_samples, tr_bases = bpm.LoadData(N_beh2,hi,samples_path, bases_path)
if (ns > tr_samples.shape[0]):
    "Not enough training samples"
else:
    training_samples = tr_samples[0:ns]
    training_bases   = tr_bases[0:ns]
    
# load bases using new function:
Us = bpm.load_bases(bases_path)

Loaded 128000 measurement bases from ../data/tomography/BeH2/train_bases.txt
Number of qubits: 6
Unique bases: 54


In [41]:
tr_samples[:10]

array([[0., 1., 0., 1., 0., 1.],
       [0., 0., 1., 0., 1., 1.],
       [1., 1., 0., 1., 1., 1.],
       [1., 1., 1., 1., 0., 1.],
       [0., 0., 0., 0., 1., 1.],
       [0., 0., 1., 0., 1., 1.],
       [0., 1., 1., 0., 0., 1.],
       [1., 1., 1., 1., 1., 0.],
       [0., 1., 0., 1., 0., 1.],
       [0., 0., 1., 1., 0., 0.]])

In [42]:
def convert_pauli_format(pauli_strings, coefficients):
    """
    Convert Pauli strings from string format to list format with coefficients.
    
    Args:
        pauli_strings: List of strings like ['IIIIII', 'ZIIIII', ...]
        coefficients: List of floats corresponding to each Pauli string
    
    Returns:
        List of tuples: [(coeff, ['I','I','I',...]), ...]
    """
    result = []
    for pauli_str, coeff in zip(pauli_strings, coefficients):
        pauli_list = list(pauli_str)
        result.append((coeff, pauli_list))
    return result

In [43]:
converted_strs = convert_pauli_format(pauli,interactions)

In [12]:
print(converted_strs[:10])

[(np.float64(-17.07279443954866), ['I', 'I', 'I', 'I', 'I', 'I']), (np.float64(0.11563387473407918), ['Z', 'I', 'I', 'I', 'I', 'I']), (np.float64(0.007269543961396736), ['X', 'X', 'Z', 'I', 'I', 'I']), (np.float64(0.007269543961396736), ['Y', 'Y', 'I', 'I', 'I', 'I']), (np.float64(0.10000450328928216), ['Z', 'Z', 'I', 'I', 'I', 'I']), (np.float64(-0.004618457080023506), ['Z', 'X', 'X', 'I', 'I', 'I']), (np.float64(-0.004618457080023506), ['I', 'Y', 'Y', 'I', 'I', 'I']), (np.float64(-0.15987964617217187), ['I', 'Z', 'Z', 'I', 'I', 'I']), (np.float64(-0.3907728921618504), ['I', 'I', 'Z', 'I', 'I', 'I']), (np.float64(0.1156338747340795), ['I', 'I', 'I', 'Z', 'I', 'I'])]


In [46]:
# 2) Initialize and train RBM
rbm_beh2 = ComplexRBM(n_visible=6, n_hidden=8, rng=rng)
rbm_beh2.train(tr_samples[:ns], epochs=1500, batch_size=10000, lr=0.01, k=1, verbose_every=50)

Epoch   50  Avg log|psi|^2 ≈ 14.9880
Epoch  100  Avg log|psi|^2 ≈ 14.9845
Epoch  150  Avg log|psi|^2 ≈ 15.0610
Epoch  200  Avg log|psi|^2 ≈ 15.0113
Epoch  250  Avg log|psi|^2 ≈ 15.0174
Epoch  300  Avg log|psi|^2 ≈ 15.3712
Epoch  350  Avg log|psi|^2 ≈ 15.7565
Epoch  400  Avg log|psi|^2 ≈ 15.5838
Epoch  450  Avg log|psi|^2 ≈ 15.6272
Epoch  500  Avg log|psi|^2 ≈ 15.5002
Epoch  550  Avg log|psi|^2 ≈ 15.4099
Epoch  600  Avg log|psi|^2 ≈ 15.4887
Epoch  650  Avg log|psi|^2 ≈ 15.7770
Epoch  700  Avg log|psi|^2 ≈ 15.7499
Epoch  750  Avg log|psi|^2 ≈ 15.5934
Epoch  800  Avg log|psi|^2 ≈ 15.8502
Epoch  850  Avg log|psi|^2 ≈ 15.6710
Epoch  900  Avg log|psi|^2 ≈ 15.6929
Epoch  950  Avg log|psi|^2 ≈ 15.5122
Epoch 1000  Avg log|psi|^2 ≈ 15.6946
Epoch 1050  Avg log|psi|^2 ≈ 15.9296
Epoch 1100  Avg log|psi|^2 ≈ 15.6452
Epoch 1150  Avg log|psi|^2 ≈ 15.8675
Epoch 1200  Avg log|psi|^2 ≈ 15.9492
Epoch 1250  Avg log|psi|^2 ≈ 15.7281
Epoch 1300  Avg log|psi|^2 ≈ 15.9149
Epoch 1350  Avg log|psi|^2 ≈ 16.3465
E

In [48]:
# Monte Carlo estimate from RBM samples
beh2_mean, beh2_se = estimate_observable_mc(
    rbm_beh2, converted_strs, n_samples=100000, burn_in=2000, thin=5
)
print(f"MC estimate  <O> = {beh2_mean:.6f}  ± {beh2_se:.6f}  (standard error)")

# Exact expectation from full enumeration (same RBM parameters)
exact_beh2 = exact_expectation(rbm_beh2, converted_strs)
print(f"Exact value  <O> = {exact_beh2:.6f}")

print(f"Absolute error |MC - Exact| = {abs(beh2_mean-exact_beh2):.6e}")

MC estimate  <O> = -16.676977-0.000467j  ± 0.000509  (standard error)
Exact value  <O> = -16.733263-0.000000j
Absolute error |MC - Exact| = 5.628807e-02
