In [1]:
from __future__ import annotations

import h5py
import sys
import time
from collections import defaultdict
from collections.abc import Iterable, Sequence
from functools import lru_cache

cache = lru_cache(maxsize=None)

import numpy as np

from qiskit.circuit import Instruction, Parameter, QuantumCircuit
from qiskit.exceptions import QiskitError
from qiskit.opflow import PauliSumOp
from qiskit.primitives import BaseEstimator, EstimatorResult
from qiskit.primitives.utils import init_circuit, init_observable
from qiskit.quantum_info import Statevector
from qiskit.quantum_info.operators.base_operator import BaseOperator

from qiskit.circuit import QuantumCircuit, Parameter
from qiskit.circuit.library import TwoLocal

from qiskit.algorithms.optimizers import SPSA
from qiskit.algorithms.optimizers.spsa import powerseries

from qiskit_nature.drivers import Molecule, QMolecule
from qiskit_nature.properties.second_quantization.electronic.bases import ElectronicBasis
from qiskit_nature.transformers.second_quantization.electronic.active_space_transformer import ActiveSpaceTransformer
from qiskit_nature.drivers.second_quantization import PySCFDriver, HDF5Driver
from qiskit_nature.problems.second_quantization import ElectronicStructureProblem
from qiskit_nature.mappers.second_quantization import JordanWignerMapper, ParityMapper
from qiskit_nature.converters.second_quantization import QubitConverter
from qiskit_nature.algorithms.ground_state_solvers import (
    GroundStateEigensolver,
    NumPyMinimumEigensolverFactory,
)

from circuit_knitting_toolbox.entanglement_forging import (
    EntanglementForgingKnitter,
    EntanglementForgingDriver,
)
from circuit_knitting_toolbox.decompose import CholeskyDecomposition

class CachingEstimator(BaseEstimator):
    """
    Reference implementation of :class:`BaseEstimator`.
    :Run Options:
        - **shots** (None or int) --
          The number of shots. If None, it calculates the exact expectation
          values. Otherwise, it samples from normal distributions with standard errors as standard
          deviations using normal distribution approximation.
        - **seed** (np.random.Generator or int) --
          Set a fixed seed or generator for the normal distribution. If shots is None,
          this option is ignored.
    """

    def __init__(
        self,
        circuits: QuantumCircuit | Iterable[QuantumCircuit],
        observables: BaseOperator | PauliSumOp | Iterable[BaseOperator | PauliSumOp],
        parameters: Iterable[Iterable[Parameter]] | None = None,
    ):
        if isinstance(circuits, QuantumCircuit):
            circuits = (circuits,)
        circuits = tuple(init_circuit(circuit) for circuit in circuits)

        if isinstance(observables, (PauliSumOp, BaseOperator)):
            observables = (observables,)
        observables = tuple(init_observable(observable) for observable in observables)

        super().__init__(
            circuits=circuits,
            observables=observables,
            parameters=parameters,
        )
        self._is_closed = False

    ################################################################################
    ## INTERFACE
    ################################################################################
    def _call(
        self,
        circuits: Sequence[int],
        observables: Sequence[int],
        parameter_values: Sequence[Sequence[float]],
        **run_options,
    ) -> EstimatorResult:
        if self._is_closed:
            raise QiskitError("The primitive has been closed.")

        # Rename for clarity
        circuit_indices = circuits
        observable_indices = observables
        parameter_values_list = parameter_values
        del circuits, observables, parameter_values

        # Parse options
        shots = run_options.pop("shots", None)
        rng = self._parse_rng_from_seed(run_options.pop("seed", None))

        # Initialize
        states = [
            self._build_statevector(circuit_index, tuple(parameter_values))
            for circuit_index, parameter_values in zip(
                circuit_indices, parameter_values_list
            )
        ]
        observables = [self._observables[i] for i in observable_indices]

        # Solve
        raw_results = [
            self._compute_result(state, observable, shots, rng)
            for state, observable in zip(states, observables)
        ]
        expectation_values, metadata = zip(*raw_results)

        return EstimatorResult(np.array(expectation_values), metadata)

    def close(self):
        self._is_closed = True

    ################################################################################
    ## UTILS
    ################################################################################
    def _bind_circuit_parameters(
        self, circuit_index: int, parameter_values: tuple[float]
    ) -> QuantumCircuit:
        parameters = self._parameters[circuit_index]
        if len(parameter_values) != len(parameters):
            raise ValueError(
                f"The number of values ({len(parameter_values)}) does not match "
                f"the number of parameters ({len(parameters)})."
            )
        circuit = self._circuits[circuit_index]
        if not parameter_values:
            return circuit
        parameter_mapping = dict(zip(parameters, parameter_values))
        return circuit.bind_parameters(parameter_mapping)

    @cache  # Enables memoization (tuples are hashable)
    def _build_statevector(
        self, circuit_index: int, parameter_values: tuple[float]
    ) -> Statevector:
        circuit = self._bind_circuit_parameters(circuit_index, parameter_values)
        instruction = circuit.to_instruction()
        return Statevector(instruction)

    def _compute_result(
        self, state: Statevector, observable: BaseOperator | PauliSumOp, shots: int, rng
    ) -> tuple[float, dict]:
        if state.num_qubits != observable.num_qubits:
            raise QiskitError(
                f"The number of qubits of a circuit ({state.num_qubits}) does not match "
                f"the number of qubits of a observable ({observable.num_qubits})."
            )
        expectation_value = np.real_if_close(state.expectation_value(observable))
        metadatum = {}
        if shots is not None:
            sq_obs = (observable @ observable).simplify()
            sq_exp_val = np.real_if_close(state.expectation_value(sq_obs))
            variance = sq_exp_val - expectation_value**2
            standard_deviation = np.sqrt(variance / shots)
            expectation_value = rng.normal(expectation_value, standard_deviation)
            metadatum["variance"] = variance
            metadatum["shots"] = shots
        return float(expectation_value), metadatum

    def _parse_rng_from_seed(self, seed: None | int | np.random.Generator):
        if seed is None:
            return np.random.default_rng()
        elif isinstance(seed, np.random.Generator):
            return seed
        else:
            return np.random.default_rng(seed)

### Define the $CH_3$ molecule, define the active space transform, and instantiate an ElectronicStructureProblem

In [2]:
# Define a molecular system of interest - Methyl radical 
molecule = Molecule(
    geometry= [['C',[0.0, 0.0, 0.00]],
                ['H',[1.0790, 0.0, 0.00]],
                ['H',[-0.5395, -0.9344, 0.00]],
                ['H',[-0.5395, 0.9344, 0.00]]],
    charge=0,
    multiplicity=2,
)

driver = PySCFDriver.from_molecule(molecule=molecule, basis="sto-3g")
converter = QubitConverter(JordanWignerMapper())

# Construct an active space composed of 6 molecular orbitals 
transformer = ActiveSpaceTransformer(num_electrons=(3,2), num_molecular_orbitals=6)
problem_reduced = ElectronicStructureProblem(driver, [transformer])

### Retrieve the one and two-body integrals and the nuclear repulsion energy. These will be used to decompose the operator into a bipartite system.

In [3]:
H_fermionic = problem_reduced.second_q_ops()[0]
electronic_energy_object = problem_reduced.grouped_property_transformed.get_property(
    "ElectronicEnergy"
)
energy_shift = (
    electronic_energy_object._shift["ActiveSpaceTransformer"]
    + electronic_energy_object._nuclear_repulsion_energy
)

# These are the integrals in the molecular orbital basis retrieved from the 6 orbital active space.
one_body_integrals_alpha = electronic_energy_object.get_electronic_integral(
    ElectronicBasis.MO, 1
)._matrices[0]
one_body_integrals_beta = electronic_energy_object.get_electronic_integral(
    ElectronicBasis.MO, 1
)._matrices[1]

two_body_integrals_alpha_alpha = electronic_energy_object.get_electronic_integral(
    ElectronicBasis.MO, 2
)._matrices[0]
two_body_integrals_beta_alpha = electronic_energy_object.get_electronic_integral(
    ElectronicBasis.MO, 2
)._matrices[1]
two_body_integrals_beta_beta = electronic_energy_object.get_electronic_integral(
    ElectronicBasis.MO, 2
)._matrices[2]
two_body_integrals_alpha_beta = electronic_energy_object.get_electronic_integral(
    ElectronicBasis.MO, 2
)._matrices[3]

### Create an ElectronicStructureProblem from our EF Driver and perform second quantization transformation

In [4]:
driver = EntanglementForgingDriver(
    hcore=one_body_integrals_alpha,
    mo_coeff=np.eye(6, 6),
    eri=two_body_integrals_alpha_alpha,
    num_alpha=3,
    num_beta=2,
    nuclear_repulsion_energy=energy_shift,
)
problem = ElectronicStructureProblem(driver)
ops = problem.second_q_ops()

### Prepare the bitstrings and the ansatz. 

The ansatz for Entanglement Forging consists of a set of input bitstrings and a parameterized ansatz. If only one set of bitstrings is passed, it will be used for both subsystems. For this demo, we will specify different bitstrings for each subsystem.

In [5]:
bitstrings_u = [
    [1, 1, 1, 0, 0, 0],
    [0, 1, 1, 0, 0, 1],
    [1, 0, 1, 0, 1, 0],
    [1, 0, 1, 1, 0, 0],
    [0, 1, 1, 1, 0, 0],
]
bitstrings_v = [
    [1, 1, 0, 0, 0, 0],
    [0, 1, 0, 0, 0, 1],
    [1, 0, 0, 0, 1, 0],
    [1, 0, 0, 1, 0, 0],
    [0, 1, 0, 1, 0, 0],
]

# Define ansatz parameters:
brickwall =[(4,5),(3,4),(2,3),(4,5),(1,2),(3,4),(4,5),(2,3),(0,1),(1,2),(3,4),(2,3),(4,5),(3,4),(4,5)]

n_theta    = len(brickwall)
nqubit = len(bitstrings_u[0])
theta    = Parameter('θ')
hop_gate = QuantumCircuit(2, name="Hop gate")
hop_gate.h(0)
hop_gate.cx(1, 0)
hop_gate.cx(0, 1)
hop_gate.ry(-theta, 0)
hop_gate.ry(-theta, 1)
hop_gate.cx(0, 1)
hop_gate.h(0)

theta_vec = [Parameter('θ%d'%i) for i in range(n_theta)]

# Create the parametrized circuit (ansatz). The same circuit will be used for both subsystems, U and V
ansatz = QuantumCircuit(nqubit)
for m,(i,j) in enumerate(brickwall):
    ansatz.append(hop_gate.to_gate({theta:theta_vec[m]}),[i,j])

ansatz.draw('text', justify='right', fold=-1)

### Decompose the operator.

In [6]:
cholesky = CholeskyDecomposition(problem)
decomposed_operator = cholesky.decompose()

### Instantiate an EntanglementForgingKnitter to provide our decomposed ansatze

In [7]:
forging_knitter = EntanglementForgingKnitter(ansatz, bitstrings_u=bitstrings_u, bitstrings_v=bitstrings_v)

### Set up our circuits and observables for the Estimator

In [8]:
from qiskit.primitives import Estimator

# Assign some random parameters to our ansatz. Normally these would be passed in from the optimizer
ansatz.assign_parameters(np.random.random(len(ansatz.parameters)), inplace=True)

tensor_ansatze_u = [
    prep_circ.compose(ansatz) for prep_circ in forging_knitter._tensor_circuits_u
]
superposition_ansatze_u = [
    prep_circ.compose(ansatz) for prep_circ in forging_knitter._superposition_circuits_u
]
tensor_ansatze_v = [
    prep_circ.compose(ansatz) for prep_circ in forging_knitter._tensor_circuits_v
]
superposition_ansatze_v = [
    prep_circ.compose(ansatz) for prep_circ in forging_knitter._superposition_circuits_v
]

tensor_ansatze = tensor_ansatze_u + tensor_ansatze_v
superposition_ansatze = superposition_ansatze_u + superposition_ansatze_v

tensor_observables = decomposed_operator.tensor_paulis
superpos_observables = decomposed_operator.superposition_paulis

tensor_estimator = Estimator(tensor_ansatze, tensor_observables)
superposition_estimator = Estimator(superposition_ansatze, superpos_observables)

### Calculate all expvals for one iteration of entanglement forging using Qiskit's Estimator and time it.

### For this algorithm, we need expvals for every circuit, evaluated on every observable. The circuits have already had their parameters bound ahead of time, as they all share identical parameters and values.

In [9]:
start_time = time.time()

tensor_expvals = []
for circ, i in enumerate(tensor_ansatze):
    circuit_indices = [i] * len(tensor_observables)
    observable_indices = range(len(tensor_observables))
    expvals = tensor_estimator(circuit_indices, observable_indices)
    tensor_expvals.append(expvals)
    
superposition_expvals = []
for circ, i in enumerate(superposition_ansatze):
    circuit_indices = [i] * len(superpos_observables)
    observable_indices = range(len(superpos_observables))
    expvals = superposition_estimator(circuit_indices, observable_indices)
    superposition_expvals.append(expvals)
    
end_time = time.time()

runtime = end_time - start_time

In [10]:
print(f"Runtime: {runtime} seconds")

Runtime: 63.3615300655365 seconds


### Now use the caching estimator and observe the speedup

In [11]:
tensor_estimator = CachingEstimator(tensor_ansatze, tensor_observables)
superposition_estimator = CachingEstimator(superposition_ansatze, superpos_observables)

In [12]:
start_time = time.time()

tensor_expvals = []
for circ, i in enumerate(tensor_ansatze):
    circuit_indices = [i] * len(tensor_observables)
    observable_indices = range(len(tensor_observables))
    expvals = tensor_estimator(circuit_indices, observable_indices)
    tensor_expvals.append(expvals)
    
superposition_expvals = []
for circ, i in enumerate(superposition_ansatze):
    circuit_indices = [i] * len(superpos_observables)
    observable_indices = range(len(superpos_observables))
    expvals = superposition_estimator(circuit_indices, observable_indices)
    superposition_expvals.append(expvals)
    
end_time = time.time()

runtime = end_time - start_time

In [13]:
print(f"Runtime: {runtime} seconds")

Runtime: 1.3411839008331299 seconds
