In [None]:
import os
import sys
sys.path.append('../..')
import numpy as np
import deepbayesHF
import deepbayesHF.optimizers as optimizers
from deepbayesHF import PosteriorModel
from deepbayesHF.analyzers import FGSM
from deepbayesHF.analyzers import eps_LRP
import tensorflow as tf
from tensorflow.keras.models import *
from tensorflow.keras.layers import *
import cv2
import random
import matplotlib.pyplot as plt
from collections import namedtuple

from joblib import Parallel, delayed
import subprocess
from statistics import mode
import json
import tensorflow as tf

sys.path.append('../../IntegratedGradients')
from IntegratedGradients import *

In [None]:
# Load MNIST, downscale to SCALE (trying 14x14 initially) and expand to 3 channels

(X_train, y_train), (X_test, y_test) = tf.keras.datasets.mnist.load_data()
SCALE = (14,14)  # (rows, cols) of every image after preprocessing


def _prepare_images(images):
    """Resize a stack of grayscale images to SCALE and convert it to float32 RGB.

    images: array indexed as (image, row, col), as returned by
        tf.keras.datasets.mnist.load_data().
    Returns an array of shape (len(images), SCALE[0], SCALE[1], 3) holding the
    pixel values divided by 255.
    """
    # Compare against the shape of a single image, not of the whole stack.
    if SCALE != images[0].shape:
        # cv2.resize expects dsize as (width, height), i.e. (cols, rows).
        images = np.array([cv2.resize(img,(SCALE[1],SCALE[0]),interpolation=cv2.INTER_CUBIC)
                           for img in images])
    images = np.array([cv2.cvtColor(img,cv2.COLOR_GRAY2RGB) for img in images])
    images = images/255.
    return images.astype("float32").reshape(len(images), SCALE[0], SCALE[1], 3)


X_train = _prepare_images(X_train)
X_test = _prepare_images(X_test)

# make it a binary classification task (X or not X)
target = 8
y_train = np.array([1 if y == target else 0 for y in y_train])
y_test = np.array([1 if y == target else 0 for y in y_test])

# filter so we get about 50% target class and 50% other
target_idxs = [i for i, label in enumerate(y_train) if label == 1]
other_idxs = [i for i, label in enumerate(y_train) if label == 0]

#uncomment the shuffle for training only
#random.shuffle(other_idxs)
# pick len(target_idxs) samples from the other_idxs
other_idxs = other_idxs[:len(target_idxs)]

# Select the chosen samples with a boolean mask: this keeps them in their
# original dataset order and avoids a list-membership scan per sample.
keep = np.zeros(len(y_train), dtype=bool)
keep[target_idxs] = True
keep[other_idxs] = True
X_train = X_train[keep]
y_train = y_train[keep]

In [None]:
# Fit a Bayesian neural network on the balanced binary task and save it
# NOTE(review): the "32_16" in the name does not match the 256/128 hidden
# widths used below - confirm the intended naming.
model_name = f'mnist{SCALE[0]}x{SCALE[1]}_32_16_binary_target{target}_coverage_rgb'
opt = optimizers.VariationalOnlineGuassNewton()
likelihood = tf.keras.losses.SparseCategoricalCrossentropy()

# Fully connected classifier: flatten -> 256 -> 128 -> 2-way softmax
inputs = Input(shape=X_train[0].shape)
tmp = Flatten()(inputs)
for width in (256, 128):
    tmp = Dense(width,activation='relu')(tmp)
predictions = Dense(2,activation='softmax')(tmp)
model = Model(inputs=inputs,outputs=predictions)

# Settings forwarded to the optimizer's compile step
train_settings = dict(epochs=25, learning_rate=0.25,
                      inflate_prior=2.0, log_file='tmp/log.txt')
bayes_model = opt.compile(model,loss_fn=likelihood,**train_settings)
bayes_model.train(X_train,y_train,X_test,y_test)
bayes_model.save(model_name)

In [None]:
# Reload the saved posterior model and report its accuracy on the test set
model_name = f'mnist{SCALE[0]}x{SCALE[1]}_32_16_binary_target{target}_coverage_rgb'
bayes_model = PosteriorModel(model_name)

# n=50 is passed through to predict (presumably the number of posterior samples - confirm)
y_pred = bayes_model.predict(X_test,n=50)
predicted_labels = np.argmax(y_pred,axis=1)

check_accuracy = tf.keras.metrics.Accuracy(name="train_acc")
check_accuracy(y_test,predicted_labels)
accuracy_pct = check_accuracy.result().numpy()*100

print()
print('Loaded model accuracy:',f'{accuracy_pct:.2f}%')

In [None]:
N = 50  # value passed as n= to bayes_model.predict below

# Keep drawing random training indices until the model's prediction for the
# drawn sample matches its label.
agrees = False
while not agrees:
    n = np.random.randint(len(y_train))
    X = X_train[n].reshape(1,*SCALE,3).astype(float)
    y_hat = np.argmax(bayes_model.predict(X,n=N))
    agrees = y_hat == y_train[n]
y_class = y_hat

plt.imshow(X.reshape(*SCALE,3))
print('Prediction:',y_class)
# Shape of the flattened input, as a 1-tuple
input_shape = (X.size,)

print("Index:",n)

In [None]:
# Switch TensorFlow to graph mode for the rest of the session.
# NOTE(review): presumably required by the IntegratedGradients helper - confirm.
tf.compat.v1.disable_eager_execution()

# naive Bayesian explanation generation:
# sample 50 weight sets from bayes_model and compute one integrated-gradients
# explanation of the selected input X_train[n] per sample.

exps = []
X = X_train[n].reshape(1,*SCALE,3).astype(float)  
for i in range(50):   
    # Rebuild the Flatten -> 256 -> 128 -> 2 architecture so sampled weights can be loaded into it
    model = Sequential()

    model.add(Input(shape=X_train[0].shape))
    model.add(Flatten())
    model.add(Dense(256,activation='relu'))
    model.add(Dense(128,activation='relu'))
    model.add(Dense(2,activation='softmax'))

    # Weights returned by bayes_model.sample() (presumably one posterior draw - confirm)
    model_weights = bayes_model.sample()
    model.set_weights(model_weights)
    #print(model.predict(X))
    model.compile(optimizer='sgd', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    # One training step on the single input; the sampled weights are restored
    # right after, so the step does not affect the weights being explained.
    # NOTE(review): presumably done only to make Keras build its internal
    # functions before the model is wrapped below - confirm.
    model.fit(X,y_train[n].reshape(1,*y_train[n].shape),epochs=1,batch_size=1)
    model.set_weights(model_weights)
    #print(model.predict(X))

    ig = integrated_gradients(model)
    exp = ig.explain(X.reshape(*SCALE,3),outc=y_train[n])
    # Keep only index 0 of the last axis of the attribution map
    exp = exp[:,:,0]
    # Only needed by the commented-out thresholding below; otherwise unused
    max_rel = np.max(exp)
    #limit = 0.05*max_rel
    #exp[exp < limit] = 0
    # Positive attributions become 1. NOTE(review): with the thresholding
    # disabled, negative attributions keep their raw values - confirm intended.
    exp[exp > 0] = 1
    exps.append(exp)

In [None]:
# visualise result: show the most frequent explanation and how often it occurred

cmap = dict()   # explanation id -> number of occurrences in exps
names = []      # explanation id -> string form of the explanation
es = []         # explanation id -> explanation array
index_of = {}   # string form -> explanation id, so `names` is never rescanned
for e in exps:
    key = str(e)  # computed once per explanation
    idx = index_of.get(key)
    if idx is None:
        # first time this explanation is seen: register it under a new id
        idx = len(names)
        index_of[key] = idx
        names.append(key)
        es.append(e)
        cmap[idx] = 0
    cmap[idx] += 1

# id of the most frequent explanation (first one wins on ties)
res = max(cmap,key=cmap.get)
res_image = es[res]
plt.imshow(res_image)
plt.axis('off')
plt.show()
# Percentage of collected explanations equal to the most frequent one. The
# generating loop appends exactly one explanation per sampled model, so
# len(exps) is the number of samples.
cov = (cmap[res]/len(exps))*100

print("P_cover:",cov)

In [None]:
from memo import memo

@memo
def generate_min_exps(expl,threshold):
    """Collect the reduced explanations reachable from ``expl`` by zeroing entries.

    The non-zero entries of ``expl`` are visited from left to right. For each
    one, the sum of all *other* entries is computed:

    * if that sum is below ``threshold``, ``expl`` itself is recorded and the
      scan stops;
    * otherwise the entry is replaced by 0 and the function recurses on the
      resulting tuple, collecting whatever that call returns.

    Parameters:
        expl: flat sequence of numbers; it is passed on as a tuple in the
            recursive calls.
        threshold: number the remaining sum is compared against.

    Returns:
        A list of explanations (``expl`` and/or tuples derived from it). The
        list may contain duplicates and is empty when ``expl`` has no non-zero
        entry.
    """
    minimal = []
    for pos, value in enumerate(expl):
        if value == 0:
            continue
        # Sum of everything except the entry at `pos`.
        remaining = sum(expl[:pos]) + sum(expl[pos+1:])
        if remaining < threshold:
            minimal.append(expl)
            break
        # Zero out this entry and explore further reductions.
        reduced = tuple(expl[:pos]) + (0,) + tuple(expl[pos+1:])
        minimal += generate_min_exps(reduced,threshold)
    return minimal

In [None]:
# Switch TensorFlow to graph mode for the rest of the session.
# NOTE(review): presumably required by the IntegratedGradients helper - confirm.
tf.compat.v1.disable_eager_execution()

# smarter method of generating covering explanation:
# for each of 50 sampled weight sets, threshold the integrated-gradients map
# into a binary mask and collect the reduced masks from generate_min_exps.

exps = []
X = X_train[n].reshape(1,*SCALE,3).astype(float)

for i in range(50):

    # Rebuild the Flatten -> 256 -> 128 -> 2 architecture so sampled weights can be loaded into it
    model = Sequential()

    model.add(Input(shape=X_train[0].shape))
    model.add(Flatten())
    model.add(Dense(256,activation='relu'))
    model.add(Dense(128,activation='relu'))
    model.add(Dense(2,activation='softmax'))

    # Weights returned by bayes_model.sample() (presumably one posterior draw - confirm)
    model_weights = bayes_model.sample()
    model.set_weights(model_weights)
    #print(model.predict(X))
    model.compile(optimizer='sgd', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    # One training step on the single input; the sampled weights are restored
    # right after, so the step does not affect the weights being explained.
    # NOTE(review): presumably done only to make Keras build its internal
    # functions before the model is wrapped below - confirm.
    model.fit(X,y_train[n].reshape(1,*y_train[n].shape),epochs=1,batch_size=1)
    model.set_weights(model_weights)
    #print(model.predict(X))

    ig = integrated_gradients(model)
    exp = ig.explain(X.reshape(*SCALE,3),outc=y_train[n])
    # Keep only index 0 of the last axis of the attribution map
    exp = exp[:,:,0]
    max_rel = np.max(exp)
    #th = max(np.abs(np.min(ex)), np.abs(np.max(ex)))
    #plt.imshow(ex[:,:,0], cmap="seismic", vmin=-1*th, vmax=th)
    #plt.show()
    # Binarise: attributions below 10% of the maximum become 0, the remaining
    # positive ones become 1.
    limit = 0.1*max_rel
    #limit = 0
    exp[exp < limit] = 0
    exp[exp > 0] = 1
    #plt.imshow(exp)
    #plt.show()
    # Reduce the flattened mask with a threshold of 90% of its sum; the set()
    # removes duplicates, so each mask is added at most once per sampled model.
    exp_list = list(set(generate_min_exps(tuple(exp.flatten()),0.9*np.sum(exp))))
    exps += exp_list

In [None]:
# get covering explanation and probability
cmap = dict()   # explanation id -> number of occurrences in exps
names = []      # explanation id -> string form of the explanation
es = []         # explanation id -> explanation reshaped to SCALE
index_of = {}   # string form -> explanation id, so `names` is never rescanned
print(len(exps))

# preview the first 10 collected explanations
for e in exps[:10]:
    plt.imshow(np.array(e).reshape(*SCALE))
    plt.show()

# exps can be very large here, so count with a dict lookup per element instead
# of scanning the `names` list for every explanation.
for e in exps:
    key = str(e)  # computed once per explanation
    idx = index_of.get(key)
    if idx is None:
        # first time this explanation is seen: register it under a new id
        idx = len(names)
        index_of[key] = idx
        names.append(key)
        es.append(np.array(e).reshape(*SCALE))
        cmap[idx] = 0
    cmap[idx] += 1

# id of the most frequent explanation (first one wins on ties)
res = max(cmap,key=cmap.get)
res_image = es[res]
plt.imshow(res_image)
plt.axis('off')
plt.show()
# Denominator for the coverage percentage; must match the number of
# iterations of the loop that filled exps.
num_sampled_models = 50
cov = (cmap[res]/num_sampled_models)*100

print("P_cover:",cov)