In [8]:
%load_ext autoreload
%autoreload 2
from sklearn import datasets
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from lark import Lark
import time 
from sklearn.preprocessing import MinMaxScaler
from sklearn import metrics
from IPython.display import Markdown, display
import pickle
import os.path
from trustable_explanation import helper_functions
from  trustable_explanation.query import Query
import operator
from trustable_explanation import example_queries
from trustable_explanation.teacher import Teacher
from trustable_explanation.learner import Learner
from trustable_explanation.sygus_if import SyGuS_IF
import pandas as pd
import numpy as np
from tqdm import tqdm
from sklearn.ensemble import RandomForestClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.model_selection import GridSearchCV
from sklearn import tree
from sklearn.metrics import roc_auc_score
from trustable_explanation.blackbox import BlackBox
import matplotlib.pyplot as plt
from data.objects import zoo, iris, adult
from trustable_explanation.example_queries import DistanceQuery
import datetime




The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [9]:
# Experiment configuration: change the trailing index to pick another option.
select_blackbox = ['decision tree','neural network', 'random forest'][0]

dataset = ['zoo', 'adult', 'iris'][0]

df = None

if dataset == "zoo":
    dataObj = zoo.Zoo()
    df = dataObj.get_df()
    # Binarize the target: rows whose class is listed in target_class become 1,
    # every other row becomes 0. isin() does not depend on how the class labels
    # are numbered, so no label can end up unmapped (NaN).
    target_class = [4]
    df[dataObj.target] = df[dataObj.target].isin(target_class).astype(int)
elif dataset == "adult":
    dataObj = adult.Adult()
    df = dataObj.get_df()
elif dataset == "iris":
    dataObj = iris.Iris()
    df = dataObj.get_df()
else:
    # Fail here rather than later with an unrelated error on df being None.
    raise ValueError("Dataset not defined")





# Separate the feature columns (X) from the label column (y).
X = df.drop(columns=[dataObj.target])
y = df[dataObj.target]

# Hold out 10% of the rows for testing (90% train / 10% test), shuffled with a fixed seed.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.1,
    shuffle=True,
    random_state=2,
)

display(Markdown("# Train the blackbox"))

# Path of the pickle file that caches the trained black box for this
# (black box, dataset) combination.
model_name = None
_model_file_prefix = {
    'decision tree': 'dt',
    'random forest': 'rf',
    'neural network': 'nn',
}
if select_blackbox not in _model_file_prefix:
    raise ValueError("Black box not defined")
model_name = 'data/model/' + _model_file_prefix[select_blackbox] + '_' + dataset + '.pkl'



if not os.path.isfile(model_name):
    # No cached model on disk: train the selected black box, then cache it.
    clf = None
    if select_blackbox == 'decision tree':
        # Pick max_depth by grid search and keep the best tree found.
        param_grid = {'max_depth': np.arange(3, 10)}
        grid_tree = GridSearchCV(tree.DecisionTreeClassifier(random_state=0), param_grid)
        grid_tree.fit(X_train, y_train)
        clf = grid_tree.best_estimator_
        print(helper_functions.tree_to_code(clf, X_train.columns.tolist()))
    elif select_blackbox == "random forest":
        # Seeded like the other two models so a retrain gives the same forest.
        clf = RandomForestClassifier(n_estimators=100, random_state=0)
        clf.fit(X_train, y_train)
    elif select_blackbox == "neural network":
        # Fit exactly once.
        clf = MLPClassifier(random_state=1, max_iter=300)
        clf.fit(X_train, y_train)
    else:
        raise ValueError("Black box not defined")

    # Save the classifier; create the target directory on a fresh checkout.
    os.makedirs(os.path.dirname(model_name), exist_ok=True)
    with open(model_name, 'wb') as fid:
        pickle.dump(clf, fid)

else:
    print("Loding model")
    # NOTE: unpickling can execute arbitrary code; only load model files that
    # were written by this notebook or come from a trusted source.
    with open(model_name, 'rb') as fid:
        clf = pickle.load(fid)

    if select_blackbox == "decision tree":
        print(helper_functions.tree_to_code(clf, X_train.columns.tolist()))



# os.system("rm " + model_name)


# Train the blackbox

Loding model
def tree(hair, feathers, eggs, milk, airborne, aquatic, predator, toothed, backbone, breathes, venomous, fins, legs, tail, domestic, catsize):

    if fins <= 0.5:
        return 0
    else:
        if breathes <= 0.5:
            return 1
        else:
            return 0



In [10]:

from sklearn.linear_model import LogisticRegression

# Queries to explain. Each dict is handed to the query object as its
# `halfspace`; the commented-out entries show the expected shape:
# feature name -> (comparison operator, value). The empty dict carries no
# feature constraint.
queries = [
    {},
    # {'fins': (operator.eq, 0)},
    # {'breathes': (operator.ge, 0)},
    # {'breathes': (operator.eq, 1)},
    # {'milk': (operator.eq, 1)},
]

# Wrap the trained classifier; every supported black box is queried through
# its predict method.
bb = None
if select_blackbox in ('decision tree', 'random forest', 'neural network'):
    bb = BlackBox(clf, clf.predict)
else:
    raise ValueError("Black box not defined")


# Kind of query to explain; change the trailing index to switch.
select_query = ['decision tree', 'specific input'][0]

# The trailing slice selects which explainers are run.
for selected_learner in ["decision tree", "logistic regression", "sygus"][2:]:
    for _query in queries:

        # Build the query object and the seed examples handed to the learner.
        # Dedicated names (seed_X / seed_y) are used so the notebook-level
        # X / y defined in the data-loading cell are not overwritten here.
        if select_query == "decision tree":
            # Query specialized for decision trees
            query_class = example_queries.DecisionTree(features=X_train.columns.tolist(), halfspace=_query)
            seed_X = []
            seed_y = []
        elif select_query == "specific input":
            specific_input = X_train.iloc[0].tolist()
            query_class = example_queries.DistanceQuery(specific_input=specific_input, threshold=0.1, features=X_train.columns.tolist())
            seed_X = [specific_input]
            seed_y = [clf.predict([specific_input])[0]]
            print("Class (black-box)", seed_y)
        else:
            raise ValueError(select_query +" is not a defined query.")

        display(Markdown("### Query"))

        q = Query(model=None, prediction_function=query_class.predict_function_query)
        print(query_class)

        iterations = 1

        for syntactic_grammar in [True, False]:

            for idx in range(iterations):

                # Every run gets its own copy of the seed examples.
                # NOTE(review): assumes Learner may extend the lists it is
                # given; copying keeps one run's examples out of the next.
                if selected_learner == "sygus":
                    sgf = SyGuS_IF(feature_names=dataObj.attributes, feature_data_type=dataObj.attribute_type, function_return_type="Bool", verbose=False, syntactic_grammar=syntactic_grammar)
                    l = Learner(model=sgf, prediction_function=sgf.predict_z3, train_function=sgf.fit, X=list(seed_X), y=list(seed_y))
                elif selected_learner == "decision tree":
                    dt_classifier = tree.DecisionTreeClassifier()
                    l = Learner(model=dt_classifier, prediction_function=dt_classifier.predict, train_function=dt_classifier.fit, X=list(seed_X), y=list(seed_y))
                elif selected_learner == "logistic regression":
                    clf_lr = LogisticRegression()
                    l = Learner(model=clf_lr, prediction_function=clf_lr.predict, train_function=clf_lr.fit, X=list(seed_X), y=list(seed_y))
                else:
                    raise ValueError("Learner not defined")

                # Run the teacher/learner interaction and time it end to end.
                t = Teacher(max_iterations=100000, epsilon=0.05, delta=0.05, timeout=100)
                _teach_start = time.time()
                l, flag = t.teach(blackbox=bb, learner=l, query=q, random_example_generator=helper_functions.random_generator, params_generator=(X_train, dataObj.attribute_type), verbose=False)
                _teach_end = time.time()

                # Count the held-out examples on which the learned explanation
                # agrees with (black box AND query).
                cnt = 0
                for example in X_test.values.tolist():
                    blackbox_verdict = bb.classify_example(example)
                    learner_verdict = l.classify_example(example)
                    query_verdict = q.classify_example(example)
                    if learner_verdict == (blackbox_verdict and query_verdict):
                        cnt += 1

                # Collect one result row for this run.
                entry = {}
                entry['dataset'] = dataset
                entry['blackbox'] = select_blackbox
                entry['query'] = str(query_class)
                if selected_learner == "sygus":
                    entry['explanation'] = l.model._function_snippet
                    entry['explanation size'] = l.model.get_formula_size()
                elif selected_learner == "decision tree":
                    # The learned tree is pickled; the row stores the file path.
                    # The timestamp format avoids spaces and colons so the name
                    # is a valid file name on every platform.
                    os.makedirs("data/output/dt", exist_ok=True)
                    _dt_explanation_file = "data/output/dt/" + datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S-%f") + ".pkl"
                    with open(_dt_explanation_file, 'wb') as fid:
                        pickle.dump(l.model, fid)
                    entry['explanation'] = _dt_explanation_file
                    entry['explanation size'] = None
                elif selected_learner == "logistic regression":
                    entry['explanation'] = l.model.coef_[0]
                    entry['explanation size'] = None
                else:
                    raise ValueError("Learner not defined")
                entry['explainer'] = selected_learner
                entry['syntactic grammar'] = syntactic_grammar
                entry['time learner'] = t.time_learner
                entry['time verifier'] = t.time_verifier
                entry['time'] = _teach_end - _teach_start
                entry['accuracy'] = cnt/len(y_test)
                entry['terminate'] = flag
                entry['random words checked'] = t.verifier.number_of_examples_checked
                entry['total counterexamples'] = len(l.y)
                entry['positive counterexamples'] = np.array(l.y).mean()

                # Rows are appended to the CSV without a header, so the column
                # order must be stable: columns are sorted alphabetically.
                # NOTE(review): values are written with their native types
                # (e.g. True/False); confirm readers of the CSV accept that.
                os.makedirs('data/output', exist_ok=True)
                result = pd.DataFrame([entry]).sort_index(axis=1)
                result.to_csv('data/output/sanity_result.csv', header=False, index=False, mode='a')

                # Only the last repetition of each configuration is reported.
                if idx == iterations - 1:
                    display(Markdown("### Result for " + selected_learner))
                    if selected_learner == "sygus":
                        print("Learned explanation =>", l.model._function_snippet)
                        print("-explanation size:", l.model.get_formula_size())
                    elif selected_learner == "decision tree":
                        print("Learned explanation =>", helper_functions.tree_to_code(l.model,X_train.columns.to_list()), "\n\n")
                    elif selected_learner == "logistic regression":
                        # Bar chart of coefficients scaled so the largest
                        # magnitude is 100, ordered by absolute value.
                        feature_importance = l.model.coef_[0]
                        feature_importance = 100.0 * (feature_importance / (abs(feature_importance).max()))
                        sorted_idx = np.argsort(abs(feature_importance))
                        pos = np.arange(sorted_idx.shape[0]) + .5
                        featfig = plt.figure()
                        featax = featfig.add_subplot(1, 1, 1)
                        featax.barh(pos, feature_importance[sorted_idx], align='center')
                        featax.set_yticks(pos)
                        featax.set_yticklabels(np.array(X_train.columns.to_list())[sorted_idx])
                        featax.set_xlabel('Relative Feature Importance')
                        plt.tight_layout()
                        plt.show()
                    else:
                        raise ValueError("Learner not defined")

                    print("\n\n\n-is learning complete?", flag)
                    print("-it took", _teach_end - _teach_start, "seconds")
                    print("-learner time:", t.time_learner)
                    print("-verifier time:", t.time_verifier)
                    print("correct: ", cnt, "out of ", len(y_test), "examples. Percentage: ", cnt/len(y_test))
                    print('random words checked', t.verifier.number_of_examples_checked)
                    print("Total counterexamples:", len(l.y))
                    print("percentage of positive counterexamples for the learner:", np.array(l.y).mean())
                    print()
                    # Column names in the order they are written to the CSV.
                    print(", ".join(["\'" + column + "\'" for column in result.columns.tolist()]))

        # A specific-input query does not use the entries of `queries`,
        # so one pass is enough.
        if select_query == "specific input":
            break

### Query


SyGuS model is not fit yet


### Result for sygus

Learned explanation =>  (and (not breathes) fins)
-explanation size: 2



-is learning complete? True
-it took 0.4934115409851074 seconds
-learner time: 0.33123779296875
-verifier time: 0.16059136390686035
correct:  11 out of  11 examples. Percentage:  1.0
random words checked 190
Total counterexamples: 6
percentage of positive counterexamples for the learner: 0.5

'accuracy', 'blackbox', 'dataset', 'explainer', 'explanation', 'explanation size', 'positive counterexamples', 'query', 'random words checked', 'syntactic grammar', 'terminate', 'time', 'time learner', 'time verifier', 'total counterexamples'
SyGuS model is not fit yet


### Result for sygus

Learned explanation =>  (let ((_let_0 (= legs (/ 79 125)))) (let ((_let_1 (= legs (/ 837 1000)))) (let ((_let_2 (= legs (/ 53 250)))) (let ((_let_3 (= legs (/ 461 1000)))) (let ((_let_4 (= legs (/ 447 500)))) (let ((_let_5 (= legs (/ 741 1000)))) (let ((_let_6 (= legs (/ 243 250)))) (let ((_let_7 (= legs (/ 903 1000)))) (and fins (not breathes) (or (and (not airborne) (or (and backbone (or (and (not milk) (or (and tail (or (and eggs (or (and toothed (or (and (not hair) (or (and venomous (or (and predator catsize (not feathers) (not aquatic) (not domestic) (= legs (/ 211 1000))) (and domestic (or (and feathers (not aquatic) (not catsize) (not predator) _let_5) (and (not feathers) (or (and (not aquatic) (not catsize) (not predator) (= legs (/ 459 1000))) (and aquatic catsize (or (and predator (= legs (/ 11 20))) (and (not predator) (= legs (/ 353 1000))))))))))) (and predator (not venomous) (not domestic) (not catsize) (or (and feathers (not aquatic) (= legs (/ 821 1000))) (and aquatic (