<a href="https://colab.research.google.com/github/hristijanpeshov/SHAP-Explainable-Lexicon-Model/blob/master/model_evaluation_summary.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# User Input

In [None]:
# Name of the source lexicon. It is passed to create_summary_results as
# `lexicon_source` and ends up in the 'Lexicon Source' column of the results.
lexicon_name = 'nasdaq'

# Folder that holds the lexicon files. Every file in it is read with
# pd.read_csv, so the folder must contain lexicon files only. The results
# folder is created next to it (in its parent directory).
lexicons_folder_loc = '/content/drive/MyDrive/nasdaq/concatenated datasets/lexicons'

# Location of the saved tokenizer (loaded further down with torch.load).
tokenizer_loc = '/content/drive/MyDrive/roberta/roberta_tokenizer'

# Folder that holds the evaluation datasets. Every file in it is read with
# pd.read_csv, so the folder must contain evaluation files only; each dataset
# is accessed through its 'text' and 'sentiment' columns.
eval_datasets_folder_loc = '/content/drive/MyDrive/datasets/evaluation datasets'

# ShapDictModel

In [None]:
!pip install transformers

In [None]:
import pandas as pd
import numpy as np
from sklearn.metrics import classification_report, accuracy_score, precision_score, f1_score, recall_score, matthews_corrcoef
import re
from sklearn.metrics import confusion_matrix
import torch
import nltk
import torch
nltk.download('wordnet')
nltk.download('averaged_perceptron_tagger')
nltk.download('omw-1.4')
from nltk.corpus import wordnet
from nltk.stem import WordNetLemmatizer

class ShapDictModel:
  """Lexicon-based sentence classifier that sums signed per-word scores.

  Each sentence is tokenized, cleaned and lemmatized; every resulting word is
  looked up in the lexicon `dataset`, and the decision-maker columns of the
  matching row are combined into a weighted score. The sign of the summed
  score decides the label: 1 for a positive sum, 0 for a negative sum and -1
  when the sum is exactly 0 (for example when no word was found).

  The lexicon is used either as a whole (`dataset_source=None`, four
  coefficients) or restricted to a single source ('LM' or 'OUR_WORDS', two
  coefficients).
  """

  def __init__(self, dataset, tokenizer, word_column, category_column, decision_makers, count_column, dataset_source = None):
    """Configure the model and select the part of the lexicon to use.

    Args:
      dataset: lexicon DataFrame. Its columns are addressed as
        '<prefix><name>' with the prefixes 'LM_' and 'TM_'.
      tokenizer: object exposing `tokenize(sentence)`.
      word_column: name of the column holding the lexicon word.
      category_column: un-prefixed name of the category column.
      decision_makers: un-prefixed names of the score columns to sum.
      count_column: un-prefixed name of the count column.
      dataset_source: None to use both sources, otherwise a source name
        (upper-cased before use), e.g. 'LM' or 'OUR_WORDS'.
    """
    self.dataset_source = str.upper(dataset_source) if dataset_source is not None else 'both'
    self.word_column = word_column
    self.tokenizer = tokenizer
    self.count_column = count_column

    # Lemmatizer
    self.lemmatizer = WordNetLemmatizer()

    # source column values
    self.lm_source = 'LM'
    self.tm_source = 'OUR_WORDS'

    # column prefix
    self.lm_prefix = 'LM_'
    self.tm_prefix = 'TM_'

    # category values
    self.positive_category_value = 'positive'
    self.negative_category_value = 'negative'

    # opposite prefix
    self.opposite_prefix = 'opposite_'

    # number of required coefficients when both sources or one source is used
    self.coefficient_number_both_sources = 4
    self.coefficient_number_one_source = 2

    # prefix when one source is chosen
    self.prefix = 'TM_' if self.dataset_source == self.tm_source else 'LM_' if self.dataset_source == self.lm_source else ''

    # source column postfix
    self.source_column = 'src'

    # dataset on which results are calculated
    self.dataset = dataset if self.dataset_source == 'both' else self.extract_dataset_from_source(dataset, self.dataset_source)

    # function that will calculate the score
    self.calculate_score = self.calculate_score_both_dataset_sources if self.dataset_source == 'both' else self.calculate_score_one_dataset_source

    # category when one source is chosen
    self.category = category_column

    # decision makers
    self.decision_makers = decision_makers
    print()
    print(f'Created ShapDictModel with decision makers: {self.decision_makers}')
    print()


  def extract_dataset_from_source(self, dataset, source, only_source_columns=True):
    """Return the rows (and optionally only the columns) belonging to one source.

    Args:
      dataset: the full lexicon DataFrame.
      source: 'LM' selects the LM rows/columns; any other value selects the
        OUR_WORDS ('TM_') rows/columns.
      only_source_columns: when True, keep only the word column and the
        columns whose name contains the selected prefix.

    Returns:
      The filtered DataFrame.
    """
    if source == self.lm_source:
      # LM chosen: keep the words that originate from the LM dataset
      prefix = self.lm_prefix
      opposite_prefix = self.tm_prefix
      dataset_source = self.lm_source
      opposite_dataset_source = self.tm_source
    else:
      # OUR_WORDS chosen: keep the words that originate from our words dataset
      prefix = self.tm_prefix
      opposite_prefix = self.lm_prefix
      dataset_source = self.tm_source
      opposite_dataset_source = self.lm_source

    column = f'{prefix}{self.source_column}'
    opposite_column = f'{opposite_prefix}{self.source_column}'

    # the row must be marked with the chosen source in its own 'src' column and
    # with either source name in the other 'src' column
    source_dataset = dataset[(dataset[column] == dataset_source) & ((dataset[opposite_column] == dataset_source) | (dataset[opposite_column] == opposite_dataset_source))]

    # filtering so just the necessary columns will remain
    if only_source_columns:
      columns = list(source_dataset.columns)
      source_columns = [self.word_column] + [column for column in columns if prefix in column]

      return source_dataset[source_columns]

    return source_dataset

  def calculate_score_both_dataset_sources(self, word_occurence, coefficients):
    """Weighted score of one lexicon row using both the TM and the LM columns.

    `coefficients` must unpack into four values, applied in the order
    TM score, TM opposite score, LM score, LM opposite score.
    """
    tm_accumulated_score, tm_opposite_accumulated_score = self.calculate_dataset_source_score(word_occurence, self.tm_prefix)
    lm_accumulated_score, lm_opposite_accumulated_score = self.calculate_dataset_source_score(word_occurence, self.lm_prefix)

    c1, c2, c3, c4 = coefficients

    weighted_score = (c1 * tm_accumulated_score + c2 * tm_opposite_accumulated_score + c3 * lm_accumulated_score + c4 * lm_opposite_accumulated_score)

    return weighted_score

  def calculate_score_one_dataset_source(self, word_occurence, coefficients):
    """Weighted score of one lexicon row using only the chosen source's columns.

    `coefficients` must unpack into two values: the weight of the score and
    the weight of the opposite score.
    """
    accumulated_score, opposite_accumulated_score = self.calculate_dataset_source_score(word_occurence, self.prefix)

    c1, c2 = coefficients

    weighted_score = (c1 * accumulated_score + c2 * opposite_accumulated_score)

    return weighted_score

  def predict_sentence_label(self, sentence, label_t, coefficients):
    """Predict the label of one sentence.

    Args:
      sentence: text to classify.
      label_t: the true label; accepted for call compatibility but not used.
      coefficients: weights forwarded to the score function.

    Returns:
      1 when the summed score is positive, 0 when it is negative and -1 when
      it is exactly 0.
    """
    words = self.tokenizer.tokenize(sentence)
    cleaned_words = self.clean_and_lemmatize_words(words)

    decision_score = 0
    for word in cleaned_words:
      # only the first matching row is used; words missing from the lexicon are skipped
      word_occurences = self.dataset.loc[self.dataset[self.word_column] == word].values
      if len(word_occurences) == 0:
        continue

      word_occurence = word_occurences[0]

      decision_score += self.calculate_score(word_occurence, coefficients)

    label = 1 if decision_score > 0 else 0 if decision_score < 0 else -1

    return label

  def __get_wordnet_pos(self, word):
    """Map POS tag to first character lemmatize() accepts"""
    tag = nltk.pos_tag([word])[0][1][0].upper()
    tag_dict = {"J": wordnet.ADJ,
                "N": wordnet.NOUN,
                "V": wordnet.VERB,
                "R": wordnet.ADV}

    # any other tag falls back to noun
    return tag_dict.get(tag, wordnet.NOUN)

  def clean_and_lemmatize_words(self, words):
    """Lower-case, strip and lemmatize the given tokens.

    The character 'ġ' is removed from every token (presumably the lower-cased
    word-boundary marker emitted by the tokenizer - TODO confirm).
    """
    lower_case_words = [str(word).lower().replace('ġ', '').strip() for word in words]
    return [self.lemmatizer.lemmatize(word, self.__get_wordnet_pos(word)) for word in lower_case_words]

  def calculate_word_dm_score(self, word_occurence, decision_maker_column, count_column):
    """Return the value stored in `decision_maker_column` for a lexicon row.

    `word_occurence` is a row given as a positional array aligned with
    `self.dataset.columns`. `count_column` is accepted but not used.
    """
    columns = list(self.dataset.columns)

    decision_maker_index = columns.index(decision_maker_column)

    value = word_occurence[decision_maker_index]

    return value

  def calculate_dataset_source_score(self, word_occurence, column_prefix):
    """Return the signed (score, opposite score) pair of a row for one source.

    For a 'positive' word the score is counted positively and the opposite
    score negatively; for a 'negative' word the signs are swapped. Any other
    category yields (0, 0).
    """
    columns = list(self.dataset.columns)
    category_index = columns.index(f'{column_prefix}{self.category}')

    word_category = word_occurence[category_index]

    selected_category_sign = 1
    opposite_category_sign = 1
    if word_category == self.positive_category_value:
      opposite_category_sign = -1
    elif word_category == self.negative_category_value:
      selected_category_sign = -1

    # equal signs mean the category was neither positive nor negative
    if selected_category_sign == opposite_category_sign:
      return 0, 0

    opposite_column_prefix = f'{column_prefix}{self.opposite_prefix}'

    selected_category_score = 0
    opposite_category_score = 0
    for decision_maker in self.decision_makers:
      selected_category_score += self.calculate_word_dm_score(word_occurence, f'{column_prefix}{decision_maker}', f'{column_prefix}{self.count_column}') * selected_category_sign
      opposite_category_score += self.calculate_word_dm_score(word_occurence, f'{opposite_column_prefix}{decision_maker}', f'{opposite_column_prefix}{self.count_column}') * opposite_category_sign

    return selected_category_score, opposite_category_score

  def calculate_model_accuracy(self, true_labels, predicted_labels):
    """Return the number of position-wise equal labels divided by len(true_labels)."""
    accuracy_indicators = [true_label == predicted_label for true_label, predicted_label in zip(true_labels, predicted_labels)]

    return np.asarray(accuracy_indicators).sum() / len(true_labels)


  def predict_and_evaluate(self, sentences, true_labels, coefficients):
    """Predict a label for every sentence and compute the evaluation metrics.

    Args:
      sentences: iterable of texts to classify.
      true_labels: the expected labels, aligned with `sentences`.
      coefficients: four weights when both sources are used, otherwise two.

    Returns:
      A 5-tuple `(predicted_labels, metrics_list, metrics_dict, report, matrix)`.
      `metrics_list` always holds five entries in the order accuracy,
      precision, recall, F1, MCC and `metrics_dict` maps 'Accuracy',
      'Precision', 'Recall', 'F1' and 'MCC' to the same values. When a metric
      cannot be computed, the metrics that were not reached are None, the
      dictionary additionally carries 'Our Accuracy', and `report` and
      `matrix` are None.

    Raises:
      AssertionError: when the number of coefficients does not match the mode.
    """
    if self.dataset_source == 'both':
      assert len(coefficients) == self.coefficient_number_both_sources, f'{self.coefficient_number_both_sources} coefficients required, provided {len(coefficients)}'
    else:
      assert len(coefficients) == self.coefficient_number_one_source, f'{self.coefficient_number_one_source} coefficients required, provided {len(coefficients)}'

    predicted_labels = [self.predict_sentence_label(sentence, label, coefficients) for sentence, label in zip(sentences, true_labels)]

    metric_names = ['Accuracy', 'Precision', 'Recall', 'F1', 'MCC']
    results = []
    our_acc = None
    try:
      our_acc = self.calculate_model_accuracy(true_labels, predicted_labels)
      print(f'Our method accuracy score: {our_acc}')

      acc = accuracy_score(true_labels, predicted_labels)
      print(f'Accuracy score: {acc}')
      results.append(acc)

      pr = precision_score(true_labels, predicted_labels, average="macro")
      print(f'Precision score: {pr}')
      results.append(pr)

      rec = recall_score(true_labels, predicted_labels, average="macro")
      print(f'Recall score: {rec}')
      results.append(rec)

      f1 = f1_score(true_labels, predicted_labels, average="macro")
      print(f'F1 score: {f1}')
      results.append(f1)

      mcc = matthews_corrcoef(true_labels, predicted_labels)
      print(f'MCC score: {mcc}')
      results.append(mcc)

      print()
      print("Classification Report:")
      cl_report = classification_report(true_labels, predicted_labels, zero_division=0)
      print(cl_report)

      print()
      print("Confusion Matrix:")
      conf_matrix = confusion_matrix(true_labels, predicted_labels)
      print(conf_matrix)

    except Exception as error:
      # best effort: keep the predictions and whatever metrics were computed,
      # but let KeyboardInterrupt/SystemExit propagate
      print('Error while trying to calculate metrics')
      print(f'Reason: {error!r}')

      # pad to the same five-metric layout as the success path so every
      # value stays under its own name and result rows keep their width
      final_result = results + [None] * (len(metric_names) - len(results))
      metrics = {'Our Accuracy': our_acc}
      metrics.update(zip(metric_names, final_result))

      return predicted_labels, final_result, metrics, None, None

    # just for creating of the results dataset
    return predicted_labels, results, dict(zip(metric_names, results)), cl_report, conf_matrix


  def __normalize_column(self, dataset, column):
    """Divide `column` of `dataset` by its maximum, in place; skip when the maximum is 0."""
    column_max_value = dataset[column].max()
    if column_max_value == 0:
      return dataset

    dataset[column] = dataset[column].apply(lambda value: value / column_max_value)

    return dataset

  def normalize_dataset(self, dataset):
    """Return a copy of `dataset` with every numeric column divided by its maximum.

    The input is left untouched. A dataset without numeric columns is
    returned as an unchanged copy.
    """
    columns_to_normalize = dataset.select_dtypes(include=np.number).columns.tolist()

    print(f'Columns to normalize: {columns_to_normalize}')

    dataset_copy = dataset.copy(True)
    for column in columns_to_normalize:
      # normalizes the copy in place
      self.__normalize_column(dataset_copy, column)

    print('Columns normalized')

    return dataset_copy

In [None]:
def convert_to_num(data, column):
  """Translate the sentiment strings in `data[column]` into numeric labels.

  'positive' becomes 1 and 'negative' becomes 0; any other value raises a
  KeyError. The result is the `.values` array of the converted column.
  """
  label_by_sentiment = dict(positive=1, negative=0)

  def to_label(sentiment):
    return label_by_sentiment[sentiment]

  numeric_labels = data[column].apply(to_label)
  return numeric_labels.values

In [None]:
import sys
import pytz

def create_summary_results(lexicon_source, lexicon_datasets, evaluation_datasets, drive_loc):
  """Evaluate every lexicon on every evaluation dataset and store the summaries.

  For each (lexicon, evaluation dataset) pair a log file
  '<lexicon_source>-<normalized|merged>-<evaluation dataset>.txt' is written
  to `drive_loc`, and the rows returned by `evaluate` are collected. The two
  resulting tables are saved as 'summary_df.csv' and 'summary_one_line_df.csv'
  in `drive_loc`.

  Args:
    lexicon_source: name stored in the 'Lexicon Source' column.
    lexicon_datasets: mapping of lexicon name to lexicon DataFrame. A lexicon
      counts as normalized when its name contains 'normalized'.
    evaluation_datasets: mapping of dataset name to DataFrame with 'text' and
      'sentiment' columns.
    drive_loc: existing folder that receives the logs and the CSV files.

  Returns:
    `(evaluation_summary_df, evaluation_summary_one_line_df)`.
  """
  # the coefficients are the same for every run, so build them once
  coefs_our_words = [0.8, 0.2]
  coefs_lmd = [0.9, 0.5]
  coefs = coefs_our_words + coefs_lmd

  evaluation_summary = []
  evaluation_summary_one_line = []
  for lexicon_name, lexicon in lexicon_datasets.items():
    lexicon_normalized = 'normalized' in lexicon_name
    lexicon_type = 'normalized' if lexicon_normalized else 'merged'

    for evaluate_dataset_name, evaluate_dataset in evaluation_datasets.items():
      sentences = evaluate_dataset['text'].values
      true_labels = evaluate_dataset['sentiment'].values

      # every run works on its own copy of the lexicon
      dataset = lexicon.copy(True)

      log_name = f'{lexicon_source}-{lexicon_type}-{evaluate_dataset_name}'
      log_location = f'{drive_loc}/{log_name}.txt'

      # the context manager closes the log even when the evaluation fails
      with open(log_location, 'w') as f:
        evalutaion_result, evalutaion_result_single_line = evaluate(dataset, sentences, true_labels, coefs, coefs_our_words, coefs_lmd, lexicon_source, lexicon_normalized, evaluate_dataset_name, log_location, f)

      evaluation_summary.extend(evalutaion_result)
      evaluation_summary_one_line.append(evalutaion_result_single_line)

  evaluation_summary_df = pd.DataFrame(evaluation_summary, columns = ['Lexicon Source', 'Lexicon Normalized', 'Evaluation Dataset', 'Words Source', 'Decision Maker', 'C1', 'C2', 'C3', 'C4',
                        'Accuracy', 'Precision', 'Recall', 'F1', 'MCC', 'Log'])
  evaluation_summary_one_line_df = pd.DataFrame(evaluation_summary_one_line, columns = ['Lexicon Source', 'Lexicon Normalized', 'Evaluation Dataset', 'Decision Maker', 'C1', 'C2', 'C3', 'C4',
                        'OUR + LMD', 'OUR', 'LMD', 'LMD on LMD', 'OUR on LMD', 'OUR + LMD on LMD', 'Log'])

  evaluation_summary_df.to_csv(f'{drive_loc}/summary_df.csv', index=False)
  evaluation_summary_one_line_df.to_csv(f'{drive_loc}/summary_one_line_df.csv', index=False)

  return evaluation_summary_df, evaluation_summary_one_line_df

def create_repetition_string(repetitive_string, times, split):
  """Return `repetitive_string` repeated `times` times, joined by `split`."""
  separator = f"{split}"
  pieces = [repetitive_string for _ in range(times)]

  return separator.join(pieces)

def write_result_to_file(f, execution_number, results_dictionary, classification_report, confussion_matrix):
  """Write one execution's metrics, classification report and confusion matrix to `f`.

  Args:
    f: writable text file object.
    execution_number: value printed in the 'Execution number' header.
    results_dictionary: mapping of metric name to value; one line per metric.
    classification_report: report text. It is converted with `str`, so a
      missing report (None) is written as 'None' instead of raising.
    confussion_matrix: confusion matrix, written through `str`.
  """
  f.write(f'Execution number: {execution_number}')
  f.write('\n' * 2)

  for metric in results_dictionary:
    f.write(f'{metric}: {results_dictionary[metric]}\n')

  f.write('\n' * 3)

  f.write("Classification Report:\n")
  f.write(str(classification_report))

  f.write('\n' * 2)

  f.write("Confusion Matrix:\n")
  f.write(str(confussion_matrix))

  # closing separator: blank lines, a 70-dash rule, blank lines
  f.write('\n' * 3)
  f.write('-' * 70)
  f.write('\n' * 4)

def end_of_section(f):
  """Write a section terminator to `f`: four 70-dash lines followed by two newlines."""
  dashes = create_repetition_string('-', 70, '')
  line_break = create_repetition_string('\n', 1, '')

  for _ in range(4):
    for piece in (dashes, line_break):
      f.write(piece)

  f.write(create_repetition_string('\n', 2, ''))

def create_had_responses_df(sentences, predicted_labels, true_labels):
  """Build a sentence/prediction/truth table and drop the rows predicted as -1.

  The returned DataFrame has the columns 'sentences', 'predicted_label' and
  'true_label' and keeps the original row index of the rows that remain.
  """
  rows = list(zip(sentences, predicted_labels, true_labels))
  responses = pd.DataFrame(rows, columns = ['sentences', 'predicted_label', 'true_label'])

  # -1 is the label used when no decision could be made
  has_answer = responses['predicted_label'] != -1

  return responses[has_answer]

In [None]:
def run_shap_dict_model(dataset, sentences, true_labels, coefs, dataset_source = None):
  """Build a ShapDictModel on `dataset` and evaluate it on the given sentences.

  The model uses the module-level `tokenizer`, the 'word', 'category' and
  'count' columns and 'average_shap_values' as its only decision maker.
  Returns whatever `predict_and_evaluate` returns.
  """
  model = ShapDictModel(
      dataset=dataset,
      tokenizer=tokenizer,
      word_column='word',
      category_column='category',
      decision_makers=['average_shap_values'],
      count_column='count',
      dataset_source=dataset_source,
  )

  return model.predict_and_evaluate(sentences, true_labels, coefs)

def evaluation_result(result, coefs, lexicon_source, lexicon_name, evaluate_dataset_name, words_source, log_location, dataset_source = None):
  """Build one row of the detailed summary table from a model result.

  The row is: lexicon source, lexicon name, evaluation dataset, words source,
  the decision maker name, the coefficient cells, the metric list of `result`
  and the log location. With a single source only two coefficients exist, so
  the two unused coefficient cells are filled with a backslash.
  """
  _predictions, metrics_list, _metrics_by_name, _report, _matrix = result

  unused_cells = ['\\', '\\']
  if dataset_source == 'OUR_WORDS':
    # our words occupy the first two coefficient cells
    coefficient_cells = coefs + unused_cells
  elif dataset_source == 'LM':
    # LM occupies the last two coefficient cells
    coefficient_cells = unused_cells + coefs
  else:
    coefficient_cells = coefs

  row_head = [lexicon_source, lexicon_name, evaluate_dataset_name, words_source, 'average_shap_values']
  row_tail = [log_location]

  return row_head + coefficient_cells + metrics_list + row_tail

def get_accuracy(result):
  """Return the 'Accuracy' entry of the metrics dictionary in a five-element model result."""
  _predictions, _metrics, metrics_by_name, _report, _matrix = result
  accuracy = metrics_by_name['Accuracy']

  return accuracy

def evaluation_result_one_line(metrics_list, coefs, lexicon_source, lexicon_name, evaluate_dataset_name, log_location):
  """Build the single summary row: identifiers, decision maker, coefficients, metrics, log location."""
  identifiers = [lexicon_source, lexicon_name, evaluate_dataset_name, 'average_shap_values']
  row = identifiers + coefs
  row = row + metrics_list

  return row + [log_location]


def evaluate(dataset, sentences, true_labels, coefs, coefs_our_words, coefs_lmd, lexicon_source, lexicon_name, evaluate_dataset_name, log_location, f):
  """Run the six evaluation scenarios for one lexicon/dataset pair and log them.

  The scenarios are, in order: OUR + LMD, OUR, LMD on all sentences, followed
  by LMD, OUR and OUR + LMD on only those sentences for which the LMD run
  produced a label other than -1.

  Args:
    dataset: lexicon DataFrame handed to the model.
    sentences: texts to classify.
    true_labels: expected labels aligned with `sentences`.
    coefs: the four coefficients used when both sources are combined.
    coefs_our_words: the two coefficients for the OUR_WORDS source.
    coefs_lmd: the two coefficients for the LM source.
    lexicon_source: value for the 'Lexicon Source' column.
    lexicon_name: value for the 'Lexicon Normalized' column.
    evaluate_dataset_name: value for the 'Evaluation Dataset' column.
    log_location: path of the log, stored in the 'Log' column.
    f: open, writable log file.

  Returns:
    `(rows, one_line_row)`: the six detailed rows in scenario order and a
    single row holding the six accuracies.
  """
  evaluation_results = []
  accuracy_results = []
  section_break = create_repetition_string('\n', 2, '')

  def run_and_record(run_sentences, run_labels, run_coefs, words_source, dataset_source=None):
    # run one scenario, log its raw result and collect its summary row and accuracy
    result = run_shap_dict_model(dataset, run_sentences, run_labels, run_coefs, dataset_source=dataset_source)
    f.write(str(result))
    f.write('\n')
    evaluation_results.append(evaluation_result(result, run_coefs, lexicon_source, lexicon_name, evaluate_dataset_name, words_source, log_location, dataset_source=dataset_source))
    accuracy_results.append(get_accuracy(result))

    return result

  f.write('OUR WORDS + LMD')
  f.write(section_break)
  run_and_record(sentences, true_labels, coefs, 'OUR + LMD')

  f.write(section_break)
  f.write('OUR WORDS')
  f.write(section_break)
  run_and_record(sentences, true_labels, coefs_our_words, 'OUR', dataset_source='OUR_WORDS')

  f.write(section_break)
  f.write('LMD')
  f.write(section_break)
  lmd_result = run_and_record(sentences, true_labels, coefs_lmd, 'LMD', dataset_source='LM')

  f.write(section_break)

  # keep only the sentences for which the LMD run gave a label other than -1
  had_responses_df = create_had_responses_df(sentences, lmd_result[0], true_labels)
  answered_sentences = had_responses_df['sentences'].values
  answered_labels = had_responses_df['true_label'].values

  f.write('LMD accuracy for those that LMD had answer for')
  # separate header, score line and raw result like the other sections do
  f.write(section_break)
  f.write(f"LMD accuracy score: {accuracy_score(answered_labels, had_responses_df['predicted_label'].values)}")
  f.write('\n')
  run_and_record(answered_sentences, answered_labels, coefs_lmd, 'LMD on LMD', dataset_source='LM')

  f.write(section_break)
  f.write('OUR WORDS accuracy for those that LMD had answer for')
  f.write(section_break)
  # prediction by using explainable words applied only on the sentences on which LMD had a prediction
  run_and_record(answered_sentences, answered_labels, coefs_our_words, 'OUR on LMD', dataset_source='OUR_WORDS')

  f.write('\n')
  f.write('OUR WORDS + LMD accuracy for those that LMD had answer for')
  f.write(section_break)
  # prediction by using explainable words + LMD words, again only on the sentences on which LMD had a prediction
  run_and_record(answered_sentences, answered_labels, coefs, 'OUR + LMD on LMD')

  return evaluation_results, evaluation_result_one_line(accuracy_results, coefs, lexicon_source, lexicon_name, evaluate_dataset_name, log_location)

# Evaluate lexicons

In [None]:
import os
from os import listdir
from os.path import isfile, join
import pandas as pd

def extract_file_name(file_loc):
  """Return the last '/'-separated part of `file_loc`, cut at its first dot."""
  base_name = file_loc.rsplit('/', 1)[-1]
  stem, _dot, _rest = base_name.partition('.')

  return stem

def extract_datasets_map(datasets_location):
  """Read every file directly inside `datasets_location` as a CSV.

  Args:
    datasets_location: folder path, with or without a trailing '/'.
      Sub-folders are ignored.

  Returns:
    A dict mapping each file's name (as produced by `extract_file_name`) to
    the DataFrame read from it.

  Raises:
    FileNotFoundError: when the folder contains no files.
  """
  location = datasets_location if datasets_location[-1] == '/' else f'{datasets_location}/'
  files_locations = [join(location, f) for f in listdir(location) if isfile(join(location, f))]

  print(f'Reading datasets from: {location} ...')

  # an empty folder would otherwise silently produce empty result tables
  if not files_locations:
    raise FileNotFoundError('No files found in the provided location')

  datasets_map = {}
  for file_location in files_locations:
    print(f'Reading dataset: {file_location} ...')
    datasets_map[extract_file_name(file_location)] = pd.read_csv(file_location)

  print('Reading datasets successfully finished ...')

  return datasets_map


def create_results_folder(loc):
  """Make sure a 'results' folder exists in the parent of `loc` and return its path."""
  parent_location = os.path.abspath(os.path.join(loc, os.pardir))
  if not parent_location.endswith('/'):
    parent_location = f'{parent_location}/'

  results_location = f'{parent_location}results'

  already_there = os.path.exists(results_location)
  if not already_there:
    os.makedirs(results_location)

  print(f'Created results dataset on location: {results_location} ...')

  return results_location

In [None]:
# Load the saved tokenizer; run_shap_dict_model reads it as the global `tokenizer`.
# NOTE(review): torch.load unpickles the file, so it should only be pointed at a
# trusted tokenizer file; newer torch versions may also need weights_only=False
# to load a non-tensor object - confirm against the installed torch version.
tokenizer = torch.load(tokenizer_loc)

# Read all lexicons: file name -> DataFrame.
lexicon_datasets_map = extract_datasets_map(lexicons_folder_loc)

# Read all evaluation datasets: file name -> DataFrame.
eval_datasets_map = extract_datasets_map(eval_datasets_folder_loc)

# The 'results' folder is created in the parent directory of the lexicons folder.
results_folder_loc = create_results_folder(lexicons_folder_loc)

# Evaluate every lexicon on every evaluation dataset; logs and CSV summaries go to the results folder.
df, df_one_line = create_summary_results(lexicon_name, lexicon_datasets_map, eval_datasets_map, results_folder_loc)

In [None]:
# Display the one-line-per-run accuracy summary as the cell output.
df_one_line