In [None]:
# Install Hugging Face Transformers with its `torch` extra into the Colab runtime.
# NOTE(review): the version is not pinned here, so a later re-run may pull a different release.
!pip install transformers[torch]

In [None]:
# Fetch the Zero-ShotNER repository; later cells change into this checkout and
# read/write files under its ./datasets folder.
!git clone https://github.com/br-ai-ns-institute/Zero-ShotNER.git

In [None]:
# Mount Google Drive (Colab only); checkpoints, logs and saved models are
# written below /content/drive/MyDrive by the training cells.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import os

# Folder (inside the mounted Drive) that receives every artefact of this notebook.
folder_name = "NLP"
out_path = "/content/drive/MyDrive/" + folder_name

# exist_ok=True makes the cell idempotent and avoids the check-then-create race;
# it also matches how the model/tokenizer folders are created further down.
os.makedirs(out_path, exist_ok=True)


In [None]:
# Work from the repository root so the relative paths used below
# ("./datasets/train.csv", "datasets/test.csv", "./evaldir") resolve inside the checkout.
%cd /content/Zero-ShotNER

/content/Zero-ShotNER


In [None]:
# gdown is used by the next two cells to download the train/test CSV files from Google Drive.
# NOTE(review): version is not pinned.
!pip install gdown
import gdown

In [None]:
# Download the training CSV into the repository's datasets folder.
# The file is read back later with pd.read_csv("datasets/train.csv").
drive_link = "https://drive.google.com/u/0/uc?id=1-AkNr4DDfqmXQc-E4Zk7bND4XLnGCG0s&export=download"
output_file = "./datasets/train.csv"
gdown.download(drive_link, output_file, quiet=False)

In [None]:
# Download the test CSV into the repository's datasets folder.
# Note: this reuses (overwrites) the `drive_link` / `output_file` names from the previous cell.
drive_link = "https://drive.google.com/u/0/uc?id=1-DXQd3y27RdaYIhy4PTS0naoHz7rn6U9&export=download"
output_file = "./datasets/test.csv"
gdown.download(drive_link, output_file, quiet=False)

# Start

In [None]:
import pandas as pd
import pickle
import numpy as np

import os

import torch
from torch.utils.data import Subset

import transformers
from transformers import AutoTokenizer
from transformers import BertForTokenClassification, Trainer, TrainingArguments

import random
import time

import ast

from sklearn.model_selection import train_test_split

# Parameters

In [None]:
# Checkpoint used for both the tokenizer and the token-classification model.
model_name = "bert-base-cased"
# model_name = "dmis-lab/biobert-v1.1"
tokenizer = AutoTokenizer.from_pretrained(model_name)
# Class that is removed from the train/validation subsets and is the only class
# kept in the test subset (see get_task_specific_datasets).
class_unseen = 'PETITIONER'
# Passed to TrainingArguments for the zero-shot run and for every few-shot run.
num_train_epochs = 20
# Every (class, text) pair is padded/truncated to this many tokens by the tokenizer call.
max_length = 512
# Optional row counts for sub-sampling the CSVs while experimenting; None keeps all rows.
sampled_data_size_train = None
sampled_data_size_test = None
# max_length = 256

# Data Processing

In [None]:
def _parse_split(df):
    """Keep the three modelling columns and decode their string-encoded lists.

    The CSV stores ``class``, ``text`` and ``labels`` as the textual repr of
    Python lists; ``ast.literal_eval`` turns each cell back into a real list.
    Returns a new frame with a fresh 0..n-1 index.
    """
    parsed = df[['class', 'text', 'labels']].reset_index(drop=True)
    for column in ('text', 'class', 'labels'):
        parsed[column] = parsed[column].apply(ast.literal_eval)
    return parsed


df_train_val = pd.read_csv("datasets/train.csv", index_col=0)
if sampled_data_size_train is not None:
    # Fixed seed so that a sub-sampled debug run is repeatable, like the split below.
    df_train_val = df_train_val.sample(sampled_data_size_train, random_state=42).reset_index(drop=True)

df_test = pd.read_csv("datasets/test.csv", index_col=0)
if sampled_data_size_test is not None:
    df_test = df_test.sample(sampled_data_size_test, random_state=42).reset_index(drop=True)

# 80/20 train/validation split of the training file.
df_train, df_valid = train_test_split(df_train_val, test_size=0.2, random_state=42)

# The positional index produced by _parse_split is what the Subset objects
# built later use to address rows of the tokenized datasets.
df_train = _parse_split(df_train)
df_valid = _parse_split(df_valid)
df_test = _parse_split(df_test)

In [None]:
# Inspect the parsed test frame: one row per (class, sentence) pair with word-level 0/1 labels.
df_test

Unnamed: 0,class,text,labels
0,[STATUTE],"[True,, our, Constitution, has, no, 'due, proc...","[0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ..."
1,[PRECEDENT],"[True,, our, Constitution, has, no, 'due, proc...","[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ..."
2,[JUDGE],"[(See, Principles, of, Statutory, Interpretati...","[0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0]"
3,"[OTHER, PERSON]","[Their, Lordships, have, said, --, , ""It, is, ...","[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ..."
4,[GPE],"[Their, Lordships, have, said, --, , ""It, is, ...","[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ..."
...,...,...,...
1976,[JUDGE],"[High, Court, Of, Judicature, At, Allahabad\n,...","[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ..."
1977,[LAWYER],"[High, Court, Of, Judicature, At, Allahabad\n,...","[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ..."
1978,[RESPONDENT],"[High, Court, Of, Judicature, At, Allahabad\n,...","[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ..."
1979,[COURT],"[High, Court, Of, Judicature, At, Allahabad\n,...","[1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, ..."


In [None]:
def get_tokenized_encodings(df):
  """Tokenize every row of ``df`` as a (class words, text words) sequence pair.

  Both columns are passed as pre-split word lists. Each pair is truncated and
  padded to the notebook-level ``max_length`` using the notebook-level
  ``tokenizer``; the tokenizer's batch output is returned unchanged.
  """
  class_words = df["class"].to_list()
  text_words = df["text"].to_list()
  return tokenizer(
      class_words,
      text_words,
      max_length=max_length,
      padding='max_length',
      truncation=True,
      is_split_into_words=True,
      add_special_tokens=True,
  )

def align_lables(df, tokenized_encodings, label_all_tokens = True):
    """Expand word-level labels to one label per token.

    (The misspelled name is kept so existing callers continue to work.)

    For row ``i`` of ``df['labels']`` the word ids returned by
    ``tokenized_encodings.word_ids(batch_index=i)`` are walked token by token:

    * a token without a word id (``None``) gets ``-100``;
    * a token whose word id lies outside the row's label list gets ``-100``;
    * the first token of a word gets that word's label;
    * further tokens of the same word get the same label when
      ``label_all_tokens`` is true, otherwise ``-100``.

    Afterwards every position from index 1 up to (not including) the first
    ``-100`` found at index >= 2 is overwritten with ``1``.
    NOTE(review): with the (class, text) pair encoding used in this notebook
    that span is presumably the class-name prompt in front of the first
    separator token; confirm against the tokenizer layout.

    Args:
        df: Mapping/DataFrame whose ``'labels'`` entry iterates over per-row
            lists of word labels.
        tokenized_encodings: Object exposing ``word_ids(batch_index=i)``.
        label_all_tokens: Whether continuation tokens inherit the word label.

    Returns:
        List with one list of token labels per row.

    Raises:
        ValueError: If a row has no ``-100`` at index 2 or later.
    """
    labels = list()
    for i, label in enumerate(df['labels']):
        word_ids = tokenized_encodings.word_ids(batch_index=i)  # Map tokens to their respective word.
        previous_word_idx = None
        label_ids = []
        for word_idx in word_ids:
            if word_idx is None or word_idx >= len(label):
                # Special/padding token, or a word with no label available.
                # The bounds check now also covers continuation tokens, which
                # previously raised IndexError in that situation.
                label_ids.append(-100)
            elif word_idx != previous_word_idx or label_all_tokens:
                label_ids.append(label[word_idx])
            else:
                label_ids.append(-100)
            previous_word_idx = word_idx
        # First ignored position after the leading special token and at least one more token.
        stop = label_ids.index(-100, 2)
        label_ids = label_ids[:1] + [1] * (stop - 1) + label_ids[stop:]
        labels.append(label_ids)
    return labels

class Custom_Dataset(torch.utils.data.Dataset):
    """Map-style dataset that pairs tokenizer encodings with per-token labels.

    ``encodings`` must offer ``.items()`` yielding (name, per-example values);
    ``labels`` is indexable by example and defines the dataset length.
    """

    def __init__(self, encodings, labels):
        self.labels = labels
        self.encodings = encodings

    def __len__(self):
        # One example per label sequence.
        return len(self.labels)

    def __getitem__(self, idx):
        # Tensors are created lazily, one example at a time.
        sample = {}
        for name, values in self.encodings.items():
            sample[name] = torch.tensor(values[idx])
        sample['labels'] = torch.tensor(self.labels[idx])
        return sample

In [None]:
# Tokenize each split as (class words, text words) pairs.
tokenized_encodings_train = get_tokenized_encodings(df_train)
tokenized_encodings_valid = get_tokenized_encodings(df_valid)
tokenized_encodings_test = get_tokenized_encodings(df_test)

# Expand word-level labels to token level; with label_all_tokens=True every
# sub-token of a word carries that word's label.
labels_train = align_lables(df_train, tokenized_encodings_train, label_all_tokens = True)
labels_valid = align_lables(df_valid, tokenized_encodings_valid, label_all_tokens = True)
labels_test = align_lables(df_test, tokenized_encodings_test, label_all_tokens = True)

# Wrap encodings and labels as torch datasets; row i of each dataset
# corresponds to row i of the matching DataFrame.
dataset_train = Custom_Dataset(tokenized_encodings_train, labels_train)
dataset_valid = Custom_Dataset(tokenized_encodings_valid, labels_valid)
dataset_test = Custom_Dataset(tokenized_encodings_test, labels_test)

In [None]:
def get_task_specific_datasets(class_name, seed=None):
    """Split the prepared datasets around one held-out ("unseen") class.

    Reads the notebook-level frames ``df_train``/``df_valid``/``df_test`` and
    the matching ``dataset_train``/``dataset_valid``/``dataset_test`` objects.
    A row belongs to ``class_name`` when its ``class`` word list, joined with
    single spaces, equals ``class_name``.

    Few-shot examples are drawn from the training rows of ``class_name`` that
    contain at least one positive label. They are drawn *without* replacement,
    so a k-shot subset never contains the same example twice. If fewer than k
    candidates exist, all of them are used and a warning is issued (instead of
    crashing on an empty pool or silently repeating examples).

    Args:
        class_name: Held-out class as a single string.
        seed: Optional seed for the few-shot draws; ``None`` (default) uses the
            global ``random`` state.

    Returns:
        Tuple ``(train_without_class, valid_without_class, test_only_class,
        train_1_shot, train_10_shot, train_100_shot, df_test_only_class)``.
    """
    import warnings

    def is_target(class_words):
        return " ".join(class_words) == class_name

    # astype(bool) keeps the masks boolean even when a frame is empty.
    train_is_target = df_train['class'].apply(is_target).astype(bool)
    valid_is_target = df_valid['class'].apply(is_target).astype(bool)
    test_is_target = df_test['class'].apply(is_target).astype(bool)

    indexes_train = list(df_train.index[~train_is_target])
    indexes_valid = list(df_valid.index[~valid_is_target])
    indexes_test = list(df_test.index[test_is_target])
    df_withOne_test = df_test.loc[indexes_test]

    dataset_withoutOne_train = Subset(dataset_train, indexes_train)
    dataset_withoutOne_valid = Subset(dataset_valid, indexes_valid)
    dataset_withOne_test = Subset(dataset_test, indexes_test)

    # Candidate few-shot rows: target class with at least one positive word label.
    has_positive = df_train['labels'].apply(sum) > 0
    lst_indexes_FewShot_train = list(df_train.index[train_is_target & has_positive])

    rng = random if seed is None else random.Random(seed)

    def draw(k):
        available = len(lst_indexes_FewShot_train)
        if available < k:
            warnings.warn(
                f"Only {available} few-shot candidates for class {class_name!r}; "
                f"requested {k}."
            )
        return rng.sample(lst_indexes_FewShot_train, min(k, available))

    # The three subsets are drawn independently of each other.
    dataset_UnseenClass_train_1 = Subset(dataset_train, draw(1))
    dataset_UnseenClass_train_10 = Subset(dataset_train, draw(10))
    dataset_UnseenClass_train_100 = Subset(dataset_train, draw(100))

    return dataset_withoutOne_train, dataset_withoutOne_valid, dataset_withOne_test, dataset_UnseenClass_train_1, dataset_UnseenClass_train_10, dataset_UnseenClass_train_100, df_withOne_test

In [None]:
# Build the zero-shot train/validation subsets (without `class_unseen`), the test
# subset (only `class_unseen`) and the 1/10/100-example few-shot training subsets.
train0shot, valid0shot , dataset_withOne_test, train1shot, train10shot, train100shot, df_withOne_test = get_task_specific_datasets(class_unseen)

# Training

In [None]:
# Display the checkpoint folder suffix used by the zero-shot run.
# Note: this is built from `folder_name`; the training cell prefixes the same
# suffix with the full Drive path `out_path`.
folder_name + '/Results'+class_unseen+'ZeroShot'

'NLP/ResultsPETITIONERZeroShot'

In [None]:
# Zero-shot stage: fine-tune on train0shot / valid0shot, i.e. on every class
# except `class_unseen`.
# Checkpoints go to a flat '<out_path>/Results<class>ZeroShot' folder, while the
# final model and tokenizer are saved separately below under
# '<out_path>/Results/<class>/ZeroShot/...'.
training_args = TrainingArguments(
    output_dir= out_path + '/Results'+class_unseen+'ZeroShot',   # output folder (folder to store the results)
    num_train_epochs=num_train_epochs,                               # number of training epochs
    per_device_train_batch_size=16,                   # batch size per device during training
    per_device_eval_batch_size=16,                    # batch size for evaluation
    weight_decay=0.01,                                # strength of weight decay
    logging_dir=out_path + '/Logs'+class_unseen+'ZeroShot',     # folder to store the logs
    #logging_steps=10000,
    #logging_strategy='steps',
    save_strategy='epoch',
    evaluation_strategy='epoch',
    load_best_model_at_end=True
)


# Two output labels per token (0/1), matching the binary word labels in the data.
model = BertForTokenClassification.from_pretrained(model_name, num_labels=2)

trainer = Trainer(
    model=model,                 # pre-trained model for fine-tuning
    args=training_args,          # training arguments defined above
    train_dataset=train0shot,    # dataset class object for training
    eval_dataset=valid0shot   # dataset class object for validation
)

# Wall-clock training time in seconds; stored in `total_time` but not reported by this cell.
start_time = time.time()
trainer.train()
total_time = time.time()-start_time

# Persist the fine-tuned model; `model_path` is read again by the few-shot cell.
model_path = os.path.join(out_path, 'Results', class_unseen, 'ZeroShot', 'Model')
os.makedirs(model_path, exist_ok=True)
model.save_pretrained(model_path)

# Save the tokenizer next to the model (the returned file paths are the cell output).
tokenizer_path = os.path.join(out_path, 'Results', class_unseen, 'ZeroShot','Tokenizer')
os.makedirs(tokenizer_path, exist_ok=True)
tokenizer.save_pretrained(tokenizer_path)

Some weights of the model checkpoint at bert-base-cased were not used when initializing BertForTokenClassification: ['cls.predictions.transform.LayerNorm.weight', 'cls.seq_relationship.bias', 'cls.seq_relationship.weight', 'cls.predictions.transform.dense.weight', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.bias', 'cls.predictions.transform.dense.bias']
- This IS expected if you are initializing BertForTokenClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForTokenClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of BertForTokenClassification were not initialized from the model checkpoint at bert-base-cased and are newly initialized: ['cl

Epoch,Training Loss,Validation Loss
1,No log,0.264103
2,No log,0.244506


('/content/drive/MyDrive/NLP/Results/PETITIONER/ZeroShot/Tokenizer/tokenizer_config.json',
 '/content/drive/MyDrive/NLP/Results/PETITIONER/ZeroShot/Tokenizer/special_tokens_map.json',
 '/content/drive/MyDrive/NLP/Results/PETITIONER/ZeroShot/Tokenizer/vocab.txt',
 '/content/drive/MyDrive/NLP/Results/PETITIONER/ZeroShot/Tokenizer/added_tokens.json',
 '/content/drive/MyDrive/NLP/Results/PETITIONER/ZeroShot/Tokenizer/tokenizer.json')

In [None]:
# Few-shot stage: starting from the saved zero-shot model, fine-tune once per
# few-shot training subset (index 0/1/2 = 1/10/100 examples of `class_unseen`).
#
# The starting checkpoint is resolved explicitly and never reassigned. Before,
# the loop overwrote `model_path`, so run 1 silently started from the run-0
# model, run 2 from the run-1 model, and re-running the cell started from the
# last few-shot model instead of the zero-shot one.
zero_shot_model_path = os.path.join(out_path, 'Results', class_unseen, 'ZeroShot', 'Model')

for i, data_set_train in enumerate([train1shot, train10shot, train100shot]):
    training_args = TrainingArguments(
        output_dir=out_path + '/Results'+class_unseen+'FewShot'+str(i),  # output folder (folder to store the results)
        num_train_epochs=num_train_epochs,                                   # number of training epochs
        per_device_train_batch_size=16,                        # batch size per device during training
        per_device_eval_batch_size=16,                         # batch size for evaluation
        weight_decay=0.01,                                     # strength of weight decay
        logging_dir=out_path + '/Logs'+class_unseen+'FewShot'+str(i),    # folder to store the logs
        #logging_steps=10000,
        #logging_strategy='steps',
        save_strategy='epoch',
        evaluation_strategy='epoch',
        load_best_model_at_end=True
    )

    # Always reload the zero-shot weights so the runs are independent of each other.
    model0 = BertForTokenClassification.from_pretrained(zero_shot_model_path, num_labels=2)

    trainer = Trainer(
        model=model0,                # pre-trained model for fine-tuning
        args=training_args,          # training arguments defined above
        train_dataset=data_set_train,   # dataset class object for training
        eval_dataset=valid0shot   # dataset class object for validation
    )

    # Wall-clock training time in seconds for this run (kept in `total_time`).
    start_time = time.time()
    trainer.train()
    total_time = time.time()-start_time

    # Same folder layout as before: .../FewShot/<i>/Model and .../FewShot/<i>/Tokenizer.
    few_shot_model_path = os.path.join(out_path, 'Results', class_unseen, 'FewShot', str(i), 'Model')
    os.makedirs(few_shot_model_path, exist_ok=True)
    model0.save_pretrained(few_shot_model_path)

    tokenizer_path = os.path.join(out_path, 'Results', class_unseen, 'FewShot', str(i), 'Tokenizer')
    os.makedirs(tokenizer_path, exist_ok=True)
    tokenizer.save_pretrained(tokenizer_path)




Epoch,Training Loss,Validation Loss
1,No log,0.268166
2,No log,0.27117




Epoch,Training Loss,Validation Loss
1,No log,0.239375
2,No log,0.233033




Epoch,Training Loss,Validation Loss
1,No log,0.289883
2,No log,0.357135


# Testing

In [None]:
from sklearn.metrics import confusion_matrix, f1_score, accuracy_score,recall_score,precision_score

In [None]:
def testing1(model_path, threshold=0.01):
    """Evaluate a saved token-classification model on the held-out class.

    Uses the notebook-level ``dataset_withOne_test`` (a ``Subset`` of the test
    dataset) and ``df_withOne_test`` (the matching rows with word-level gold
    labels).

    Steps:
      1. Predict a 0/1 label for every token (argmax over the model output).
      2. Keep only tokens whose token type id is non-zero.
         NOTE(review): presumably these are the tokens of the text segment of
         the (class, text) pair; confirm for the tokenizer in use.
      3. Collapse token predictions to word predictions: a word is positive
         when the mean of its token predictions exceeds ``threshold``. Words
         without any token are predicted 0.
      4. Cut prediction and gold sequence of each example to their common
         length, pool all words of all examples and compute the metrics.

    Args:
        model_path: Directory containing the saved model.
        threshold: Minimum mean token prediction for a positive word. The
            default 0.01 keeps the previous hard-coded behaviour.

    Returns:
        ``[accuracy, precision, recall, f1, confusion_matrix]`` where
        precision/recall/f1 are per-class arrays (``average=None``).
    """
    testset = dataset_withOne_test
    df_test_UnseenClass = df_withOne_test
    print(len(testset), len(df_test_UnseenClass))

    model = BertForTokenClassification.from_pretrained(model_path, num_labels=2)
    args = TrainingArguments(output_dir='./evaldir', per_device_eval_batch_size=16)
    evaler = Trainer(args=args, model=model)

    pred = evaler.predict(testset)
    token_preds = pred[0].argmax(axis=-1)

    # Row i of the prediction matrix belongs to the i-th index of the test frame;
    # that index also addresses the encoding inside the full test dataset.
    row_ids = df_test_UnseenClass.index.to_list()
    encodings = testset.dataset.encodings.encodings
    labels_original = list(df_test_UnseenClass['labels'])

    pred_chunks = []
    gold_chunks = []
    for position, row_id in enumerate(row_ids):
        encoding = encodings[row_id]
        selected = np.array(encoding.type_ids, dtype=bool)
        # Tokens without a word id are mapped to -1 so they never match a word.
        word_ids = np.array(
            [-1 if wid is None else wid for wid in encoding.word_ids], dtype=int
        )[selected]
        preds = token_preds[position][selected]

        n_words = int(word_ids.max()) + 1 if word_ids.size else 0
        word_preds = np.zeros(n_words, dtype=int)
        for word in range(n_words):
            hits = preds[word_ids == word]
            # An empty selection would give a NaN mean (and a warning); treat it as 0.
            if hits.size and hits.mean() > threshold:
                word_preds[word] = 1

        # Predicted and gold sequences can differ in length; compare only the
        # common prefix.
        gold = labels_original[position]
        common = min(len(word_preds), len(gold))
        pred_chunks.append(word_preds[:common])
        gold_chunks.append(np.asarray(gold[:common]))

    # Seeding with an empty float array keeps the float dtype used so far and
    # lets the concatenation succeed even when there are no examples.
    lab = np.concatenate([np.array([])] + pred_chunks)      # predicted labels
    labor = np.concatenate([np.array([])] + gold_chunks)    # true labels

    accuracy = accuracy_score(labor, lab, normalize=True)
    precision = precision_score(labor, lab, average=None)
    recall = recall_score(labor, lab, average=None)
    f1 = f1_score(labor, lab, average=None)
    matrix = confusion_matrix(labor, lab)

    return [accuracy,precision,recall,f1,matrix]

In [None]:
# Evaluation driver (currently disabled). Uncomment to evaluate the zero-shot
# model and then each of the three few-shot models saved by the training cells.
# res = testing1(os.path.join(out_path, 'Results', class_unseen, 'ZeroShot', 'Model'))
# print(res)

# for i, data_set_train in enumerate([train1shot, train10shot, train100shot]):
#   res = testing1(os.path.join(out_path, 'Results', class_unseen, 'FewShot',str(i), 'Model'))
#   print(res)