## 1. Libraries

In [1]:
import numpy as np

from qiskit_optimization import QuadraticProgram
from qiskit_optimization.converters import QuadraticProgramToQubo
from qiskit_optimization.translators import to_ising
from qiskit import QuantumCircuit
from qiskit.circuit import ParameterVector
from qiskit.quantum_info import Statevector, SparsePauliOp
from qiskit_nature.units import DistanceUnit
from qiskit_nature.second_q.drivers import PySCFDriver
from qiskit_nature.second_q.mappers import JordanWignerMapper
from qiskit_nature.second_q.transformers import ActiveSpaceTransformer
from qiskit_algorithms import VQE as QiskitVQE
from qiskit_algorithms.optimizers import SLSQP, COBYLA, SPSA

from pyscf import gto, dft as pyscf_dft, scf
import CoRE_MOF
from pymatgen.core import Structure
from pymatgen.core.periodic_table import Element

## 2. Global parameters

In [2]:
# MOF
DATASET = "2019-ASR"
MOF_ID = "KAXQIL_clean"
#  #VOGTIV_clean_h #KAXQIL_clean — they simple enough for our pipeline
CLUSTER_RADIUS = 2.4      # Radius in Å around first metal particle
MAX_CANDIDATES = 3        # how many QUBO configs to carry into DFT/VQE

# QUBO 
TARGET_LONG_FRACTION = 0.5  # how many "1" bits we want
DIST_CUTOFF = 3.0           # Å: connect sites whose distance < cutoff in QUBO graph

# Geometry scaling
GEOM_SCALE_ALPHA = 0.05     # max 5% isotropic scaling around the cluster center

# DFT / VQE basis
BASIS = "lanl2dz"           # safer for heavier metals than sto-3g / def2-svp

# VQE modes
VQE_QUBO_SOLVER = 'custom_spsa' #vqe solver for QUBO (either 'custom_spsa' or 'qiskit')
VQE_GROUND_SOLVER = 'custom_spsa' #vqe solver for ground state energy (either 'custom_spsa' or 'qiskit')



## 3. Building cluster from CoRE-MOF structure

In [3]:
def get_mof(dataset, mof_id):
    
    """
    Gets MOF structure by id, checks if it exists
    """
    
    struct = CoRE_MOF.get_structure(dataset, mof_id)
    
    if not isinstance(struct, Structure):
        raise TypeError("CoRE_MOF.get_structure did not return a pymatgen Structure.")
    
    return struct


def first_metal_ind(struct):
    
    for i, site in enumerate(struct):
        elem = Element(site.species_string)
        if elem.is_metal:
            return i
    return 0


def build_cluster(struct, radius, center_index=None):
    if center_index is None:
        center_index = first_metal_ind(struct)
    center = struct[center_index].coords

    species = []
    coords = []
    for site in struct:
        r = np.linalg.norm(site.coords - center)
        if r <= radius:
            species.append(site.species_string)
            coords.append(site.coords)

    coords = np.array(coords, dtype=float)
    return species, coords, center

## 4. Building QUBO

In [4]:
def build_qubo(coords, target_fraction=0.5, dist_cutoff=3.0):
    
    """
    Encode binary choices on each cluster atom
    This gets us a QUBO model and Ising operator.
    """
    num_sites = len(coords)
    qp = QuadraticProgram("mtv_cluster_qp")

    bits = [qp.binary_var(f"x_{i}") for i in range(num_sites)]

    target_long = target_fraction * num_sites
    qp.minimize(constant=0.0)

    linear = {b.name: 0.0 for b in bits}
    quadratic = {}

    for i in range(num_sites):
        vi = bits[i].name
        linear[vi] += 1.0 - 2.0 * target_long 
        for j in range(i + 1, num_sites):
            vj = bits[j].name
            key = (vi, vj)
            quadratic[key] = quadratic.get(key, 0.0) + 2.0

    edges = []
    for i in range(num_sites):
        for j in range(i + 1, num_sites):
            d_ij = np.linalg.norm(coords[i] - coords[j])
            if d_ij <= dist_cutoff:
                edges.append((i, j))
                vi, vj = bits[i].name, bits[j].name
                linear[vi] += -1.0
                linear[vj] += -1.0
                key = (vi, vj)
                quadratic[key] = quadratic.get(key, 0.0) + 2.0

    qp.minimize(linear=linear, quadratic=quadratic)

    # QUBO -> Ising
    to_qubo = QuadraticProgramToQubo()
    qubo = to_qubo.convert(qp)
    ising_op, offset = to_ising(qubo)

    return qp, qubo, ising_op, offset, edges

## 5. Bulding custom VQE 
Used custom optimizer here to show variability of our method, regular optimization method used as a benchmark in "qiskit_vqe"

In [None]:
class CompatibleEstimator:
    
    """
    Estimator wrapper for V1 and V2 Qiskit circuits (or else they don't work at all)
    """
    
    def run(self, circuits, observables=None, parameter_values=None, **kwargs):
    
        is_v2 = False
        if observables is None and isinstance(circuits, list) and len(circuits) > 0:
            if isinstance(circuits[0], (tuple, list)):
                is_v2 = True

        if is_v2:
            return self._run_v2(circuits)
        else:
            return self._run_v1(circuits, observables, parameter_values)

    def _compute_expectation(self, qc, obs, params):

        if params is not None and len(params) > 0:
            if hasattr(params[0], '__len__') and len(params) == 1: 
                params = params[0]
            param_dict = dict(zip(qc.parameters, params))
            bound_qc = qc.assign_parameters(param_dict, inplace=False)
        else:
            bound_qc = qc

        sv = Statevector.from_instruction(bound_qc)
        
        return sv.expectation_value(obs).real

    def _run_v2(self, pubs):
    
        results = []
        for pub in pubs:
            if len(pub) == 3:
                qc, obs, params = pub
            elif len(pub) == 2:
                qc, obs = pub
                params = []
            else:
                raise ValueError(f"Invalid PUB format: {pub}")

            val = self._compute_expectation(qc, obs, params)
            
            class Data:
                def __init__(self, v): self.evs = v
            class PubResult:
                def __init__(self, v): self.data = Data(v); self.metadata = {}
            
            results.append(PubResult(val))

        class Job:
            def result(self): return results
        return Job()

    def _run_v1(self, circuits, observables, parameter_values):
        
        if not isinstance(circuits, list): circuits = [circuits]
        if not isinstance(observables, list): observables = [observables]
        if parameter_values is None: parameter_values = [[]] * len(circuits)
        if not isinstance(parameter_values, list): parameter_values = [parameter_values]

        values = []
        for qc, obs, params in zip(circuits, observables, parameter_values):
            val = self._compute_expectation(qc, obs, params)
            values.append(val)

        class Result:
            def __init__(self, v): self.values = np.array(v); self.metadata = [{}]*len(v)
        return Result(values)

def vqe_circuit(num_qubits, depth):
    
    theta = ParameterVector("theta", length=num_qubits * depth)
    qc = QuantumCircuit(num_qubits)
    k = 0
    
    for d in range(depth):
        for q in range(num_qubits):
            qc.ry(theta[k], q)
            k += 1
        if num_qubits > 1:
            for q in range(num_qubits - 1):
                qc.cz(q, q + 1)
                
    return qc, list(theta)

def top_bitstrings(qc, param_list, params, max_candidates=4):
    
    param_dict = dict(zip(param_list, params))
    bound = qc.assign_parameters(param_dict, inplace=False)
    sv = Statevector.from_instruction(bound)
    probs = sv.probabilities()
    top_indices = np.argsort(probs)[::-1][:max_candidates]
    
    candidates = []
    num_qubits = qc.num_qubits
    
    for idx in top_indices:
        p = probs[idx]
        b_str = format(idx, f'0{num_qubits}b')[::-1]
        candidates.append([b_str, p])
        
    return candidates

def qiskit_vqe(ham, num_qubits, depth=2, maxiter=200, seed=42):
    
    qc, param_list = vqe_circuit(num_qubits, depth)
    
    estimator = CompatibleEstimator()
    optimizer = SLSQP(maxiter=maxiter)
    
    rng = np.random.default_rng(seed)
    initial_point = rng.uniform(0, 2 * np.pi, size=len(param_list))
    
    vqe = QiskitVQE(estimator=estimator, ansatz=qc, optimizer=optimizer, initial_point=initial_point)
    result = vqe.compute_minimum_eigenvalue(ham)
    
    return result.eigenvalue.real, result.optimal_point, qc, param_list

def custom_vqe(ham, num_qubits, depth=2, lr=0.1, maxiter=100, seed=42, grad_method="spsa"):
    
    qc, param_list = vqe_circuit(num_qubits, depth)
    rng = np.random.default_rng(seed)
    params = rng.uniform(0, 2 * np.pi, size=len(param_list))
    estimator = CompatibleEstimator()
    
    for i in range(maxiter):
        
        # SPSA step, but we can use other ones as well
        c, gamma = 0.1, 0.1
        A, alpha = 50, 0.6
        step = i + 1
        
        ck = c / (step ** gamma)
        ak = lr / ((step + A) ** alpha)
        delta = rng.choice([-1, 1], size=len(params))
        
        est_p = estimator.run([(qc, ham, params + ck*delta)])
        ep = est_p.result()[0].data.evs
        est_m = estimator.run([(qc, ham, params - ck*delta)])
        em = est_m.result()[0].data.evs
        
        grad = (ep - em)/(2*ck*delta)
        params -= ak * grad
    
    est = estimator.run([(qc, ham, params)])
    E_final = est.result()[0].data.evs
    
    return E_final, params, qc, param_list

def solve_vqe(ham, num_qubits, depth=2, solver="qiskit", **kwargs):
    
    """
    VQE wrapper for custom and qiskit VQE implementations
    We use spsa for both QUBO and ground state, Qiskit here to benchmark our VQE implementation
    """
    
    if solver == "custom_spsa":
        return custom_vqe(ham, num_qubits, depth=depth, grad_method="spsa", **kwargs)
    elif solver == "qiskit":
        q_kwargs = {k: v for k, v in kwargs.items() if k in ['maxiter', 'seed']}
        return qiskit_vqe(ham, num_qubits, depth=depth, **q_kwargs)
    else:
        raise ValueError(f"Unknown solver {solver}")

## 6. Building a cluster

In [6]:
def make_pyscf_mol(species, coords, charge=0, basis='sto-3g'):
    
    """
    Making molecule for our candidate using parameters from QUBO
    Possible basis change — lanl2dz (for most), sto-3g (for simple)
    """
    
    atom_str = "; ".join(f"{s} {x:.8f} {y:.8f} {z:.8f}" for s, (x, y, z) in zip(species, coords))
    total_electrons = sum(Element(s).Z for s in species) - charge
    spin = 1 if total_electrons % 2 == 1 else 0
    
    mol = gto.Mole()
    mol.atom = atom_str
    mol.unit = "Angstrom"
    mol.basis = basis 
    mol.charge = charge
    mol.spin = spin
    mol.verbose = 0
    mol.build()
    
    return mol

def new_element(coords_inp, pattern_bits, alpha):
    
    coords = np.array(coords_inp, dtype=float)
    pattern = np.array(pattern_bits, dtype=int)
    
    if len(pattern) == 0:
        f = 0.5
    else:
        f = pattern.mean()

    scale = 1.0 + alpha * (f - 0.5)
    center = coords.mean(axis=0)
    new_coords = center + scale * (coords - center)
    
    return new_coords, scale


## 7. DFT for ground state computation

In [7]:
def dft_energy(species, coords, basis='sto-3g'):
    
    """
    DFT configuration for new generated structure
    
    We try charges [-2, 2] because clusters are often charged and
    return the energy of the first one that converges 
    """
    
    for chg in [0, 1, -1, 2, -2]:
        try:
            mol = make_pyscf_mol(species, coords, charge=chg, basis=basis)
            
            if mol.spin == 0:
                mf = pyscf_dft.RKS(mol)
            else:
                mf = pyscf_dft.UKS(mol)
                
            mf.xc = 'pbe'
            mf.max_cycle = 75
            mf.level_shift = 0.2
            e_tot = mf.kernel()
            
            if mf.converged:
                return e_tot, chg
                
        except Exception:
            continue
        
    print("  WARN: DFT failed to converge at any charge state.")
    return 0.0, 0

## 8. VQE for ground state computation

In [8]:
def build_qubit_hamiltonian(species, coords, charge=0, basis='sto-3g'):
    
    """
    Builds a molecule and plots it to qubits
    Returns hamiltonian of the system (qubit_ham, others not important)
    """
    
    try:
        mol = make_pyscf_mol(species, coords, charge=charge, basis=basis)
        if mol.spin == 0: 
            mf = scf.RHF(mol)
        else: 
            mf = scf.ROHF(mol)
        
        mf.run()
        if not mf.converged:
            return None, 0.0, None

        driver = PySCFDriver(atom=mol.atom, basis=basis, charge=charge, spin=mol.spin)
        es_problem = driver.run()
        
        #4 qubit size for faster computations, else takes too long and doesn't converge
        transformer = ActiveSpaceTransformer(num_electrons=2, num_spatial_orbitals=2)
        es_problem_reduced = transformer.transform(es_problem)
        ham = es_problem_reduced.hamiltonian
        operator = ham.second_q_op()
        
        mapper = JordanWignerMapper()
        qubit_ham = mapper.map(operator)
        
        return qubit_ham, mapper, es_problem_reduced

    except Exception as e:
        print(f"VQE Build Error: {e}")
        return None, 0.0, None

def vqe_energy(species, coords, charge_hint=0, basis='sto-3g', depth=1, lr=0.1, maxiter=50, seed=123, solver="qiskit"):
    
    qubit_h, shift, problem = build_qubit_hamiltonian(species, coords, charge=charge_hint, basis=basis)
    #TODO: find out why it doesn't work without shift and problem
    
    if qubit_h is None: return 0.0

    E_active, _, _, _ = solve_vqe(
        ham=qubit_h, num_qubits=qubit_h.num_qubits, depth=depth,
        solver=solver, lr=lr, maxiter=maxiter, seed=seed
    )
            
    return E_active

def binding_energy(total_energy, species_list, ref_energies):
    
    """
    Subtracts atomic reference energies from total energy to get binding energy
    """
    if total_energy == 0.0: return 0.0
    ref_sum = sum(ref_energies[s] for s in species_list)
    return total_energy - ref_sum


## 9. Overall pipeline

In [10]:
# 1. MOF
struct = get_mof(DATASET, MOF_ID)
species, coords, center = build_cluster(struct, CLUSTER_RADIUS)
unique_elements = sorted(list(set(species)))
if len(unique_elements) >= 2:
    type_map = {0: unique_elements[0], 1: unique_elements[1]}
else:
    type_map = {0: species[0], 1: species[0]}
print(f"Loaded {MOF_ID} ({len(species)} atoms)")
print(f"Mapping: 0={type_map[0]}, 1={type_map[1]}")

# Energy references
ref_energies = {}
for el in unique_elements:
    e_atom, _ = dft_energy([el], [[0.0, 0.0, 0.0]], basis='sto-3g')
    ref_energies[el] = e_atom
    # print(f"  E({el}) = {e_atom:.6f} Ha")

#QUBO
qp, qubo, ising_op, offset, edges = build_qubo(coords, target_fraction=TARGET_LONG_FRACTION, dist_cutoff=DIST_CUTOFF)
print("QUBO candidate search")
E_qubo, theta_qubo, qc_qubo, param_list_qubo = solve_vqe(ising_op, ising_op.num_qubits, depth=2, solver=VQE_QUBO_SOLVER)
candidates = top_bitstrings(qc_qubo, param_list_qubo, theta_qubo, max_candidates=MAX_CANDIDATES)

print(f"Candidates found: {[cand[0] for cand in candidates]}")

print(f"\n{'Pattern':<10} | {'Formula':<10} | {'E_Total (Ha)':<15} | {'E_Binding (Ha)':<15} | {'Method'}")
print("-" * 75)

E_base, chg_base = dft_energy(species, coords)
E_bind_base = binding_energy(E_base, species, ref_energies)
baseline_formula = "".join(str(b) for b in [1 if s == type_map[1] else 0 for s in species])
print(f"{baseline_formula:<10} | {str(species):<10} | {E_base:.6f}        | {E_bind_base:.6f}        | Original")

# Ground state
results = []

for bs, prob in candidates:
    
    if len(bs) > len(species): 
        bs = bs[-len(species):]
    
    # Drop combinations where all atoms are the same
    patt = [int(b) for b in bs]
    if len(unique_elements) >= 2:
        if 0 not in patt or 1 not in patt:
            continue 
            
    new_species = [type_map[b] for b in patt]
    new_coords, scale = new_element(coords, patt, alpha=GEOM_SCALE_ALPHA)
    formula_display = f"{type_map[0]}{patt.count(0)} {type_map[1]}{patt.count(1)}"
    
    E_dft, chg = dft_energy(new_species, new_coords)
    E_bind_dft = binding_energy(E_dft, new_species, ref_energies)
    
    E_vqe_tot = 0.0
    E_bind_vqe = 0.0
    if E_dft != 0.0:
        E_bind_vqe = vqe_energy(new_species, new_coords, charge_hint=chg, seed=int(prob*100), solver=VQE_GROUND_SOLVER)
        E_vqe_tot = -binding_energy(E_bind_vqe, new_species, ref_energies)
        results.append([bs, E_bind_dft, E_bind_vqe])
        
    print(f"{bs:<10} | {formula_display:<10} | {E_dft:.6f}        | {E_bind_dft:.6f}        | DFT")
    if E_vqe_tot != 0.0:
        print(f"{'':<10} | {'':<10} | {E_vqe_tot:.6f}        | {E_bind_vqe:.6f}        | VQE")

print("-" * 75)

if results:
    
    best = min(results, key=lambda x: x[1])
    
    print(f"Best Configuration (DFT): {best[0]}")
    print(f"  Binding Energy: {best[1]:.6f} Ha")
    
    diff = best[1] - E_bind_base
    
    if diff < -0.001:
        print(f"-> The pattern '{best[0]}' is more stable than the original!")
        print(f"   (Stability gain: {abs(diff):.6f} Ha)")
    elif diff > 0.001:
        print(f"-> Original structure is the global minimum")
    else:
        print(f"-> Candidates are energetically equivalent")
else:
    print("No valid mixed candidates found")

Loaded KAXQIL_clean (3 atoms)
Mapping: 0=Ca, 1=O
QUBO candidate search
Candidates found: ['010', '011', '100']

Pattern    | Formula    | E_Total (Ha)    | E_Binding (Ha)  | Method
---------------------------------------------------------------------------
011        | ['Ca', 'O', 'O'] | -818.406047        | -0.281336        | Original
010        | Ca2 O1     | -1414.854527        | -0.073895        | DFT
           |            | -1414.326503        | -0.454129        | VQE
VQE Build Error: 'The number of inactive electrons must be even.'
011        | Ca1 O2     | -818.406482        | -0.281771        | DFT
100        | Ca2 O1     | -1415.014102        | -0.233470        | DFT
           |            | -1414.493796        | -0.286835        | VQE
---------------------------------------------------------------------------
Best Configuration (DFT): 011
  Binding Energy: -0.281771 Ha
-> Candidates are energetically equivalent
