In [1]:
# ===== 0) Imports =====
import numpy as np
import pandas as pd
from tqdm import tqdm
import math
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from tokenizers import Tokenizer
import random
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import confusion_matrix, roc_auc_score


### Predefined function

In [2]:
# Custom dataset class for peptide sequences
class PeptideDataset(Dataset):
    def __init__(self, ds, tokenizer, seq_len):
        """
        Args:
            ds (list): List of raw peptide sequences.
            tokenizer: A tokenizer object that can map characters/tokens to IDs.
            seq_len (int): Fixed sequence length for model input (after padding).
        """
        super().__init__()
        self.seq_len = seq_len
        self.ds = ds
        self.tokenizer = tokenizer

        # Define special tokens (convert them into tensor form)
        self.sos_token = torch.tensor([tokenizer.token_to_id("[SOS]")], dtype=torch.int64)  # Start of sequence
        self.eos_token = torch.tensor([tokenizer.token_to_id("[EOS]")], dtype=torch.int64)  # End of sequence
        self.pad_token = torch.tensor([tokenizer.token_to_id("[PAD]")], dtype=torch.int64)  # Padding
        self.mask_token = torch.tensor([tokenizer.token_to_id("[MASK]")], dtype=torch.int64) # Mask (for MLM)

    def __len__(self):
        # Return total number of sequences in the dataset
        return len(self.ds)

    def __getitem__(self, idx):
        """
        Retrieve one sequence, apply random masking, 
        then build encoder_input and label tensors with special tokens + padding.
        """
        seq = self.ds[idx]  # Get raw sequence

        # Apply masking strategy (15% replaced with [MASK]) 
        # Returns both masked sequence IDs and the original token IDs
        masked_seq_ids, origi_seq_ids = random_mask(seq, self.tokenizer)

        # Compute how many [PAD] tokens are needed after adding [SOS] and [EOS]
        num_padding_tokens = self.seq_len - len(masked_seq_ids) - 2  
        if num_padding_tokens < 0:
            raise ValueError("Sequence is too long for the specified seq_len")

        # Build encoder input: [SOS] + masked sequence + [EOS] + [PAD...]
        encoder_input = torch.cat(
            [
                self.sos_token,
                torch.tensor(masked_seq_ids, dtype=torch.int64),
                self.eos_token,
                torch.tensor([self.pad_token] * num_padding_tokens, dtype=torch.int64),
            ],
            dim=0,
        )
        # Build label: [SOS] + original sequence + [EOS] + [PAD...]
        # (the model should learn to predict original tokens from masked input)
        label = torch.cat(
            [
                self.sos_token,
                torch.tensor(origi_seq_ids, dtype=torch.int64),
                self.eos_token,
                torch.tensor([self.pad_token] * num_padding_tokens, dtype=torch.int64),
            ],
            dim=0,
        )
        # Sanity check: ensure both sequences have fixed length = seq_len
        assert encoder_input.size(0) == self.seq_len, "encoder_input size mismatch"
        assert label.size(0) == self.seq_len, "label size mismatch"
        return {
            "encoder_input": encoder_input,  
            # Shape: (seq_len). Model input sequence with masked tokens and padding.

            # "encoder_mask": (encoder_input != self.pad_token).unsqueeze(0).unsqueeze(0).int(),  
            "encoder_mask": (encoder_input == self.pad_token).bool(),  
            # Shape: (1, 1, seq_len). Attention mask where 1 = valid token, 0 = pad.
            # Two unsqueezes are added to match transformer attention dimensions.

            "label": label,
            # Shape: (seq_len). Original sequence (target labels for MLM).
            "seq": seq
        }
# Helper function to apply random token masking (like BERT MLM)
def random_mask(sentence, tokenizer):
    """
    Args:
        sentence (str): Raw sequence (string of characters).
        tokenizer: Tokenizer to map characters to IDs.

    Returns:
        masked_seq_ids (list of int): Sequence IDs with 15% tokens replaced by [MASK].
        origi_seq_ids (list of int): Original unmasked sequence IDs.
    """
    origi_seq_ids = tokenizer.encode(sentence).ids  
    masked_seq_ids = origi_seq_ids
    # Sanity check: both masked and original sequences must have same length
    assert len(masked_seq_ids) == len(origi_seq_ids), \
        "Masked sequence length does not match original sequence length"

    return masked_seq_ids, origi_seq_ids

In [3]:

class SinusoidalPositionalEncoding(nn.Module):
    """
    Fixed (non-learnable) sinusoidal positional encoding, as in
    'Attention Is All You Need'. Precomputes a (max_len, d_model)
    table and adds it to token embeddings at call time.

    Args:
        d_model: Embedding (model) dimension.
        max_len: Maximum supported sequence length for precomputed table.
        dropout: Dropout applied after adding positions.
    """
    def __init__(self, d_model: int, max_len: int, dropout: float = 0.1):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        # Precompute sinusoidal table: shape (max_len, d_model)
        # pe[pos, 2i]   = sin(pos / (10000^(2i/d_model)))
        # pe[pos, 2i+1] = cos(pos / (10000^(2i/d_model)))
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)  # (max_len, 1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2, dtype=torch.float) * (-math.log(10000.0) / d_model)
        )
        pe[:, 0::2] = torch.sin(position * div_term)  # even dimensions
        pe[:, 1::2] = torch.cos(position * div_term)  # odd  dimensions

        # Add batch dim for broadcasting at runtime: (1, max_len, d_model)
        pe = pe.unsqueeze(0)

        # Register as a buffer so it moves with .to(device) but is not a parameter
        self.register_buffer("pe", pe)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Add positions to token embeddings.

        Args:
            x: Token embeddings, shape (B, S, D) = (batch, seq_len, d_model)

        Returns:
            Tensor of shape (B, S, D) with positions added, then dropout applied.
        """
        # Slice the first S positions and add to x; no gradient through the table
        x = x + self.pe[:, : x.size(1), :].requires_grad_(False)

        # NOTE (optional): if you do NOT want to add positions to PAD rows,
        # you can pass a (B, S) bool mask into forward and zero out pe on PAD:
        # pe = self.pe[:, : x.size(1), :]
        # pe = pe.masked_fill(key_padding_mask.unsqueeze(-1), 0.0)  # True=PAD
        # x = x + pe

        return self.dropout(x)

class PepBERT(nn.Module):
    """
    BERT-style Transformer encoder for sequences (e.g., peptides).

    Components:
        - Token embedding with a defined padding_idx (PAD rows are zero and not updated).
        - Fixed sinusoidal positional encoding (added to token embeddings).
        - Stack of nn.TransformerEncoderLayer blocks (self-attention + FFN).
        - Linear projection to vocabulary size (for MLM-style training).

    Args:
        vocab_size: Size of the tokenizer vocabulary.
        seq_len:    Max sequence length (drives positional table size).
        pad_id:     Vocabulary ID used for PAD tokens.
        d_model:    Embedding dimension.
        n_heads:    Number of attention heads.
        n_layers:   Number of encoder layers (depth).
        d_ff:       Hidden size of the position-wise feed-forward (usually 4*d_model).
        dropout:    Dropout probability used across the module.
    """
    def __init__(
        self,
        vocab_size: int,
        seq_len: int,
        pad_id: int,
        d_model: int = 160,
        n_heads: int = 8,
        n_layers: int = 6,
        d_ff: int = 640,
        dropout: float = 0.1,
    ):
        super().__init__()

        # Token embedding: PAD rows are always zero and are excluded from updates
        self.embedding = nn.Embedding(vocab_size, d_model, padding_idx=pad_id)

        # Fixed positional encoding (sin/cos), shape added as (B, S, D)
        self.pos_encoding = SinusoidalPositionalEncoding(d_model, seq_len, dropout)

        # Transformer encoder stack (each layer: MHA + FFN + residual + LayerNorm)
        # batch_first=True => inputs are (B, S, D)
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=n_heads,
            dim_feedforward=d_ff,
            dropout=dropout,
            batch_first=True,
        )
        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=n_layers)

        # Final projection to logits over vocabulary (for masked language modeling)
        self.proj = nn.Linear(d_model, vocab_size)

    def forward(
        self,
        src: torch.Tensor,
        src_key_padding_mask: torch.Tensor | None = None,
    ) -> torch.Tensor:
        """
        Forward pass.

        Args:
            src: LongTensor token IDs, shape (B, S).
            src_key_padding_mask: BoolTensor, shape (B, S),
                                    True at PAD positions (to be ignored by attention).

        Returns:
            Logits over vocabulary: shape (B, S, vocab_size).
        """
        # Embed tokens and scale by sqrt(d_model) (stabilizes training as in the paper)
        x = self.embedding(src) * math.sqrt(self.embedding.embedding_dim)  # (B, S, D)

        # Add fixed sinusoidal positional encoding
        x = self.pos_encoding(x)  # (B, S, D)

        # IMPORTANT:
        # - nn.TransformerEncoder expects src_key_padding_mask with shape (B, S), True=PAD.
        # - On Apple Silicon (MPS), passing this mask may trigger a nested-tensor path
        #   not fully implemented. If you ever hit NotImplementedError on MPS, you can:
        #   (A) Drop the mask on MPS:
        #       if x.device.type == "mps": src_key_padding_mask = None
        #   (B) Or pass a dummy attn_mask to disable that fast-path while keeping padding mask:
        #       S = x.size(1)
        #       dummy = torch.zeros((S, S), dtype=torch.bool, device=x.device)
        #       x = self.encoder(x, mask=dummy, src_key_padding_mask=src_key_padding_mask)
        #       return self.proj(x)

        hidden_states = self.encoder(x, src_key_padding_mask=src_key_padding_mask)  # (B, S, D)

        # Project hidden states to vocabulary logits (use CrossEntropyLoss with ignore_index=pad_id)
        return hidden_states, self.proj(hidden_states)  # (B, S, vocab_size)

### Loading the pretrained model

In [4]:
# ===== 1) Device selection =====
# Automatically pick the best available device:
if torch.cuda.is_available():
    device = torch.device("cuda")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")
print("Using device:", device)

# ===== 2) Load pretrained tokenizer & model (must match training setup) =====
seq_len = 52  # maximum sequence length (with [SOS] and [EOS])
tokenizer = Tokenizer.from_file("tokenizer.json")
pad_id = tokenizer.token_to_id("[PAD]")

model = PepBERT(
    vocab_size=tokenizer.get_vocab_size(),
    seq_len=seq_len,
    pad_id=pad_id,
    d_model=160,
    n_heads=8,
    n_layers=6,
    d_ff=640,
    dropout=0.0
).to(device)

# ===== 3) Load model checkpoint =====
checkpoint = torch.load("tmodel_49.pt", map_location=device)
model.load_state_dict(checkpoint["model_state_dict"])
model.eval()  # inference mode (disables dropout)

Using device: cuda


PepBERT(
  (embedding): Embedding(29, 160, padding_idx=1)
  (pos_encoding): SinusoidalPositionalEncoding(
    (dropout): Dropout(p=0.0, inplace=False)
  )
  (encoder): TransformerEncoder(
    (layers): ModuleList(
      (0-5): 6 x TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=160, out_features=160, bias=True)
        )
        (linear1): Linear(in_features=160, out_features=640, bias=True)
        (dropout): Dropout(p=0.0, inplace=False)
        (linear2): Linear(in_features=640, out_features=160, bias=True)
        (norm1): LayerNorm((160,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((160,), eps=1e-05, elementwise_affine=True)
        (dropout1): Dropout(p=0.0, inplace=False)
        (dropout2): Dropout(p=0.0, inplace=False)
      )
    )
  )
  (proj): Linear(in_features=160, out_features=29, bias=True)
)

### Define embedding function based on pretrianed model

In [5]:
# ===== 4) Helper: extract embeddings =====
@torch.no_grad()
def embed_sequences(dataloader, model, device):
    """
    Extract one fixed-size embedding per sequence.

    Steps:
        - Forward pass through PepBERT → hidden states (B, S, D).
        - Slice out the true amino acid tokens (excluding [SOS], [EOS], PAD).
        - Mean-pool over token embeddings → single vector per sequence.

    Returns:
        DataFrame of shape (N, d_model) with embeddings for all sequences.
    """
    rows = []
    for batch in tqdm(dataloader, desc="Embedding"):
        encoder_input = batch["encoder_input"].to(device)    # (B, S)
        encoder_mask  = batch["encoder_mask"].to(device)     # (B, S), True=PAD
        seq_list  = batch["seq"]                         # list[str], raw sequences

        # Forward pass. Model must return (hidden_states,logits).
        hidden_states, _ = model(encoder_input, src_key_padding_mask=encoder_mask)  # (B, S, V)
        # hidden: (B, S, d_model)
    
        # Compute mean embedding for each sequence in the batch
        for i, raw_seq in enumerate(seq_list):
            L = len(raw_seq)                   # true sequence length (without [SOS]/[EOS])
            token_slice = hidden_states[i, 1:1+L, :]  # skip [SOS], take L tokens
            pooled = token_slice.mean(dim=0).cpu().numpy()
            rows.append(pooled)

    return pd.DataFrame(rows)



### Embedding new dataset and model development for new dataset

In [6]:
# ===== 5) Load datasets (Excel) =====
train_df = pd.read_excel("QS_train.xlsx", na_filter=False)
test_df  = pd.read_excel("QS_test.xlsx", na_filter=False)

# Remove duplicates and sequences longer than 50 aa
train_df = train_df.drop_duplicates(subset=["sequence"])
train_df = train_df[train_df["sequence"].apply(len) <= 50].reset_index(drop=True)
test_df  = test_df[test_df["sequence"].apply(len) <= 50].reset_index(drop=True)

# ===== 6) Build Dataset & DataLoader =====
# Your PeptideDataset should return:
#   - encoder_input: token IDs with [SOS]/[EOS]/PAD
#   - encoder_mask : True at PAD positions
#   - seq          : raw sequence (string)
ds_train = PeptideDataset(train_df["sequence"].tolist(), tokenizer, seq_len)
ds_test  = PeptideDataset(test_df["sequence"].tolist(),  tokenizer, seq_len)

dl_train = DataLoader(ds_train, batch_size=100, shuffle=False)
dl_test  = DataLoader(ds_test,  batch_size=100, shuffle=False)

# ===== 7) Extract embeddings for train and test sets =====
emb_train = embed_sequences(dl_train, model, device)
emb_test  = embed_sequences(dl_test,  model, device)
emb_train.to_csv("peptide_bert_embed_train.csv", index=False)
emb_test.to_csv("peptide_bert_embed_test.csv",  index=False)


  output = torch._nested_tensor_from_mask(
Embedding: 100%|██████████| 4/4 [00:00<00:00, 15.21it/s]
Embedding: 100%|██████████| 1/1 [00:00<00:00, 112.12it/s]


In [7]:
# ===== 8) Train a logistic regression classifier =====
y_train = train_df["label"].to_numpy()
y_test  = test_df["label"].to_numpy()

clf = LogisticRegression(max_iter=1000, n_jobs=-1)
clf.fit(emb_train, y_train)

# ===== 9) Evaluate performance =====
y_prob = clf.predict_proba(emb_test)[:, 1]      # predicted probability of class 1
y_pred = (y_prob >= 0.5).astype(int)            # binary prediction

# Confusion matrix elements
TN, FP, FN, TP = confusion_matrix(y_test, y_pred).ravel()

# Compute metrics
ACC  = (TP+TN) / (TP+TN+FP+FN)
Sn   = TP / (TP+FN+1e-12)        # sensitivity (recall)
Sp   = TN / (TN+FP+1e-12)        # specificity
BACC = 0.5 * (Sn + Sp)           # balanced accuracy
MCC  = (TP*TN - FP*FN) / math.sqrt((TP+FP)*(TP+FN)*(TN+FP)*(TN+FN) + 1e-12)
AUC  = roc_auc_score(y_test, y_prob)

metrics = dict(ACC=ACC, BACC=BACC, Sn=Sn, Sp=Sp, MCC=MCC, AUC=AUC)
print(metrics)

# Save metrics
pd.DataFrame([metrics]).to_csv("performance_report.csv", index=False)

{'ACC': 0.9117647058823529, 'BACC': 0.9249999999999431, 'Sn': 0.9999999999999286, 'Sp': 0.8499999999999576, 'MCC': 0.8366600265340755, 'AUC': 0.9428571428571428}
