# Models with MLM and NSP

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer
import random
import numpy as np
import kagglehub as kh
import os
import re
import pandas as pd

# Classes to prepare the dataset of both MLM and NSP
class BertPretrainingDataset(Dataset):
    """Sentence-pair dataset for joint MLM + NSP pretraining.

    Consecutive entries of ``texts`` are treated as consecutive sentences.
    One pair is built for every entry except the last: about half keep the
    true next sentence (``is_next = 1``); the others get a randomly drawn
    sentence that is neither the first sentence itself nor its true successor
    (``is_next = 0``). The pairs are drawn once, when the dataset is built.

    Each item is a dict with the keys:
        input_ids:      token ids after MLM corruption.
        attention_mask: attention mask returned by the tokenizer.
        mlm_positions:  positions chosen for MLM, padded with 0 to a fixed
                        length of ``max(1, int(0.15 * max_length))``.
        mlm_labels:     original ids at those positions; padded slots hold
                        ``IGNORE_LABEL``.
        nsp_label:      1 if the second sentence really follows the first.

    Args:
        texts: Indexable sequence of sentences, in document order.
        tokenizer: Hugging Face style tokenizer. It must be callable on a
            sentence pair and expose ``mask_token_id``, ``vocab_size`` and
            ``all_special_ids``.
        max_length: Length every encoded pair is padded/truncated to.
    """

    # Label stored in unused MLM slots. It equals the default ``ignore_index``
    # of ``torch.nn.CrossEntropyLoss``, so those slots add nothing to the loss.
    IGNORE_LABEL = -100

    def __init__(self, texts, tokenizer, max_length=512):
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.texts = self._prepare_texts(texts)

    def _prepare_texts(self, texts):
        """Build one ``{'sent1', 'sent2', 'is_next'}`` dict per adjacent pair."""
        n_texts = len(texts)
        sentence_pairs = []
        for i in range(n_texts - 1):
            next_sent_idx = i + 1
            is_next = 1
            # A genuine negative needs an index other than i and i + 1, which
            # only exists when there are at least three sentences.
            if n_texts > 2 and random.random() < 0.5:
                # Draw uniformly from the n_texts - 2 admissible indices and
                # shift draws at or above i past the two excluded ones.
                draw = random.randrange(n_texts - 2)
                next_sent_idx = draw if draw < i else draw + 2
                is_next = 0

            sentence_pairs.append({
                'sent1': texts[i],
                'sent2': texts[next_sent_idx],
                'is_next': is_next
            })
        return sentence_pairs

    def _apply_mlm(self, tokens):
        """Corrupt ``tokens`` in place for MLM and report what was changed.

        About 15% of the non-special tokens (at least one) are selected. Each
        selected token is replaced by the mask token with probability 0.8, by
        a random vocabulary id with probability 0.1, and left as is otherwise.

        Args:
            tokens: List of token ids; modified in place.

        Returns:
            ``(tokens, mlm_positions, mlm_labels)`` where ``mlm_positions`` is
            the sorted list of selected positions and ``mlm_labels`` the
            original ids found there.
        """
        special_ids = set(self.tokenizer.all_special_ids)
        # Special tokens ([CLS], [SEP], [PAD], ...) carry nothing to predict,
        # so they must not be selected.
        candidates = [pos for pos, tok in enumerate(tokens) if tok not in special_ids]
        if not candidates:
            # Degenerate input made only of special tokens: fall back to every
            # position so the sample still yields at least one MLM target and
            # the loss stays finite.
            candidates = list(range(len(tokens)))

        n_mask = min(len(candidates), max(1, int(0.15 * len(candidates))))
        mask_positions = sorted(random.sample(candidates, n_mask))

        mlm_positions = []
        mlm_labels = []
        for pos in mask_positions:
            mlm_positions.append(pos)
            mlm_labels.append(tokens[pos])

            prob = random.random()
            if prob < 0.8:  # 80% replace with [MASK]
                tokens[pos] = self.tokenizer.mask_token_id
            elif prob < 0.9:  # 10% replace with random token
                tokens[pos] = random.randint(0, self.tokenizer.vocab_size - 1)
            # 10% keep unchanged

        return tokens, mlm_positions, mlm_labels

    def __getitem__(self, idx):
        pair = self.texts[idx]

        # Tokenize the two sentences as a single padded/truncated pair
        encoding = self.tokenizer(
            pair['sent1'],
            pair['sent2'],
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )

        # Drop only the leading batch dimension added by return_tensors='pt'
        input_ids = encoding['input_ids'].squeeze(0)
        attention_mask = encoding['attention_mask'].squeeze(0)

        # Apply MLM on a plain-list copy of the ids
        masked_input_ids, mlm_positions, mlm_labels = self._apply_mlm(input_ids.tolist())

        # The number of maskable tokens differs between samples. Pad (or clip)
        # to a fixed number of slots so the default DataLoader collation can
        # stack the per-sample tensors into a batch.
        n_slots = max(1, int(0.15 * self.max_length))
        mlm_positions = mlm_positions[:n_slots]
        mlm_labels = mlm_labels[:n_slots]
        n_pad = n_slots - len(mlm_positions)
        mlm_positions = mlm_positions + [0] * n_pad
        mlm_labels = mlm_labels + [self.IGNORE_LABEL] * n_pad

        return {
            'input_ids': torch.tensor(masked_input_ids),
            'attention_mask': attention_mask,
            'mlm_positions': torch.tensor(mlm_positions),
            'mlm_labels': torch.tensor(mlm_labels),
            'nsp_label': torch.tensor(pair['is_next'])
        }

    def __len__(self):
        return len(self.texts)

class TextClassificationDataset(Dataset):
    """Pairs each text with its label and tokenizes the text on access.

    Args:
        texts: Indexable sequence of input texts.
        labels: Indexable sequence of labels, aligned with ``texts``.
        tokenizer: Callable tokenizer returning ``input_ids`` and
            ``attention_mask`` tensors.
        max_length: Length each text is padded/truncated to.
    """

    def __init__(self, texts, labels, tokenizer, max_length=512):
        self.tokenizer = tokenizer
        self.texts, self.labels = texts, labels
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        """Return ``input_ids``, ``attention_mask`` and ``labels`` for one sample."""
        tokenizer_options = {
            'max_length': self.max_length,
            'padding': 'max_length',
            'truncation': True,
            'return_tensors': 'pt',
        }
        encoded = self.tokenizer(self.texts[idx], **tokenizer_options)

        # squeeze() removes the batch dimension added by return_tensors='pt'
        sample = {field: encoded[field].squeeze() for field in ('input_ids', 'attention_mask')}
        sample['labels'] = torch.tensor(self.labels[idx])
        return sample

# Class for the pretrain model, with both MLM and NSP
class BertPretrainingModel(nn.Module):
    """Encoder wrapped with a masked-language-model head and an NSP head.

    Args:
        bert_model: Encoder whose ``config`` exposes ``hidden_size`` and
            ``vocab_size`` and whose output provides ``last_hidden_state``
            and ``pooler_output``.
    """

    def __init__(self, bert_model):
        super().__init__()
        encoder_config = bert_model.config
        self.bert = bert_model
        self.config = encoder_config
        # Vocabulary logits for MLM, two-way logits for NSP
        self.mlm_head = nn.Linear(encoder_config.hidden_size, encoder_config.vocab_size)
        self.nsp_head = nn.Linear(encoder_config.hidden_size, 2)

    def forward(self, input_ids, attention_mask, mlm_positions=None):
        """Return ``(prediction_scores, seq_relationship_score)``.

        When ``mlm_positions`` is given, MLM scores are computed only for the
        hidden states at those positions; otherwise for every position.
        ``mlm_positions`` is assumed to be an index tensor of shape
        (batch, n_positions) - TODO confirm against the dataset.
        """
        encoded = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            return_dict=True
        )
        hidden_states = encoded.last_hidden_state

        if mlm_positions is None:
            mlm_input = hidden_states
        else:
            # A column of row numbers broadcasts against mlm_positions, so
            # each row picks its own positions.
            row_index = torch.arange(hidden_states.size(0)).unsqueeze(1)
            mlm_input = hidden_states[row_index, mlm_positions]

        return self.mlm_head(mlm_input), self.nsp_head(encoded.pooler_output)

class BertForTextClassification(nn.Module):
    """Encoder followed by a single linear classification layer.

    Args:
        bert_model: Encoder whose ``config`` exposes ``hidden_size`` and whose
            output provides ``pooler_output``.
        num_classes: Number of output logits.
    """

    def __init__(self, bert_model, num_classes):
        super().__init__()
        hidden_size = bert_model.config.hidden_size
        self.bert = bert_model
        self.classifier = nn.Linear(hidden_size, num_classes)

    def forward(self, input_ids, attention_mask):
        """Return classification logits computed from the encoder's pooled output."""
        encoder_inputs = {
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            'return_dict': True,
        }
        pooled = self.bert(**encoder_inputs).pooler_output
        return self.classifier(pooled)

# Function to pretrain BERT

In [None]:
# Function to pretrain x epochs and tell the loss
def pretrain_bert(model, train_dataloader, optimizer, num_epochs):
    """Train ``model`` on the combined MLM + NSP objective.

    For every batch the MLM and NSP cross-entropy losses are summed and one
    optimizer step is taken. The mean batch loss is printed after each epoch
    (``nan`` if the dataloader produced no batches).

    Args:
        model: Module called as ``model(input_ids=..., attention_mask=...,
            mlm_positions=...)`` and returning ``(mlm_scores, nsp_scores)``.
        train_dataloader: Iterable of dict batches with the keys
            ``input_ids``, ``attention_mask``, ``mlm_positions``,
            ``mlm_labels`` and ``nsp_label``.
        optimizer: Optimizer over the model's parameters.
        num_epochs: Number of passes over ``train_dataloader``.
    """
    model.train()
    # MLM label entries equal to -100 are skipped: that is the default
    # ``ignore_index`` of CrossEntropyLoss.
    mlm_criterion = nn.CrossEntropyLoss()
    nsp_criterion = nn.CrossEntropyLoss()

    # Batches are moved to wherever the model's parameters live, so the loop
    # also works after ``model.to('cuda')``. On CPU this is a no-op.
    first_param = next(model.parameters(), None)
    device = None if first_param is None else first_param.device

    batch_keys = ('input_ids', 'attention_mask', 'mlm_positions', 'mlm_labels', 'nsp_label')

    # MLM positions and labels identify the MASKED tokens and their original ids.
    # The NSP label tells whether the second sentence is the real next sentence.
    for epoch in range(num_epochs):
        total_loss = 0.0
        num_batches = 0
        for batch in train_dataloader:
            input_ids, attention_mask, mlm_positions, mlm_labels, nsp_labels = (
                batch[key] if device is None else batch[key].to(device)
                for key in batch_keys
            )

            optimizer.zero_grad()

            mlm_scores, nsp_scores = model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                mlm_positions=mlm_positions
            )

            # Calculate MLM loss; the vocabulary size is read from the logits
            # themselves instead of reaching into the model's config.
            mlm_loss = mlm_criterion(
                mlm_scores.reshape(-1, mlm_scores.size(-1)),
                mlm_labels.reshape(-1)
            )

            # Calculate NSP loss
            nsp_loss = nsp_criterion(nsp_scores, nsp_labels)

            # Combined loss
            loss = mlm_loss + nsp_loss
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            num_batches += 1

        # Counting batches while iterating avoids dividing by zero when the
        # dataloader is empty.
        avg_loss = total_loss / num_batches if num_batches else float('nan')
        print(f"Epoch {epoch+1}/{num_epochs}, Average Loss: {avg_loss:.4f}")

# Fine-tuning

In [None]:
def fine_tune_bert(model, dataloader, optimizer, num_epochs):
    """Fine-tune a classification model with cross-entropy loss.

    The mean batch loss is printed after each epoch (``nan`` if the
    dataloader produced no batches).

    Args:
        model: Module called as ``model(input_ids=..., attention_mask=...)``
            and returning class logits.
        dataloader: Iterable of dict batches with the keys ``input_ids``,
            ``attention_mask`` and ``labels``.
        optimizer: Optimizer over the model's parameters.
        num_epochs: Number of passes over ``dataloader``.
    """
    model.train()
    criterion = nn.CrossEntropyLoss()

    # Batches are moved to wherever the model's parameters live, so the loop
    # also works after ``model.to('cuda')``. On CPU this is a no-op.
    first_param = next(model.parameters(), None)
    device = None if first_param is None else first_param.device

    # The label to predict is the sentiment of the review
    for epoch in range(num_epochs):
        total_loss = 0.0
        num_batches = 0
        for batch in dataloader:
            input_ids, attention_mask, labels = (
                batch[key] if device is None else batch[key].to(device)
                for key in ('input_ids', 'attention_mask', 'labels')
            )

            optimizer.zero_grad()
            logits = model(input_ids=input_ids, attention_mask=attention_mask)

            # Calculate loss
            loss = criterion(logits, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            num_batches += 1

        # Counting batches while iterating avoids dividing by zero when the
        # dataloader is empty.
        avg_loss = total_loss / num_batches if num_batches else float('nan')
        print(f"Epoch {epoch+1}/{num_epochs}, Average Loss: {avg_loss:.4f}")


# Get data from the file

In [None]:
def read_file(file_path):
    """Return the lines of a UTF-8 text file as a list, newline characters removed.

    Empty lines are kept as empty strings.
    """
    with open(file_path, 'r', encoding='UTF-8') as handle:
        return [raw_line.replace('\n', '') for raw_line in handle]

# Load the raw corpus as a list with one entry per line of the file.
file_contents = read_file('./sample_text.txt')

In [None]:
from transformers import BertModel, BertTokenizer

# Load the tokenizer and the encoder from the 'prajjwal1/bert-tiny' checkpoint.
tokenizer = BertTokenizer.from_pretrained('prajjwal1/bert-tiny')
base_model = BertModel.from_pretrained('prajjwal1/bert-tiny')

In [None]:
# Build the MLM + NSP dataset from the corpus lines and serve it in
# shuffled batches of 8.
dataset = BertPretrainingDataset(file_contents, tokenizer)
dataloader = DataLoader(dataset, batch_size=8, shuffle=True)

In [None]:
# Wrap the encoder with the MLM and NSP heads and create an AdamW optimizer
# over all of its parameters (encoder and heads).
model = BertPretrainingModel(base_model)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)

In [None]:
# Run pretraining for 10 epochs, then write the resulting weights to disk.
pretrain_bert(model, dataloader, optimizer, num_epochs=10)
pretrained_model_state = model.state_dict() # <- this variable holds the state dict of the pretrained model
torch.save(pretrained_model_state, 'pretrained_bert_model.pth')

Epoch 1/10, Average Loss: 10.5509
Epoch 2/10, Average Loss: 9.1859
Epoch 3/10, Average Loss: 8.0982
Epoch 4/10, Average Loss: 7.2646
Epoch 5/10, Average Loss: 6.6895
Epoch 6/10, Average Loss: 6.2117
Epoch 7/10, Average Loss: 5.7615
Epoch 8/10, Average Loss: 5.5444
Epoch 9/10, Average Loss: 5.3004
Epoch 10/10, Average Loss: 5.1597


In [None]:
def clean_text(text):
    """Remove HTML line-break tags from ``text`` and normalize its whitespace.

    Every ``<br>`` variant (``<br>``, ``<br/>``, ``<br />``, any letter case)
    is replaced by a space, so the words on either side of the tag stay
    separated. Runs of whitespace are then collapsed to a single space and
    leading/trailing whitespace is removed. Other HTML tags are left as is.

    Args:
        text: Raw text.

    Returns:
        The cleaned text.
    """
    # Substitute a space, not an empty string: "end.<br /><br />Next" must
    # become "end. Next", not "end.Next".
    text = re.sub(r"<br\s*/?>", " ", text, flags=re.IGNORECASE)
    text = re.sub(r"\s+", " ", text)

    return text.strip()

In [None]:
from sklearn.model_selection import train_test_split

# Download the review dataset; `path` is used below as a directory.
path = kh.dataset_download("mahmoudshaheen1134/imdp-data")

# os.listdir() returns entries in arbitrary order, so sort them and prefer a
# CSV file instead of trusting whatever entry happens to come first.
dataset_files = sorted(os.listdir(path))
csv_files = [name for name in dataset_files if name.lower().endswith('.csv')]
full_path = os.path.join(path, (csv_files or dataset_files)[0])
data = pd.read_csv(full_path)
data = data.dropna()
data = data.drop_duplicates()

# Clean the review text and encode the sentiment as 1 (positive) / 0 (anything else)
data['review'] = data['review'].apply(clean_text)
data['sentiment'] = data['sentiment'].apply(lambda x: 1 if x == 'positive' else 0)

# Split the data into training and validation sets.
# NOTE(review): test_size=0.95 keeps only 5% of the rows for training -
# presumably to keep fine-tuning fast; confirm this is intended.
train_texts, val_texts, train_labels, val_labels = train_test_split(
    data['review'], data['sentiment'], test_size=0.95, random_state=42
)

# Create datasets and dataloaders
train_dataset = TextClassificationDataset(train_texts.tolist(), train_labels.tolist(), tokenizer)
val_dataset = TextClassificationDataset(val_texts.tolist(), val_labels.tolist(), tokenizer)

train_dataloader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=8, shuffle=False)

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer, BertModel
import random

# Modify BertPretrainingModel for classification
class BertClassificationModel(nn.Module):
    """Encoder with a linear classification head on top of its pooled output.

    Args:
        bert_model: Encoder whose ``config`` exposes ``hidden_size`` and whose
            output provides ``pooler_output``.
        num_classes: Number of output logits.
    """

    def __init__(self, bert_model, num_classes):
        super().__init__()
        encoder_config = bert_model.config
        self.bert = bert_model
        self.config = encoder_config
        self.classification_head = nn.Linear(encoder_config.hidden_size, num_classes)

    def forward(self, input_ids, attention_mask):
        """Run the encoder and return the classification logits."""
        encoded = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            return_dict=True,
        )
        return self.classification_head(encoded.pooler_output)

# Example Classification Dataset
class ClassificationDataset(Dataset):
    """Tokenizes texts on access and pairs them with integer class labels.

    Args:
        texts: Indexable sequence of input texts.
        labels: Indexable sequence of labels, aligned with ``texts``.
        tokenizer: Callable tokenizer returning ``input_ids`` and
            ``attention_mask`` tensors.
        max_length: Length each text is padded/truncated to.
    """

    def __init__(self, texts, labels, tokenizer, max_length=512):
        self.tokenizer = tokenizer
        self.texts, self.labels = texts, labels
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        """Return ``input_ids``, ``attention_mask`` and a long ``labels`` tensor."""
        # Both lookups happen before tokenization
        sample_text, sample_label = self.texts[idx], self.labels[idx]

        encoded = self.tokenizer(
            sample_text,
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_tensors='pt',
        )

        # squeeze() removes the batch dimension added by return_tensors='pt'
        item = {field: encoded[field].squeeze() for field in ('input_ids', 'attention_mask')}
        item['labels'] = torch.tensor(sample_label, dtype=torch.long)
        return item

# Training loop for fine-tuning
def fine_tune_bert(model, train_dataloader, optimizer, num_epochs):
    """Fine-tune a classification model with cross-entropy loss.

    The mean batch loss is printed after each epoch (``nan`` if the
    dataloader produced no batches).

    Args:
        model: Module called as ``model(input_ids=..., attention_mask=...)``
            and returning class logits.
        train_dataloader: Iterable of dict batches with the keys
            ``input_ids``, ``attention_mask`` and ``labels``.
        optimizer: Optimizer over the model's parameters.
        num_epochs: Number of passes over ``train_dataloader``.
    """
    model.train()
    criterion = nn.CrossEntropyLoss()

    # Batches are moved to wherever the model's parameters live, so the loop
    # also works after ``model.to('cuda')``. On CPU this is a no-op.
    first_param = next(model.parameters(), None)
    device = None if first_param is None else first_param.device

    for epoch in range(num_epochs):
        total_loss = 0.0
        num_batches = 0
        for batch in train_dataloader:
            input_ids, attention_mask, labels = (
                batch[key] if device is None else batch[key].to(device)
                for key in ('input_ids', 'attention_mask', 'labels')
            )

            optimizer.zero_grad()

            # Forward pass
            logits = model(input_ids=input_ids, attention_mask=attention_mask)

            # Compute loss
            loss = criterion(logits, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            num_batches += 1

        # Counting batches while iterating avoids dividing by zero when the
        # dataloader is empty.
        avg_loss = total_loss / num_batches if num_batches else float('nan')
        print(f"Epoch {epoch+1}/{num_epochs}, Average Loss: {avg_loss:.4f}")

# Example Classification Dataset (You will use your own data here)
# texts = ["This is a positive example.", "This is a negative example."]
# labels = [1, 0]  # 1 for positive, 0 for negative

# NOTE(review): the encoder is loaded from the 'prajjwal1/bert-tiny' checkpoint
# here; the weights saved to 'pretrained_bert_model.pth' are not loaded in this
# cell, so the MLM/NSP pretraining does not appear to feed into fine-tuning -
# confirm whether that is intended.
tokenizer = BertTokenizer.from_pretrained('prajjwal1/bert-tiny')
base_model = BertModel.from_pretrained('prajjwal1/bert-tiny')

# Create classification dataset
# dataset = ClassificationDataset(texts, labels, tokenizer)
# dataloader = DataLoader(dataset, batch_size=2, shuffle=True)

# Initialize the fine-tuning model
num_classes = 2  # For binary classification (adjust as needed)
model = BertClassificationModel(base_model, num_classes)

# Initialize optimizer over all parameters (encoder and classification head)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-5)

# Fine-tune the model on train_dataloader, which is built in an earlier cell
fine_tune_bert(model, train_dataloader, optimizer, num_epochs=10)

# Save the fine-tuned model's state dict
torch.save(model.state_dict(), 'fine_tuned_bert_model.pth')


Epoch 1/10, Average Loss: 0.6638
Epoch 2/10, Average Loss: 0.6277
Epoch 3/10, Average Loss: 0.5851
Epoch 4/10, Average Loss: 0.5358
Epoch 5/10, Average Loss: 0.4886
Epoch 6/10, Average Loss: 0.4383
Epoch 7/10, Average Loss: 0.3944
Epoch 8/10, Average Loss: 0.3543
Epoch 9/10, Average Loss: 0.3258
Epoch 10/10, Average Loss: 0.2839


In [None]:
def evaluate_model(model, dataloader):
    """Compute the classification accuracy of ``model`` over ``dataloader``.

    The model is switched to eval mode and gradients are disabled. Progress
    is printed roughly every 10% of the batches.

    Args:
        model: Module called as ``model(input_ids=..., attention_mask=...)``
            and returning class logits with the classes along dimension 1.
        dataloader: Sized iterable of dict batches with the keys
            ``input_ids``, ``attention_mask`` and ``labels``.

    Returns:
        Fraction of correctly predicted labels, or 0.0 when the dataloader
        yields no samples.
    """
    model.eval()
    correct_predictions = 0
    total_predictions = 0

    # Batches are moved to wherever the model's parameters live; on CPU this
    # is a no-op.
    first_param = next(model.parameters(), None)
    device = None if first_param is None else first_param.device

    total_batches = len(dataloader)
    # With fewer than 10 batches ``total_batches // 10`` is 0 and the modulo
    # below would raise ZeroDivisionError, so report at least every batch.
    progress_step = max(1, total_batches // 10)

    with torch.no_grad():
        for i, batch in enumerate(dataloader):
            if i % progress_step == 0:
                print(f'{(i / total_batches) * 100:.0f}% has been processed')
            input_ids, attention_mask, labels = (
                batch[key] if device is None else batch[key].to(device)
                for key in ('input_ids', 'attention_mask', 'labels')
            )

            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            _, predicted_class = torch.max(outputs, dim=1)

            correct_predictions += (predicted_class == labels).sum().item()
            total_predictions += labels.size(0)

    if total_predictions == 0:
        return 0.0
    return correct_predictions / total_predictions

# Evaluate the fine-tuned model on the validation dataloader and report its accuracy
accuracy = evaluate_model(model, val_dataloader)
print(f'Validation Accuracy: {accuracy:.4f}')

0% has been processed
10% has been processed
20% has been processed
30% has been processed
40% has been processed
50% has been processed
60% has been processed
70% has been processed
80% has been processed
90% has been processed
100% has been processed
Validation Accuracy: 0.8191
