<b>Data mining project - 2020/21</b><br>
<b>Authors</b>: [Alexandra Bradan](https://github.com/alexandrabradan), [Alice Graziani](https://github.com/alicegraziani25) and [Eleonora Cocciu](https://github.com/eleonoracocciu)<br>
<b>Python version</b>: 3.x<br>
<b>Last update: 21/05/2021<b>

In [1]:
# system library
import os
import sys
import json
import tqdm
import ast

# useful libraries
import math
import operator
import itertools
import statistics
import collections
from collections import Counter
from collections import OrderedDict

# pandas
import pandas as pd

# numpy
import numpy as np
from numpy import std
from numpy import mean
from numpy import percentile

# visualisarion
import pydotplus
import seaborn as sns
from matplotlib import colors
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from IPython.display import Image

# sklearn
from sklearn.pipeline import FeatureUnion
from sklearn.pipeline import Pipeline
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import RandomizedSearchCV
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import RepeatedStratifiedKFold
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from sklearn.metrics import classification_report, confusion_matrix

# dimensional reducers
from sklearn.decomposition import PCA
from sklearn.feature_selection import RFE
from sklearn.feature_selection import RFECV
from sklearn.decomposition import TruncatedSVD
from sklearn.feature_selection import VarianceThreshold
from sklearn.feature_selection import SelectKBest, f_classif, mutual_info_classif  # classification
from sklearn.feature_selection import SelectKBest, f_regression, mutual_info_regression  # regression

# scalers
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import RobustScaler
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import MaxAbsScaler
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import KBinsDiscretizer
from sklearn.preprocessing import OneHotEncoder

# performance visualisation 
from sklearn import tree
from scikitplot.metrics import plot_roc
from scikitplot.metrics import plot_precision_recall
from scikitplot.metrics import plot_cumulative_gain
from scikitplot.metrics import plot_lift_curve
from sklearn.model_selection import learning_curve
from mlxtend.plotting import plot_decision_regions
from yellowbrick.model_selection import LearningCurve

# tree classifiers
from sklearn.tree import DecisionTreeClassifier

# linear classifiers
from sklearn.linear_model import LassoCV
from sklearn.linear_model import Perceptron
from sklearn.linear_model import LinearRegression
from sklearn.linear_model import LogisticRegression

# neighbors classifiers
from sklearn.neighbors import KNeighborsClassifier

# naive_bayes classifiers
from sklearn.naive_bayes import GaussianNB
from sklearn.naive_bayes import BernoulliNB
from sklearn.naive_bayes import MultinomialNB

# ensemble classifiers
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.ensemble import RandomForestClassifier

# svm
from sklearn.svm import SVC
from sklearn.svm import LinearSVC

plt.rcParams["patch.force_edgecolor"] = True
%matplotlib inline

from yellowbrick.style import set_palette
set_palette('bold')

In [2]:
model_name = "MLPClassifier_1_layer"

<h6> Datasets loading </h6>

In [3]:
X_train = pd.read_csv('../../data/fma_metadata/X_train_merged.csv', index_col=0)
X_test = pd.read_csv('../../data/fma_metadata/X_test.csv', index_col=0)

y_train = pd.read_csv('../../data/fma_metadata/y_train_merged.csv', index_col=0)
y_test = pd.read_csv('../../data/fma_metadata/y_test.csv', index_col=0)

print(X_train.shape, X_test.shape)
print(y_train.shape, y_test.shape)

(92834, 55) (10874, 55)
(92834, 1) (10874, 1)


<h6>Continous, categorical/ordinal column retrieval</h6>

In [4]:
numeric_columns = []  # continous variables
for column_name in X_train.columns:
    if ("track_genre_top" not in column_name) and  \
          ("track_date_created_year" not in column_name) and \
            ("track_date_created_season" not in column_name):
                numeric_columns.append(column_name)
print("numeric_columns", len(numeric_columns))

numeric_columns 37


In [5]:
categoric_columns = []  # ordinal or categorical variables
for column_name in X_train.columns:
    if ("track_genre_top" in column_name) or  \
          ("track_date_created_year" in column_name) or \
            ("track_date_created_season" in column_name):
                categoric_columns.append(column_name)
print("categoric_columns", len(categoric_columns))

categoric_columns 18


<h6>Define current (filtered) train and test</h6>

In [6]:
X_tr = X_train[numeric_columns].copy()
y_tr = y_train.copy()
X_ts = X_test[numeric_columns].copy()
y_ts = y_test.copy()

"""
X_tr = X_train.copy()
y_tr = y_train.copy()
X_ts = X_test.copy()
y_ts = y_test.copy()
"""

'\nX_tr = X_train.copy()\ny_tr = y_train.copy()\nX_ts = X_test.copy()\ny_ts = y_test.copy()\n'

<h6>Load the black-box model</h6>

In [7]:
import pickle

model_info = None
model = None
with open('pickle/' + model_name + '_numeric.pickle', 'rb') as handle:
    model_info = pickle.load(handle)
    bb = model_info['tuned_model']



In [8]:
bb 

MLPClassifier(activation='tanh', alpha=0.001, hidden_layer_sizes=(5,),
              learning_rate='invscaling', momentum=0.2, random_state=42,
              tol=0.01)

<h6>Filter attributes</h6>

In [9]:
if model_info['model_name'] != 'Plain':
    n_features = model_info['params']['rfe__n_features_to_select']
    best_features = model_info['best_features']

    if n_features == len(best_features):
        X_tr = X_tr[best_features].copy()
        X_ts = X_ts[best_features].copy()
    else:
        print("Wrong feature filtering")
        sys.exit(-1)

<h6>Scale data </h6>

In [10]:
try:
    for column_name in X_tr.columns:
        scaler = model_info['params']['preprocessor__numeric__scaler']
        X_tr[column_name] = scaler.fit_transform(X_tr[column_name].values.reshape(-1,1))[:, 0]
        X_ts[column_name] = scaler.transform(X_ts[column_name].values.reshape(-1,1))[:, 0]
except KeyError:
    pass

<h6>Train model (again, for certainty)</h6>

In [11]:
bb.fit(X_tr.values, y_tr.values.ravel())

MLPClassifier(activation='tanh', alpha=0.001, hidden_layer_sizes=(5,),
              learning_rate='invscaling', momentum=0.2, random_state=42,
              tol=0.01)

In [12]:
def bb_predict(X):
    return bb.predict(X)

def bb_predict_proba(X):
    return bb.predict_proba(X)

In [13]:
y_pred = bb_predict(X_ts.values)
y_prob = bb_predict_proba(X_ts.values)

<h1> Local XAIs </h1>

<h6> Select correctly predicted records to explain </h6>

In [14]:
i2e_list = []
for i2e, (y_t, y_p) in enumerate(zip(y_ts.values.ravel(), y_pred)):
    if (y_t == y_p) and (y_t == 1):
        i2e_list.append(i2e)

In [15]:
len(i2e_list)

243

In [16]:
len([x for x in y_ts.values.ravel() if x == 1])

1080

<h2>LIME explanaibility </h2>

In [17]:
import lime
import lime.lime_tabular

In [18]:
# Creating the Lime Explainer
# Be very careful in setting the order of the class names
lime_explainer = lime.lime_tabular.LimeTabularExplainer(
    X_tr.values,
    training_labels=y_tr.values.ravel(),
    feature_names=X_tr.columns.tolist(),
    # feature_selection="lasso_path",
    class_names=["Studio_Recording", "Live_Recording"],
    discretize_continuous=False,
    #discretizer="entropy",
)

In [19]:
lime_dict = {}

In [20]:
for i2e in tqdm.tqdm(i2e_list):
    x = X_ts.iloc[i2e]
    exp = lime_explainer.explain_instance(X_ts.iloc[i2e], bb.predict_proba, num_features=5)
    # exp.show_in_notebook(show_table=True)
    # exp.as_pyplot_figure()
    
    # retrieve the features that made possible this outcome
    xai = exp.local_exp[1]
    xai_features = [X_tr.columns[c] for c, v in xai]
    xai_scores = [v for c, v in xai]
    
    xai_filtered_scores = []
    xai_filtered_features = []
    for i, score in enumerate(xai_scores):
        if score > 0:
            xai_filtered_scores.append(score)
            xai_filtered_features.append(xai_features[i])

    try:
        tmp_list = lime_dict[str(xai_filtered_features)] 
        tmp_list.append({i2e: xai_filtered_scores})
        lime_dict[str(xai_filtered_features)] = tmp_list
    except KeyError:
        lime_dict[str(xai_filtered_features)] = [{i2e: xai_filtered_scores}]

100%|██████████| 243/243 [04:47<00:00,  1.18s/it]


In [21]:
# count how many records where classified according to a given feature combination 
# (without taking into account repetions)
lime_dict_counter_with_repetitions = {}
for key, value in lime_dict.items():
    lime_dict_counter_with_repetitions[key] = len(value)

In [22]:
# lime_dict_counter_with_repetitions

In [23]:
# order alpabetical feature combinations
lime_dict_counter_alpabetical_ordered = {}
for key, value in lime_dict_counter_with_repetitions.items():
    key = sorted(ast.literal_eval(key))  # alpabetical ordered list
    try:
        lime_dict_counter_alpabetical_ordered[str(key)] += value
    except KeyError:
        lime_dict_counter_alpabetical_ordered[str(key)] = value

In [24]:
# lime_dict_counter_alpabetical_ordered

In [25]:
# order dict by incrising key length
def get_len(key):
    return len(key[0])

test_dict_list = list(lime_dict_counter_alpabetical_ordered.items())
test_dict_list.sort(key = get_len)
lime_dict_counter_alpabetical_ordered_incr_length = {ele[0] : ele[1]  for ele in test_dict_list}

In [26]:
# lime_dict_counter_alpabetical_ordered_incr_length

In [27]:
most_specific_length = sys.maxsize
for key, value in lime_dict_counter_alpabetical_ordered_incr_length.items():
    key_set = set(ast.literal_eval(key))
    if len(key_set) < most_specific_length:
        most_specific_length = len(key_set)
print("most_specific_length=%s" % most_specific_length)

most_specific_length=0


In [28]:
# count how many records where classified according to a given feature combination (without repetions)
# and taking into account the most general combinations
# N.B. a specific feature combination can have one or more general feature combinations as subset
lime_dict_counter = {}
accounted_indeces = set()
for i, (key, value) in enumerate(lime_dict_counter_alpabetical_ordered_incr_length.items()):
    key_set = set(ast.literal_eval(key))
    if len(key_set) == most_specific_length:
        for i2, (key2, value2) in enumerate(lime_dict_counter_alpabetical_ordered_incr_length.items()):
            key2_set = set(ast.literal_eval(key2))
            
            inters = key_set.intersection(key2_set)
            if len(inters) == len(key_set):
                try:
                    lime_dict_counter[key] += value2
                except:
                    lime_dict_counter[key] = value2

In [29]:
lime_dict_counter_descending_value = \
                    dict(OrderedDict(sorted(lime_dict_counter.items(), \
                                                      key=lambda kv: kv[1], reverse=True)))
lime_dict_counter_descending_value

{'[]': 243}

In [None]:
most_recurrent_and_specific_combination = next(iter(lime_dict_counter_descending_value))
most_recurrent_and_specific_combination = set(ast.literal_eval(most_recurrent_and_specific_combination))
for i, (key, value) in enumerate(lime_dict.items()):
    key = set(sorted(ast.literal_eval(key)))  # alpabetical ordered list
    if len(most_recurrent_and_specific_combination.intersection(key)) ==  \
                                            len(most_recurrent_and_specific_combination):
        for dict_v in value:
            i2e = next(iter(dict_v))
            exp = lime_explainer.explain_instance(X_ts.iloc[i2e], bb.predict_proba, num_features=5)
            print(i)
            exp.show_in_notebook(show_table=True)

            plt.figure(figsize=(8, 5))
            exp.as_pyplot_figure()
            plt.xlabel("Predicition probability contribution")
            plt.title("Local explanation for class Live_Recording in %s" % model_name)
            plt.show()
            print("------------------------------------------------------------------------------------------")

for i2e in tqdm.tqdm(i2e_list):
    x = X_ts.iloc[i2e]
    exp = lime_explainer.explain_instance(X_ts.iloc[i2e], bb.predict_proba, num_features=5)
    exp.show_in_notebook(show_table=True)