In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [11]:
%pip install langchain
%pip install tqdm



In [3]:
"""
data annotation with transfer learning (zero shot learning mechanism) using distilBERT and roberta
"""
import pandas as pd
from transformers import DistilBertForTokenClassification
from transformers import DistilBertTokenizerFast
from transformers import RobertaForTokenClassification
from transformers import RobertaTokenizer
from transformers import pipeline

In [13]:
"""
Text classification using distilBERT, fine tuned on herb dataset
"""
import warnings
warnings.simplefilter('ignore')
import numpy as np
from tqdm import tqdm
from sklearn import metrics
import torch
from torch.utils.data import Dataset, DataLoader, RandomSampler, SequentialSampler
from transformers import DistilBertTokenizer, DistilBertModel
from transformers import Trainer, DistilBertForSequenceClassification
from transformers import Seq2SeqTrainingArguments
import logging
logging.basicConfig(level=logging.ERROR)

In [15]:
from torch import cuda
device = 'cuda' if cuda.is_available() else 'cpu'

In [4]:
def load_dataset_from_pickle(**kwargs)->pd.core.frame.DataFrame:
    return pd.read_pickle(kwargs['file_path'])

In [7]:
def save_dataset_to_pickle(**kwargs)->None:
    kwargs['dataset'].to_pickle(kwargs['file_path'])

In [8]:
class NER:
    model=None
    model_pipeline=None
    tokenizer=None
    dataset:pd.core.frame.DataFrame=None
    ner_dataset:pd.core.frame.DataFrame=None
    model_choice:str=""
    def __init__(self,**kwargs)->None:
        self.dataset = kwargs['tokenised_dataset']
        self.model_choice = kwargs['model_choice']
        self.ner_dataset = pd.DataFrame(columns=list(self.dataset.columns))
    def __select_model(self)->None:
        match(self.model_choice):
            case 'DistilBert':
                self.model = DistilBertForTokenClassification.from_pretrained('distilbert-base-uncased')
                self.tokenizer = DistilBertTokenizerFast.from_pretrained('distilbert-base-uncased',tokenization_strategy='word',model_max_length=512,padding=True,truncation=True)
            case 'Roberta':
                self.model = RobertaForTokenClassification.from_pretrained('roberta-base-uncased')
                self.tokenizer = RobertaTokenizer.from_pretrained('roberta-base-uncased', tokenization_strategy='word',model_max_length=512)
            case _:
                self.model = None
    def __entity_recognition(self,**kwargs)->None:
        sentences = kwargs['data_sentences']
        self.model_pipeline = pipeline('ner',model=self.model,tokenizer=self.tokenizer)
        result:list=[]
        for sentence in sentences:
            result.append(self.model_pipeline(sentence))
        return result

    def entity_recognition(self)->None:
        self.__select_model()
        tmp_dict:dict = {i:[] for i in ('entity','word')}
        for field in self.dataset.columns:
            ner_results = self.dataset.apply(lambda row: self.__entity_recognition(data_sentences=row[field]),axis=1)
            for result in ner_results[0][0]:
                tmp_dict['entity'].append(result[0]['entity'])
                tmp_dict['word'].append(result[0]['word'])
            self.ner_dataset[field] = tmp_dict
            tmp_dict['entity'] = []
            tmp_dict['word'] = []
        save_dataset_to_pickle(dataset=self.ner_dataset,file_path=rf'/content/drive/MyDrive/herbology/ner_dataset.pkl')


In [29]:
class CustomDataset(torch.utils.data.Dataset):
        def __init__(self, dataframe, tokenizer, max_len):
            self.tokenizer = tokenizer
            self.data = dataframe
            self.max_len = max_len

        def __len__(self):
            return len(self.data)

        def __getitem__(self, index):
            # Select the sentence and label at the specified index in the data frame
            sentence = str(self.data.loc[index, 'text']) # Assuming 'text' column contains text
            labels = self.data.loc[index, 'Family'] # Assuming 'labels' column contains labels

            # Preprocess the text
            encoding = self.tokenizer.encode_plus(
                sentence,
                add_special_tokens=True,
                max_length=self.max_len,
                return_token_type_ids=True,
                pad_to_max_length=True,
                return_attention_mask=True,
                return_tensors='pt',
                truncation=True
            )

            # Return the processed data as a dictionary
            return {
                'ids': encoding['input_ids'].flatten(),
                'mask': encoding['attention_mask'].flatten(),
                'token_type_ids': encoding['token_type_ids'].flatten(),
                'targets': torch.tensor(labels, dtype=torch.float) # Assuming labels are numerical
            }

In [14]:
class DistilBERTClass(torch.nn.Module):
    def __init__(self):
        super(DistilBERTClass, self).__init__()
        self.l1 = DistilBertModel.from_pretrained("distilbert-base-uncased")
        self.pre_classifier = torch.nn.Linear(768, 768)
        self.dropout = torch.nn.Dropout(0.1)
        self.classifier = torch.nn.Linear(768, 6)

    def forward(self, input_ids, attention_mask, token_type_ids):
        output_1 = self.l1(input_ids=input_ids, attention_mask=attention_mask)
        hidden_state = output_1[0]
        pooler = hidden_state[:, 0]
        pooler = self.pre_classifier(pooler)
        pooler = torch.nn.Tanh()(pooler)
        pooler = self.dropout(pooler)
        output = self.classifier(pooler)
        return output

In [30]:
class FineTuneDistilBERT():
    dataset:pd.core.frame.DataFrame =None
    training_dataset:pd.core.frame.DataFrame=None
    testing_dataset:pd.core.frame.DataFrame = None
    hyper_parameters = {'MAX_LEN':0,
                        'TRAINING_BATCH_SIZE':0,
                        'VALID_BATCH_SIZE':0,
                        'EPOCHS':0,
                        'LEARNING_RATE':0}
    tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased',truncation=True, do_lower_case=True)
    training_params:dict[str]={}
    testing_params:dict[str]={}
    training_loader=None
    testing_loader=None
    optimiser = None
    y_pred:list = []
    y_true:list = []
    final_output = None
    output_model_file = './models/pytorch_distilbert_news.bin'
    def __init__(self, **kwargs):
        self.dataset = kwargs['dataset']
        self.dataset = self.dataset.reset_index(drop=True)
        self.training_dataset=self.dataset.sample(frac=0.8,random_state=200)
        self.testing_dataset=self.dataset.drop(self.training_dataset.index).reset_index(drop=True)
        self.hyper_parameters = kwargs['hyper_parameters']

    def __get_test_train_params(self, **kwargs)->None:
        self.training_params = kwargs['training_params']
        self.testing_params = kwargs['testing_params']
        self.training_set = CustomDataset(self.training_dataset, self.tokenizer, self.hyper_parameters['MAX_LEN'])
        self.testing_set = CustomDataset(self.testing_dataset, self.tokenizer, self.hyper_parameters['MAX_LEN'])
        self.training_loader = DataLoader(self.training_dataset, **self.training_params)
        self.testing_loader = DataLoader(self.testing_dataset, **self.testing_params)

    def __model_initialisation(self):
        self.model = DistilBERTClass()
        self.model.to(device)

    def __los_fn(self,**kwargs):
        outputs = kwargs['outputs']
        targets = kwargs['targets']
        return torch.nn.BCEWithLogitsLoss()(outputs, targets)

    def __optimiser(self)->None:
        self.optimiser = torch.optim.Adam(params =  self.model.parameters(), lr=self.hyper_parameters['LEARNING_RATE'])
    def __hamming_score(self)->None:
        acc_list:list =[]
        for i in range(self.y_true.shape[0]):
            set_true:set = set(np.where(self.y_true[i])[0])
            set_pred:set = set(np.where(self.y_pred[i])[0])
            tmp_a = None
            if len(set_true) == 0 and len(set_pred) == 0:
                tmp_a = 1
            else:
                tmp_a = len(set_true.intersection(set_pred)) /\
                    float(len(set_true.union(set_pred)))
            acc_list.append(tmp_a)
        return np.mean(acc_list)

    def train_prerequisites(self)->None:
        self.__get_test_train_params(training_params={'batch_size':self.hyper_parameters['TRAINING_BATCH_SIZE'],
                                                      'shuffle':True,
                                                      'num_workers':0}, testing_params={
                                                          'batch_size':self.hyper_parameters['VALID_BATCH_SIZE'],
                                                          'shuffle':True,
                                                          'num_workers':0
                                                      })
        self.__model_initialisation()
        self.__optimiser()

    def __train(self,**kwargs)->None:
        self.model.train()
        for _,data in tqdm(enumerate(self.training_loader, 0)):
            ids = data['ids'].to(device, dtype = torch.long)
            mask = data['mask'].to(device, dtype = torch.long)
            token_type_ids = data['token_type_ids'].to(device, dtype = torch.long)
            targets = data['targets'].to(device, dtype = torch.float)

            outputs = self.model(ids, mask, token_type_ids)

            self.optimiser.zero_grad()
            loss = self.__los_fn(outputs=outputs, targets=targets)
            if _%5000==0:
                print(f'Epoch: {kwargs["epoch"]}, Loss:  {loss.item()}')

            loss.backward()
            self.optimiser.step()

    def train(self)->None:
        self.train_prerequisites()
        for epoch in range(self.hyper_parameters['EPOCHS']):
            self.__train(epoch=epoch)

    def __validation(self):
        self.model.eval()
        fin_targets:list = []
        fin_outputs:list = []
        with torch.no_grad():
            for _, data in tqdm(enumerate(self.testing_loader, 0)):
                ids = data['ids'].to(device, dtype = torch.long)
                mask = data['mask'].to(device, dtype = torch.long)
                token_type_ids = data['token_type_ids'].to(device, dtype = torch.long)
                targets = data['targets'].to(device, dtype = torch.float)
                outputs = self.model(ids, mask, token_type_ids)
                fin_targets.extend(targets.cpu().detach().numpy().tolist())
                fin_outputs.extend(torch.sigmoid(outputs).cpu().detach().numpy().tolist())
        return fin_outputs, fin_targets

    def validation(self):
        self.y_pred, self.y_true = self.__validation()
        self.final_outputs = np.array(self.y_pred) >=0.5

    def loss_errors(self)->None:
        self.hamming_loss = metrics.hamming_loss(self.y_true, self.final_outputs)
        self.accuracy_score = metrics.accuracy_score(self.y_true, self.final_outputs)
        self.hamming_score = self.__hamming_score()
        print(f"Hamming Loss: {self.hamming_loss}")
        print(f"Accuracy Score: {self.accuracy_score}")
        print(f"Hamming Score: {self.hamming_score}")

    def save_model(self)->None:
        torch.save(self.model, self.output_model_file)
        print('saved')




In [30]:
def perform_ner(**kwargs)->None:
    dataset = kwargs['dataset']
    ner = NER(tokenised_dataset=dataset,model_choice='DistilBert')
    ner.entity_recognition()

In [22]:
def main()->None:
    dataset = load_dataset_from_pickle(file_path=rf'/content/drive/MyDrive/herbology/tokens.pkl')
    #perform_ner(dataset=dataset)
    ftdb = FineTuneDistilBERT(dataset=dataset, hyper_parameters={'MAX_LEN':128,
                        'TRAINING_BATCH_SIZE':16,
                        'VALID_BATCH_SIZE':16,
                        'EPOCHS':3,
                        'LEARNING_RATE':1e-05})
    ftdb.train()
    ftdb.validation()
    ftdb.loss_errors()
    ftdb.save_model()

In [31]:
main()

0it [00:00, ?it/s]


KeyError: 906