In [1]:
import torch
import numpy as np
from torch import nn
from torch.utils.data import DataLoader, Dataset
from transformers import AutoModel, AutoTokenizer
from pytorch_lightning import Trainer
from pytorch_lightning import LightningModule
import json
import torchmetrics
from torchmetrics import F1Score
#import evaluate
import copy
from torch.nn.utils.rnn import pad_sequence
from bidict import bidict
#from seqeval.metrics import f1_score

In [2]:
import os
from tqdm.auto import tqdm

In [3]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [4]:
class LoadData(Dataset):
    def __init__(self, trainingDataPath, testDataPath, validationDataPath,coarseMapsDataPath):
        self.trainingDataPath = trainingDataPath
        self.testDataPath = testDataPath
        self.validationDataPath = validationDataPath
        self.coarseMapsDataPath = coarseMapsDataPath

        self.idxToCluster={}
        self.clusterToIdx={}

        self.IdxToMeaning = {}
        self.meaningToIdx = {}
        
    def readData(self, dataPath):
        # Read the JSON file
        with open(dataPath, "r") as f:
            json_data = json.load(f)

        # Some transformation so that we have data in the form {i:object{},i+1:object{},...}. where i is an index (0,...,N)
        json_data = {k: json_data[k] for k in list(json_data)}
        json_data = {index: json_data[k] for index,k in enumerate(list(json_data))}
        return json_data
        
    def retrieveMeaningMaps(self):
        # Read the JSON file
        with open(self.coarseMapsDataPath, "r") as f:
            json_data = json.load(f)

        position_meaning = 1
        for index, (key, list_meanings) in enumerate(json_data.items()):
            for meaning in list_meanings:
                #next(iter(meaning)) gives us the first element in a fast way
                self.IdxToMeaning[position_meaning] = next(iter(meaning)) 
                self.meaningToIdx[ next(iter(meaning)) ] = position_meaning
                position_meaning+=1
                
        #homonimyCluster_IdxToCluster = {index+1:key for index, (key, _) in enumerate(json_data.items()) }
        #homonimyCluster_ClusterToIdx = {value:key for (key, value) in homonimyCluster_IdxToCluster.items() }

        self.meaningToIdx["unk"] =  0
        self.IdxToMeaning[0] = "unk"
        
        return [ self.meaningToIdx, self.IdxToMeaning ]


    def retrieveClusterMaps(self):
        # Read the JSON file
        with open(self.coarseMapsDataPath, "r") as f:
            json_data = json.load(f)
                
        self.idxToCluster = {index+1:key for index, (key, _) in enumerate(json_data.items()) }
        self.clusterToIdx = {value:key for (key, value) in self.idxToCluster.items() }

        self.clusterToIdx["unk"] =  0
        self.idxToCluster[0] = "unk"
        
        return [ self.clusterToIdx, self.idxToCluster ]


    def retrieveClusterToMeaningsMaps(self):
        # Read the JSON file
        with open(self.coarseMapsDataPath, "r") as f:
            json_data = json.load(f)
        idxClusterToIdxMeanings = {}
        idxMeaningsToidxCluster = {}
        for index, (key, list_meanings) in enumerate(json_data.items()):
           idxClusterToIdxMeanings[ self.clusterToIdx[key] ] = [self.meaningToIdx[next(iter(meaning))] for meaning in list_meanings]
           for meaning in list_meanings:
               idxMeaningsToidxCluster[self.meaningToIdx[next(iter(meaning))]] = self.clusterToIdx[key]

            
            
        
        return idxClusterToIdxMeanings,idxMeaningsToidxCluster





        
    def retrieveDataSet(self):
        trainingData = self.readData(self.trainingDataPath)
        testData = self.readData(self.testDataPath)
        validationData = self.readData(self.validationDataPath)
        #coarseMapsData = readJson(self, self.validationDataPath)

        return [ trainingData, testData, validationData ] 
    



In [5]:
# Specify the path to your JSON file
trainingDataPath_coarse = 'train_coarse_grained.json'
testDataPath_coarse = 'test_coarse_grained.json'
validationDataPath_coarse = 'val_coarse_grained.json'
coarseMapsDataPath_coarse = 'coarse_fine_defs_map.json'


trainingDataPath_fine = 'train_fine_grained.json'
testDataPath_fine = 'test_fine_grained.json'
validationDataPath_fine = 'val_fine_grained.json'

manager_dataset_fine = LoadData(trainingDataPath_fine, testDataPath_fine, validationDataPath_fine,coarseMapsDataPath_coarse)


manager_dataset_coarse = LoadData(trainingDataPath_coarse, testDataPath_coarse, validationDataPath_coarse,coarseMapsDataPath_coarse)





trainingData_coarse, testData_coarse, validationData_coarse  = manager_dataset_coarse.retrieveDataSet()

trainingData_fine, testData_fine, validationData_fine  = manager_dataset_fine.retrieveDataSet()


clusterToIdx, idxToCluster = manager_dataset_coarse.retrieveClusterMaps()
meaningToIdx, IdxToMeaning = manager_dataset_coarse.retrieveMeaningMaps()

idxClusterToIdxMeanings,idxMeaningsToidxCluster = manager_dataset_coarse.retrieveClusterToMeaningsMaps()



In [6]:
class TokenClassificationDataset(Dataset):
    def __init__(self, data, tokenizer,homonimyClusterClusterToIdx):
        self.tokenizer = tokenizer
        self.homonimyClusterClusterToIdx = homonimyClusterClusterToIdx
        self.data = data
        #self.data = {k: data[k] for k in list(data)[:]}
        #self.data1 = {index: batch[k] for index,k in enumerate(list(batch)[:4])}

    def __len__(self):
        #return max(len(sub_array["lemmas"]) for _,sub_array in self.data.items())
        return len(self.data)
        
    def is_int_convertible(self, variable):
        try:
            int(variable)
            return True
        except (ValueError, TypeError):
            return False
            
    def __getitem__(self, data_index):
      batch = self.data[data_index]
      inputs_tokenized = self.tokenizer.batch_encode_plus(
            [batch["lemmas"]],
            add_special_tokens=True,  # Disable adding [CLS] and [SEP] tokens
            max_length=512,
            truncation=True,
            padding='max_length',
            is_split_into_words=True,
            return_tensors='pt'
        )
      output_tokenized = copy.copy(batch["senses"])

# convert senses in the correspondig number
      for (key, clusters) in output_tokenized.items():
        for index,cluster in enumerate(clusters):
          if not self.is_int_convertible(cluster):
              try:
                output_tokenized[key][index] = self.homonimyClusterClusterToIdx[cluster]
              except KeyError:
                # key "cluster" does not exist (last element)
                output_tokenized[key][index] = self.homonimyClusterClusterToIdx["unk"]
              
      candidates = copy.copy(batch["candidates"])
# convert candidates in the correspondig number
      for (key, clusters) in candidates.items():
        for index,cluster in enumerate(clusters):
            
          #if we cannot convert it in int it means that cluster is a string
            if not self.is_int_convertible(cluster):
              try:
                candidates[key][index] = self.homonimyClusterClusterToIdx[cluster]
              except KeyError:
                # key "cluster" does not exist (last element)
                candidates[key][index] = self.homonimyClusterClusterToIdx["unk"]
              
              
    # for every sense we have its number
      inputs_tokenized["senses"]= output_tokenized

    #indexes in which i need to assign label =! -100
      target_word_positions = [ inputs_tokenized.word_ids(batch_index=0)[idx] for idx in [ int(key) for key,_ in inputs_tokenized["senses"].items() ]  ]
      target_word_idx = [ int(key) for key,_ in inputs_tokenized["senses"].items() ]
      target_label = [ value[0] for key,value in inputs_tokenized["senses"].items() ]

      
    
      labels = []
      idx_label = 0
      #the position of the target token after tokenization
      position_target_token = [] 
        
      for iterator, idx in enumerate(inputs_tokenized.word_ids()):
            if idx in target_word_idx:
              labels.append(target_label[idx_label])
              position_target_token.append(iterator)
              idx_label+=1
              target_word_idx.remove(idx)
            else:
              labels.append(-100)

    #inputs_tokenized["labels"] = labels
      candidates_list = []
      for iterator,(_,value) in enumerate(candidates.items()):
              candidates_list.append( torch.tensor(value) )


      #candidates = { position_target_token[iterator]: value for iterator,(_,value) in enumerate(candidates.items()) }
      #candidates_list = torch.cat([ torch.tensor(elements) for elements in candidates_list.values()]).view(-1)
      candidates_list = torch.cat(candidates_list).view(-1)
      #candidates_list_padded = nn.ConstantPad1d((0, 50 - len(candidates_list)),0)(candidates_list)
      #batch["input_ids"] = inputs_tokenized["input_ids"]
      #batch["attention_mask"] = inputs_tokenized["attention_mask"]
      #batch["labels"] = torch.tensor(labels)
      #batch["decoder_attention_mask"] = outputs.attention_mask
        # "position_target_token": nn.ConstantPad1d((0, 512 - len(position_target_token)),-1)(torch.tensor([position_target_token],dtype=torch.int16))
      #print(candidates)
      return {"input_ids" : inputs_tokenized["input_ids"].squeeze(), "attention_mask": inputs_tokenized["attention_mask"].squeeze(),
              "labels":torch.tensor(labels, dtype=torch.long), "index": nn.ConstantPad1d((0, 512 - 1),-1)(torch.tensor([data_index])),
                "candidates": nn.ConstantPad1d((0, 512 - len(candidates_list)),-1)(candidates_list) } 


In [7]:

class CoarseClassifier(LightningModule):
    def __init__(self, num_classes, model_name_or_path,type_problem, learning_rate=2e-5):
        super(CoarseClassifier, self).__init__()
        
        self.num_classes = num_classes
        self.bert = AutoModel.from_pretrained(model_name_or_path, output_hidden_states=True)
        self.dropout = nn.Dropout(0.4)
        #self.bert.config.hidden_size
        self.classifier1 = nn.Linear(self.bert.config.hidden_size, num_classes )

        self.learning_rate = learning_rate
        #self.f1score = F1Score(task="multiclass",num_classes=self.num_classes ,ignore_index=-100)
        self.save_hyperparameters()

    def forward(self, input_ids, attention_mask):
        bert_output = self.bert(input_ids, attention_mask) #,pooled_output
        avarage_hidden_state = torch.stack(bert_output.hidden_states[-4:], dim=0).sum(dim=0)
        #hidden_state = outputs[0]  # (bs, seq_len, dim)
        #pooled_output = hidden_state[:, 0]  # (bs, dim
        #candidates = candidates.unsqueeze(1).expand(-1, 512, -1)
        #output = torch.cat((avarage_hidden_state, candidates), dim=2)
        #output = self.dropout(avarage_hidden_state)
        output = self.classifier1(avarage_hidden_state)
    

        return output

    def training_step(self, batch, batch_idx):
        input_ids = batch['input_ids']
        attention_mask = batch['attention_mask']
        #get the candidates (ignore all the values -1)
        #candidates  = torch.stack([candidate[:torch.nonzero(candidate == -1)[0][0]] for candidate in batch['candidates']])
        #candidates  = torch.stack([candidate for candidate in batch['candidates']])
        logits = self.forward(input_ids, attention_mask)

        loss = nn.CrossEntropyLoss(ignore_index=-100)(logits.view(-1, logits.shape[-1]), labels.view(-1))
        self.log('train_loss', loss,prog_bar=True, logger=True, on_step=True, on_epoch=True)
        return loss

    def validation_step(self, batch, batch_idx=None):
        input_ids = batch['input_ids']
        attention_mask = batch['attention_mask']
        #candidates  = torch.stack([candidate for candidate in batch['candidates']])
        #get the candidates (ignore all the values -1)

        
        logits = self.forward(input_ids, attention_mask)
        loss = nn.CrossEntropyLoss(ignore_index=-100)(logits.view(-1, logits.shape[-1]), labels.view(-1))
        self.log('val_loss', loss,prog_bar=True, logger=True, on_step=True, on_epoch=True)
        

        
    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.learning_rate)



In [8]:

class FineClassifier(LightningModule):
    def __init__(self,CoarseClassifier,num_classes_coarse, num_classes_fine, model_name_or_path,type_problem, learning_rate=2e-5):
        super(FineClassifier, self).__init__()
        
        self.num_classes_coarse = num_classes_coarse
        self.num_classes_fine = num_classes_fine
        self.bert = CoarseClassifier
        # Freeze the base model's parameters
        for param in self.bert.parameters():
            self.bert.requires_grad = False
            
        self.dropout = nn.Dropout(0.4)
        #self.bert.config.hidden_size
        self.classifier1 = nn.Linear(num_classes_coarse, num_classes_fine )
        #+1 because we have unknown label
        self.learning_rate = learning_rate
        #self.f1score = F1Score(task="multiclass",num_classes=self.num_classes ,ignore_index=-100)
        #self.save_hyperparameters()

    def forward(self, input_ids, attention_mask):
        bert_output = self.bert(input_ids, attention_mask) #,pooled_output
        #avarage_hidden_state = torch.stack(bert_output.hidden_states[-5:], dim=0).sum(dim=0)
        #hidden_state = outputs[0]  # (bs, seq_len, dim)
        #pooled_output = hidden_state[:, 0]  # (bs, dim
        #candidates = candidates.unsqueeze(1).expand(-1, 512, -1)
        #output = torch.cat((avarage_hidden_state, candidates), dim=2)
        #output = self.dropout(bert_output)
        #output = self.dropout(bert_output)
        output = self.classifier1(bert_output)
    

        return output

    def training_step(self, batch, batch_idx):
        input_ids = batch['input_ids']
        attention_mask = batch['attention_mask']
        #get the candidates (ignore all the values -1)
        #candidates  = torch.stack([candidate[:torch.nonzero(candidate == -1)[0][0]] for candidate in batch['candidates']])
        #candidates  = torch.stack([candidate for candidate in batch['candidates']])
        logits = self.forward(input_ids, attention_mask)

        loss = nn.CrossEntropyLoss(ignore_index=-100)(logits.view(-1, logits.shape[-1]), labels.view(-1))
        self.log('train_loss', loss,prog_bar=True, logger=True, on_step=True, on_epoch=True)
        return loss

    def validation_step(self, batch, batch_idx=None):
        input_ids = batch['input_ids']
        attention_mask = batch['attention_mask']
        #candidates  = torch.stack([candidate for candidate in batch['candidates']])
        #get the candidates (ignore all the values -1)

        
        logits = self.forward(input_ids, attention_mask)
        loss = nn.CrossEntropyLoss(ignore_index=-100)(logits.view(-1, logits.shape[-1]), labels.view(-1))
        self.log('val_loss', loss,prog_bar=True, logger=True, on_step=True, on_epoch=True)
        return loss
        


        
    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.learning_rate)



In [23]:
#classifier.training_step(trainDataset[1])

In [9]:
# Save the trained model in the current path
current_path = os.getcwd()

model_path = os.path.join(current_path, "saved_model")
model_name_or_path = 'kanishka/GlossBERT'
#model_name_or_path = "prajjwal1/bert-mini"
tokenizer = AutoTokenizer.from_pretrained(model_name_or_path)
batch_size = 16 #prima era 16
#batch = {k: train_data[k] for k in list(train_data)[:4]}


trainDataset_coarse = TokenClassificationDataset( trainingData_coarse, tokenizer, clusterToIdx)
testDataset_coarse = TokenClassificationDataset( testData_coarse, tokenizer, clusterToIdx)
valDataset_coarse = TokenClassificationDataset( validationData_coarse, tokenizer, clusterToIdx)

trainDataset_fine = TokenClassificationDataset( trainingData_fine, tokenizer, meaningToIdx)
testDataset_fine = TokenClassificationDataset( testData_fine, tokenizer, meaningToIdx)
valDataset_fine = TokenClassificationDataset( validationData_fine, tokenizer, meaningToIdx)



trainLoader_coarse = DataLoader(trainDataset_coarse, batch_size=batch_size, shuffle=False)
testLoader_coarse = DataLoader(testDataset_coarse, batch_size=batch_size, shuffle=False)
valLoader_coarse = DataLoader(valDataset_coarse, batch_size=batch_size, shuffle=False)

trainLoader_fine = DataLoader(trainDataset_fine, batch_size=batch_size, shuffle=False)
testLoader_fine = DataLoader(testDataset_fine, batch_size=batch_size, shuffle=False)
valLoader_fine = DataLoader(valDataset_fine, batch_size=batch_size, shuffle=False)




In [10]:
num_classes_coarse = len(clusterToIdx)
num_classes_fine = len(meaningToIdx) 
print(num_classes_coarse)
print(num_classes_fine)

2159
4477


In [11]:

classifier_coarse = CoarseClassifier(num_classes=num_classes_coarse, model_name_or_path=model_name_or_path,type_problem="coarse")

classifier_coarse = classifier_coarse.load_from_checkpoint(os.path.join(model_path, "model7.ckpt"),num_classes=num_classes_coarse, model_name_or_path=model_name_or_path,type_problem="coarse")


classifier_fine = FineClassifier(classifier_coarse,num_classes_coarse= num_classes_coarse, num_classes_fine=num_classes_fine, model_name_or_path=model_name_or_path,type_problem="fine")


classifier_fine = classifier_fine.load_from_checkpoint(os.path.join(model_path, "model7_fine.ckpt"),CoarseClassifier=classifier_coarse,num_classes_coarse= num_classes_coarse, num_classes_fine=num_classes_fine, model_name_or_path=model_name_or_path,type_problem="fine")
                                                                                                                                       
                                                                                                                                       
                                                                                                                                       

Some weights of the model checkpoint at kanishka/GlossBERT were not used when initializing BertModel: ['classifier.bias', 'classifier.weight']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of the model checkpoint at kanishka/GlossBERT were not used when initializing BertModel: ['classifier.bias', 'classifier.weight']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are

In [12]:
trainer = Trainer(max_epochs=5,accelerator='gpu') #prima era 4

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs


In [13]:
trainer.fit(classifier_fine, train_dataloaders = trainLoader_fine)

  rank_zero_warn(
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name        | Type             | Params
-------------------------------------------------
0 | bert        | CoarseClassifier | 111 M 
1 | dropout     | Dropout          | 0     
2 | classifier1 | Linear           | 9.7 M 
-------------------------------------------------
120 M     Trainable params
0         Non-trainable params
120 M     Total params
483.251   Total estimated model params size (MB)
  rank_zero_warn(


Training: 0it [00:00, ?it/s]

`Trainer.fit` stopped: `max_epochs=5` reached.


In [18]:
#trainer.save_checkpoint(os.path.join(model_path, "model7_fine.ckpt"))

In [12]:
classifier_fine.to(device)

FineClassifier(
  (bert): CoarseClassifier(
    (bert): BertModel(
      (embeddings): BertEmbeddings(
        (word_embeddings): Embedding(30522, 768, padding_idx=0)
        (position_embeddings): Embedding(512, 768)
        (token_type_embeddings): Embedding(2, 768)
        (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
        (dropout): Dropout(p=0.1, inplace=False)
      )
      (encoder): BertEncoder(
        (layer): ModuleList(
          (0-11): 12 x BertLayer(
            (attention): BertAttention(
              (self): BertSelfAttention(
                (query): Linear(in_features=768, out_features=768, bias=True)
                (key): Linear(in_features=768, out_features=768, bias=True)
                (value): Linear(in_features=768, out_features=768, bias=True)
                (dropout): Dropout(p=0.1, inplace=False)
              )
              (output): BertSelfOutput(
                (dense): Linear(in_features=768, out_features=768, bias=True)
  

In [13]:
classifier_coarse.to(device)

CoarseClassifier(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(30522, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0-11): 12 x BertLayer(
          (attention): BertAttention(
            (self): BertSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_

In [14]:
def get_position_token(labels):
    #get the position of target token AFTER tokenization
    result = []
    for iterator, label in enumerate(labels):
        if label != -100:
            #result.append({iterator:label})
            result.append(iterator)
    return result
#original_data can be testData, trainingData or validationData
def evaluate_model(classifier, loader, original_data):
    # Set the model to evaluation mode
    classifier.eval()
    result_labels_target = []
    result_predicted_labels = []
    
    with torch.no_grad():
        with tqdm(enumerate( loader ),desc="Batch", leave=False) as data:
            for step, samples in data:
                index = [ index_data[0] for index_data in samples['index'] ]

                result_prediction = classifier(samples['input_ids'].cuda(),samples['attention_mask'].cuda())
                #result_prediction = [ result[0] for result in result_prediction ]
                sample_non_tokanized = [ original_data[ index_data.tolist() ] for index_data in index ] 
                
                position_token = [get_position_token(labels) for labels in samples['labels'] ]

                for idx_sample in range(len(sample_non_tokanized)):
                    #print(position_token[idx_sample])
                    #print(result_prediction[idx_sample][0])
                    predicted_labels_idx = []
                    for iterator, (target_token_idx, candidates) in enumerate(sample_non_tokanized[idx_sample]["candidates"].items()):
                            #print(candidates)
                            # Get the predicted labels considering only the candidates
                            predicted_labels_idx.append(torch.argmax( 
                                torch.tensor([ result_prediction[idx_sample][ position_token[idx_sample][iterator] ][target_prob_idx] for target_prob_idx in candidates ]), dim=-1) )
                    labels_target = [ value[0] for key,value in sample_non_tokanized[idx_sample]["senses"].items() ]
                    predicted_labels = [ candidates[predicted_labels_idx[i]] for i,(key,candidates) in enumerate(sample_non_tokanized[idx_sample]["candidates"].items()) ]
                    result_labels_target.append(labels_target)
                    result_predicted_labels.append(predicted_labels)
                    
    result_labels_target =  [element for sublist in result_labels_target for element in sublist]
    result_predicted_labels =  [element for sublist in result_predicted_labels for element in sublist]
    
    return result_labels_target,result_predicted_labels




In [15]:
result_labels_target_fine,result_predicted_labels_fine = evaluate_model(classifier_fine,testLoader_fine, testData_fine)

Batch: 0it [00:00, ?it/s]

In [31]:
F1Score(task="multiclass",num_classes=num_classes_fine)(torch.tensor(result_labels_target_fine),torch.tensor(result_predicted_labels_fine))

tensor(0.8508)

<h1> Coarse grained - section 2 </h1>

In [32]:
result_labels_target_coarse,result_predicted_labels_coarse = evaluate_model(classifier_coarse,testLoader_coarse, testData_coarse)

Batch: 0it [00:00, ?it/s]

In [33]:
F1Score(task="multiclass",num_classes=num_classes_coarse)(torch.tensor(result_labels_target_coarse),torch.tensor(result_predicted_labels_coarse))

tensor(0.9403)