In [None]:
%%capture

!pip install pennylane

# Download of the required DataFactory and Logger classes from GitHub.
!rm Logger.py
!wget -O Logger.py https://raw.githubusercontent.com/LorenzoFioroni/bachelor-thesis/main/Logger.py
!rm DataFactory.py
!wget -O DataFactory.py https://raw.githubusercontent.com/LorenzoFioroni/bachelor-thesis/main/DataFactory.py

In [None]:
# Baseline
import pennylane as qml
from pennylane import numpy as np
import matplotlib.pyplot as plt
import torch

# Metrics
from sklearn.metrics import accuracy_score

# Utility
import DataFactory as factory   # Custom module for data generation
from Logger import Logger       # Custom module for data logging

In [None]:
# Plot of the available 2D dataSets
factory.plot2D()

# Logic

In [None]:
# This cell contains the logic for the qubit simulation. The "circuit" function
# takes as input the coordinates x of the point to be classified, the matrix
# y relative to the labelState for the measurement and the parameters needed during
# the process. Then for each layer, the "processingLayer" function is called which 
# splits the data in chunks with size 3 and applies the rotations to the qubit.

device = qml.device("default.qubit", wires=1)

# Performs the weighted data re-uploading process
def processingLayer(x, thetaRow, alphaRow, nChunks):
  for chunk, alphaChunk, thetaChunk in zip( np.array_split(x, nChunks), np.array_split(alphaRow, nChunks), np.array_split(thetaRow, nChunks) ):
    padding = [0]*(3-len(chunk))
    qml.Rot(*(chunk*alphaChunk), *padding, wires=0)
    qml.Rot(*thetaChunk, wires=0)


# Simulates the classification process for the given point and with the given
# parameters. Returns |<Ψ_class|Ψ_computed>|^2 
@qml.qnode(device, interface="torch")
def circuit(x, y, theta, alpha):
  nChunks = 1 + (len(x)-1) // 3
  for thetaRow, alphaRow in zip(theta, alpha):
    processingLayer(x, thetaRow, alphaRow, nChunks)
  return qml.expval(qml.Hermitian(y, wires=0))

In [None]:
# Function to compute the cassification cost for the given tensor of points
def cost(params, labelStates, expectedFidelity, x_batch, y_batch):
  loss = 0
  for x, y in zip(x_batch, y_batch):
    loss += (1 - circuit(x, labelStates[y], params[0], params[1]))
  return loss/len(x_batch)

In [None]:
# Function to compute the weighted cassification cost for the given tensor of points
def weightedCost(params, labelStates, expectedFidelity, x_batch, y_batch):
  loss = 0
  for x, y in zip(x_batch, y_batch):
    for c in range(len(labelStates)):
      loss += (params[2][c] * circuit(x, labelStates[c], params[0], params[1]) - expectedFidelity[y, c])**2
        
  return 0.5*loss/len(x_batch)

In [None]:
# Function to perform the parameters update
def closure(x_batch, y_batch, params, labelStates, expectedFidelity, opt, costFunction):
  opt.zero_grad()
  loss = costFunction(params, labelStates, expectedFidelity, x_batch, y_batch)
  loss.backward()
  return loss

In [None]:
# Function to predict the class for the given thensor of points. For each class the 
# classification process is performed. The assigned class is the one whose labelState 
# has the maximum overlap with the state |Ψ> after the classification. If classWeights 
# are provided, each overlap is weighted according to them. 
def predict(x, params, labelStates):
  predicted = torch.empty(len(x))
  predictions = torch.empty(len(labelStates))
  
  for i in range(len(x)):
    for j in range(len(labelStates)):
      predictions[j] = circuit(x[i], labelStates[j], params[0], params[1])

    if len(params) == 3: # classWeights
      predicted[i] = torch.argmax(params[2]*predictions)
    else: # No classWeights
      predicted[i] = torch.argmax(predictions)
      
  return predicted

In [None]:
# Function to plot the given tensor of points for a 2D problem with the colors
# specified in the tensor "y". The boundaries between two classes in the domain 
# are plotted dashed below the points. The default option is to show the plot and
# concurrently to save it with the provided name.
def plotDataSet(name, title, x, y, dataSet, logger, toTerminal = True, toFile = True):

  if dataSet.dim != 2: return
  
  x_shape, y_shape = dataSet.getShape()
  
  f = plt.figure(figsize=(3.5, 3.5))
  ax = f.add_subplot()
  ax.set_aspect('equal', adjustable='box')

  for i in range(len(x_shape)):
    plt.plot(x_shape[i], y_shape[i], lw=3, color="r", ls="dashed")
  plt.scatter(x[:,0], x[:,1], c=y, s = 6, cmap="Accent") 

  plt.xlim(-1,1)
  plt.ylim(-1,1)
  plt.title("{}\n{}".format(dataSet.name,title))

  plt.tight_layout()
  if toFile: logger.savePlot(plt, name)
  if toTerminal: plt.show()
  
  plt.close()

In [None]:
# Function to generate a simple plot showing the loss and accuracy evolutions 
# vs. the epoch of training.
def plotMetrics(metrics, logger):

  valid_present = metrics.shape[1] == 5

  print("")

  f = plt.figure(figsize=(12, 5))
  gs = f.add_gridspec(1, 2)
  
  ax = f.add_subplot(gs[0,0])
  plt.plot(metrics[:, 0], metrics[:, 1], label="training set")
  if valid_present: plt.plot(metrics[:, 0], metrics[:, 3], label="validation set")
  plt.ylabel("loss")
  plt.xlabel("# of epochs")
  plt.legend()
  plt.title("Loss")

  ax = f.add_subplot(gs[0,1])
  plt.plot(metrics[:, 0], metrics[:, 2], label="training set")
  if valid_present: plt.plot(metrics[:, 0], metrics[:,4], label="validation set")
  plt.ylabel("accuracy")
  plt.ylim(0, 1)
  plt.xlabel("# of epochs")
  plt.legend()
  plt.title("Accuracy")

  plt.tight_layout()
  logger.savePlot(plt, "metrics.jpg")
  plt.show()
  plt.close()

In [None]:
# Function to perform nEpochs epochs of training. Periodically plots the training set with 
# the predicted labels and saves it to file. If a validation set is provided, the parameters 
# which have performed the best on it are restored at the end of the training process
# Loss and accuracy are saved each epoch in order to plot them at the end of the simulation. 
def train(dataSet, opt, batchSize, lrDecay, nEpochs, params, costFunction, logger):
    
  labelStates, expectedFidelity = dataSet.getStates()
  x_train, y_train = dataSet.getTrain()
  x_valid, y_valid = dataSet.getValid()
  valid_present = len(x_valid)!=0 # Whether or not a validation set is present

  lr_scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer=opt, gamma=0.96)

  totEpochs = 0 # Epoch index
  
  nBatches =  len(x_train) // batchSize + int(len(x_train)%batchSize != 0)

  metrics = torch.empty(1,3 + 2*valid_present)

  header = ["loss", "accuracy", "valid_loss", "valid_accuracy"]
  logger.log(("epoch"+"\t{}"*(2+2*valid_present)).format(*header[:2*(1+valid_present)]))

  logFormat = "{: >5d}\t{: >4.3f}\t{: >8.3f}"
  if valid_present: logFormat += "\t{: >10.3f}\t{: >14.3f}"

  # Metrics at epoch 0
  loss = costFunction(params, labelStates, expectedFidelity, x_train,y_train)
  accuracy = accuracy_score(y_train, predict(x_train, params, labelStates))
  metrics_row = [0, loss, accuracy]

  if valid_present:
    val_loss = costFunction(params, labelStates, expectedFidelity, x_valid,y_valid)
    val_accuracy = accuracy_score(y_valid, predict(x_valid, params, labelStates))
    metrics_row. extend([val_loss, val_accuracy])
    
    bestParams = params
    bestLoss = val_loss
    bestEpoch = 0

  metrics[0, :] = torch.tensor(metrics_row)

  logger.log(logFormat.format(*metrics_row))

  for epoch in range(nEpochs):
    p = torch.randperm(len(x_train))
    x_train = x_train[p]
    y_train = y_train[p]

    # Training the circuit with batched data
    for x_batch, y_batch in zip( np.array_split(x_train, nBatches), np.array_split(y_train, nBatches) ):
      opt.step(lambda: closure(x_batch, y_batch, params, labelStates, expectedFidelity, opt, costFunction))

    totEpochs += 1

    # Learning rate decay
    if totEpochs > 5: 
      lr_scheduler.step()

    # Metrics at epoch totEpochs
    loss = costFunction(params, labelStates, expectedFidelity, x_train,y_train)
    accuracy = accuracy_score(y_train, predict(x_train, params, labelStates))
    metrics_row = [totEpochs, loss, accuracy]

    if valid_present:
      val_loss = costFunction(params, labelStates, expectedFidelity, x_valid,y_valid)
      val_accuracy = accuracy_score(y_valid, predict(x_valid, params, labelStates))
      metrics_row.extend([val_loss, val_accuracy])

      if val_loss <= bestLoss:
        bestLoss = val_loss
        bestParams = params
        bestEpoch = totEpochs

    metrics = torch.cat((metrics, torch.tensor([metrics_row])), 0)

    logger.log(logFormat.format(*metrics_row))

    # Plotting the training set every 5 epochs
    if totEpochs % 5 == 0:
      plotDataSet(
        "{}_epochs.jpg".format(totEpochs),
        "{} epochs".format(totEpochs),
        x_train,
        predict(x_train, params, labelStates),
        dataSet,
        logger,
        toTerminal=False
      )

  
  # Last parameters in case of no validation set, best parameters otherwise 
  return ( (bestParams, bestEpoch, metrics) if valid_present else (params, totEpochs, metrics) )

# Simulator

In [None]:
# Available dataSets. Specify which one to use in the cell below. The plots will be generated for 2D dataSets only
dataSetDict = {
  "circle": factory.Circle,                                    # 2D - 2 classes
  "binary annulus": factory.BinaryAnnulus,                     # 2D - 2 classes
  "non convex": factory.NonConvex,                             # 2D - 2 classes
  "annulus": factory.Annulus,                                  # 2D - 3 classes
  "three circles": factory.ThreeCircles,                       # 2D - 4 classes
  "squares": factory.Squares,                                  # 2D - 4 classes
  "wavy lines": factory.WavyLines,                             # 2D - 4 classes
  "sphere": factory.Sphere,                                    # 3D - 2 classes
  "four dimensional hypersphere": factory.FourDimHypersphere   # 4D - 2 classes

}

# Available optimizers. Specify which one to use in the cell below
optimizerDict = {
  "adam": torch.optim.Adam,
  "rmsprop": torch.optim.RMSprop,
  "sgd": torch.optim.SGD,
  "lbfgs": torch.optim.LBFGS
}

# Available cost functions. Specify which one to use in the cell below
costDict = {
  "fidelity": cost,
  "weighted fidelity": weightedCost
}

In [None]:
nTrain = 500                   # Number of elements in training set
nValid = 0                     # Number of elements in validation set
nTest = 3000                   # Number of elements in test set
nLayers = 4                    # Number of layers
nEpochs = 30                   # Number of training epochs
batchSize = 32                 # Batch size for training
dataSetKey = "circle"      # Dataset to use (from the list in the cell above)
optimizerKey = "adam"          # Optimizer to use (from the list in the cell above)
optParams = {                  # Parameters given to the optimizer
      "lr": 0.03,
}
lrDecay = 0.96                 # Learning rate decay factor
costKey = "weighted fidelity"  # Cost function to use (from the list in the cell above)
seed = None                    # Seed for reproducibility. To leave unset: None

# -----------------

logger = Logger(toTerminal = True, toFile = True)
logger.log("Optimizer: {}".format(optimizerKey), toTerminal = False)
logger.log("Optimizer params: {}".format(optParams), toTerminal = False)
logger.log("DataSet: {} - nTrain: {} - nValid: {} - nTest: {}".format(dataSetKey, nTrain, nValid, nTest), toTerminal = False)
logger.log("nLayers: {} - nEpochs: {} - batchSize: {}".format(nLayers, nEpochs, batchSize), toTerminal = False)
logger.log("Cost: {}".format(costKey), toTerminal = False)
logger.log("Seed: {}".format(seed), toTerminal = False)

# Applying the choices for the simulation
if seed: 
  torch.manual_seed(seed)
  np.random.seed(0)

dataSet = (dataSetDict[dataSetKey])(nTrain=nTrain, nValid=nValid, nTest=nTest, seed=seed)

x_train, y_train = dataSet.getTrain()
x_test, y_test = dataSet.getTest()

labelStates, expectedFidelity = dataSet.getStates()
costFunction = costDict[costKey]

# Extracting the parameters randomly from a normal distribution
params = [
  torch.randn(nLayers, (((dataSet.dim-1)//3)+1)*3).requires_grad_(True),  # theta
  torch.randn(nLayers, dataSet.dim).requires_grad_(True)   # alpha
]
if costKey == "weighted fidelity":
  params.append(torch.ones(dataSet.nClasses).requires_grad_(True))   # class weights

opt = (optimizerDict[optimizerKey])(params, **optParams)

# Plotting the trainin set with the correct classes and with those descending from the 
# classification with the random parameters just extracted
plotDataSet("true_classes.jpg", "True", x_train, y_train, dataSet, logger, toTerminal = False)
plotDataSet("0_epochs.jpg", "0 epochs", x_train, predict(x_train, params, labelStates), dataSet, logger)

logger.separator()

# Training
params, params_epoch, metrics = train(dataSet, opt, batchSize, lrDecay, nEpochs, params, costFunction, logger)

logger.separator()

# Printing the parameters after the classification 
logger.log("Parameters at epoch {}:\n".format(params_epoch))

logger.log("Theta:")
for row in params[0]:
  logger.log(("\t"+"{: >7.4f}  "*len(row)).format(*row))

logger.log("Alpha:")
for row in params[1]:
  logger.log(("\t"+"{: >7.4f}  "*len(row)).format(*row))

if costKey == "weighted fidelity":
  logger.log("Class weight:")
  logger.log(("\t"+"{: >7.4f}  "*len(params[2])).format(*params[2]))

logger.separator()

# Printing the accuracy on the test set
accuracy = accuracy_score(y_test, predict(x_test, params, labelStates))
logger.log("Accuracy on test set with the parameters above: {}\n".format(accuracy))

# Plotting the test set classified with the trained parameters and
# the evolutions of loss and accuracy
plotDataSet("test.jpg", "Test set", x_test, predict(x_test, params, labelStates), dataSet, logger)

plotMetrics(metrics, logger)

logger.close()