# Testing the Architecture for Relation Extraction as sequence classification problem

In this notebook we made the compatibity test to recreate te experiment of  https://arxiv.org/abs/1904.05255v1



## 0. Importing libraries for tensor manipulation and encoder testing

In [1]:
from transformers import AutoTokenizer, BertModel
import torch
from torch import nn

# 1. Tensor manipulation operations with the relative position sequences

In [2]:
bert_test_model = BertModel.from_pretrained('dccuchile/bert-base-spanish-wwm-cased', add_pooling_layer=False)
tokenizer = AutoTokenizer.from_pretrained('dccuchile/bert-base-spanish-wwm-cased')

In [1]:
text_sample = " Siendo a todo ello testigos Juan Velázquez de Ortega, alguacil del Santo Oficio,  y Diego de Arganal y Juan de Vergara, vecinos y estantes en la dicha villa"
text_sample_masked = " Siendo a todo ello testigos [PERSON], alguacil del [ORG],  y Diego de Arganal y Juan de Vergara, vecinos y estantes en la dicha villa"
text_sample_prepared = f'[CLS] {text_sample} [SEP] Juan Velázquez de Ortega [SEP] Santo Oficio [SEP]'
text_sample_prepared_masked = f'[CLS] {text_sample_masked} [SEP] Juan Velázquez de Ortega [SEP] Santo Oficio [SEP]'

In [3]:
text_sample_prepared_masked

'[CLS]  Siendo a todo ello testigos [PERSON], alguacil del [ORG],  y Diego de Arganal y Juan de Vergara, vecinos y estantes en la dicha villa [SEP] Juan Velázquez de Ortega [SEP] Santo Oficio [SEP]'

In [4]:
#masked_input_0 = tokenizer(text_sample_prepared, return_tensors='pt', add_special_tokens = False)
masked_input_1 = tokenizer(text_sample_prepared_masked, return_tensors='pt', add_special_tokens = False)

In [5]:
#tokenizer.add_special_tokens(special_tokens_dict={})
#TODO We need to add the special tokens to the tokenizer to have an specific embedding for them.

In [6]:
# Here we obtain the logits from BERT encoder
with torch.no_grad():

    output_1 = bert_test_model(**masked_input_1) # type: ignore


In [7]:
output_1.keys()

odict_keys(['last_hidden_state'])

In [8]:
output_1[0].shape # corresponds to (Batch, tokenized sequence lenght, last hidden states)

torch.Size([1, 52, 768])

In [9]:

cls_ = output_1[0][:, 0, :] #checking on how to extract the specific CLS hidden state for another type of experimentation

In [10]:
# Here we develop the index technique to separate the EN1 [SEP] EN2 [SEP] part as in the article
sep_token_id = tokenizer.sep_token_id
number_of_sep = len((masked_input_1['input_ids']== sep_token_id).nonzero(as_tuple = True)[1]) # type: ignore # this returns a boolean vector with the positions of the tokens SEP

In [11]:
# TODO We need to rethink this part (IMPLEMENTED ON NOTEBOOK 0.1. Network_desing.ipynb)
'''sep_index_0_0 = (masked_input_0['input_ids'] == sep_token_id).nonzero(as_tuple=True)[1][0].item()
sep_index_1_0 = (masked_input_1['input_ids'] == sep_token_id).nonzero(as_tuple=True)[1][0].item()
sep_index_0_1 = (masked_input_0['input_ids'] == sep_token_id).nonzero(as_tuple=True)[1][1].item()
sep_index_1_1 = (masked_input_1['input_ids'] == sep_token_id).nonzero(as_tuple=True)[1][1].item()'''


"sep_index_0_0 = (masked_input_0['input_ids'] == sep_token_id).nonzero(as_tuple=True)[1][0].item()\nsep_index_1_0 = (masked_input_1['input_ids'] == sep_token_id).nonzero(as_tuple=True)[1][0].item()\nsep_index_0_1 = (masked_input_0['input_ids'] == sep_token_id).nonzero(as_tuple=True)[1][1].item()\nsep_index_1_1 = (masked_input_1['input_ids'] == sep_token_id).nonzero(as_tuple=True)[1][1].item()"

In [12]:
'entiti_1_hidden_states_0.shape, entiti_2_hidden_states_0.shape, cls_token_0.shape'

'entiti_1_hidden_states_0.shape, entiti_2_hidden_states_0.shape, cls_token_0.shape'

In [13]:
'tensor_concatenated = torch.cat((cls_, entiti_1_hidden_states_0, entiti_2_hidden_states_0), dim=1)'

'tensor_concatenated = torch.cat((cls_, entiti_1_hidden_states_0, entiti_2_hidden_states_0), dim=1)'

In [14]:
'tensor_concatenated.shape'

'tensor_concatenated.shape'

In [15]:
'suqeezed_concat = tensor_concatenated.squeeze(0)'

'suqeezed_concat = tensor_concatenated.squeeze(0)'

In [16]:
suqeezed_concat.shape

NameError: name 'suqeezed_concat' is not defined

## 2. Creating the Relation extraction part

In [17]:
V_1 = torch.randn(size=(8, 16, 768)) #simulating the hidden state of BERT (Batch size, sequence lenght, hidden_dimensions)

V_2 = torch.randn(size= (8, 16, 20)) # Simulating the embedding of Relative position of ENT1 (Batch size, sequence lenght, hidden_dimensions)

V_3 = torch.randn(size= (8, 16, 20)) # Simulating the embedding of Relative position of ENT2 (Batch size, sequence lenght, hidden_dimensions)

In [18]:
Stacked = torch.stack(tensors=(V_1, V_2, V_3), dim=0) # They could not use  stack

RuntimeError: stack expects each tensor to be equal size, but got [8, 16, 768] at entry 0 and [8, 16, 20] at entry 1

In [19]:
added = V_1 + V_2 + V_3 # Neither element wise addition

RuntimeError: The size of tensor a (768) must match the size of tensor b (20) at non-singleton dimension 2

In [20]:
Stacked_positions = torch.stack(tensors=(V_2, V_3), dim=2) # Nor stacking just the positions as it makes an illshaped tensor 
Stacked_positions.shape

torch.Size([8, 16, 2, 20])

In [21]:
V_4 = torch.concat(tensors=(V_1, V_2, V_3), dim=2) # This is the correct formula
V_4.shape

torch.Size([8, 16, 808])

In [22]:
# Creating the biLSTM as the article stated. We can try to with a GRU 
lstm = nn.LSTM(
    input_size=V_1.shape[2] + V_2.shape[2] + V_3.shape[2], #Simulating the input size of the concatenated tensor
    hidden_size=768, #Hidden size reported in the article
    num_layers=1, #layers reported in the article
    batch_first=True,
    bidirectional=True # As the article suggests, the hidden size of every LSTM is 768, so the final hidden size of th BiLSTM is 768 *2
)


In [23]:
gru = nn.GRU(
    input_size=V_1.shape[2] + V_2.shape[2] + V_3.shape[2],
    hidden_size=768,
    num_layers=1,
    batch_first=True,
    bidirectional=True
    )

In [24]:
hidd_state, cell_state = lstm(V_4)
gru_hidd, gru_cell_state = gru(V_4)

In [25]:
type(cell_state), type(gru_cell_state)

(tuple, torch.Tensor)

In [26]:
cell_state[0].shape #Final hidden state of the whole sequence (2 due to bidirectionality)
cell_state[1].shape #Final cell state of the whole sequence (2 due to bidirectionality)

torch.Size([2, 8, 768])

In [27]:
concatenated_hidden = torch.concat(tensors=(cell_state[0][0], cell_state[0][1]), dim=-1)

In [28]:
concatenated_hidden.shape

torch.Size([8, 1536])

In [29]:
gru_cell_state.shape #Final hidden state of the whole sequence (2 due to bidirectionality)


torch.Size([2, 8, 768])

In [30]:
hidd_state.shape, gru_hidd.shape # hidden states per element in the sequence. the ones we want to use to perfom the inference (?) we can prepare anothe model.

(torch.Size([8, 16, 1536]), torch.Size([8, 16, 1536]))

In [31]:
# As the MLP expects a Vector of (batch size, number of classes) we need to average the tensors to pass the Tokenclassification into SequenceClassification task.
m = nn.AdaptiveAvgPool1d(1)
hidden_permuted = hidd_state.permute(0,2,1) # this is necessary as Tensor.permute expects (Batch, Features, Sequence lenght)
hidden_avg_pool = m(hidden_permuted)
print(hidden_permuted.shape)
print(hidden_avg_pool.shape)
print(hidden_avg_pool.squeeze(2).shape)

torch.Size([8, 1536, 16])
torch.Size([8, 1536, 1])
torch.Size([8, 1536])


In [32]:
# the MLP it is not defined, so we created one class with a GELU activation. Here we use final hidden states from LSTM/GRU
class Classifier_MLP(nn.Module):
    def __init__(
            self,
            input_dim,
            hidden_dim, #It is said the dimensions are 300 in the article.
            dropout_rate,
            Gelu_aproximation,
            output_dim
            ):
        super(Classifier_MLP, self).__init__()
        self.MLP_block = nn.Sequential(
            nn.Dropout(p=dropout_rate),
            nn.Linear(input_dim, hidden_dim),
            nn.GELU(approximate=Gelu_aproximation), # Most of the articles and new activations use Gelu as a better option than normal Relu. We choose based on this asumption
            nn.Dropout(p=dropout_rate),
            nn.Linear(hidden_dim, hidden_dim),
            nn.GELU(approximate=Gelu_aproximation),
            nn.Dropout(p=dropout_rate),
            nn.Linear(hidden_dim, output_dim)
            )
    def forward(self, x):
        pooled_input = x
        return self.MLP_block(pooled_input)

In [33]:
# This one makes the pooling with per token hidden state.
class Classifier_MLP_with_Pooling(nn.Module):
    def __init__(
            self,
            input_dim,
            hidden_dim, #It is said the dimensions are 300 in the article.
            dropout_rate,
            Gelu_aproximation,
            output_dim
            ):
        super(Classifier_MLP_with_Pooling, self).__init__()
        self.pooler = nn.AdaptiveAvgPool1d(1)
        self.MLP_block = nn.Sequential(
            nn.Dropout(p=dropout_rate),
            nn.Linear(input_dim, hidden_dim),
            nn.GELU(approximate=Gelu_aproximation), # Most of the articles and new activations use Gelu as a better option than normal Relu. We choose based on this asumption
            nn.Dropout(p=dropout_rate),
            nn.Linear(hidden_dim, hidden_dim),
            nn.GELU(approximate=Gelu_aproximation),
            nn.Dropout(p=dropout_rate),
            nn.Linear(hidden_dim, output_dim)
            )
    def forward(self, x):
        pooled_input = self.pooler(x.permute(0,2,1))
        pooled_input = pooled_input.squeeze(2)
        return self.MLP_block(pooled_input)

In [34]:
mlp = Classifier_MLP_with_Pooling(input_dim=1536, hidden_dim=300,dropout_rate=0.1,Gelu_aproximation='none', output_dim=1)

In [35]:
out = mlp(hidd_state)

In [36]:
out.shape #The output shape is a pooled output of the original token classification sequence with (Batch_size and number of classes.)

torch.Size([8, 10])

## 3. Final Network:

### Classifier with Pooling at the end

In [None]:
from torch import nn
import torch
from typing import Optional, Union, Tuple
from torch.nn import CrossEntropyLoss, MSELoss, BCEWithLogitsLoss
from transformers import BertPreTrainedModel, BertModel
from transformers.models.bert.modeling_bert import SequenceClassifierOutput


class Classifier_MLP(nn.Module):
    def __init__(
            self,
            input_dim,
            hidden_dim,
            dropout_rate,
            Gelu_aproximation,
            output_dim
            ):
        super(Classifier_MLP, self).__init__()
        self.MLP_block = nn.Sequential(
            nn.Dropout(p=dropout_rate),
            nn.Linear(input_dim, hidden_dim),
            nn.GELU(approximate=Gelu_aproximation), 
            nn.Dropout(p=dropout_rate),
            nn.Linear(hidden_dim, hidden_dim),
            nn.GELU(approximate=Gelu_aproximation),
            nn.Dropout(p=dropout_rate),
            nn.Linear(hidden_dim, output_dim)
            )
    def forward(self, x):
        pooled_input = x
        return self.MLP_block(pooled_input)
class PooledSimpleBertLikeForSequenceClassification(BertPreTrainedModel):
    def __init__(
            self,
            config,
            Recurrent_net_config: str,
            num_positional_embeddings: int,
            positional_embedding_dim: int,
            padding_idx: Optional[int],
            MLP_hidden_size:int,
            Gelu_aproximation: str 
            ):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.config = config

        self.bert = BertModel(config, add_pooling_layer=False)
        classifier_dropout = (
            config.classifier_dropout if config.classifier_dropout is not None else config.hidden_dropout_prob
        )
        self.positional_embedding = nn.Embedding(
            num_embeddings=num_positional_embeddings,
            embedding_dim=positional_embedding_dim,
            padding_idx=padding_idx
            )
        if isinstance(Recurrent_net_config, str):
            if Recurrent_net_config == 'LSTM':
                self.Bidirectional_block = nn.LSTM(
                    input_size=self.config.hidden_size +(self.positional_embedding.embedding_dim *2),
                    hidden_size=self.config.hidden_size,
                    num_layers=1,
                    batch_first=True,
                    bidirectional=True
                )
            else:
                Recurrent_net_config == 'GRU':
                self.Bidirectional_block = nn.GRU(
                    input_size=self.config.hidden_size +(self.positional_embedding.embedding_dim *2),
                    hidden_size=self.config.hidden_size,
                    num_layers=1,
                    batch_first=True,
                    bidirectional=True
                )
        else:
            raise ValueError('The value must be "LSTM" or "GRU"')
    
        self.classifier = Classifier_MLP_with_Pooling(
            input_dim=self.config.hidden_size*2,
            hidden_dim= MLP_hidden_size,
            dropout_rate=classifier_dropout,
            Gelu_aproximation=Gelu_aproximation,
            output_dim=self.num_labels
        )

        # Initialize weights and apply final processing
        self.post_init()

    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        head_mask: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple[torch.Tensor], SequenceClassifierOutput]:
        r"""
        labels (`torch.LongTensor` of shape `(batch_size,)`, *optional*):
            Labels for computing the sequence classification/regression loss. Indices should be in `[0, ...,
            config.num_labels - 1]`. If `config.num_labels == 1` a regression loss is computed (Mean-Square loss), If
            `config.num_labels > 1` a classification loss is computed (Cross-Entropy).
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        outputs = self.bert(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        #TODO We need to specify the forward process and the maniputalation of the tensors.
  
        sep_token_id = 5
        sep_indices = (input_ids == sep_token_id).nonzero(as_tuple=True)[1]
        number_of_sep = len(sep_indices)

        cls_output = outputs[0][:, 0, :]

        if number_of_sep == 3:
            sep_index_0 = sep_indices[0].item()
            sep_index_1 = sep_indices[1].item()
            sep_index_2 = sep_indices[2].item()
            entiti_1_output = outputs[0][:, sep_index_0 + 1:sep_index_1, :]
            entiti_2_output = outputs[0][:, sep_index_1 + 1:sep_index_2, :]
            pooled_output = torch.cat((cls_output.unsqueeze(1), entiti_1_output, entiti_2_output), dim=1)
        elif number_of_sep == 2:
            sep_index_0 = sep_indices[0].item()
            sep_index_1 = sep_indices[1].item()
            entiti_1_output = outputs[0][:, sep_index_0 + 1:sep_index_1, :]
            pooled_output = torch.cat((cls_output.unsqueeze(1), entiti_1_output), dim=1)
        else:
            raise ValueError("Input does not contain the required number of [SEP] tokens, which is 2 or 3.")

        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            if self.config.problem_type is None:
                if self.num_labels == 1:
                    self.config.problem_type = "regression"
                elif self.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int):
                    self.config.problem_type = "single_label_classification"
                else:
                    self.config.problem_type = "multi_label_classification"

            if self.config.problem_type == "regression":
                loss_fct = MSELoss()
                if self.num_labels == 1:
                    loss = loss_fct(logits.squeeze(), labels.squeeze())
                else:
                    loss = loss_fct(logits, labels)
            elif self.config.problem_type == "single_label_classification":
                loss_fct = CrossEntropyLoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
            elif self.config.problem_type == "multi_label_classification":
                loss_fct = BCEWithLogitsLoss()
                loss = loss_fct(logits, labels)
        if not return_dict:
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output

        return SequenceClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )

### Classifier with the final states of the LSTM/GRU

In [None]:
from torch import nn
import torch
from typing import Optional, Union, Tuple
from torch.nn import CrossEntropyLoss, MSELoss, BCEWithLogitsLoss
from transformers import BertPreTrainedModel, BertModel
from transformers.models.bert.modeling_bert import SequenceClassifierOutput


class Classifier_MLP(nn.Module):
    def __init__(
            self,
            input_dim,
            hidden_dim,
            dropout_rate,
            Gelu_aproximation,
            output_dim
            ):
        super(Classifier_MLP, self).__init__()
        self.MLP_block = nn.Sequential(
            nn.Dropout(p=dropout_rate),
            nn.Linear(input_dim, hidden_dim),
            nn.GELU(approximate=Gelu_aproximation), 
            nn.Dropout(p=dropout_rate),
            nn.Linear(hidden_dim, hidden_dim),
            nn.GELU(approximate=Gelu_aproximation),
            nn.Dropout(p=dropout_rate),
            nn.Linear(hidden_dim, output_dim)
            )
    def forward(self, x):
        pooled_input = x
        return self.MLP_block(pooled_input)
class PooledSimpleBertLikeForSequenceClassification(BertPreTrainedModel):
    def __init__(
            self,
            config,
            Recurrent_net_config: str,
            num_positional_embeddings: int,
            positional_embedding_dim: int,
            padding_idx: Optional[int],
            MLP_hidden_size:int,
            Gelu_aproximation: str 
            ):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.config = config

        self.bert = BertModel(config, add_pooling_layer=False)
        classifier_dropout = (
            config.classifier_dropout if config.classifier_dropout is not None else config.hidden_dropout_prob
        )
        self.positional_embedding = nn.Embedding(
            num_embeddings=num_positional_embeddings,
            embedding_dim=positional_embedding_dim,
            padding_idx=padding_idx
            )
        if isinstance(Recurrent_net_config, str):
            if Recurrent_net_config == 'LSTM':
                self.Bidirectional_block = nn.LSTM(
                    input_size=self.config.hidden_size +(self.positional_embedding.embedding_dim *2),
                    hidden_size=self.config.hidden_size,
                    num_layers=1,
                    batch_first=True,
                    bidirectional=True
                )
            else:
                Recurrent_net_config == 'GRU':
                self.Bidirectional_block = nn.GRU(
                    input_size=self.config.hidden_size +(self.positional_embedding.embedding_dim *2),
                    hidden_size=self.config.hidden_size,
                    num_layers=1,
                    batch_first=True,
                    bidirectional=True
                )
        else:
            raise ValueError('The value must be "LSTM" or "GRU"')
    
        self.classifier = Classifier_MLP_with_Pooling(
            input_dim=self.config.hidden_size*2,
            hidden_dim= MLP_hidden_size,
            dropout_rate=classifier_dropout,
            Gelu_aproximation=Gelu_aproximation,
            output_dim=self.num_labels
        )

        # Initialize weights and apply final processing
        self.post_init()

    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        head_mask: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple[torch.Tensor], SequenceClassifierOutput]:
        r"""
        labels (`torch.LongTensor` of shape `(batch_size,)`, *optional*):
            Labels for computing the sequence classification/regression loss. Indices should be in `[0, ...,
            config.num_labels - 1]`. If `config.num_labels == 1` a regression loss is computed (Mean-Square loss), If
            `config.num_labels > 1` a classification loss is computed (Cross-Entropy).
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        outputs = self.bert(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        #TODO We need to specify the forward process and the maniputalation of the tensors.
  
        sep_token_id = 5
        sep_indices = (input_ids == sep_token_id).nonzero(as_tuple=True)[1]
        number_of_sep = len(sep_indices)

        cls_output = outputs[0][:, 0, :]

        if number_of_sep == 3:
            sep_index_0 = sep_indices[0].item()
            sep_index_1 = sep_indices[1].item()
            sep_index_2 = sep_indices[2].item()
            entiti_1_output = outputs[0][:, sep_index_0 + 1:sep_index_1, :]
            entiti_2_output = outputs[0][:, sep_index_1 + 1:sep_index_2, :]
            pooled_output = torch.cat((cls_output.unsqueeze(1), entiti_1_output, entiti_2_output), dim=1)
        elif number_of_sep == 2:
            sep_index_0 = sep_indices[0].item()
            sep_index_1 = sep_indices[1].item()
            entiti_1_output = outputs[0][:, sep_index_0 + 1:sep_index_1, :]
            pooled_output = torch.cat((cls_output.unsqueeze(1), entiti_1_output), dim=1)
        else:
            raise ValueError("Input does not contain the required number of [SEP] tokens, which is 2 or 3.")

        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            if self.config.problem_type is None:
                if self.num_labels == 1:
                    self.config.problem_type = "regression"
                elif self.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int):
                    self.config.problem_type = "single_label_classification"
                else:
                    self.config.problem_type = "multi_label_classification"

            if self.config.problem_type == "regression":
                loss_fct = MSELoss()
                if self.num_labels == 1:
                    loss = loss_fct(logits.squeeze(), labels.squeeze())
                else:
                    loss = loss_fct(logits, labels)
            elif self.config.problem_type == "single_label_classification":
                loss_fct = CrossEntropyLoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
            elif self.config.problem_type == "multi_label_classification":
                loss_fct = BCEWithLogitsLoss()
                loss = loss_fct(logits, labels)
        if not return_dict:
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output

        return SequenceClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )