In [None]:
from sys import path
from os.path import dirname, abspath
path.append(dirname(dirname(dirname(abspath("__file__")))))

In [None]:
# for NER model
from transformers import AutoModel
from torchcrf import CRF
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from scripts.utils_bert import *
from scripts.metrics import f1score
import pytorch_lightning as pl
from multiprocessing import cpu_count
from platform import system
from os import environ
from itertools import chain
from sklearn.metrics import f1_score

# set the environment to run tokenizers in parallel
environ["TOKENIZERS_PARALLELISM"] = "false"
pl.seed_everything(seed=42)

### NER Model (Definition)
#### NER model is only used in inference mode by loading the saved_weights
#### Hence, all training elements of the model can be removed

In [None]:
# some important hyperparameters
BATCH_SIZE = 128
N_JOBS = cpu_count() if system() != "Windows" else 0

BERT_TYPE = "roberta-base"
TAG2IDX = {'B': 0, 'I': 1, 'O': 2, 'E': 3, 'S': 4, '<': 5, ">":6, "$": 7}

### RoBERTa-NER

In [None]:
class BERT_NER(pl.LightningModule):
    def __init__(self, 
                 bert_type=BERT_TYPE, 
                 num_tags=len(TAG2IDX),
                 test_dataset=None):
        
        super().__init__()
        self.bert = AutoModel.from_pretrained(bert_type)
        self.crf = CRF(num_tags=num_tags, batch_first=True)
        self.fc = nn.Linear(768, num_tags)
        ## Hyperparameters ##
        self.batch_size = BATCH_SIZE
        ## Datasets ##
        self.test_dataset = test_dataset


    def test_dataloader(self):
        return DataLoader(self.test_dataset, 
                          batch_size=self.batch_size,
                          num_workers=N_JOBS,
                          shuffle=False,
                          drop_last=False)
    

    def forward(self, input_ids, attention_masks):
        out = self.bert(input_ids, attention_masks).last_hidden_state
        out = self.fc(out)
        return out

    
    def _shared_evaluation_step(self, batch, batch_idx):
        ids, masks, lbls = batch
        emissions = self(ids, masks)
        loss = -self.crf(emissions, lbls, mask=masks)
        pred = self.crf.decode(emissions, mask=masks)
        r, p, f1 = f1score(lbls, pred, model="transformer")
        return loss, r, p, f1
    

    def test_step(self, batch, batch_idx):
        loss, r, p, f1 = self._shared_evaluation_step(batch, batch_idx)
        self.log("test_loss", loss, on_step=False, on_epoch=True, prog_bar=True)
        self.log("test_recall", r, on_step=False, on_epoch=True, prog_bar=True)
        self.log("test_precision", p, on_step=False, on_epoch=True, prog_bar=True)
        self.log("test_f1score", f1, on_step=False, on_epoch=True, prog_bar=True)

### RoBERTa-Classification

In [None]:
class BERT_SUG(pl.LightningModule):
    def __init__(self, 
                 dropout=0.5,
                 bert_type=BERT_TYPE,
                 test_dataset=None):

        super().__init__()
        self.bert = AutoModel.from_pretrained(bert_type)
        self.fc = nn.Linear(768, 1)
        self.dropout = nn.Dropout(p=dropout)
        self.layer_norm = nn.LayerNorm(768)
        self.loss_fn = nn.BCEWithLogitsLoss()
        ## Hyperparameters ##
        self.batch_size = BATCH_SIZE
        ## Datasets ##
        self.test_dataset = test_dataset


    def test_dataloader(self):
        return DataLoader(self.test_dataset, 
                          batch_size=self.batch_size,
                          num_workers=N_JOBS,
                          shuffle=False,
                          drop_last=False)

    def predict_dataloader(self):
        return DataLoader(self.test_dataset, 
                          batch_size=self.batch_size,
                          num_workers=N_JOBS,
                          shuffle=False,
                          drop_last=False)
    
    
    def _f1score(self, logits, lbls):
        lbls = torch.flatten(lbls)
        preds = torch.flatten(torch.round(torch.sigmoid(logits)))
        return f1_score(lbls.tolist(), preds.tolist(), zero_division=0)
        

    def forward(self, input_ids, attention_masks):
        out = self.bert(input_ids, attention_masks).pooler_output
        out = self.layer_norm(out)
        out = self.dropout(out)
        out = self.fc(out)
        return out
    
    
    def _shared_evaluation_step(self, batch, batch_idx):
        ids, masks, lbls = batch
        logits = self(ids, masks)
        loss = self.loss_fn(logits, lbls.float())
        f1 = self._f1score(logits, lbls)
        return loss, f1


    def test_step(self, batch, batch_idx):
        loss, f1 = self._shared_evaluation_step(batch, batch_idx)
        self.log("test_loss", loss, on_step=False, on_epoch=True, prog_bar=True)
        self.log("test_f1score", f1, on_step=False, on_epoch=True, prog_bar=True)


    def predict_step(self, batch, batch_idx):
        ids, masks, _ = batch
        logits = self(ids, masks)
        preds = torch.flatten(torch.round(torch.sigmoid(logits)))
        return preds.tolist()


### Use RoBERTa-Classification Model for predicting Suggestions

In [None]:
# similarly read the test data and create the dataset
encoded_input, targets = get_encoded_input("../../../data/test_290818.txt", 
                                           tag2idx=TAG2IDX, 
                                           tokenizer_name=BERT_TYPE,
                                           return_classification_targets=True)

test_dataset = TensorDataset(torch.LongTensor(encoded_input["input_ids"]),
                             torch.BoolTensor(encoded_input["attention_mask"]),
                             torch.LongTensor(targets).unsqueeze(1))

In [None]:
model = BERT_SUG(bert_type=BERT_TYPE,
                 test_dataset=test_dataset)

trainer = pl.Trainer(accelerator="gpu",
                     precision=16,
                     log_every_n_steps=1)

In [None]:
model.load_state_dict(torch.load(f"../../../src_sug/saved_weights/{BERT_TYPE}-sug-on-NER-dataset.ckpt")["state_dict"])
trainer.test(model)

In [None]:
preds = list(chain(*trainer.predict(model)))

### Use the sentences which are predicted to have a Suggestion as input to NER Model

In [None]:
sug_idx = torch.LongTensor([i for (i, y) in enumerate(preds) if y == 1.0])

In [None]:
# similarly read the test data and create the dataset
encoded_input, extended_labels = get_encoded_input("../../../data/test_290818.txt", 
                                                   tag2idx=TAG2IDX, 
                                                   tokenizer_name=BERT_TYPE)

test_dataset = TensorDataset(torch.index_select(torch.LongTensor(encoded_input["input_ids"]), dim=0, index=sug_idx),
                             torch.index_select(torch.BoolTensor(encoded_input["attention_mask"]), dim=0, index=sug_idx),
                             torch.index_select(torch.LongTensor(extended_labels), dim=0, index=sug_idx))

In [None]:
model = BERT_NER(bert_type=BERT_TYPE,
                 test_dataset=test_dataset)

trainer = pl.Trainer(accelerator="gpu",
                     precision=16,
                     log_every_n_steps=1)

In [None]:
model.load_state_dict(torch.load(f"../../saved_weights/{BERT_TYPE}-ner.ckpt")["state_dict"])
trainer.test(model)