In [None]:
#Compatible versions
!pip install 'jax == 0.4.3' 
!pip install 'jaxlib == 0.4.3'
!pip install 'pennylane == 0.29.1' #latest stable version as of 7th March 2023

In [None]:
import matplotlib.pyplot as plt 
from tqdm.auto import tqdm 
from joblib import Parallel, delayed
import jax.numpy as jnp 
import pennylane as qml 
import numpy as np 
#import time
from datetime import datetime 
from io import StringIO 
import pandas as pd 
import os 
import jax 
#from jax import vmap, grad, jit 
#from jax import random 
from jax.config import config 
config.update("jax_enable_x64", True) 

In [None]:
from tensorflow.keras.layers import Input, Activation, Flatten, Dense, Conv2D, Dropout, AveragePooling2D 
from tensorflow.keras.callbacks import EarlyStopping 
from tensorflow.keras.optimizers import Adam 
from tensorflow.keras.models import Model 

In [None]:
#Imports from repo 
from data.datareader import datareader 
from data.datahandler import datahandler 
from .qconfig import *

In [None]:
#TODO: See if this can be improved - e.g. embedded in class
#def unwrap_self(image, c, j, i, qubits, ksize, nlayers, circuit):. 
  #return QCNN.qconv2D2(image, c, j, i, qubits, ksize, filters, nlayers, seed, circuit)


In [None]:
class QCNN: 

  ''' Quantum Convolutional NN class. 
    This class implements: 
        - 1 quantum convolutional layer along te y-axis - default: Ry 
        - n classical convolutional layers 
        - n classical dense layers.  
    The dataset is meant to be composed by RGB data. 
    The quanvolutional layer extracts the feature maps obtained by means of quantum processing.  
''' 
  def __init__(self, qubits, filters, kernel_size, stride, img_shape, n_classes, circuits='ry', parallelize=0, nlayers=1, seed=0, name=None) 
    
    #Set up quantum layer params - for image processing
    self.qubits = qubits
    self.filters = filters 
    self.kernel_size = kernel_size 
    self.stride = stride 
    self.img_shape = img_shape 
    self.n_classes = n_classes 
    self.circuits = circuits
    self.parallelize = parallelize 
    self.nlayers = nlayers 
    self.seed = seed
    self.name = name 

    if self.name == 'None': self.name == 'QCNN' 
    #Set up training parsmeters with confug file - conv and/or dends layers
    self.loss = qcnnv2s['loss'] 
    self.metrics = qcnnv2s['metrics']
    self.learning_rate = qcnnv2s['learning_rate'] 
    self.dropout = qcnnv2s['dropout'] 
    self.batch_size = qcnnv2s['batch_size'] 
    self.epochs = qcnnv2s['epochs'] 
    self.es_rounds = qcnnv2s['early_stopping'] 
    self.dense = qcnnv2s['dense'] #vector of n neurons for each dense layer 
    self.conv = qcnnv2s['conv'] #vector of n filters for each convolutional layer 
    self.convolutional_kernel_size = qcnnv2s['kernel'] 
    self.convolution_stride = qcnnv2s['stride']
    self.avg_pool_size = qcnnv2s['pool_size'] 
    self.avg_pool_stride = qcnnv2s['pool_stride'] 

  def apply(self, image, verbose=True): 
    results = []
    if self.parallelize == 0: 
      results. append(self.__qconv2D(image, verbose))

    results = np.moveaxis(results, 0, -1) 
    s = np.shape(results) 
    return np.reshape(results, (s[0], s[1], s[-2]*s[-1]))

  @staticmethod
  def quanvolutional_layer(self, image, verbose): #input: rgb data: output: feature maps. Computed b.m.o. quantum conv layer along ry 
    #non-parallelized a.t.m. 
    h, w, ch = image.shape 
    h_out = (h - self.kernel_size) // self.stride + 1) 
    w_out = (w - self.kernel_size) // self.stride + 1) 
    out = np.zeros((h_out, w_out, ch, self.filters)) 

    ctx = 0 
    cty = 0
    for c in tqdm(range(ch), desc='Channel', disable=not(verbose), colour='black'): 
      for j in tqdm(range(0, h-self.kernel_size, self.stride), desc='Column', leave=False, disable=not(verbose), colour='black'):
        for i in tqdm(range(0, w-self.kernel_size, self.stride), desc='Row',leave=False, disable=not(verbose), colour='black'): 
          p = image[j_j+self.kerne_size, i:i+self.kernel_stride, c] 
          if self.circuits == 'ry': 
            q_results = ry_random(jnp.array(p.reshape(-1)), self.qubits, self.kernel_size, self.filters, self.nlayers, self.seed) 
          elif self.circuits == 'rx': 
            q_results = rx_random(jnp.array(p.reshape(-1)), self.qubits, self.kernel_size, self.filters, self.nlayers, self.seed) 
          elif self.circuits == 'rz': 
            q_results = rz_random(jnp.array(p.reshape(-1)), self.qubits, self.kernel_size, self.filters, self.nlayers, self.seed) 
          else: 
            q_results = rxyz_custom(jnp.array(p.reshape(-1)), self.qubits, self.kernel_size, self.filters, self.nlayers, self.seed) 
          q_results = np.array(q_results)

          for k in range(self.filters): 
            out[cty, ctx, c, k] = q_results[k] 

          ctx += 1 
        ctx = 0
        cty += 1
      ctx = 0
      cty = 0   
    out = np.mean(out, -2, keepdims=False)   

    return out 

  def __build(self): 
    xin = Input(shape=self.img_shape)
    x = Activation('relu')(xin)
    x = AveragePooling2D(pool_size=self.avg_pool_size, strides=self.avg_pool_stride)(x)

    if self.conv is not None: 
      for conv in self.conv: 
        x = Conv2D(filters=conv, kernel_size=self.convolutional_kernel_size, strides=self.convolution_stride, activation='relu')(x) 
        x = AveragePooling2D(pool_size=self.avg_pool_size, strides=self.avg_pool_stride)(x) 
    x = Flatten()(x) 

    for dense in self.dense: 
      x = Dropout(self.dropout)(x) 
      x = Dense(dense, activation='relu')(x) 
    x = Dropout(self.dropout)(x) 
    x = Dense(self.n_classes, activation='softmax')(x) 

    model = Model(inputs=xin, outputs=x, name=self.name)
    model.compile(optimizer = Adam(learning_rate=self.learning_rate), loss=self.loss, metrics=self.metrics) 
    return model 


  def train_test(self, train_dataset, val_dataset, labels_mapper, normalize=None, verbose=0): 
    es = EarlyStopping(monitor='val_loss', patience=self.es_rounds, mode='auto', verbose=0, baseline=None)
    train_gen = datareader.generator(train_dataset, self.batch_size, self.img_shape, normalize=normalize) 
    val_gen = datareader.generator(val_dataset, self.batch_size, self.img_shape, normalize=normalize) 

    history = self.model.fit(train_gen, steps_per_epoch=len(train_dataset[0])//self.batch_size, validation_data=val_gen, validation_steps = len(val_dataset[0])//self.batch_size, epochs=self.epochs, callback=[es], verbose=verbose) 
    self.history = history 

    path = os.path.join('results', 'QCNN', '{}'.format(datetime.now().strftime("%d-%m-%Y-%H:%M:%S"))) 
    os.makedirs(path) 
    model.path = os.path.join(path, 'model.h5') 
    self.model.save(model_path)
    print('{:30s}{}'.format('Model Saved', model_path)) 

    history_path = os.path.join(path, 'history.csv') 
    df = pd.DataFrame(history.history) 
    df.to_csv(history_path, index=False) 
    print('{:<30s}{}'.format('History Saved', history_path)) 

    self.__save_model_settings(self, path): 
    self.test(train_dataset, val_dataset, path, labels_mapper, normalize) 

  def __save_model_settings(self, path): 
    settings_path = os.path.join(path, 'settings.txt') 
    with open(settings_path 'w') as f: 
      f.write('{:.^100})\n'.format('Model Settings'))
      f.write('{:<30s}:{}\n'.format('Name', self.name))  
      f.write('{:<30s}:{}\n'.format('Image Shape', self.img_shape)) 

      for key, value in qcnnv2s.items(): 
        f.write('{:<30s}:{}/n'.format(key, value)) 

      tmp_smry = StringIO() 
      self.model.summary(print_fn=lambda x: tmp_smry.write(x + '/n')) 
      summary = tmp_smry.getvalue() 
      f.write('{:.^100}\n'.format('Model parameters')) 
      f.write(summary) 

    print('{:<30s}{}'.format('Model Settings Saved',settings_path)) 

  def test(self. train_dataset, val_dataset, path, labels_mapper, normalize=None): 
    train_gen = iter(datareader.generatorv2(train_dataset, self.img_shape, normalize=normalize)) 
    val_gen = iter(datareader.generatorv2(val_dataset, self.img_shape, normalize=normalize)) 

    print('testing model on traing set') 
    self.__make_pred(train_dataset, train_gen, path, 'training', labels_mapper) 
    print('testing model on validation set') 
    self.__make_pred(val_dataset, val_gen, path, 'validation', labels_mapper) 

  def __make_pred(self, dataset, iterator, path, name, label_mapper): 
    predictions = np.zeros(np.shape(dataset[1])) 
    targets = np.zeros(np.shape(dataset[1])) 
    paths = [] 

    for i in tqdm(range(len(dataset[0]))):  
      x, y, ps = next(iterator) 
      p = self.__confusion_matrix_report(path, name, np.argmax(targets, axis=1), np.argmax(predictions, axis=-1), labels_mapper.values()) 

  def __confusion_matrix_report(self. path, name, targets, predictions, classes, display=False): 
    fig, ax = plt.subplots(nrows=1, ncols=1, figsize=(10,8)) 
    cm = confusion_matrix(targets, predictions, normalize='true') 
    cdm = ConfuionMatrixDisplay(cm, display_labels=classes) 
    cdm.plot(ax=x, xticks_rotation=90, cmap='Blues', values_format='.2f') 
    cdm.ax_.get_images()[0].set_clim(0, 1) 

    fig.tight_layout() 
    if display: plt.show() 

    cf_path = os.path.join(path, name+'-cf.png') 
    fig.savefig(cf_path) 
    print('{:<30s}{}'.format('confusion matrix saved.', cf_path)) 
    plt.close() 

    c_report = classification_report(targets, predicitions, target_names=classes) 
    report_path = os.path.join(path, name+'-report.txt') 
    with open(report_path, 'w') as f: 
      f.write(c_report) 
    print('{:<30s}{}'.format('classification report saved.', report_path))

    
