In [None]:
import matplotlib.pyplot as plt
import numpy as np

import cmath

np.random.seed(42)

# Lost function
def square_loss(targets, predictions):
    loss = 0
    for t, p in zip(targets, predictions):
        loss = loss + (t - p) ** 2
    loss = loss / len(targets)
    return 0.5*loss

In [None]:
from pathlib import Path

In [None]:
#Create file if not exist, else give information
def create_File(directory_path):
    # Check if the directory does not exist before creating it
    path = Path(directory_path)
    if not path.exists():
        path.mkdir(parents=True)
        print(f"Directory {directory_path} created successfully.")
    else:
        print(f"Directory {directory_path} already exists.")

In [None]:
import numpy as np
import matplotlib.pyplot as plt

from qiskit import QuantumCircuit
from qiskit.visualization import circuit_drawer
from qiskit.primitives import Estimator
from qiskit.quantum_info import SparsePauliOp

scaling = 1
r = 5
degree = 3 # degree of the target function


class parallel_Qt_Device:
    """
    A class representing the Unitary matrix U(x) = W^(L+1) ⊗ S(x) ⊗ ... W^2 ⊗ S(x) ⊗ W^1.
    This structure used to represent specific class of function as Fourier Series in parallel construction.

    Attributes:
        n_layers(integer) : represent the number of qubits.
        qc (qiskit.circuit.QuantumCircuit): Circuit to run
    """
    def __init__(self, n_layers = 1):
        """
        Initializes an QDevice object.

        Parameters:
            n_layers(integer) : represent the number of qubits.
            qc (qiskit.circuit.QuantumCircuit): Circuit to run
        """
        self.wires = n_layers
        self.circuit_length = range(n_layers)
        self.qc = QuantumCircuit(n_layers)
    
    def run(self, weights, x=None):
        """
        Run the circuit to have expectation values.
        
        Parameters:
            weights (np.ndarray([np.float64])) : weights to optimize. They should have specific shape (2*r, 3) where r represent number of repetition in the circuit and 3 is used specifically for the angles of the qiskit.circuit.library.UGate.

            x (np.float64) : specific data point

        Returns:
            np.float64 : observed value from the circuit using pauli-Z Gate
        """
        self.qc = QuantumCircuit(self.wires, self.wires)
        self.W(weights[0])
        self.S(x)  
        self.W(weights[1])
    
        return self.observe_wire(wires=0)
    
    
    #Used to observe the wires equivalent of qml.expval(qml.PauliZ)
    #Should change to more general version of Pauli matrix as observable
    #using this link https://qiskit.org/ecosystem/ibm-runtime/tutorials/how-to-getting-started-with-estimator.html
    #Function differs from the model why?shifted to the top
    def observe_wire(self, wires=0, observable=(QuantumCircuit.z, 0)):
        """
        Calculates the expectation value of the observable

        Parameters:
            Parameters are used for future implementation to make it more generlized and flexiable

        Returns:
            Returning the resulting values of the observable
        """
        op = (self.wires-1)*'I' + 'Z'
        observable = SparsePauliOp(op)
        estimator = Estimator()
        
        job = estimator.run(self.qc, observable)
        result = job.result()
        
        return result.values[0]
    
    def StronglyEntanglingLayers(self, weights, wires = None):
        if wires is None:
            wires = self.circuit_length
        elif not isinstance(wires, list):
            print("Wires Require One dimensional List!")
            return
        else:
            #check the shape of the list
            pass
        
        if self.wires <= 1:
            print("Cannot create such a circuit because you need to have more than 1 qubit!")

        #This part does not cover ranges of the qml.StronglyEntanglingLayers in Pennylane
        for layer in range(weights.shape[0]):
            for i in wires:
            
                thetas = weights[layer][i]
                
                self.qc.u(thetas[0],thetas[1], thetas[2], i)
            for j in wires[:-1]:
                self.qc.cx(j, j+1)
            self.qc.cx(i, 0)
            
    def S(self, x):
        """Data encoding circuit block."""
        for w in self.circuit_length:
            self.qc.rx(scaling*x, w)
    
    def W(self, theta):
        """Trainable circuit block."""
        self.StronglyEntanglingLayers(theta)
        
    def circuit_info(self):
        print("Length of the circuit: {}".format(self.circuit_length))
        
    
    def draw(self):
        # Draw the circuit
        return self.qc.draw('mpl')
        diagram = circuit_drawer(self.qc, output='text')
        print(diagram)
        #self.qc.draw()

    def save_circuit_image(self, filename:str):
        self.qc.draw('mpl', filename=filename)


In [None]:
scaling = 1 # scaling of the data
coeffs = [0.15 + 0.15j]*degree # coefficients of non-zero frequencies
coeff0 = 0.1 # coefficient of zero frequency

def target_function(x):
    """Generate a truncated Fourier series of degree, where the data gets re-scaled."""
    res = coeff0
    for idx, coeff in enumerate(coeffs):
        exponent = complex(0, scaling*(idx+1)*x)
        conj_coeff = np.conjugate(coeff)
        res += coeff * np.exp(exponent) + conj_coeff * np.exp(-exponent)
    return np.real(res)

In [None]:
from sklearn.preprocessing import MinMaxScaler

x = np.linspace(-6, 6, 70)
data = np.array([target_function(x_) for x_ in x])

# Reshape the data to make it 2D
data = data.reshape(-1, 1)

# Create a MinMaxScaler
scaler = MinMaxScaler(feature_range=(-1, 1))

# Fit the scaler on your data and transform it
target_y = scaler.fit_transform(data).flatten()

plt.plot(x, target_y, c='black')
plt.scatter(x, target_y, facecolor='white', edgecolor='black')
plt.ylim(-(degree/2), (degree/2))
plt.show()

In [None]:
p_qt = parallel_Qt_Device(r)
p_qt.circuit_info()

In [None]:
trainable_block_layers = 3
weights = 2*np.pi*np.random.random(size=(2, trainable_block_layers, r, 3))

x = np.linspace(-6, 6, 70)
random_quantum_model_y = [p_qt.run(weights, x=x_) for x_ in x]

directory_path = "./part2/Circuits/"

create_File(directory_path)
p_qt.save_circuit_image('./part2/Circuits/r={}&degree={}_qc.png'.format(r, degree))

init_weights = random_quantum_model_y

plt.plot(x, random_quantum_model_y, c='blue')
plt.ylim(-1,1)
plt.show()

p_qt.draw()

In [None]:
from qiskit.algorithms.optimizers import ADAM as AdamOptimizer
from scipy.optimize import minimize

def cost(weights, x, y):
    weights=np.reshape(weights, w_shape)

    predictions = [p_qt.run(weights, x=x_) for x_ in x]
    return square_loss(y, predictions)

w_shape = weights.shape

max_steps = 1
maxiter = 5
lr=0.3
opt = AdamOptimizer(maxiter=maxiter, lr=lr)
batch_size = 25
cst = []  # initial cost

for step in range(max_steps):

    # Select batch of data
    batch_index = np.random.randint(0, len(x), (batch_size,))
    x_batch = x[batch_index]
    y_batch = target_y[batch_index]

    # Update the weights by one optimizer step
    res = opt.minimize(lambda w: cost(w, x, target_y), weights.reshape(-1), cst=cst)

    weights = res.x

In [None]:
weights = np.reshape(weights, w_shape)
predictions = [p_qt.run(weights, x=x_) for x_ in x]


# Annotate with extra information
extra_info = f'r: {r}\ndegree: {degree}'
plt.annotate(extra_info, xy=(0, 1), xycoords='axes fraction', fontsize=10, ha='center', va='bottom')
extra_info = 'parallel'
plt.annotate(extra_info, xy=(0.5, 1.1), xycoords='axes fraction', fontsize=10, ha='center', va='center')

extra_info = f'max steps: {max_steps}\nmaxiter: {maxiter}\nlearning rate:{0.3}'
plt.annotate(extra_info, xy=(1, 1), xycoords='axes fraction', fontsize=10, ha='center', va='bottom')


plt.plot(x, target_y, c='black', label = 'reference data')
plt.scatter(x, target_y, facecolor='white', edgecolor='black')
plt.plot(x, predictions, c='blue', label = 'model results')
plt.plot(x, init_weights, c='r', label = 'initial weights')
plt.ylim(-(degree/2), (degree/2))
# Add labels and a legend
plt.xlabel('x')
plt.ylabel('weights')
plt.legend()

directory_path = "./part2/Freq_Graphs/"

create_File(directory_path)
plt.savefig("./part2/Freq_Graphs/parallel_attributes_r={}_degree={}_step={}_maxiter={}.png".format(r, degree,max_steps, maxiter))

plt.show()

In [None]:
# Annotate with extra information
extra_info = f'r: {r}\ndegree: {degree}'
plt.annotate(extra_info, xy=(0, 1), xycoords='axes fraction', fontsize=10, ha='center', va='bottom')
extra_info = 'parallel loss'
plt.annotate(extra_info, xy=(0.5, 1.1), xycoords='axes fraction', fontsize=10, ha='center', va='center')

cst = cst[1:]
#plt.yscale('log')
plt.plot(range(len(cst)), cst)
plt.ylabel("Cost")
plt.xlabel("Step")
plt.ylim(0, 1)

directory_path = "./part2/Loss_Func_Graphs/"

create_File(directory_path)
plt.savefig("./part2/Loss_Func_Graphs/parallel_loss_attributes_r={}_degree={}_step={}_maxiter={}.png".format(r, degree,max_steps, maxiter))
plt.show()

In [None]:
print(len(cst))

In [None]:
def write_attributes_to_file(attributes, filename, file_path="./part2/"):
    open_file = file_path + filename
    create_File(file_path)

    # Write each attribute on a new line
    with open(open_file, 'w') as file:
        for key, value in attributes.items():
            file.write(f"{key}: {value}\n")

file_name = f'parallel_attributes_r={r}_degree={degree}.txt'

attributes_to_store = {
    'x' : x.tolist(),
    'init_weights': init_weights,
    'predictions': predictions,
    'target_y': target_y.tolist(),
    'degree' : degree,
    'r' : r,
    'cost' : cst
}


write_attributes_to_file(attributes_to_store, file_name, "./part2/text_files/")