In [None]:
from qiskit import QuantumCircuit, QuantumRegister, ClassicalRegister, Aer, execute, transpile
from qiskit.algorithms import FasterAmplitudeEstimation, EstimationProblem
from qiskit.quantum_info import Statevector
import numpy as np

# Variational quantum sampling for probability distribution approximation
- [Quantum self-learning Monte Carlo with quantum Fourier transform sampler](https://arxiv.org/abs/2005.14075).

**Quantum Fourier Transform**

To start things off, we are going to define a function that returns a QFT circuit. We don't use Qiskit's native implementation to get more control over our circuits and facilitate further debugging. 

In [None]:
def qft(n):
    qc = QuantumCircuit(n)
    for i in range(n - 1, -1, -1):
        qc.h(i)
        for j in range(i - 1, -1, -1):
            x = 2 ** (j - i)
            qc.cp(np.pi * x, i, j)
    return qc

In [None]:
qft(4).draw()

**One-dimensional QFT sampler**

Using our QFT circuit, we can construct the sampler circuit. This is an $n$-qubit circuit consisting of two registers: $|\text{in}\rangle$ of size $m$ and $|0\rangle^{n-m}$. The variational parameters will be firstly encoded into the $|\text{in}\rangle$ register by setting its state into the state

$$ |\text{in}\rangle = \sum_{i=0}^{2^m - 1} \theta_i|i\rangle,$$

where $\theta = [\theta_0, \theta_1, \cdots, \theta_{2^m - 1}]$ with $\theta \in \mathbb{C}$ and is our parameter vector which has unit norm.

In [None]:
def one_sampler(n, m, params):
    """
    Parameters:
        n: int
            Number of qubits in sampler
        m: int
            Number of qubits in parametrized register
        params: list[complex]
            List of of size 2**m of complex parameters
    """
    p  = QuantumRegister(m, "in")
    g  = QuantumRegister(n - m, "g")
    qc = QuantumCircuit(p, g)
    
    # Encode parameters into first m qubits
    qc.initialize(params, p)
    
    # Perform QFT
    qc.compose(qft(n), inplace=True)
    
    return transpile(qc, basis_gates=["cx", "u"], optimization_level=3)

In [None]:
params  = np.random.random(2 ** 2) + np.random.random(2 ** 2) * 1j
params  = params / np.linalg.norm(params)
sampler = one_sampler(5, 2, params)
sampler.draw()

**Probability of random variable from circuit**

In [None]:
def prob_from_circ(qc, r, backend=Aer.get_backend("statevector_simulator"), shots=10000, delta=0.01, maxiter=3):
    """
    Calculate |<r|psi>|**2 where r is the sample and psi the final state of the circuit
    
    Parameters:
        qc: QuantumCircuit
            Circuit that prepares final state
        r: int
            Random variable to sample
        backend: AerBackend
            Backend to run circuit on
        shots: int
            Number of shots per circuit
        delta: float
            Accuracy for FAE algorithm
        maxiter: int
            Iterations for FAE algorithm
    """
    b    = format(r, 'b').zfill(qc.num_qubits)[::-1]
    qc_i = qc.copy()
    for i in range(qc.num_qubits):
        if b[i] == "0":
            qc_i.x(i)
            
    problem = EstimationProblem(
        state_preparation = qc_i,
        objective_qubits  = [i for i in range(qc.num_qubits)],
    )

    fae = FasterAmplitudeEstimation(
        delta            = delta,
        maxiter          = maxiter,
        quantum_instance = backend,
    )
    result = fae.estimate(problem)
    
    return result.estimation

In [None]:
params  = np.random.random(2 ** 2) + np.random.random(2 ** 2) * 1j
params  = params / np.linalg.norm(params)
sampler = one_sampler(4, 2, params)

vec = Statevector(sampler).data
for i in range(len(vec)):
    vec[i] = np.real(vec[i] * np.conj(vec[i]))

total = 0
for var in range(2**4):
    prob   = prob_from_circ(sampler, var)
    total += prob
    print(f"Experimental p({var}) = {prob}. Theoretical: {np.round(vec[var], 4)}.")
    
print(f"Total probability: {total}")

**Multistage QFT sampler**

In [None]:
def multi_sampler(D, n, m, params):
    """
    Parameters:
        D: int
            Dimension of independent target distribution
        n: int
            Number of qubits in each one dimensional sampler
        m: int
            Number of qubits in parametrized register
        params: list[list[complex]]
            List of size D of lists of complex parameters of size 2 ** m
            This need to be already processed with function
    """
    circs = []
    for k in range(D):
        circs.append(one_sampler(n, m, params[k]))
    return circs

In [None]:
D = 2
n = 4
m = 2

params = []
for _ in range(D):
    params.append(np.random.random(2 ** m) + np.random.random(2 ** m) * 1j)

funcs = [
    lambda var, theta: theta / (np.sqrt(np.dot(theta, theta.conj()))),
    lambda var, theta: theta / (np.sqrt(np.dot(theta, theta.conj())))
]

r          = np.random.randint(2**m, size=D) # we need to define the random variable to sample
new_params = []
for k in range(len(params)):
    new_params.append(funcs[k](r[0:k], params[k]))
circs = multi_sampler(D, n, m, new_params)

for i in range(D):
    print(f"Sampler {i}:")
    display(circs[i].draw())

For convenience, we can take the code that computes the processed parameters and turn it into a function that will come in handy later on.

In [None]:
def process_params(r, funcs, params):
    """
    Parameters:
        r: list[int]
            Multi-dimensional random variable
        funcs: list[lambda or func]
            List of processing functions
        params: list[list[complex]]
            List of lists of complex parameters
    """
    new_params = []
    for k in range(len(params)):
        new_params.append(funcs[k](r[0:k], params[k]))
    return new_params

**Probability from multistage sampler**

In [None]:
def prob_from_multi_circ(q, r, backend=Aer.get_backend("statevector_simulator"), shots=10000):
    """
    Parameters:
        q: list[QuantumCircuits]
            List of circuits that approximate distribution
        r: list[int]
            Multi-dimensional random variable
        backend: AerBackend
            Backend to run circuits on
        shots: int
            Number of shots per circuit
    """
    q_x = 1
    for i in range(len(r)):
        q_x *= prob_from_circ(q[i], r[i], backend=backend, shots=shots)
    return q_x

**Cross entropy**

Given a target probability distribution $p(x)$ which is hard to sample but easily computable for a given $x$, and a proposal probability $q(x)$ in the form of a one- or multi-stage QFT sampler, we will use the cross entropy as a measure to quantify the similarity of these two distributions. This is given by:

$$ H(p, q) = - \sum_x p(x) \log q(x; \theta)$$

This measure will also be used as our loss function $L(\theta)$. We are going to use the logarightm base 2 to work on units of bits, which is more intuitive. 

In [None]:
def cross_entropy(p, q, r, mode, backend=Aer.get_backend("statevector_simulator"), shots=10000):
    """
    Parameters:
        p: lambda or func
            Function to compute p(x)
        q: QuantumCircuit or list[int, int, int, list[list[complex]], list[func]]
            If one-dimensional, it should be a single circuit
            If multisampler, it should be a list of arguments with
            D, n, m, params and funcs
        r: list[int] or list[list[int]]
            If one-dimensional, it should be a list of integers
            If multisampler, if should be a list of lists of integers,
            each with same length as number of samplers
        mode: str
            "one" for one qubit sampler and "multi" for multistage
        backend: AerBackend
            Backend to run circuits on
        shots: int
            Number of shots for each sampler
    """
    entropy = 0
    
    all_q_x = []
    for var in r:
        if mode == "one":
            all_q_x.append(prob_from_circ(q, var, backend=backend, shots=shots))
        else:
            all_q_x.append(prob_from_multi_circ(multi_sampler(q[0], q[1], q[2], process_params(var, q[4], q[3])), 
                                                var, backend=backend, shots=shots))
            
    for i in range(len(r)):
        if np.isclose(all_q_x[i], 0):
            continue
        entropy += p(r[i]) * np.log2(all_q_x[i])
        
    return -entropy

In [None]:
D = 3
n = 4
m = 2

# For this example we experiment with a Linear Basis Regression Model
# in the second and third samplers, the first one remains the identity

params = []

# Params for first sampler
params.append(np.random.random(2 ** m) + np.random.random(2 ** m) * 1j)

# Params for second and third sampler
# Needs two (three) 2**m dimensional vectors
params.append([np.random.random(2 ** m) + np.random.random(2 ** m) * 1j for _ in range(2)])
params.append([np.random.random(2 ** m) + np.random.random(2 ** m) * 1j for _ in range(3)])

# Define LBRM function for second and third samplers
def lbrm(var, theta):
    vec = np.array(theta[-1])
    for i in range(len(var)):
        vec += var[i] * theta[i]
    return vec / np.sqrt(np.dot(vec, vec.conj()))

funcs = [
    lambda var, theta: theta / (np.sqrt(np.dot(theta, theta.conj()))),
    lbrm,
    lbrm,
]

r     = np.random.randint(2**m, size=D)
circs = multi_sampler(D, n, m, process_params(r, funcs, params))

for i in range(D):
    print(f"Sampler {i}:")
    display(circs[i].draw())

In [None]:
# We will make the target distribution be an equal distribution
# There are 3 random variables, each with 2**4 possible values,
# therefore there are 4096 total random variables
def target(x):
    return 1 / 4096

# We will only sample 64 random variables for efficiency
r_var = []
for i in range(4):
    for j in range(4):
        for k in range(4):
            r_var.append([i, j, k])

# Note that the result is not favorable since we don't optimize anything yet
entropy = cross_entropy(target, [D, n, m, params, funcs], r_var, mode="multi")
print(f"Calculated cross entropy: {entropy}")

**Gradient of $L(\theta)$**

Before calculating the gradient of the loss function, we need to be able to calculate the partial derivative of a one-qubit sampler with respect to $\theta$.

In [None]:
def u_qft_x_j(x, j, N):
    """
    Return the j element of the x row of the unitary of QFT of size N
    """
    return (1 / np.sqrt(2**N)) * np.exp(1j * 2 * np.pi * x * j / (2 ** N))

def qft_derivative(x, theta, N):
    """
    Parameters:
        x: int
            Random variable
        theta: list[complex]
            List of complex parameters
        N: int
            Number of qubits in sampler
    """
    m   = len(theta)
    u_x = np.array([u_qft_x_j(x, j, N) for j in range(m)])
    dot = np.dot(u_x.conj(), theta.conj())
    return dot * u_x

In [None]:
params  = np.random.random(2 ** 2) + np.random.random(2 ** 2) * 1j
params  = params / np.linalg.norm(params)
qft_derivative(3, params, 4)

Now we can compute the total gradient. We will do this for the case of a one-qubit sampler, and then use that function to compute the case of a multistage sampler.

In [None]:
def one_loss_gradient(p, q, r, params, backend=Aer.get_backend("statevector_simulator"), shots=10000):
    """
    Parameters:
        p: lambda or func
            Function to compute p(x)
        q: QuantumCircuit
            Circuit approximating target distribution
        r: list[int]
            List of random variables to calculate gradient with
        params: list[complex]
            List of complex parameters
        backend: AerBackend
            Backend to run circuits on
        shots: int
            Number of shots for each sampler
    """
    B    = len(r)
    N    = q.num_qubits
    grad = np.zeros(len(params), dtype="complex")
    
    for i in range(B):
        num   = p(r[i])
        den   = prob_from_circ(q, r[i], backend=backend, shots=shots)**2
        if den == 0:
            frac = num / 10e-5
        else:
            frac = num / den
        grad += frac * qft_derivative(r[i], params, N)
        
    return - (1 / B) * grad 

In [None]:
params  = np.random.random(2 ** 2) + np.random.random(2 ** 2) * 1j
params  = params / np.linalg.norm(params)
sampler = one_sampler(5, 2, params)

def target(x):
    return 1 / (2 ** 5)

one_loss_gradient(target, sampler, [i for i in range(2**5)], params)

In [None]:
def one_gradient_change(mu, grad, prev_grad):
    m = (mu * prev_grad) + ((1 - mu) * grad.conj())
    return m

Finally, we can expand to the multisampler case. 

In [None]:
def multi_loss_gradient(p, q, r, params, funcs, backend=Aer.get_backend("statevector_simulator"), shots=10000):
    """
    Parameters:
        p: lambda or func
            Function to compute p(x)
        q: list[int, int, int]
            Arguments to create multisampler, it should
            be a list of arguments with D, n, and m
        r: list[list[int]]
            List of random variables to calculate gradient with
        params: list[list[complex]]
            List of complex parameters
        funcs: list[lambda or func]
            List of functions
        backend: AerBackend
            Backend to run circuits on
        shots: int
            Number of shots for each sampler
    """
    D    = q[0]
    n    = q[1]
    m    = q[2]
    B    = len(r)
    grad = []
    
    # TODO: derivative of function
    for k in range(len(params)):
        grad_k = np.zeros(2**m, dtype="complex")
        for i in range(B):
            new_params = np.array(process_params(r[i], funcs, params))
            circs      = multi_sampler(D, n, m, new_params)
            frac_1     = qft_derivative(r[i][k], new_params[k], n) / prob_from_circ(circs[k], r[i][k], backend=backend, shots=shots)
            frac_2     = p(r[i]) / prob_from_multi_circ(circs, r[i], backend=backend, shots=shots)
            grad_k    += frac_1 * frac_2
        grad.append(- (1 / B) * grad_k)
        
    return grad

In [None]:
D = 3
n = 4
m = 2

params = []
params.append(np.random.random(2 ** m) + np.random.random(2 ** m) * 1j)
params.append([np.random.random(2 ** m) + np.random.random(2 ** m) * 1j for _ in range(2)])
params.append([np.random.random(2 ** m) + np.random.random(2 ** m) * 1j for _ in range(3)])

funcs = [
    lambda var, theta: theta / (np.sqrt(np.dot(theta, theta.conj()))),
    lbrm,
    lbrm
]

def target(x):
    return 1 / (4096)

var = []
for i in range(2):
    for j in range(2):
        for k in range(2):
            var.append([i, j, k])

gradient = multi_loss_gradient(target, [D, n, m], var, params, funcs)
for i, grad in enumerate(gradient):
    print(f"Gradient {i}: {grad}")

**Accepting samples**

In [None]:
def accept(p, q, r, r_hat, backend=Aer.get_backend("statevector_simulator"), shots=10000):
    """
    Parameters:
        p: lambda or func
            Function to compute p(x)
        q: QuantumCircuit or list[int, int, int, list[], list[]]
            Circuit or arguments to create circuit that
            approximate distribution. List should have
            D, n, m, funcs, and params. 
        r: int or list[int]
            Last accepted sample
        r_hat: int or list[int]
            Sample to accept or reject
        backend: AerBackend
            Backend to run circuits on
        shots: int
            Number of shots for each sampler
    """
    if isinstance(r, int):
        num = p(r_hat) * prob_from_circ(q, r, backend=backend, shots=shots)
        den = p(r) * prob_from_circ(q, r_hat, backend=backend, shots=shots)
    else:
        D      = q[0]
        n      = q[1]
        m      = q[2]
        funcs  = q[3]
        params = q[4]
        
        q_r  = multi_sampler(D, n, m, process_params(r, funcs, params))
        q_rh = multi_sampler(D, n, m, process_params(r_hat, funcs, params))
        
        num = p(r_hat) * prob_from_multi_circ(q_r, r, backend=backend, shots=shots)
        den = p(r) * prob_from_multi_circ(q_rh, r_hat, backend=backend, shots=shots)
    if den != 0:
        prob = num / den
    else:
        prob = float("inf")
    return np.random.rand() < min([1, prob])

In [None]:
D = 2
n = 4
m = 2

params = []
for _ in range(D):
    params.append(np.random.random(2 ** m) + np.random.random(2 ** m) * 1j)

funcs = [
    lambda var, theta: theta / (np.sqrt(np.dot(theta, theta.conj()))),
    lambda var, theta: theta / (np.sqrt(np.dot(theta, theta.conj())))
]

def target(x):
    return 1 / 256

r        = np.random.randint(2**n, size=D) # set some starting variable
accepted = [r]
counter  = 0
for i in range(10): # test 10 random variables
    while True:
        r_hat = np.random.randint(2**n, size=D)
        counter += 1
        if accept(target, [D, n, m, funcs, params], r, r_hat):
            accepted.append(r_hat)
            r = r_hat
            break
print(f"Total tested: {counter}. Accepted: {len(accepted)-1}.")
print(accepted)

**Approximating one-dimensional distributions**

Now that we have everything we need in place, we can start testing out our algorithm. First, we will work with the one-dimensional sampler to approximate probability distributions that depend on one-dimensional random variables.

In [None]:
import scipy.stats as stats
import matplotlib.pyplot as plt
from IPython.display import clear_output

In [None]:
plt.rcParams["figure.figsize"] = (9,6)

We are going to start trying to approximate a simple normal distribution. We are going to use 5 qubits, so the random variable will be in the range $[0, 32)$. 

In [None]:
std  = (2**5-1) / 6
mean = (2**5-1) / 2
norm = stats.norm(loc=mean, scale=std)
x    = np.linspace(norm.ppf(0.01), norm.ppf(0.99), 100)
plt.plot(x, norm.pdf(x), 'b-', lw=5, alpha=0.6, label='normal pdf')

We are going to use $2$ qubits to encode the complex parameters. This time, we will be smarter when choosing our starting parameters. We will choose the parameters such that they give an equal superposition of all states in the final state. As you can see in the cell below, this is done by setting the initial parameters to $\left[\sqrt{\frac{1}{2^{m-1}}} + \sqrt{\frac{1}{2^{m-1}}}i, 0, \cdots, 0\right]$. Then, we will optimize these parameters to accurately approximate the normal distribution. So, we initialize the sampler will look as follows.

In [None]:
n = 5
m = 2

params    = np.zeros(2**m, dtype="complex")
params[0] = np.sqrt(1 / (2 ** (m - 1))) + np.sqrt(1 / (2 ** (m - 1))) * 1j
sampler   = one_sampler(n, m, params)

Statevector(sampler)

Here we have some parameters that give an equal superposition after going through the QFT. In other words, we have a sampler that represents a uniform distribution in the range $[0, 32)$. Therefore, it is worth exploring the cross entropy, acceptance ratio and change in the parameters after one learning step of this sampler to make sure all the functions we have defined up to this point are working correctly.

In [None]:
fig, ax = plt.subplots(1, 1)

def uniform(x):
    return 1 / 32

s  = Statevector(sampler).data
x  = np.array([i for i in range(2**n)])
y1 = [np.real(np.dot(s[i], s[i].conj())) for i in x]
y2 = [uniform(x) for i in x]

ax.set_ylim([0, 0.06])
ax.plot(x, y1, 'r-', lw=5, alpha=0.6, label='sampler pdf')
ax.plot(x, y2, 'b-', lw=5, alpha=0.6, label='uniform pdf')
ax.legend()

r       = 0
samples = [r]
for r_hat in range(1, 32):
    if accept(uniform, sampler, r, r_hat):
        samples.append(r_hat)
        r = r_hat

entropy = cross_entropy(uniform, sampler, [i for i in range(2**n)], mode="one")

grad       = one_loss_gradient(uniform, sampler, [i for i in range(2**n)], params)
alpha      = 0.01
mu         = 0.9
new_params = params - alpha * one_gradient_change(mu, grad, 0)
new_params = new_params / np.linalg.norm(new_params)

print(f"Cross entropy of sampler against target: {entropy}.")
print(f"Acceptance ratio of {len(samples) / 32}.")
print(f"Euclidean norm between initial and udpated params: {np.linalg.norm(params - new_params)}.")

As you can see, we have that the cross entropy is equal to $5$ bits, which is exactly what we should expect with an uniform distribution over $2^{5}$ random variables. (Remember that the cross entropy of two distributions when they are equivalent is equal to that equivalent distribution). Then, all of the possible 32 random variables are accepted, and the difference between the initial parameters and the updated parameters after one learning step is negligible. If that is not enough, the graph also shows us that the target distribution and the distribution given by our QFT sampler are the same. Therefore, we can assure our functions are working correctly!

Getting back on track with the normal distribution we will try to optimize for, to calculate the gradient of the loss function, we are going to be using $32$ samples, i.e., the whole domain in the function. We do this in this case since we have a relatively small domain, so we can take advantage of this and be more precise on our calculations.

In [None]:
samples = [i for i in range(2**n)]

Then we use the samples to calculate the gradient vector of the loss function.

In [None]:
grad = one_loss_gradient(norm.pdf, sampler, samples, params)
grad

And finally we update our complex parameter vector as follows.

In [None]:
alpha      = 0.01
mu         = 0.9
new_params = params - alpha * one_gradient_change(mu, grad, 0) # prev_grad is 0 here since its the first change we do
new_params = new_params / np.linalg.norm(new_params)
print(f"New params: {new_params}")
print(f"Difference from old params: {np.linalg.norm(new_params - params)}")

This is only one step of the optimization procedure. We need to put everything we did into a loop and repeat it a large amount of times until convergence. Every 100 steps we are going to save the parameters into a CSV file so we can analyze the learning procedure of our algorithm once we reach convergence.

In [None]:
import csv

In [None]:
n = 5
m = 3

alpha  = 0.01
mu     = 0.9
change = 0

std  = (2**n-1) / 6
mean = (2**n-1) / 2
norm = stats.norm(loc=mean, scale=std)

params    = np.zeros(2**m, dtype="complex")
params[0] = 1 + 1 * 1j
params    = params / np.linalg.norm(params)

samples = [i for i in range(2**n)]

d_steps  = []
d_params = []

for step in range(0, 10000):
    # Create circuit
    sampler = one_sampler(n, m, params)
    
    # Record progress every one hundred learning steps
    if step % 100 == 0:
        d_steps.append(step+1)
        d_params.append(params)
        
        with open('data/ten_qubit_normal.csv', 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(["step", "params"])
            for i in range(len(d_steps)):
                writer.writerow([d_steps[i], d_params[i]])
    
    # Compute gradient
    grad = one_loss_gradient(norm.pdf, sampler, samples, params)
    
    # Update parameters
    change     = one_gradient_change(mu, grad, change)
    new_params = params - alpha * change
    new_params = new_params / np.linalg.norm(new_params)
    
    # Check for convergence
    diff = np.linalg.norm(new_params - params)
    if diff < 1e-18:
        break

    # If didn't converge, update parameters and continue loop
    params = new_params
    
    # Print output
    clear_output(wait=True)
    print(f"Learning step: {step+1}.")
    print(f"Norm change between old and updated parameters: {diff}.")
        
        
print(f"Converged in step {step+1}.")

# Record last step and write final file
d_steps.append(step+1)
d_params.append(params)

with open('data/ten_qubit_normal.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(["step", "params"])
    for i in range(len(d_steps)):
        writer.writerow([d_steps[i], d_params[i]])

In [None]:
sampler = one_sampler(n, m, params)
entropy = cross_entropy(norm.pdf, sampler, [i for i in range(2**n)], mode="one")
print(f"Cross entropy of final approximation: {entropy}")

n_entropy = (np.log2(2 * np.pi * (std**2)))/2 + 0.5
print(f"Entropy of normal distribution: {n_entropy}")

In [None]:
fig, ax = plt.subplots(1, 1)

s = Statevector(sampler).data
x = np.array([i for i in range(2**n)])
y = [np.real(np.dot(s[i], s[i].conj())) for i in x]

ax.plot(x, y, 'r-', lw=5, alpha=0.6, label='sampler pdf')
ax.plot(x, norm.pdf(x), 'b-', lw=5, alpha=0.6, label='norm pdf')