In [None]:
import numpy as np

#Operator Imports
from qiskit import QuantumRegister, QuantumCircuit
from qiskit.opflow import X, Y, Z, I, Zero, StateFn, CircuitStateFn, SummedOp, ListOp, PauliExpectation, PauliSumOp
from qiskit.opflow.gradients import Gradient, Hessian
from qiskit.circuit import Parameter
from qiskit.utils import QuantumInstance
from qiskit_machine_learning.neural_networks import OpflowQNN
from qiskit.aqua.components.optimizers import ADAM, AQGD, COBYLA

# imports to plot stuff
import matplotlib.pyplot as plt
from IPython.display import display
plt.style.use('dark_background')

In [None]:
class QuantumODESolver(object):
    """
    Main Class for the ODE solver using Hybrid Classical-Quantum
    neural network. Instantiate this class to use the solver.
    
    Implementation of the following paper: arXiv:2011.10395v2 in Qiskit.
    
    >> Developed as part of Qiskit Mentorship Program 2021 (Project #32:
    Solving Navier-Stokes equations using Qiskit for Water Management.) <<
    
    Init Parameters
    ---------------
    
    num_qubits : Number of qubits in the quantum register of the quantum model 
                 circuit.
    
    variational_layers : Number of variational layers in the model circuit.
    
    encoder_layers : Number of encoder layers in the model circuit.
    
    """
    def __init__(self, num_qubits = 1, variational_layers = 1, encoder_layers = 1):
        self.num_qubits = num_qubits
        self.encoder_layers = encoder_layers
        self.variational_layers = variational_layers
        self.encoder_symbols = []
        self.unitary_symbols = []
        self.model_circuit = QuantumCircuit(self.num_qubits)
        self.model = None
        self.measure_op = None
        self.gradient_op = None
        # Construct encoder block of the model
        self.construct_encoder()
        
        # Construct variational block of the model
        #self.construct_variational_ansatz()
        
        # Get measurement observable
        self.get_measurement_observable()
        
        # Forming combined model
        self.compile_model()
        
        print("Model compiled...........................")
        
        # Initialize unitary parameters
        self.unitary_params = np.random.random(size = (len(self.unitary_symbols), ))
        
        # Initialize gradient op of the model circuit
        self.gradient_op = Gradient(grad_method = 'fin_diff').convert(operator = self.model, params = (self.encoder_symbols))
        
        
    def construct_encoder(self):
        """
        Apply Quantum Feature map (Chebyshev sparse encoding) to the final 
        model circuit. Feel free to change the code below to implement other
        quantum feature maps.
        
        Parameters
        ----------
        
        None
        
        Returns
        -------
        
        None
        
        """
        self.encoder_block = QuantumCircuit(self.num_qubits, name = 'Encoder Block')
        x = Parameter('x')
        self.encoder_symbols.append(x)
        for i in range(self.num_qubits):
            self.encoder_block.ry(2 * (i+1) * np.arccos(x), i)
        #print("Encoder Block------------------------------------")
        #display(self.encoder_block.draw(output = 'mpl'))
        return 
        
        
    def construct_variational_ansatz(self, layer):
        """
        Apply Hardware-efficient ansatz as the variational block in the
        final model circuit. Feel free to change the code below to
        implement different architectures for the variational part of
        the model circuit.
        
        Parameters
        ----------
        
        layer : Layer number of the variational block to be created. To
                differentiate between the subsequent layers of the same
                variational block, if the same block is repeated. If layer
                is same for two blocks, then the two blocks will 
                have the same values for the unitary parameters.
        
        Returns
        -------
        
        None
        
        """
        self.variational_block = QuantumCircuit(self.num_qubits, name = 'Variational Block ' + str(layer + 1))
        # First, we apply Wall of unitaries
        for i in range(self.num_qubits):
            theta_z0 = Parameter('theta_z0' + str(i) + str(layer))
            self.unitary_symbols.append(theta_z0)
            theta_x = Parameter('theta_x' + str(i) + str(layer))
            self.unitary_symbols.append(theta_x)
            theta_z1 = Parameter('theta_z1'+ str(i) + str(layer))
            self.unitary_symbols.append(theta_z1)
            self.variational_block.rz(theta_z0, i)
            self.variational_block.rx(theta_x, i)
            self.variational_block.rz(theta_z1, i)

        # Next, we apply Entanglement wall (in a cyclic fashion, joining immediate neighbours)
        for i in range(1, self.num_qubits):
            self.variational_block.cx(i - 1, i)
        #if num_qubits > 1:
        #    self.variational_block.cx(self.num_qubits-1, 0)
        #print("Variational block--------------------------------")
        #display(self.variational_block.draw(output = 'mpl'))
        return
    
    
    def get_measurement_observable(self):
        """
        Construct operator for measurement of expectation value
        of the final model circuit.
        Note, can be any quantum circuit, but for now considering
        only the total magnetic spin of the quantum register. Feel free
        to change this functin as per design choice.
        
        Parameters
        ----------
        
        None
        
        Returns
        -------
        
        None
        
        """
        self.measure_op = None
        for i in range(self.num_qubits):
            block = None
            for j in range(self.num_qubits):
                if j == i:
                    if block is None:
                        block = Z
                    else:
                        block = Z ^ block
                else:
                    if block is None:
                        block = I
                    else:
                        block = I ^ block
            if self.measure_op is None:
                self.measure_op = block
            else:
                self.measure_op = self.measure_op + block
        #print("Measurement op \n, ", self.measure_op)
        return
    
    
    def compile_model(self):
        """
        Compile (concatenate) the variational blocks and encoder blocks to get 
        the final model circuit expectation operator. 
        Feel free to change the code of this function to change the overall architecture 
        of the quantum model circuit (i.e, whether or not to repeat variational
        or encoder blocks ("Data re-uploading") etc.)
        
        Parameters
        ----------
        
        None
        
        Returns
        -------
        
        None
        
        """
        measure = self.convert_to_statefn(self.measure_op, is_measurement = True)
        for i in range(self.encoder_layers):
            if i == 0:
                self.construct_variational_ansatz(layer = i)
            self.model_circuit.append(self.variational_block, range(self.num_qubits))
            self.model_circuit.append(self.encoder_block, range(self.num_qubits))
        self.construct_variational_ansatz(layer = self.encoder_layers)
        self.model_circuit.append(self.variational_block, range(self.num_qubits))
        self.model = PauliExpectation().convert(measure @ CircuitStateFn(self.model_circuit))
        return
        
        
    def convert_to_statefn(self, obj, is_measurement = False):
        """
        Convert obj to StateFn, throws exception if cannot convert.
        
        """
        try:
            if is_measurement:
                return ~StateFn(obj)
            else:
                return StateFn(obj)
        except Exception as err:
            print("Cannot convert object of type {} to StateFn ".format(type(obj)))
            print("Complete error log is as follows-------------")
            print(">> {} <<".format(err))
        return
    
    
    def resolve_parameters(self, data, param_values):
        """
        Resolves the parameter values for free parameters in the final
        model circuit.
        
        Parameters
        ----------
        
        data : Classical data that goes into the encoder block.
               
        param_values : Parameter values for the free parameter in the unitary block.
        
        Returns
        -------
        
        A dictionary that maps each free parameter in the final model circuit
        (encoder blocks + unitary blocks) to corresponding value. The output 
        of this function can directly be used to bind parameter values in the
        model circuit operator (if successful).
        
        """
        param_dict = {}
        data = np.array(data)
        if len(param_values.shape) == 1:
            param_values = list(param_values)
        else:
            param_values = list(param_values.flatten())
        
        data_size = data.shape[0]
        model_param_values = []
        model_param_vars = []
        if data.shape[-1] != len(self.encoder_symbols) and len(data.shape) != len(self.encoder_symbols):
            print("Dimensionality of data does not match the number of free parameters for encoding data in the model circuit")
            return
        if len(param_values) != len(self.unitary_symbols):
            print(len(self.unitary_symbols))
            print(len(param_values))
            print("Number of param values supplied not equal to number of free parameters in the unitary blocks of the model circuit")
            return
        for var in self.encoder_symbols:
            model_param_vars.append(var)
        if len(data.shape) == 1:
            model_param_values.append(list(data))
        else:
            for p in data:
                model_param_values.append(list(p))
        #print("After resolving encoder params : ", model_param_values)
        for param in param_values:
            model_param_values.append([param] * data_size)

        #print("After resolving unitary params : ", model_param_values)
        for var in self.unitary_symbols:
            model_param_vars.append(var) 
        param_dict = dict(zip(model_param_vars, model_param_values))
        if len(param_dict.keys()) != len(self.encoder_symbols) + len(self.unitary_symbols):
            print("Resolving of parameters failed, check the number of parameter values passes for resolving !!")
            print("Number of free parameters : {}".format(len(self.encoder_symbols) + len(self.unitary_symbols)))
            print("Number of resolved params : {}".format(len(param_dict.keys())))
            print(param_dict)
            return
        return param_dict
    
    
    def predict(self, data, param_values = None, is_training = False, verbose = False):
        """
        Get expectation value and gradient at each data point from the model circuit.
        
        Parameters
        ----------
        
        data: Classical Grid points which is to be encoded as input in the quantum model.
        
        param_values : (Optional) Parameter values for the unitary block parameters at which the 
                       model output should be evaluated. If not provided by user, it fills this variable
                       with already stored parameter values in the object of QuantumODESolver class.
        
        is_training : Boolean flag to denote whether the call to this function is being made at the time
                      of training. If True, then the function outputs both solution of the differential equation
                      at grid points x and the corresponding gradients of the solution function at those points.
                      If False, it just outputs the evaulated solution at grid points. 
        
        verbose : (Optional) To print every predict function call output in each iteration of optimizer. Useful
                  to check how many calls the optimizer makes to the predict function.
        
        Returns
        -------
        
        Either two lists of solution values at grid points x and corresponding gradients at 
        those points, or simply a list of solution values at grid points x.
        
        """
        batch_size = 1
        exp_values = []
        grad_values = []
        if param_values is None:
            param_values = self.unitary_params
        param_values = np.array(param_values).flatten()
        if len(param_values) == len(self.unitary_params):
            param_values = param_values[np.newaxis, :]  
        else:
            if len(param_values.flatten()) % len(self.unitary_params) != 0:
                print("Cannot reshape the parameter values provided into shape  = ({}, )".format(len(self.unitary_params)))
                sys.exit()
            batch_size = int(len(param_values.flatten()) / len(self.unitary_params))
            param_values = np.reshape(param_values, newshape = (batch_size, len(self.unitary_params)))
                             
        # Predicting for each batch of parameters supplied
        for batch in range(batch_size):
            batch_exp_value = None
            batch_grad_value = None
            if verbose:
                print("Predicting for batch {} / {}".format((batch + 1), batch_size), end = '\r')
            batch_param_values = param_values[batch]
            param_dict = self.resolve_parameters(data, batch_param_values)
            if param_dict is None:
                print("Prediction failed for batch {} / {}...".format((batch + 1), batch_size))
                sys.exit()
            batch_exp_value = self.model.bind_parameters(param_dict)
            batch_exp_value = np.real(batch_exp_value.eval()).flatten() / self.num_qubits
            exp_values.append(list(batch_exp_value))
            if is_training:
                batch_grad_value = np.real(self.gradient_op.bind_parameters(param_dict).eval()).flatten()
                grad_values.append(list(batch_grad_value))
        if is_training:
            return exp_values, grad_values
        else:
            return exp_values
    
    
    def solution(self, x, param_values = None, is_training = False, verbose = False):
        """
        Internally calls the predict function and outputs the value of
        solution for given array points in the arguments. Also takes
        care of the floating boundary condition.
        
        Parameters
        ----------
        
        x : Numpy array of data grid points at which the solution (function that solves 
            the required differential equation) needs to be evaluated.
        
        param_values : (Optional) Parameter values for the unitary block parameters at which the 
                       model output should be evaluated. If not provided by user, it fills this variable
                       with already stored parameter values in the object of QuantumODESolver class.
        
        is_training : Boolean flag to denote whether the call to this function is being made at the time
                      of training. If True, then the function outputs both solution of the differential equation
                      at grid points x and the corresponding gradients of the solution function at those points.
                      If False, it just outputs the evaulated solution at grid points. 
        
        verbose : (Optional) To print every predict function call output in each iteration of optimizer. Useful
                  to check how many calls the optimizer makes to the predict function.
                  
        Returns
        -------
        
        Either two lists of solution values at grid points x and corresponding gradients at 
        those points, or simply a list of solution values at grid points x.
        
        """
        u, du_dx = self.predict(x, param_values, is_training = True, verbose = verbose)
        
        # Floating boundary condition
        bc = self.predict(np.array([0.0]), param_values, verbose = verbose)
        # Reshaping arrays
        u  = np.array(u)
        bc = np.array(bc)
        du_dx = np.array(du_dx)
        u = np.reshape(u, newshape = (u.shape[1], u.shape[0]))
        bc = np.reshape(bc, newshape = (bc.shape[1], bc.shape[0]))
        du_dx = np.reshape(du_dx, newshape = (du_dx.shape[1], du_dx.shape[0]))
        
        # With floating boundary condition, the value of function itself is defined in such a way
        # that the boundary condition is always satisfied irrespective of the model (quantum circuit's) output.
        u = np.array([1] * u.shape[1])  - bc + u
        
        if is_training:
            return u, du_dx
        else:
            return u
        
    
    def fit(self, x, y = None, epochs = 20, batch_size = 5, lr = 0.05, optimizer_maxiter = 5, verbose = False):
        """
        
        Fit function for the quantum model used to train the model. Feel free to change the loss function
        method inside as per the differential equation required to solve.
        
        Parameters
        ----------
        
        x : Numpy array of data grid points at which the model is fitted.
        
        y : (Optional) values which can be further used to define boundary conditions or 
            initial conditions. Default to None.
            
        epochs : Number of epochs for which the model should be trained.
        
        batch_size : Number of data grid points to be taken in a single batch of training.
                     Note, that the update to the parameter happens for each batch rather 
                     than at the end of epoch.
        
        lr : learning rate used for the ADAM optimizer. Not required if some other optimizer is chosen.
        
        optimizer_maxiter : Max iterations for which a single batch of training data should be used
                            to update the parameter values. This parameter is passed to the optimizer (currently
                            implemented for ADAM).
        
        verbose : (Optional) To print every loss function call output in each iteration of optimizer. Useful
                  to check how many calls the optimizer makes to the loss function.
                  
        Returns
        -------
        
        List of parameter values after the model is trained.
                  
        """
        def loss_function(param_values):
            # get function value (f(x)) and gradients at points specified (array 'x' above)
            u, du_dx = self.solution(curr_batch, param_values, is_training = True, verbose = verbose)
            
            # Change the line below as per the differential equation required to solve
            # Following differential equation is taken from arXiv:2011.10395v2
            loss = np.mean(np.log((du_dx + 20 * u *(0.1 + np.tan(20 * u))) ** 2), axis = 0)
            return loss
        
        epoch = 0
        batch_start_idx = 0
        curr_batch = x
        epoch_end = False
        
        print("Initial Loss : {}".format(loss_function(param_values = None)))
        print("Initial unitary params : {}".format(self.unitary_params))
        print("Initial learning rate : {}".format(lr))
        print("Training Begins...........................................................................................")
        while(epoch < epochs):
            epoch_loss = 0.0
            batch_start_idx = 0
            curr_batch = None
            epoch_end = False
            while(not epoch_end):
                print("Training for Batch {} / {}".format(int(batch_start_idx / batch_size) + 1, int(np.ceil(len(x) / batch_size))), end = '\r')
                curr_batch = x[batch_start_idx : min(batch_start_idx + batch_size, len(x))]
                self.unitary_params, epoch_loss , _ = ADAM(maxiter = optimizer_maxiter, lr = lr).optimize(len(self.unitary_symbols), loss_function, initial_point = self.unitary_params)
                if min(batch_start_idx + batch_size, len(x)) == len(x):
                    epoch_end = True
                else:
                    batch_start_idx += batch_size
            curr_batch = x
            print("Epoch : {}, Current Loss : {}".format((epoch + 1), loss_function(param_values = None)))
            print("Updated param values: ", self.unitary_params)
            print("Current learning rate : {}".format(lr))
            print("-------------------------------------------------------------------------------------------------------")
            epoch += 1
            #lr = (epoch + 1) * (1e-02 - 1e-04)/(epochs)
        return self.unitary_params
        
            
    def __call__(self):
        print("This model is constructed for-----")
        print("Number of qubits : {}".format(self.num_qubits))
        print("Number of layers (depth) : For encoder block - {}, For variational block - {}".format(self.encoder_layers, self.variational_layers))
        print("Structure of encoder block-----")
        display(self.encoder_block.draw(output = 'mpl'))
        print("Structure of variational block-----")
        display(self.variational_block.draw(output = 'mpl'))
        print("Complete model circuit looks like : ")
        display(self.model_circuit.draw(output = 'mpl'))
        print("Number of variational parameters : {}".format(len(self.unitary_symbols)))
        print("Number of encoder parameters : {}".format(len(self.encoder_symbols)))

In [None]:
# Defining the solver architecture
num_qubits = 6
enc_layers = 5
var_layers = 1
# Creating data grid points
X = np.linspace(0, 0.9, 20)
# Instantiating the solver
solver = QuantumODESolver(num_qubits, var_layers, enc_layers)
# Summary of model architecture
solver()
# Training the solver
solver.fit(X, verbose = False)

In [None]:
# Testing the solver
X_test = np.linspace(0, 0.9, 30)
predicted = solver.solution(X_test)
plt.plot(X_test, np.exp(-20 * 0.1 * X_test) * np.cos(20 * X_test), c = 'yellow')
plt.plot(X_test, predicted, c = 'blue')
plt.show()