In [1]:
# Requires most of the same imports as for few-shot NER training
import os
import random
import torch
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
from torch import nn
import torch.nn.functional as F
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader
import transformers
from transformers import RobertaTokenizer, RobertaForMaskedLM, RobertaModel
from transformers import BertTokenizer, pipeline
from transformers import AdamW, get_linear_schedule_with_warmup
from torch.optim import Adam
import time
import json
import spacy


import random
from collections import Counter, defaultdict
import numpy as np
from sample_few_shot import get_label_dict
from finetune_model import RobertaNER, BertNER
from data import *
from torch.utils.tensorboard import SummaryWriter



In [2]:
# Report whether PyTorch can see a CUDA device
print("CUDA is available" if torch.cuda.is_available() else "CUDA is not available")

CUDA is available


In [3]:
# Collate function: turns a list of examples into fixed-width tensors for the model
def generate_batch(batch, max_len=None):
    """Pad or truncate every example in ``batch`` to a common width.

    Args:
        batch: sequence of examples where ``x[0]`` holds the token ids and
            ``x[1]`` the label ids of one example (assumed to be sequences of
            ints -- confirm against ``process_data``).
        max_len: width of the returned tensors. When omitted, the
            notebook-level ``max_seq_len`` is used, so existing
            single-argument callers behave exactly as before.

    Returns:
        A 4-tuple ``(text, attention_mask, label, orig_len)``:
        * ``text``: token ids, right-padded with 1 (presumably the RoBERTa
          pad id -- confirm if the base model changes) or cut to ``max_len``.
        * ``attention_mask``: 1 for real tokens, 0 for padding.
        * ``label``: label ids, right-padded with -100 or cut to ``max_len``.
        * ``orig_len``: the *untruncated* token count of every example.
    """
    if max_len is None:
        # Fall back to the global so the function still works as a plain
        # one-argument collate function.
        max_len = max_seq_len

    def fit(values, fill):
        # Cut first, then right-pad up to max_len with the fill value.
        clipped = torch.tensor(values)[:max_len]
        return F.pad(clipped, (0, max_len - clipped.size(0)), "constant", fill)

    positions = torch.arange(max_len)

    text = torch.stack([fit(x[0], 1) for x in batch])  # batch_size * max_len
    # A position is attended to only if it lies inside the original sequence.
    attention_mask = torch.stack(
        [(positions < len(x[0])).to(torch.int64) for x in batch]
    )
    label = torch.stack([fit(x[1], -100) for x in batch])
    orig_len = [len(x[0]) for x in batch]

    return text, attention_mask, label, orig_len

In [4]:
# Configuration container holding the hyper-parameters and file names used by the notebook
class Args:
    """Plain attribute bag with the notebook's settings.

    Args:
        dataset_index: number substituted into the ``FS_train_dataset{i}`` /
            ``FS_test_dataset{i}`` file names and into ``model_save_name``.
            Defaults to 0, which reproduces the previously hard-coded names.
    """

    def __init__(self, dataset_index=0):
        i = dataset_index

        # Data locations; the file names are derived from the dataset index.
        self.datapath = 'dataset'
        self.dataset = 'custom'
        self.train_text = f'FS_train_dataset{i}.words'
        self.train_ner = f'FS_train_dataset{i}.ner'
        self.test_text = f'FS_test_dataset{i}.words'
        self.test_ner = f'FS_test_dataset{i}.ner'
        self.model_save_name = f'FS_train_dataset{i}_finetuned_model'
        self.few_shot_sets = 1
        self.unsup_text = None
        self.unsup_ner = None

        # Model / optimisation settings.
        # NOTE(review): test_cls_num is 18 while the label map in this
        # notebook has 17 entries -- confirm which one is intended.
        self.base_model = 'roberta'
        self.epoch = 5
        self.train_cls_num = 4
        self.test_cls_num = 18
        self.max_seq_len = 128
        self.batch_size = 8
        self.soft_kmeans = False
        self.lr = 1e-04
        self.unsup_lr = 0.5
        self.warmup_proportion = 0.1
        self.weight_decay = 0.01
        self.use_truecase = False

        # Runtime / checkpoint switches.
        self.local_rank = None
        self.use_gpu = 'cuda'
        self.data_size = ''
        self.load_model = True
        self.reinit = False
        self.load_model_name = 'pretrained_models/lc_pretrained_190.pt'
        self.load_checkpoint = False
        self.load_dataset = False
        self.train_dataset_file = None
        self.test_dataset_file = None
        self.label2ids = None
        self.id2labels = None

args = Args()

In [5]:
# Tag -> id mapping; the id of a tag is its position in the list below
label2id = {tag: idx for idx, tag in enumerate([
    'O',
    'B-Company_Name', 'I-Company_Name',
    'B-Internal_Organization', 'I-Internal_Organization',
    'B-Software_Name', 'I-Software_Name',
    'B-Userbase_Information', 'I-Userbase_Information',
    'B-Software_Purpose', 'I-Software_Purpose',
    'B-Development_Scalability', 'I-Development_Scalability',
    'B-Transaction_Scalability', 'I-Transaction_Scalability',
    'B-Data_Scalability', 'I-Data_Scalability',
])}

In [6]:
# Id -> tag mapping; the id of a tag is its position in the list below
id2label = dict(enumerate([
    'O',
    'B-Company_Name', 'I-Company_Name',
    'B-Internal_Organization', 'I-Internal_Organization',
    'B-Software_Name', 'I-Software_Name',
    'B-Userbase_Information', 'I-Userbase_Information',
    'B-Software_Purpose', 'I-Software_Purpose',
    'B-Development_Scalability', 'I-Development_Scalability',
    'B-Transaction_Scalability', 'I-Transaction_Scalability',
    'B-Data_Scalability', 'I-Data_Scalability',
]))

In [7]:
# Data preparation: read the raw sentences, attach placeholder tags and encode them
test_text = os.path.join('kg_tests', 'sigrid_text.words')
# Alternative input location built from the Args settings:
# test_text = os.path.join(args.datapath,args.dataset, args.test_text)

with open(test_text, encoding='utf-8') as f:
    test_words = f.readlines()

# Dummy tags: one "O" per whitespace-separated word, newline-terminated
test_ner_tags = [
    " ".join(["O"] * len(sentence.split())) + "\n" for sentence in test_words
]

# The tokenizer for roberta
tokenizer = RobertaTokenizer.from_pretrained('roberta-base')

# Only one label set is used here; it is wrapped in a list because the
# per-dataset label counts below are computed from these lists.
label2ids = [label2id]
id2labels = [id2label]
test_label_sentence_dicts = []

# Keep handles on the raw lines and tags, since the names are rebound to the
# processed versions by the process_data call below
unprocessed_test_ner_tags = test_ner_tags
unprocessed_test_words = test_words

# Process the data into the form the model takes as input
max_seq_len = args.max_seq_len
test_ner_tags, test_words, test_label_sentence_dict = process_data(
    test_ner_tags,
    test_words,
    tokenizer,
    label2id,
    max_seq_len,
    base_model=args.base_model,
    use_truecase=args.use_truecase,
)

# Pair every processed sentence with its processed tag sequence
sub_valid_ = [[test_words[pos], tag_ids] for pos, tag_ids in enumerate(test_ner_tags)]
processed_test_set = [sub_valid_]

dataset_label_nums = list(map(len, label2ids))
test_num_data_point = sum(map(len, processed_test_set))

In [8]:
# Number of labels in each label set (one entry per element of label2ids)
dataset_label_nums

[17]

In [11]:
# Importing the fine-tuned model
new_model = RobertaNER.from_pretrained('roberta-base', dataset_label_nums=dataset_label_nums, output_attentions=False, output_hidden_states=False, multi_gpus=True)
# The model is wrapped in DataParallel before the weights are loaded, as in the
# original cell (presumably the checkpoint was saved from a wrapped model --
# confirm against the training notebook). Moving it to `device` explicitly
# matters with several GPUs, where DataParallel does not relocate the module.
new_model = torch.nn.DataParallel(new_model).to(device)
# Reuse the configured save name instead of re-hard-coding the dataset index.
checkpoint_path = os.path.join("trained_model", f"{args.model_save_name}_dict.pt")
# map_location lets a checkpoint written on a GPU machine load on a CPU-only one.
new_model.load_state_dict(torch.load(checkpoint_path, map_location=device))

Some weights of the model checkpoint at roberta-base were not used when initializing RobertaNER: ['lm_head.bias', 'lm_head.dense.weight', 'lm_head.dense.bias', 'lm_head.layer_norm.weight', 'lm_head.layer_norm.bias', 'lm_head.decoder.weight']
- This IS expected if you are initializing RobertaNER from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPretraining model).
- This IS NOT expected if you are initializing RobertaNER from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of RobertaNER were not initialized from the model checkpoint at roberta-base and are newly initialized: ['background', 'classifier.weight', 'classifier.bias', 'classifiers.0.weight', 'classifiers.0.bias']
You should probably TRAIN this model on a down-stream task to be able to use it for p

<All keys matched successfully>

In [12]:
# Function to get predictions from the fine-tuned model
def get_predictions(data_, batch_size=None):
    """Run ``new_model`` over every dataset in ``data_`` and decode the tags.

    Args:
        data_: list of datasets; each dataset is a list of examples in the
            form accepted by ``generate_batch``.
        batch_size: optional number of examples per forward pass. ``None``
            (the default) sends each dataset through the model as a single
            batch, which is what the function did before this parameter
            existed.

    Returns:
        ``(preds, gold)``: two parallel lists with one list of label strings
        per example, accumulated over all datasets in input order. The first
        and last original positions of each sequence are dropped, and only
        positions whose label id is not -100 are kept. Empty input yields
        ``([], [])``.
    """
    # Collate up front, remembering which dataset every batch came from.
    batches = []
    for dataset_idx, examples in enumerate(data_):
        step = batch_size or max(len(examples), 1)
        for start in range(0, len(examples), step):
            batches.append((dataset_idx, generate_batch(examples[start:start + step])))

    preds, gold = [], []

    new_model.eval()

    for dataset_idx, (text, attention_mask, cls, orig_len) in batches:
        with torch.no_grad():
            _, outputs = new_model(
                text.to(device),
                attention_mask=attention_mask.to(device),
                labels=cls.to(device),
                dataset=dataset_idx,
            )

        for row, predicted in enumerate(outputs):
            # Positions 1 .. orig_len-2: skip the first and last original token.
            span = slice(1, orig_len[row] - 1)
            gold_ids = [int(tag) for tag in cls[row][span]]
            keep = [tag != -100 for tag in gold_ids]
            preds.append(
                [id2label[int(p)] for p, wanted in zip(predicted[span], keep) if wanted]
            )
            gold.append(
                [id2label[tag] for tag, wanted in zip(gold_ids, keep) if wanted]
            )

    return preds, gold

In [13]:
# Decode the processed test set: `pred` holds the predicted tags per sentence,
# `original` the gold tags (here the dummy "O" tags generated during data preparation)
pred, original = get_predictions(processed_test_set)

In [14]:
# Function that retrieves the token indices of every entity in a BIO-tagged sentence
def find_entities(labels):
    """Group a sequence of BIO tags into entity spans.

    Args:
        labels: list of tag strings such as ``'O'``, ``'B-Type'``, ``'I-Type'``,
            one per token.

    Returns:
        Dict mapping each entity type to a list of spans; every span is the
        list of consecutive token indices covered by one entity.

    Notes:
        * A ``B-`` tag always starts a new entity.
        * An ``I-`` tag continues the open entity only if it has the same
          type; otherwise (no open entity, or a different type) it starts a
          new entity instead of being merged under the wrong label.
        * The type is everything after the two-character prefix, so types
          that themselves contain ``-`` are kept intact.
    """
    entities = {}
    start_idx = None
    current_type = None

    for idx, label in enumerate(labels):
        is_begin = label.startswith('B-')
        is_inside = label.startswith('I-')
        entity_type = label[2:] if (is_begin or is_inside) else None

        starts_new = is_begin or (
            is_inside and (start_idx is None or entity_type != current_type)
        )

        if starts_new:
            if start_idx is not None:
                # Close the entity that was open up to the previous token.
                entities.setdefault(current_type, []).append(list(range(start_idx, idx)))
            start_idx, current_type = idx, entity_type
        elif is_inside:
            # Same type as the open entity: just keep extending it.
            continue
        elif start_idx is not None:
            # Any non-B/I tag ends the open entity.
            entities.setdefault(current_type, []).append(list(range(start_idx, idx)))
            start_idx, current_type = None, None

    # An entity may run up to the very last token.
    if start_idx is not None:
        entities.setdefault(current_type, []).append(list(range(start_idx, len(labels))))

    return entities

# Visualize NER text

In [23]:
from spacy import displacy

In [46]:
# Entity types to highlight
entity_labels = [
    'Transaction_Scalability',
    'Software_Purpose',
    'Development_Scalability',
    'Userbase_Information',
    'Data_Scalability',
    'Internal_Organization',
    'Software_Name',
    'Company_Name',
]

# Background colours, paired position-by-position with entity_labels
predefined_colors = [
    '#FFAAAA', '#AAFFAA', '#AAAAFF', '#FFAAEE',
    '#EEFFAA', '#AAEEFF', '#FFAABB', '#BBAAFF',
]

# Entity label -> colour
entity_colours = dict(zip(entity_labels, predefined_colors))

# Options handed to displacy.render: which labels to show and how to colour them
options = {"ents": list(entity_colours), "colors": entity_colours}


In [48]:
# Visualize each sentence
def _displacy_doc(tokens, entity_spans):
    """Build the dict expected by ``displacy.render(..., manual=True, style="ent")``.

    Args:
        tokens: the words of one sentence.
        entity_spans: mapping of entity label -> list of spans, each span being
            a list of consecutive token indices (the ``find_entities`` format).

    Returns:
        ``{"text": ..., "ents": [...], "title": None}`` where ``text`` is the
        tokens joined by single spaces and every entity carries character
        offsets into that text. Entities are sorted by position.
    """
    # Offsets are computed against exactly this string, so they cannot drift
    # when the raw line has irregular whitespace or a trailing newline.
    text = " ".join(tokens)

    # Character offset at which each token starts (running sum, one pass).
    token_starts = []
    cursor = 0
    for token in tokens:
        token_starts.append(cursor)
        cursor += len(token) + 1  # +1 for the joining space

    ents = []
    for label, spans in entity_spans.items():
        for span in spans:
            if not span or span[0] >= len(tokens):
                # Nothing to highlight: the span lies outside the visible tokens.
                continue
            first = span[0]
            last = min(span[-1], len(tokens) - 1)
            ents.append({
                "start": token_starts[first],
                # End right after the last token, excluding the following space.
                "end": token_starts[last] + len(tokens[last]),
                "label": label,
            })

    # The spans arrive grouped by label; put them in order of occurrence.
    ents.sort(key=lambda ent: (ent["start"], ent["end"]))

    return {"text": text, "ents": ents, "title": None}


for sentence_tags, raw_sentence in zip(pred, unprocessed_test_words):
    dic_ents = _displacy_doc(raw_sentence.split(), find_entities(sentence_tags))
    displacy.render(dic_ents, manual=True, style="ent", options=options)