## Imports

In [10]:
import torch
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import torch.nn as nn
import os
import json
import gensim
import re
import torch.optim as optim
# from torchtext.vocab import GloVcoe, FastText
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from sklearn.metrics import accuracy_score, f1_score
from gensim.models import KeyedVectors
from tqdm.notebook import tqdm
from sklearn.metrics import classification_report
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

  from pandas.core import (


## Loading Test Data 

In [11]:
with open('data_processed/LR_test_tagged.json') as f:
    LR_test = json.load(f)

with open('data_processed/NER_test_tagged.json') as f:
    NER_test = json.load(f)

### Getting dictionary

In [3]:
unique_labels = []
for entry in NER_test:
    labels = entry['labels']
    for label in labels:
        unique_labels.append(label)

label_dict_NER = {}
unique_labels = list(set(unique_labels))
unique_labels = sorted(unique_labels)

for label_index in range(len(unique_labels)):
    label_dict_NER[unique_labels[label_index]] = label_index

# label_dict_NER['p0p'] = len(unique_labels)
# label_dict['UNK'] = len(unique_labels) + 1

output_dict = {}
for key, value in label_dict_NER.items():
    new_key = key.replace('B_', '').replace('I_', '')
    if new_key not in output_dict:
        output_dict[new_key] = len(output_dict)

label_dict_NER = output_dict
label_dict_NER

{'CASE_NUMBER': 0,
 'COURT': 1,
 'DATE': 2,
 'GPE': 3,
 'JUDGE': 4,
 'ORG': 5,
 'OTHER_PERSON': 6,
 'PETITIONER': 7,
 'PRECEDENT': 8,
 'PROVISION': 9,
 'RESPONDENT': 10,
 'STATUTE': 11,
 'WITNESS': 12,
 'O': 13}

In [25]:
unique_labels = []
for entry in NER_test:
    labels = entry['labels']
    for label in labels:
        unique_labels.append(label)
        
unique_labels=sorted(list(set(unique_labels)))
class_names = []
for ul in unique_labels:
    class_names.append(ul.replace("B_",'').replace("I_",''))
class_names_NER = sorted(list(set(class_names)))

In [26]:
class_names_LR = ["B","I","O"]

In [75]:
unique_labels = []
for entry in LR_test:
    labels = entry['labels']
    for label in labels:
        unique_labels.append(label)

label_dict_LR = {}
unique_labels = list(set(unique_labels))
unique_labels = sorted(unique_labels)

for label_index in range(len(unique_labels)):
    label_dict_LR[unique_labels[label_index]] = label_index

# label_dict_LR['p0p'] = len(unique_labels)
# label_dict['UNK'] = len(unique_labels) + 1
label_dict_LR

{'B': 0, 'I': 1, 'O': 2}

## EMBEDDING Model

### word2vec

In [16]:
w2vmodel = KeyedVectors.load_word2vec_format('.vector_cache/GoogleNews-vectors-negative300.bin', binary=True)

### GloVe


In [70]:
# glovemodel = GloVe(name='6B', dim=300)
glove_file = '.vector_cache/glove.6B.300d.txt'
glovemodel = {}
with open(glove_file, 'r', encoding='utf-8') as f:
    for line in f:
        values = line.split()
        word = values[0]
        vector = np.array(values[1:], dtype='float32')
        glovemodel[word] = vector

### fasttext

In [69]:
# fasttextmodel = FastText()
fasttext_file = '.vector_cache/wiki.en.vec'
fastmodel = {}
with open(fasttext_file, 'r', encoding='utf-8') as f:
    next(f)
    for line in f:
        values = line.rstrip().split(' ')
        word = values[0]
        vector = np.array([x for x in values[1:] if x], dtype='float32')
        fastmodel[word] = vector

FileNotFoundError: [Errno 2] No such file or directory: '.vector_cache/wiki.en.vec'

# Special Classes For Loading Model

## helper Functions

In [67]:
## Helper Functions
def argmax(vec):
    # return the argmax as a python int
    _, idx = torch.max(vec, 1)
    return idx.item()

def prepare_sequence(seq, to_ix):
    idxs = [to_ix[w] for w in seq]
    return torch.tensor(idxs, dtype=torch.long)

# Compute log sum exp in a numerically stable way for the forward algorithm
def log_sum_exp(vec):
    max_score = vec[0, argmax(vec)]
    max_score_broadcast = max_score.view(1, -1).expand(1, vec.size()[1])
    return max_score + \
        torch.log(torch.sum(torch.exp(vec - max_score_broadcast)))

def generate_tags_to_idx(data):
    START_TAG = "<START>"
    STOP_TAG = "<STOP>"
    unique_labels = []
    for entry in data:
        labels = entry['labels']
        for label in labels:
            unique_labels.append(label)
    unique_labels=sorted(list(set(unique_labels)))
    label_dict = {}
    for label_index in range(len(unique_labels)):
        label_dict[unique_labels[label_index]] = label_index
        
    label_dict[START_TAG]=len(label_dict)
    label_dict[STOP_TAG]=len(label_dict)
    
    output_dict = {}
    for key, value in label_dict.items():
        new_key = key.replace('B_', '').replace('I_', '')
        if new_key not in output_dict:
            output_dict[new_key] = len(output_dict)
    return output_dict

def get_embeds(sentence, embed_model, embedding_dim):
    word_vectors = []
    for word in sentence:
        # Check if the word exists in the Word2Vec model's vocabulary
        if word in embed_model:
            # Get the word vector for the current word
            word_vector = torch.tensor(embed_model[word], dtype=torch.float32)
            # Append the word vector to the list
            word_vectors.append(word_vector)
        else:
            # If the word is not in the vocabulary, append a zero vector
            word_vectors.append(torch.zeros(embedding_dim, dtype=torch.float32))

    # Convert the list of word vectors to a tensor
    embeds_tensor = torch.stack(word_vectors).to(device)

    return embeds_tensor

In [68]:
## Helper Functions
def argmax(vec):
    # return the argmax as a python int
    _, idx = torch.max(vec, 1)
    return idx.item()

def prepare_sequence(seq, to_ix):
    idxs = [to_ix[w] for w in seq]
    return torch.tensor(idxs, dtype=torch.long)

# Compute log sum exp in a numerically stable way for the forward algorithm
def log_sum_exp(vec):
    max_score = vec[0, argmax(vec)]
    max_score_broadcast = max_score.view(1, -1).expand(1, vec.size()[1])
    return max_score + \
        torch.log(torch.sum(torch.exp(vec - max_score_broadcast)))

def generate_tags_to_idx(data):
    START_TAG = "<START>"
    STOP_TAG = "<STOP>"
    unique_labels = []
    for entry in data:
        labels = entry['labels']
        for label in labels:
            unique_labels.append(label)
    unique_labels=sorted(list(set(unique_labels)))
    label_dict = {}
    for label_index in range(len(unique_labels)):
        label_dict[unique_labels[label_index]] = label_index
        
    label_dict[START_TAG]=len(label_dict)
    label_dict[STOP_TAG]=len(label_dict)
    
    output_dict = {}
    for key, value in label_dict.items():
        new_key = key.replace('B_', '').replace('I_', '')
        if new_key not in output_dict:
            output_dict[new_key] = len(output_dict)
    return output_dict

## Model Classes

In [66]:
class SequentialModel(nn.Module):
    def __init__(self, label_dict, base_model):
        super().__init__()
        self.sequential_model = base_model
        self.fc1 = nn.Linear(512, 128)
        self.fc2 = nn.Linear(128, len(label_dict))
        self.relu = nn.ReLU()

    def forward(self, x):
        x, hn = self.sequential_model(x)
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        
        return x, hn

In [61]:
class LR_dataset(Dataset):
    def __init__(self, dataset, embedding="word2vec", padding=True):
        self.padding_word = "O"

        if embedding == "word2vec":
            self.model = w2vmodel
            self.model.add_vector(self.padding_word, np.zeros(300))
        elif embedding == "glove":
            self.model = glovemodel
            self.model[self.padding_word] = np.zeros(300)
        else:
            self.model = fastmodel
            self.model[self.padding_word] = np.zeros(300)

        self.data = dataset
        self.input = []
        self.labels = []
        if padding:
            for entry in dataset:
                padded_entry = entry["text"].split() + [self.padding_word] * (78 - len(entry["text"].split()))
                self.input.append(padded_entry)
                padded_labels = entry["labels"] + [self.padding_word] * (78 - len(entry["labels"]))
                self.labels.append(padded_labels)
        else:
            for entry in dataset:
                self.input.append(entry["text"].split())
                self.labels.append(entry["labels"])
        self.tag_to_index = label_dict_LR

    def __len__(self):
        return len(self.input)

    def __getitem__(self, idx):
        sentence = self.input[idx]
        word_vecs = []
        labels = []

        for word_index in range(len(sentence)):
            word = sentence[word_index]
            if word in self.model:
                word_vecs.append(self.model[word])
            else:
                word_vecs.append(self.model[self.padding_word])
            labels.append(self.tag_to_index[self.labels[idx][word_index]])

        return torch.tensor(word_vecs), torch.tensor(labels)

In [62]:
class NER_dataset(Dataset):
    def __init__(self, dataset, embedding="word2vec", padding=True):
        self.padding_word = "O"
        # self.padding_word = "p0p"

        if embedding == "word2vec":
            self.model = w2vmodel
            self.model.add_vector(self.padding_word, np.zeros(300))
        elif embedding == "glove":
            self.model = glovemodel
            self.model[self.padding_word] = np.zeros(300)
        else:
            self.model = fastmodel
            self.model[self.padding_word] = np.zeros(300)
            
        self.data = dataset
        self.input = []
        self.labels = []
        
        for i in range(len(self.data)):
            lst = self.data[i]["labels"]
            for j in range(len(lst)):
                if lst[j].startswith("B_"):
                    lst[j] = lst[j][2:]
                elif lst[j].startswith("I_"):
                    lst[j] = lst[j][2:]
            self.data[i]["labels"] = lst

        if padding:
            for entry in self.data:
                padded_entry = entry["text"].split() + [self.padding_word] * (75 - len(entry["text"].split()))
                self.input.append(padded_entry)
                padded_labels = entry["labels"] + [self.padding_word] * (75 - len(entry["labels"]))
                self.labels.append(padded_labels)
        else:
            for entry in self.data:
                self.input.append(entry["text"].split())
                self.labels.append(entry["labels"])
        self.tag_to_index = label_dict_NER

    def __len__(self):
        return len(self.input)

    def __getitem__(self, idx):
        sentence = self.input[idx]
        word_vecs = []
        labels = []
        
        # for word_index in range(len(sentence)):
        #     if sentence[word_index] not in self.model:
        #         word_vecs.append(np.zeros(300))
        #         labels.append(self.tag_to_index["O"])
        #         continue
        #     else:
        #         word_vecs.append(self.model[sentence[word_index]])
        #         labels.append(self.tag_to_index[self.labels[idx][word_index]])

        for word_index in range(len(sentence)):
            word = sentence[word_index]
            if word in self.model:
                word_vecs.append(self.model[word])
            else:
                word_vecs.append(self.model[self.padding_word])
            labels.append(self.tag_to_index[self.labels[idx][word_index]])

        return torch.tensor(word_vecs), torch.tensor(labels)

In [63]:
#need to remove embedding layer
class BiLSTM_CRF(torch.nn.Module):
    def __init__(self, tag_to_ix, embedding_model, hidden_dim):
        super(BiLSTM_CRF, self).__init__()
        START_TAG = "<START>"
        STOP_TAG = "<STOP>"
        self.embed_model=embedding_model
        self.embedding_dim=len(embedding_model['hello'])
        self.hidden_dim = hidden_dim
        # self.vocab_size = vocab_size
        self.tag_to_ix = tag_to_ix
        self.tagset_size = len(tag_to_ix)

        # do not need this we use word2vec output
        # self.word_embeds = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(self.embedding_dim, hidden_dim // 2,
                            num_layers=1, bidirectional=True)

        # Maps the output of the LSTM into tag space.
        self.hidden2tag = nn.Linear(hidden_dim, self.tagset_size)

        # Matrix of transition parameters.  Entry i,j is the score of
        # transitioning *to* i *from* j.
        self.transitions = nn.Parameter(
            torch.randn(self.tagset_size, self.tagset_size).to(device))

        # These two statements enforce the constraint that we never transfer
        # to the start tag and we never transfer from the stop tag
        self.transitions.data[tag_to_ix[START_TAG], :] = -10000
        self.transitions.data[:, tag_to_ix[STOP_TAG]] = -10000

        self.hidden = self.init_hidden()

    def init_hidden(self):
        return (torch.randn(2, 1, self.hidden_dim // 2),
                torch.randn(2, 1, self.hidden_dim // 2))

    def _forward_alg(self, feats):
        START_TAG = "<START>"
        STOP_TAG = "<STOP>"
        # Do the forward algorithm to compute the partition function
        init_alphas = torch.full((1, self.tagset_size), -10000.).to(device)
        # START_TAG has all of the score.
        init_alphas[0][self.tag_to_ix[START_TAG]] = 0.

        # Wrap in a variable so that we will get automatic backprop
        forward_var = init_alphas

        # Iterate through the sentence
        for feat in feats:
            alphas_t = []  # The forward tensors at this timestep
            for next_tag in range(self.tagset_size):
                # broadcast the emission score: it is the same regardless of
                # the previous tag
                emit_score = feat[next_tag].view(
                    1, -1).expand(1, self.tagset_size)
                # the ith entry of trans_score is the score of transitioning to
                # next_tag from i
                trans_score = self.transitions[next_tag].view(1, -1)
                # The ith entry of next_tag_var is the value for the
                # edge (i -> next_tag) before we do log-sum-exp
                next_tag_var = forward_var + trans_score + emit_score
                # The forward variable for this tag is log-sum-exp of all the
                # scores.
                alphas_t.append(log_sum_exp(next_tag_var).view(1))
            forward_var = torch.cat(alphas_t).view(1, -1).to(device)
        terminal_var = forward_var + self.transitions[self.tag_to_ix[STOP_TAG]]
        alpha = log_sum_exp(terminal_var)
        return alpha

    def _get_lstm_features(self, sentence):
        self.hidden = tuple(h.to(device) for h in self.init_hidden())
        # self.hidden.to(device)
        # embeds = self.word_embeds(sentence).view(len(sentence), 1, -1)
        embeds=sentence.unsqueeze(1)
        embeds=embeds.to(torch.float32).to(device)
        # print(embeds)
        lstm_out, self.hidden = self.lstm(embeds, self.hidden)
        lstm_out = lstm_out.view(len(sentence), self.hidden_dim)
        lstm_feats = self.hidden2tag(lstm_out)
        return lstm_feats

    def _score_sentence(self, feats, tags):
        START_TAG = "<START>"
        STOP_TAG = "<STOP>"
        # Gives the score of a provided tag sequence
        score = torch.zeros(1).to(device)
        tags = torch.cat([torch.tensor([self.tag_to_ix[START_TAG]], dtype=torch.long).to(device), tags]).to(device)
        # print(feats.shape)
        # print(tags)
        for i, feat in enumerate(feats):
            score = score + \
                self.transitions[tags[i + 1], tags[i]] + feat[tags[i + 1]]
        score = score + self.transitions[self.tag_to_ix[STOP_TAG], tags[-1]]
        return score

    def _viterbi_decode(self, feats):
        START_TAG = "<START>"
        STOP_TAG = "<STOP>"
        backpointers = []

        # Initialize the viterbi variables in log space
        init_vvars = torch.full((1, self.tagset_size), -10000.).to(device)
        init_vvars[0][self.tag_to_ix[START_TAG]] = 0

        # forward_var at step i holds the viterbi variables for step i-1
        forward_var = init_vvars
        for feat in feats:
            bptrs_t = []  # holds the backpointers for this step
            viterbivars_t = []  # holds the viterbi variables for this step

            for next_tag in range(self.tagset_size):
                # next_tag_var[i] holds the viterbi variable for tag i at the
                # previous step, plus the score of transitioning
                # from tag i to next_tag.
                # We don't include the emission scores here because the max
                # does not depend on them (we add them in below)
                next_tag_var = forward_var + self.transitions[next_tag]
                best_tag_id = argmax(next_tag_var)
                bptrs_t.append(best_tag_id)
                viterbivars_t.append(next_tag_var[0][best_tag_id].view(1))
            # Now add in the emission scores, and assign forward_var to the set
            # of viterbi variables we just computed
            forward_var = (torch.cat(viterbivars_t) + feat).view(1, -1)
            backpointers.append(bptrs_t)

        # Transition to STOP_TAG
        terminal_var = forward_var + self.transitions[self.tag_to_ix[STOP_TAG]]
        best_tag_id = argmax(terminal_var)
        path_score = terminal_var[0][best_tag_id]

        # Follow the back pointers to decode the best path.
        best_path = [best_tag_id]
        for bptrs_t in reversed(backpointers):
            best_tag_id = bptrs_t[best_tag_id]
            best_path.append(best_tag_id)
        # Pop off the start tag (we dont want to return that to the caller)
        start = best_path.pop()
        assert start == self.tag_to_ix[START_TAG]  # Sanity check
        best_path.reverse()
        return path_score, best_path

    def neg_log_likelihood(self, sentence, tags):
        feats = self._get_lstm_features(sentence)
        feats.to(device)
        forward_score = self._forward_alg(feats)
        gold_score = self._score_sentence(feats, tags)
        return forward_score - gold_score

    def forward(self, sentence):  # dont confuse this with _forward_alg above.
        # Get the emission scores from the BiLSTM
        lstm_feats = self._get_lstm_features(sentence)

        # Find the best path, given the features.
        score, tag_seq = self._viterbi_decode(lstm_feats)
        return score, tag_seq

# Test NER

## Loading the models (please replace the model file accordingly)

In [None]:
PATH="trained_models/t2_lstm_word2vec.pt"
model = torch.load(PATH)
model.eval()

In [None]:
PATH = "trained_models/t1_bilstm_word2vec.pth"
HIDDEN_DIM = 32
tag_to_ix=generate_tags_to_idx(NER_test)
vocab_size=len(w2vmodel['hello']) #Please change this
bilstm_model = BiLSTM_CRF(tag_to_ix, w2vmodel, HIDDEN_DIM)
bilstm_model.load_state_dict(torch.load(PATH))

## DataLoader for NER

In [None]:
NER_test_loader = DataLoader(NER_dataset(NER_test, "word2vec", padding=True), batch_size=64, shuffle=True)

## Test Loop for NER (Sequenntial)

In [23]:
def test_NER(model, test_data,count, tag_dict,embedding_model):
    true_labels = []
    predicted_labels = []
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)  # Move model to GPU if available
    with torch.no_grad():
        cnt=0
        for entry in test_data:
            try:
                cnt+=1
                if cnt>count:
                    break
                # Step 1. Get the inputs ready for the network
                sentence=entry['text'].split()
                sentence = entry["text"].split() + ['O'] * (75 - len(entry["text"].split()))
                padded_labels = entry["labels"] + ['O'] * (75 - len(entry["labels"]))
                tags=padded_labels
                new_tags = []
                for tag in tags:
                    new_tag= tag.replace('B_', '').replace('I_', '')
                    new_tags.append(new_tag)

                sentence_in = get_embeds(sentence,embedding_model,300)
                targets = torch.tensor([tag_dict[t] for t in new_tags], dtype=torch.long)

                # Step 2. Run the forward pass
                outputs, hn = model(sentence_in)
                _, predicted = torch.max(outputs, 1)

                # Convert predicted tags tensor to a list
                predicted_tags = predicted

                # Append true and predicted labels for F1 score calculation
                true_labels.extend(targets.tolist())
                predicted_labels.extend(predicted_tags)
            except:
                continue

    # Calculate F1 scores for each class
    f1 = f1_score(true_labels, predicted_labels, average = "macro")
    print("Macro F1 Score:{f1s}".format(f1s = f1))
    report = classification_report(true_labels, predicted_labels,  target_names= tag_dict.keys())
    print(report)

In [None]:
test_NER(model,NER_test,np.inf,label_dict_NER,w2vmodel)

In [44]:
model.eval()
all_test_labels = []
all_test_preds = []
with torch.no_grad():
    for inputs, labels in NER_test_loader:
        inputs, labels = inputs.to(device).to(torch.float32), labels.to(device).to(torch.long)
        outputs, hn = model(inputs)
        outputs = outputs.reshape(-1, outputs.shape[-1])
        labels = labels.reshape(-1)

        _, predicted = torch.max(outputs, 1)
        all_test_labels.extend(labels.cpu().numpy())
        all_test_preds.extend(predicted.cpu().numpy())

test_accuracy = accuracy_score(all_test_labels, all_test_preds)
test_f1 = f1_score(all_test_labels, all_test_preds, average='macro')
print(f"Test accuracy: {test_accuracy} Test F1: {test_f1}")
report = classification_report(all_test_labels, all_test_preds,  target_names= label_dict_NER.keys())
print(report)

  return torch.tensor(word_vecs), torch.tensor(labels)


Test accuracy: 0.96234375 Test F1: 0.6003203587716104


## Test loop for NER (BiLSTM)

In [57]:
from sklearn.metrics import f1_score
from sklearn.metrics import classification_report

def test(model, test_data,count,class_names,embed_model, tag_dict = tag_to_ix):
    true_labels = []
    predicted_labels = []
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)  # Move model to GPU if available
    with torch.no_grad():
        cnt=0
        for entry in test_data:
                cnt+=1
                if cnt>count:
                    break
                # Step 1. Get the inputs ready for the network
                sentence=entry['text'].split()
                tags=entry['labels']
                new_tags = []
                for tag in tags:
                    new_tag= tag.replace('B_', '').replace('I_', '')
                    new_tags.append(new_tag)

                sentence_in = get_embeds(sentence,embed_model,300)
                targets = torch.tensor([tag_dict[t] for t in new_tags], dtype=torch.long)

                # Step 2. Run the forward pass
                _, predicted_tags = model(sentence_in)
                # Convert predicted tags tensor to a list
                predicted_tags = predicted_tags

                # Append true and predicted labels for F1 score calculation
                true_labels.extend(targets.tolist())
                predicted_labels.extend(predicted_tags)

    # Calculate F1 scores for each class
    f1 = f1_score(true_labels, predicted_labels, average = "macro")
    print("Macro F1 Score:{f1s}".format(f1s = f1))
    report = classification_report(true_labels, predicted_labels,  target_names= class_names)
    print(report)

In [58]:
test(bilstm_model,NER_test,np.inf,class_names_NER, w2vmodel)

Macro F1 Score:0.6735100506799192
              precision    recall  f1-score   support

 CASE_NUMBER       0.78      0.64      0.70       459
       COURT       0.90      0.83      0.86       502
        DATE       0.88      0.89      0.89       319
         GPE       0.61      0.68      0.64       223
       JUDGE       0.42      0.42      0.42        12
           O       0.78      0.37      0.50       465
         ORG       0.74      0.77      0.75       447
OTHER_PERSON       0.82      0.45      0.58        20
  PETITIONER       0.92      0.91      0.92      1952
   PRECEDENT       0.95      0.88      0.91       689
   PROVISION       0.00      0.00      0.00        12
  RESPONDENT       0.88      0.88      0.88       595
     STATUTE       0.58      0.30      0.40       106
     WITNESS       0.96      0.98      0.97     22711

    accuracy                           0.95     28512
   macro avg       0.73      0.64      0.67     28512
weighted avg       0.94      0.95      0.94   

# Test LR

## Loading the model (please replace the model file accordingly)

In [71]:
PATH="trained_models/t2_lstm_word2vec.pt"
model = torch.load(PATH)
model.eval()

SequentialModel(
  (sequential_model): LSTM(300, 512, batch_first=True)
  (fc1): Linear(in_features=512, out_features=128, bias=True)
  (fc2): Linear(in_features=128, out_features=3, bias=True)
  (relu): ReLU()
)

In [73]:
PATH = "trained_models/t2_bilstm_word2vec.pth"
HIDDEN_DIM = 32
tag_to_ix=generate_tags_to_idx(LR_test)
vocab_size=len(w2vmodel['hello']) #Please change this
bilstm_model = BiLSTM_CRF(tag_to_ix, w2vmodel, HIDDEN_DIM)
bilstm_model.load_state_dict(torch.load(PATH))

<All keys matched successfully>

## DataLoader for LR

In [77]:
LR_test_loader = DataLoader(LR_dataset(LR_test, "word2vec", padding=True), batch_size=64, shuffle=True)    

In [78]:
print(len(LR_test_loader))
for data,label in LR_test_loader:
    print(data.shape)
    print(label.shape)
    break

6
torch.Size([64, 78, 300])
torch.Size([64, 78])


  return torch.tensor(word_vecs), torch.tensor(labels)


## Test Loop for LR

In [79]:
def test_LR(model, test_data,count, tag_dict,embedding_model):
    print(tag_dict)
    true_labels = []
    predicted_labels = []
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)  # Move model to GPU if available
    with torch.no_grad():
        cnt=0
        for entry in test_data:
            try:
                cnt+=1
                if cnt>count:
                    break
                # Step 1. Get the inputs ready for the network
                sentence=entry['text'].split()
                sentence = entry["text"].split() + ['O'] * (75 - len(entry["text"].split()))
                padded_labels = entry["labels"] + ['O'] * (75 - len(entry["labels"]))
                tags=padded_labels
                new_tags = []
                for tag in tags:
                    new_tag= tag.replace('B_', '').replace('I_', '')
                    new_tags.append(new_tag)

                sentence_in = get_embeds(sentence,embedding_model,300)
                targets = torch.tensor([tag_dict[t] for t in new_tags], dtype=torch.long)

                # Step 2. Run the forward pass
                outputs, hn = model(sentence_in)
                _, predicted = torch.max(outputs, 1)

                # Convert predicted tags tensor to a list
                predicted_tags = predicted

                # Append true and predicted labels for F1 score calculation
                true_labels.extend(targets.tolist())
                predicted_labels.extend(predicted_tags)
            except:
                continue

    # Calculate F1 scores for each class
    f1 = f1_score(true_labels, predicted_labels, average = "macro")
    print("Macro F1 Score:{f1s}".format(f1s = f1))
    report = classification_report(true_labels, predicted_labels)
    print(report)

In [80]:
test_LR(model,LR_test,np.inf,label_dict_LR,w2vmodel)

{'B': 0, 'I': 1, 'O': 2}
Macro F1 Score:0.3356147175593501
              precision    recall  f1-score   support

           0       0.83      0.01      0.02       463
           1       0.00      0.00      0.00       243
           2       0.97      1.00      0.99     23894

    accuracy                           0.97     24600
   macro avg       0.60      0.34      0.34     24600
weighted avg       0.96      0.97      0.96     24600



  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
