In [1]:
#!python -m spacy download en
#!pip install --quiet transformers

In [2]:
import spacy
import csv
import pickle
import random
import string
import en_core_web_sm
import torch
from transformers import AutoTokenizer, BertModel, GPT2Model, AutoModelForTokenClassification, pipeline
import nltk
import pandas as pd
import numpy as np
from tqdm.auto import tqdm
from itertools import product

In [7]:
# Treat the typographic apostrophe and the en dash as punctuation.
# `string.punctuation` is a module-level attribute shared by every importer of
# `string`, so the check below keeps this cell idempotent: re-running it does
# not append the same characters again.
for extra_punct in ('’', '–'):
    if extra_punct not in string.punctuation:
        string.punctuation += extra_punct

# NLTK resources required by the tokenizer and the POS tagger used below.
nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')

[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\Анастасия\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     C:\Users\Анастасия\AppData\Roaming\nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!


True

In [5]:
from utils import *

In [3]:
def get_embs_for_triplets(triplets, sentence_mapping, attention, attentions_types, with_label=False):
    """Turn (head, tail, relation) triplets into vectors of attention weights.

    For every triplet whose three members all occur in ``sentence_mapping``,
    the attention weights between the selected ordered pairs of members are
    read along the first axis of ``attention`` and concatenated into a single
    vector. Triplets with a member missing from ``sentence_mapping`` are
    skipped.

    Args:
        triplets: iterable of indexable items where ``[0]`` is the head,
            ``[1]`` the tail, ``[2]`` the relation and, when ``with_label`` is
            true, ``[3]`` the relation label.
        sentence_mapping: sequence of sentence tokens; the position of the
            first occurrence of a token is used as its index into the second
            and third axes of ``attention``.
        attention: array indexed as ``attention[:, i, j]``
            (presumably one slice per layer/head -- confirm against caller).
        attentions_types: sequence of six flags; a flag equal to ``1`` selects
            the corresponding pair, in the order head->rel, rel->tail,
            head->tail, rel->head, tail->rel, tail->head.
        with_label: when true, the label ``triplet[3]`` is appended to every
            result tuple.

    Returns:
        list of ``(embedding, triplet)`` tuples, or
        ``(embedding, triplet, label)`` tuples when ``with_label`` is true.
    """

    def _normalize(text):
        # Tokenize the member the same way as the sentence so that it can be
        # found in `sentence_mapping`.
        if ',' in text or "'" in text:
            return ' '.join(word_tokenize(text))
        return text

    # Position of the first occurrence of every token (what list.index returns),
    # built once instead of re-scanning the sentence for every triplet.
    first_position = {}
    for position, token in enumerate(sentence_mapping):
        first_position.setdefault(token, position)

    sent_embeddings = []

    for triplet in triplets:
        head = _normalize(triplet[0])
        tail = _normalize(triplet[1])
        rel = _normalize(triplet[2])

        if head not in first_position or tail not in first_position or rel not in first_position:
            continue

        #  head, tail, rel indices in the matrix
        head_ind = first_position[head]
        tail_ind = first_position[tail]
        rel_ind = first_position[rel]

        try:
            #  vector of attention weights for every ordered pair
            pair_attentions = [
                attention[:, head_ind, rel_ind],   # head -> rel
                attention[:, rel_ind, tail_ind],   # rel  -> tail
                attention[:, head_ind, tail_ind],  # head -> tail
                attention[:, rel_ind, head_ind],   # rel  -> head
                attention[:, tail_ind, rel_ind],   # tail -> rel
                attention[:, tail_ind, head_ind],  # tail -> head
            ]

            #  choose only the needed vectors
            attentions_to_use = tuple(
                att for i, att in enumerate(pair_attentions) if attentions_types[i] == 1
            )

            #  concat chosen vectors into one
            triplet_emb = np.concatenate(attentions_to_use, axis=0).squeeze()

            #  add label if 'train'
            if with_label:
                entry = (triplet_emb, triplet, triplet[3])
            else:
                entry = (triplet_emb, triplet)
        except (IndexError, ValueError):
            # Best effort: skip triplets that cannot be embedded (index outside
            # the attention matrix, missing label, too few flags, or no pair
            # selected) without hiding unrelated errors such as
            # KeyboardInterrupt.
            continue

        sent_embeddings.append(entry)

    return sent_embeddings

In [6]:
def return_embeddings(sentence, attentions_types, tokenizer, encoder, nlp, target=None, use_cuda=True, mode='train'):
    """Build attention-based embeddings for candidate triplets of one sentence.

    The sentence is encoded with ``encoder``; the attention matrix of every
    head of every layer is cropped (first and last row/column removed), merged
    with ``compress_attention`` and stacked. Candidate (head, tail, relation)
    triplets are then generated from POS-filtered tokens and embedded with
    ``get_embs_for_triplets``.

    Args:
        sentence: raw sentence text.
        attentions_types: six 0/1 flags forwarded to ``get_embs_for_triplets``.
        tokenizer: tokenizer forwarded to the mapping helpers.
        encoder: model called as ``encoder(**inputs, output_attentions=True)``.
        nlp: spaCy pipeline, only used when ``mode != 'train'``.
        target: labelled triplets of the sentence; required when
            ``mode == 'train'``. Each item is read as
            ``(head, tail, relation, label)``.
        use_cuda: move the model inputs to the GPU before encoding.
        mode: ``'train'`` embeds ``target`` plus randomly drawn negative
            triplets labelled ``'0'``; any other value embeds every candidate
            triplet without labels.

    Returns:
        list of ``(embedding, triplet, label)`` tuples in train mode or
        ``(embedding, triplet)`` tuples otherwise; ``[]`` if the encoder
        raises ``RuntimeError``.

    Raises:
        ValueError: if ``mode == 'train'`` and ``target`` is ``None``.
    """
    rel_pos = ['NN', 'NNP', 'NNS', 'JJR', 'JJS', 'MD', 'POS', 'VB', 'VBG', 'VBD', 'VBN', 'VBP', 'VBZ']
    head_tail_pos = ['NN', 'NNP', 'NNS', 'PRP']

    is_train = mode == 'train'

    if is_train:
        if target is None:
            raise ValueError("target must be provided when mode == 'train'")
        #  to process data with rel labels from dataset (pass target)
        inputs, tokenid2word_mapping, token2id, sentence_mapping = create_mapping_target(
            sentence, target, return_pt=True, tokenizer=tokenizer)
    else:
        #  to process data to predict
        # The first returned value must be bound to `inputs` (it used to be
        # bound to `outputs`, leaving `inputs` undefined below).
        # NOTE(review): assumes create_mapping returns the tokenizer encoding
        # first, like create_mapping_target -- confirm against utils.
        inputs, tokenid2word_mapping, token2id, sentence_mapping, noun_chunks = create_mapping(
            sentence, return_pt=True, nlp=nlp, tokenizer=tokenizer)

    with torch.no_grad():
        if use_cuda:
            for key in inputs.keys():
                inputs[key] = inputs[key].cuda()
        try:
            outputs = encoder(**inputs, output_attentions=True)
        except RuntimeError:
            # Show the offending sentence and skip it rather than abort the run.
            print(sentence_mapping)
            return []

    layer_attentions = outputs[2]

    merged_heads = []
    for layer in layer_attentions:
        # Drop only the batch axis; a bare squeeze() would also drop any other
        # axis of size 1.
        for head in layer.squeeze(0):
            # .cpu() is a no-op for CPU tensors, so this works with and
            # without CUDA.
            head_matrix = head.detach().cpu().numpy()
            # Drop the first and last position (presumably the special tokens
            # added by the tokenizer -- confirm).
            head_matrix = head_matrix[1:-1, 1:-1]
            merged_heads.append(compress_attention(head_matrix, tokenid2word_mapping))

    new_matr = np.array(merged_heads)

    #  get candidates for head, tail and rel (each word is tagged in isolation)
    words = [token for token in sentence_mapping if token not in string.punctuation]
    tagged_words = [(word, nltk.pos_tag([word])[0][1]) for word in words]
    nn_words = [word for word, tag in tagged_words if tag in head_tail_pos]
    other_words = [word for word, tag in tagged_words if tag in rel_pos]

    #  candidate triplets made of three distinct words
    candidates = [
        (head, tail, rel)
        for head, tail, rel in product(nn_words, nn_words, other_words)
        if head != tail and head != rel and tail != rel
    ]

    sent_embeddings = []

    if is_train:
        # Compare on (head, tail, rel) only: target items also carry a label,
        # so comparing whole items would never exclude a gold triplet.
        gold_triplets = [tuple(item[:3]) for item in target]
        negatives = [(head, tail, rel, '0')
                     for head, tail, rel in candidates
                     if (head, tail, rel) not in gold_triplets]

        #  get embeddings for 'garbage' class: as many draws (with replacement)
        #  as there are target triplets; nothing to draw from an empty pool
        if negatives:
            sampled_negatives = random.choices(negatives, k=len(target))
            sent_embeddings.extend(get_embs_for_triplets(
                sampled_negatives, sentence_mapping, new_matr, attentions_types, with_label=True))

        #  get embeddings for target class
        sent_embeddings.extend(get_embs_for_triplets(
            target, sentence_mapping, new_matr, attentions_types, with_label=True))
    else:
        #  get embeddings for candidate triplets (to be classified further)
        sent_embeddings.extend(get_embs_for_triplets(
            candidates, sentence_mapping, new_matr, attentions_types, with_label=False))

    return sent_embeddings

   

In [5]:
def get_embeddings_corpus(sample_size, attentions_types, selected_model='bert-base-cased',
                          data_path='trex_data.csv', use_cuda=None):
    """Compute training embeddings for a random sample of the corpus.

    Reads ``data_path``, samples ``sample_size`` rows (fixed random state),
    embeds every sampled sentence with ``return_embeddings`` in train mode and
    appends the results to ``trex_embeddings_{sample_size}.csv``.

    Args:
        sample_size: number of rows to sample from the corpus.
        attentions_types: six 0/1 flags forwarded to ``return_embeddings``.
        selected_model: name of the pretrained checkpoint; a GPT-2 model is
            loaded when the name contains ``'gpt'``, a BERT model otherwise.
        data_path: CSV file with a ``text`` and a ``target`` column.
        use_cuda: run on the GPU; ``None`` (default) uses the GPU only when
            one is available.

    Returns:
        list with the embeddings of all sampled sentences.
    """
    nlp = en_core_web_sm.load()

    if use_cuda is None:
        # Fall back to the CPU instead of failing on machines without CUDA.
        use_cuda = torch.cuda.is_available()

    tokenizer = AutoTokenizer.from_pretrained(selected_model)

    model_class = GPT2Model if 'gpt' in selected_model.lower() else BertModel
    encoder = model_class.from_pretrained(selected_model)
    encoder = encoder.cuda() if use_cuda else encoder.cpu()
    encoder.eval()

    data = pd.read_csv(data_path)
    data = data.sample(sample_size, random_state=666)

    embeddings_train = []

    # Open the output once and flush after every sentence, so partial results
    # survive an interrupted run. The file is opened in append mode: running
    # again with the same sample_size adds to the existing rows.
    # UTF-8 is set explicitly because the locale default (e.g. on Windows)
    # cannot encode every character that may occur in the triplets.
    # NOTE(review): each embedding is written as the text form of the array;
    # confirm that the downstream reader parses that form.
    with open(f'trex_embeddings_{sample_size}.csv', 'a', newline='', encoding='utf-8') as csvfile:
        csvwriter = csv.writer(csvfile, delimiter=',')

        for _, row in tqdm(data.iterrows(), total=data.shape[0]):
            text = row['text']
            # SECURITY: eval() executes whatever the CSV cell contains; only
            # use this with trusted data (consider ast.literal_eval).
            target = eval(row['target'])
            embeddings = return_embeddings(text, attentions_types, tokenizer, encoder, nlp,
                                           target, use_cuda, mode='train')
            embeddings_train.extend(embeddings)

            csvwriter.writerows(list(emb) for emb in embeddings)
            csvfile.flush()

    return embeddings_train
        


### Test

In [16]:
# Smoke test: embed a sample of 10 sentences using only the first two pair types.
# One flag per pair, in the order used by get_embs_for_triplets:
# [head_rel, rel_tail, head_tail, rel_head, tail_rel, tail_head]; 1 = use the pair, 0 = skip it.
attentions_types_example = [1, 1, 0, 0, 0, 0]
test = get_embeddings_corpus(10, attentions_types_example)

Some weights of the model checkpoint at bert-base-cased were not used when initializing BertModel: ['cls.predictions.transform.LayerNorm.bias', 'cls.predictions.bias', 'cls.predictions.transform.dense.weight', 'cls.seq_relationship.bias', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.decoder.weight', 'cls.seq_relationship.weight', 'cls.predictions.transform.dense.bias']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


  0%|          | 0/10 [00:00<?, ?it/s]

In [21]:
# Length of the first embedding vector: one block per selected pair type
# (2 here), each holding one value per attention matrix
# (presumably 12 layers x 12 heads = 144 for bert-base-cased -- confirm).
test[0][0].shape

(288,)