In [None]:
# drive mount. colab에 내 구글 드라이브 연결
from google.colab import drive
drive.mount('/content/drive')

# clone git repo
!git clone https://github.com/hila-chefer/Transformer-Explainability.git

# change directory
import os
os.chdir(f'./Transformer-Explainability')


#!pip install torch==1.7.0 torchvision==0.8.1 &> /dev/null # 일반 GPU/CPU를 사용하는 경우
!pip install torch==1.7.0+cu110 torchvision==0.8.1+cu110 torchaudio==0.7.0 -f https://download.pytorch.org/whl/torch_stable.html  # GPU A100을 사용하는 경우
!pip install transformers==3.5.1 &> /dev/null
!pip install captum &> /dev/null
!pip install matplotlib==3.2.2 &> /dev/null

In [None]:
##### 시각화 관련 필수 라이브러리
import torch

from transformers import BertTokenizer
from transformers import AutoTokenizer  # bert 모델에 따라 알맞은 tokenizer를 자동으로 로드

from BERT_explainability.modules.BERT.ExplanationGenerator import Generator
from BERT_explainability.modules.BERT.BertForSequenceClassification import BertForSequenceClassification
from BERT_explainability.modules.BERT.BERT_cls_lrp import BertForSequenceClassification as BertForClsOrigLrp

from captum.attr import visualization # XAI관련 라이브러리의 시각화 함수
#####


from torch.utils.data import TensorDataset, random_split
from torch.utils.data import DataLoader, RandomSampler, SequentialSampler
from transformers import get_linear_schedule_with_warmup

from sklearn.metrics import accuracy_score

import os
import json
import pickle
import numpy as np
import random
import gzip
from collections import OrderedDict

In [None]:
# GPU 찾기. 없으면 CPU로 동작
if torch.cuda.is_available():  
    device = torch.device("cuda")
    print('There are %d GPU(s) available.' % torch.cuda.device_count())
    print('We will use the GPU:', torch.cuda.get_device_name(0))
else:
    print('No GPU available, using the CPU instead.')
    device = torch.device("cpu")

# check torch is available
print(torch.__version__)
print(torch.tensor([1.0, 2.0]).cuda())

In [None]:
"""
preprocess한 데이터의 설명을 생성
"""
def generate_expl_from_preprocessed(pos_model_name, neg_model_name, data_name='amazon_book_only500000.npy', data_num=10, expl_method="transformer_attribution", saving_tag=''):
  root = "/content/drive/MyDrive/CS470_team_2in1"

  input_ids, attention_masks, ratings, products = np.load(root+"/dataset/"+data_name, allow_pickle=True)

  if expl_method=="transformer_attribution" :
    pos_model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=1)
    neg_model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=1)
  else:
    pos_model = BertForClsOrigLrp.from_pretrained('bert-base-uncased', num_labels=1)
    neg_model = BertForClsOrigLrp.from_pretrained('bert-base-uncased', num_labels=1)
  pos_model.cuda()
  neg_model.cuda()
  pos_model.load_state_dict(torch.load(root+'/colab/model/'+pos_model_name, map_location=device))
  neg_model.load_state_dict(torch.load(root+'/colab/model/'+neg_model_name, map_location=device))
  pos_model.eval()
  neg_model.eval()

  tokenizer = BertTokenizer.from_pretrained('bert-base-uncased', do_lower_case=True)

  pos_expl_generator = Generator(pos_model)
  neg_expl_generator = Generator(neg_model)
  if expl_method=="transformer_attribution" : pos_expl_func = pos_expl_generator.generate_LRP
  else: pos_expl_func = {"partial_lrp": pos_expl_generator.generate_LRP_last_layer,
                        "last_attn": pos_expl_generator.generate_attn_last_layer,
                        "attn_gradcam": pos_expl_generator.generate_attn_gradcam,
                        "lrp": pos_expl_generator.generate_full_lrp,
                        "rollout": pos_expl_generator.generate_rollout}[expl_method]
  if expl_method=="transformer_attribution" : neg_expl_func = neg_expl_generator.generate_LRP
  else: neg_expl_func = {"partial_lrp": neg_expl_generator.generate_LRP_last_layer,
                        "last_attn": neg_expl_generator.generate_attn_last_layer,
                        "attn_gradcam": neg_expl_generator.generate_attn_gradcam,
                        "lrp": neg_expl_generator.generate_full_lrp,
                        "rollout": neg_expl_generator.generate_rollout}[expl_method]
  
  records = []
  target_class=None

  for i in range(data_num):
    tmp = input_ids[i][0].numpy()
    pad_idx = np.where(tmp==0)[0]
    if len(pad_idx)>0:
      last_idx =  pad_idx[0]
      input_id = input_ids[i][:,:last_idx].to(device)
      attention_mask = attention_masks[i][:,:last_idx].to(device)
    else:
      input_id = input_ids[i].to(device)
      attention_mask = attention_masks[i].to(device)
    rating = ratings[i]
    product = products[i]
    tokens = tokenizer.convert_ids_to_tokens(input_id.flatten())


    # 문장에 대한 설명 생성
    pos_output = pos_model(input_ids=input_id, attention_mask=attention_mask)[0].detach().cpu().numpy()
    neg_output = neg_model(input_ids=input_id, attention_mask=attention_mask)[0].detach().cpu().numpy()

    # 설명 생성
    neg_expl = neg_expl_func(input_ids=input_id, attention_mask=attention_mask, index=0)[0].detach().cpu().numpy()
    pos_expl = pos_expl_func(input_ids=input_id, attention_mask=attention_mask, index=0)[0].detach().cpu().numpy()

    # true label 판단
    true_class = rating

    records.append([tokens, rating, product, np.array([[neg_output[0][0],pos_output[0][0]]]), neg_expl, pos_expl, true_class, 1 if pos_output[0]>=3 else 0])

    if i%1000==0: print(f"processed {i} data...")

  if saving_tag!='': saving_tag='-'+saving_tag
  np.save(root+"/colab/explanation/"+"amazon_book_expl-"+expl_method+saving_tag+".npy", records)

  #visualize_expl(records,10)


# interpret_all_sentences의 출력 또는 그 출력을 저장한 파일경로로부터 설명 생성.
def visualize_expl(records, visualization_num, normalize=True):
  # record가 파일 경로일 경우 불러오기
  if isinstance(records, str): records = np.load(records, allow_pickle=True)

  vis_datas = []
  for i in range(visualization_num):
    tokens, rating, product, output, neg_expl, pos_expl, true_class, pred_class = records[i]
    if normalize :
      #pos_expl = (pos_expl - pos_expl.min()) / (pos_expl.max() - pos_expl.min())
      #neg_expl = (neg_expl - neg_expl.min()) / (neg_expl.max() - neg_expl.min())
      pos_expl /= np.linalg.norm(pos_expl)
      neg_expl /= np.linalg.norm(neg_expl)
      neg_expl *= output[0][0]
      pos_expl *= output[0][1]
    neg_expl *= -1  # negative일 경우 빨간색으로 visualize하기 위해.

    # visualization 객체 생성해서 추가
    vis_datas.append(visualization.VisualizationDataRecord(
                                  pos_expl,
                                  output[0][pred_class],
                                  pred_class,
                                  true_class,
                                  1,
                                  pos_expl.sum(),       
                                  tokens,
                                  1))
    vis_datas.append(visualization.VisualizationDataRecord(
                                  neg_expl,
                                  output[0][pred_class],
                                  pred_class,
                                  true_class,
                                  0,
                                  neg_expl.sum(),       
                                  tokens,
                                  1))
  
  # visualize
  visualization.visualize_text(vis_datas)

In [None]:
generate_expl_from_preprocessed("balanced_positive_regression_only1000_testloss121.pt", "balanced_negative_regression_only1000_testloss131.pt", "amazon_book_test_rev.npy", 5000, expl_method="transformer_attribution", saving_tag='balanced_regression_only5000_tmp')
generate_expl_from_preprocessed("balanced_positive_regression_only1000_testloss121.pt", "balanced_negative_regression_only1000_testloss131.pt", "amazon_book_test_rev.npy", 5000, expl_method="partial_lrp", saving_tag='balanced_regression_only5000_tmp')
generate_expl_from_preprocessed("balanced_positive_regression_only1000_testloss121.pt", "balanced_negative_regression_only1000_testloss131.pt", "amazon_book_test_rev.npy", 5000, expl_method="last_attn", saving_tag='balanced_regression_only5000_tmp')
generate_expl_from_preprocessed("balanced_positive_regression_only1000_testloss121.pt", "balanced_negative_regression_only1000_testloss131.pt", "amazon_book_test_rev.npy", 5000, expl_method="attn_gradcam", saving_tag='balanced_regression_only5000_tmp')
generate_expl_from_preprocessed("balanced_positive_regression_only1000_testloss121.pt", "balanced_negative_regression_only1000_testloss131.pt", "amazon_book_test_rev.npy", 5000, expl_method="lrp", saving_tag='balanced_regression_only5000_tmp')
generate_expl_from_preprocessed("balanced_positive_regression_only1000_testloss121.pt", "balanced_negative_regression_only1000_testloss131.pt", "amazon_book_test_rev.npy", 5000, expl_method="rollout", saving_tag='balanced_regression_only5000_tmp')

In [None]:
generate_expl_from_preprocessed("balanced_positive_regression_only1000_testloss121.pt", "balanced_negative_regression_only1000_testloss131.pt", "rationale_preprocessed.npy", 100, expl_method="transformer_attribution", saving_tag='rationale_balanced_regression')

In [None]:
records = np.load('/content/drive/MyDrive/CS470_team_2in1/colab/explanation/balanced_only1000model/amazon_book_expl-transformer_attribution-balanced_regression_only5000.npy', allow_pickle=True)
visualize_expl(records,100)