In [None]:
import numpy as np
import os

class SimpleConfig:
    """
    Configuration container holding the coupling matrix of the problem graph
    together with model and training hyperparameters.
    """

    def __init__(self, graph_path: str = ""):
        """
        Initialize a configuration object for graph-based or physics-inspired models.
        If a valid graph file path is provided, the graph will be loaded from that file.
        Otherwise, a random symmetric coupling matrix (representing a random graph)
        will be generated.

        Parameters:
            graph_path (str): Optional path to a text file defining the graph structure.
        """

        if graph_path and os.path.exists(graph_path):
            print("using", graph_path)
            # Load the coupling matrix (Jz) and edge list (graph_list) from a file
            self.Jz, self.graph_list = self.from_txt_load_graph(graph_path)
            # Take the node count from the matrix size (i.e. the file header) rather
            # than from the largest index seen in an edge: if the highest-numbered
            # spins have no edges, the edge list under-counts the nodes and
            # num_nodes would disagree with the shape of Jz.
            self.num_nodes = self.Jz.shape[0]
        else:
            print("random graph")
            # If no file is given or file not found, create a random graph
            self.num_nodes = 20  # Default number of nodes
            # Draw entries from a normal distribution with standard deviation 1/sqrt(N)
            self.Jz = np.random.normal(0, 1/np.sqrt(self.num_nodes), size=(self.num_nodes, self.num_nodes))
            # Make sure the matrix is symmetric, since the graph is undirected
            self.Jz = (self.Jz + self.Jz.T) / 2
            # Empty list: no explicit edge data is kept for the random case
            self.graph_list = []

        # Random seed for reproducibility
        self.seed = 1

        # Transformer model hyperparameters (for possible machine learning applications)
        self.d_model = 16       # Hidden dimension size of the model
        self.num_heads = 1      # Number of attention heads in the transformer
        self.num_layers = 1     # Number of transformer layers
        self.dff = 64           # Dimension of the feedforward network inside the transformer

        # Simulation or training parameters
        self.numsamples = 64            # Number of Monte Carlo or training samples
        self.lr = 5 * (1e-4)            # Learning rate for optimizer
        self.T0 = 1.0                   # Initial temperature (often used in annealing)
        self.Bx0 = 0                    # Initial transverse field (for quantum models)
        self.num_warmup_steps = 100     # Steps before starting annealing/sampling
        self.num_annealing_steps = 20   # Number of annealing (temperature reduction) steps
        self.num_equilibrium_steps = 10 # Steps for reaching equilibrium per temperature

    def from_txt_load_graph(self, graph_path: str):
        """
        Load a graph structure from a text file.

        Expected file format:
            Line 1: <num_spins> <num_couplings>
            Following lines: <spin1> <spin2> <weight>
            - Fields may be separated by any run of whitespace (spaces or tabs).
            - Spins are 1-indexed in the file and will be converted to 0-indexed internally.
            - Each line represents an undirected weighted edge between two spins/nodes.
            - Reading stops at the first blank line or at end of file.

        Returns:
            Jz (np.ndarray): A symmetric NxN coupling (adjacency) matrix.
            graph_list (list): A list of tuples (node1, node2, weight).

        Raises:
            IndexError: If an edge references a spin outside 1..<num_spins>.
            ValueError: If a line does not have the expected number of fields
                or a field cannot be parsed as a number.
        """
        with open(graph_path, "r") as file:
            # First line defines the number of spins (nodes) and couplings (edges).
            # split() with no argument tolerates tabs, repeated and trailing spaces.
            spins, _couplings = file.readline().split()
            N = int(spins)
            # Initialize an empty NxN matrix for edge weights
            Jz = np.zeros((N, N), dtype=np.float64)
            graph_list = []

            # Read edge data line by line
            line = file.readline()
            while line and line.strip():
                spin1, spin2, weight = line.split()
                # Convert to 0-based indices for internal storage
                idx1, idx2 = int(spin1) - 1, int(spin2) - 1
                # Reject out-of-range ids explicitly: a spin id of 0 would otherwise
                # become index -1 and silently wrap around to the last node.
                if not (0 <= idx1 < N and 0 <= idx2 < N):
                    raise IndexError(
                        f"edge ({spin1}, {spin2}) is outside the valid spin range 1..{N}"
                    )
                weight = float(weight)
                # Store the weight symmetrically since the graph is undirected
                Jz[idx1, idx2] = weight
                Jz[idx2, idx1] = weight
                # Append edge information to the list
                graph_list.append((idx1, idx2, weight))
                # Read next line
                line = file.readline()

        return Jz, graph_list

    def obtain_num_nodes(self, graph_list):
        """
        Determine the number of nodes in a graph based on its edge list.

        Nodes that do not appear in any edge are not counted, so the result is
        a lower bound on the size of the graph the edges were read from.

        Parameters:
            graph_list (list): List of edges, each represented as (node1, node2, weight).

        Returns:
            int: Largest zero-based node index appearing in an edge, plus one
                (0 for an empty edge list).
        """
        if not graph_list:
            return 0
        # Find the maximum node index appearing in any edge, then add 1
        # because nodes are zero-indexed internally
        return max(max(n0, n1) for n0, n1, _w in graph_list) + 1


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

class TransformerWavefunction(nn.Module):
    """
    Causally masked Transformer that defines an autoregressive probability
    distribution over binary spin configurations of length N.
    """

    def __init__(self, config):
        """
        Build the embedding, Transformer stack and output head.

        Args:
            config: A configuration object (e.g., SimpleConfig) providing
                num_nodes, d_model, num_heads, num_layers and dff.
        """
        super().__init__()
        self.N = config.num_nodes        # Number of spins/nodes in the system
        self.d_model = config.d_model    # Hidden dimension of the Transformer
        self.num_heads = config.num_heads
        self.num_layers = config.num_layers
        self.dff = config.dff            # Feedforward layer size

        # ===== Embedding layer setup =====
        # We use an embedding for 3 tokens:
        #   0 -> spin down, 1 -> spin up, 2 -> <sos> (start-of-sequence token)
        self.embedding = nn.Embedding(3, self.d_model)

        # Learned positional embeddings (added to token embeddings), one per
        # position of the <sos>-prefixed sequence. Initialized from a standard
        # normal divided by sqrt(d_model).
        self.pos_embedding = nn.Parameter(
            torch.randn(1, self.N + 1, self.d_model) / math.sqrt(self.d_model)
        )

        # Layer normalization applied to the summed embeddings
        self.emb_ln = nn.LayerNorm(self.d_model)

        # ===== Transformer Encoder =====
        # An encoder stack is used; autoregressive behaviour comes from the
        # causal mask supplied at call time, not from a decoder module.
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=self.d_model,
            nhead=self.num_heads,
            dim_feedforward=self.dff,
            dropout=0.0,
            activation="relu",
            batch_first=True,
            norm_first=True  # Use pre-LN (set to False or remove if unsupported)
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=self.num_layers)

        # Final linear layer to map Transformer outputs to logits over spin values (0/1)
        self.fc_out = nn.Linear(self.d_model, 2)

        # Output temperature: logits are divided by this value before the softmax.
        # Kept as a plain float (not a learnable parameter).
        self.out_tau = 1.2  # Recommended range: 1.0–1.5

    @staticmethod
    def _causal_mask(L, device):
        """
        Create a causal attention mask of size (L, L).
        Entries above the diagonal are True (masked out),
        ensuring that each token only attends to itself and past tokens.
        """
        return torch.triu(torch.ones(L, L, device=device, dtype=torch.bool), diagonal=1)

    def _logits(self, tokens, use_out_tau=True):
        """
        Compute (optionally temperature-scaled) logits for every position.

        Args:
            tokens (torch.LongTensor): Input sequence of shape (B, T), T <= N+1.
            use_out_tau (bool): Whether to divide the logits by self.out_tau.

        Returns:
            torch.FloatTensor: Logits of shape (B, T, 2).
        """
        # Token embeddings + positional embeddings
        x = self.embedding(tokens) + self.pos_embedding[:, :tokens.size(1), :]
        x = self.emb_ln(x)

        # Apply causal attention mask to prevent information leakage from future steps
        mask = self._causal_mask(tokens.size(1), tokens.device)

        # Pass through Transformer encoder
        h = self.transformer(x, mask=mask)

        # Map final hidden states to logits for 2 spin classes
        logits = self.fc_out(h)

        # Apply temperature scaling to logits for smoother probabilities
        if use_out_tau and self.out_tau != 1.0:
            logits = logits / self.out_tau
        return logits

    def forward(self, tokens, use_out_tau=True):
        """
        Forward pass of the Transformer.

        Args:
            tokens (torch.LongTensor): Input sequence of shape (B, T), where
                T <= N+1. The first token may be the <sos> token (value=2).
            use_out_tau (bool): Whether to apply output temperature scaling.

        Returns:
            probs (torch.FloatTensor): Shape (B, T, 2), probabilities for spin values (0/1)
        """
        # Convert logits to probabilities
        return F.softmax(self._logits(tokens, use_out_tau=use_out_tau), dim=-1)

    def log_probability(self, spins):
        """
        Compute the log-probability of given spin configurations.

        The per-spin log-probabilities are taken directly from log_softmax of
        the logits. Going through softmax -> clamp -> log instead would cap
        each term at log(1e-12) and give zero gradient for clamped entries.

        Args:
            spins (torch.LongTensor): Tensor of shape (B, N), each element ∈ {0,1}

        Returns:
            logp (torch.FloatTensor): Log-probabilities for each configuration (B,)
        """
        B = spins.size(0)
        device = spins.device

        # Prepend <sos> token (start-of-sequence)
        sos = torch.full((B, 1), 2, dtype=torch.long, device=device)
        tokens = torch.cat([sos, spins], dim=1)  # Shape: (B, N+1)

        # Output at position i predicts spin i; the last position has no
        # target spin, so it is dropped.
        log_probs = F.log_softmax(self._logits(tokens)[:, :-1, :], dim=-1)  # (B, N, 2)

        # Pick the log-probability of the spin value that actually occurred
        sel = log_probs.gather(-1, spins.unsqueeze(-1)).squeeze(-1)  # (B, N)

        # Sum over sites to get the log-probability of the whole configuration
        return sel.sum(dim=-1)  # (B,)

    @torch.no_grad()
    def sample(self, numsamples, tau_sample=1.0, epsilon=0.0):
        """
        Autoregressive sampling of spin configurations.

        Args:
            numsamples (int): Number of samples to generate.
            tau_sample (float): Sampling temperature; <1.0 makes samples sharper, >1.0 softer.
            epsilon (float): Epsilon-exploration rate; mixes in uniform noise to avoid collapse.

        Returns:
            spins (torch.LongTensor): Sampled spin configurations of shape (B, N)
            logp (torch.FloatTensor): Log-probabilities of each sample (B,) under the
                distribution that was actually sampled from, i.e. after any
                tau_sample / epsilon adjustment.
        """
        device = next(self.parameters()).device
        B = numsamples

        # Start each sequence with <sos> token
        tokens = torch.full((B, 1), 2, dtype=torch.long, device=device)
        logp_parts = []

        # Sequentially generate spins one by one
        for n in range(self.N):
            # Compute probability distribution for the next spin
            probs = self.forward(tokens, use_out_tau=True)[:, -1, :]  # (B, 2)

            # Apply temperature scaling to sampling probabilities
            if tau_sample != 1.0:
                probs = F.softmax((probs.clamp_min(1e-20)).log() / tau_sample, dim=-1)

            # Epsilon-exploration: mix in a uniform distribution to encourage diversity
            if epsilon > 0.0:
                probs = (1.0 - epsilon) * probs + epsilon * 0.5

            # Sample next spin from categorical distribution
            dist = torch.distributions.Categorical(probs)
            s = dist.sample()  # (B,)
            tokens = torch.cat([tokens, s[:, None]], dim=1)

            # Record log-probabilities
            logp_parts.append(dist.log_prob(s))

        # Remove the <sos> token from the final sequence
        spins = tokens[:, 1:]
        logp = torch.stack(logp_parts, dim=1).sum(dim=1)
        return spins, logp


In [3]:
import torch, numpy as np, time
import torch.nn.functional as F
import random

def local_energy_SK(Jz, samples):
    """
    Classical energy E = -1/2 * sum_ij J_ij s_i s_j for a batch of samples.

    Args:
        Jz: Coupling matrix indexed as (i, j); combined with the samples via numpy einsum.
        samples (torch.Tensor): Batch of configurations, shape (B, N), entries in {0, 1}.

    Returns:
        torch.Tensor: float64 tensor of shape (B,) on the same device as `samples`.
    """
    # Map the {0, 1} encoding onto {-1, +1} spins (done on the CPU with numpy)
    sigma = samples.cpu().numpy() * 2 - 1
    # Bilinear form s^T J s, evaluated independently for each batch row
    pair_sum = np.einsum("bi,ij,bj->b", sigma, Jz, sigma)
    return torch.tensor(-0.5 * pair_sum, dtype=torch.float64, device=samples.device)

def train_wavefunction(config, model_class, device="cpu"):
    """
    Train an autoregressive model with a linearly annealed temperature, then
    draw a large batch of samples and report their energies.

    Schedule: `config.num_warmup_steps` iterations at T = config.T0, followed by
    `config.num_annealing_steps` temperature levels of
    `config.num_equilibrium_steps` iterations each, with T decreasing linearly
    to 0. On the last iteration no gradient step is taken; instead 10**4
    configurations are sampled and their energies returned.

    Args:
        config: Configuration object providing seed, Jz, lr, T0, Bx0, numsamples
            and the warm-up / annealing / equilibrium step counts.
        model_class: Callable building the model from `config`; the model must
            provide `sample(n) -> (spins, logp)` and `log_probability(spins)`.
        device (str): Device the model is moved to.

    Returns:
        tuple: (mean energy, minimum energy, tensor of all sampled energies)
            from the final inference pass.
    """
    seed = config.seed
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)

    # Total number of optimisation steps: warm-up plus every equilibrium step
    # of every annealing level.
    num_steps = config.num_warmup_steps + (config.num_annealing_steps * config.num_equilibrium_steps)

    model = model_class(config).to(device)
    opt = torch.optim.Adam(model.parameters(), lr=config.lr, weight_decay=0.0)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
        opt, T_max=num_steps, eta_min=config.lr * 0.1
    )

    # Initialize Temperature and Steps
    # NOTE: Bx is annealed and printed below but does not enter the energy or the loss.
    T, Bx = config.T0, config.Bx0
    start = time.time()
    for it in range(num_steps + 1):

        # Linear Temperature Schedule.
        # The annealing level is counted from the end of warm-up, so the check
        # must be on (it - num_warmup_steps). Testing `it` itself only works when
        # the warm-up length is a multiple of num_equilibrium_steps; otherwise
        # levels fire at the wrong iterations and T never reaches 0.
        steps_since_warmup = it - config.num_warmup_steps
        if steps_since_warmup >= 0 and steps_since_warmup % config.num_equilibrium_steps == 0:
            anneal_k = steps_since_warmup / config.num_equilibrium_steps
            T  = config.T0 * (1 - anneal_k / config.num_annealing_steps)
            Bx = config.Bx0 * (1 - anneal_k / config.num_annealing_steps)
            print(f"\nAnnealing Step: {anneal_k}/{config.num_annealing_steps}")

        # Sample SIGMA and Log Probabilities
        with torch.no_grad():
            spins, logp_seq = model.sample(config.numsamples)

        # Calculate Local Energy
        E_local = local_energy_SK(config.Jz, spins).to(torch.float64)     # (B,)
        logp_seq = logp_seq.to(torch.float64)                             # (B,)

        # Print Statistics: Energy, Free Energy, Variance
        free_local_energy = (E_local + T*logp_seq).detach()
        if it%config.num_equilibrium_steps==0:
            print(f'mean(E): {E_local.mean().item()}, mean(F): {free_local_energy.mean().item()}, var(E): {E_local.var().item()}, var(F): {free_local_energy.var().item()}, #samples {config.numsamples}, #Training step {it}')
            print("Temperature: ", T)
            print("Magnetic field: ", Bx)

        # Inference at the end of Annealing (always reached on the last iteration)
        if it == num_steps:

            numsamples_inference = 10**4 # Total number of samples at the end of annealing 
            Nsteps = 20 # number of steps to sample numsamples_inference
            numsamples_perstep = numsamples_inference//Nsteps
            energies = torch.zeros((numsamples_inference))
            # solutions = torch.zeros((numsamples_inference, config.N))

            print("\nSaving energy and variance before the end of annealing")
            for i in range(Nsteps):
                with torch.no_grad():
                    spins, _ = model.sample(numsamples_perstep)
                energies[i*numsamples_perstep:(i+1)*numsamples_perstep] = local_energy_SK(config.Jz, spins)
                # solutions[i*numsamples_perstep:(i+1)*numsamples_perstep] = spins
                print(f"Sampling Step: {i+1}/{Nsteps}")
            print("meanE = ", energies.mean().item())
            print("varE = ", energies.var().item())
            print("minE = ", energies.min().item())

            return energies.mean().item(), energies.min().item(), energies

        # Re-evaluate log-probabilities with gradients enabled (sampling ran under no_grad)
        logp_for_grad = model.log_probability(spins).to(torch.float64)
        # Local free energy is detached so it acts as a fixed weight on log p
        Floc = (E_local + T*logp_for_grad).detach()
        # Covariance between Floc and log p: subtracting the product of means
        # uses the batch-mean of Floc as a baseline.
        loss = torch.mean(Floc * logp_for_grad) - torch.mean(logp_for_grad) * torch.mean(Floc)

        opt.zero_grad()
        loss.backward()
        # total_norm = torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        opt.step(); scheduler.step()

        # ---- Logging ----
        if it % config.num_equilibrium_steps == 0:
            print(f"Grad Log Prob: {logp_for_grad.mean().item()}")
            print(f"T*LogProb: {(T*logp_for_grad).mean().item()}")
            print(f"Elapsed time = {time.time()-start:.2f} seconds\n")

    # Only reached if the loop body never runs (e.g. a negative total step count)
    return model


In [None]:
# Run on the GPU when one is visible to torch, otherwise on the CPU.
device = "cpu" if not torch.cuda.is_available() else "cuda"

# Sweep the number of annealing levels over powers of two: 4, 8, ..., 128.
for exp in range(2, 8):
    anneal = 2 ** exp
    energy_list = []

    for seed in range(1, 2):
        # One graph instance per seed; the seed also drives the training RNGs.
        config = SimpleConfig(f"data/ising_chain_32_seed{seed}.txt")
        config.seed, config.num_annealing_steps = seed, anneal

        # The second returned value is the minimum sampled energy.
        meanE, meanFm, energy = train_wavefunction(
            config, model_class=TransformerWavefunction, device=device
        )
        energy_list.append(f"{meanE}\n")

        # Rewrite the per-sweep result file with every mean energy collected so far.
        with open(f"TRANSFORMER_chain_{32}_{anneal}.txt", "w") as f:
            f.writelines(energy_list)


using data/ising_chain_32_seed1.txt




mean(E): 0.1875, mean(F): -21.816055297851562, var(E): 32.21825396825397, var(F): 32.43516295131433, #samples 64, #Training step 0
Temperature:  1.0
Magnetic field:  0
Grad Log Prob: -22.00355550646782
T*LogProb: -22.00355550646782
Elapsed time = 0.47 seconds

mean(E): -0.40625, mean(F): -22.415929317474365, var(E): 29.546626984126984, var(F): 28.867850776174034, #samples 64, #Training step 10
Temperature:  1.0
Magnetic field:  0
Grad Log Prob: -22.009679824113846
T*LogProb: -22.009679824113846
Elapsed time = 1.03 seconds

mean(E): -0.03125, mean(F): -22.02250897884369, var(E): 41.64980158730159, var(F): 39.802894635316655, #samples 64, #Training step 20
Temperature:  1.0
Magnetic field:  0
Grad Log Prob: -21.991259276866913
T*LogProb: -21.991259276866913
Elapsed time = 1.71 seconds

mean(E): 0.03125, mean(F): -21.770846635103226, var(E): 26.157738095238095, var(F): 24.46382399161717, #samples 64, #Training step 30
Temperature:  1.0
Magnetic field:  0
Grad Log Prob: -21.802096784114838



mean(E): 0.1875, mean(F): -21.816055297851562, var(E): 32.21825396825397, var(F): 32.43516295131433, #samples 64, #Training step 0
Temperature:  1.0
Magnetic field:  0
Grad Log Prob: -22.00355550646782
T*LogProb: -22.00355550646782
Elapsed time = 0.06 seconds

mean(E): -0.40625, mean(F): -22.41593959927559, var(E): 29.546626984126984, var(F): 28.867711182241937, #samples 64, #Training step 10
Temperature:  1.0
Magnetic field:  0
Grad Log Prob: -22.00968998670578
T*LogProb: -22.00968998670578
Elapsed time = 0.84 seconds

mean(E): -0.03125, mean(F): -22.022260189056396, var(E): 41.64980158730159, var(F): 39.79350449282518, #samples 64, #Training step 20
Temperature:  1.0
Magnetic field:  0
Grad Log Prob: -21.991010397672653
T*LogProb: -21.991010397672653
Elapsed time = 1.43 seconds

mean(E): -0.03125, mean(F): -21.82400879263878, var(E): 26.28472222222222, var(F): 24.631023104222585, #samples 64, #Training step 30
Temperature:  1.0
Magnetic field:  0
Grad Log Prob: -21.792759031057358
T*

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import torch

def plot_histogram(energies, config, gsenergy_per_spin):
    """
    Show a log-binned histogram of the residual energy per spin.

    Args:
        energies: Sampled energies (torch tensor or numpy array).
        config: Object providing `num_nodes` and `num_annealing_steps`.
        gsenergy_per_spin: Reference (ground-state) energy per spin.
    """
    n_spins = config.num_nodes
    num_annealing_steps = config.num_annealing_steps

    if isinstance(energies, torch.Tensor):
        energies = energies.cpu().numpy()

    # Residual energy per spin relative to the reference energy
    residual = (energies - gsenergy_per_spin * n_spins) / n_spins
    # Non-positive residuals cannot be shown on a log axis; move them to the floor value
    floor = 1e-10
    residual[residual <= 0.0] = floor

    # 20 logarithmically spaced bin edges covering [1e-10, 1]
    lower, upper = 1e-10, 1.0
    bin_edges = 10**np.linspace(np.log10(lower), np.log10(upper), 20)

    plt.figure(figsize=(10, 6))
    ax = plt.gca()
    ax.hist(
        residual,
        alpha=0.9,
        color='b',
        bins=bin_edges,
        label=f'$N_{{annealing}}={num_annealing_steps}$',
    )

    ax.set_xscale("log")
    ax.set_ylim(0, len(energies))
    ax.legend(loc='best', frameon=False)
    ax.set_xlabel(r'$\epsilon_{res}/N$')
    ax.set_ylabel('Count')
    ax.set_title('Residual Energy Distribution')
    plt.show()

In [None]:
# `meanFm` is the second value returned by train_wavefunction, i.e. the minimum
# sampled energy of the last run; it is used here as the reference energy.
GS_ENERGY_PER_SPIN_benchmark = meanFm / config.num_nodes

# The training cell binds the sampled energies to `energy`; `energies` is not
# defined at notebook scope and would raise NameError on a fresh kernel.
plot_histogram(energy, config, GS_ENERGY_PER_SPIN_benchmark)