The implementation of the local and global explanations below follows this resource:


1.   https://shap-lrjball.readthedocs.io/en/latest/example_notebooks/general/Explainable%20AI%20with%20Shapley%20Values.html

In [None]:
!pip install datasets

In [None]:
import datasets
from datasets import Dataset

In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import KFold
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments, DataCollatorWithPadding
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, cohen_kappa_score
import torch
from torch.utils.data import DataLoader, Dataset

In [None]:
# Load the skill annotations from the Excel file into a DataFrame.
df1 = pd.read_excel("/content/Skill_with_question_id.xlsx")
# Last expression of the cell: previews the first five rows.
df1.head(5)

In [None]:
# Map the '-' placeholder to '2', a code that to_skill() below turns into None.
# The result is assigned back to the column: calling replace(inplace=True) on a
# column selection is a chained in-place update, which under pandas
# copy-on-write only changes a temporary object and leaves df1 untouched.
df1['Constructing_Explanations'] = df1['Constructing_Explanations'].replace('-', '2')
# Drop rows without a label, then keep only rows whose label is a numeric string.
df1 = df1[~df1['Constructing_Explanations'].isnull()]
df1 = df1[df1['Constructing_Explanations'].str.isnumeric()]
print(df1.head(5))


def to_skill(label):
    """Convert a raw label into a binary skill code.

    Parameters
    ----------
    label
        Any value accepted by ``int()``.

    Returns
    -------
    int or None
        ``1`` or ``0`` when ``int(label)`` equals that value, ``None`` for
        every other integer.
    """
    skill = int(label)
    if skill in (0, 1):
        return skill
    return None


df1['Constructing_Explanations'] = df1.Constructing_Explanations.apply(to_skill)
# dropna() removes every row that has a missing value in any column; this
# includes the rows whose label was mapped to None above.
df1 = df1.dropna()

In [None]:
# Put the columns in a fixed order (answer text and label first) and give the
# answer / label columns the generic names 'text' and 'label'.
df1 = (
    df1.reindex(
        columns=[
            'Answer',
            'Constructing_Explanations',
            'Solution',
            'Planning_Investigations',
            'Student',
            'Analyzing_Data',
        ]
    )
    .rename(columns={'Answer': 'text', 'Constructing_Explanations': 'label'})
)

In [None]:
# Preview the first five rows after the columns were reordered and renamed.
df1.head(5)

In [None]:
# Row-wise split at fixed positions: rows [0, 2512) for training,
# [2512, 3767) for validation and the remainder for testing
# (described by the author as a 50% / 25% / 25% split).
df_train = df1.iloc[:2512]
df_val = df1.iloc[2512:3767]
df_test = df1.iloc[3767:]

# Column-wise: keep only the first two columns (text and label).
df_train_pd_whole = df_train.iloc[:, :2]
df_val_pd_whole = df_val.iloc[:, :2]
df_test_pd_whole = df_test.iloc[:, :2]

# Convert each frame into a datasets.arrow_dataset.Dataset.
# The class is referenced as `datasets.Dataset` because the bare name
# `Dataset` is also imported from torch.utils.data in this file, so which
# class it refers to depends on the import that ran last.
train_dataset = datasets.Dataset.from_dict(df_train_pd_whole.to_dict(orient="list"))
validation_dataset = datasets.Dataset.from_dict(df_val_pd_whole.to_dict(orient="list"))
# The test split is built from the test rows; it used to be built from the
# validation frame, which made "test" a duplicate of "validation".
test_dataset = datasets.Dataset.from_dict(df_test_pd_whole.to_dict(orient="list"))

# Bundle the three splits into a datasets.dataset_dict.DatasetDict.
final_dataset_dict = datasets.DatasetDict(
    {"train": train_dataset, "test": test_dataset, "validation": validation_dataset}
)
final_dataset_dict

In [None]:
# Keep a direct handle on the training split; the SHAP cells slice from it.
train_final_dataset_dict = final_dataset_dict["train"]

In [None]:
!pip install shap

In [None]:
import datasets
import numpy as np
import scipy as sp
import torch
import transformers
from datasets import Dataset
import shap

In [None]:
# Build a two-label sequence classifier from the pretrained checkpoint ...
model_CE = AutoModelForSequenceClassification.from_pretrained(
    "deepset/gbert-large",
    num_labels=2,
    ignore_mismatched_sizes=True,
)
# ... move it to the GPU ...
model_CE = model_CE.to(torch.device('cuda'))
# ... and overwrite its weights with the saved state dict.
model_CE.load_state_dict(
    torch.load('/content/gdrive/MyDrive/Thesis/Model/FT_CE_EB_GBERTlarge.pth')
)

In [None]:
# Display the loaded model (notebook output of the cell's last expression).
model_CE

In [None]:
# Load the tokenizer that belongs to the 'deepset/gbert-large' checkpoint.
# NOTE(review): truncation / padding / max_length are normally given when the
# tokenizer is called (as f() does); confirm they have any effect when passed here.
tokenizer = AutoTokenizer.from_pretrained('deepset/gbert-large', truncation=True, padding=True, max_length=512)

In [None]:
# Prediction function handed to the SHAP explainer.
def f(x):
    """Return the class-1 log-odds predicted by ``model_CE`` for each text.

    Parameters
    ----------
    x : iterable of str
        Texts to score.

    Returns
    -------
    numpy.ndarray
        One value per input text: ``logit(p)``, where ``p`` is the softmax
        probability of class index 1 (one-vs-rest log-odds).
    """
    # Run on whatever device the model lives on instead of assuming CUDA.
    device = next(model_CE.parameters()).device

    # Every text is padded / truncated to exactly 512 tokens.
    encodings = tokenizer(
        [str(v) for v in x],
        padding="max_length",
        max_length=512,
        truncation=True,
        return_tensors="pt",
    )
    input_ids = encodings['input_ids'].to(device)
    attention_mask = encodings['attention_mask'].to(device)

    # Inference only: no_grad avoids building an autograd graph per batch.
    with torch.no_grad():
        outputs = model_CE(input_ids, attention_mask=attention_mask)[0].detach().cpu().numpy()

    # scipy's softmax is shift-stabilised, so large logits do not overflow
    # the way a hand-written exp / sum does.
    scores = sp.special.softmax(outputs, axis=-1)
    val = sp.special.logit(scores[:, 1])  # one-vs-rest log-odds of class 1
    return val


In [None]:
# Create a SHAP explainer around the prediction function f; the tokenizer is
# passed as the masker so that explanations are computed per token.
explainer = shap.Explainer(f, tokenizer)

In [None]:
# Sanity check: show the first 50 training rows, the same slice that is
# explained in the next cell.
train_final_dataset_dict[:50] #checking

In [None]:
# Explain the model's predictions for the Constructing Explanations skill on
# the first 50 training responses of the AFLEK data.
# fixed_context=1 and batch_size=2 are forwarded to the explainer call.
shap_values = explainer(train_final_dataset_dict[:50], fixed_context=1, batch_size=2)

global positive

In [None]:
import numpy as np
import matplotlib.pyplot as plt

s_value = shap_values.values
d_value = shap_values.data

# One list of features (tokens) per explained response.
feature_Names = [list(data) for data in d_value]

# Flatten the per-response SHAP values and features into two parallel lists.
flattened_s_value = []
flattened_feature = []

for shap_vals, feat_names in zip(s_value, feature_Names):
    flattened_s_value.extend(shap_vals)
    flattened_feature.extend(feat_names)

# Highest positive SHAP value for each unique feature.
# Every occurrence is inspected: the dictionary keys already make the
# features unique, whereas skipping features that were seen before would keep
# only the first occurrence and the max() would never run.
pos_feature_shap_dict = {}

for feature, sv in zip(flattened_feature, flattened_s_value):
    if feature and sv > 0:  # ignore blank features and non-positive values
        if feature in pos_feature_shap_dict:
            pos_feature_shap_dict[feature] = max(pos_feature_shap_dict[feature], sv)
        else:
            pos_feature_shap_dict[feature] = sv

# Features sorted by their maximum SHAP value, largest first.
sorted_pos_f = sorted(pos_feature_shap_dict.items(), key=lambda x: x[1], reverse=True)


def plotting_top_pos_f(n):
    """Bar-plot the ``n`` features with the largest maximum positive SHAP value.

    Parameters
    ----------
    n : int
        Number of entries taken from the front of ``sorted_pos_f``.
    """
    top_n_pos_f = sorted_pos_f[:n]
    top_n_pos_fnames = [feature for feature, _ in top_n_pos_f]
    # Read the values from the same slice as the names (the previous code
    # referenced an undefined variable here and raised NameError).
    top_n_pos_s_values = [value for _, value in top_n_pos_f]

    plt.figure(figsize=(12, 8))
    plt.bar(top_n_pos_fnames, top_n_pos_s_values, color='#ff0052')
    plt.xticks(rotation=45, ha='right')
    plt.ylabel('Maximum SHAP Value (Positive Contributions)')
    plt.xlabel('Features')
    plt.title(f'Top {n} Unique Features Contributing to Positive Class')
    plt.show()


# Top 30
plotting_top_pos_f(30)

global negative

In [None]:
import numpy as np
import matplotlib.pyplot as plt


s_value = shap_values.values
d_value = shap_values.data

# One list of features (tokens) per explained response.
feature_Names = [list(data) for data in d_value]

# Flatten the per-response SHAP values and features into two parallel lists.
flattened_s_value = []
flattened_feature = []

for shap_vals, feat_names in zip(s_value, feature_Names):
    flattened_s_value.extend(shap_vals)
    flattened_feature.extend(feat_names)

# Most negative SHAP value for each unique feature, to compare with the
# positive ones.
# Every occurrence is inspected: the dictionary keys already make the
# features unique, whereas skipping features that were seen before would keep
# only the first occurrence and the min() would never run.
neg_feature_shap_dict = {}

for feature, sv in zip(flattened_feature, flattened_s_value):
    if feature and sv < 0:  # ignore blank features and non-negative values
        if feature in neg_feature_shap_dict:
            neg_feature_shap_dict[feature] = min(neg_feature_shap_dict[feature], sv)
        else:
            neg_feature_shap_dict[feature] = sv

# Features sorted by their minimum SHAP value, most negative first.
sorted_neg_features = sorted(neg_feature_shap_dict.items(), key=lambda x: x[1])

# The 30 unique features contributing most strongly to the negative class.
top_30 = sorted_neg_features[:30]
top_30_neg_f_names = [feature for feature, _ in top_30]
top_30_neg_sv = [value for _, value in top_30]

# Plot the features.
color_blue = '#1e88e5'
plt.figure(figsize=(12, 8))
plt.bar(top_30_neg_f_names, top_30_neg_sv, color=color_blue)
plt.xticks(rotation=45, ha='right')
plt.ylabel('Minimum SHAP Value (Negative Contributions)')
plt.xlabel('Features')
plt.title('Top 30 Unique Features Contributing to Negative Class')
plt.show()


Local Explanations
Replace the index in the cells below with the number of the response you want to inspect in the text plot, force plot, and waterfall plot.

In [None]:
# Text plot of the SHAP values for the response at index 0.
shap.plots.text(shap_values[0])

In [None]:
# Load SHAP's JavaScript so the interactive plot can render in the notebook.
shap.initjs()
# Force plot for the response at index 0, built from its base value, its
# SHAP values and its data (the tokens).
shap.force_plot(shap_values[0].base_values, shap_values[0].values, shap_values[0].data)

In [None]:
# Waterfall plot of the SHAP values for the response at index 0.
shap.plots.waterfall(shap_values[0])

#Occlusion study

Generation of the occluded dataset

In [None]:
import json
import pandas as pd

def mask_evidence_tokens(sentence_tokens, sentence_evidences):
    """Return the sentence's tokens with every evidence token replaced by '[MASK]'.

    A token is masked when the evidence flag at the same position equals 1.
    Tokens without a corresponding flag (evidence list shorter than the token
    list) are kept unchanged instead of being dropped.
    """
    return [
        '[MASK]' if pos < len(sentence_evidences) and sentence_evidences[pos] == 1 else tok
        for pos, tok in enumerate(sentence_tokens)
    ]


def build_occlusion_records(data):
    """Build one occluded-text record per answer that has a Constructing explanations label.

    Parameters
    ----------
    data : dict
        Parsed JSON: ``{question_id: {'answers': {answer_id: answer}}}``, where
        an answer may hold ``'tokens'`` (a list of token lists, one per
        sentence) and ``'labels'['Constructing explanations']`` with a
        ``'score'`` and per-sentence ``'evidences'`` flag lists.

    Returns
    -------
    list of dict
        Records with the keys ``'StudentID'``, ``'Masked Sentence'`` and ``'Label'``.
        ``'Label'`` is 1 when the score is ``'present'`` and 0 otherwise. Evidence
        tokens are masked only for label 1; all other text is copied unchanged.
    """
    records = []
    for q_data in data.values():
        for s_id, s_data in q_data['answers'].items():
            # Only answers that carry a Constructing explanations label are used.
            if 'labels' not in s_data or 'Constructing explanations' not in s_data['labels']:
                continue
            skill = s_data['labels']['Constructing explanations']
            label = 1 if skill.get('score') == 'present' else 0
            tokens_list = s_data.get('tokens', [])
            e_list = skill.get('evidences') or []

            combined_sentence = []
            # Iterate over the token sentences (not over a zip with the
            # evidences) so that a sentence without evidence flags is still
            # copied instead of silently disappearing from the text.
            for idx, sentence_tokens in enumerate(tokens_list):
                if label == 1 and idx < len(e_list):
                    combined_sentence.extend(mask_evidence_tokens(sentence_tokens, e_list[idx]))
                else:
                    combined_sentence.extend(sentence_tokens)

            records.append({
                # The key of the answer inside 'answers' is used as the ID (the
                # previous code referenced an undefined name here).
                # NOTE(review): confirm this key is the student identifier.
                'StudentID': s_id,
                'Masked Sentence': " ".join(combined_sentence),
                'Label': label
            })
    return records


# In a notebook or script __name__ is "__main__", so this part runs as before;
# the guard only keeps the two helpers importable without touching any files.
if __name__ == "__main__":
    # The evidence spans are supplied separately in a JSON file.
    with open('/content/assembled.json', 'r', encoding='utf-8') as file:
        data = json.load(file)

    records = build_occlusion_records(data)

    # Save the newly created data.
    df_records = pd.DataFrame(records)
    excel_file_path = '18.06.13.44processed_data.xlsx'
    df_records.to_excel(excel_file_path, index=False)


In [None]:
import pandas as pd
# Load the occluded (masked) responses into a DataFrame.
# NOTE(review): the generation cell writes '18.06.13.44processed_data.xlsx'
# (no 'CE_' prefix, relative path) — confirm this file is that output.
df1 = pd.read_excel("/content/CE_18.06.13.44processed_data.xlsx")
# Last expression of the cell: previews the first five rows.
df1.head(5)

In [None]:
# Map the '-' placeholder to '2'. The result is assigned back to the column:
# calling replace(inplace=True) on a column selection is a chained in-place
# update, which under pandas copy-on-write only changes a temporary object
# and leaves df1 untouched.
df1['Label'] = df1['Label'].replace('-', '2')
df1 = df1[~df1['Label'].isnull()]  # drop rows without a label
# Last expression of the cell: shows the dtype of the label column.
df1['Label'].dtype

In [None]:
def to_skill(label):
    """Map a raw label to 1 or 0; any other integer value yields None."""
    code = int(label)
    return code if code in (0, 1) else None


# Convert every label, then drop the rows that contain a missing value.
df1['Label'] = df1['Label'].apply(to_skill)
df1 = df1.dropna()

In [None]:
# Give the masked text and the label the generic column names 'text' and
# 'label' (in place).
df1.rename(columns = {'Masked Sentence':'text','Label':'label'}, inplace = True) # renaming it

In [None]:
# Row-wise split at fixed positions: rows [0, 2512) for training,
# [2512, 3767) for validation and the remainder for testing
# (described by the author as a 50% / 25% / 25% split).
df_train = df1.iloc[:2512]
df_val = df1.iloc[2512:3767]
df_test = df1.iloc[3767:]

# Column-wise: keep only the columns at positions 1 and 2 (text and label).
df_train_pd_whole = df_train.iloc[:, 1:3]
df_val_pd_whole = df_val.iloc[:, 1:3]
df_test_pd_whole = df_test.iloc[:, 1:3]

# Convert each frame into a datasets.arrow_dataset.Dataset.
# The class is referenced as `datasets.Dataset` because the bare name
# `Dataset` is also imported from torch.utils.data in this file, so which
# class it refers to depends on the import that ran last.
train_dataset = datasets.Dataset.from_dict(df_train_pd_whole.to_dict(orient="list"))
validation_dataset = datasets.Dataset.from_dict(df_val_pd_whole.to_dict(orient="list"))
# The test split is built from the test rows; it used to be built from the
# validation frame, which made "test" a duplicate of "validation".
test_dataset = datasets.Dataset.from_dict(df_test_pd_whole.to_dict(orient="list"))

# Bundle the three splits into a datasets.dataset_dict.DatasetDict.
final_dataset_dict = datasets.DatasetDict(
    {"train": train_dataset, "test": test_dataset, "validation": validation_dataset}
)
final_dataset_dict

In [None]:
# Keep a direct handle on the (occluded) training split; the SHAP cells slice from it.
train_final_dataset_dict = final_dataset_dict["train"]

In [None]:
# Build a two-label sequence classifier from the pretrained checkpoint ...
model_CEl = AutoModelForSequenceClassification.from_pretrained(
    "deepset/gbert-large",
    num_labels=2,
    ignore_mismatched_sizes=True,
)
# ... move it to the GPU ...
model_CEl = model_CEl.to(torch.device('cuda'))
# ... and overwrite its weights with the saved state dict.
model_CEl.load_state_dict(
    torch.load('/content/gdrive/MyDrive/Thesis/Model/FT_CE_EB_GBERTLarge.pth')
)

In [None]:
# Load the tokenizer that belongs to the 'deepset/gbert-large' checkpoint.
# NOTE(review): truncation / padding / max_length are normally given when the
# tokenizer is called (as f() does); confirm they have any effect when passed here.
tokenizer = AutoTokenizer.from_pretrained('deepset/gbert-large', truncation=True, padding=True, max_length=512)

In [None]:
def f(x):
    """Return the class-1 log-odds predicted by ``model_CEl`` for each text.

    Parameters
    ----------
    x : iterable of str
        Texts to score.

    Returns
    -------
    numpy.ndarray
        One value per input text: ``logit(p)``, where ``p`` is the softmax
        probability of class index 1 (one-vs-rest log-odds).
    """
    # Run on whatever device the model lives on instead of assuming CUDA.
    device = next(model_CEl.parameters()).device

    # Every text is padded / truncated to exactly 512 tokens.
    encodings = tokenizer(
        [str(v) for v in x],
        padding="max_length",
        max_length=512,
        truncation=True,
        return_tensors="pt",
    )
    input_ids = encodings['input_ids'].to(device)
    attention_mask = encodings['attention_mask'].to(device)

    # Inference only: no_grad avoids building an autograd graph per batch.
    with torch.no_grad():
        outputs = model_CEl(input_ids, attention_mask=attention_mask)[0].detach().cpu().numpy()

    # scipy's softmax is shift-stabilised, so large logits do not overflow
    # the way a hand-written exp / sum does.
    scores = sp.special.softmax(outputs, axis=-1)
    val = sp.special.logit(scores[:, 1])  # one-vs-rest log-odds of class 1
    return val

In [None]:
# Build a SHAP explainer around the prediction function f; the tokenizer is
# passed as the masker so that explanations are computed per token.
explainer = shap.Explainer(f, tokenizer)

In [None]:
# Sanity check: show the first 50 rows of the occluded training split.
train_final_dataset_dict[:50]

In [None]:
# Explain the model's predictions on the first 2000 occluded training
# responses; fixed_context=1 and batch_size=2 are forwarded to the explainer call.
shap_values = explainer(train_final_dataset_dict[:2000], fixed_context=1, batch_size=2)

##GLobal positive

In [None]:
import numpy as np
import matplotlib.pyplot as plt

s_value = shap_values.values
d_value = shap_values.data

# One list of features (tokens) per explained response.
feature_Names = [list(data) for data in d_value]

# Flatten the per-response SHAP values and features into two parallel lists.
flattened_s_value = []
flattened_feature = []

for shap_vals, feat_names in zip(s_value, feature_Names):
    flattened_s_value.extend(shap_vals)
    flattened_feature.extend(feat_names)

# Highest positive SHAP value for each unique feature.
# Every occurrence is inspected: the dictionary keys already make the
# features unique, whereas skipping features that were seen before would keep
# only the first occurrence and the max() would never run.
# The loop variable is named `sv` so it does not overwrite `s_value` above.
pos_feature_shap_dict = {}

for feature, sv in zip(flattened_feature, flattened_s_value):
    if feature and sv > 0:  # ignore blank features and non-positive values
        if feature in pos_feature_shap_dict:
            pos_feature_shap_dict[feature] = max(pos_feature_shap_dict[feature], sv)
        else:
            pos_feature_shap_dict[feature] = sv

# Features sorted by their maximum SHAP value, largest first.
sorted_pos_f = sorted(pos_feature_shap_dict.items(), key=lambda x: x[1], reverse=True)


def plotting_top_pos_f(n):
    """Bar-plot the ``n`` features with the largest maximum positive SHAP value.

    Parameters
    ----------
    n : int
        Number of entries taken from the front of ``sorted_pos_f``.
    """
    top_n_pos_f = sorted_pos_f[:n]
    top_n_pos_fnames = [feature for feature, _ in top_n_pos_f]
    # Read the values from the same slice as the names (the previous code
    # referenced an undefined variable here and raised NameError).
    top_n_pos_s_values = [value for _, value in top_n_pos_f]

    plt.figure(figsize=(12, 8))
    plt.bar(top_n_pos_fnames, top_n_pos_s_values, color='#ff0052')
    plt.xticks(rotation=45, ha='right')
    plt.ylabel('Maximum SHAP Value (Positive Contributions)')
    plt.xlabel('Features')
    plt.title(f'Top {n} Unique Features Contributing to Positive Class')
    plt.show()


# Top 30
plotting_top_pos_f(30)


##global negative

In [None]:
import numpy as np
import matplotlib.pyplot as plt


s_value = shap_values.values
d_value = shap_values.data

# One list of features (tokens) per explained response.
feature_Names = [list(data) for data in d_value]

# Flatten the per-response SHAP values and features into two parallel lists.
flattened_s_value = []
flattened_feature = []

for shap_vals, feat_names in zip(s_value, feature_Names):
    flattened_s_value.extend(shap_vals)
    flattened_feature.extend(feat_names)

# Most negative SHAP value for each unique feature, to compare with the
# positive ones.
# Every occurrence is inspected: the dictionary keys already make the
# features unique, whereas skipping features that were seen before would keep
# only the first occurrence and the min() would never run.
neg_feature_shap_dict = {}

for feature, sv in zip(flattened_feature, flattened_s_value):
    if feature and sv < 0:  # ignore blank features and non-negative values
        if feature in neg_feature_shap_dict:
            neg_feature_shap_dict[feature] = min(neg_feature_shap_dict[feature], sv)
        else:
            neg_feature_shap_dict[feature] = sv

# Features sorted by their minimum SHAP value, most negative first.
sorted_neg_features = sorted(neg_feature_shap_dict.items(), key=lambda x: x[1])

# The 30 unique features contributing most strongly to the negative class.
top_30 = sorted_neg_features[:30]
top_30_neg_f_names = [feature for feature, _ in top_30]
top_30_neg_sv = [value for _, value in top_30]

# Plot the features.
color_blue = '#1e88e5'
plt.figure(figsize=(12, 8))
plt.bar(top_30_neg_f_names, top_30_neg_sv, color=color_blue)
plt.xticks(rotation=45, ha='right')
plt.ylabel('Minimum SHAP Value (Negative Contributions)')
plt.xlabel('Features')
plt.title('Top 30 Unique Features Contributing to Negative Class')
plt.show()

Local Explanations
Replace the index in the cells below with the number of the response you want to inspect in the text plot, force plot, and waterfall plot.


In [None]:
# Text plot of the SHAP values for the response at index 28.
shap.plots.text(shap_values[28])

In [None]:
# Load SHAP's JavaScript so the interactive plot can render in the notebook.
shap.initjs()
# Force plot for the response at index 28, built from its base value, its
# SHAP values and its data (the tokens).
shap.force_plot(shap_values[28].base_values, shap_values[28].values, shap_values[28].data)

In [None]:
# Waterfall plot of the SHAP values for the response at index 28.
shap.plots.waterfall(shap_values[28])