In [10]:
pip install datasets



In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from datasets import load_dataset
from transformers import AutoTokenizer
from sklearn.model_selection import train_test_split
import random
import numpy as np
from tqdm import tqdm


Imports and Dataset Setup

In [2]:

class SentenceDataset(Dataset):
    def __init__(self, inputs, labels):
        self.inputs = inputs
        self.labels = labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return self.inputs[idx], self.labels[idx]

def collate_fn(batch):
    forward_inputs = [item[0][0] for item in batch]
    backward_inputs = [item[0][1] for item in batch]
    labels = [item[1] for item in batch]

    # Pad sequences
    forward_padded = torch.nn.utils.rnn.pad_sequence([torch.tensor(x) for x in forward_inputs], batch_first=True)
    backward_padded = torch.nn.utils.rnn.pad_sequence([torch.tensor(x) for x in backward_inputs], batch_first=True)

    return (forward_padded, backward_padded), torch.tensor(labels)


In [3]:

def create_fill_in_the_blank(sentence):
    words = sentence.split()
    if len(words) < 6:
        return None

    # Enhanced cleaning
    words = [word.strip().lower() for word in words
            if len(word.strip()) > 2 and word.isalnum()]

    if len(words) < 6:
        return None

    # More strategic split point selection
    split_point = len(words) // 2

    # Avoid splitting in the middle of phrases
    while split_point > 0 and words[split_point].lower() in {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at'}:
        split_point -= 1

    latter_half = words[split_point:]
    if len(latter_half) < 2:
        return None

    # More strategic blank selection
    valid_positions = [
        i for i, word in enumerate(latter_half)
        if len(word) > 2 and word not in {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at'}
    ]

    if not valid_positions:
        return None

    blank_index = random.choice(valid_positions)
    blank_word = latter_half.pop(blank_index)

    # Create parts with more context
    part_a = " ".join(words[:split_point])
    part_b = " ".join(latter_half[::-1])

    return part_a, part_b, blank_word

PREPROCESS DATA

In [4]:
def process_dataset(dataset, tokenizer, max_samples=2000):
    print("Processing sentences...")
    sentences = []
    for item in tqdm(dataset["train"]):
        for sentence in item["article"].split("."):
            sentence = sentence.strip()
            if len(sentence) > 10:
                result = create_fill_in_the_blank(sentence)
                if result:
                    sentences.append(result)

    sentences = sentences[:max_samples]
    print(f"Total processed sentences: {len(sentences)}")

    # Split data
    train_data, val_data = train_test_split(sentences, test_size=0.2, random_state=42)

    # Tokenize data
    def tokenize_data(data):
        inputs, labels = [], []
        for part_a, part_b, blank_word in tqdm(data):
            try:
                part_a_tokens = tokenizer.encode(part_a, truncation=True, max_length=50)
                part_b_tokens = tokenizer.encode(part_b, truncation=True, max_length=50)
                label_token = tokenizer.encode(blank_word, add_special_tokens=False)[0]
                inputs.append((part_a_tokens, part_b_tokens))
                labels.append(label_token)
            except Exception as e:
                print(f"Error tokenizing: {str(e)}")
                continue
        return inputs, labels

    train_inputs, train_labels = tokenize_data(train_data)
    val_inputs, val_labels = tokenize_data(val_data)

    return train_inputs, train_labels, val_inputs, val_labels


BiDirectionalLSTM

In [5]:
class BiDirectionalLSTM(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim):
        super(BiDirectionalLSTM, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)

        # Upgraded to multi-layer LSTM
        self.forward_lstm = nn.LSTM(
            embedding_dim,
            hidden_dim,
            num_layers=2,
            dropout=0.3,
            batch_first=True,
            bidirectional=True
        )
        self.backward_lstm = nn.LSTM(
            embedding_dim,
            hidden_dim,
            num_layers=2,
            dropout=0.3,
            batch_first=True,
            bidirectional=True
        )

        # Add attention mechanism
        self.forward_attention = nn.MultiheadAttention(hidden_dim * 2, num_heads=4, dropout=0.1)
        self.backward_attention = nn.MultiheadAttention(hidden_dim * 2, num_heads=4, dropout=0.1)

        # Enhanced output layers
        self.forward_fc = nn.Sequential(
            nn.Linear(hidden_dim * 2, hidden_dim),
            nn.LayerNorm(hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(hidden_dim, output_dim)
        )
        self.backward_fc = nn.Sequential(
            nn.Linear(hidden_dim * 2, hidden_dim),
            nn.LayerNorm(hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(hidden_dim, output_dim)
        )

        # Enhanced confidence scorer
        self.confidence_scorer = nn.Sequential(
            nn.Linear(hidden_dim * 4, hidden_dim * 2),
            nn.LayerNorm(hidden_dim * 2),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(hidden_dim * 2, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1),
            nn.Sigmoid()
        )

        self.dropout = nn.Dropout(0.3)
        self.layer_norm = nn.LayerNorm(hidden_dim * 2)

    def forward(self, forward_x, backward_x, training=True):
        # Enhanced embedding with dropout
        forward_embedded = self.dropout(self.embedding(forward_x))
        backward_embedded = self.dropout(self.embedding(backward_x))

        # Bi-directional LSTM processing
        forward_output, (forward_hidden, _) = self.forward_lstm(forward_embedded)
        backward_output, (backward_hidden, _) = self.backward_lstm(backward_embedded)

        # Apply attention
        forward_output = forward_output.transpose(0, 1)  # [batch, seq, hidden] -> [seq, batch, hidden]
        backward_output = backward_output.transpose(0, 1)

        forward_attended, _ = self.forward_attention(
            forward_output, forward_output, forward_output
        )
        backward_attended, _ = self.backward_attention(
            backward_output, backward_output, backward_output
        )

        # Get final hidden states
        forward_hidden = torch.cat([forward_hidden[-2], forward_hidden[-1]], dim=1)
        backward_hidden = torch.cat([backward_hidden[-2], backward_hidden[-1]], dim=1)

        # Apply layer normalization
        forward_hidden = self.layer_norm(forward_hidden)
        backward_hidden = self.layer_norm(backward_hidden)

        # Get predictions
        forward_logits = self.forward_fc(forward_attended[-1])
        backward_logits = self.backward_fc(backward_attended[-1])

        # Calculate confidence with enhanced hidden states
        confidence = self.confidence_scorer(
            torch.cat([forward_hidden, backward_hidden], dim=1)
        )

        if training:
            return forward_logits, backward_logits, confidence
        else:
            return self.select_prediction(forward_logits, backward_logits, confidence)

    def select_prediction(self, forward_logits, backward_logits, confidence):
        # Enhanced prediction selection
        forward_probs = F.softmax(forward_logits, dim=1)
        backward_probs = F.softmax(backward_logits, dim=1)

        # Temperature scaling for sharper distributions
        temperature = 0.7
        forward_probs = forward_probs.pow(1/temperature)
        backward_probs = backward_probs.pow(1/temperature)

        # Normalize
        forward_probs = forward_probs / forward_probs.sum(dim=1, keepdim=True)
        backward_probs = backward_probs / backward_probs.sum(dim=1, keepdim=True)

        # Weighted average with enhanced confidence
        weighted_probs = (forward_probs * confidence +
                         backward_probs * (1 - confidence))

        return weighted_probs

def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, device, num_epochs):
    best_accuracy = 0
    patience = 3
    patience_counter = 0
    best_val_loss = float('inf')

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
        correct_predictions = 0
        total_predictions = 0

        for batch_inputs, batch_labels in tqdm(train_loader, desc=f"Epoch {epoch+1}"):
            forward_inputs, backward_inputs = batch_inputs
            forward_inputs = forward_inputs.to(device)
            backward_inputs = backward_inputs.to(device)
            batch_labels = batch_labels.to(device)

            optimizer.zero_grad()

            # Get predictions and confidence
            forward_logits, backward_logits, confidence = model(
                forward_inputs, backward_inputs, training=True
            )

            # Calculate losses with label smoothing
            forward_loss = criterion(forward_logits, batch_labels)
            backward_loss = criterion(backward_logits, batch_labels)

            # Dynamic confidence weighting
            confidence_weight = confidence.mean()
            loss = (forward_loss * confidence_weight +
                   backward_loss * (1 - confidence_weight))

            # Add L2 regularization
            l2_lambda = 0.01
            l2_reg = torch.tensor(0., device=device)
            for param in model.parameters():
                l2_reg += torch.norm(param)
            loss += l2_lambda * l2_reg

            loss.backward()

            # Gradient clipping
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()

            total_loss += loss.item()

            # Calculate accuracy
            _, predicted = torch.max(forward_logits, 1)
            correct_predictions += (predicted == batch_labels).sum().item()
            total_predictions += batch_labels.size(0)

        # Validation
        model.eval()
        val_loss = 0
        val_correct = 0
        val_total = 0

        with torch.no_grad():
            for batch_inputs, batch_labels in tqdm(val_loader, desc="Validation"):
                forward_inputs, backward_inputs = batch_inputs
                forward_inputs = forward_inputs.to(device)
                backward_inputs = backward_inputs.to(device)
                batch_labels = batch_labels.to(device)

                outputs = model(forward_inputs, backward_inputs, training=False)
                _, predicted = outputs.max(1)

                val_total += batch_labels.size(0)
                val_correct += predicted.eq(batch_labels).sum().item()

                # Calculate validation loss
                forward_logits, backward_logits, _ = model(
                    forward_inputs, backward_inputs, training=True
                )
                val_loss += criterion(forward_logits, batch_labels).item()

        val_accuracy = 100. * val_correct / val_total
        avg_val_loss = val_loss / len(val_loader)

        print(f"\nEpoch {epoch+1}")
        print(f"Training Loss: {total_loss/len(train_loader):.4f}")
        print(f"Training Accuracy: {100. * correct_predictions/total_predictions:.2f}%")
        print(f"Validation Loss: {avg_val_loss:.4f}")
        print(f"Validation Accuracy: {val_accuracy:.2f}%")

        # Early stopping with patience
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            patience_counter = 0
            if val_accuracy > best_accuracy:
                best_accuracy = val_accuracy
                torch.save(model.state_dict(), 'best_model.pth')
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print(f"\nEarly stopping triggered after {epoch + 1} epochs")
                break

        scheduler.step(val_accuracy)


In [6]:

def main():

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # Load dataset and tokenizer
    print("Loading dataset...")
    dataset = load_dataset("race", "all")
    tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

    # Process and prepare data
    train_inputs, train_labels, val_inputs, val_labels = process_dataset(
        dataset, tokenizer, max_samples=2000
    )

    # Create dataloaders
    train_dataset = SentenceDataset(train_inputs, train_labels)
    val_dataset = SentenceDataset(val_inputs, val_labels)

    train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, collate_fn=collate_fn)
    val_loader = DataLoader(val_dataset, batch_size=16, collate_fn=collate_fn)

    # Initialize model
    model = BiDirectionalLSTM(
        vocab_size=tokenizer.vocab_size,
        embedding_dim=256,
        hidden_dim=512,
        output_dim=tokenizer.vocab_size
    ).to(device)

    # Training setup
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(model.parameters(), lr=1e-4, weight_decay=0.01)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='max', patience=2, factor=0.5
    )

    # Train the model
    print("Starting training...")
    train_model(
        model=model,
        train_loader=train_loader,
        val_loader=val_loader,
        criterion=criterion,
        optimizer=optimizer,
        scheduler=scheduler,
        device=device,
        num_epochs=5
    )

if __name__ == "__main__":
    main()

Using device: cpu
Loading dataset...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Processing sentences...


100%|██████████| 87866/87866 [00:26<00:00, 3376.38it/s]


Total processed sentences: 2000


100%|██████████| 1600/1600 [00:00<00:00, 4640.31it/s]
100%|██████████| 400/400 [00:00<00:00, 4876.66it/s]


Starting training...


Epoch 1: 100%|██████████| 100/100 [03:31<00:00,  2.11s/it]
Validation: 100%|██████████| 25/25 [00:17<00:00,  1.40it/s]



Epoch 1
Training Loss: 46.4921
Training Accuracy: 0.62%
Validation Loss: 9.1086
Validation Accuracy: 2.25%


Epoch 2: 100%|██████████| 100/100 [03:30<00:00,  2.10s/it]
Validation: 100%|██████████| 25/25 [00:18<00:00,  1.32it/s]



Epoch 2
Training Loss: 42.9491
Training Accuracy: 1.38%
Validation Loss: 8.3264
Validation Accuracy: 2.25%


Epoch 3: 100%|██████████| 100/100 [03:31<00:00,  2.11s/it]
Validation: 100%|██████████| 25/25 [00:18<00:00,  1.32it/s]



Epoch 3
Training Loss: 40.7458
Training Accuracy: 1.25%
Validation Loss: 8.3140
Validation Accuracy: 0.75%


Epoch 4: 100%|██████████| 100/100 [03:30<00:00,  2.11s/it]
Validation: 100%|██████████| 25/25 [00:18<00:00,  1.35it/s]



Epoch 4
Training Loss: 39.4026
Training Accuracy: 2.38%
Validation Loss: 8.3003
Validation Accuracy: 2.50%


Epoch 5: 100%|██████████| 100/100 [03:29<00:00,  2.09s/it]
Validation: 100%|██████████| 25/25 [00:18<00:00,  1.36it/s]


Epoch 5
Training Loss: 38.4406
Training Accuracy: 3.81%
Validation Loss: 8.2911
Validation Accuracy: 2.25%





In [7]:
import torch
from transformers import AutoTokenizer
from typing import Tuple, List

def load_trained_model(model_path: str, device: torch.device) -> Tuple[BiDirectionalLSTM, AutoTokenizer]:
    """Load the trained model and tokenizer."""
    tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

    model = BiDirectionalLSTM(
        vocab_size=tokenizer.vocab_size,
        embedding_dim=256,
        hidden_dim=512,
        output_dim=tokenizer.vocab_size
    ).to(device)

    model.load_state_dict(torch.load(model_path))
    model.eval()

    return model, tokenizer

def prepare_sentence(sentence: str, blank_position: int, tokenizer: AutoTokenizer) -> Tuple[torch.Tensor, torch.Tensor]:
    """Prepare a sentence for prediction by creating a blank at the specified position."""
    words = sentence.split()
    if blank_position >= len(words):
        raise ValueError("Blank position is out of range")

    # Store the actual word and replace it with [MASK]
    actual_word = words[blank_position]
    words[blank_position] = "[MASK]"

    # Split into two parts at the mask
    part_a = " ".join(words[:blank_position+1])
    part_b = " ".join(words[blank_position+1:])[::-1]  # Reverse the second part

    # Tokenize
    forward_tokens = torch.tensor([tokenizer.encode(part_a, truncation=True, max_length=50)])
    backward_tokens = torch.tensor([tokenizer.encode(part_b, truncation=True, max_length=50)])

    return forward_tokens, backward_tokens, actual_word

def predict_blank(model: BiDirectionalLSTM,
                 tokenizer: AutoTokenizer,
                 forward_tokens: torch.Tensor,
                 backward_tokens: torch.Tensor,
                 top_k: int = 5) -> List[str]:
    """Predict the most likely words for the blank."""
    with torch.no_grad():
        outputs = model(forward_tokens, backward_tokens, training=False)
        probs, indices = torch.topk(outputs, k=top_k)

        # Convert to words
        predictions = []
        for idx, prob in zip(indices[0], probs[0]):
            word = tokenizer.decode([idx])
            predictions.append((word, prob.item()))

    return predictions

def test_model():
    # Setup
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model_path = 'best_model.pth'
    model, tokenizer = load_trained_model(model_path, device)

    # Test sentences
    test_sentences = [
        ("The students studied hard for their final exam", 5),  # "final"
        ("She walked quickly through the busy street to reach her destination", 7),  # "street"
        ("The chef prepared a delicious meal for his guests", 5),  # "meal"
        ("The sun was setting behind the tall mountains", 6),  # "mountains"
        ("Children played happily in the green park", 6)  # "park"
    ]

    print("\nTesting model with example sentences:")
    print("="*50)

    for sentence, blank_pos in test_sentences:
        print(f"\nOriginal sentence: {sentence}")
        print(f"Position of blank: {blank_pos}")

        try:
            # Prepare the sentence
            forward_tokens, backward_tokens, actual_word = prepare_sentence(sentence, blank_pos, tokenizer)
            forward_tokens = forward_tokens.to(device)
            backward_tokens = backward_tokens.to(device)

            # Get predictions
            predictions = predict_blank(model, tokenizer, forward_tokens, backward_tokens)

            # Print results
            print("\nPredicted words (with confidence scores):")
            for word, confidence in predictions:
                print(f"  {word}: {confidence:.4f}")
            print(f"Actual word was: {actual_word}")

        except Exception as e:
            print(f"Error processing sentence: {str(e)}")

        print("-"*50)

if __name__ == "__main__":
    test_model()

  model.load_state_dict(torch.load(model_path))



Testing model with example sentences:

Original sentence: The students studied hard for their final exam
Position of blank: 5

Predicted words (with confidence scores):
  that: 0.0444
  not: 0.0314
  are: 0.0272
  her: 0.0269
  was: 0.0229
Actual word was: their
--------------------------------------------------

Original sentence: She walked quickly through the busy street to reach her destination
Position of blank: 7

Predicted words (with confidence scores):
  that: 0.0309
  are: 0.0297
  should: 0.0272
  not: 0.0260
  they: 0.0245
Actual word was: to
--------------------------------------------------

Original sentence: The chef prepared a delicious meal for his guests
Position of blank: 5

Predicted words (with confidence scores):
  that: 0.0437
  for: 0.0318
  her: 0.0271
  was: 0.0252
  are: 0.0230
Actual word was: meal
--------------------------------------------------

Original sentence: The sun was setting behind the tall mountains
Position of blank: 6

Predicted words (with