### Common Test 1.2

In [1]:
import re
import random
from pathlib import Path
import json

# Compiled regex used to tokenize expressions. Its alternatives are tried left to right:
#   - Words: runs of ASCII letters and underscores
#   - Decimal numbers (digits, '.', digits); tried before plain integers so that
#     a value such as "1.5" stays a single token
#   - Integers: runs of digits
#   - Single-character operators and punctuation: + - * / ^ ( ) { } [ ] : ,
# Digits are not part of the word class, so "_1" produces the two tokens "_" and "1".
# Characters that match none of the alternatives (e.g. whitespace) are not captured.
token_pattern = re.compile(r'([A-Za-z_]+|\d+\.\d+|\d+|[+\-*/^(){}\[\]:,])')

def normalize_indices(seq):
    """
    Normalizes indices of the form _<number> in a given sequence.
    Each unique index is replaced with a normalized index (_1, _2, ...)
    in order of first appearance.

    Args:
        seq: Expression string that may contain indices such as ``_239``.

    Returns:
        A copy of ``seq`` in which every ``_<number>`` occurrence has been
        renumbered; all other text is left untouched.
    """
    # Pattern to find indices like _123456. The digit run is matched greedily,
    # so "_12" is always treated as one index and never as "_1" followed by "2".
    index_pattern = re.compile(r'_(\d+)')
    mapping = {}

    def renumber(match):
        full_token = match.group(0)  # e.g., '_239'
        if full_token not in mapping:
            # Assign the next available normalized index (starting at 1)
            mapping[full_token] = f"_{len(mapping) + 1}"
        return mapping[full_token]

    # Substitute every match in one left-to-right pass. Chained str.replace calls
    # would rewrite indices that were already normalized (e.g. "_2 _1" collapsing
    # to "_2 _2") and corrupt indices that share a prefix ("_1" inside "_12").
    return index_pattern.sub(renumber, seq)

def tokenize(text):
    """
    Split ``text`` into tokens with the module-level ``token_pattern``.

    Returns the matched tokens as a list of strings in order of appearance;
    characters the pattern does not match are left out.
    """
    # The pattern is a single capture group spanning the whole match, so
    # group(1) is exactly the token text.
    tokens = [match.group(1) for match in token_pattern.finditer(text)]
    return tokens

def process_file(file_content):
    """
    Parse the text of one data file into tokenized examples.

    Every non-blank row is expected to look like
      event type : Feynman diagram : amplitude : squared amplitude
    Rows that do not split into exactly four " : "-separated fields are
    reported on stdout and dropped.

    Returns a list of dicts, one per valid row, holding the raw event type and
    Feynman diagram together with the index-normalized, tokenized amplitude and
    squared amplitude.
    """
    def to_tokens(expression):
        # Renumber the indices first, then split the result into tokens.
        return tokenize(normalize_indices(expression))

    rows = []
    for line in file_content.strip().splitlines():
        # Blank rows carry no data.
        if line.strip() == "":
            continue

        fields = list(map(str.strip, line.split(" : ")))
        if len(fields) != 4:
            print(f"Skipping malformed line: {line}")
            continue

        event_type, feynman_diag, amplitude, squared_amplitude = fields
        row = {
            "event_type": event_type,
            "feynman_diag": feynman_diag,
            "amplitude_tokens": to_tokens(amplitude),
            "squared_amplitude_tokens": to_tokens(squared_amplitude),
        }
        rows.append(row)
    return rows

def load_and_process_folder(folder_path):
    """
    Process every ``*.txt`` file directly inside ``folder_path``.

    Each file is read as UTF-8 text and handed to ``process_file``; the rows
    from all files are concatenated into one list, which is returned.
    """
    collected = []
    for txt_file in Path(folder_path).glob("*.txt"):
        text = txt_file.read_text(encoding="utf-8")
        collected.extend(process_file(text))
    return collected

def split_dataset(data, train_ratio=0.8, val_ratio=0.1, test_ratio=0.1):
    """
    Shuffles and splits the data into train, validation, and test sets.

    The shuffle is performed on a copy, so the caller's sequence keeps its
    original order.

    Args:
        data: Sequence of examples.
        train_ratio: Fraction of the examples placed in the training set.
        val_ratio: Fraction of the examples placed in the validation set.
        test_ratio: Kept for interface symmetry. The test set always receives
            every example left over after the train and validation slices, so
            rounding of the other two sizes never drops an example.

    Returns:
        Tuple ``(train_set, val_set, test_set)`` of lists.
    """
    # Copy first: random.shuffle works in place and would otherwise reorder
    # the list owned by the caller.
    shuffled = list(data)
    random.shuffle(shuffled)

    total = len(shuffled)
    train_end = int(total * train_ratio)
    val_end = train_end + int(total * val_ratio)

    train_set = shuffled[:train_end]
    val_set = shuffled[train_end:val_end]
    test_set = shuffled[val_end:]
    return train_set, val_set, test_set

# Main processing
# Path to the folder containing extracted text files.
folder_path = "/kaggle/input/squared-amplitude/SYMBA - Test Data"
all_processed_data = load_and_process_folder(folder_path)
print(f"Total examples processed: {len(all_processed_data)}")

# Split the dataset and report the size of each partition.
train_set, val_set, test_set = split_dataset(all_processed_data)
print(f"Train set: {len(train_set)} examples")
print(f"Validation set: {len(val_set)} examples")
print(f"Test set: {len(test_set)} examples")

# Persist each split as pretty-printed JSON so later steps can reload it.
output_dir = Path("processed_data")
output_dir.mkdir(exist_ok=True)
for file_name, subset in (
    ("train.json", train_set),
    ("val.json", val_set),
    ("test.json", test_set),
):
    with (output_dir / file_name).open("w", encoding="utf-8") as f:
        json.dump(subset, f, indent=2)

print("Data processing and splitting completed.")


Total examples processed: 15552
Train set: 12441 examples
Validation set: 1555 examples
Test set: 1556 examples
Data processing and splitting completed.


### Common Task 2 BERT

In [16]:
import math
import json
import random
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

# ---------------------
# 1. Special Tokens & Vocabulary
# ---------------------
# Reserved token -> id mapping. build_vocab starts from a copy of this dict, so
# these four ids are the lowest indices of every vocabulary it produces.
SPECIAL_TOKENS = {
    "<pad>": 0,  # fill value used by collate_fn when padding a batch
    "<bos>": 1,  # placed at the start of every sequence by LanguageModelDataset
    "<eos>": 2,  # placed at the end of every sequence (before truncation)
    "<mask>": 3  # id passed to mask_tokens as the replacement for masked positions
}

def build_vocab(dataset):
    """
    Create the token -> id mapping for a dataset.

    The mapping starts as a copy of SPECIAL_TOKENS. Each example is a dict with
    keys "amplitude_tokens" and "squared_amplitude_tokens"; its tokens are
    visited in that order and every token not seen before receives the next
    free id.
    """
    token_to_id = dict(SPECIAL_TOKENS)
    for example in dataset:
        for field in ("amplitude_tokens", "squared_amplitude_tokens"):
            for token in example[field]:
                # setdefault only inserts when the token is new, so known
                # tokens keep the id they were first given.
                token_to_id.setdefault(token, len(token_to_id))
    return token_to_id

# ---------------------
# 2. Dataset and DataLoader
# ---------------------
class LanguageModelDataset(Dataset):
    """Dataset that turns tokenized examples into 1-D tensors of token ids."""

    def __init__(self, data, vocab, max_len=200):
        """
        data: list of dicts with keys "amplitude_tokens" and
              "squared_amplitude_tokens", each a list of token strings.
        vocab: mapping from token string to integer id.
        max_len: longest id sequence returned; anything beyond it is cut off.
        """
        self.data = data
        self.vocab = vocab
        self.max_len = max_len

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        record = self.data[idx]
        # <bos>, amplitude ids, squared-amplitude ids, <eos> -- in that order.
        ids = [SPECIAL_TOKENS["<bos>"]]
        for field in ("amplitude_tokens", "squared_amplitude_tokens"):
            ids.extend(self.vocab[token] for token in record[field])
        ids.append(SPECIAL_TOKENS["<eos>"])
        # Truncation happens last, so an over-long sequence loses its tail
        # (including <eos>).
        return torch.tensor(ids[:self.max_len], dtype=torch.long)

def collate_fn(batch):
    """
    Stack variable-length 1-D id tensors into one long tensor of shape
    (len(batch), longest sequence), right-padding shorter rows with the
    <pad> id.
    """
    lengths = [len(seq) for seq in batch]
    padded = torch.full(
        (len(batch), max(lengths)),
        SPECIAL_TOKENS["<pad>"],
        dtype=torch.long,
    )
    for row, (seq, length) in enumerate(zip(batch, lengths)):
        padded[row, :length] = seq
    return padded

# ---------------------
# 3. Positional Encoding Module
# ---------------------
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position encodings to its input, then applies dropout."""

    def __init__(self, d_model, dropout=0.1, max_len=5000):
        """
        Registers positional encodings with shape (max_len, 1, d_model) to be added
        to inputs of shape (seq_len, batch, d_model).

        Args:
            d_model: Size of the last input dimension; may be even or odd.
            dropout: Dropout probability applied after the encodings are added.
            max_len: Number of positions for which encodings are precomputed.
        """
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)
        pe = torch.zeros(max_len, d_model)  # (max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)  # (max_len, 1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        # With an odd d_model there is one fewer odd column than even columns, so
        # the last frequency is dropped for the cosine half; for an even d_model
        # the slice keeps every frequency.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        pe = pe.unsqueeze(1)  # (max_len, 1, d_model)
        # A buffer moves with the module across devices but is not a trainable parameter.
        self.register_buffer("pe", pe)

    def forward(self, x):
        # x: (seq_len, batch, d_model); the encodings broadcast over the batch axis.
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)

# ---------------------
# 4. Masking Function for MLM
# ---------------------
def mask_tokens(inputs, mask_token_id, vocab_size, mlm_probability=0.15):
    """
    Apply BERT-style random corruption for masked language modeling.

    inputs: Tensor of shape (batch, seq_len). It is modified in place, so pass
            a clone when the original ids are still needed.
    mask_token_id: id written into the positions chosen for <mask> replacement.
    vocab_size: exclusive upper bound for the randomly drawn replacement ids.
    mlm_probability: chance that a non-special token is selected for prediction.

    Returns: (inputs, labels). labels holds the original id at every selected
    position and -100 everywhere else.
    """
    device = inputs.device
    shape = inputs.shape
    labels = inputs.clone()

    # <bos>, <eos> and <pad> must never be selected.
    is_special = torch.zeros_like(inputs, dtype=torch.bool)
    for name in ("<bos>", "<eos>", "<pad>"):
        is_special |= inputs == SPECIAL_TOKENS[name]

    selection_prob = torch.full(shape, mlm_probability, device=device)
    selection_prob.masked_fill_(is_special, value=0.0)
    selected = torch.bernoulli(selection_prob).bool()
    labels.masked_fill_(~selected, -100)  # Loss is computed on selected tokens only.

    # 80% of the selected positions become <mask>.
    use_mask = torch.bernoulli(torch.full(shape, 0.8, device=device)).bool() & selected
    inputs.masked_fill_(use_mask, mask_token_id)

    # Half of the remaining selected positions (10% overall) get a random id.
    use_random = torch.bernoulli(torch.full(shape, 0.5, device=device)).bool() & selected & ~use_mask
    random_ids = torch.randint(low=0, high=vocab_size, size=shape, device=device)
    inputs[use_random] = random_ids[use_random]

    # The last 10% of the selected positions keep their original id.
    return inputs, labels

# ---------------------
# 5. BERT-Style Model for Masked Language Modeling
# ---------------------
class BertForMaskedLM(nn.Module):
    """Transformer encoder without a causal mask, topped by a per-token vocabulary head."""

    def __init__(self, vocab_size, d_model=128, nhead=8, num_layers=6, dropout=0.1):
        """
        vocab_size: number of rows in the embedding table and width of the output logits.
        d_model: embedding / hidden size.
        nhead: attention heads per encoder layer.
        num_layers: number of stacked encoder layers.
        dropout: dropout used by the positional encoding and the encoder layers.
        """
        super().__init__()
        self.d_model = d_model
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=d_model)
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=4 * d_model,
            dropout=dropout,
        )
        self.transformer_encoder = nn.TransformerEncoder(layer, num_layers=num_layers)
        self.fc_out = nn.Linear(in_features=d_model, out_features=vocab_size)

    def forward(self, src, src_padding_mask=None):
        # src: (seq_len, batch) token ids.
        scaled = self.embedding(src) * math.sqrt(self.d_model)
        positioned = self.pos_encoder(scaled)
        # No attention mask is supplied, so every position attends to the whole
        # sequence; only the key padding mask is forwarded.
        encoded = self.transformer_encoder(positioned, src_key_padding_mask=src_padding_mask)
        return self.fc_out(encoded)  # (seq_len, batch, vocab_size)

# ---------------------
# 6. Helper Function for Padding Masking
# ---------------------
def generate_padding_mask(batch, pad_idx=SPECIAL_TOKENS["<pad>"]):
    """
    Return a boolean tensor with the same shape as ``batch`` (batch, seq_len)
    that is True wherever the id equals ``pad_idx``.
    """
    is_padding = batch.eq(pad_idx)
    return is_padding

# ---------------------
# 7. Training and Evaluation Functions for MLM
# ---------------------
def train_epoch_bert(model, dataloader, optimizer, criterion, device, mlm_probability=0.15, vocab_size=None):
    """
    Run one epoch of masked-language-model training.

    Args:
        model: Network called as ``model(ids, src_padding_mask=...)`` with ids of
            shape (seq_len, batch), returning logits of shape
            (seq_len, batch, vocab).
        dataloader: Iterable yielding padded id tensors of shape (batch, seq_len).
        optimizer: Optimizer that updates the model's parameters.
        criterion: Loss taking (N, vocab) logits and (N,) labels, where -100 marks
            positions that must not contribute.
        device: Device the batches are moved to.
        mlm_probability: Per-token chance of being selected for prediction.
        vocab_size: Exclusive upper bound for the random replacement ids drawn by
            mask_tokens. When None, ``model.embedding.num_embeddings`` is used,
            so no module-level ``vocab_size`` variable is required.

    Returns:
        Mean loss over the batches that produced a loss (0.0 if there were none).
    """
    if vocab_size is None:
        vocab_size = model.embedding.num_embeddings

    model.train()
    total_loss = 0.0
    num_batches = 0
    for batch in dataloader:
        batch = batch.to(device)  # (batch, seq_len)
        # mask_tokens edits its input in place, hence the clone.
        masked_inputs, labels = mask_tokens(batch.clone(), SPECIAL_TOKENS["<mask>"], vocab_size, mlm_probability)
        if not (labels != -100).any():
            # Nothing was selected in this batch, so there is no valid target;
            # a mean-reduced criterion that ignores -100 would return NaN and
            # the resulting gradients would corrupt the weights.
            continue
        # Transpose for Transformer: (seq_len, batch)
        masked_inputs = masked_inputs.transpose(0, 1)
        labels = labels.transpose(0, 1)
        # The padding mask comes from the uncorrupted ids and stays (batch, seq_len).
        padding_mask = generate_padding_mask(batch)
        optimizer.zero_grad()
        logits = model(masked_inputs, src_padding_mask=padding_mask)
        loss = criterion(logits.view(-1, logits.size(-1)), labels.reshape(-1))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        num_batches += 1
    return total_loss / num_batches if num_batches else 0.0

def evaluate_bert(model, dataloader, criterion, device, mlm_probability=0.15, vocab_size=None):
    """
    Compute the mean masked-language-model loss without updating the model.

    Args:
        model: Network called as ``model(ids, src_padding_mask=...)`` with ids of
            shape (seq_len, batch), returning logits of shape
            (seq_len, batch, vocab).
        dataloader: Iterable yielding padded id tensors of shape (batch, seq_len).
        criterion: Loss taking (N, vocab) logits and (N,) labels, where -100 marks
            positions that must not contribute.
        device: Device the batches are moved to.
        mlm_probability: Per-token chance of being selected for prediction.
        vocab_size: Exclusive upper bound for the random replacement ids drawn by
            mask_tokens. When None, ``model.embedding.num_embeddings`` is used,
            so no module-level ``vocab_size`` variable is required.

    Returns:
        Mean loss over the batches that produced a loss (0.0 if there were none).
        Masking is drawn randomly on every call, so the value is not
        deterministic unless the caller seeds the random generator.
    """
    if vocab_size is None:
        vocab_size = model.embedding.num_embeddings

    model.eval()
    total_loss = 0.0
    num_batches = 0
    with torch.no_grad():
        for batch in dataloader:
            batch = batch.to(device)
            # mask_tokens edits its input in place, hence the clone.
            masked_inputs, labels = mask_tokens(batch.clone(), SPECIAL_TOKENS["<mask>"], vocab_size, mlm_probability)
            if not (labels != -100).any():
                # No selected position means no valid target; a mean-reduced
                # criterion that ignores -100 would return NaN here.
                continue
            masked_inputs = masked_inputs.transpose(0, 1)
            labels = labels.transpose(0, 1)
            padding_mask = generate_padding_mask(batch)
            logits = model(masked_inputs, src_padding_mask=padding_mask)
            loss = criterion(logits.view(-1, logits.size(-1)), labels.reshape(-1))
            total_loss += loss.item()
            num_batches += 1
    return total_loss / num_batches if num_batches else 0.0

# ---------------------
# 8. Display Predictions for Masked Tokens
# ---------------------
def display_predictions_bert(model, dataloader, vocab, num_examples=5, device=torch.device("cpu")):
    """
    Print sequences in which every <mask> position shows the model's prediction.

    Each batch is corrupted with mask_tokens (mlm_probability=0.15). For every
    sequence, positions that became <mask> are shown with the token the model
    predicts at that same position; all other positions show the original
    token.

    Args:
        model: Network called as ``model(ids, src_padding_mask=...)`` with ids of
            shape (seq_len, batch).
        dataloader: Iterable yielding padded id tensors of shape (batch, seq_len).
        vocab: Mapping from token string to id; inverted here for printing.
        num_examples: Number of sequences to print; nothing is printed when it
            is zero or negative.
        device: Device the batches are moved to.
    """
    if num_examples <= 0:
        return

    rev_vocab = {v: k for k, v in vocab.items()}
    mask_id = SPECIAL_TOKENS["<mask>"]
    model.eval()
    examples_shown = 0
    with torch.no_grad():
        for batch in dataloader:
            batch = batch.to(device)
            # mask_tokens edits its input in place, hence the clone.
            masked_inputs, _ = mask_tokens(batch.clone(), mask_id, len(vocab), mlm_probability=0.15)
            for i in range(masked_inputs.size(0)):
                input_seq = masked_inputs[i].unsqueeze(0)  # (1, seq_len)
                input_seq_t = input_seq.transpose(0, 1)      # (seq_len, 1)
                # Build the padding mask from the original ids: the corrupted
                # input may contain a randomly drawn <pad> id at a real position.
                padding_mask = generate_padding_mask(batch[i].unsqueeze(0))
                logits = model(input_seq_t, src_padding_mask=padding_mask)
                logits = logits.transpose(0, 1)  # (1, seq_len, vocab_size)
                predictions = torch.argmax(logits, dim=-1).squeeze(0).cpu().tolist()

                original = batch[i].cpu().tolist()
                shown = masked_inputs[i].cpu().tolist()
                # predictions has one entry per position, so a masked position
                # must be looked up by its own index.
                display_seq = [
                    rev_vocab.get(predictions[pos] if inp == mask_id else orig, "<unk>")
                    for pos, (orig, inp) in enumerate(zip(original, shown))
                ]
                print("Input (masked replaced by prediction):", " ".join(display_seq))
                print("-" * 50)
                examples_shown += 1
                if examples_shown >= num_examples:
                    return

# ---------------------
# 9. Main Training Loop and Testing
# ---------------------
if __name__ == "__main__":
    # Read the three JSON splits. Each file is expected to hold a list of
    # examples with keys "amplitude_tokens" and "squared_amplitude_tokens".
    loaded_splits = []
    for split_path in (
        "processed_data/train.json",
        "processed_data/val.json",
        "processed_data/test.json",
    ):
        with open(split_path) as f:
            loaded_splits.append(json.load(f))
    train_data, val_data, test_data = loaded_splits

    # The vocabulary is built over all splits: the dataset class indexes it
    # directly, so every token of every split has to be present.
    combined_data = train_data + val_data + test_data
    vocab = build_vocab(combined_data)
    vocab_size = len(vocab)
    print(f"Vocabulary size: {vocab_size}")

    # One dataset per split; each item joins the tokens of both columns.
    train_dataset, val_dataset, test_dataset = (
        LanguageModelDataset(split, vocab, max_len=200)
        for split in (train_data, val_data, test_data)
    )

    # Only the training loader shuffles.
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)
    val_loader, test_loader = (
        DataLoader(dataset, batch_size=32, shuffle=False, collate_fn=collate_fn)
        for dataset in (val_dataset, test_dataset)
    )

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = BertForMaskedLM(vocab_size, d_model=128, nhead=8, num_layers=6, dropout=0.1).to(device)
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    # Labels equal to -100 (positions that were not selected) are left out of the loss.
    criterion = nn.CrossEntropyLoss(ignore_index=-100)

    num_epochs = 500
    for epoch in range(1, num_epochs + 1):
        train_loss = train_epoch_bert(model, train_loader, optimizer, criterion, device, mlm_probability=0.15)
        val_loss = evaluate_bert(model, val_loader, criterion, device, mlm_probability=0.15)
        print(f"Epoch {epoch}: Train Loss = {train_loss:.4f}, Val Loss = {val_loss:.4f}")

    # Show a few masked-token predictions from the test set.
    print("\nSample Predictions on Test Data:")
    display_predictions_bert(model, test_loader, vocab, num_examples=5, device=device)


Vocabulary size: 32828
Epoch 1: Train Loss = 3.8072, Val Loss = 3.3031
Epoch 2: Train Loss = 3.1891, Val Loss = 2.9602
Epoch 3: Train Loss = 2.9058, Val Loss = 2.6946
Epoch 4: Train Loss = 2.6199, Val Loss = 2.4222
Epoch 5: Train Loss = 2.3676, Val Loss = 2.1747
Epoch 6: Train Loss = 2.1512, Val Loss = 1.9753
Epoch 7: Train Loss = 1.9693, Val Loss = 1.7926
Epoch 8: Train Loss = 1.8327, Val Loss = 1.6760
Epoch 9: Train Loss = 1.7288, Val Loss = 1.6019
Epoch 10: Train Loss = 1.6275, Val Loss = 1.5048
Epoch 11: Train Loss = 1.5547, Val Loss = 1.4133
Epoch 12: Train Loss = 1.4963, Val Loss = 1.3948
Epoch 13: Train Loss = 1.4372, Val Loss = 1.3471
Epoch 14: Train Loss = 1.3858, Val Loss = 1.2808
Epoch 15: Train Loss = 1.3285, Val Loss = 1.2444
Epoch 16: Train Loss = 1.2958, Val Loss = 1.1986
Epoch 17: Train Loss = 1.2626, Val Loss = 1.1810
Epoch 18: Train Loss = 1.2140, Val Loss = 1.1744
Epoch 19: Train Loss = 1.1888, Val Loss = 1.1327
Epoch 20: Train Loss = 1.1723, Val Loss = 1.1175
Epoch 

KeyboardInterrupt: 

In [18]:
# NOTE(review): looks like a leftover sanity print showing that the cell ran — confirm before removing.
print("working")
def test_accuracy_bert(model, dataloader, device, mlm_probability=0.15, vocab_size=None):
    """
    Computes token-level accuracy for masked language modeling on the test set.
    Only considers tokens that were selected by mask_tokens (i.e. labels != -100).

    Args:
        model: Network called as ``model(ids, src_padding_mask=...)`` with ids of
            shape (seq_len, batch), returning logits of shape
            (seq_len, batch, vocab).
        dataloader: Iterable yielding padded id tensors of shape (batch, seq_len).
        device: Device the batches are moved to.
        mlm_probability: Per-token chance of being selected for prediction.
        vocab_size: Exclusive upper bound for the random replacement ids drawn by
            mask_tokens. When None, ``model.embedding.num_embeddings`` is used,
            so no module-level ``vocab_size`` variable is required.

    Returns:
        accuracy (float): the fraction of correctly predicted selected tokens,
        or 0.0 when no token was selected at all.
    """
    if vocab_size is None:
        vocab_size = model.embedding.num_embeddings

    model.eval()
    total_masked = 0
    total_correct = 0
    with torch.no_grad():
        for batch in dataloader:
            batch = batch.to(device)  # (batch, seq_len)
            # Create masked inputs and labels using the same masking function as
            # training; it edits its input in place, hence the clone.
            masked_inputs, labels = mask_tokens(batch.clone(), SPECIAL_TOKENS["<mask>"], vocab_size, mlm_probability)
            # Transpose for Transformer: (seq_len, batch)
            masked_inputs = masked_inputs.transpose(0, 1)
            labels = labels.transpose(0, 1)
            padding_mask = generate_padding_mask(batch)
            logits = model(masked_inputs, src_padding_mask=padding_mask)
            # Get predictions (seq_len, batch)
            predictions = torch.argmax(logits, dim=-1)

            # For accuracy, we compare predictions with labels where labels != -100.
            mask = (labels != -100)
            total_masked += mask.sum().item()
            total_correct += (predictions[mask] == labels[mask]).sum().item()
    accuracy = total_correct / total_masked if total_masked > 0 else 0.0
    return accuracy

# Example usage: `model`, `test_loader` and `device` must already exist at module
# level (the training section's main block creates them). mask_tokens draws its
# masks randomly and no seed is set in the visible code, so the reported value
# can differ between runs.
accuracy = test_accuracy_bert(model, test_loader, device, mlm_probability=0.15)
print(f"Test Token-Level Accuracy on Masked Positions: {accuracy:.4f}")


working
Test Token-Level Accuracy on Masked Positions: 0.8125


In [19]:
def sequence_accuracy_bert(model, dataloader, device, mlm_probability=0.15, vocab_size=None):
    """
    Compute sequence accuracy for masked language modeling on the test set.
    For each example, we mask tokens using the same strategy as training.
    Then, if every token in the example that was selected is predicted correctly,
    the sequence is considered correct. A sequence in which nothing was selected
    also counts as correct.

    Args:
        model: Network called as ``model(ids, src_padding_mask=...)`` with ids of
            shape (seq_len, batch), returning logits of shape
            (seq_len, batch, vocab).
        dataloader: Iterable yielding padded id tensors of shape (batch, seq_len).
        device: Device the batches are moved to.
        mlm_probability: Per-token chance of being selected for prediction.
        vocab_size: Exclusive upper bound for the random replacement ids drawn by
            mask_tokens. When None, ``model.embedding.num_embeddings`` is used,
            so no module-level ``vocab_size`` variable is required.

    Returns:
        accuracy (float): fraction of sequences with all selected tokens predicted
        correctly, or 0.0 when the dataloader yields no sequences.
    """
    if vocab_size is None:
        vocab_size = model.embedding.num_embeddings

    model.eval()
    total_sequences = 0
    correct_sequences = 0
    with torch.no_grad():
        for batch in dataloader:
            batch = batch.to(device)  # (batch, seq_len)
            # Create masked inputs and corresponding labels; mask_tokens edits
            # its input in place, hence the clone.
            masked_inputs, labels = mask_tokens(batch.clone(), SPECIAL_TOKENS["<mask>"], vocab_size, mlm_probability)
            # Transpose for the Transformer: (seq_len, batch)
            masked_inputs_t = masked_inputs.transpose(0, 1)
            labels_t = labels.transpose(0, 1)
            padding_mask = generate_padding_mask(batch)
            logits = model(masked_inputs_t, src_padding_mask=padding_mask)
            predictions = torch.argmax(logits, dim=-1)  # (seq_len, batch)

            # A sequence (one column) is wrong as soon as a single selected
            # position is mispredicted; columns without any selected position
            # have no such entry and therefore count as correct.
            selected = labels_t != -100
            mispredicted = selected & (predictions != labels_t)
            correct_sequences += (~mispredicted.any(dim=0)).sum().item()
            total_sequences += batch.size(0)
    return correct_sequences / total_sequences if total_sequences > 0 else 0.0

# Uses the module-level `model`, `test_loader` and `device` created by the training
# section's main block. The masks are drawn randomly by mask_tokens, so the
# printed value can differ between runs.
acc = sequence_accuracy_bert(model, test_loader, device, mlm_probability=0.15)
print(f"Test Sequence Accuracy: {acc:.4f}")

Test Sequence Accuracy: 0.0244
