<a href="https://colab.research.google.com/github/danielhou13/cogs402longformer/blob/main/src/Token_attention_with_head_importance_pdf.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

This notebook is largely the same as the section on converting to a PDF in [Token_attention_with_head_importance](https://colab.research.google.com/drive/1iVojJQp0CZS484tMZqIizosXPLxgKvRX?usp=sharing); however, this notebook focuses solely on converting the attentions into a PDF visualization. This notebook also predicts over the dataset and finds interesting examples to visualize, such as false negatives, false positives, and very confident predictions.

In [None]:
# Mount Google Drive at /content/drive; a later cell loads the head-importance
# tensor from a path under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# import sys
# sys.path.append('/content/drive/My Drive/{}'.format("cogs402longformer/"))

In [None]:
pip install datasets --quiet

In [None]:
pip install transformers

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


Import Dataset and Model

In [None]:
import os

import numpy as np
import pandas as pd

import torch
import torch.nn as nn

Import the Research Papers dataset

In [None]:
from datasets import load_dataset
from transformers import AutoModelForSequenceClassification, AutoTokenizer

# Tokenizer of the base Longformer checkpoint; reused later for the data
# collator, the Trainer and for turning input ids back into token strings.
tokenizer = AutoTokenizer.from_pretrained('allenai/longformer-base-4096')
# Hugging Face Hub ids of the checkpoints this notebook can load.
# Going by the names: papers fine-tune, news fine-tune, and the base model.
model_path = 'danielhou13/longformer-finetuned_papers_v2'
model_path2 = 'danielhou13/longformer-finetuned-news-cogs402'
model_path3 = 'allenai/longformer-base-4096'

# def longformer_finetuned_papers(model):
#     model = AutoModelForSequenceClassification.from_pretrained(model, num_labels = 2)
#     return model

# def preprocess_function(tokenizer, example, max_length):
#     example.update(tokenizer(example['text'], padding='max_length', max_length=max_length, truncation=True))
#     return example

# def get_papers_dataset(dataset_type):
#     max_length = 2048
#     dataset = load_dataset("danielhou13/cogs402dataset")[dataset_type]

#     # tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
#     dataset = dataset.map(lambda x: preprocess_function(tokenizer, x, max_length), batched=True)
#     setattr(dataset, 'input_columns', ['input_ids', 'attention_mask'])
#     setattr(dataset, 'target_columns', ['labels'])
#     setattr(dataset, 'max_length', max_length)
#     setattr(dataset, 'tokenizer', tokenizer)
#     return dataset

# def papers_test_set():
#     return get_papers_dataset('test')

Import the news dataset

In [None]:
def preprocess_function(tokenizer, example, max_length):
    """Tokenize ``example['text']`` and merge the encoding into ``example``.

    The text is padded to ``max_length`` and truncated to it. ``example`` is
    updated in place and the same object is returned, which is the shape
    ``Dataset.map`` expects from its callback.
    """
    encoding = tokenizer(
        example['text'],
        truncation=True,
        max_length=max_length,
        padding='max_length',
    )
    example.update(encoding)
    return example

def longformer_finetuned_news(model):
    """Load a two-label sequence-classification model.

    ``model`` is the checkpoint name or path handed to ``from_pretrained``;
    the loaded model object is returned.
    """
    checkpoint = model
    classifier = AutoModelForSequenceClassification.from_pretrained(checkpoint, num_labels=2)
    return classifier

def get_news_dataset(dataset_type, max_length=2048):
    """Load one split of the news dataset and tokenize it for Longformer.

    Args:
        dataset_type: name of the split to load (this notebook uses 'train'
            and 'validation').
        max_length: number of tokens every example is padded/truncated to.
            Defaults to 2048, the value that was previously hard-coded.

    Returns:
        The tokenized dataset with an integer ``labels`` column (derived from
        ``hyperpartisan``) and the metadata columns removed. The attributes
        ``input_columns``, ``target_columns``, ``max_length`` and ``tokenizer``
        are attached to the returned object for the caller's convenience.
    """
    dataset = load_dataset("danielhou13/cogs402dataset2")[dataset_type]

    tokenizer = AutoTokenizer.from_pretrained('allenai/longformer-base-4096')
    dataset = dataset.map(lambda batch: preprocess_function(tokenizer, batch, max_length), batched=True)

    # The Trainer expects an integer "labels" column; cast every
    # hyperpartisan value with int().
    labels = [int(flag) for flag in dataset['hyperpartisan']]
    dataset = dataset.add_column("labels", labels)

    dataset = dataset.remove_columns(['title', 'hyperpartisan', 'url', 'published_at', 'bias'])
    print(dataset)

    # NOTE: these are plain Python attributes on this object; read them before
    # calling any method that returns a new dataset object.
    dataset.input_columns = ['input_ids', 'attention_mask']
    dataset.target_columns = ['labels']
    dataset.max_length = max_length
    dataset.tokenizer = tokenizer
    return dataset

def news_train_set():
    """Return the tokenized 'train' split of the news dataset."""
    train_split = get_news_dataset('train')
    return train_split

def news_test_set():
    """Return the tokenized 'validation' split, which this notebook uses as its test set."""
    held_out_split = get_news_dataset('validation')
    return held_out_split

Load papers model and dataset and preprocess it

In [None]:
# Tokenized 'validation' split of the news dataset (see news_test_set).
cogs402_test = news_test_set()
# NOTE(review): model_path is the *papers* checkpoint, while the dataset, the
# head-importance file and the output file names used later are all *news*.
# model_path2 looks like the matching checkpoint -- confirm which is intended.
model = longformer_finetuned_news(model_path)
# Columns the model consumes plus the label column (attributes set in get_news_dataset).
columns = cogs402_test.input_columns + cogs402_test.target_columns
print(columns)
# Have the listed columns come back as torch tensors when the dataset is indexed.
cogs402_test.set_format(type='torch', columns=columns)
# Drop the raw text column; remove_columns returns a new dataset object.
cogs402_test=cogs402_test.remove_columns(['text'])

Using custom data configuration danielhou13--cogs402dataset2-52067477e0d49a06
Reusing dataset parquet (/root/.cache/huggingface/datasets/danielhou13___parquet/danielhou13--cogs402dataset2-52067477e0d49a06/0.0.0/7328ef7ee03eaf3f86ae40594d46a1cec86161704e02dd19f232d81eee72ade8)


  0%|          | 0/2 [00:00<?, ?it/s]



  0%|          | 0/3 [00:00<?, ?ba/s]

<class 'list'>
Dataset({
    features: ['text', 'input_ids', 'attention_mask', 'labels'],
    num_rows: 2500
})
['input_ids', 'attention_mask', 'labels']


Load news model and dataset and preprocess it

In [None]:
# cogs402_test = news_test_set()
# model = longformer_finetuned_news()
# columns = cogs402_test.input_columns + cogs402_test.target_columns
# print(columns)
# cogs402_test.set_format(type='torch', columns=columns)

In [None]:
# Use the GPU when one is present; otherwise keep the model where it was loaded.
model = model.cuda() if torch.cuda.is_available() else model

print(model.device)

cuda:0


Predict using the model on the selected dataset

In [None]:
from transformers import Trainer, TrainingArguments

# Settings handed to the Hugging Face Trainer. The visible cells only call
# trainer.predict, so the optimisation settings are carried along unchanged.
batch_size = 1
gradient_acc = 4
model_name = "longformer-finetuned_papers"

training_args = TrainingArguments(
    # where the Trainer writes its outputs
    output_dir=f"models/{model_name}",
    # optimisation
    num_train_epochs=2,
    learning_rate=2e-5,
    weight_decay=0.01,
    gradient_accumulation_steps=gradient_acc,
    gradient_checkpointing=True,
    fp16=True,
    # batching
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    # evaluation / checkpoint cadence
    evaluation_strategy="epoch",
    save_strategy="epoch",
    # logging and hub
    disable_tqdm=False,
    log_level="error",
    push_to_hub=False,
)

F1 and accuracy are good general metrics for model performance. Recall and precision can be used if we require low false negatives or false positives.

In [None]:
from sklearn.metrics import accuracy_score, f1_score

def compute_metrics(pred):
    """Compute accuracy and weighted F1 for a prediction object.

    ``pred.predictions`` holds per-class scores (arg-maxed over the last axis)
    and ``pred.label_ids`` the gold labels. Returns a dict with the keys
    ``"accuracy"`` and ``"f1"``.
    """
    predicted = pred.predictions.argmax(-1)
    gold = pred.label_ids
    return {
        "accuracy": accuracy_score(gold, predicted),
        "f1": f1_score(gold, predicted, average="weighted"),
    }

In [None]:
from transformers import DataCollatorWithPadding
# Collator that batches examples using the notebook-level tokenizer.
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

# No train/eval datasets are passed: in the visible cells this Trainer is only
# used for trainer.predict on the evaluation split.
trainer = Trainer(
    model=model,
    args=training_args,
    compute_metrics=compute_metrics,
    tokenizer=tokenizer,
    data_collator = data_collator
)

In [None]:
# Run the model over the whole evaluation split. Later cells read
# preds_output.predictions as a 2-D array indexed [example, class].
preds_output = trainer.predict(cogs402_test)

False negatives and false positives are usually very interesting examples.

In [None]:
# Gold labels and the arg-max class of every evaluated example, as NumPy arrays.
y_true = np.array(cogs402_test["labels"])
y_preds = preds_output.predictions.argmax(axis=1)

In [None]:
# diff is 0 for a correct prediction, -1 when class 1 was predicted for a
# class-0 example (false positive) and +1 for the reverse (false negative).
diff = y_true - y_preds
is_correct = diff == 0

correct = np.flatnonzero(is_correct)

pos = np.flatnonzero(is_correct & (y_true == 1))
neg = np.flatnonzero(is_correct & (y_true == 0))

false_pos = np.flatnonzero(diff == -1)
false_neg = np.flatnonzero(diff == 1)

for caption, indices in (
    ('Correctly classified: ', correct),
    ('cor pos: ', pos),
    ('cor neg: ', neg),
    ('False positives: ', false_pos),
    ('False negatives: ', false_neg),
):
    print(caption, indices)

Correctly classified:  [   0    1    3 ... 2494 2496 2498]
cor pos:  [   0    1    3 ... 2494 2496 2498]
cor neg:  [ 136  373  376  440  520  700  710  792 1060 1181 1577 1608 1674 1837
 2089 2099 2127 2160 2170 2233 2449]
False positives:  [   2    4    6 ... 2495 2497 2499]
False negatives:  [  32  561  762 1093 1283]


Take example for evaluation based on randomness

In [None]:
# Seeded generator so that the "random" examples are the same ones after every
# Restart & Run All; change RANDOM_SEED to draw a different set.
RANDOM_SEED = 0
rng = np.random.default_rng(RANDOM_SEED)

# One index per bucket, each kept as a length-1 array (same shape as before).
# Note: choice raises ValueError if a bucket is empty.
rand_pos = rng.choice(pos, size=1)
rand_neg = rng.choice(neg, size=1)
rand_fp = rng.choice(false_pos, size=1)
rand_fn = rng.choice(false_neg, size=1)

In [None]:
# print(tokenizer.convert_ids_to_tokens(cogs402_test["input_ids"][rand_neg[0]]))

Some other interesting examples include the examples that are the most confidently predicted to be positive or negative. I.e. the examples with the highest predicted probability.

In [None]:
# Index of the example with the largest class-1 score.
highest_pos = [np.argmax(preds_output.predictions[:, 1])]

# Example 1933 is deliberately left out of the class-0 search (the original
# cell removed it with np.delete). Masking its score instead of deleting the
# row keeps every remaining index aligned with the dataset, so the result is
# a valid dataset index even when it lies after the excluded row.
EXCLUDED_NEG_EXAMPLE = 1933
neg_scores = np.array(preds_output.predictions[:, 0], dtype=float)  # copy; leave predictions untouched
neg_scores[EXCLUDED_NEG_EXAMPLE] = -np.inf
highest_neg = [np.argmax(neg_scores)]

print(highest_pos)
print(highest_neg)

[672]
[1577]


In [None]:
# Choose the example to visualise. test_val stays a one-element list: later
# cells read test_val[0] to build the output file names.
test_val = highest_pos
print(test_val)
# Indexing with a list returns a batch (the recorded shapes below show batch size 1).
testexam = cogs402_test[test_val]

[672]


In [None]:
# Send the inputs to whatever device the model lives on, so the cell also runs
# on a CPU-only runtime (the earlier cell only moves the model to CUDA when it
# is available).
device = model.device

# Inference only: no_grad avoids building an autograd graph for all the
# attention tensors that are returned.
with torch.no_grad():
    output = model(
        testexam["input_ids"].to(device),
        attention_mask=testexam['attention_mask'].to(device),
        labels=testexam['labels'].to(device),
        output_attentions=True,
    )

# The last two output entries are read as the per-layer local (sliding-window)
# attentions and the per-layer global attentions; each is stacked over layers.
batch_attn = output[-2]
output_attentions = torch.stack(batch_attn).cpu()
global_attention = output[-1]
output_global_attentions = torch.stack(global_attention).cpu()
print("output_attention.shape", output_attentions.shape)
print("gl_output_attention.shape", output_global_attentions.shape)

output_attention.shape torch.Size([12, 1, 12, 2048, 514])
gl_output_attention.shape torch.Size([12, 1, 12, 2048, 1])


In [None]:
# Gold label of the selected example ...
print(testexam['labels'][0])
# ... next to the arg-max of output[1] (presumably the logits, since labels
# were passed and the loss would come first -- confirm against the model output).
print(output[1].argmax())

tensor(0)
tensor(1, device='cuda:0')


In [None]:
# print(os.getcwd())
# yes = torch.load("resources/longformer_test2/epoch_3/aggregate_attn.pt")

Convert sliding attention matrix to correct seq_len x seq_len matrix

In [None]:
def create_head_matrix(output_attentions, global_attentions):
    """Expand one head's sliding-window attention into a dense square matrix.

    Args:
        output_attentions: tensor of shape (seq_len, 1 + window + 1). Column 0
            holds the attention each token pays to the single global token
            (position 0). The remaining columns are relative positions centred
            on the query token. With the 514-wide tensors produced in this
            notebook the self position is column 257, as before.
        global_attentions: tensor of shape (seq_len, 1) with the attention the
            global token pays to every position.

    Returns:
        A (seq_len, seq_len) float tensor. Row ``i`` holds the attention token
        ``i`` pays to every absolute position; row 0 is taken from
        ``global_attentions``.

    Only a single global token at position 0 is supported.
    """
    seq_len, width = output_attentions.shape[0], output_attentions.shape[1]
    num_global = 1
    # width = num_global + 2 * one_sided_window + 1  ->  256 for width 514
    one_sided_window = (width - num_global - 1) // 2

    new_attention_matrix = torch.zeros((seq_len, seq_len))
    for i in range(seq_len):
        # Look only at the sliding-window part, so a zero in the global column
        # can no longer cause the first local weight to be dropped.
        local_row = output_attentions[i][num_global:]
        window_pos = torch.nonzero(local_row, as_tuple=True)[0]
        target_cols = window_pos - one_sided_window + i
        # Ignore window slots that fall before the start or past the end of the
        # sequence instead of wrapping around or raising.
        in_range = (target_cols >= 0) & (target_cols < seq_len)
        new_attention_matrix[i, target_cols[in_range]] = local_row[window_pos[in_range]]
        # Attention to the global token comes from the dedicated first column.
        new_attention_matrix[i, 0] = output_attentions[i][0]

    # Row 0 belongs to the global token itself: it attends to every position.
    # Written once, after the loop, so it is not clobbered by the row-0 pass.
    if seq_len > 0:
        new_attention_matrix[0] = global_attentions.reshape(-1)[:seq_len]
    return new_attention_matrix


def attentions_all_heads(output_attentions, global_attentions):
    """Convert every head of one example to a dense matrix.

    Both inputs are indexed along dim 0 (one entry per head); the per-head
    results of ``create_head_matrix`` are stacked along a new leading dim.
    """
    per_head = [
        create_head_matrix(output_attentions[head], global_attentions[head])
        for head in range(output_attentions.shape[0])
    ]
    return torch.stack(per_head)

def all_batches(output_attentions, global_attentions):
    """Convert every example of a batch.

    Both inputs are indexed along dim 0 (one entry per example); the results
    of ``attentions_all_heads`` are stacked along a new leading dim.
    """
    per_example = [
        attentions_all_heads(output_attentions[example], global_attentions[example])
        for example in range(output_attentions.shape[0])
    ]
    return torch.stack(per_example)

def all_layers(output_attentions, global_attentions):
    """Convert every layer.

    Both inputs are indexed along dim 0 (one entry per layer); the results of
    ``all_batches`` are stacked along a new leading dim.
    """
    per_layer = [
        all_batches(output_attentions[layer], global_attentions[layer])
        for layer in range(output_attentions.shape[0])
    ]
    return torch.stack(per_layer)

In [None]:
# Expand every layer / example / head to a dense seq_len x seq_len matrix and
# hand the result to NumPy for the scaling and summing steps below.
converted_mat = all_layers(output_attentions, output_global_attentions).detach().cpu().numpy()
print(converted_mat.shape)

(12, 1, 12, 2048, 2048)


In [None]:
# Quick look at the token ids of the selected example.
print(testexam['input_ids'])

tensor([[    0, 41552,   642,  ...,     1,     1,     1]])


In [None]:
# Token strings of the selected example; generate() later pairs them
# position-by-position with the attention weights.
all_tokens = tokenizer.convert_ids_to_tokens(testexam["input_ids"][0])

Load head importance model and scale the attentions by head importance

In [None]:
# Head-importance scores, indexed per layer and then per head by
# scale_by_importance. Swap the commented line in for the papers model.
# NOTE(review): absolute Drive paths tie this cell to one account's folder
# layout, and torch.load unpickles the file -- only load files you trust.
# head_importance = torch.load("/content/drive/MyDrive/cogs402longformer/t3-visapplication/resources/papers/pretrained/head_importance.pt")
head_importance = torch.load("/content/drive/MyDrive/cogs402longformer/t3-visapplication/resources/news/head_importance.pt")

In [None]:
def scale_by_importance(attention_matrix, head_importance):
  """Weight every head's attention matrix by that head's importance score.

  ``attention_matrix`` is indexed as [layer, example, head, row, column] and
  ``head_importance`` as [layer, head]. Returns a new array with the shape and
  dtype of ``attention_matrix``; the input is left untouched.
  """
  scaled = np.zeros_like(attention_matrix)
  num_layers, batch_size = attention_matrix.shape[0], attention_matrix.shape[1]
  for layer in range(num_layers):
    # Shape (heads, 1, 1): broadcasts over each head's row and column axes.
    per_head_scale = np.expand_dims(head_importance[layer], axis=(1, 2))
    for example in range(batch_size):
      scaled[layer, example] = attention_matrix[layer, example] * per_head_scale
  return scaled

In [None]:
# Dense attentions with every head weighted by its importance score.
converted_mat_importance = scale_by_importance(converted_mat, head_importance)

Get the sum of the attentions for all the tokens (column-wise). In other words, find out how much every word is attended to

In [None]:
# Sum over axis 3, the row (query) axis of each seq_len x seq_len matrix, so
# what is left per column is the total attention that token receives.
attention_matrix_importance = converted_mat_importance.sum(axis=3)
print(attention_matrix_importance.shape)

(12, 1, 12, 2048)


A dataframe is good for picking out information from the example, but it isn't the best choice for an easy-to-read visualization. It's easier to see how much each word is attended to in an example if we have the actual example, with the words highlighted based on the magnitude of attention.

We use https://github.com/jiesutd/Text-Attention-Heatmap-Visualization to show how much each token in the example is attended to, up to the max number of tokens we specified earlier.

Get top k attended words for each head, for each example in batch, for each layer

In [None]:
## convert the text/attention list to latex code, which will further generates the text heatmap based on attention weights.
import numpy as np

latex_special_token = ["!@#$%^&*(){}"]

def generate(text_list, attention_list, latex_file, color='red', rescale_value = False):
    """Write a standalone LaTeX file that highlights each token by its weight.

    Args:
        text_list: tokens to print, in reading order.
        attention_list: one weight per token, used as the xcolor mix
            percentage (so expected to lie in 0..100).
        latex_file: path of the .tex file to create or overwrite.
        color: xcolor colour name used for the highlight.
        rescale_value: when True, weights are first min-max rescaled to
            0..100 with ``rescale``.

    Raises:
        AssertionError: if the two lists differ in length.
    """
    assert(len(text_list) == len(attention_list))
    if rescale_value:
        attention_list = rescale(attention_list)
    text_list = clean_word(text_list)

    # Weights are written in fixed-point notation: str() of a tiny float gives
    # scientific notation (e.g. 1e-05), which is not valid inside xcolor's
    # "colour!percentage" expression.
    boxes = [
        "\\colorbox{%s!%.4f}{" % (color, float(weight)) + "\\strut " + word + "} "
        for word, weight in zip(text_list, attention_list)
    ]
    string = (
        r'''{\setlength{\fboxsep}{0pt}\colorbox{white!0}{\parbox{0.9\textwidth}{''' + "\n"
        + "".join(boxes)
        + "\n}}}"
    )

    # The document declares UTF8 input, so the file must be written as UTF-8
    # regardless of the platform's default encoding.
    with open(latex_file, 'w', encoding='utf-8') as f:
        f.write(r'''\documentclass[varwidth]{standalone}
\special{papersize=210mm,297mm}
\usepackage{color}
\usepackage{tcolorbox}
\usepackage{CJK}
\usepackage{adjustbox}
\tcbset{width=0.9\textwidth,boxrule=0pt,colback=red,arc=0pt,auto outer arc,left=0pt,right=0pt,boxsep=5pt}
\begin{document}
\begin{CJK*}{UTF8}{gbsn}'''+'\n')
        f.write(string+'\n')
        f.write(r'''\end{CJK*}
\end{document}''')

def rescale(input_list):
    """Min-max rescale a sequence of numbers to the range 0..100.

    Returns a plain Python list. An empty input yields an empty list, and an
    input whose values are all equal yields all zeros (there is no contrast
    to show) instead of NaNs from a division by zero.
    """
    the_array = np.asarray(input_list)
    if the_array.size == 0:
        return []
    the_min = np.min(the_array)
    value_range = np.max(the_array) - the_min
    if value_range == 0:
        return np.zeros(the_array.shape, dtype=float).tolist()
    return ((the_array - the_min) / value_range * 100).tolist()


def clean_word(word_list):
    """Escape LaTeX special characters in every word.

    Returns a new list; the input is not modified. The escaping is done in a
    single pass per word, so the braces that belong to an inserted command are
    never escaped a second time.

    Handled characters: backslash, percent, ampersand, hash, underscore,
    braces, dollar, caret and tilde. Caret and tilde get an empty brace pair
    so the accent command does not swallow the following character; backslash
    becomes the textbackslash command rather than a line break.
    """
    latex_escapes = str.maketrans({
        "\\": r"\textbackslash{}",
        "%": r"\%",
        "&": r"\&",
        "#": r"\#",
        "_": r"\_",
        "{": r"\{",
        "}": r"\}",
        "$": r"\$",
        "^": r"\^{}",
        "~": r"\~{}",
    })
    return [word.translate(latex_escapes) for word in word_list]

In [None]:
def normalize(data):
    """Min-max scale ``data`` to the range 0..1.

    When every value is identical there is no spread to scale by; all zeros
    are returned instead of the NaNs a division by zero would produce.
    """
    low = np.min(data)
    span = np.max(data) - low
    shifted = data - low
    if span == 0:
        # shifted is already all zeros here; dividing by 1.0 only makes the
        # result floating point like the normal path.
        return shifted / 1.0
    return shifted / span

In [None]:
# squeeze() drops the size-1 batch axis (this assumes exactly one example);
# the first sum then collapses the heads, the second the layers.
# Despite the name this is a sum, not a mean -- after min-max normalisation
# only the relative sizes matter.
average_attention = attention_matrix_importance.squeeze().sum(axis=1)
average_attention = average_attention.sum(axis=0)
average_attention = normalize(average_attention)
print(average_attention * 100)

[100.          1.005807    1.0239174 ...   0.          0.
   0.       ]


In [None]:
# Heatmap over all layers and heads; weights are scaled to 0..100 because
# generate() uses them as colour-mix percentages. The file name carries the
# dataset index of the example.
title_all = f"news_{test_val[0]}_base.tex"
generate(all_tokens, (average_attention*100), title_all, 'red')

Adding up all the heads probably isn't the best way of finding the important tokens of the example, because the last layer is what drives the prediction. This part focuses on only the last layer's attention for visualization.

In [None]:
# Index 11 is the last of the 12 layers (see the shapes printed earlier).
# After squeeze() the remaining axes are (head, token); summing axis 0
# collapses the heads of that layer.
print(attention_matrix_importance[11].squeeze().shape)
average_attention_final_layer = attention_matrix_importance[11].squeeze().sum(axis=0)
average_attention_final_layer = normalize(average_attention_final_layer)
print(average_attention_final_layer*100)

(12, 2048)
[2.30078    0.46655098 0.27441218 ... 0.         0.         0.        ]


In [None]:
# Same heatmap as before, built from the last layer's attention only.
title_last_layer = f"news_{test_val[0]}_base_layer_12_only.tex"
generate(all_tokens, (average_attention_final_layer*100), title_last_layer, 'red')