# Code used for the experiments of "*Opening Up the Neural Network Classifier for Shap Score Computation*", submitted to Jelia 2023

(It should be executed on descending order)

# BNN and dDBCSFi(2) creation

## Random seeds

In [None]:
from IPython.display import clear_output
import os
from numpy.random import seed
import random as python_random
from tensorflow.random import set_seed

def set_random_seeds(py_hash_seed="7", np_seed=7, rand_seed=7, tf_seed=7):
  os.environ['PYTHONHASHSEED'] = py_hash_seed
  seed(np_seed)
  python_random.seed(rand_seed)
  set_seed(tf_seed)
  
set_random_seeds()
clear_output()

## Installations

In [None]:
import os.path
import time

def seconds_separator(seconds_passed):
  duration = seconds_passed
  hours = int(duration//3600)
  duration = duration%3600
  minutes = int(duration//60)
  duration = float(round(duration%60, 4))
  return str(hours)+":"+"0"*(minutes<10)+str(minutes)+":"+"0"*(duration<10)+str(duration)+"0"*(6-len(str(duration))+(duration>=10))

if (not os.path.exists("riss-solver")):
  beginning = time.monotonic()
  #Installation of libraries to work with SSDs
  !git clone https://github.com/Jorvan758/nnf2sdd.git
  %cd ./nnf2sdd
  %mv * ../
  %cd ../
  !rm -rf ./nnf2sdd
  !pip install pyeda pysdd graphviz
  #Installation of the simplifying solver
  !git clone https://github.com/Jorvan758/CNF_OBDD.git
  %cd CNF_OBDD/bnn/BinaryNet/cpp/
  %mv * ../../../../
  %cd ../../../../
  !rm -r CNF_OBDD
  %cd riss-solver/
  !sudo apt-get install zlib1g-dev
  !sudo apt-get install curl
  !cmake -D CMAKE_BUILD_TYPE=Release
  !make
  %cd ../
  #Larq
  !pip install larq
  clear_output()
  duration = time.monotonic() - beginning
  print("Time taken in installations:", seconds_separator(duration))

## Tensorflow

In [None]:
import seaborn as sns
from sklearn.model_selection import train_test_split
import itertools

from tensorflow.keras import *
from tensorflow import nn, matmul
from tensorflow import round as kround
from tensorflow.keras.constraints import MinMaxNorm
from tensorflow.keras.initializers import RandomNormal
import larq as lq
import networkx as nx
from networkx.drawing.nx_agraph import graphviz_layout
import matplotlib.pyplot as plt

def step_tanh(x):
  return kround(nn.sigmoid(x))*2 - 1
def step_sigm(x):
  return kround(nn.sigmoid(x))

def plot_the_network(the_model, resplt=10):
  the_weights = {}
  G = nx.Graph()
  the_colors = []
  previous_layer = []
  for node in range(the_model.input_shape[1]):
    G.add_node(str(node+1), layer=0)
    previous_layer += [str(node+1)]
    the_colors += ["b"]
  G.add_node("_"+str(0), layer=0)
  previous_layer += ["_"+str(0)]
  the_colors += ["g"]
  for the_layer in range(len(the_model.get_weights())//2):
    new_layer = []
    for node in range(the_model.get_weights()[the_layer*2].shape[1]):
      G.add_node(str(the_layer+1)+"_"+str(node+1), layer=the_layer+1)
      new_layer += [str(the_layer+1)+"_"+str(node+1)]
      the_colors += ["r"]
    index2 = 0
    for node_o in previous_layer:
      index1 = 0
      for node_d in new_layer:
        G.add_edge(node_o, node_d)
        if ("_0" in node_o): the_weights[(node_o, node_d)] = round(the_model.get_weights()[the_layer*2+1][index1], 2)
        else: the_weights[(node_o, node_d)] = round(the_model.get_weights()[the_layer*2][index2][index1], 2)
        index1 += 1
      index2 += 1
    if (the_layer < (len(the_model.get_weights())//2-1)):
      G.add_node(str(the_layer+1)+"_"+str(0), layer=the_layer+1)
      new_layer += [str(the_layer+1)+"_"+str(0)]
      the_colors += ["g"]
    previous_layer = new_layer
  pos = nx.multipartite_layout(G, subset_key="layer")
  plt.clf()
  plt.figure(figsize=[resplt,resplt])
  nx.draw(G, pos, edge_color='black', width=2, linewidths=1, node_size=100*resplt/(len(the_model.get_weights())//2),
          node_color=the_colors, alpha=0.9, with_labels=False)
  nx.draw_networkx_edge_labels(G, pos, edge_labels=the_weights, label_pos=0.2, rotate=False,
                               font_size=15*resplt/((len(the_model.get_weights())//2)+resplt))
  nx.draw(G, pos, edge_color='black', width=2, edgelist=[], linewidths=1, node_size=100*resplt/(len(the_model.get_weights())//2),
          node_color=the_colors, alpha=0.9, with_labels=False)
  plt.show()

## CNF formula

In [None]:
#These are the times taken for a generic BNN with 2 layers for a different number of variables:
#2:  0:00:00.0043
#3:  0:00:00.0121
#4:  0:00:00.0264
#5:  0:00:00.0492
#6:  0:00:00.0965
#7:  0:00:00.2507
#8:  0:00:00.6960
#9:  0:00:03.1606
#10: 0:00:17.4078
#11: 0:01:59.9104
#12: 0:10:33.7824
#13: 1:33:54.6341
from math import ceil
import numpy as np
import sys
np.set_printoptions(threshold=sys.maxsize)

def create_array(values:int=1, n_variables:int=1) -> np.array:
  return np.array([0]*(abs(values)-1)+[(values>0)*2-1]+[0]*(n_variables-abs(values)), dtype=np.int8).reshape(1,-1)

def delete_zeros(matrix:np.array, n_variables:int) -> np.array:
  return matrix[np.where((matrix == 0).sum(axis=1) != n_variables)[0]]

def simplify_cnf_formula(matrix:np.array, n_variables:int) -> np.array:
  simplified = matrix.copy()
  for x in range(n_variables-1, 0, -1):
    with_x_zeros = np.where((simplified == 0).sum(axis=1) == x)[0]
    if (with_x_zeros.size != 0) and ((simplified == 0).sum(axis=1).min() < x):
      comparables = simplified[with_x_zeros]
      erasables = simplified[np.where((simplified == 0).sum(axis=1) < x)[0]]
      with_more_zeros = np.where((simplified == 0).sum(axis=1) > x)[0]
      if (with_more_zeros.size != 0):
        saveables = simplified[with_more_zeros]
        for clause in comparables: erasables = erasables[np.where((erasables*(clause!=0) != clause).sum(axis=1) > 0)[0]]
        simplified = np.vstack([saveables, comparables, erasables])
      else:
        for clause in comparables: erasables = erasables[np.where((erasables*(clause!=0) != clause).sum(axis=1) > 0)[0]]
        simplified = np.vstack([comparables, erasables])
  return simplified

def conjunction_cnfs(matrix1:np.array, matrix2:np.array, n_variables:int) -> np.array:
  return simplify_cnf_formula(np.unique(np.vstack([matrix1, matrix2]), axis=0), n_variables)

def disjunction_cnfs(matrix1:np.array, matrix2:np.array, n_variables:int) -> np.array:
  new_ones = []
  for clause in matrix1:
    new_ones += [delete_zeros((matrix2+clause-clause*(matrix2==clause))*((matrix2*clause < 0).sum(axis=1) == 0).reshape(-1,1), n_variables)]
  return simplify_cnf_formula(np.unique(np.vstack(new_ones), axis=0), n_variables)

def cnf_negation(matrix:np.array, n_variables:int) -> np.array:
  final = np.zeros((1, n_variables))
  clauses = -matrix
  while (clauses.shape[0] > 0):
    new_ones = []
    for ind_clause2 in range(n_variables):
      if (clauses[0,ind_clause2] != 0):
        temp = create_array(clauses[0,ind_clause2]*(ind_clause2+1), n_variables)
        new_ones += [delete_zeros((final+temp-temp*(final==temp))*((final*temp < 0).sum(axis=1) == 0).reshape(-1,1), n_variables)]
    final = np.unique(np.vstack(new_ones), axis=0)
    clauses = clauses[1:]
  return simplify_cnf_formula(final, n_variables)

def encode_network(the_model, input_file="cnf_formula.cnf") -> str:
  beginning = time.monotonic()
  n_inputs = the_model.input_shape[1]
  inputs = [create_array(i, n_inputs) for i in range(1, n_inputs+1)]
  n_layer = 1
  for layer in the_model.layers[1:]:
    the_weights = layer.get_weights()
    outputs = []
    for id_neuron in range(layer.output_shape[1]):
      print(f'{seconds_separator(time.monotonic() - beginning)}   Layer: {n_layer}/{len(the_model.layers)-1} | Neuron: {id_neuron+1}/{layer.output_shape[1]}')
      D = ceil((-the_weights[1][id_neuron] + the_weights[0][:,id_neuron].sum())/2) + (the_weights[0][:,id_neuron] == -1).sum()
      previous = {}
      for id_input in range(len(inputs)):
        if (layer == the_model.layers[-1]): print(f'{seconds_separator(time.monotonic() - beginning)}   Working with the first {id_input+1} inputs')
        actual = {}
        if (the_weights[0][id_input,id_neuron] == 1): x = inputs[id_input]
        else: x = cnf_negation(inputs[id_input], len(inputs))
        for d in range(D):
          if (id_input < d): break
          if (len(inputs) < id_input+1+D-(d+1)): continue
          if (d == 0):
            if (id_input == 0): actual[d] = x
            else: actual[d] = disjunction_cnfs(x, previous[d], len(inputs))
          elif (id_input == d): actual[d] = conjunction_cnfs(x, previous[d-1], len(inputs))
          else:
            temp = conjunction_cnfs(x, previous[d-1], len(inputs))
            actual[d] = disjunction_cnfs(temp, previous[d], len(inputs))
        previous = actual
      outputs += [previous[D-1].astype(dtype=np.int8)]
    inputs = outputs
    n_layer += 1
  print(f'Total time taken: {seconds_separator(time.monotonic() - beginning)}')
  dimacs_cnf = inputs[-1]
  dimacs_cnf = str(dimacs_cnf*np.arange(1,dimacs_cnf.shape[1]+1)).replace(" 0", "").replace("]","").replace("[","").replace("\n", " 0\n")+" 0\n"
  while "  " in dimacs_cnf: dimacs_cnf = dimacs_cnf.replace("  ", " ")
  output_file = input_file[:-4]+"_final.cnf"
  with open(input_file, 'w') as f:
    f.write('p cnf %d %d\n' % (n_inputs, dimacs_cnf.count('\n')))
    f.write(dimacs_cnf)
  return input_file
  with open("whitelist.txt", 'w+') as f:
    f.write(str(1)+".."+str(n_inputs))
  !riss-solver/bin/coprocessor -dimacs=$output_file -verb=0 -config="-enabled_cp3 -cp3_stats -bve -bve_red_lits=1 -fm -no-cp3_limited -unhide -cp3_uhdIters=5 -cp3_uhdEE -cp3_uhdTrans -bce -bce-cle -no-bce-bce -dense -xor -no-xorFindSubs -xorEncSize=3 -xorLimit=100000 -no-xorKeepUsed -cp3_iters=2 -ee -cp3_ee_level=3 -cp3_ee_it -rlevel=2 -bve_early -revMin -init-act=3 -actStart=2048 -keepWorst=0.01 -whiteList=whitelist.txt" $input_file > /dev/null
  with open(input_file, "r") as file:
    c_o = int(file.readline()[:-1].split(" ")[3])
  with open(output_file, "r") as file:
    c_n = int(file.readline()[:-1].split(" ")[3])
  if (c_o < c_n): return input_file
  else: return output_file

## SDD

In [None]:
from circuits.linear import Classifier
from pysdd.sdd import SddManager
from graphviz import Source
import warnings
warnings.filterwarnings('ignore')

def plot_the_sdd(node, n_variables):
  result = node.dot()
  if (n_variables < 9):
    SUB = {0: "\N{SUBSCRIPT ZERO}",
          1: "\N{SUBSCRIPT ONE}",
          2: "\N{SUBSCRIPT TWO}",
          3: "\N{SUBSCRIPT THREE}",
          4: "\N{SUBSCRIPT FOUR}",
          5: "\N{SUBSCRIPT FIVE}",
          6: "\N{SUBSCRIPT SIX}",
          7: "\N{SUBSCRIPT SEVEN}",
          8: "\N{SUBSCRIPT EIGHT}",
          9: "\N{SUBSCRIPT NINE}"}
    for i in range(1, n_variables+1): result = result.replace(chr(i + 64)+"\"","x"+SUB[i]+" \"").\
                                                      replace(chr(i + 64)+"|","x"+SUB[i]+" |").replace("&not;&not;", "")
  return Source(result)

## dDBCSFi(2)

In [None]:
from math import factorial
import pickle

class InvalidInputs4dDBCSFi_2(Exception):
    def __init__(self):
        super().__init__("You need to pass a SDD or a binary perceptron.")
        
class InvalidInputs4SHAP(Exception):
    def __init__(self):
        super().__init__("You need to pass vectors with the correct number of variables.")

class dDBCSFi_2:
  def __init__(self, n_variables, SDD=None, perceptron=None):
    if (SDD == None) and (perceptron == None): raise InvalidInputs4dDBCSFi_2()
    self.SDD = SDD
    self.perceptron = perceptron
    self.formula = []
    self.variables = []
    self.n_vars = n_variables
    self.vars = np.arange(1, n_variables+1, dtype=np.int8)
    self.probabilities = [0.5]*n_variables

  def __compile_bnn_step(self, disjunction):
    formula = []
    set_o = set()
    #Iterates over all elements of the disjunction
    for conjunction in disjunction.elements():
      set_a = set()
      elements = []
      #Iterate over all the elements of the conjunction
      for element in conjunction:
        if (element.is_literal()): #Element
          ide = str(element)[13:str(element).index(",")]
          if (ide[:1] == "-"):
            set_a.add(self.vars[int(ide[1:])-1])
            ide = -self.vars[int(ide[1:])-1]
          else:
            set_a.add(self.vars[int(ide)-1])
            ide = self.vars[int(ide)-1]
        elif (element.is_decision()): #Formula
          ide = self.__compile_bnn_step(element)
          for id in ide[1]: set_a.add(self.vars[id-1])
          ide = ide[0]
          if (ide == []): ide = "0"
        elif (element.is_true()): ide = "1" #True
        else: ide = "0" #False
        if (ide != "1"):
          elements += [ide]
          if (ide == "0"): break
      #Check that the conjunction is not always F
      if ("0" in elements): continue
      else:
        #Check if the conjunction is always T
        if (elements == []):
          formula = "1"
          set_o = set()
          break
        else: #The conjunction varies
          if (not isinstance(formula, list)) or (formula != []): #(When the formula is a negative number, it seems to be confusing
                                                                 #it with a list, so I had to add a condition to see if it's a list)
            for id in set_o: #Check that the new formula matches the old
              if (id not in set_a):
                if (len(elements) == 1): elements = [False, elements[0], [True, self.vars[id-1], -self.vars[id-1]]]
                elif (len(elements) == 2): elements = [False, [False]+elements, [True, self.vars[id-1], -self.vars[id-1]]]
                else: elements = [False, elements, [True, self.vars[id-1], -self.vars[id-1]]]
            for id in set_a: #Check that the old formula matches the new
              if (id not in set_o):
                if (formula == []): formula = [True, self.vars[id-1], -self.vars[id-1]]
                else: formula = [False, formula, [True, self.vars[id-1], -self.vars[id-1]]]
                set_o.add(self.vars[id-1])
            #Combine the formulas
            if (len(elements) == 1): formula = [True, formula, elements[0]]
            elif (len(elements) == 2): formula = [True, formula, [False]+elements]
            else: formula = [True, formula, elements]
          else: #No need to smooth
            for id in set_a: set_o.add(self.vars[id-1])
            if (len(elements) == 1): formula = elements[0]
            else: formula = [False]+elements
    return formula, set_o

  def __print_formula_step(self, formula, depth=0):
    if (formula[0]): print(" "*depth+"Or")
    else: print(" "*depth+"And")
    if isinstance(formula[1], list):
      self.__print_formula_step(formula[1], depth+1)
    else:
      print(" "*(depth+1)+str(formula[1]))
    if isinstance(formula[2], list):
      self.__print_formula_step(formula[2], depth+1)
    else:
      print(" "*(depth+1)+str(formula[2]))

  def __evaluate_formula_step(self, formula, assignment=[]):
    #Check the first element
    if isinstance(formula[1], list): value1 = self.__evaluate_formula_step(formula[1], assignment)
    else: value1 = (assignment[abs(formula[1])-1] == np.sign(formula[1]))
    #Check the second element
    if isinstance(formula[2], list): value2 = self.__evaluate_formula_step(formula[2], assignment)
    else: value2 = (assignment[abs(formula[2])-1] == np.sign(formula[2]))
    #Evaluate the formula (I think they can be combined, but for now I don't need to and it's easier to read that way)
    if (formula[0]): #It's a disjunction
      if (value1) or (value2): finalvalue = True
      else: finalvalue = False
    else: #It's a conjunction
      if (value1) and (value2): finalvalue = True
      else: finalvalue = False
    return finalvalue

  def __count_nodes_step(self, formula):
    #Check the first element
    if isinstance(formula[1], list): value1 = self.__count_nodes_step(formula[1])
    else: value1 = 1
    #Check the second element
    if isinstance(formula[2], list): value2 = self.__count_nodes_step(formula[2])
    else: value2 = 1
    return value1+value2+1
  
  def __encode_perceptron(self):
    the_weights = self.perceptron.layers[-1].get_weights()[0].reshape(-1)
    the_bias = self.perceptron.layers[-1].get_weights()[1].reshape(-1)
    D = ceil((-the_bias + the_weights.sum())/2) + (the_weights == -1).sum()
    previous = {}
    smoothness = []
    for id_input in range(self.n_vars):
      actual = {}
      if (the_weights[id_input] == 1): x = self.vars[id_input]
      else: x = -self.vars[id_input]
      for d in range(D):
        if (id_input < d): break
        if (self.n_vars < id_input+1+D-(d+1)): continue
        if (d == 0):
          if (id_input == 0): actual[d] = x
          else: actual[d] = [True, [False, x, smoothness], [False, -x, previous[d]]]
        elif (id_input == d): actual[d] = [False, x, previous[d-1]]
        else: actual[d] = [True, [False, x, previous[d-1]], [False, -x, previous[d]]]
      previous = actual
      if (smoothness == []): smoothness = [True, self.vars[id_input], -self.vars[id_input]]
      else: smoothness = [False, smoothness, [True, self.vars[id_input], -self.vars[id_input]]]
    return previous[D-1], set(self.vars.flatten())

  def compile_bnn(self):
    if (self.SDD == None):
      self.formula, self.variables = self.__encode_perceptron()
    else:
      if (self.SDD.is_false()): self.formula, self.variables = "1", set()
      elif (self.SDD.is_true()): self.formula, self.variables = [], set()
      elif (self.SDD.is_decision()):
        self.formula, self.variables = self.__compile_bnn_step(self.SDD)
        if (not isinstance(self.formula, list)) and (self.formula != "1"): self.formula = [self.formula]
      else:
        ide = str(self.SDD)[13:str(self.SDD).index(",")]
        if (ide[:1] == "-"): self.formula, self.variables = [self.vars[int(ide[1:])-1]], set([self.vars[int(ide[1:])-1]])
        else: self.formula, self.variables = [-self.vars[int(ide)-1]], set([self.vars[int(ide)-1]])

  def print_formula(self, profundidad=0):
    if (self.formula == "1"): print("It's always True")
    elif (self.formula == []): print("It's always False")
    elif (len(self.formula) != 1): self.__print_formula_step(self.formula, profundidad)
    else: print(self.formula[0])

  def evaluate_formula(self, assignment=[]):
    if (self.formula == "1"): return 1.0
    elif (self.formula == []): return 0.0
    elif (len(self.formula) != 1): return float(self.__evaluate_formula_step(self.formula, assignment))
    else: return float(assignment[abs(self.formula[0])-1] == np.sign(self.formula[0]))
  
  def evaluate_formulas(self, assignments=[]):
    results = []
    for assignment in np.array(assignments):
      results += [self.evaluate_formula(assignment)]
    return results

  def count_nodes(self):
    if (self.formula == "1") or (self.formula == []): return 1
    elif (len(self.formula) == 1): return 1
    else: return self.__count_nodes_step(self.formula)

  def change_probabilities(self, new_probabilities):
    self.probabilities = new_probabilities

  def __get_gammas_and_deltas(self, formula, n_variable, vector):
    #Check the first element
    if isinstance(formula[1], list): gammas1, deltas1, variables1 = self.__get_gammas_and_deltas(formula[1], n_variable, vector)
    else:
      if (formula[1] == n_variable):
        gammas1, deltas1 = [1], [0]
        variables1 = set([formula[1]])
      elif (formula[1] == -n_variable):
        gammas1, deltas1 = [0], [1]
        variables1 = set([abs(formula[1])])
      elif (formula[1] == abs(formula[1])):
        gammas1 = [self.probabilities[formula[1]-1], vector[formula[1]-1]]
        deltas1 = [self.probabilities[formula[1]-1], vector[formula[1]-1]]
        variables1 = set([formula[1]])
      else:
        gammas1 = [1-self.probabilities[abs(formula[1])-1], 1-vector[abs(formula[1])-1]]
        deltas1 = [1-self.probabilities[abs(formula[1])-1], 1-vector[abs(formula[1])-1]]
        variables1 = set([abs(formula[1])])
    #Check the second element
    if isinstance(formula[2], list): gammas2, deltas2, variables2 = self.__get_gammas_and_deltas(formula[2], n_variable, vector)
    else:
      if (formula[2] == n_variable):
        gammas2, deltas2 = [1], [0]
        variables2 = set([formula[2]])
      elif (formula[2] == -n_variable):
        gammas2, deltas2 = [0], [1]
        variables2 = set([abs(formula[2])])
      elif (formula[2] == abs(formula[2])):
        gammas2 = [self.probabilities[formula[2]-1], vector[formula[2]-1]]
        deltas2 = [self.probabilities[formula[2]-1], vector[formula[2]-1]]
        variables2 = set([formula[2]])
      else:
        gammas2 = [1-self.probabilities[abs(formula[2])-1], 1-vector[abs(formula[2])-1]]
        deltas2 = [1-self.probabilities[abs(formula[2])-1], 1-vector[abs(formula[2])-1]]
        variables2 = set([abs(formula[2])])
    gammas, deltas, variables = [], [], set()
    #It's a disjunction
    if (formula[0]):
      for l in range(len(gammas1)):
        gammas += [gammas1[l] + gammas2[l]]
        deltas += [deltas1[l] + deltas2[l]]
      variables = variables1.copy()
    #It's a conjunction
    else:
      variables = variables1.union(variables2)
      for l in range(len(variables) - (n_variable in variables) + 1):
        gamma_temp = 0
        delta_temp = 0
        for l1 in range(len(gammas1)):
          for l2 in range(len(gammas2)):
            if (l1+l2 == l):
              gamma_temp += gammas1[l1]*gammas2[l2]
              delta_temp += deltas1[l1]*deltas2[l2]
        gammas += [gamma_temp]
        deltas += [delta_temp]
    return gammas, deltas, variables

  def obtain_SHAP(self, n_variable, vector):
    if (len(vector) != len(self.probabilities)): raise InvalidInputs4SHAP()
    if (self.formula == "1"): return 1/len(self.probabilities)
    elif (self.formula == []): return -1/len(self.probabilities)
    else:
      gammas, deltas = self.__get_gammas_and_deltas(self.formula, n_variable, vector)[:2]
      SHAP = 0
      n_X = len(gammas)
      for i in range(n_X):
        SHAP += (factorial(i)*factorial(n_X-i-1)/factorial(n_X))*(vector[n_variable-1] - self.probabilities[n_variable-1])*(gammas[i] - deltas[i])
      return SHAP

  def obtain_SHAPs(self, vector):
    SHAPs = []
    for i in range(len(vector)):
      SHAPs += [self.obtain_SHAP(i+1, vector)]
    return SHAPs

  def __form_graph(self, formula, level=0):
    self.the_level = min(self.the_level, level)
    if (formula[0]): #It's a disjunction
      id_a = "o"+str(self.ind_o)
      self.ind_o += 1
      self.G.add_node(id_a, layer=level)
      self.nodes_color += ["hotpink"]
      self.labels[id_a] = r"$\vee$"
    else: #It's a conjunction
      id_a = "y"+str(self.ind_a)
      self.ind_a += 1
      self.G.add_node(id_a, layer=level)
      self.nodes_color += ["c"]
      self.labels[id_a] = r"$\wedge$"
    for i in [1,2]:
      if (isinstance(formula[i], list)):
        id_b = self.__form_graph(formula[i], level-1)
      else:
        if (formula[i] == abs(formula[i])):
          id_b = "e"+str(self.ind_E)
          self.ind_E += 1
          self.G.add_node(id_b, layer=level-1)
          self.nodes_color += ["moccasin"]
          self.labels[id_b] = fr"$x_{formula[i]}$"
        else:
          id_b = "e"+str(self.ind_E)
          self.ind_E += 1
          self.G.add_node(id_b, layer=level-1)
          self.nodes_color += ["limegreen"]
          self.labels[id_b] = fr"$-x_{abs(formula[i])}$"
      self.G.add_edge(id_a, id_b)
    return id_a

  def __form_graph_alt(self, formula, level=0):
    self.the_level = min(self.the_level, level)
    if (formula[0]): #It's a disjunction
      id_a = "o"+str(self.ind_o)
      self.ind_o += 1
      self.G.add_node(id_a, layer=level)
      self.nodes_color += ["hotpink"]
      self.labels[id_a] = r"$\vee$"
    else: #It's a conjunction
      id_a = "y"+str(self.ind_a)
      self.ind_a += 1
      self.G.add_node(id_a, layer=level)
      self.nodes_color += ["c"]
      self.labels[id_a] = r"$\wedge$"
    for i in [1,2]:
      if (isinstance(formula[i], list)):
        id_b = self.__form_graph_alt(formula[i], level-1)
      else:
        if (formula[i] == abs(formula[i])):
          id_b = "e"+str(self.ind_E)
          self.ind_E += 1
          self.G.add_node(id_b, layer=level-1)
          self.nodes_color += ["moccasin"]
          self.labels[id_b] = fr"$x_{formula[i]}$"
        else:
          id_b = "n"+str(self.ind_n)
          id_c = "e"+str(self.ind_E)
          self.ind_n += 1
          self.ind_E += 1
          self.G.add_node(id_b, layer=level-1)
          self.G.add_node(id_c, layer=level-2)
          self.nodes_color += ["limegreen"]
          self.nodes_color += ["moccasin"]
          self.labels[id_b] = fr"$-$"
          self.labels[id_c] = fr"$x_{abs(formula[i])}$"
          self.G.add_edge(id_c, id_b)
      self.G.add_edge(id_b, id_a)
    return id_a

  def plot_formula(self, resplt=10, alternative=False):
    self.labels = {}
    self.nodes_color = []
    self.ind_a = 1
    self.ind_o = 1
    self.ind_T = 1
    self.ind_E = 1
    self.ind_n = 1
    self.the_level = 0
    self.G = nx.DiGraph()
    if (self.formula == "1"):
      self.G.add_node("T", layer=1)
      self.nodes_color += ["coral"]
      self.labels["T"] = fr"$T$"
    elif (self.formula == []):
      self.G.add_node("F", layer=1)
      self.nodes_color += ["thistle"]
      self.labels["F"] = fr"$F$"
    elif (len(self.formula) == 1):
      if (self.formula[0] == abs(self.formula[0])):
        self.G.add_node("e", layer=1)
        self.nodes_color += ["moccasin"]
        self.labels["e"] = fr"$x_{self.formula[0]}$"
      else:
        self.G.add_node("e", layer=1)
        self.nodes_color += ["limegreen"]
        self.labels["e"] = fr"$-x_{abs(self.formula[0])}$"
    elif (alternative): ignorable = self.__form_graph_alt(self.formula)
    else: ignorable = self.__form_graph(self.formula)
    if (self.the_level == 0): self.the_level = 1
    pos = nx.multipartite_layout(self.G, subset_key="layer", align="horizontal")
    plt.clf()
    plt.figure(figsize=[resplt,resplt])
    nx.draw_networkx(self.G, pos, node_size=200*resplt/abs(self.the_level), with_labels=True, node_color=self.nodes_color,
                     labels=self.labels, font_size=16*resplt/(abs(self.the_level)+resplt), arrows=alternative, width=(1+alternative))
    plt.show()
    del self.labels, self.nodes_color, self.ind_a, self.ind_o, self.ind_T, self.ind_E, self.the_level, self.G

  def corroborate_equivalence(self, the_model, test=-1):
    equivalent = True
    combinations = np.array(list(itertools.product([-1, 1], repeat=self.n_vars)))
    if (test < 0): relevants = len(combinations)
    else: relevants = test
    for i in combinations[:relevants]:
      res_net = the_model.predict(i.reshape(1,-1), verbose=0)[0][0]
      res_cir = self.evaluate_formula(i)
      if (res_net != res_cir):
        equivalent = False
        break
    if (equivalent): print(f'They are equivalent for the {min(relevants, len(combinations))} values reviewed')
    else: print("They aren't equivalent")

## SHAP

In [None]:
###Based on "shap-score-script.py" of Maximilian Schleich
import pandas as pd
from multiprocessing import Pool

#Method to compute all required factorials
def factorial_list(N):
  fact = [0] * (N + 1)
  fact[0] = 1
  fact[1] = 1
  for num in range(2, N + 1): fact[num] = fact[num-1] * num
  return fact

def generate_subsets(n_vairables, protected_variable, subsets=[[]], actual_variable=0):
  if (n_vairables == actual_variable): return subsets
  elif (protected_variable==actual_variable): return generate_subsets(n_vairables, protected_variable, subsets, actual_variable+1)
  else: return generate_subsets(n_vairables, protected_variable, [i+[actual_variable] for i in subsets]+subsets, actual_variable+1)

def SHAP_values(all_combinations, entity, counter, total):
  n_variables = all_combinations.shape[1]-1
  factorials = factorial_list(n_variables)
  cols = all_combinations.columns
  SHAPs = []
  for variable in range(n_variables):
    SHAP = 0
    subsets_variables = generate_subsets(n_variables, variable)
    for subset in subsets_variables:
      if (len(subset) > 0):
        cond = f'`{cols[subset[0]]}` == {entity[subset[0]]}'
        if (len(subset) > 1):
          for x in subset[1:]: cond += f' & `{cols[x]}` == {entity[x]}'
        rows_without_variable = all_combinations.query(cond)
      else: rows_without_variable = all_combinations
      aux = rows_without_variable.query(f'`{cols[variable]}` == {entity[variable]}')['prediction'].sum()/(2**(n_variables-1-len(subset)))
      aux -= rows_without_variable['prediction'].sum()/(2**(n_variables-len(subset)))
      SHAP += (factorials[len(subset)]*factorials[n_variables-len(subset)-1]/factorials[n_variables])*aux
    SHAPs += [SHAP]
  if (counter%4 == 1): print(counter, "of", total, "|", time.asctime(time.localtime(time.time())))
  return SHAPs

def SHAP_matrix_bnn(model, df, limit_d=None, limit_u=None): 
  shap_mat = [] 
  res = []
  full_df = pd.DataFrame(np.array(list(itertools.product([-1, 1], repeat=model.input_shape[1]))), columns=df.columns)
  full_df['prediction'] = model.predict(full_df)[:,0]
  clean_df = df.drop_duplicates().reset_index(drop=True)
  if (limit_u != None): clean_df = clean_df.head(limit_u)
  if (limit_d != None): clean_df = clean_df.tail(-limit_d)
  with Pool(processes=1, maxtasksperchild=100) as pool:
    for index, entity in clean_df.iterrows():
      res.append(pool.apply_async(SHAP_values, (full_df, entity, index+1, limit_u)))
    for r in res: 
      shap_mat.append(r.get())
  return pd.DataFrame(shap_mat, columns=df.columns)

def SHAP_matrix_bc(circuit, df, limit_d=None, limit_u=None):
  shap_mat = [] 
  res = []
  full_df = pd.DataFrame(np.array(list(itertools.product([-1, 1], repeat=circuit.n_vars))), columns=df.columns)
  full_df['prediction'] = circuit.evaluate_formulas(full_df)
  clean_df = df.drop_duplicates().reset_index(drop=True)
  if (limit_u != None): clean_df = clean_df.head(limit_u)
  if (limit_d != None): clean_df = clean_df.tail(-limit_d)
  with Pool(processes=1, maxtasksperchild=100) as pool:
    for index, entity in clean_df.iterrows():
      res.append(pool.apply_async(SHAP_values, (full_df, entity, index+1, limit_u)))
    for r in res: 
      shap_mat.append(r.get())
  return pd.DataFrame(shap_mat, columns=df.columns)

def SHAP_matrix_bc_fast(circuit, df, limit_d=None, limit_u=None):
  SHAPs = []
  counter = 1
  clean_df = df.drop_duplicates()
  if (limit_u != None): clean_df = clean_df.head(limit_u)
  if (limit_d != None): clean_df = clean_df.tail(-limit_d)
  for observation in (clean_df.to_numpy()+1)/2:
    if (counter%10 == 1): print(counter, "of", len(clean_df), "|", time.asctime(time.localtime(time.time())))
    counter += 1
    SHAPs += [circuit.obtain_SHAPs(observation)]
  return pd.DataFrame(SHAPs, columns=df.columns)

## Preprocessing of the dataset

Data source: https://www.kaggle.com/datasets/camnugent/california-housing-prices

In [None]:
print("Raw data")
dataset_url = "https://raw.githubusercontent.com/Jorvan758/dDBCSFi2/main/housingc.csv"
pd.read_csv(dataset_url)

In [None]:
pd.concat([pd.read_csv(dataset_url).min(axis=0), pd.read_csv(dataset_url).max(axis=0)], axis=1, keys=["min", "max"])

In [None]:
pd.read_csv(dataset_url)["ocean_proximity"].drop_duplicates()

In [None]:
print("Preprocessed data")
df = pd.read_csv(dataset_url)
df = (df > df.mean(axis=0))
df = pd.concat([pd.get_dummies(pd.read_csv("housingc.csv")["ocean_proximity"], prefix="op").astype(int),
                df.drop("ocean_proximity", axis=1).astype(int)], axis=1)
df

In [None]:
pearson_corr = df.corr(method='pearson').round(3)
plt.tight_layout()
fig, ax = plt.subplots(figsize=(len(df.columns)-1,len(df.columns)-1))
ax = sns.heatmap(pearson_corr,cmap='magma',linecolor='black',linewidths=1,annot=True)
ax.set_title('Pearson correlation')
plt.show()

In [None]:
X_train, X_test, y_train, y_test = train_test_split(df.drop("median_house_value", axis=1), df["median_house_value"],
                                                    test_size=0.5, stratify=df["median_house_value"], random_state=7)
X_train = X_train*2-1
X_test = X_test*2-1
X_train, X_test, y_train, y_test

## BNN training and testing

In [None]:
set_random_seeds()

n_input = len(X_train.columns)
n_neurons = n_input
input_layer = layers.Input(shape=(n_input,), name="Input_layer")
dense_layer1 = lq.layers.QuantDense(n_neurons, kernel_quantizer="ste_sign", kernel_constraint="weight_clip",
                                    kernel_initializer=RandomNormal(stddev=0.1),
                                    bias_initializer=RandomNormal(stddev=0.1),
                                    use_bias=True, name="Dense_layer_1")(input_layer)
output_layer = lq.layers.QuantDense(1, input_quantizer="ste_sign", kernel_quantizer="ste_sign",
                                    kernel_constraint="weight_clip", activation="sigmoid",
                                    kernel_initializer=RandomNormal(stddev=0.1),
                                    bias_initializer=RandomNormal(stddev=0.1),
                                    use_bias=True, name="Output_layer")(dense_layer1)
The_model = models.Model(input_layer, output_layer, name="The_model")
The_model.compile(optimizer='adam', loss="binary_crossentropy", metrics=['accuracy'])
The_model.summary()
print()

n_epochs = 200
callback = callbacks.EarlyStopping(monitor='loss', patience=n_epochs//5, restore_best_weights=True)
beginning = time.monotonic()
record = The_model.fit(X_train, y_train, batch_size=len(X_train), epochs=n_epochs, verbose=1,
                          validation_data=(X_test, y_test), shuffle=True, callbacks=[callback])
duration = time.monotonic() - beginning
print("\nTime taken to train:", seconds_separator(duration),"\n")

fig, axs= plt.subplots(nrows=1, ncols=2, figsize=(12,5))
plt.sca(axs[0])
plt.plot(record.epoch[1:]+[len(record.epoch)], record.history["loss"], label = "Training data")
plt.plot(record.epoch[1:]+[len(record.epoch)], record.history["val_loss"], label = "Test data")
plt.title("Loss")
plt.xlabel("Number of epochs")
plt.ylabel("Loss")
plt.legend(loc="upper right")
plt.sca(axs[1])
plt.plot(record.epoch[1:]+[len(record.epoch)], record.history["accuracy"], label = "Training data")
plt.plot(record.epoch[1:]+[len(record.epoch)], record.history["val_accuracy"], label = "Test data")
plt.title("Accuracy")
plt.xlabel("Number of epochs")
plt.ylabel("Accuracy")
plt.legend(loc="lower right")
plt.show()

In [None]:
print("Before binarizing (Loss & Accuracy)")
print("Training:", The_model.evaluate(X_train, y_train, verbose=0))
print("Testing:", The_model.evaluate(X_test, y_test, verbose=0))
The_model.layers[2].activation = step_sigm
The_model.set_weights([((i>0)*2-1)*(i.ndim==2)+(i)*(i.ndim==1) for i in The_model.get_weights()])
print("\nAfter binarizing (Loss & Accuracy)")
print("Training:", The_model.evaluate(X_train, y_train, verbose=0))
print("Testing:", The_model.evaluate(X_test, y_test, verbose=0))

## Conversion of the BNN to a dDBCSFi(2)

In [None]:
beginning = time.monotonic()
cnff_name = 'CNFf.cnf'
cnff_name = encode_network(The_model, cnff_name)
duration = time.monotonic() - beginning
print("Time taken to create the formula:", seconds_separator(duration),"\n")

beginning = time.monotonic()
mgr = SddManager()
ssd_manager, node = mgr.from_cnf_file(bytes(cnff_name, encoding='utf-8'))
duration = time.monotonic() - beginning
print("Time taken to create the SDD:", seconds_separator(duration),"\n")

beginning = time.monotonic()
The_circuit = dDBCSFi_2(n_input, SDD=node)
The_circuit.compile_bnn()
duration = time.monotonic() - beginning
print("Time taken to create the dDBCSFi(2):", seconds_separator(duration))

In [None]:
beginning = time.monotonic()
The_circuit.corroborate_equivalence(The_model, -1)
duration = time.monotonic() - beginning
print("Time taken to verify the equivalence of dDBCSFi(2) with the BNN:", seconds_separator(duration))

print("\nThe circuit has", The_circuit.count_nodes(), "nodes")

In [None]:
plot_the_network(The_model, 20)

In [None]:
The_circuit.plot_formula(60)

# Experiments

## SHAP on the BNN, black-box

In [None]:
beginning = time.monotonic()
SHAPs_1 = SHAP_matrix_bnn(The_model, X_train, None, 20)
duration = time.monotonic() - beginning
print("\nTime taken to calculate the SHAPs for the original model:", seconds_separator(duration))

SHAPs_1 = pd.DataFrame(SHAPs_1, columns=X_train.columns)
SHAPs_1.to_csv("BNN_SHAPs_direct_1-20.csv")
SHAPs_1

In [None]:
beginning = time.monotonic()
SHAPs_1 = SHAP_matrix_bnn(The_model, X_train, 20, 40)
duration = time.monotonic() - beginning
print("\nTime taken to calculate the SHAPs for the original model:", seconds_separator(duration))

SHAPs_1 = pd.DataFrame(SHAPs_1, columns=X_train.columns)
SHAPs_1.to_csv("BNN_SHAPs_direct_21-40.csv")
SHAPs_1

In [None]:
beginning = time.monotonic()
SHAPs_1 = SHAP_matrix_bnn(The_model, X_train, 40, 60)
duration = time.monotonic() - beginning
print("\nTime taken to calculate the SHAPs for the original model:", seconds_separator(duration))

SHAPs_1 = pd.DataFrame(SHAPs_1, columns=X_train.columns)
SHAPs_1.to_csv("BNN_SHAPs_direct_41-60.csv")
SHAPs_1

In [None]:
beginning = time.monotonic()
SHAPs_1 = SHAP_matrix_bnn(The_model, X_train, 60, 80)
duration = time.monotonic() - beginning
print("\nTime taken to calculate the SHAPs for the original model:", seconds_separator(duration))

SHAPs_1 = pd.DataFrame(SHAPs_1, columns=X_train.columns)
SHAPs_1.to_csv("BNN_SHAPs_direct_61-80.csv")
SHAPs_1

In [None]:
beginning = time.monotonic()
SHAPs_1 = SHAP_matrix_bnn(The_model, X_train, 80, 100)
duration = time.monotonic() - beginning
print("\nTime taken to calculate the SHAPs for the original model:", seconds_separator(duration))

SHAPs_1 = pd.DataFrame(SHAPs_1, columns=X_train.columns)
SHAPs_1.to_csv("BNN_SHAPs_direct_81-100.csv")
SHAPs_1

In [None]:
SHAPs_1 = pd.concat([pd.read_csv('BNN_SHAPs_direct_1-20.csv').iloc[:,1:],
                     pd.read_csv('BNN_SHAPs_direct_21-40.csv').iloc[:,1:],
                     pd.read_csv('BNN_SHAPs_direct_41-60.csv').iloc[:,1:],
                     pd.read_csv('BNN_SHAPs_direct_61-80.csv').iloc[:,1:],
                     pd.read_csv('BNN_SHAPs_direct_81-100.csv').iloc[:,1:]]).reset_index(drop=True)
print(pd.concat([SHAPs_1.mean(axis=0).round(6), SHAPs_1.std(axis=0).round(6)], axis=1), "\n\n",
      pd.concat([SHAPs_1.abs().mean(axis=0).round(6), SHAPs_1.abs().std(axis=0).round(6)], axis=1), sep='')

## SHAP on the dDBCSFi(2), black-box

In [None]:
beginning = time.monotonic()
SHAPs_2 = SHAP_matrix_bc(The_circuit, X_train, None, 20)
duration = time.monotonic() - beginning
print("\nTime taken to calculate the SHAPs for the circuit, without the algorithm:", seconds_separator(duration))

SHAPs_2 = pd.DataFrame(SHAPs_2, columns=X_train.columns)
SHAPs_1.to_csv("dDBCSFi(2)_SHAPs_direct_1-20.csv")
SHAPs_2

In [None]:
beginning = time.monotonic()
SHAPs_2 = SHAP_matrix_bc(The_circuit, X_train, 20, 40)
duration = time.monotonic() - beginning
print("\nTime taken to calculate the SHAPs for the circuit, without the algorithm:", seconds_separator(duration))

SHAPs_2 = pd.DataFrame(SHAPs_2, columns=X_train.columns)
SHAPs_1.to_csv("dDBCSFi(2)_SHAPs_direct_21-40.csv")
SHAPs_2

In [None]:
beginning = time.monotonic()
SHAPs_2 = SHAP_matrix_bc(The_circuit, X_train, 40, 60)
duration = time.monotonic() - beginning
print("\nTime taken to calculate the SHAPs for the circuit, without the algorithm:", seconds_separator(duration))

SHAPs_2 = pd.DataFrame(SHAPs_2, columns=X_train.columns)
SHAPs_1.to_csv("dDBCSFi(2)_SHAPs_direct_41-60.csv")
SHAPs_2

In [None]:
beginning = time.monotonic()
SHAPs_2 = SHAP_matrix_bc(The_circuit, X_train, 60, 80)
duration = time.monotonic() - beginning
print("\nTime taken to calculate the SHAPs for the circuit, without the algorithm:", seconds_separator(duration))

SHAPs_2 = pd.DataFrame(SHAPs_2, columns=X_train.columns)
SHAPs_1.to_csv("dDBCSFi(2)_SHAPs_direct_61-80.csv")
SHAPs_2

In [None]:
beginning = time.monotonic()
SHAPs_2 = SHAP_matrix_bc(The_circuit, X_train, 80, 100)
duration = time.monotonic() - beginning
print("\nTime taken to calculate the SHAPs for the circuit, without the algorithm:", seconds_separator(duration))

SHAPs_2 = pd.DataFrame(SHAPs_2, columns=X_train.columns)
SHAPs_1.to_csv("dDBCSFi(2)_SHAPs_direct_81-100.csv")
SHAPs_2

In [None]:
SHAPs_2 = pd.concat([pd.read_csv('dDBCSFi(2)_SHAPs_direct_1-20.csv').iloc[:,1:],
                     pd.read_csv('dDBCSFi(2)_SHAPs_direct_21-40.csv').iloc[:,1:],
                     pd.read_csv('dDBCSFi(2)_SHAPs_direct_41-60.csv').iloc[:,1:],
                     pd.read_csv('dDBCSFi(2)_SHAPs_direct_61-80.csv').iloc[:,1:],
                     pd.read_csv('dDBCSFi(2)_SHAPs_direct_81-100.csv').iloc[:,1:]]).reset_index(drop=True)
print(pd.concat([SHAPs_1.mean(axis=0).round(6), SHAPs_1.std(axis=0).round(6)], axis=1), "\n\n",
      pd.concat([SHAPs_1.abs().mean(axis=0).round(6), SHAPs_1.abs().std(axis=0).round(6)], axis=1), sep='')

## SHAP on the dDBCSFi(2), open-box

In [None]:
beginning = time.monotonic()
SHAPs_3 = SHAP_matrix_bc_fast(The_circuit, X_train, None, 100)
duration = time.monotonic() - beginning
print("\nTime taken to calculate the SHAPs for the circuit, with the algorithm:", seconds_separator(duration))

SHAPs_3 = pd.DataFrame(SHAPs_3, columns=X_train.columns)
SHAPs_3.to_csv("dDBCSFi(2)_SHAPs_efficient.csv")
SHAPs_3

In [None]:
print(pd.concat([SHAPs_3.mean(axis=0).round(6), SHAPs_3.std(axis=0).round(6)], axis=1), "\n\n",
      pd.concat([SHAPs_3.abs().mean(axis=0).round(6), SHAPs_3.abs().std(axis=0).round(6)], axis=1), sep='')