In [1]:
# Code for the article "Interactive Quantum Classifier Inspired by Quantum Open System Theory"
#LINK https://ieeexplore.ieee.org/document/9533917

#LINK https://ieeexplore.ieee.org/stamp/stamp.jsp?tp=&arnumber=9533917

# This code was written by Fernando Maciano de Paula Neto (fernando@cin.ufpe.br) and Eduardo Barreto Brito (ebb2@cin.ufpe.br)

In [2]:
import numpy as np
import numpy as np
import pandas as pd
import math

from sklearn import datasets
from sklearn.model_selection import train_test_split, KFold, StratifiedKFold
from sklearn.preprocessing import StandardScaler, MaxAbsScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, confusion_matrix, precision_score, recall_score, f1_score
from scipy.linalg import expm as expMatrix
from sklearn import preprocessing

In [3]:
def get_sigmaE(vectorX, vectorW):
  """
    Builds the diagonal operator sigmaE (Equation #17 in the Article).

    Entry (k, k) holds the product of the real parts of vectorX[k] and
    vectorW[k]; imaginary parts of either vector are dropped and every
    off-diagonal entry is zero.

    vectorX is the attribute vector of one instance;
    vectorW is the weight vector (read at the same positions as vectorX).

    Returns a len(vectorX) x len(vectorX) float matrix.
  """
  size = len(vectorX)

  # Real-valued products, one per attribute
  products = [np.real(vectorX[k]) * np.real(vectorW[k]) for k in range(size)]

  # Place them on the main diagonal of an otherwise zero matrix
  sigmaE = np.zeros((size, size))
  for k, value in enumerate(products):
    sigmaE[k, k] = value

  return sigmaE

In [4]:
def get_sigmaQ(n):
  """
    Sums the Pauli matrices sigmaX, sigmaY and sigmaZ to get sigmaQ.
    - sigmaX comes from Equation #7 = [0, 1   1, 0]
    - sigmaY comes from Equation #8 = [0, -i  i, 0]
    - sigmaZ comes from Equation #9 = [1, 0   0, -1]
    Equivalent of Equation #16 in the Article.

    n is kept only so existing calls such as get_sigmaQ(2) keep working;
    it is ignored, because the sum of the three 2x2 matrices is always 2x2.

    Returns the 2x2 complex matrix [[1, 1-1j], [1+1j, -1]].
  """
  sigmaX = np.array([[0, 1], [1, 0]])
  sigmaY = np.array([[0, -1j], [1j, 0]])
  sigmaZ = np.array([[1, 0], [0, -1]])

  return sigmaX + sigmaY + sigmaZ

In [5]:
def get_U_operator(sigmaQ, sigmaE):
  """
    Builds the operator U = exp(i * (sigmaQ kron sigmaE)).
    Equivalent of Equation #15 in the Article.

    The Kronecker product of sigmaQ and sigmaE is multiplied by the imaginary
    unit and then passed through the matrix exponential.

    Returns the result as a np.matrix, so that `*` is a matrix product and
    `.getH()` gives the conjugate transpose.
  """
  interaction = np.kron(sigmaQ, sigmaE)
  evolution = expMatrix(1j * interaction)
  return np.matrix(evolution)

In [6]:
def get_p(psi):
  """
    Builds the matrix |psi><psi| from Equation #18 or #19 in the Article.

    psi is converted to a np.matrix and multiplied by its own conjugate
    transpose. For a column vector with d rows the result is a d x d matrix.
  """
  ket = np.matrix(psi)
  bra = ket.getH()
  return ket * bra

In [7]:
def create_and_execute_classifier(vectorX, vectorW):
  """
    Applies the ICQ classifier using only the math behind the Quantum Classifier
    described in the article "Interactive Quantum Classifier Inspired by Quantum
    Open System Theory".

    vectorX is the attribute vector of one instance;
    vectorW is the weight vector, with one weight per attribute.

    Returns (z, p11):
    - z is the predicted class: 0 when the [0,0] entry of the reduced matrix is
      greater than or equal to the [1,1] entry, 1 otherwise;
    - p11 is the real part of the [1,1] entry, used as the probability of class 1.
    Works only for binary classifications, therefore, if the probability of
    class 0 is needed, it can be 1 - probability of being class 1.
  """

  # Eq #16
  sigmaQ = get_sigmaQ(2)

  # Eq #17
  sigmaE = get_sigmaE(vectorX, vectorW)

  # Eq #15
  U_operator = get_U_operator(sigmaQ, sigmaE)

  # Eq #18 applied on the state 1/sqrt(2) * (|0> + |1>)
  p_cog = get_p([[1/np.sqrt(2)],[1/np.sqrt(2)]])

  # The environment needs one basis state per attribute of the input
  N = len(vectorX)

  # Eq #19 applied on the uniform superposition of the N environment basis states
  p_env = get_p([[1/np.sqrt(N)] for i in range(N)])

  # First part of Equation #20 in the Article: U * (p_cog kron p_env) * U^H
  quantum_operation = np.array(U_operator * (np.kron(p_cog, p_env)) * U_operator.getH())

  # Second part of Equation #20: trace out the environment indices (axes 1 and 3),
  # which leaves a 2x2 matrix
  p_cog_new = np.trace(quantum_operation.reshape([2,N,2,N]), axis1=1, axis2=3)

  # The arithmetic above is complex-valued, so the diagonal entries come back as
  # complex scalars. Only their real part is kept: it keeps the comparison below
  # an ordinary real comparison and stops a complex dtype from leaking into the
  # weights through update_weights.
  p_cog_new_00_2 = np.real(p_cog_new[0,0])
  p_cog_new_11_2 = np.real(p_cog_new[1,1])

  z = 0 if p_cog_new_00_2 >= p_cog_new_11_2 else 1
  return z, p_cog_new_11_2

In [8]:
def update_weights(weights, y, z, x, p, n):
  """
    Applies one weight update. Equation #34 in the Article.

    weights is the current weight vector;
    y is the expected classification [0, 1];
    z is the actual classification [0, 1];
    x is the attribute vector;
    p is the probability of the class 1 (0, 1);
    n is the learning rate.

    Returns the updated weights; the input is not modified in place.
  """
  # Eq 33: derivative of the loss with respect to the weights
  gradient = (1 - (p**2)) * x

  # Eq 34: step scaled by the learning rate and the signed classification error
  correction = n * (z - y) * gradient
  return weights - correction

In [9]:
def create_and_execute_classifiers(vectorX, vectorWs):
  """
    Runs one classifier per weight vector in vectorWs on the same input.

    vectorX is the attribute vector of one instance;
    vectorWs is a sequence of weight vectors.

    Returns the index (in vectorWs) of the weight vector whose classifier
    reports the highest probability of class 1.
  """
  # Only the class-1 probability of each run is needed; the predicted class is ignored
  class_1_probabilities = [
    create_and_execute_classifier(vectorX, candidate_w)[1]
    for candidate_w in vectorWs
  ]

  return np.argmax(class_1_probabilities)

In [10]:
def create_and_execute_1_classifier(X, Y, w, n=0.1):
  """
    Creates, trains and executes 1 classifier throughout all instances,
    updating the weights instance per instance (batch-size = 1).

    X is a NxM vector of Attributes
    Y is the N vector of Classes
    w is the M vector of Weights
    n is the learning rate

    Returns updated_weight, error (number of misclassified instances)
  """
  error = 0

  for sample, expected in zip(X, Y):
    # Classify the instance with the current weights
    predicted, prob_class_1 = create_and_execute_classifier(sample, w)

    # Update the weights right away, before the next instance is seen
    w = update_weights(w, expected, predicted, sample, prob_class_1, n)

    # Count the instance as an error when the prediction missed
    if predicted != expected:
      error += 1

  return w, error

In [11]:
def training_1_batch_classifier(X, Y, w, n=0.1):
  """
    Classifies every instance of the batch with the same weights and then
    performs a single weight update from the batch averages of the input
    vector, the expected class, the assigned class and the class-1 probability
    (update_weights(w, y_avg, z_avg, x_avg, p11_avg, n)).

    X is a NxM vector of Attributes
    Y is the N vector of Classes
    w is the M vector of Weights
    n is the learning rate

    Returns updated_weight, error (number of misclassified instances)
  """
  batch_len = X.shape[0]
  x_avg, y_avg, z_avg, p11_avg = 0, 0, 0, 0
  error = 0

  # Same idea as create_and_execute_1_classifier, but the weights stay fixed
  # during the pass and are updated only once at the end
  for sample, expected in zip(X, Y):
    predicted, p11 = create_and_execute_classifier(sample, w)

    if predicted != expected:
      error += 1

    # Running averages: each instance contributes value / batch_len
    x_avg += sample/batch_len
    y_avg += expected/batch_len
    z_avg += predicted/batch_len
    p11_avg += p11/batch_len

  updated_w = update_weights(w, y_avg, z_avg, x_avg, p11_avg, n)

  return updated_w, error

In [12]:
def training_n_steps_classifier(X, Y, w, batch_size, Nsteps, n=0.1, stop_error_zero=False):
  """
    Creates, trains and executes classifiers through batches using @training_1_batch_classifier method.

    X is a NxM vector of Attributes
    Y is the N vector of Classes
    w is the M vector of Weights
    batch_size is the number of instances used per batch training (must be >= 1)
    Nsteps is the number of times the classifier will be training. Similar to number of epochs
    n is the learning rate
    stop_error_zero defines whether we should stop when we have error equals zero

    Returns (weights, min_error), where min_error is the smallest per-epoch sum
    of batch errors and weights are the ones that were in use when the last
    batch of that epoch started. If Nsteps is 0, no epoch runs and both values
    are np.inf.

    Raises ValueError if batch_size is smaller than 1.
  """
  if batch_size < 1:
    raise ValueError("batch_size must be a positive integer")

  # First we need to know how many batches we will have for training
  lines = X.shape[0]
  splits = math.ceil(lines / batch_size)

  # np.inf (lower case) is used because the np.Inf alias was removed in NumPy 2.0
  min_error = np.inf
  min_w_error = np.inf

  for _ in range(Nsteps):
    errors = 0

    # Bound before the batch loop so it exists even when there is no batch to
    # run (empty X); with at least one batch it is overwritten below
    w_old = w[:]

    # For each batch split, we need to train our classifier
    for split in range(splits):
      # First step is to define our dataset
      start = split * batch_size
      stop = start + batch_size
      X_batch = X[start:stop, :]
      Y_batch = Y[start:stop]

      # We save the weights this batch is executed with, in case this is the best epoch
      w_old = w[:]

      # Then we train and classify for this part of the dataset
      w, error = training_1_batch_classifier(X_batch, Y_batch, w=w, n=n)
      errors += error

    # Keep the epoch with the smallest sum of errors over all batches
    if min_error > errors:
      min_w_error = w_old
      min_error = errors

    if errors == 0 and stop_error_zero:
      break

  return min_w_error, min_error

In [13]:
def replicate_classes(data_set, features_names, classes):
  """
    Creates one binary (one-vs-rest) dataset per entry of classes: the rows of
    that class get target 1, every other row gets target 0, and the rows with
    target 1 are then appended a second time to balance the classes.

    data_set is a DataFrame with the attribute columns and a "target" column;
    features_names lists the attribute columns to return;
    classes lists the class labels to build a dataset for.

    Returns a list with one [attributes, target] pair per class, in the order
    of classes. For Iris dataset
    list_x_y[0] = dataset where all instances of class 0 have class 1 and instances of other classes have class 0
    list_x_y[1] = dataset where all instances of class 1 have class 1 and instances of other classes have class 0
    list_x_y[2] = dataset where all instances of class 2 have class 1 and instances of other classes have class 0

    The positive rows are duplicated exactly once. That balances a dataset
    with three equally sized classes (such as Iris, 50/50/50); other class
    ratios are not rebalanced any further.

    data_set itself is not modified.
  """
  list_x_y = []

  for class_i in classes:
    # We don't want to change the original dataset, so we make a copy of it
    relabeled = data_set.copy()

    # Direct comparison against the class label: this works for any label
    # values, not only for labels in the range 0..len(classes)-1
    relabeled["target"] = (data_set["target"] == class_i).astype(int)

    # Append the positive rows once more to balance positives and negatives
    positives = relabeled[relabeled["target"] == 1]
    balanced = pd.concat([relabeled, positives], axis=0)

    # Now we split between attributes and target
    list_x_y.append([balanced[features_names], balanced["target"]])

  return list_x_y

In [14]:
def test_many_classifiers(X, y, list_of_weights_classifiers):
  """
    Predicts a class for every row of X using the trained weight vectors and
    computes the metrics against the expected classes.

    X is a DataFrame with one instance per row;
    y is a DataFrame whose "target" column holds the expected classes;
    list_of_weights_classifiers holds one weight vector per class. The
    predicted class of an instance is the position, in this list, of the
    classifier with the highest class-1 probability, so the expected classes
    must be encoded as those same positions (0, 1, 2, ...).

    Returns precision, recall, f1_measure, accuracy, y_pred
    (precision, recall and F1 are micro-averaged).
  """
  # For each instance, we try to predict the correct class
  y_pred = [
    create_and_execute_classifiers(X.iloc[i, :].values, list_of_weights_classifiers)
    for i in range(X.shape[0])
  ]

  # Then we get the correct classes
  y_true = y["target"].tolist()

  # One label per classifier; for three classifiers this is [0, 1, 2]
  labels = list(range(len(list_of_weights_classifiers)))

  # Finally, we calculate every score we need
  precision = precision_score(y_true, y_pred, labels=labels, average='micro')
  recall = recall_score(y_true, y_pred, labels=labels, average='micro')
  f1_measure = f1_score(y_true, y_pred, labels=labels, average='micro')
  accuracy = accuracy_score(y_true, y_pred)

  return precision, recall, f1_measure, accuracy, y_pred

In [15]:
def avg_metrics(measures):
  """
    Prints the average of each metric over all executions in measures.
    The "measures" param must be an iterable of 4-element entries in the order
    (precision, recall, f1_measure, accuracy).

    The averages are printed in the order acc, precision, recall, f1_measure.
    Nothing is returned.
  """
  # Running totals, stored in the same order as the entries of measures
  totals = [0, 0, 0, 0]
  count = 0

  for precision, recall, f1_measure, accuracy in measures:
    for slot, value in enumerate((precision, recall, f1_measure, accuracy)):
      totals[slot] += value
    count += 1

  precisions, recalls, f1_measures, accs = totals
  report = (
    ("acc", accs),
    ("precision", precisions),
    ("recall", recalls),
    ("f1_measure", f1_measures),
  )
  for label, total in report:
    print(label, total/count)

In [16]:
def standard_scaling(df):
    """
        Normalizes the dataset with the scikit-learn StandardScaler.
        See https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.StandardScaler.html

        The scaler is fitted on a copy of df and applied to that same copy,
        so df itself is left untouched. The value is returned exactly as
        produced by the scaler's transform; it is not wrapped back into a
        DataFrame here.
    """
    features = df.copy()
    scaler = StandardScaler().fit(features)
    return scaler.transform(features)

In [17]:
def maximum_absolute_scaling_by_column(df):
    """
        Divides every column by the largest absolute value found in that column.

        Returns a new DataFrame; df is not modified.
    """
    # Largest absolute value of each column, indexed by column name
    column_peaks = df.abs().max()

    # DataFrame / Series aligns the Series on the column names, so each
    # column is divided by its own peak
    return df / column_peaks

In [18]:
def min_max_scaling_by_column(df):
    """
        Rescales every column with (value - min) / (max - min), using the
        min and max of that column, so the smallest value becomes 0 and the
        largest becomes 1.

        Returns a new DataFrame; df is not modified.
    """
    column_lows = df.min()
    column_highs = df.max()

    # Both Series are aligned on the column names
    return (df - column_lows) / (column_highs - column_lows)

In [19]:
def min_max_scaling_by_column_type_2(df, lower=-1, upper=0):
    """
        Rescales every column to the interval [lower, upper] through:
        column = (upper - lower) * (value - min) / (max - min) + lower
        where min and max are taken from that column.

        With the default interval [-1, 0] this is
        column = (value - min) / (max - min) - 1

        df is the DataFrame to rescale (it is not modified);
        lower is the value the column minimum is mapped to;
        upper is the value the column maximum is mapped to.

        Returns a new DataFrame with the rescaled columns.
    """
    # copy the dataframe
    df_norm = df.copy()
    span = upper - lower

    # apply min-max scaling
    for column in df_norm.columns:
        column_min = df_norm[column].min()
        column_max = df_norm[column].max()
        df_norm[column] = span*(df_norm[column] - column_min) / (column_max - column_min) + lower

    return df_norm

In [20]:
def training_k_fold_classifier(kfold, X, y, Nsteps, batch_size, n_learning_rate, features_names, classes, normalizing_function=min_max_scaling_by_column_type_2, random_state=42):
    """
        Runs a stratified k-fold cross validation: on every fold it trains one
        classifier per class on the training part and evaluates all of them
        together on the test part.

        kfold is the number of folds
        X is the DataFrame of Attributes
        y is the DataFrame with the "target" column
        Nsteps, batch_size and n_learning_rate are forwarded to @training_n_steps_classifier
        features_names and classes are forwarded to @replicate_classes
        normalizing_function receives a DataFrame of attributes and returns the normalized one
        random_state seeds the shuffling of the folds

        Prints the training error of every classifier and the metrics of every fold.

        Returns the weights trained on the last fold and the list of
        [precision, recall, f1_measure, accuracy] of every fold.
    """
    fold_splitter = StratifiedKFold(n_splits=kfold, shuffle=True, random_state=random_state)
    metrics = []

    for train_index, test_index in fold_splitter.split(X, y):
        # NOTE(review): the training part and the test part are normalized
        # independently, each with its own column statistics
        X_train = normalizing_function(X.iloc[train_index, :])
        X_test = normalizing_function(X.iloc[test_index, :])
        y_train = y.iloc[train_index]
        y_test = y.iloc[test_index]

        # Attributes and target side by side, as expected by replicate_classes
        dataset = pd.concat([X_train, y_train], axis=1)

        # One balanced one-vs-rest dataset per class. See @replicate_classes doc
        per_class_datasets = replicate_classes(dataset, features_names, classes)

        list_of_trainned_w = []
        for count, (Xi, yi) in enumerate(per_class_datasets):
            # Every weight starts at 0.1, one weight per attribute
            w = [0.1] * Xi.shape[1]

            # training_n_steps_classifier slices its inputs as np arrays
            wi, error = training_n_steps_classifier(
                np.array(Xi),
                np.array(yi),
                w,
                Nsteps=Nsteps,
                batch_size=batch_size,
                n=n_learning_rate,
                stop_error_zero=True,
            )
            print("error #", count, error)

            list_of_trainned_w.append(wi)

        # For testing, every trained classifier competes for each instance
        precision, recall, f1_measure, accuracy, y_pred = test_many_classifiers(X_test, y_test, list_of_trainned_w)

        # And then we append the metrics for this specific fold
        metrics.append([precision, recall, f1_measure, accuracy])
        print("metricas, precision, recall, f1_measure, acc", metrics[-1])

    return list_of_trainned_w, metrics

In [22]:
# Run 1: 10-fold cross validation on Iris, attributes rescaled by min_max_scaling_by_column_type_2

# Load the Iris dataset from sklearn as pandas objects
iris = datasets.load_iris(as_frame=True)

# Attributes and a one-column target frame
X_iris = pd.DataFrame(data=iris['data'], columns=iris['feature_names'])
y_iris = pd.DataFrame(data=iris['target'], columns=['target'])

# Train and evaluate; keeps the weights of the last fold and the metrics of every fold
list_of_trainned_w, metrics = training_k_fold_classifier(
    kfold=10,
    X=X_iris,
    y=y_iris,
    Nsteps=2000,
    batch_size=60,
    n_learning_rate=0.009,
    features_names=iris['feature_names'],
    classes=[0,1,2],
    normalizing_function=min_max_scaling_by_column_type_2,
    random_state=42,
)

error # 0 16
error # 1 47
error # 2 16
metricas, precision, recall, f1_measure, acc [0.8666666666666667, 0.8666666666666667, 0.8666666666666667, 0.8666666666666667]
error # 0 18
error # 1 45
error # 2 17
metricas, precision, recall, f1_measure, acc [0.8, 0.8, 0.8000000000000002, 0.8]
error # 0 16


In [None]:
# Most important part: print the average accuracy, precision, recall and F1
# over the folds stored in `metrics` by the preceding training cell
avg_metrics(metrics)

In [None]:
# Run 2: same experiment, attributes rescaled by min_max_scaling_by_column

# Load the Iris dataset from sklearn as pandas objects
iris = datasets.load_iris(as_frame=True)

# Attributes and a one-column target frame
X_iris = pd.DataFrame(data=iris['data'], columns=iris['feature_names'])
y_iris = pd.DataFrame(data=iris['target'], columns=['target'])

# Train and evaluate; keeps the weights of the last fold and the metrics of every fold
list_of_trainned_w, metrics = training_k_fold_classifier(
    kfold=10,
    X=X_iris,
    y=y_iris,
    Nsteps=2000,
    batch_size=60,
    n_learning_rate=0.009,
    features_names=iris['feature_names'],
    classes=[0,1,2],
    normalizing_function=min_max_scaling_by_column,
    random_state=42,
)

In [None]:
# Most important part: print the average accuracy, precision, recall and F1
# over the folds stored in `metrics` by the preceding training cell
avg_metrics(metrics)

In [None]:
# Run 3: same experiment, attributes rescaled by maximum_absolute_scaling_by_column

# Load the Iris dataset from sklearn as pandas objects
iris = datasets.load_iris(as_frame=True)

# Attributes and a one-column target frame
X_iris = pd.DataFrame(data=iris['data'], columns=iris['feature_names'])
y_iris = pd.DataFrame(data=iris['target'], columns=['target'])

# Train and evaluate; keeps the weights of the last fold and the metrics of every fold
list_of_trainned_w, metrics = training_k_fold_classifier(
    kfold=10,
    X=X_iris,
    y=y_iris,
    Nsteps=2000,
    batch_size=60,
    n_learning_rate=0.009,
    features_names=iris['feature_names'],
    classes=[0,1,2],
    normalizing_function=maximum_absolute_scaling_by_column,
    random_state=42,
)

In [None]:
# Most important part: print the average accuracy, precision, recall and F1
# over the folds stored in `metrics` by the preceding training cell
avg_metrics(metrics)