# Run always

In [None]:
!pip install transformers
!pip install keras_nlp
!pip install datasets

In [None]:
# Mount Google Drive inside the Colab runtime so files under /content/drive are reachable.
from google.colab import drive
drive.mount('/content/drive')

# Put the thesis code folder on the import path
# (presumably where the project modules imported in the next cell live — verify).
import sys
sys.path.append('/content/drive/MyDrive/Thesis/CODE')

In [None]:
import numpy as np
import pandas as pd
import os
import logging
import csv
import pickle
from tqdm import tqdm
import tensorflow as tf
from collections import Counter
import time
import multiprocessing
from matplotlib import pyplot as plt
from tensorflow.keras import optimizers
from datetime import datetime

from hyperparameters import hyperparams_features, hyperparams
from data_loader import load_erisk_data
from load_save_model import load_saved_model_weights
from feature_encoders import encode_emotions, encode_pronouns, encode_stopwords, encode_liwc_categories
from resource_loader import load_NRC, load_LIWC, load_vocabulary, load_stopwords

In [None]:
# Base data folder on the mounted Drive; the paths used in the cells below are built from it.
root_dir = "/content/drive/MyDrive/Thesis/Data"

Importing Data

In [None]:
# Self-Harm: read the tokenized writings and derive the user-level structures.
task = "Self-Harm"

tokenized_path_sh = f"{root_dir}/Processed Data/tokenized_df_{task}.pkl"
# NOTE: pickle files execute code on load; only read files produced by this project.
writings_df_sh = pd.read_pickle(tokenized_path_sh)

# load_erisk_data hands back the per-user data, the subject split and the vocabulary.
(user_level_data_sh,
 subjects_split_sh,
 vocabulary_sh) = load_erisk_data(
    writings_df_sh,
    train_prop=1,
    hyperparams_features=hyperparams_features,
    logger=None,
    by_subset=True,
)

print(f"There are {len(user_level_data_sh)} subjects, of which {len(subjects_split_sh['train'])} train and {len(subjects_split_sh['test'])} test.")


In [None]:
# Anorexia: read the tokenized writings and derive the user-level structures.
task = "Anorexia"

tokenized_path_a = f"{root_dir}/Processed Data/tokenized_df_{task}.pkl"
# NOTE: pickle files execute code on load; only read files produced by this project.
writings_df_a = pd.read_pickle(tokenized_path_a)

# load_erisk_data hands back the per-user data, the subject split and the vocabulary.
(user_level_data_a,
 subjects_split_a,
 vocabulary_a) = load_erisk_data(
    writings_df_a,
    train_prop=1,
    hyperparams_features=hyperparams_features,
    logger=None,
    by_subset=True,
)

print(f"There are {len(user_level_data_a)} subjects, of which {len(subjects_split_a['train'])} train and {len(subjects_split_a['test'])} test.")

In [None]:
# Depression: read the tokenized writings and derive the user-level structures.
task = "Depression"

tokenized_path_d = f"{root_dir}/Processed Data/tokenized_df_{task}.pkl"
# NOTE: pickle files execute code on load; only read files produced by this project.
writings_df_d = pd.read_pickle(tokenized_path_d)

# load_erisk_data hands back the per-user data, the subject split and the vocabulary.
(user_level_data_d,
 subjects_split_d,
 vocabulary_d) = load_erisk_data(
    writings_df_d,
    train_prop=1,
    hyperparams_features=hyperparams_features,
    logger=None,
    by_subset=True,
)

print(f"There are {len(user_level_data_d)} subjects, of which {len(subjects_split_d['train'])} train and {len(subjects_split_d['test'])} test.")

# Basic data analysis for Data Chapter

Choosing Task

In [None]:
# Pick which of the three pre-loaded data sets the analysis cells below operate on.
task = "Depression"

# Reject anything that is not one of the known tasks before touching the data.
if task not in ("Self-Harm", "Anorexia", "Depression"):
  raise Exception("Unkown data set!")

if task == "Depression":
  writings_df, user_level_data, subjects_split = writings_df_d, user_level_data_d, subjects_split_d
elif task == "Anorexia":
  writings_df, user_level_data, subjects_split = writings_df_a, user_level_data_a, subjects_split_a
else:  # "Self-Harm"
  writings_df, user_level_data, subjects_split = writings_df_sh, user_level_data_sh, subjects_split_sh




Analyzing text lengths & number of texts

In [None]:
# Per-user statistics: number of posts and the length of every post (len(text)),
# split into test/train and, for train subjects, into positive/negative label.
n_texts_train = []
text_lengths_train = []

n_texts_test = []
text_lengths_test = []
n_train = 0
n_test = 0

text_lengths_train_pos = []
text_lengths_train_neg = []
train_pos = 0
train_neg = 0

for user, user_data in user_level_data.items():
  # Measure every post once and reuse the lengths for all the lists below.
  post_lengths = [len(text) for text in user_data['texts']]

  #test: subject ids ending in '0000'
  if len(user) >= 4 and user[-4:] == '0000':
    n_test += 1
    n_texts_test.append(len(post_lengths))
    text_lengths_test.extend(post_lengths)

  #train
  else:
    n_train += 1
    n_texts_train.append(len(post_lengths))
    text_lengths_train.extend(post_lengths)

    if user_data['label'] == 1:
      train_pos += 1
      text_lengths_train_pos.extend(post_lengths)
    else:
      train_neg += 1
      text_lengths_train_neg.extend(post_lengths)

print(f"Data set: {task}")
print(f"Average number of texts: {np.mean(np.concatenate([n_texts_test, n_texts_train]))}")
print(f"Average number of texts test: {np.mean(n_texts_test)}")
print(f"Average number of texts train: {np.mean(n_texts_train)}")

print(f"Average text length: {np.mean(np.concatenate([text_lengths_test, text_lengths_train]))}")
print(f"Average text length test: {np.mean(text_lengths_test)}")
print(f"Average text length train: {np.mean(text_lengths_train)}")

print(f"Average text length train POSITIVE: {np.mean(text_lengths_train_pos)}")
print(f"Average text length train NEGATIVE: {np.mean(text_lengths_train_neg)}")

# Posts longer than this are left out of the histogram (and counted as "deleted" below)
max_text_length = 300
num_bins = 100

# Filter out text lengths over the defined maximum value
filtered_text_lengths_train_pos = [length for length in text_lengths_train_pos if length <= max_text_length]
filtered_text_lengths_train_neg = [length for length in text_lengths_train_neg if length <= max_text_length]


# Count the number of text lengths over the defined maximum value
num_over_max_pos = len(text_lengths_train_pos) - len(filtered_text_lengths_train_pos)
num_over_max_neg = len(text_lengths_train_neg) - len(filtered_text_lengths_train_neg)

# NOTE(review): max_length is not used in this cell; the percentage below is computed
# against max_text_length. Kept in case a later cell reads it.
max_length = 256
print(f"Percentage of posts deleted: {100*(num_over_max_neg+num_over_max_pos)/len(text_lengths_train)}%")

# Number of posts that remain in each class after filtering
num_samples_pos = len(filtered_text_lengths_train_pos)
num_samples_neg = len(filtered_text_lengths_train_neg)

# Histogram counts per class over the same bins
hist_pos, bins_pos = np.histogram(filtered_text_lengths_train_pos, bins=num_bins, range=(0, max_text_length))
hist_neg, bins_neg = np.histogram(filtered_text_lengths_train_neg, bins=num_bins, range=(0, max_text_length))

# Convert the counts to percentages of the class so both classes are comparable
hist_pos_percentage = (hist_pos / num_samples_pos) * 100
hist_neg_percentage = (hist_neg / num_samples_neg) * 100

# Width of each bar (set to half of the bin width to have bars next to each other)
bar_width = (max_text_length / num_bins) / 2

# Calculate the x positions for the bars
x_pos_pos = bins_pos[:-1]
x_pos_neg = bins_neg[:-1] + bar_width

# Legend entries follow the selected task instead of always reading 'Depressed'.
legend_labels = {
  "Depression": ("Depressed", "Not Depressed"),
  "Anorexia": ("Anorexic", "Not Anorexic"),
  "Self-Harm": ("Self-Harm", "No Self-Harm"),
}
pos_label, neg_label = legend_labels.get(task, (str(task), f"Not {task}"))

# Plotting the histogram
plt.bar(x_pos_pos, hist_pos_percentage, width=bar_width, alpha=0.7, label=pos_label, align='center', color='tab:orange')
plt.bar(x_pos_neg, hist_neg_percentage, width=bar_width, alpha=0.7, label=neg_label, align='center', color='tab:blue')
plt.xlabel("Post Length (#words)")
plt.ylabel("Percentage")
plt.legend()
plt.title(str(task))
plt.grid(True)
plt.show()

Analyzing word usage (only for train)


In [None]:
# Load the three lexical resources used for the word-usage analysis; the file
# locations come from hyperparams_features.

#Emotion Lexicon (NRC): the dict keys are used as the list of emotion names
emotion_lexicon = load_NRC(hyperparams_features['nrc_lexicon_path'])
emotions = list(emotion_lexicon.keys())
print(emotions)

#LIWC Dictionary: the dict keys are used as the category names
# NOTE(review): liwc_categories is a set, so its iteration order is only stable within
# one Python session — any array indexed by enumerate(liwc_categories) must be produced
# and consumed in the same session.
liwc_dict = load_LIWC(hyperparams_features['liwc_path'])
liwc_categories = set(liwc_dict.keys())
print(liwc_categories)

#Stopwords
stopwords_list = load_stopwords(hyperparams_features['stopwords_path'])
print(stopwords_list)

In [None]:
# Count, over all train posts, how often each emotion / LIWC category / stopword occurs,
# separately for positive and negative subjects, and turn the counts into frequencies
# relative to the total number of analysed tokens per class.
max_len = 256

total_words_pos = 0
total_words_neg = 0

total_emo_pos = np.zeros(len(emotions), dtype = int)
total_liwc_pos = np.zeros(len(liwc_categories), dtype=int)
total_stopwords_pos = np.zeros(len(stopwords_list), dtype = int)

total_emo_neg = np.zeros(len(emotions), dtype = int)
total_liwc_neg = np.zeros(len(liwc_categories), dtype=int)
total_stopwords_neg = np.zeros(len(stopwords_list), dtype = int)


for user, user_data in tqdm(user_level_data.items()):
  # Test subjects (id ending in '0000') are skipped; every other subject counts as train.
  # This matches the train/test rule of the text-length analysis, which does not require
  # a minimum id length for train subjects.
  if user.endswith('0000'):
    continue

  is_positive = user_data['label'] == 1

  for text in user_data['texts']:
    text = text[:max_len]   #only analyse up to maxlen

    # relative=False: the encoders are asked for absolute counts
    encoded_emotions = np.array(encode_emotions(text, emotion_lexicon,emotions, relative = False))
    encoded_stopwords = np.array(encode_stopwords(text, stopwords_list, relative = False))
    encoded_liwc = np.array(encode_liwc_categories(text, liwc_categories,liwc_dict, relative = False))

    if is_positive:
      total_words_pos += len(text)
      total_emo_pos += encoded_emotions
      total_liwc_pos += encoded_liwc
      total_stopwords_pos += encoded_stopwords
    else:
      total_words_neg += len(text)
      total_emo_neg += encoded_emotions
      total_liwc_neg += encoded_liwc
      total_stopwords_neg += encoded_stopwords

# Relative frequencies: occurrences per analysed token, per class
rel_emo_pos = total_emo_pos/total_words_pos
rel_liwc_pos = total_liwc_pos/total_words_pos
rel_stopwords_pos = total_stopwords_pos/total_words_pos

rel_emo_neg = total_emo_neg/total_words_neg
rel_liwc_neg = total_liwc_neg/total_words_neg
rel_stopwords_neg = total_stopwords_neg/total_words_neg

In [None]:
#Saving the results to JSONs

import json
import os


def _freq_dict(keys, rel_freqs):
  """Map every key to its relative frequency as a plain float (JSON-serialisable)."""
  return {key: float(freq) for key, freq in zip(keys, rel_freqs)}


#making dictionaries with emotion/stopword/liwc : relative frequency
freq_emotions_pos = _freq_dict(emotions, rel_emo_pos)
freq_emotions_neg = _freq_dict(emotions, rel_emo_neg)

# liwc_categories is iterated in the same order as when the counts were accumulated
freq_liwc_pos = _freq_dict(liwc_categories, rel_liwc_pos)
freq_liwc_neg = _freq_dict(liwc_categories, rel_liwc_neg)

freq_stopwords_pos = _freq_dict(stopwords_list, rel_stopwords_pos)
freq_stopwords_neg = _freq_dict(stopwords_list, rel_stopwords_neg)

results = [freq_emotions_neg, freq_emotions_pos, freq_liwc_neg, freq_liwc_pos, freq_stopwords_neg, freq_stopwords_pos]
names_results = ["freq_emotions_neg", "freq_emotions_pos", "freq_liwc_neg", "freq_liwc_pos", "freq_stopwords_neg", "freq_stopwords_pos"]

#saving the dicts as json:

save_path_root = root_dir + "/Data Analysis/" + task

# Create the per-task output folder on first use instead of failing in open().
os.makedirs(save_path_root, exist_ok=True)

for name, results_dict in zip(names_results, results):

  save_path = save_path_root + "/" + name + ".json"

  with open(save_path, "w") as file:
    json.dump(results_dict, file)

In [None]:
# Read the saved per-class frequency dictionaries back from Drive and report how much
# more (in %) positive subjects use a handful of selected categories.
import json
save_path_root = root_dir + "/Data Analysis/" + task + "/"


def _load_freqs(name):
  """Load one frequency dictionary of the current task by its file stem."""
  with open(save_path_root + name + ".json") as file:
    return json.load(file)


freq_emotions_neg = _load_freqs("freq_emotions_neg")
freq_emotions_pos = _load_freqs("freq_emotions_pos")
freq_liwc_neg = _load_freqs("freq_liwc_neg")
freq_liwc_pos = _load_freqs("freq_liwc_pos")
freq_stopwords_neg = _load_freqs("freq_stopwords_neg")
freq_stopwords_pos = _load_freqs("freq_stopwords_pos")

# NRC emotion categories
negative_pos, negative_neg = freq_emotions_pos['negative'], freq_emotions_neg['negative']
positive_pos, positive_neg = freq_emotions_pos['positive'], freq_emotions_neg['positive']
sadness_pos, sadness_neg = freq_emotions_pos['sadness'], freq_emotions_neg['sadness']
disgust_pos, disgust_neg = freq_emotions_pos['disgust'], freq_emotions_neg['disgust']

# LIWC categories
health_pos, health_neg = freq_liwc_pos['health'], freq_liwc_neg['health']
friend_pos, friend_neg = freq_liwc_pos['friend'], freq_liwc_neg['friend']

# "personal": mean frequency of the four first-person stopwords
personal_pos = (freq_stopwords_pos['i'] + freq_stopwords_pos['me'] + freq_stopwords_pos['my'] + freq_stopwords_pos['myself'])/4
personal_neg = (freq_stopwords_neg['i'] + freq_stopwords_neg['me'] + freq_stopwords_neg['my'] + freq_stopwords_neg['myself'])/4

# One line per category: relative difference of the positive class w.r.t. the negative class
for label, pos_freq, neg_freq in (
    ("negative", negative_pos, negative_neg),
    ("positive", positive_pos, positive_neg),
    ("sadness", sadness_pos, sadness_neg),
    ("health", health_pos, health_neg),
    ("personal", personal_pos, personal_neg),
    ("disgust", disgust_pos, disgust_neg),
    ("friend", friend_pos, friend_neg),
):
  print(f"People with {task} talk {100*(pos_freq-neg_freq)/neg_freq}% more {label}.")


In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Grouped bar chart of the selected category frequencies (as % of all words) for the
# positive and negative class, annotated with the relative difference per category.
pos = [negative_pos, positive_pos, personal_pos, sadness_pos, health_pos, disgust_pos, friend_pos]
pos_pct = [100*cat for cat in pos]
neg = [negative_neg, positive_neg, personal_neg, sadness_neg, health_neg, disgust_neg, friend_neg]
neg_pct = [100*cat for cat in neg]
labels = ['Negative', 'Positive', 'Personal', 'Sadness', 'Health', 'Disgust', 'Friend']

# Legend entries follow the selected task instead of always reading 'Depressed'.
legend_labels = {
    "Depression": ("Depressed", "Not Depressed"),
    "Anorexia": ("Anorexic", "Not Anorexic"),
    "Self-Harm": ("Self-Harm", "No Self-Harm"),
}
pos_label, neg_label = legend_labels.get(task, (str(task), f"Not {task}"))

# Calculate the number of pairs and set the width of the bars
num_pairs = len(labels)
bar_width = 0.35

# Create an array of positions for the bars
x = np.arange(num_pairs)

#plt.figure(figsize=(10, 30))

# Negative-class bars sit on x, positive-class bars directly to their right
plt.bar(x+ bar_width, pos_pct, width=bar_width, color='tab:orange', label=pos_label)
plt.bar(x, neg_pct, width=bar_width, color='tab:blue', label=neg_label)

# Relative difference (in %) of the positive class w.r.t. the negative class
percentage_diff = [(pos_value - neg_value) / neg_value * 100 for pos_value, neg_value in zip(pos_pct, neg_pct)]

# Add percentage difference text above each pair of bars in italics
for i, diff in enumerate(percentage_diff):
    plt.text(x[i] + bar_width / 2, max(pos_pct[i], neg_pct[i]) + 0.2, f'{diff:.1f}%', ha='center', fontstyle='italic')



# Add labels, title, and legend
plt.xlabel('Category')
plt.ylabel('Percentage of all words')
plt.title(f"{task}")
plt.xticks(x + bar_width / 2, labels)
plt.legend()

# Leave one unit of headroom above the highest bar for the difference labels
# (increase the '+ 1' if the text gets clipped)
plt.ylim(0, max(max(pos_pct), max(neg_pct)) + 1)


# Add whitespace between the bars
plt.tight_layout()

# Show the plot
plt.show()


In [None]:
import statistics as stat

# Only relative differences whose magnitude exceeds this many percent are printed.
min_diff=100

# Absolute relative difference (in %) of every category that occurs in the negative class.
all_diff = []


def _collect_relative_differences(group_name, freq_pos, freq_neg):
  """Record |relative difference| per category in all_diff and print the large ones.

  Every category is counted exactly once. Previously the LIWC and stopword loops added
  each category to the running totals twice (and also added raw frequencies for
  categories missing from the negative class), which skewed the reported average.
  """
  for cat, percent in freq_pos.items():
    baseline = freq_neg[cat]
    if baseline > 0:
      diff = 100*(percent - baseline)/baseline
      all_diff.append(abs(diff))
    else:
      # Category never occurs in the negative class: the relative difference is
      # undefined, so it is kept out of the statistics. diff falls back to the raw
      # frequency, as before, for the threshold check below.
      diff = percent

    if abs(diff) > min_diff:
      print(f"In {group_name}, people with {task} talk {diff} more about {cat}")


_collect_relative_differences("emotions", freq_emotions_pos, freq_emotions_neg)
_collect_relative_differences("LIWC", freq_liwc_pos, freq_liwc_neg)
_collect_relative_differences("stopwords", freq_stopwords_pos, freq_stopwords_neg)

# Totals derived from the single list so that average, median and mean agree.
total_cats = len(all_diff)
total_diff = sum(all_diff)

print(f"Average absolute difference across all categories for {task} is {total_diff/total_cats}%.")
print(f"Median absolute difference across all categories for {task} is {stat.median(all_diff)}%.")

print(stat.mean(all_diff))


# Analyzing Attention Scores for Results Chapter

Analyzing User-level attention

In [None]:
# Notebook-local configuration. Assigning hyperparams_features / hyperparams here
# replaces the objects imported from the `hyperparameters` module in the imports cell.

#root_dir = "/Users/ronhochstenbach/Desktop/Thesis/Data"
root_dir = "/content/drive/MyDrive/Thesis/Data"  #when cloning for colab


# Feature/resource settings: vocabulary size, embedding dimension and resource file paths
hyperparams_features = {
    "max_features": 20002,
    "embedding_dim": 300,
    "vocabulary_path": root_dir + '/Resources/vocabulary.pkl',
    "nrc_lexicon_path" : root_dir + "/Resources/NRC-emotion-lexicon-wordlevel-alphabetized-v0.92.txt",
    "liwc_path": root_dir + '/Resources/LIWC2007.dic',
    "stopwords_path": root_dir + '/Resources/stopwords.txt',
    "embeddings_path": root_dir + "/Resources/glove.840B.300d.txt",
    #"liwc_words_cached": "data/liwc_categories_for_vocabulary_erisk_clpsych_stop_20K.pkl"
    "BERT_path": root_dir + '/Resources/BERT-base-uncased/'
}

# Model / training settings
hyperparams = {
    "trainable_embeddings": True,
    "sum_layers": 1,
    'trainable_bert_layer': False,

    #Structure
    "lstm_units": 128,
    "dense_bow_units": 20,
    "dense_numerical_units": 20,
    "lstm_units_user": 32,

    #Self-attention structure
    "num_heads": 3,
    "key_dim": 150,
    "num_layers":2,
    "use_positional_encodings": True,

    #Regularizers
    "dropout": 0.3,             #Appendix uban
    "l2_dense": 0.00001,        #Appendix uban (?)
    "l2_embeddings": 0.00001,   #Appendix uban (?)
    "norm_momentum": 0.1,

    # layer names checked with `in` when building models / data generators
    "ignore_layer": ["bert_layer"],

    #Training
    "decay": 0.001,
    "lr": 0.0005,                   #appendix uban 0.0001 (han etc, 0.0005 hsan)
    "reduce_lr_factor": 0.5,        #originally 0.5
    "reduce_lr_patience": 55,        #originally 55
    "scheduled_reduce_lr_freq": 95,  #originally: 95
    "scheduled_reduce_lr_factor": 0.5,
    "freeze_patience": 2000,
    "threshold": 0.5,
    "early_stopping_patience": 5,

    "positive_class_weight": 2,     #6.5 = calculated, 2 = uban history & own hyperopt

    # Data-generator settings (passed as seq_len, posts_per_group, batch_size, ... below)
    "maxlen": 256,
    "posts_per_user": None,
    "post_groups_per_user": None,
    "posts_per_group": 50,
    "batch_size": 32,   #normally 32
    "padding": "pre",
    "hierarchical": True,
    "sample_seqs": False,
    "sampling_distr": "exp",

}

# The optimizer object is stored in the dict as well, built from the lr and decay above
hyperparams['optimizer'] = optimizers.legacy.Adam(learning_rate=hyperparams['lr'],
                                                  decay = hyperparams['decay'])

# with open('/Users/ronhochstenbach/Desktop/Thesis/Data/Resources/config.json', 'w') as file:
#     json.dump(hyperparams_features, file)

In [None]:
import pandas as pd

from load_save_model import load_saved_model_weights, load_params
from data_generator import DataGenerator_Base, DataGenerator_BERT
from data_loader import load_erisk_data

root_dir = "/content/drive/MyDrive/Thesis/Data"  #when cloning for colab
# Folder of the trained model whose weights are loaded further down.
# NOTE(review): this path points at a Depression HAN run, while `task` below is set to
# "Self-Harm" (and is reset to "Depression" in the next cell) — the printed message
# therefore does not match the model that is loaded. Confirm which one is intended.
saved_path = root_dir + '/Final Trained Models (10 epochs)/Depression/Depression_HAN_2023-07-31 23:35:39.671308'
#hyperparams, hyperparams_features = load_params(saved_path, general_config_path="/content/drive/MyDrive/Thesis/Data/Resources/config.json")

task = "Self-Harm"          #"Self-Harm" - "Anorexia" - "Depression"
# NOTE(review): the attention-extraction cell further down raises unless
# model_type == "HAN_BERT"; with "HAN" that cell cannot run.
model_type = "HAN"          #"HAN" - "HAN_BERT"
print(f"Running {task} task using the {model_type} model!")


In [None]:
# Pick which of the three pre-loaded data sets feeds the attention analysis.
task = "Depression"

# Reject anything that is not one of the known tasks before touching the data.
if task not in ("Self-Harm", "Anorexia", "Depression"):
  raise Exception("Unkown data set!")

if task == "Depression":
  writings_df, user_level_data, subjects_split = writings_df_d, user_level_data_d, subjects_split_d
elif task == "Anorexia":
  writings_df, user_level_data, subjects_split = writings_df_a, user_level_data_a, subjects_split_a
else:  # "Self-Harm"
  writings_df, user_level_data, subjects_split = writings_df_sh, user_level_data_sh, subjects_split_sh


Importing model, initializing datagenerator

In [None]:
# Build the model of the configured type and load its saved weights from saved_path
# (h5=True: presumably selects the .h5 weight format — verify in load_save_model).
model = load_saved_model_weights(saved_path, hyperparams, hyperparams_features, model_type, h5=True)

In [None]:
# Choose the generator class that matches the model type, then build the train generator.
if model_type in ("HAN", "HSAN"):
    data_gen_class = DataGenerator_Base
elif model_type in ("HAN_BERT", "Con_HAN"):
    data_gen_class = DataGenerator_BERT
else:
    raise Exception("Unknown type!")

# Keyword arguments shared by both generator classes.
generator_kwargs = dict(
    set_type='train',
    hyperparams_features=hyperparams_features,
    seq_len=hyperparams['maxlen'],
    batch_size=hyperparams['batch_size'],
    posts_per_group=hyperparams['posts_per_group'],
    post_groups_per_user=None,
    max_posts_per_user=hyperparams['posts_per_user'],
    compute_liwc=True,
    ablate_emotions='emotions' in hyperparams['ignore_layer'],
    ablate_liwc='liwc' in hyperparams['ignore_layer'],
)

# Only the BERT generator additionally receives the model type.
if data_gen_class is DataGenerator_BERT:
    generator_kwargs['model_type'] = model_type

data_generator_train = data_gen_class(user_level_data, subjects_split, **generator_kwargs)

In [None]:
import csv

# Run the train generator through the model's "attention_user" layer and collect
#  - the average attention per post position, separately for positive/negative chunks
#  - (post length, attention) pairs for every non-empty post
# The results are written to the task's "Data Analysis" folder on Drive.
if not model_type == "HAN_BERT":
  raise Exception("Use Bert for this!")

min_pos_chunks = 200

# One attention value per post in a chunk; same setting the data generator was built with.
posts_per_chunk = hyperparams['posts_per_group']

length_attn_pairs = []

total_chunk_attention_pos = np.zeros(posts_per_chunk)
total_chunk_pos = 0

total_chunk_attention_neg = np.zeros(posts_per_chunk)
total_chunk_neg = 0

# The layer lookup and the backend function do not depend on the batch: build them once.
attention_layer = model.get_layer("attention_user")
attention_function = tf.keras.backend.function(inputs=[model.input], outputs=[attention_layer.output])

for i, (x,y) in enumerate(tqdm(data_generator_train)):
  #Obtain post attention weights for the batch
  # (the former model.predict call was dropped: its result was never used)
  # NOTE(review): np.squeeze would also drop the batch axis for a batch holding a single
  # chunk — confirm the generator never yields such a batch.
  attention_weights = np.squeeze(attention_function(x)[0])

  batch_tokens = x[0]

  for c in range(batch_tokens.shape[0]):    #chunks in the batch
    chunk_attentions = attention_weights[c]
    label = int(y[c])

    if label == 1:  #positive
      total_chunk_attention_pos += chunk_attentions
      total_chunk_pos += 1

    if label == 0:  #negative
      total_chunk_attention_neg += chunk_attentions
      total_chunk_neg +=1

    for p in range(batch_tokens.shape[1]):          #posts in the chunk

      length = np.count_nonzero(batch_tokens[c,p]) - 2 #Subtract special tokens
      post_attention = chunk_attentions[p]

      # posts with no non-zero tokens besides the two special ones are skipped
      if length>0:
        length_attn_pairs.append([length, post_attention])

  print(f"After {i+1} batches, we have {total_chunk_pos} positive chunks, {total_chunk_neg} negative chunks, and {len(length_attn_pairs)} len/attn pairs!")

  #optional early stop:
  #if total_chunk_pos >= min_pos_chunks:
  #  break

avg_chunk_att_pos = total_chunk_attention_pos / total_chunk_pos
avg_chunk_att_neg = total_chunk_attention_neg / total_chunk_neg

#saving to drive
name_chunck_att_pos = root_dir + "/Data Analysis/" + task + "/chunk_att_pos.csv"
name_chunck_att_neg = root_dir + "/Data Analysis/" + task + "/chunk_att_neg.csv"
name_length_att = root_dir + "/Data Analysis/" + task + "/name_length_att.csv"

np.savetxt(name_chunck_att_pos, avg_chunk_att_pos, delimiter=",")
np.savetxt(name_chunck_att_neg, avg_chunk_att_neg, delimiter=",")

with open(name_length_att, 'w', newline='') as f:
    csv_writer = csv.writer(f)
    csv_writer.writerows(length_attn_pairs)


Plotting user-level attention

In [None]:
# Plot the saved average attention per post position for both classes of one task.
task = "Anorexia"

root_dir = "/content/drive/MyDrive/Thesis/Data"


def _rescaled_attention(csv_name):
  """Read a saved attention series and rescale it: exponentiate, then divide by the total."""
  series = np.exp(np.genfromtxt(root_dir + '/Data Analysis/'+ task + '/' + csv_name, delimiter=','))
  return series / sum(series)


att_series_pos = _rescaled_attention('chunk_att_pos.csv')
att_series_neg = _rescaled_attention('chunk_att_neg.csv')

# Post positions 1..50 on the x-axis
x_values = np.arange(1, 51)
for series, legend_text, colour in (
    (att_series_pos, 'Anorexic', 'tab:orange'),
    (att_series_neg, 'Not Anorexic', 'tab:blue'),
):
  plt.plot(x_values, series, label=legend_text, color=colour)

# Axis labels, title and legend
plt.xlabel('Post')
plt.ylabel('Attention Score')
plt.title(task)
plt.legend()

plt.show()




Correlating user-level attention with post length

In [None]:
def resample_data(len_att, num_bins=256, binsize=100, replace = False):
    """Draw the same number of samples from every length bin.

    The first column of ``len_att`` (lengths) is split into ``num_bins`` equal-width
    bins using the edges of ``np.histogram``; from every bin ``binsize`` rows are drawn
    at random.

    Args:
        len_att: 2-D array whose rows are (length, attention) pairs.
        num_bins: number of equal-width length bins.
        binsize: number of rows drawn from every bin.
        replace: allow sampling with replacement for bins that hold fewer than
            ``binsize`` rows. Bins with enough rows are always sampled without
            replacement.

    Returns:
        Array of shape (num_bins * binsize, 2); rows i*binsize .. (i+1)*binsize - 1
        come from bin i.

    Raises:
        ValueError: if a bin is empty, or holds fewer than ``binsize`` rows while
            ``replace`` is False.
    """
    lens = len_att[:, 0]

    # Only the bin edges are needed, not the counts.
    _, bin_edges = np.histogram(lens, bins=num_bins)

    # Output buffer: binsize rows per bin
    resampled_data = np.zeros((num_bins * binsize, 2))

    for i in range(num_bins):
        lower, upper = bin_edges[i], bin_edges[i + 1]

        # np.histogram bins are half-open except the last one, which also contains its
        # right edge; without this the rows with the maximum length would be dropped.
        if i == num_bins - 1:
            mask = (lens >= lower) & (lens <= upper)
        else:
            mask = (lens >= lower) & (lens < upper)
        bin_data = len_att[mask]

        available = len(bin_data)
        if available == 0 and binsize > 0:
            raise ValueError(f"bin {i} [{lower}, {upper}] is empty; cannot draw {binsize} samples")

        needs_replacement = available < binsize
        if needs_replacement and not replace:
            raise ValueError(
                f"bin {i} holds {available} samples but {binsize} are required; "
                "use replace=True to sample with replacement"
            )

        # Randomly pick the rows of this bin
        bin_indices = np.random.choice(available, binsize, replace=needs_replacement)

        # Assign the selected data points to the resampled array
        resampled_data[i * binsize: (i + 1) * binsize] = bin_data[bin_indices]

    return resampled_data

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np

# Hexbin plots of post length against (rescaled) attention score, one panel per task,
# drawn from the length/attention pairs saved on Drive.
root_dir = "/content/drive/MyDrive/Thesis/Data"

# Resampling settings: number of length bins, samples per bin, sampling with replacement
bins = 25
binsize = 150
replace = False

# One row with three panels
fig, axes = plt.subplots(1, 3, figsize=(15, 5))

for i, task in enumerate(["Self-Harm", "Anorexia", "Depression"]):
    ax = axes[i]
    len_att = np.genfromtxt(f"{root_dir}/Data Analysis/{task}/name_length_att.csv", delimiter=',')

    # Rescale the attentions: exponentiate, then divide by (total * 50 / number of pairs)
    exp_scores = np.exp(len_att[:, 1])
    len_att[:, 1] = exp_scores / (sum(exp_scores) * 50 / len(exp_scores))

    # Equalise the number of points per length bin before plotting
    new = resample_data(len_att, num_bins=bins, binsize=binsize, replace=replace)

    hb = ax.hexbin(new[:, 0], new[:, 1], gridsize=bins, cmap='Blues', mincnt=1)

    ax.set_title(task)
    if i == 0:
      # y label only on the left panel
      ax.set_ylabel('Attention Score')
    elif i == 1:
      # x label only on the middle panel
      ax.set_xlabel('Post Length')
    elif i == 2:
      # colorbar only on the right panel
      cb = fig.colorbar(hb, ax=ax)
      cb.set_label('Density')

# Tidy up the spacing between the panels and render
plt.tight_layout()
plt.show()


Post-level attention

In [None]:
# Load the lexical resources and the vocabulary for the post-level attention analysis;
# the file locations come from hyperparams_features.

#Emotion Lexicon (NRC): the dict keys are used as the list of emotion names
emotion_lexicon = load_NRC(hyperparams_features['nrc_lexicon_path'])
emotions = list(emotion_lexicon.keys())
print(emotions)

#LIWC Dictionary: the dict keys are used as the category names
# NOTE(review): liwc_categories is a set, so its iteration order is only stable within
# one Python session.
liwc_dict = load_LIWC(hyperparams_features['liwc_path'])
liwc_categories = set(liwc_dict.keys())
print(liwc_categories)

#Stopwords
stopwords_list = load_stopwords(hyperparams_features['stopwords_path'])
print(stopwords_list)

# First-person singular pronouns
pronouns=["i", "me", "my", "mine", "myself"]

vocabulary = load_vocabulary(hyperparams_features['vocabulary_path'])

Adapt the model initialization functions to return the sentEncoder

In [None]:
import tensorflow as tf
import torch
from keras.models import Model
from keras.layers import Dense, Dropout, Embedding, LSTM, Lambda, BatchNormalization, TimeDistributed, \
    Input, concatenate, Flatten, RepeatVector, Activation, Multiply, Permute, MultiHeadAttention
from keras_nlp.layers import SinePositionEncoding
from keras import regularizers
from keras import backend as K
from keras.metrics import AUC, Precision, Recall
from metrics import Metrics
from resource_loader import load_embeddings
from transformers import TFBertModel, TFRobertaModel, TFAlbertModel
from auxilliary_functions import create_embeddings

def build_HAN(hyperparams, hyperparams_features,
                             emotions_dim, stopwords_list_dim, liwc_categories_dim,
                             ignore_layer=None):
    """Build and compile the post-level encoder ("sentEncoder") of the HAN model.

    The encoder maps one post (a sequence of ``hyperparams['maxlen']`` token ids) to a
    vector: Embedding -> Dropout -> LSTM -> attention-weighted sum over the LSTM
    outputs -> BatchNormalization -> Dropout.

    Args:
        hyperparams: model settings; reads 'maxlen', 'l2_embeddings',
            'trainable_embeddings', 'dropout', 'lstm_units', 'norm_momentum' and
            'optimizer'.
        hyperparams_features: feature settings; reads 'embeddings_path',
            'embedding_dim', 'vocabulary_path' and 'max_features'.
        emotions_dim, stopwords_list_dim, liwc_categories_dim: not used by this
            adapted version; kept so existing calls keep working.
        ignore_layer: names of layers to leave out ('attention', 'batchnorm').
            Defaults to no ablation.

    Returns:
        The compiled keras ``Model`` named 'sentEncoder'.
    """
    # Avoid a shared mutable default argument.
    if ignore_layer is None:
        ignore_layer = []

    embedding_matrix = load_embeddings(hyperparams_features['embeddings_path'],
                                       hyperparams_features['embedding_dim'],
                                       hyperparams_features['vocabulary_path'])

    # Post/sentence representation - word sequence
    tokens_features = Input(shape=(hyperparams['maxlen'],), name='word_seq')
    embedding_layer = Embedding(hyperparams_features['max_features'],
                                hyperparams_features['embedding_dim'],
                                input_length=hyperparams['maxlen'],
                                embeddings_regularizer=regularizers.l2(hyperparams['l2_embeddings']),
                                weights=[embedding_matrix],
                                trainable=hyperparams['trainable_embeddings'],
                                name='embeddings_layer')(tokens_features)

    embedding_layer = Dropout(hyperparams['dropout'], name='embedding_dropout')(embedding_layer)

    # The full output sequence is only needed when the attention layer consumes it
    lstm_layers = LSTM(hyperparams['lstm_units'],
                       return_sequences='attention' not in ignore_layer,
                       name='LSTM_layer')(embedding_layer)

    # Attention
    if 'attention' not in ignore_layer:
        # one tanh score per time step, softmax-normalised over the sequence
        attention_layer = Dense(1, activation='tanh', name='attention')
        attention = attention_layer(lstm_layers)
        attention = Flatten()(attention)
        attention_output = Activation('softmax')(attention)
        # repeat the weights across the LSTM units so they can be multiplied element-wise
        attention = RepeatVector(hyperparams['lstm_units'])(attention_output)
        attention = Permute([2, 1])(attention)

        # weighted sum of the LSTM outputs over the time axis
        sent_representation = Multiply()([lstm_layers, attention])
        sent_representation = Lambda(lambda xin: K.sum(xin, axis=1),
                                     output_shape=(hyperparams['lstm_units'],)
                                     )(sent_representation)
    else:
        sent_representation = lstm_layers

    if 'batchnorm' not in ignore_layer:
        sent_representation = BatchNormalization(axis=1, momentum=hyperparams['norm_momentum'],
                                                 name='sent_repr_norm')(sent_representation)

    sent_representation = Dropout(hyperparams['dropout'], name='sent_repr_dropout')(sent_representation)

    # The user-level inputs of the full model (numeric, sparse and hierarchical inputs)
    # were created here but never connected to anything; they are not needed for the
    # post encoder and have been removed.

    # Hierarchy
    sentEncoder = Model(inputs=tokens_features,
                        outputs=sent_representation, name='sentEncoder')
    sentEncoder.summary()

    sentEncoder.compile(hyperparams['optimizer'], K.binary_crossentropy,
                               metrics=[AUC(), Precision(), Recall()])


    return sentEncoder

def build_HAN_BERT(hyperparams, hyperparams_features, model_type,
                             emotions_dim, stopwords_list_dim, liwc_categories_dim,
                             ignore_layer=[]):
    """Build and compile the post-level encoder on top of a pretrained transformer.

    The hidden states of the last transformer layers are summed, passed through
    dropout and an LSTM, optionally pooled with a learned attention over the
    token positions, optionally batch-normalised, and passed through dropout.

    Args:
        hyperparams: dict; the keys read here are 'maxlen', 'sum_layers'
            (HAN_BERT only), 'dropout', 'lstm_units', 'norm_momentum' and
            'optimizer'.
        hyperparams_features: not used by this builder.
        model_type: "HAN_BERT" (checkpoint "prajjwal1/bert-tiny") or
            "HAN_RoBERTa" (checkpoint 'roberta-base').
        emotions_dim: not used by this builder.
        stopwords_list_dim: not used by this builder.
        liwc_categories_dim: not used by this builder.
        ignore_layer: names of optional layers to leave out; 'attention' and
            'batchnorm' are the values checked here. The list is only read,
            never mutated.

    Returns:
        The compiled Keras model named 'sentEncoder', whose inputs are
        ``[token ids, attention masks]``. Its summary is printed as a side
        effect.

    Raises:
        ValueError: if ``model_type`` is not one of the supported values.
    """
    # Reject unsupported types before any layer or checkpoint is created, so
    # the caller gets a clear error instead of a later UnboundLocalError.
    if model_type not in ("HAN_BERT", "HAN_RoBERTa"):
        raise ValueError("Unknown model type!")

    use_attention = 'attention' not in ignore_layer

    # Post/sentence representation - word sequence
    tokens_features_ids = Input(shape=(hyperparams['maxlen'],), name='word_seq_ids', dtype=tf.int32)
    tokens_features_attnmasks = Input(shape=(hyperparams['maxlen'],), name='word_seq_attnmasks', dtype=tf.int32)

    if model_type == "HAN_BERT":
        # Earlier experiments used 'bert-base-uncased' (last 4 layers) and
        # TFAlbertModel 'albert-base-v2' in place of this checkpoint.
        transformer = TFBertModel.from_pretrained("prajjwal1/bert-tiny", from_pt=True)
        n_summed_layers = hyperparams['sum_layers']
    else:
        transformer = TFRobertaModel.from_pretrained('roberta-base')
        # NOTE(review): the RoBERTa variant always sums the last 4 layers and
        # ignores hyperparams['sum_layers']; kept as-is so existing trained
        # weights keep matching - confirm whether this is intended.
        n_summed_layers = 4

    transformer_outputs = transformer(tokens_features_ids,
                                      attention_mask=tokens_features_attnmasks,
                                      output_hidden_states=True, return_dict=True)
    BERT_embedding_layer = transformer_outputs['hidden_states'][-n_summed_layers:]

    # Element-wise sum of the selected hidden states.
    embedding_layer = Lambda(lambda x: tf.add_n(x))(BERT_embedding_layer)

    embedding_layer = Dropout(hyperparams['dropout'], name='embedding_dropout')(embedding_layer)

    # The LSTM must return the whole sequence only when attention pools over it.
    lstm_layers = LSTM(hyperparams['lstm_units'],
                       return_sequences=use_attention,
                       name='LSTM_layer')(embedding_layer)

    # Attention
    if use_attention:
        attention_layer = Dense(1, activation='tanh', name='attention')
        attention = attention_layer(lstm_layers)
        attention = Flatten()(attention)
        attention_output = Activation('softmax')(attention)
        # Repeat the per-token weight for every LSTM unit so it can be
        # multiplied with the LSTM output sequence.
        attention = RepeatVector(hyperparams['lstm_units'])(attention_output)
        attention = Permute([2, 1])(attention)

        sent_representation = Multiply()([lstm_layers, attention])
        # Weighted sum over the token axis.
        sent_representation = Lambda(lambda xin: K.sum(xin, axis=1),
                                     output_shape=(hyperparams['lstm_units'],)
                                     )(sent_representation)
    else:
        sent_representation = lstm_layers

    if 'batchnorm' not in ignore_layer:
        sent_representation = BatchNormalization(axis=1, momentum=hyperparams['norm_momentum'],
                                                 name='sent_repr_norm')(sent_representation)

    sent_representation = Dropout(hyperparams['dropout'], name='sent_repr_dropout')(sent_representation)

    # Hierarchy
    sentEncoder = Model(inputs=[tokens_features_ids, tokens_features_attnmasks],
                        outputs=sent_representation, name='sentEncoder')

    sentEncoder.summary()

    sentEncoder.compile(hyperparams['optimizer'], K.binary_crossentropy,
                        metrics=[AUC(), Precision(), Recall()])

    return sentEncoder

def build_Context_HAN(hyperparams, hyperparams_features,
                             emotions_dim, stopwords_list_dim, liwc_categories_dim,
                             ignore_layer=[]):
    """Build and compile the BERT-based post encoder used by the Con_HAN model.

    Token ids and attention masks go through the "prajjwal1/bert-tiny"
    checkpoint; the last ``hyperparams['sum_layers']`` hidden states are
    summed, then passed through dropout, an LSTM, optional attention pooling
    over tokens, optional batch normalisation and a final dropout.

    Args:
        hyperparams: dict; reads 'maxlen', 'sum_layers', 'dropout',
            'lstm_units', 'norm_momentum', 'posts_per_group' and 'optimizer'.
        hyperparams_features: not used by this builder.
        emotions_dim, stopwords_list_dim, liwc_categories_dim: only used to
            size input placeholders that are not connected to the returned
            model.
        ignore_layer: optional layers to skip ('attention', 'batchnorm').

    Returns:
        The compiled Keras model named 'sentEncoder' with inputs
        ``[token ids, attention masks]``.
    """
    with_attention = 'attention' not in ignore_layer

    # Word-level inputs for a single post.
    ids_in = Input(shape=(hyperparams['maxlen'],), name='word_seq_ids', dtype=tf.int32)
    masks_in = Input(shape=(hyperparams['maxlen'],), name='word_seq_attnmasks', dtype=tf.int32)

    # Sum the last `sum_layers` hidden states of the pretrained encoder.
    bert = TFBertModel.from_pretrained("prajjwal1/bert-tiny", from_pt=True)
    bert_outputs = bert(ids_in, attention_mask=masks_in,
                        output_hidden_states=True, return_dict=True)
    top_hidden_states = bert_outputs['hidden_states'][-hyperparams['sum_layers']:]
    token_embeddings = Lambda(lambda x: tf.add_n(x))(top_hidden_states)
    token_embeddings = Dropout(hyperparams['dropout'], name='embedding_dropout')(token_embeddings)

    recurrent_out = LSTM(hyperparams['lstm_units'],
                         return_sequences=with_attention,
                         name='LSTM_layer')(token_embeddings)

    if not with_attention:
        post_vector = recurrent_out
    else:
        # One tanh score per token, softmax-normalised over the sequence.
        token_scores = Dense(1, activation='tanh', name='attention')(recurrent_out)
        token_scores = Flatten()(token_scores)
        token_weights = Activation('softmax')(token_scores)
        # Broadcast each token weight across the LSTM units.
        tiled_weights = RepeatVector(hyperparams['lstm_units'])(token_weights)
        tiled_weights = Permute([2, 1])(tiled_weights)

        weighted_sequence = Multiply()([recurrent_out, tiled_weights])
        post_vector = Lambda(lambda xin: K.sum(xin, axis=1),
                             output_shape=(hyperparams['lstm_units'],)
                             )(weighted_sequence)

    if 'batchnorm' not in ignore_layer:
        post_vector = BatchNormalization(axis=1, momentum=hyperparams['norm_momentum'],
                                         name='sent_repr_norm')(post_vector)

    post_vector = Dropout(hyperparams['dropout'], name='sent_repr_dropout')(post_vector)

    # History-level placeholders. They are created but not connected to the
    # model returned below.
    group_size = hyperparams['posts_per_group']
    _numeric_hist = Input(shape=(group_size, emotions_dim + 1 + liwc_categories_dim),
                          name='numeric_input_hist')  # emotions and pronouns
    _sparse_hist = Input(shape=(group_size, stopwords_list_dim),
                         name='sparse_input_hist')  # stopwords
    _hist_ids = Input(shape=(group_size, hyperparams['maxlen']),
                      name='hierarchical_word_seq_input_ids')
    _hist_masks = Input(shape=(group_size, hyperparams['maxlen']),
                        name='hierarchical_word_seq_input_attnmasks')

    # Post-level encoder
    sentEncoder = Model(inputs=[ids_in, masks_in],
                        outputs=post_vector, name='sentEncoder')
    sentEncoder.compile(hyperparams['optimizer'], K.binary_crossentropy,
                        metrics=[AUC(), Precision(), Recall()])
    return sentEncoder

In [None]:
def initialize_model(hyperparams, hyperparams_features, model_type,
                     logger=None, session=None, transfer=False):
    """Build the sentence encoder for the requested model type.

    Args:
        hyperparams: dict; 'ignore_layer' is read here and the whole dict is
            forwarded to the builder.
        hyperparams_features: dict; 'nrc_lexicon_path', 'liwc_path' and
            'stopwords_path' are read unless the matching feature
            ('emotions', 'liwc', 'stopwords') is listed in
            ``hyperparams['ignore_layer']``.
        model_type: 'HAN', 'HAN_BERT', 'HAN_RoBERTa' or 'Con_HAN'.
        logger: logger to report progress to; when omitted, the 'training'
            logger is configured to write DEBUG-level output to stdout.
        session: accepted for compatibility; not used.
        transfer: accepted for compatibility; not used.

    Returns:
        The model returned by the selected ``build_*`` function.

    Raises:
        ValueError: if ``model_type`` is not one of the supported values.
    """
    if logger is None:
        logger = logging.getLogger('training')
        # getLogger returns the same object on every call, so attach the
        # stdout handler only once; otherwise every re-initialisation would
        # print each message one more time.
        if not logger.handlers:
            ch = logging.StreamHandler(sys.stdout)
            ch.setFormatter(logging.Formatter("%(asctime)s;%(levelname)s;%(message)s"))
            logger.addHandler(ch)
        logger.setLevel(logging.DEBUG)
    logger.info("Initializing model...\n")

    ignore_layer = hyperparams['ignore_layer']

    # A feature listed in ignore_layer contributes zero dimensions and its
    # lexicon is not loaded.
    if 'emotions' in ignore_layer:
        emotions_dim = 0
    else:
        emotions_dim = len(load_NRC(hyperparams_features['nrc_lexicon_path']))
    if 'liwc' in ignore_layer:
        liwc_categories_dim = 0
    else:
        liwc_categories_dim = len(load_LIWC(hyperparams_features['liwc_path']))
    if 'stopwords' in ignore_layer:
        stopwords_dim = 0
    else:
        stopwords_dim = len(load_stopwords(hyperparams_features['stopwords_path']))

    # Initialize model
    if model_type == 'HAN':
        sentEncoder = build_HAN(hyperparams, hyperparams_features,
                                emotions_dim, stopwords_dim, liwc_categories_dim,
                                ignore_layer=ignore_layer)
    elif model_type == 'HAN_BERT' or model_type == "HAN_RoBERTa":
        sentEncoder = build_HAN_BERT(hyperparams, hyperparams_features, model_type,
                                     emotions_dim, stopwords_dim, liwc_categories_dim,
                                     ignore_layer=ignore_layer)
    elif model_type == 'Con_HAN':
        sentEncoder = build_Context_HAN(hyperparams, hyperparams_features,
                                        emotions_dim, stopwords_dim, liwc_categories_dim,
                                        ignore_layer=ignore_layer)
    else:
        raise ValueError("Unknown model!")

    return sentEncoder

In [None]:
def load_saved_model_weights(model_path, hyperparams, hyperparams_features, model_type, h5=False):
    """Rebuild the sentence encoder and load saved weights into it.

    Args:
        model_path: path prefix of the saved model; the weights are read from
            ``model_path + "_weights"`` (plus ``".h5"`` when ``h5`` is true).
        hyperparams: forwarded to ``initialize_model``.
        hyperparams_features: forwarded to ``initialize_model``.
        model_type: forwarded to ``initialize_model``.
        h5: whether the weights were saved in HDF5 format.

    Returns:
        The sentence encoder with the weights loaded. Its summary is printed
        as a side effect.
    """
    loaded_sentEncoder = initialize_model(hyperparams, hyperparams_features, model_type)
    loaded_sentEncoder.summary()
    path = model_path + "_weights"
    if h5:
        # HDF5 weights are matched to layers by name.
        loaded_sentEncoder.load_weights(path + ".h5", by_name=True)
    else:
        # Keras only supports by_name loading for HDF5 files; passing
        # by_name=True with a TensorFlow-format checkpoint raises an error.
        loaded_sentEncoder.load_weights(path)
    return loaded_sentEncoder

In [None]:
# Load the trained sentence encoder (takes about 3.5 minutes).

hyperparams['optimizer'] = optimizers.legacy.Adam(
    learning_rate=hyperparams['lr'],
    decay=hyperparams['decay'],
)
hyperparams['batch_size'] = 1

root_dir = "/content/drive/MyDrive/Thesis/Data"

# Saved Depression models, one path prefix per architecture.
han_path = f"{root_dir}/Final Trained Models (10 epochs)/Depression/Depression_HAN_2023-07-31 23:35:39.671308"
bert_path = f"{root_dir}/Final Trained Models (10 epochs)/Depression/Depression_HAN_BERT_2023-08-01 18:29:44.243405"
conHan_path = f"{root_dir}/Final Trained Models (10 epochs)/Depression/Depression_Con_HAN_2023-07-28 23:52:39.351657"

# Alternative encoders (disabled):
# model_type = "HAN"
# han_model, sentEncoder_HAN = load_saved_model_weights(han_path, hyperparams, hyperparams_features, model_type, h5=True)

# model_type = "HAN_BERT"
# sentEncoder_BERT = load_saved_model_weights(bert_path, hyperparams, hyperparams_features, model_type, h5=True)

# NOTE: the variable is called sentEncoder_BERT but holds the Con_HAN encoder.
model_type = "Con_HAN"
sentEncoder_BERT = load_saved_model_weights(
    conHan_path, hyperparams, hyperparams_features, model_type, h5=True
)



In [None]:
from transformers import BertTokenizerFast
from keras.utils import pad_sequences

# Tokenizers already loaded, keyed by checkpoint name.
_BERT_TOKENIZERS = {}


def _get_bert_tokenizer(checkpoint='prajjwal1/bert-tiny'):
    """Return the fast BERT tokenizer for *checkpoint*, loading it at most once."""
    if checkpoint not in _BERT_TOKENIZERS:
        _BERT_TOKENIZERS[checkpoint] = BertTokenizerFast.from_pretrained(
            checkpoint, do_lower_case=True)
    return _BERT_TOKENIZERS[checkpoint]


def encode_text_BERT(tokens):
    """Encode one pre-split text for the BERT-based encoders.

    Args:
        tokens: the words of the text (already split).

    Returns:
        A tuple ``(token_ids, attention_masks, categ_data, sparse_data)``:
        the tokenizer's 'input_ids' and 'attention_mask', padded or truncated
        to ``hyperparams['maxlen']``, followed by the emotion + pronoun + LIWC
        features and the stopword features, each wrapped in two list levels.

    Relies on the notebook globals ``hyperparams``, ``emotion_lexicon``,
    ``emotions``, ``pronouns``, ``stopwords_list``, ``liwc_categories`` and
    ``liwc_dict``.
    """
    # Loading the tokenizer is expensive, so reuse it across calls.
    tokenizer = _get_bert_tokenizer()
    encodings = tokenizer(tokens, add_special_tokens=True, max_length=hyperparams['maxlen'],
                          padding='max_length', truncation=True,
                          return_attention_mask=True, is_split_into_words=True)

    encoded_emotions = encode_emotions(tokens, emotion_lexicon, emotions)
    encoded_pronouns = encode_pronouns(tokens, pronouns)
    encoded_stopwords = encode_stopwords(tokens, stopwords_list)
    encoded_liwc = encode_liwc_categories(tokens, liwc_categories, liwc_dict)

    # The pronoun feature is wrapped in a list so it can be concatenated with
    # the emotion and LIWC feature lists.
    user_categ_data = [[encoded_emotions + [encoded_pronouns] + encoded_liwc]]
    user_sparse_data = [[encoded_stopwords]]

    return (encodings['input_ids'], encodings['attention_mask'],
            user_categ_data, user_sparse_data)


def encode_text_HAN(tokens):
    """Encode one pre-split text for the vocabulary-based HAN encoder.

    Returns a tuple ``(user_tokens, user_categ_data, user_sparse_data)``: the
    vocabulary indices padded/truncated at the front to
    ``hyperparams['maxlen']``, the emotion + pronoun + LIWC features, and the
    stopword features.
    """
    # Words missing from the vocabulary map to the last index (max_features - 1).
    word_indices = [
        vocabulary.get(word, hyperparams_features['max_features'] - 1)
        for word in tokens
    ]

    emotion_feats = encode_emotions(tokens, emotion_lexicon, emotions)
    pronoun_feat = encode_pronouns(tokens, pronouns)
    stopword_feats = encode_stopwords(tokens, stopwords_list)
    liwc_feats = encode_liwc_categories(tokens, liwc_categories, liwc_dict)

    padded = pad_sequences(
        [word_indices],
        maxlen=hyperparams['maxlen'],
        padding="pre",
        truncating="pre",
    )
    user_tokens = [np.array(padded)]
    user_categ_data = [[emotion_feats + [pronoun_feat] + liwc_feats]]
    user_sparse_data = [[stopword_feats]]

    return (user_tokens, user_categ_data, user_sparse_data)

def get_post_attentions(sentence, model, model_type):
    """Return the normalised token-attention weights of one encoded post.

    The output of the model's "attention" layer is read for the given post
    and normalised with a softmax (exponentiate, divide by the sum).

    Args:
        sentence: encoded post as returned by ``encode_text_HAN`` (for "HAN")
            or ``encode_text_BERT`` (for the transformer-based encoders);
            only the token entries are used.
        model: sentence encoder containing a layer named "attention".
        model_type: "HAN" for the single-input encoder; "HAN_BERT",
            "HAN_RoBERTa" or "Con_HAN" for the encoders that take token ids
            and attention masks.

    Returns:
        numpy array of attention weights that sum to 1.

    Raises:
        ValueError: if ``model_type`` is not one of the supported values.
    """
    if model_type == "HAN":
        model_inputs = sentence[0]
    elif model_type in ("HAN_BERT", "HAN_RoBERTa", "Con_HAN"):
        # Add a batch dimension of 1 to the ids and the attention masks.
        model_inputs = [np.array(sentence[0]).reshape((1, -1)),
                        np.array(sentence[1]).reshape((1, -1))]
    else:
        raise ValueError(f"Unknown model type: {model_type!r}")

    attention_layer = model.get_layer("attention")
    attention_function = tf.keras.backend.function(inputs=[model.input],
                                                   outputs=[attention_layer.output])

    attention_scores = np.squeeze(attention_function(model_inputs)[0])

    # Softmax over the raw layer output.
    attention_weights = np.exp(attention_scores)
    return attention_weights / np.sum(attention_weights)


In [None]:
sentence = "I am feeling very blue, despite wearing a blue t-shirt".split()
hyperparams['batch_size'] = 1


# HAN_encoded = encode_text_HAN(sentence)
# weights_HAN = get_post_attentions(HAN_encoded, sentEncoder_HAN, "HAN")[-len(sentence):]

BERT_encoded = encode_text_BERT(sentence)
weights_BERT = get_post_attentions(BERT_encoded, sentEncoder_BERT, "HAN_BERT")
print(weights_BERT)

# A word may be split into several WordPiece tokens (e.g. "blue," or
# "t-shirt"), so position i+1 is not necessarily word i. Re-tokenize with the
# same settings as encode_text_BERT to get, for every token position, the
# index of the word it came from.
word_ids = BertTokenizerFast.from_pretrained('prajjwal1/bert-tiny', do_lower_case=True)(
    sentence, add_special_tokens=True, max_length=hyperparams['maxlen'],
    padding='max_length', truncation=True, is_split_into_words=True,
).word_ids()

# Sum the weights of all sub-word tokens belonging to the same word.
word_weights = [0.0] * len(sentence)
for position, word_index in enumerate(word_ids):
    if word_index is not None:  # None marks special and padding tokens
        word_weights[word_index] += float(weights_BERT[position])

for word, weight in zip(sentence, word_weights):
    print(f"{word}: {weight}")



Visualizing multi-head self-attention

In [None]:
# Load the Con_HAN model trained on the Depression task.
# NOTE: this import rebinds load_saved_model_weights to the version from the
# load_save_model module.
from load_save_model import load_saved_model_weights

hyperparams['optimizer'] = optimizers.legacy.Adam(
    learning_rate=hyperparams['lr'],
    decay=hyperparams['decay'],
)
hyperparams['batch_size'] = 1

root_dir = "/content/drive/MyDrive/Thesis/Data"

model_type = "Con_HAN"
ConHan_path = f"{root_dir}/Final Trained Models (10 epochs)/Depression/Depression_Con_HAN_2023-07-28 23:52:39.351657"

ConHan_model = load_saved_model_weights(
    ConHan_path,
    hyperparams,
    hyperparams_features,
    model_type,
    h5=True,
)

In [None]:
from data_generator import DataGenerator_BERT
# Test-set generator feeding the loaded model (batching follows hyperparams).
data_gen = DataGenerator_BERT(
    user_level_data,
    subjects_split,
    set_type='test',
    hyperparams_features=hyperparams_features,
    model_type=model_type,
    seq_len=hyperparams['maxlen'],
    batch_size=hyperparams['batch_size'],
    posts_per_group=hyperparams['posts_per_group'],
    post_groups_per_user=None,
    max_posts_per_user=hyperparams['posts_per_user'],
    compute_liwc=True,
    # Ablation flags mirror the layers the model was built without.
    ablate_emotions='emotions' in hyperparams['ignore_layer'],
    ablate_liwc='liwc' in hyperparams['ignore_layer'],
)


In [None]:
from transformers import BertTokenizerFast
# Module-level tokenizer used by text_dict to turn token ids back into text.
tokenizer = BertTokenizerFast.from_pretrained(
    'prajjwal1/bert-tiny',
    do_lower_case=True,
)

def text_dict(x):
    """Decode the posts of one chunk back into text.

    ``x[0][0]`` is taken as the token-id matrix of the chunk (one row per
    post). Returns a dict mapping the row index, as a string, to the decoded
    text with special tokens removed.
    """
    token_ids = x[0][0]
    return {
        str(row): tokenizer.decode(row_ids, skip_special_tokens=True)
        for row, row_ids in enumerate(token_ids)
    }

In [None]:
import matplotlib.pyplot as plt
import json

printed = 0      # number of positive chunks plotted so far
print_top = 3    # posts with the highest attention reported per head
max_printed = 20  # stop after this many positive chunks

# Output directory for the plots and the texts; create it if it is missing.
path = root_dir + '/Data Analysis/AttPlots/'
os.makedirs(path, exist_ok=True)

# These do not depend on the batch, so build them once instead of per chunk.
attention_input_fn = tf.keras.backend.function(
    inputs=[ConHan_model.input],
    outputs=[ConHan_model.get_layer("MH-attention_layer_0").output])
attention_layer = ConHan_model.get_layer("MH-attention_layer_1")

for batch_index, (x, y) in enumerate(data_gen):
    if not (y == 1):  # only check positives
        continue

    printed += 1
    print(batch_index)

    texts = text_dict(x)

    # Attention scores of the final attention layer, computed on the output
    # of the previous attention layer.
    attention_input = attention_input_fn(x)[0]
    _, attention_scores = attention_layer(attention_input, attention_input,
                                          return_attention_scores=True)

    # Drop the batch axis: (1, heads, posts, posts) -> (heads, posts, posts)
    attention_scores = np.asarray(tf.squeeze(attention_scores, axis=0))
    num_heads, num_rows, num_cols = attention_scores.shape

    # One colour scale for every head, so the single colorbar is valid for
    # all subplots.
    vmin, vmax = attention_scores.min(), attention_scores.max()

    # One subplot per head; squeeze=False keeps axs 2-D even for one head.
    fig, axs = plt.subplots(1, num_heads, figsize=(5 * num_heads, 5), squeeze=False)

    saved_texts = {}

    for head, ax in enumerate(axs[0]):
        ax.set_title(f"Head {head+1}")
        attention_map = attention_scores[head]

        # Report the posts that receive the most attention in this head
        # (column sums), in ascending order of attention.
        total_att_per_post = attention_map.sum(axis=0)
        highest_att_posts = np.argsort(total_att_per_post)[-print_top:]

        for index in highest_att_posts:
            key = str(index)
            print(f"Post {index}: {texts[key]}")
            saved_texts.setdefault(key, texts[key])

        im = ax.imshow(attention_map, cmap='Blues', vmin=vmin, vmax=vmax)

        # Tick positions are post indices; the default tick labels show the
        # same indices, matching the "Post N" numbers printed above.
        ax.set_xticks(np.arange(0, num_cols, 5))
        ax.set_yticks(np.arange(0, num_rows, 10))

        ax.axis('on')

    # Add colorbar
    cbar_ax = fig.add_axes([0.95, 0.15, 0.02, 0.7])  # [left, bottom, width, height]
    cbar = fig.colorbar(im, cax=cbar_ax)
    cbar.set_label('Attention Score', rotation=270, labelpad=15)

    # tight_layout() is avoided because of the manually placed colorbar axes.
    plt.subplots_adjust(right=0.92, wspace=0.1)

    # Save the plot and the texts of the reported posts.
    plt.savefig(path + str(batch_index) + '_attention_plot.png', bbox_inches='tight', dpi=300)
    plt.show()
    plt.close(fig)  # release the figure so they do not accumulate

    with open(path + str(batch_index) + '_attention_texts.json', 'w', encoding='utf-8') as out_file:
        json.dump(saved_texts, out_file)

    if printed == max_printed:
        break