In [2]:
from typing import List, Tuple,Union,Dict, Any
import pandas as pd
from transformers import BertTokenizer,BertForTokenClassification,get_linear_schedule_with_warmup,PreTrainedTokenizer
from torch.utils.data import DataLoader,TensorDataset,Dataset
import torch
from torch.optim import AdamW
from sklearn.metrics import precision_recall_fscore_support

# Clear GPU memory
torch.cuda.empty_cache()

# Print PyTorch version
print(torch.__version__)

2.1.1+cu118


### Downloading test ,train, dev text files.

In [3]:
! wget -O "train.txt" "https://figshare.com/ndownloader/files/15320042"
! wget -O "dev.txt" "https://figshare.com/ndownloader/files/15320048"
! wget -O "test.txt" "https://figshare.com/ndownloader/files/15320045"

'wget' is not recognized as an internal or external command,
operable program or batch file.


'wget' is not recognized as an internal or external command,
operable program or batch file.
'wget' is not recognized as an internal or external command,
operable program or batch file.


### Downloading and preprocessing the data

In [4]:
def preprocess(filename: str) -> Union[pd.DataFrame, Tuple[pd.DataFrame, dict, dict]]:
    """
    Preprocesses a text file containing labeled words and tags.

    Args:
        filename (str): Path to the text file.

    Returns:
        pd.DataFrame or tuple: Processed DataFrame containing 'sentence' and 'word_labels'
                               or a tuple (DataFrame, labels2ids, id2labels) if filename is 'train.txt'.
    """

    # Read the text file into a DataFrame
    dataframe= pd.read_csv(filename,sep="\s+" ,names=['Word','Tag','Tag1'],header=None,skip_blank_lines=True)
    
    # Combine 'Word' and 'Tag' columns based on the presence of 'Tag1'
    dataframe['Word']=dataframe.apply(lambda row: f"{row['Word']} {row['Tag']}" if not pd.isna(row['Tag1']) else row['Word'], axis=1)
    
    # Fill missing values in 'Tag' column with values from 'Tag1'
    dataframe['Tag'] = dataframe['Tag1'].fillna(dataframe['Tag'])
    
    # Extract only relevant columns
    dataframe=dataframe[['Word','Tag']]

    #Foward fill NaN values
    if dataframe.isna().any().any():
        dataframe=dataframe.ffill()

    #Sort unique tags
    unique_tags=sorted(dataframe.Tag.unique())

    # Create dictionaries for label encoding
    labels2ids={k:v for v,k in enumerate(unique_tags)}
    id2labels={v:k for v,k in enumerate(unique_tags)}

    #Finds sentence end
    sentence_ends= ((dataframe['Word']=='.').cumsum()).shift(fill_value=0)

    #Creates a new column 'sentence' by joining words within each sentence 
    dataframe['sentence']=dataframe.groupby(sentence_ends)['Word'].transform(lambda x: ' '.join(x))

    #Creates new column 'word labels' by joining labels within each sentence 
    dataframe['word_labels']=dataframe.groupby(sentence_ends)['Tag'].transform(lambda x: ' '.join(x))

    #Drop duplicate rows
    dataframe=dataframe[['sentence','word_labels']].drop_duplicates().reset_index(drop=True)

    if (filename=='train.txt'):
        return dataframe,labels2ids,id2labels
        
    else:
        return dataframe

### Preparing the dataset

In [5]:
def tokenize_and_preserve_labels(row: pd.Series, tokenizer: PreTrainedTokenizer) -> Tuple[List[str], List[str]]:
    """
    Tokenizes a sentence and preserves labels for each subword.

    Args:
        row (pd.Series): A row containing 'sentence' and 'word_labels' columns.
        tokenizer (PreTrainedTokenizer): The tokenizer object.

    Returns:
        Tuple[List[str], List[str]]: Tokenized sentence and list of labels for each token.
    """
    
    sentence = row['sentence']
    text_labels = row['word_labels']
    
    if not sentence or not text_labels:
        return [],[]
    
    tokenized_sentence = []
    labels = []

    sentence = sentence.strip()

    for word, label in zip(sentence.split(), text_labels.split()):
        tokenized_word = tokenizer.tokenize(word)
        n_subwords = len(tokenized_word)

        tokenized_sentence.extend(tokenized_word)

        labels.extend([label] * n_subwords)

    assert len(tokenized_sentence)== len(labels)
    return tokenized_sentence, labels

In [6]:
class CustomDataset(Dataset):
    def __init__(self, dataframe: pd.DataFrame, label2ids: Dict[str, int], tokenizer: PreTrainedTokenizer, max_len: int):
        """
        Custom PyTorch Dataset for Named Entity Recognition.

        Args:
            dataframe (pd.DataFrame): Pandas DataFrame containing 'sentence' and 'word_labels'.
            label2ids (Dict[str, int]): Mapping of labels to ids.
            tokenizer (PreTrainedTokenizer): Tokenizer object.
            max_len (int): Maximum sequence length.
        """

        self.data=dataframe
        self.label2ids=label2ids
        self.tokenizer=tokenizer
        self.max_len=max_len
    
    def __getitem__(self, index: int) -> Dict[str, torch.Tensor]:
        """
        Retrieves a single data point (sample) from the dataset.

        Args:
            index (int): Index of the desired data point.

        Returns:
            Dict[str, torch.Tensor]: Dictionary containing 'ids', 'mask', and 'targets'.
        """
        # Get the row corresponding to the given index
        row = self.data.iloc[index]

        # Tokenize the sentence and preserve labels
        tokenized_sentence, labels = tokenize_and_preserve_labels(row, self.tokenizer)
        
        # Add special tokens [CLS] and [SEP]
        tokenized_sentence=["[CLS]"]+tokenized_sentence+["[SEP]"]

        # Insert 'O' labels at the beginning and end
        labels.insert(0,"O")
        labels.insert(-1,"O")

        # Truncate or pad the tokenized sentence to the specified maximum length
        if(len(tokenized_sentence)>self.max_len):
            tokenized_sentence=tokenized_sentence[:self.max_len]
            labels=labels[:self.max_len]
        else:
            tokenized_sentence=tokenized_sentence+["[PAD]" for _ in range(self.max_len-len(tokenized_sentence))]
            labels=labels+["O" for _ in range(self.max_len-len(labels))]

        # Create an attention mask (1 for non-padding tokens, 0 for padding tokens)
        attn_mask=[1 if token!='[PAD]' else 0 for token in tokenized_sentence]

        # Convert tokens to ids using the tokenizer
        ids=self.tokenizer.convert_tokens_to_ids(tokenized_sentence)

        # Map label strings to label ids using the provided mapping
        label_ids = [self.label2ids[label] for label in labels]
        
        # Return the data as a dictionary of torch Tensors    
        return{
            'ids' : torch.tensor(ids,dtype=torch.long),
            'mask' : torch.tensor(attn_mask,dtype=torch.long),
            'targets': torch.tensor(label_ids,dtype=torch.long)
        }
    def __len__(self):
        return len(self.data)

In [7]:
MAX_LEN = 128
TRAIN_BATCH_SIZE=4
VAL_BATCH_SIZE=2
EPOCHS = 1
MAX_GRAD_NORM = 10
LEARNING_RATE=1

# Initialize the BERT tokenizer
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Preprocess the training data
train_data,label2ids,ids2labels=preprocess('train.txt')

# Preprocess the validation data
val_data=preprocess('dev.txt')

# Preprocess the test data
test_data=preprocess('test.txt')

# Create CustomDataset instances for training, validation, and test datasets
train_vector=CustomDataset(train_data,label2ids,tokenizer,MAX_LEN)
val_vector=CustomDataset(val_data,label2ids,tokenizer,MAX_LEN)
test_vector=CustomDataset(test_data,label2ids,tokenizer,MAX_LEN)

FileNotFoundError: [Errno 2] No such file or directory: 'train.txt'

In [None]:
# Access the first item in the training dataset
sample_item = train_vector[0]

# Extract 'ids' and 'targets' from the sample item
sample_ids = sample_item['ids'][:100]
sample_targets = sample_item['targets'][:100]

# Iterate through tokens and labels, printing them
for token, label in zip(tokenizer.convert_ids_to_tokens(sample_ids), sample_targets):
    # Retrieve the label name using the provided mapping
    label_name = ids2labels[label.item()]

    # Print token and corresponding label
    #print('{0:10}  {1}'.format(token, label_name))


[CLS]       O
variable    O
temperature  O
electron    B-CMT
para        I-CMT
##ma        I-CMT
##gne       I-CMT
##tic       I-CMT
resonance   I-CMT
studies     O
of          O
the         O
ni          B-MAT
##z         B-MAT
##n         B-MAT
fe          I-MAT
##rri       I-MAT
##te        I-MAT
/           O
o           B-MAT
##2         B-MAT
##si        B-MAT
nano        B-DSC
##com       B-DSC
##po        B-DSC
##sit       B-DSC
##e         B-DSC
effects     O
of          O
the         O
si          B-MAT
##lica      B-MAT
content     O
and         O
temperature  O
on          O
the         O
magnetic    B-PRO
properties  I-PRO
of          O
fe          B-MAT
##4         B-MAT
##nio       B-MAT
##8         B-MAT
##z         B-MAT
##n         B-MAT
/           O
o           B-MAT
##2         B-MAT
##si        B-MAT
nano        B-DSC
##com       B-DSC
##po        B-DSC
##sit       B-DSC
##es        B-DSC
have        O
been        O
studied     O
by          O
electron    B-CMT
pa

### Defining the model

In [None]:
# Initialize the BERT model for token classification
model = BertForTokenClassification.from_pretrained(
    'bert-base-uncased',
    num_labels=len(ids2labels),
    id2label=ids2labels,
    label2id=label2ids
)

# Move the model to the specified device (GPU if available, otherwise CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

Some weights of BertForTokenClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


BertForTokenClassification(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(30522, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0-11): 12 x BertLayer(
          (attention): BertAttention(
            (self): BertSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), eps=1e-12, el

### Training the model

In [None]:
# Create DataLoader for training dataset
train_loader = DataLoader(train_vector, batch_size=TRAIN_BATCH_SIZE, shuffle=True, num_workers=0)

# Create DataLoader for validation dataset
val_loader = DataLoader(val_vector, batch_size=VAL_BATCH_SIZE, shuffle=True, num_workers=0)

# Create DataLoader for test dataset
test_loader = DataLoader(test_vector, batch_size=VAL_BATCH_SIZE, shuffle=True, num_workers=0)

In [None]:
# Extract 'ids', 'mask', and 'targets' from the first item in the training dataset
sample_item = train_vector[0]
ids = sample_item['ids'].unsqueeze(0)
mask = sample_item['mask'].unsqueeze(0)
targets = sample_item['targets'].unsqueeze(0)

# Move tensors to the specified device (GPU if available, otherwise CPU)
ids = ids.to(device)
mask = mask.to(device)
targets = targets.to(device)

# Forward pass through the model
outputs = model(input_ids=ids, attention_mask=mask, labels=targets)

# Extract the initial loss from the model outputs
initial_loss = outputs.loss
initial_loss


tensor(2.7627, device='cuda:0', grad_fn=<NllLossBackward0>)

In [None]:
def train(model: BertForTokenClassification,
          optimizer: AdamW,
          scheduler: torch.optim.lr_scheduler,
          training_loader: DataLoader,
          device: torch.device,
          epochs: int,
          max_grad_norm: float) -> None:
    """
    Train the BERT-based token classification model.

    Args:
        model (BertForTokenClassification): The token classification model.
        optimizer (AdamW): The optimizer for updating model parameters.
        scheduler (torch.optim.lr_scheduler): Learning rate scheduler.
        training_loader (DataLoader): DataLoader for the training dataset.
        device (torch.device): The device to use for training (GPU or CPU).
        epochs (int): Number of training epochs.
        max_grad_norm (float): Maximum gradient norm for gradient clipping.

    Returns:
        None
    """
    model.train()

    for epoch in range(epochs):
        tr_loss, tr_preds, tr_labels = 0, [], []
        nb_tr_steps = 0

        for idx, batch in enumerate(training_loader):
            ids = batch['ids'].to(device, dtype=torch.long)
            mask = batch['mask'].to(device, dtype=torch.long)
            targets = batch['targets'].to(device, dtype=torch.long)

            optimizer.zero_grad()

            outputs = model(input_ids=ids, attention_mask=mask, labels=targets)
            loss, tr_logits = outputs.loss, outputs.logits
            tr_loss += loss.item()

            nb_tr_steps += 1

            flattened_targets = targets.view(-1)
            active_logits = tr_logits.view(-1, model.config.num_labels)
            flattened_predictions = torch.argmax(active_logits, axis=1)
            active_accuracy = mask.view(-1) == 1
            targets = torch.masked_select(flattened_targets, active_accuracy)
            predictions = torch.masked_select(flattened_predictions, active_accuracy)

            tr_preds.extend(predictions.cpu().numpy())
            tr_labels.extend(targets.cpu().numpy())

            loss.backward()

            torch.nn.utils.clip_grad_norm_(
                parameters=model.parameters(), max_norm=max_grad_norm
            )

            optimizer.step()
            scheduler.step()

        tr_loss /= nb_tr_steps

        # Calculate precision, recall, and F1 score
        precision, recall, f1, _ = precision_recall_fscore_support(tr_labels, tr_preds, average='weighted', zero_division=1)

        print(f"Epoch {epoch + 1}/{epochs}")
        print(f"Training Loss: {tr_loss:.4f}")
        print(f"Training Precision: {precision:.4f}")
        print(f"Training Recall: {recall:.4f}")
        print(f"Training F1 Score: {f1:.4f}")

# Example usage:
optimizer = AdamW(model.parameters(), lr=5e-05)
total_steps = len(train_loader) * EPOCHS
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=total_steps)

train(model, optimizer, scheduler, train_loader, device, epochs=EPOCHS, max_grad_norm=MAX_GRAD_NORM)


Epoch 1/1
Training Loss: 0.1672
Training Precision: 0.8476
Training Recall: 0.8540
Training F1 Score: 0.8492


### Evaluating the model

In [None]:
def evaluate(model: torch.nn.Module,
             dataloader: DataLoader,
             device: torch.device) -> None:
    """
    Evaluate the BERT-based token classification model.

    Args:
        model (torch.nn.Module): The token classification model.
        dataloader (DataLoader): DataLoader for the evaluation dataset.
        device (torch.device): The device to use for evaluation (GPU or CPU).

    Returns:
        None
    """
    model.eval()
    loss = 0
    all_preds, all_labels = [], []

    with torch.no_grad():
        for batch in dataloader:
            ids, mask, targets = batch['ids'].to(device), batch['mask'].to(device), batch['targets'].to(device)

            outputs = model(input_ids=ids, attention_mask=mask, labels=targets)
            batch_loss, logits = outputs.loss, outputs.logits

            loss += batch_loss.item()

            flattened_targets = targets.view(-1)
            active_logits = logits.view(-1, model.config.num_labels)
            flattened_predictions = torch.argmax(active_logits, axis=1)
            active_accuracy = mask.view(-1) == 1
            targets = torch.masked_select(flattened_targets, active_accuracy)
            predictions = torch.masked_select(flattened_predictions, active_accuracy)

            all_preds.extend(predictions.cpu().numpy())
            all_labels.extend(targets.cpu().numpy())

    average_loss = loss / len(dataloader)

    # Calculate precision, recall, and F1 score
    precision, recall, f1, _ = precision_recall_fscore_support(all_labels, all_preds, average='weighted',zero_division=1)

    print(f"Average Loss: {average_loss:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")

# Example usage:
print("Validation Evaluation:")
validate=evaluate(model, val_loader, device)

print("\nTest Evaluation:")
test=evaluate(model, test_loader, device)

Validation Evaluation:
Average Loss: 0.1022
Precision: 0.9000
Recall: 0.9006
F1 Score: 0.8995

Test Evaluation:
Average Loss: 0.1058
Precision: 0.9006
Recall: 0.9007
F1 Score: 0.8992


In [None]:
def predict_sentence(model, tokenizer, sentence, device, max_len=128):
    """
    Predicts labels for a given sentence using the fine-tuned BERT model.

    Args:
        model (BertForTokenClassification): The fine-tuned BERT model.
        tokenizer (BertTokenizer): The tokenizer used during training.
        sentence (str): The input sentence.
        device (torch.device): The device to use for prediction (GPU or CPU).
        max_len (int): Maximum sequence length.

    Returns:
        Tuple[str, List[Tuple[str, str]]]: A tuple containing the original sentence and a list of word-level predictions.
    """

    # Setting model to evaluation mode
    model.eval()

    # Tokenizes sentence
    inputs = tokenizer(sentence, padding='max_length', truncation=True, max_length=max_len, return_tensors="pt")
    ids = inputs["input_ids"].to(device)
    mask = inputs["attention_mask"].to(device)

    with torch.no_grad():
        outputs = model(ids, mask)
        logits = outputs.logits  # Use 'logits' instead of 'outputs[0]'

    active_logits = logits.view(-1, model.config.num_labels)
    flattened_pred = torch.argmax(active_logits, axis=1)

    tokens = tokenizer.convert_ids_to_tokens(ids.squeeze().tolist())
    token_pred = [model.config.id2label[i] for i in flattened_pred.cpu().numpy()]
    wp_preds = list(zip(tokens, token_pred))

    word_pred = []
    current_label = None
    current_word = ""

    for pair in wp_preds:
        if pair[0] in ['[CLS]', '[SEP]', '[PAD]']:
            continue
        
        label=pair[1].split('-')[-1] if '-' in pair[1] else pair[1]

        if current_label is None:
            current_label=label
            current_word=pair[0]
        elif label==current_label and current_label !='O':
            if pair[0].startswith("##"):
                current_word += pair[0][2:]
                if current_word in sentence.lower().split(' '):
                    original_word_index = sentence.lower().split(' ').index(current_word.lower())
                    original_word = sentence.split(' ')[original_word_index]
                    current_word = original_word
                    
            else:
                current_word += " " + pair[0]

        else:
            word_pred.append((current_word,current_label))
            current_label= label
            current_word=pair[0]
        
    if current_label is not None:
        word_pred.append((current_word,current_label))

    return word_pred
            
# Example usage:
test_sentence = "Vanadium oxide nanotubes are promising for gas sensors."
word_pred = predict_sentence(model, tokenizer, test_sentence, device)

#print(word_pred)

[('Vanadium oxide', 'MAT'), ('nanotubes', 'DSC'), ('are', 'O'), ('promising', 'O'), ('for', 'O'), ('gas sensors', 'APL'), ('.', 'O')]
