In [1]:
# The code of model and prediction function
import torch
import torch.nn as nn
import math

class TransformerBlock_prenorm(nn.Module):
    def __init__(self, embed_size, head_count):
        super(TransformerBlock_modify_pre, self).__init__()
        self.attention = nn.MultiheadAttention(embed_size, head_count, batch_first=True)
        self.norm1 = nn.LayerNorm(embed_size)
        self.norm2 = nn.LayerNorm(embed_size)
        self.feed_forward = nn.Sequential(
            nn.Linear(embed_size, 4*embed_size),
            nn.ReLU(),
            nn.Dropout(0.25),
            nn.Linear(4*embed_size, embed_size)
        )
        self.dropout = nn.Dropout(0.25)
    
    def forward(self, x):
        x_norm = self.norm1(x)
        attention_output, _ = self.attention(x_norm,x_norm,x_norm)
        x = x + self.dropout(attention_output)
        x_norm = self.norm2(x)
        ff_output = self.feed_forward(x_norm)
        x = x + self.dropout(ff_output)
        return x

class Transformer(nn.Module):
    def __init__(self, vocab_size, posi_size, embed_size, num_layers, head_count):
        super(Transformer, self).__init__()
        self.embed_size = embed_size
        self.vocab_size = vocab_size
        self.posi_size = posi_size
        self.word_embedding = nn.Embedding(vocab_size, embed_size)
        self.position_embedding = nn.Embedding(posi_size, embed_size)
        self.para_embedding = nn.Sequential(
            nn.Linear(1, embed_size),
            nn.ReLU(),
            nn.LayerNorm(embed_size),
            nn.Linear(embed_size,embed_size*2),
            nn.ReLU(),
            nn.LayerNorm(embed_size*2),
            nn.Linear(embed_size*2, embed_size)
        )
        self.layers = nn.ModuleList(
            [TransformerBlock_prenorm(embed_size, head_count) for _ in range(num_layers)]
        )
        self.fc_out = nn.Linear(embed_size, 2)
    def forward(self, inputs, device, mask=None):
        inputs = inputs.to(device)
        h = inputs[:, 0].view(-1, 1).float()
        input_tokens = inputs[:, 1:].to(torch.long)
        batch_size, token_count = input_tokens.shape[:2]
        # parameter embedding (as prompt)
        h_embedding = self.para_embedding(h).unsqueeze(1)  # (batch_size, 1, embed_size)       
        # Token and position embeddings
        token_embeddings = self.word_embedding(input_tokens)
        positions = torch.arange(0, token_count).expand(batch_size, token_count).to(device)
        position_embeddings = self.position_embedding(positions)
        
        out = token_embeddings+position_embeddings      
        # Concatenate parameter embedding with token embeddings
        out = torch.cat((h_embedding, out), dim=1)
        
        for layer in self.layers:
            out = layer(out)       
        out = self.fc_out(out[:, -1, :].reshape(batch_size, self.embed_size)).reshape(batch_size, 2)
        return out

# predict   
def generate_GPTmeasureoutput(h, model, num_qubits, sample_size, device):
    outputs = torch.full((sample_size, 1 + num_qubits * 2), h, device=device)  # [h, ..., ...]   
    with torch.no_grad():
        # Iterate over the qubits
        for i in range(num_qubits):
            # Generate the random measurement input for the current step
            random_input = torch.randint(2, 5, (sample_size, 1), device=device)       
            outputs[:, 1 + 2 * i] = random_input.squeeze()  # Fill the random input in appropriate columns       
            # Forward pass through the model
            measurepro = model(outputs[:, :(2 * i + 2)], device)  # Only pass relevant portion of `outputs`
            measurepro = torch.nn.functional.softmax(measurepro, dim=1)          
            # Generate new index based on softmax probabilities
            newindex = torch.bernoulli(measurepro[:, 1].clamp(min=0, max=1)).to(torch.long)      
            # Update the outputs tensor with the new index
            outputs[:, 2 + 2 * i] = newindex.squeeze()  # Fill in the next position   
    return outputs

In [None]:
# load data (for training)
from torch.utils.data import Dataset

class myDataset(Dataset):
    def __init__(self,measureResults):
        super().__init__()
        self.results = measureResults
        
    def __getitem__(self, index):
        squence = self.results[index]
        return squence
    
    def __len__(self):
        return len(self.results)
    
from torch.utils.data import DataLoader
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
sampleResults = torch.load('Ising_sym_negative_dataset_h0_0.25_0.4_0.5_0.6_0.75_0.9_1_every10000_10bit_pbc.pt').to(device)
train_index = torch.load('train_index_80000.pt') # random indcies generated before
test_index = torch.load('test_index_80000.pt')
train_dataset = myDataset(measureResults=sampleResults[train_index])
test_dataset = myDataset(measureResults=sampleResults[test_index])

batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
print(len(train_dataset),len(test_dataset))

In [None]:
# train the model
import torch
from torch import nn, optim
from torch.utils.tensorboard import SummaryWriter
writer = SummaryWriter()

def train_recursive(model, dataloader, optimizer, loss_fn, epoch, device):
    model.train()
    total_loss = 0
    num_batches = len(dataloader)

    for batch_idx, dataset in enumerate(dataloader):
        dataset = dataset.to(device)
        optimizer.zero_grad()
        batch_size, token_count = dataset.shape[0], dataset.shape[1]
        loss = 0

        for cur_count in range(2, token_count):
            # Identify if the current token is a measurement result
            if cur_count % 2 == 0:
                # Prepare target vector for CrossEntropyLoss
                expected_next_token_indexes = dataset[:, cur_count]
                model_input = dataset[:, :cur_count] 
                outputs = model(model_input, device)
                loss += loss_fn(outputs, expected_next_token_indexes.to(torch.long))

        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    average_loss = total_loss / num_batches
    print(f"Epoch {epoch + 1} Train Loss: {average_loss:.6f}")
    writer.add_scalar("Loss/train", average_loss, epoch + 1)

def test_recursive(model, dataloader, loss_fn, epoch, device):
    model.eval()
    total_loss = 0
    num_batches = len(dataloader)

    with torch.no_grad():
        for dataset in dataloader:
            dataset = dataset.to(device)
            batch_size, token_count = dataset.shape[0], dataset.shape[1]
            loss = 0
            for cur_count in range(2, token_count):
                if cur_count % 2 == 0:
                    expected_next_token_indexes = dataset[:, cur_count]
                    model_input = dataset[:, :cur_count]
                    outputs = model(model_input, device)
                    loss += loss_fn(outputs, expected_next_token_indexes.to(torch.long))
            total_loss += loss.item()
            
    average_loss = total_loss / num_batches
    print(f"Epoch {epoch + 1} Test Loss: {average_loss:.6f}")
    writer.add_scalar("Loss/test", average_loss, epoch + 1)

    return average_loss

vocab_size = 5
posi_size = 20
embed_size = 128
num_layers = 3
head_count = 4
model = Transformer(vocab_size, posi_size, embed_size, num_layers, head_count)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

print(device)
model.to(device)
optimizer = optim.AdamW(model.parameters(), lr=1e-4)
scheduler = optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer, T_0=1, T_mult=2, eta_min=0.000001)
loss_fn = nn.CrossEntropyLoss()
epochs = 63 # 1+2+4+8+16+32
best_test_loss = float('inf')

for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train_recursive(model, train_loader, optimizer, loss_fn, t, device)
    scheduler.step()
    cur_test_loss = test_recursive(model, test_loader, loss_fn, t, device)
    if cur_test_loss < best_test_loss:
        torch.save({
            'epoch': t,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': loss_fn
        }, 'GPT_sym_TFI_prompt_h0_0.25_0.4_0.5_0.6_0.75_0.9_1_10000_10bit_em128_n3_hc4_coscy6_prenorm_jump_best.pth')
        best_test_loss = cur_test_loss

writer.flush()
writer.close()

torch.save({
    'epoch': t,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'loss': loss_fn
}, 'GPT_sym_TFI_prompt_h0_0.25_0.4_0.5_0.6_0.75_0.9_1_10000_10bit_em128_n3_hc4_coscy6_prenorm_jump.pth')

In [None]:
# Here are functions to calculate physical properties (both for true values and predicted values using shadow tomography)
import numpy as np
X = torch.tensor([[0.,1.],[1.,0.]], dtype = torch.cfloat)
Z = torch.tensor([[1.,0.],[0.,-1.]], dtype = torch.cfloat)
I = torch.tensor([[1.,0.],[0.,1.]], dtype = torch.cfloat)
operator_map = {0:I, 1:X, 2:Z}

def correlation_func(sqe_op):
    func = operator_map[int(sqe_op[0])]
    for i in range(len(sqe_op)-1):
        func = torch.kron(func,operator_map[int(sqe_op[i+1])])
    return func

# periodic boundary condition
def Hamiltonian_sym_circle(num_qubits, h):
    ham = 0.+0.j
    for i in range(num_qubits-1):
        Z_index = np.zeros(num_qubits)
        X_index = np.zeros(num_qubits)
        Z_index[i] = 2
        Z_index[i+1] = 2
        X_index[i] = 1
        ham+= -((1-h)*correlation_func(Z_index)+h*correlation_func(X_index))
    X_index = np.zeros(num_qubits)
    X_index[-1] = 1
    ham += -h*correlation_func(X_index)
    Z_index = np.zeros(num_qubits)
    Z_index[0], Z_index[-1] = 2, 2
    ham -= (1-h)*correlation_func(Z_index)
    return ham

def symmetry_proj(rho, num_qubits):
    op1_index = np.zeros(num_qubits)
    op1 = correlation_func(op1_index)
    op2_index = np.ones(num_qubits)
    op2 = correlation_func(op2_index)
    rho_ = rho+op2@rho@op2.H
    return rho_/np.trace(rho_)

def obtain_eigenrho(num_bits,h):
    Ham = Hamiltonian_sym_circle(num_bits,h)
    eigenvalues, eigenvectors = torch.linalg.eigh(Ham)
    ground = eigenvectors[:,0].view(-1,1)
    rho = ground@ground.H
    rho = symmetry_proj(rho,num_bits)
    return rho

# ground truth
# Two point correlation function <Z_i Z_i+d>
def ground_cal_averge_two_point_pbc(num_bits, h, d):
    rho = obtain_eigenrho(num_bits,h)
    nmz = 0
    for i in range(num_bits):
        ZZ_index = np.zeros(num_bits)
        ZZ_index[i] = 2
        ZZ_index[(i+d)%num_bits] = 2
        ope = correlation_func(ZZ_index)
        nmz += torch.trace(rho@ope)
    return nmz/num_bits
# <X>
def ground_cal_magnezition_xpbc(num_bits,h):
    rho = obtain_eigenrho(num_bits,h)
    mz = 0
    for i in range(num_bits):
        Z_index = np.zeros(num_bits)
        Z_index[i] = 1
        ope = correlation_func(Z_index)
        mz += torch.trace(rho@ope)
    return mz/num_bits

# <XX...X> 
def ground_cal_averge_Xstring_pbc(num_bits, h, d):
    rho = obtain_eigenrho(num_bits,h)
    nmx = 0
    for i in range(num_bits):
        Xs_index = np.zeros(num_bits)
        for j in range(i, i + d + 1):
            Xs_index[j % num_bits] = 1
        ope = correlation_func(Xs_index)
        nmx += torch.trace(rho@ope)
    return nmx/num_bits

# GPT
# calculate <X> given sequences generated by transformer
def cal_magnezition_x(sqe):
    num_bits = int(sqe.shape[1]/2)
    mz = 0
    for i in range(num_bits):
        cond_Z_p = (sqe[:,i*2]==2) & (sqe[:,i*2+1]==1)
        cond_Z_n = (sqe[:,i*2]==2) & (sqe[:,i*2+1]==0)
        tempz = torch.sum(cond_Z_p)*3-torch.sum(cond_Z_n)*3
        mz += tempz
    return mz/(num_bits*sqe.shape[0])

# calculate E(<Z_i Z_i+d>) given sequences generated by transformer
def cal_averge_two_point_pbc(sqe, d):
    num_bits = int(sqe.shape[1]/2)
    nmz = 0
    expZ = torch.zeros(sqe.shape[0], sqe.shape[1] // 2, dtype=sqe.dtype)
    cond_3_Z = (sqe[:, :-1] == 4) & (sqe[:, 1:] == 1)
    cond_neg_3_Z = (sqe[:, :-1] == 4) & (sqe[:, 1:] == 0)
    for i in range(0, expZ.shape[1] * 2, 2):
        expZ[:, i // 2][cond_3_Z[:, i]] = 3
        expZ[:, i // 2][cond_neg_3_Z[:, i]] = -3
    expZZ = expZ* torch.cat((expZ[:,d:],expZ[:,:d]),dim=1)
    return torch.sum(expZZ)/(num_bits*sqe.shape[0])

# calculate E(<X...X>), here d = l-1, d=0 means calculating <X> given sequences generated by transformer
def cal_averge_Xstring_pbc(sqe, d):
    if d == 0:
        return cal_magnezition_x(sqe)
    else:
        num_bits = int(sqe.shape[1] / 2)
        nmx = 0
        expX = torch.zeros(sqe.shape[0], sqe.shape[1] // 2, dtype=sqe.dtype)  
        cond_3_X = (sqe[:, :-1] == 2) & (sqe[:, 1:] == 1)
        cond_neg_3_X = (sqe[:, :-1] == 2) & (sqe[:, 1:] == 0)    
        for i in range(0, expX.shape[1] * 2, 2):
            expX[:, i // 2][cond_3_X[:, i]] = 3
            expX[:, i // 2][cond_neg_3_X[:, i]] = -3   
        expXs = expX.clone()
        for shift in range(1, d + 1):
            expXs *= torch.cat((expX[:, shift:], expX[:, :shift]), dim=1)
        return torch.sum(expXs) / (num_bits * sqe.shape[0])
    
# calculate ground energy given sequences generated by transformer
def cal_ground_pbc(h,sqe):
    expX = torch.zeros(sqe.shape[0], sqe.shape[1] // 2, dtype=sqe.dtype)
    cond_3_X = (sqe[:, :-1] == 2) & (sqe[:, 1:] == 1)
    cond_neg_3_X = (sqe[:, :-1] == 2) & (sqe[:, 1:] == 0)
    for i in range(0, expX.shape[1] * 2, 2):
        expX[:, i // 2][cond_3_X[:, i]] = 3
        expX[:, i // 2][cond_neg_3_X[:, i]] = -3
        
    expZ = torch.zeros(sqe.shape[0], sqe.shape[1] // 2, dtype=sqe.dtype)
    cond_3_Z = (sqe[:, :-1] == 4) & (sqe[:, 1:] == 1)
    cond_neg_3_Z = (sqe[:, :-1] == 4) & (sqe[:, 1:] == 0)
    for i in range(0, expZ.shape[1] * 2, 2):
        expZ[:, i // 2][cond_3_Z[:, i]] = 3
        expZ[:, i // 2][cond_neg_3_Z[:, i]] = -3
    expZZ = expZ[:,:-1]*expZ[:,1:]

    T = sqe.shape[0]
    return -((1-h)*torch.sum(expZZ) + (1-h)*torch.sum(expZ[:,-1]*expZ[:,0]) + h*torch.sum(expX))/T

In [None]:
# code to implement median of means

def median_of_means(h, sqe, num_parts):
    T, _ = sqe.shape
    part_size = T // num_parts
    
    means = []
    for i in range(num_parts):
        part_sqe = sqe[i*part_size : (i+1)*part_size]
        mean_value = cal_ground_pbc(h, part_sqe)
        means.append(mean_value.item())
    
    median_value = torch.median(torch.tensor(means))
    return median_value
def median_of_means_two(d, sqe, num_parts):
    T, _ = sqe.shape
    part_size = T // num_parts
    
    means = []
    for i in range(num_parts):
        part_sqe = sqe[i*part_size : (i+1)*part_size]
        mean_value = cal_averge_two_point_pbc(part_sqe,d)
        means.append(mean_value.item())
    
    median_value = torch.median(torch.tensor(means))
    return median_value
def median_of_means_X(d, sqe, num_parts):
    T, _ = sqe.shape
    part_size = T // num_parts
    
    means = []
    for i in range(num_parts):
        part_sqe = sqe[i*part_size : (i+1)*part_size]
        mean_value = cal_averge_Xstring_pbc(part_sqe,d)
        means.append(mean_value.item())
    
    median_value = torch.median(torch.tensor(means))
    return median_value

In [None]:
# predict results

import torch

vocab_size = 5
posi_size = 20
embed_size = 128
num_layers = 3
head_count = 4
model = Transformer_prompt(vocab_size, posi_size, embed_size, num_layers, head_count)
checkpoint = torch.load('GPT_sym_TFI_prompt_h0_0.25_0.4_0.5_0.6_0.75_0.9_1_10000_10bit_em128_n3_hc4_coscy6_prenorm_jump_best.pth')
model.load_state_dict(checkpoint['model_state_dict'])
model.eval()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
model.to(device)

gpt_eval_points = torch.zeros((6, 5, 41))
gpt_eval_pointsX = torch.zeros((6, 5, 41))
gpt_eval_points_energy = torch.zeros((6, 41))
true_ZZ = torch.zeros(5, 101)
true_stringX = torch.zeros(5, 101)

hs = torch.linspace(0, 1, 41)

# Loop to generate sequences and calculate values for all metrics
for m in range(6):
    for j in range(41):
        h = hs[j]
        sqe = generate_GPTmeasureoutput(h, model, 10, 300000, device)[:, 1:].to(torch.long)

        # Calculate different metrics
        for i in range(5):
            gpt_eval_points[m, i, j] = median_of_means_two(i + 1, sqe, 10) # two point correlation function
            gpt_eval_pointsX[m, i, j] = median_of_means_X(i, sqe, 10) # Xstring

        gpt_eval_points_energy[m, j] = median_of_means(h, sqe, 10) # ground energy

        torch.cuda.empty_cache()

# Save results
torch.save(gpt_eval_points, 'Newarc_sym_6times_TFI_two_ZZ_d_gpt_es128_30w_MoM10.pt')
torch.save(gpt_eval_pointsX, 'Newarc_sym_6times_TFI_Xs_d_gpt_es128_30w_MoM10.pt')
torch.save(gpt_eval_points_energy, 'Newarc_sym_TFI_Energy_gpt_es128_30w_MoM10.pt')