# Example script for Hackathon

Within each cycle of active learning, you can:

1. Collect training data (original training data + your query data).

2. Train a prediction model to predict the DMS_score for each mutant (e.g., M0A).

3. Use the trained model to predict the score for all mutant in the test set.

4. Select query mutants for next round based on certain criteria. You may want to make sure you don't query the same mutant twice as you only have a limited chances of making queries in total.

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torch.utils.data import DataLoader, Dataset
import random
from copy import deepcopy
import pandas as pd
from scipy.stats import spearmanr
import argparse
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression

## 1. collect training data

Upload `sequence.fasta`, `train.csv`, and `test.csv` to the current runtime:

1. click the folder icon on the left

2. click the upload icon and upload the files to the current directory

In [None]:
with open('sequence.fasta', 'r') as f:
  data = f.readlines()

sequence_wt = data[1].strip()
sequence_wt

In [None]:
len(sequence_wt)

In [None]:
def get_mutated_sequence(mut, sequence_wt):
  wt, pos, mt = mut[0], int(mut[1:-1]), mut[-1]

  sequence = deepcopy(sequence_wt)

  return sequence[:pos]+mt+sequence[pos+1:]

In [None]:
df_train = pd.read_csv('train.csv')
df_train['sequence'] = df_train.mutant.apply(lambda x: get_mutated_sequence(x, sequence_wt))
df_train

In [None]:
df_test = pd.read_csv('test.csv')
df_test['sequence'] = df_test.mutant.apply(lambda x: get_mutated_sequence(x, sequence_wt))
df_test

In [None]:
# TODO: integrate the query data that you acquired each round into df_train

## 2. Train a prediction model

Here, we provided a linear regression model and used one-hot encoding to encode each variant. You would need to build your own model to achieve better performances.

Hint: you can perform cross-validation on the training set to evaluate your predictor before making predictions on the test set.

In [None]:
'''hyperparameters'''

seq_length = 656
seed = 0 # seed for splitting the validation set
val_ratio = 0.2 # proportion of validation set

In [None]:
class ProteinDataset(Dataset):
    def __init__(self, df, istrain=True):

        alphabet = 'ACDEFGHIKLMNPQRSTVWY'
        map_a2i = {j:i for i,j in enumerate(alphabet)}
        map_i2a = {i:j for i,j in enumerate(alphabet)}

        self.df = df

        self.num_samples = len(self.df)
        self.seq_length = len(self.df.sequence.values[0])
        self.num_channels = 20

        # TODO: replace one-hot encodings with your own encodings
        self.encodings = np.zeros((self.num_samples, self.num_channels, self.seq_length)).astype(np.float32)
        self.targets = np.zeros(self.num_samples).astype(np.float32)

        if istrain:
          for it, (seq,target) in enumerate(self.df[['sequence', 'DMS_score']].values):
              for i,aa in enumerate(seq):
                  self.encodings[it,map_a2i[aa],i] = 1
              self.targets[it] = target

          self.encodings = self.encodings.astype(np.float32)
          self.targets = self.targets.astype(np.float32)
        else:
          for it, seq in enumerate(self.df['sequence'].values):
              for i,aa in enumerate(seq):
                  self.encodings[it,map_a2i[aa],i] = 1

          self.encodings = self.encodings.astype(np.float32)

    def __len__(self):
        return self.num_samples

    def __getitem__(self, idx):
        return torch.tensor(self.encodings[idx]), torch.tensor(self.targets[idx])

In [None]:
train_dataset = ProteinDataset(df_train)
test_dataset = ProteinDataset(df_test, istrain=False)

# split validation set
train_dataset, val_dataset = train_test_split(train_dataset, test_size=val_ratio, random_state=seed, shuffle=True)

# TODO: revise according to your own model
train_loader = DataLoader(train_dataset, batch_size=len(train_dataset), shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=len(val_dataset), shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=len(test_dataset), shuffle=False)

In [None]:
# TODO: build your own prediction model to replace linear regression
# Hint: don't forget to use the validation set: you can either integrate the validation data into the training set or use it separately for early stopping

X_train, y_train = next(iter(train_loader))

regressor = LinearRegression()
regressor.fit(X_train.view(X_train.size(0), -1).numpy(), y_train.numpy())

X_test, _ = next(iter(test_loader))
y_test_pred = regressor.predict(X_test.view(X_test.size(0), -1).numpy())

In [None]:
df_test['DMS_score_predicted'] = y_test_pred
df_test

In [None]:
df_test[['mutant', 'DMS_score_predicted']].to_csv('test_predictions.csv')

## 3. Select query for next round

In [None]:
df_test.sort_values('DMS_score_predicted', ascending=False).head(100)

In [None]:
# Example: randomly select 100 test variants to be queried.
# Note: random selection may not be a good strategy
# TODO: select query mutants for the next round based on your own criteria

querys = np.random.choice(df_test.mutant.values, size=100, replace=False)
querys


In [None]:
with open('query.txt', 'w') as f:
  for mutant in querys:
    f.write(mutant+'\n')

#Create ESM Embeddings

Create ESM embeddings for train and test data

In [None]:
!pip install fair-esm
!pip install biopython

In [None]:
import esm

def create_esm_embeddings(df,
                          model_name="esm2_t33_650M_UR50D",
                          device='cuda:0'):

    device = torch.device(device if torch.cuda.is_available() else "cpu")

    model, alphabet = esm.pretrained.load_model_and_alphabet(model_name)
    model.to(device)
    model.eval()

    batch_converter = alphabet.get_batch_converter()

    embeddings = []

    for sequence in df["sequence"]:
        data = [("seq", sequence)]
        batch_labels, batch_strs, batch_tokens = batch_converter(data)
        batch_tokens = batch_tokens.to(device)

        with torch.no_grad():
            results = model(batch_tokens, repr_layers=[model.num_layers], need_head_weights=False)

        token_representations = results["representations"][model.num_layers]

        seq_representation = token_representations[0, 1 : (len(sequence) + 1)].mean(0)

        embeddings.append(seq_representation.cpu().numpy())
    df["esm_embedding"] = embeddings
    return df

In [None]:
df_train_embeddings = create_esm_embeddings(df_train)
df_train_embeddings.to_csv('train_embeddings.csv')

In [None]:
df_test_embeddings = create_esm_embeddings(df_test)
df_test_embeddings.to_csv('test_embeddings.csv')

In [None]:
df_train_embeddings["esm_embedding"] = df_train_embeddings["esm_embedding"].apply(
    lambda emb: emb.tolist() if isinstance(emb, np.ndarray) else emb
)
df_test_embeddings["esm_embedding"] = df_test_embeddings["esm_embedding"].apply(
    lambda emb: emb.tolist() if isinstance(emb, np.ndarray) else emb
)
df_train_embeddings.to_csv('train_embeddings_commas.csv')
df_test_embeddings.to_csv('test_embeddings_commas.csv')

# Adding queries to our training data

Updated names of query files based on which query was being processed

In [None]:
query3_df = pd.read_csv("query3.txt")
query3_df_embeddings = create_esm_embeddings(query3_df)
query3_df_embeddings["esm_embedding"] = query3_df_embeddings["esm_embedding"].apply(
    lambda emb: emb.tolist() if isinstance(emb, np.ndarray) else emb
)
query3_df_embeddings.to_csv('query3_embeddings_commas.csv')
q1q2_train_df_embeddings_commas = pd.read_csv("q1q2_train_embeddings_commas.csv")
q1q2q3_train_embeddings_commas = pd.concat([q1q2_train_df_embeddings_commas, query3_df_embeddings], ignore_index=True)
q1q2q3_train_embeddings_commas = q1q2q3_train_embeddings_commas.drop(columns=['Unnamed: 0.1', 'Unnamed: 0'])
q1q2q3_train_embeddings_commas.to_csv('q1q2q3_train_embeddings_commas.csv')

# Model 1: Multi-Head Attention Pooling + Deep MLP Regressor with Dropout and Batch Normalization + XGBoost

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
import numpy as np
import pandas as pd
import ast
from sklearn.preprocessing import StandardScaler
from scipy.stats import spearmanr
import xgboost as xgb


#DATASET
class ESM_Dataset(Dataset):
    def __init__(self, df):
        def convert_emb(x):
            arr = np.array(ast.literal_eval(x), dtype=np.float32)
            if arr.ndim == 1:
                arr = np.expand_dims(arr, axis=0)
            return arr

        self.embeddings = np.stack(df['esm_embedding'].apply(convert_emb).values)
        self.dms_scores = df['DMS_score'].values.astype(np.float32)

        N, L, D = self.embeddings.shape
        self.embeddings = self.embeddings.reshape(N, L * D)
        self.scaler = StandardScaler()
        self.embeddings = self.scaler.fit_transform(self.embeddings)
        self.embeddings = self.embeddings.reshape(N, L, D)

        self.dms_scores = (self.dms_scores - np.mean(self.dms_scores)) / np.std(self.dms_scores)

    def __len__(self):
        return len(self.dms_scores)

    def __getitem__(self, idx):
        emb = self.embeddings[idx]
        score = self.dms_scores[idx]
        return torch.tensor(emb, dtype=torch.float32), torch.tensor(score, dtype=torch.float32)


# Multi-Head Attention Pooling Module (Transformer-style)
class MultiHeadAttentionPooling(nn.Module):
    def __init__(self, input_dim, num_heads, dropout=0.1):
        """
        Learns a query vector to attend over the sequence tokens.
        """
        super().__init__()
        self.query = nn.Parameter(torch.randn(1, input_dim))
        self.mha = nn.MultiheadAttention(embed_dim=input_dim, num_heads=num_heads, dropout=dropout, batch_first=True)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """
        x: [batch_size, seq_len, input_dim]
        Returns a pooled representation: [batch_size, input_dim]
        """
        batch_size = x.size(0)
        query = self.query.unsqueeze(0).expand(batch_size, 1, -1)
        attn_output, _ = self.mha(query, x, x)
        attn_output = self.dropout(attn_output)
        return attn_output.squeeze(1)

# Deep MLP Regressor with Dropout and Batch Normalization
class DeepMLPRegressor(nn.Module):
    def __init__(self, input_dim):
        super().__init__()
        self.model = nn.Sequential(
            nn.Linear(input_dim, 1024),
            nn.BatchNorm1d(1024),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(1024, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, 128),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(128, 64),
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.Linear(64, 1)
        )

    def forward(self, x):
        return self.model(x).squeeze(-1)

# Multi-Head Attention Pooling + Deep MLP
class AttnTransformerMLPRegressor(nn.Module):
    def __init__(self, input_dim, num_heads=4, dropout=0.1):
        super().__init__()
        self.attn_pool = MultiHeadAttentionPooling(input_dim, num_heads, dropout)
        self.deep_mlp = DeepMLPRegressor(input_dim)

    def forward(self, x):
        """
        x: [batch_size, seq_len, input_dim]
        Returns: [batch_size]
        """
        pooled = self.attn_pool(x)
        return self.deep_mlp(pooled)

# Training & Evaluation
def train_model(model, train_loader, test_loader, epochs=50, lr=1e-4):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.MSELoss()

    for epoch in range(epochs):
        model.train()
        total_loss = 0
        for x, y in train_loader:
            x, y = x.to(device), y.to(device)
            optimizer.zero_grad()
            y_pred = model(x)
            loss = criterion(y_pred, y)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        avg_loss = total_loss / len(train_loader)
        print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}")

    model.eval()
    all_preds, all_labels = [], []
    with torch.no_grad():
        for x, y in test_loader:
            x, y = x.to(device), y.to(device)
            y_pred = model(x).cpu().numpy()
            all_preds.extend(y_pred)
            all_labels.extend(y.cpu().numpy())
    mse = np.mean((np.array(all_preds) - np.array(all_labels)) ** 2)
    spearman_corr, _ = spearmanr(all_preds, all_labels)
    print(f"Test MSE: {mse:.4f}, Spearman: {spearman_corr:.4f}")
    return mse, spearman_corr

def extract_features(model, loader):
    """
    Extracts features using the multi-head attention pooling module.
    Returns features of shape [N, input_dim] and corresponding labels.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.eval()
    features_list, labels_list = [], []
    with torch.no_grad():
        for x, y in loader:
            x = x.to(device)
            features = model.attn_pool(x)
            features_list.append(features.cpu().numpy())
            labels_list.append(y.cpu().numpy())
    features = np.concatenate(features_list, axis=0)
    labels = np.concatenate(labels_list, axis=0)
    return features, labels


df = pd.read_csv("q1q2q3_train_embeddings_commas.csv")
df['esm_embedding'] = df['esm_embedding'].apply(lambda x: x if isinstance(x, list) else str(x))

dataset = ESM_Dataset(df)
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

input_dim = dataset.embeddings.shape[2]
deep_model = AttnTransformerMLPRegressor(input_dim=input_dim, num_heads=4, dropout=0.1)
print("Training deep attention network with multi-head pooling...")
mse_nn, spearman_nn = train_model(deep_model, train_loader, test_loader, epochs=100, lr=1e-4)

# XGBoost
print("Extracting features from attention pooling module for XGBoost...")
train_features, train_labels = extract_features(deep_model, train_loader)
test_features, test_labels = extract_features(deep_model, test_loader)

print("Training XGBoost regressor on extracted features...")
xgb_model = xgb.XGBRegressor(
    objective='reg:squarederror',
    n_estimators=200,
    learning_rate=0.05,
    max_depth=6,
    random_state=42
)
xgb_model.fit(train_features, train_labels)
xgb_preds = xgb_model.predict(test_features)
spearman_xgb, _ = spearmanr(xgb_preds, test_labels)
mse_xgb = np.mean((xgb_preds - test_labels) ** 2)
print(f"XGBoost - MSE: {mse_xgb:.4f}, Spearman: {spearman_xgb:.4f}")


# Ensembling Neural and XGboost
print("Ensembling predictions from neural network and XGBoost...")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
deep_model.eval()
all_nn_preds = []
with torch.no_grad():
    for x, _ in test_loader:
        x = x.to(device)
        preds = deep_model(x).cpu().numpy()
        all_nn_preds.extend(preds)
all_nn_preds = np.array(all_nn_preds)

alpha = 0.5
ensemble_preds = alpha * all_nn_preds + (1 - alpha) * xgb_preds
spearman_ensemble, _ = spearmanr(ensemble_preds, test_labels)
mse_ensemble = np.mean((ensemble_preds - test_labels) ** 2)
print(f"Ensemble - MSE: {mse_ensemble:.4f}, Spearman: {spearman_ensemble:.4f}")

# Model 2: BiLSTM + Attention + MLP

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import ast
from torch.utils.data import Dataset, DataLoader
from scipy.stats import spearmanr
import torch.nn.functional as F

df = pd.read_csv("/content/q1q2q3_train_embeddings_commas.csv")
df["esm_embedding"] = df["esm_embedding"].apply(ast.literal_eval)
X = torch.tensor(df["esm_embedding"].tolist(), dtype=torch.float32)
y = torch.tensor(df["DMS_score"].values, dtype=torch.float32).view(-1, 1)

X = (X - X.mean(dim=0)) / (X.std(dim=0) + 1e-8)

class DMSDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

# DataLoader
train_size = int(0.8 * len(df))
test_size = len(df) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(DMSDataset(X, y), [train_size, test_size])

# Best hyperparameters from tuning
best_params = {
    "hidden_dim": 512,
    "lstm_layers": 3,
    "batch_size": 64,
    "learning_rate": 0.0001,
    "weight_decay": 0.001,
    "dropout": 0.3
}

train_loader = DataLoader(train_dataset, batch_size=best_params["batch_size"], shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=best_params["batch_size"], shuffle=False)

# BiLSTM + Attention + MLP
class BiLSTMRegressor(nn.Module):
    def __init__(self, input_dim, hidden_dim, lstm_layers, dropout=0.3):
        super(BiLSTMRegressor, self).__init__()

        self.lstm = nn.LSTM(input_dim, hidden_dim, lstm_layers, batch_first=True, bidirectional=True, dropout=dropout)
        self.attn = nn.Linear(hidden_dim * 2, 1)

        self.fc = nn.Sequential(
            nn.Linear(hidden_dim * 2, 512),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 1)
        )

        self._initialize_weights()

    def _initialize_weights(self):
        for name, param in self.lstm.named_parameters():
            if 'weight' in name:
                nn.init.xavier_uniform_(param)
            elif 'bias' in name:
                nn.init.zeros_(param)

        for name, param in self.fc.named_parameters():
            if 'weight' in name:
                nn.init.kaiming_uniform_(param, nonlinearity='relu')
            elif 'bias' in name:
                nn.init.zeros_(param)

        nn.init.xavier_uniform_(self.attn.weight)
        nn.init.zeros_(self.attn.bias)

    def forward(self, x):
        x = x.unsqueeze(1)
        lstm_out, _ = self.lstm(x)

        attn_weights = torch.softmax(self.attn(lstm_out).squeeze(-1), dim=1)
        attn_out = torch.sum(lstm_out * attn_weights.unsqueeze(-1), dim=1)

        output = self.fc(attn_out)
        return output


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
#model = BiLSTMRegressor(input_dim=X.shape[1], **best_params).to(device)
model = BiLSTMRegressor(
    input_dim=X.shape[1],
    hidden_dim=best_params["hidden_dim"],
    lstm_layers=best_params["lstm_layers"],
    dropout=best_params["dropout"]
).to(device)
criterion = nn.MSELoss()
optimizer = optim.AdamW(model.parameters(), lr=best_params["learning_rate"], weight_decay=best_params["weight_decay"])

# Training Loop
def train_model(model, train_loader, test_loader, epochs=50):
    best_spearman = -1
    best_model_path = "best_model.pth"

    for epoch in range(epochs):
        model.train()
        total_loss = 0

        for X_batch, y_batch in train_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)

            optimizer.zero_grad()
            y_pred = model(X_batch)

            loss = criterion(y_pred, y_batch)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        # Evaluation
        model.eval()
        all_preds, all_targets = [], []
        with torch.no_grad():
            for X_batch, y_batch in test_loader:
                X_batch, y_batch = X_batch.to(device), y_batch.to(device)
                preds = model(X_batch).cpu().numpy().flatten()
                targets = y_batch.cpu().numpy().flatten()
                all_preds.extend(preds)
                all_targets.extend(targets)

        spearman_corr, _ = spearmanr(all_preds, all_targets)

        # Save best model
        if spearman_corr > best_spearman:
            best_spearman = spearman_corr
            torch.save(model.state_dict(), best_model_path)
            print(f"Saved new best model at epoch {epoch+1} with Spearman: {spearman_corr:.4f}")

        print(f"Epoch {epoch+1}/{epochs} - Loss: {total_loss/len(train_loader):.4f} - Spearman: {spearman_corr:.4f}")

    print(f"Training complete! Best Spearman: {best_spearman:.4f}")

# Train the final model
train_model(model, train_loader, test_loader, epochs=50)


# Model 3: BiLSTM + Attention + MLP + Pathogenicity data

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import ast
from torch.utils.data import Dataset, DataLoader
from scipy.stats import spearmanr
import torch.nn.functional as F

# dataset + Pathogenicity
df = pd.read_csv("/content/q1q2q3_train_embeddings_commas.csv")
df["esm_embedding"] = df["esm_embedding"].apply(ast.literal_eval)

patho_df = pd.read_csv("AF-P42898-F1-hg38.csv")
patho_df = patho_df[["protein_variant", "am_pathogenicity"]]

df = df.merge(patho_df, left_on="mutant", right_on="protein_variant", how="left")
df["am_pathogenicity"] = df["am_pathogenicity"].fillna(0.0)

X_embed = torch.tensor(df["esm_embedding"].tolist(), dtype=torch.float32)
patho_tensor = torch.tensor(df["am_pathogenicity"].values, dtype=torch.float32).unsqueeze(1)

X = torch.cat([X_embed, patho_tensor], dim=1)
y = torch.tensor(df["DMS_score"].values, dtype=torch.float32).view(-1, 1)

X = (X - X.mean(dim=0)) / (X.std(dim=0) + 1e-8)


class DMSDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

train_size = int(0.8 * len(df))
test_size = len(df) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(DMSDataset(X, y), [train_size, test_size])

best_params = {
    "hidden_dim": 512,
    "lstm_layers": 3,
    "batch_size": 64,
    "learning_rate": 0.0001,
    "weight_decay": 0.001,
    "dropout": 0.3
}

train_loader = DataLoader(train_dataset, batch_size=best_params["batch_size"], shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=best_params["batch_size"], shuffle=False)

# BiLSTM + Attention + MLP
class BiLSTMRegressor(nn.Module):
    def __init__(self, input_dim, hidden_dim, lstm_layers, dropout=0.3):
        super(BiLSTMRegressor, self).__init__()

        self.lstm = nn.LSTM(input_dim, hidden_dim, lstm_layers,
                            batch_first=True, bidirectional=True, dropout=dropout)
        self.attn = nn.Linear(hidden_dim * 2, 1)

        self.fc = nn.Sequential(
            nn.Linear(hidden_dim * 2, 512),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 1)
        )

        self._initialize_weights()

    def _initialize_weights(self):
        for name, param in self.lstm.named_parameters():
            if 'weight' in name:
                nn.init.xavier_uniform_(param)
            elif 'bias' in name:
                nn.init.zeros_(param)
        for name, param in self.fc.named_parameters():
            if 'weight' in name:
                nn.init.kaiming_uniform_(param, nonlinearity='relu')
            elif 'bias' in name:
                nn.init.zeros_(param)
        nn.init.xavier_uniform_(self.attn.weight)
        nn.init.zeros_(self.attn.bias)

    def forward(self, x):
        x = x.unsqueeze(1)
        lstm_out, _ = self.lstm(x)

        attn_weights = torch.softmax(self.attn(lstm_out).squeeze(-1), dim=1)
        attn_out = torch.sum(lstm_out * attn_weights.unsqueeze(-1), dim=1)

        output = self.fc(attn_out)
        return output

# Training Loop
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = BiLSTMRegressor(
    input_dim=X.shape[1],
    hidden_dim=best_params["hidden_dim"],
    lstm_layers=best_params["lstm_layers"],
    dropout=best_params["dropout"]
).to(device)

criterion = nn.MSELoss()
optimizer = optim.AdamW(model.parameters(), lr=best_params["learning_rate"],
                        weight_decay=best_params["weight_decay"])

def train_model(model, train_loader, test_loader, epochs=50):
    best_spearman = -1
    best_model_path = "best_model.pth"

    for epoch in range(epochs):
        model.train()
        total_loss = 0

        for X_batch, y_batch in train_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)

            optimizer.zero_grad()
            y_pred = model(X_batch)
            loss = criterion(y_pred, y_batch)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        # Evaluation
        model.eval()
        all_preds, all_targets = [], []
        with torch.no_grad():
            for X_batch, y_batch in test_loader:
                X_batch, y_batch = X_batch.to(device), y_batch.to(device)
                preds = model(X_batch).cpu().numpy().flatten()
                targets = y_batch.cpu().numpy().flatten()
                all_preds.extend(preds)
                all_targets.extend(targets)

        spearman_corr, _ = spearmanr(all_preds, all_targets)

        if spearman_corr > best_spearman:
            best_spearman = spearman_corr
            torch.save(model.state_dict(), best_model_path)
            print(f" Saved new best model at epoch {epoch+1} with Spearman: {spearman_corr:.4f}")

        print(f"Epoch {epoch+1}/{epochs} - Loss: {total_loss/len(train_loader):.4f} - Spearman: {spearman_corr:.4f}")

    print(f" Training complete! Best Spearman: {best_spearman:.4f}")
    print(f" Best model saved as '{best_model_path}'")

train_model(model, train_loader, test_loader, epochs=50)

#Model 4: BiLSTM + Attention + MLP + Pathogenicity data + foldX data as Parameter

Training

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import ast
from torch.utils.data import Dataset, DataLoader
from scipy.stats import spearmanr
import torch.nn.functional as F


# Load Data
embedding_file = "q1q2q3_train_embeddings_commas.csv"
patho_file = "AF-P42898-F1-hg38.csv"
ddg_file = "mutant_ddG.csv"

df = pd.read_csv(embedding_file)
df["esm_embedding"] = df["esm_embedding"].apply(ast.literal_eval)

patho_df = pd.read_csv(patho_file)[["protein_variant", "am_pathogenicity"]].drop_duplicates("protein_variant")
df = df.merge(patho_df, left_on="mutant", right_on="protein_variant", how="left")
df["am_pathogenicity"] = df["am_pathogenicity"].fillna(0.0)

# Attach FoldX ddG values
ddg_df = pd.read_csv(ddg_file)
df = df.merge(ddg_df, on="mutant", how="left")
df["ddG"] = df["ddG"].fillna(0.0)


X_embed = torch.tensor(df["esm_embedding"].tolist(), dtype=torch.float32)
patho_tensor = torch.tensor(df["am_pathogenicity"].values, dtype=torch.float32).unsqueeze(1)
ddg_tensor = torch.tensor(df["ddG"].values, dtype=torch.float32).unsqueeze(1)
y = torch.tensor(df["DMS_score"].values, dtype=torch.float32).view(-1, 1)

X_embed = (X_embed - X_embed.mean(dim=0)) / (X_embed.std(dim=0) + 1e-8)
X = torch.cat([X_embed, patho_tensor, ddg_tensor], dim=1)

class DMSDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

train_size = int(0.8 * len(df))
test_size = len(df) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(DMSDataset(X, y), [train_size, test_size])

params = {
    "hidden_dim": 512,
    "lstm_layers": 3,
    "dropout": 0.3,
    "batch_size": 64,
    "learning_rate": 0.0001,
    "weight_decay": 0.001
}

train_loader = DataLoader(train_dataset, batch_size=params["batch_size"], shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=params["batch_size"], shuffle=False)

# BiLSTM + Attention + MLP
class BiLSTMRegressor(nn.Module):
    def __init__(self, input_dim, hidden_dim, lstm_layers, dropout=0.3):
        super(BiLSTMRegressor, self).__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, lstm_layers,
                            batch_first=True, bidirectional=True, dropout=dropout)
        self.attn = nn.Linear(hidden_dim * 2, 1)
        self.fc = nn.Sequential(
            nn.Linear(hidden_dim * 2, 512),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 1)
        )
        self._initialize_weights()

    def _initialize_weights(self):
        for name, param in self.lstm.named_parameters():
            if 'weight' in name:
                nn.init.xavier_uniform_(param)
            elif 'bias' in name:
                nn.init.zeros_(param)
        for name, param in self.fc.named_parameters():
            if 'weight' in name:
                nn.init.kaiming_uniform_(param, nonlinearity='relu')
            elif 'bias' in name:
                nn.init.zeros_(param)
        nn.init.xavier_uniform_(self.attn.weight)
        nn.init.zeros_(self.attn.bias)

    def forward(self, x):
        x = x.unsqueeze(1)
        lstm_out, _ = self.lstm(x)
        attn_weights = torch.softmax(self.attn(lstm_out).squeeze(-1), dim=1)
        attn_out = torch.sum(lstm_out * attn_weights.unsqueeze(-1), dim=1)
        return self.fc(attn_out)

# Training Loop
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = BiLSTMRegressor(
    input_dim=X.shape[1],
    hidden_dim=params["hidden_dim"],
    lstm_layers=params["lstm_layers"],
    dropout=params["dropout"]
).to(device)

criterion = nn.MSELoss()
optimizer = optim.AdamW(model.parameters(), lr=params["learning_rate"], weight_decay=params["weight_decay"])

def train_model(model, train_loader, test_loader, epochs=50):
    best_spearman = -1
    best_model_path = "best_model.pth"

    for epoch in range(epochs):
        model.train()
        total_loss = 0

        for X_batch, y_batch in train_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)

            optimizer.zero_grad()
            y_pred = model(X_batch)
            loss = criterion(y_pred, y_batch)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        # Evaluate
        model.eval()
        all_preds, all_targets = [], []
        with torch.no_grad():
            for X_batch, y_batch in test_loader:
                X_batch, y_batch = X_batch.to(device), y_batch.to(device)
                preds = model(X_batch).cpu().numpy().flatten()
                targets = y_batch.cpu().numpy().flatten()
                all_preds.extend(preds)
                all_targets.extend(targets)

        spearman_corr, _ = spearmanr(all_preds, all_targets)

        if spearman_corr > best_spearman:
            best_spearman = spearman_corr
            torch.save(model.state_dict(), best_model_path)
            print(f" Saved new best model at epoch {epoch+1} with Spearman: {spearman_corr:.4f}")

        print(f"Epoch {epoch+1}/{epochs} - Loss: {total_loss/len(train_loader):.4f} - Spearman: {spearman_corr:.4f}")

    print(f" Training complete! Best Spearman: {best_spearman:.4f}")
    print(f" Best model saved as '{best_model_path}'")


train_model(model, train_loader, test_loader, epochs=50)


Testing

In [None]:
import torch
import torch.nn as nn
import pandas as pd
import ast


# BiLSTMRegressor Model
class BiLSTMRegressor(nn.Module):
    def __init__(self, input_dim, hidden_dim, lstm_layers, dropout=0.3):
        super(BiLSTMRegressor, self).__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, lstm_layers,
                            batch_first=True, bidirectional=True, dropout=dropout)
        self.attn = nn.Linear(hidden_dim * 2, 1)
        self.fc = nn.Sequential(
            nn.Linear(hidden_dim * 2, 512),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 1)
        )
        self._initialize_weights()

    def _initialize_weights(self):
        for name, param in self.lstm.named_parameters():
            if 'weight' in name:
                nn.init.xavier_uniform_(param)
            elif 'bias' in name:
                nn.init.zeros_(param)
        for name, param in self.fc.named_parameters():
            if 'weight' in name:
                nn.init.kaiming_uniform_(param, nonlinearity='relu')
            elif 'bias' in name:
                nn.init.zeros_(param)
        nn.init.xavier_uniform_(self.attn.weight)
        nn.init.zeros_(self.attn.bias)

    def forward(self, x):
        x = x.unsqueeze(1)
        lstm_out, _ = self.lstm(x)
        attn_weights = torch.softmax(self.attn(lstm_out).squeeze(-1), dim=1)
        attn_out = torch.sum(lstm_out * attn_weights.unsqueeze(-1), dim=1)
        return self.fc(attn_out)


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
test_file = "test_embeddings_commas.csv"
train_file = "q1q2q3_train_embeddings_commas.csv"
patho_file = "AF-P42898-F1-hg38.csv"
ddg_file = "mutant_ddG.csv"
model_file = "best_model.pth"
output_csv = "predictions.csv"
top10_txt = "top10_mutations.txt"


train_df = pd.read_csv(train_file)
train_df["esm_embedding"] = train_df["esm_embedding"].apply(ast.literal_eval)
X_train = torch.tensor(train_df["esm_embedding"].tolist(), dtype=torch.float32)
mean, std = X_train.mean(dim=0), X_train.std(dim=0)

test_df = pd.read_csv(test_file)
test_df["esm_embedding"] = test_df["esm_embedding"].apply(ast.literal_eval)

# Attach pathogenicity
patho_df = pd.read_csv(patho_file)[["protein_variant", "am_pathogenicity"]].drop_duplicates("protein_variant")
test_df = test_df.merge(patho_df, left_on="mutant", right_on="protein_variant", how="left")
test_df["am_pathogenicity"] = test_df["am_pathogenicity"].fillna(0.0)

# Attach FoldX ddG values
ddg_df = pd.read_csv(ddg_file)
test_df = test_df.merge(ddg_df, on="mutant", how="left")
test_df["ddG"] = test_df["ddG"].fillna(0.0)

esm_tensor = torch.tensor(test_df["esm_embedding"].tolist(), dtype=torch.float32)
esm_tensor = (esm_tensor - mean) / (std + 1e-8)
patho_tensor = torch.tensor(test_df["am_pathogenicity"].values, dtype=torch.float32).unsqueeze(1)
ddg_tensor = torch.tensor(test_df["ddG"].values, dtype=torch.float32).unsqueeze(1)

X_test = torch.cat([esm_tensor, patho_tensor, ddg_tensor], dim=1)


best_params = {
    "hidden_dim": 512,
    "lstm_layers": 3,
    "dropout": 0.3
}

model = BiLSTMRegressor(
    input_dim=X_test.shape[1],
    hidden_dim=best_params["hidden_dim"],
    lstm_layers=best_params["lstm_layers"],
    dropout=best_params["dropout"]
).to(device)

model.load_state_dict(torch.load(model_file, map_location=device))
model.eval()

with torch.no_grad():
    preds = model(X_test.to(device)).cpu().numpy().flatten()


test_df["DMS_score_predicted"] = preds
test_df[["mutant", "DMS_score_predicted"]].to_csv(output_csv, index=False)
print(f"Saved predictions to {output_csv}")

top10 = test_df.nlargest(10, "DMS_score_predicted")[["mutant", "DMS_score_predicted"]]
with open(top10_txt, "w") as f:
    f.write("Effectiveness of the Top 10 Mutation Predictions (Recall-Based Evaluation)\n")
    f.write(top10.to_string(index=False))

print(f"Saved top 10 mutations to {top10_txt}")


In [None]:
import pandas as pd

pred_df = pd.read_csv("predictions.csv")

pred_df = pred_df.drop_duplicates(subset="mutant", keep="first")

pred_df.to_csv("predictions_deduplicated.csv", index=False)
print("Deduplicated file saved as 'predictions_parameter.csv'")

#Model 5: BiLSTM + Attention + MLP + Pathogenicity data + foldX data as Reinforcement (Reward Shaping)

Training

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import ast
from torch.utils.data import Dataset, DataLoader
from scipy.stats import spearmanr
import torch.nn.functional as F

# Load Data
embedding_file = "q1q2q3_train_embeddings_commas.csv"
patho_file = "AF-P42898-F1-hg38.csv"
ddg_file = "mutant_ddG.csv"

df = pd.read_csv(embedding_file)
df["esm_embedding"] = df["esm_embedding"].apply(ast.literal_eval)

# Attach pathogenicity
patho_df = pd.read_csv(patho_file)[["protein_variant", "am_pathogenicity"]].drop_duplicates("protein_variant")
df = df.merge(patho_df, left_on="mutant", right_on="protein_variant", how="left")
df["am_pathogenicity"] = df["am_pathogenicity"].fillna(0.0)

# Attach FoldX ddG values (used as reward signal)
ddg_df = pd.read_csv(ddg_file)
df = df.merge(ddg_df, on="mutant", how="left")
df["ddG"] = df["ddG"].fillna(0.0)


X_embed = torch.tensor(df["esm_embedding"].tolist(), dtype=torch.float32)
X_embed = (X_embed - X_embed.mean(dim=0)) / (X_embed.std(dim=0) + 1e-8)

patho_tensor = torch.tensor(df["am_pathogenicity"].values, dtype=torch.float32).unsqueeze(1)
X = torch.cat([X_embed, patho_tensor], dim=1)

y = torch.tensor(df["DMS_score"].values, dtype=torch.float32).view(-1, 1)
rewards = torch.tensor(df["ddG"].values, dtype=torch.float32)

def ddg_to_reward(ddg):
    return torch.sigmoid(-ddg)

reward_tensor = ddg_to_reward(rewards)


class DMSDataset(Dataset):
    def __init__(self, X, y, reward):
        self.X = X
        self.y = y
        self.reward = reward

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx], self.reward[idx]

train_size = int(0.8 * len(df))
test_size = len(df) - train_size
dataset = DMSDataset(X, y, reward_tensor)
train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64)

# BiLSTMRegressor Model
class BiLSTMRegressor(nn.Module):
    def __init__(self, input_dim, hidden_dim, lstm_layers, dropout=0.3):
        super(BiLSTMRegressor, self).__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, lstm_layers,
                            batch_first=True, bidirectional=True, dropout=dropout)
        self.attn = nn.Linear(hidden_dim * 2, 1)
        self.fc = nn.Sequential(
            nn.Linear(hidden_dim * 2, 512),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 1)
        )
        self._initialize_weights()

    def _initialize_weights(self):
        for name, param in self.lstm.named_parameters():
            if 'weight' in name:
                nn.init.xavier_uniform_(param)
            elif 'bias' in name:
                nn.init.zeros_(param)
        for name, param in self.fc.named_parameters():
            if 'weight' in name:
                nn.init.kaiming_uniform_(param, nonlinearity='relu')
            elif 'bias' in name:
                nn.init.zeros_(param)
        nn.init.xavier_uniform_(self.attn.weight)
        nn.init.zeros_(self.attn.bias)

    def forward(self, x):
        x = x.unsqueeze(1)
        lstm_out, _ = self.lstm(x)
        attn_weights = torch.softmax(self.attn(lstm_out).squeeze(-1), dim=1)
        attn_out = torch.sum(lstm_out * attn_weights.unsqueeze(-1), dim=1)
        return self.fc(attn_out)

# Training
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = BiLSTMRegressor(input_dim=X.shape[1], hidden_dim=512, lstm_layers=3, dropout=0.3).to(device)
optimizer = optim.AdamW(model.parameters(), lr=1e-4, weight_decay=1e-3)
base_criterion = nn.MSELoss(reduction='none')

def train_model(model, train_loader, test_loader, epochs=50):
    best_spearman = -1
    best_model_path = "best_model_rl.pth"

    for epoch in range(epochs):
        model.train()
        total_loss = 0

        for X_batch, y_batch, r_batch in train_loader:
            X_batch, y_batch, r_batch = X_batch.to(device), y_batch.to(device), r_batch.to(device)

            optimizer.zero_grad()
            y_pred = model(X_batch)
            base_loss = base_criterion(y_pred, y_batch)


            shaped_loss = (base_loss * (1 - r_batch.unsqueeze(1))).mean()
            shaped_loss.backward()
            optimizer.step()
            total_loss += shaped_loss.item()

        # Evaluation
        model.eval()
        all_preds, all_targets = [], []
        with torch.no_grad():
            for X_batch, y_batch, _ in test_loader:
                X_batch, y_batch = X_batch.to(device), y_batch.to(device)
                preds = model(X_batch).cpu().numpy().flatten()
                targets = y_batch.cpu().numpy().flatten()
                all_preds.extend(preds)
                all_targets.extend(targets)

        spearman_corr, _ = spearmanr(all_preds, all_targets)
        if spearman_corr > best_spearman:
            best_spearman = spearman_corr
            torch.save(model.state_dict(), best_model_path)
            print(f"Epoch {epoch+1}: New best Spearman {spearman_corr:.4f}")

        print(f"Epoch {epoch+1} - RL Loss: {total_loss/len(train_loader):.4f} - Spearman: {spearman_corr:.4f}")

    print(f"Training complete! Best Spearman: {best_spearman:.4f}")


train_model(model, train_loader, test_loader, epochs=50)


Testing

In [None]:
import torch
import torch.nn as nn
import pandas as pd
import ast

# BiLSTMRegressor
class BiLSTMRegressor(nn.Module):
    def __init__(self, input_dim, hidden_dim, lstm_layers, dropout=0.3):
        super(BiLSTMRegressor, self).__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, lstm_layers,
                            batch_first=True, bidirectional=True, dropout=dropout)
        self.attn = nn.Linear(hidden_dim * 2, 1)
        self.fc = nn.Sequential(
            nn.Linear(hidden_dim * 2, 512),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 1)
        )
        self._initialize_weights()

    def _initialize_weights(self):
        for name, param in self.lstm.named_parameters():
            if 'weight' in name:
                nn.init.xavier_uniform_(param)
            elif 'bias' in name:
                nn.init.zeros_(param)
        for name, param in self.fc.named_parameters():
            if 'weight' in name:
                nn.init.kaiming_uniform_(param, nonlinearity='relu')
            elif 'bias' in name:
                nn.init.zeros_(param)
        nn.init.xavier_uniform_(self.attn.weight)
        nn.init.zeros_(self.attn.bias)

    def forward(self, x):
        x = x.unsqueeze(1)
        lstm_out, _ = self.lstm(x)
        attn_weights = torch.softmax(self.attn(lstm_out).squeeze(-1), dim=1)
        attn_out = torch.sum(lstm_out * attn_weights.unsqueeze(-1), dim=1)
        return self.fc(attn_out)


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
test_file = "test_embeddings_commas.csv"
patho_file = "AF-P42898-F1-hg38.csv"
ddg_file = "mutant_ddG.csv"
train_file = "q1q2q3_train_embeddings_commas.csv"
model_file = "best_model_rl.pth"
output_csv = "predictions_rl.csv"
top10_txt = "top10_mutations_rl.txt"


train_df = pd.read_csv(train_file)
train_df["esm_embedding"] = train_df["esm_embedding"].apply(ast.literal_eval)
X_train = torch.tensor(train_df["esm_embedding"].tolist(), dtype=torch.float32)
mean, std = X_train.mean(dim=0), X_train.std(dim=0)


test_df = pd.read_csv(test_file)
test_df["esm_embedding"] = test_df["esm_embedding"].apply(ast.literal_eval)

# Attach pathogenicity
patho_df = pd.read_csv(patho_file)[["protein_variant", "am_pathogenicity"]].drop_duplicates("protein_variant")
test_df = test_df.merge(patho_df, left_on="mutant", right_on="protein_variant", how="left")
test_df["am_pathogenicity"] = test_df["am_pathogenicity"].fillna(0.0)

# Attach ddG
ddg_df = pd.read_csv(ddg_file)
test_df = test_df.merge(ddg_df, on="mutant", how="left")
test_df["ddG"] = test_df["ddG"].fillna(0.0)

esm_tensor = torch.tensor(test_df["esm_embedding"].tolist(), dtype=torch.float32)
esm_tensor = (esm_tensor - mean) / (std + 1e-8)
patho_tensor = torch.tensor(test_df["am_pathogenicity"].values, dtype=torch.float32).unsqueeze(1)
X_test = torch.cat([esm_tensor, patho_tensor], dim=1)


model = BiLSTMRegressor(
    input_dim=X_test.shape[1],
    hidden_dim=512,
    lstm_layers=3,
    dropout=0.3
).to(device)

model.load_state_dict(torch.load(model_file, map_location=device))
model.eval()

with torch.no_grad():
    preds = model(X_test.to(device)).cpu().numpy().flatten()


test_df["DMS_score_predicted"] = preds
test_df[["mutant", "DMS_score_predicted", "ddG"]].to_csv(output_csv, index=False)
print(f"Saved predictions to {output_csv}")


top10 = test_df.nlargest(10, "DMS_score_predicted")[["mutant", "DMS_score_predicted", "ddG"]]
with open(top10_txt, "w") as f:
    f.write("Top 10 Predicted Mutants (High Functional Score)\n")
    f.write(top10.to_string(index=False))

print(f"Saved top 10 mutations to {top10_txt}")


In [None]:
import pandas as pd

pred_df = pd.read_csv("predictions.csv")

pred_df = pred_df.drop_duplicates(subset="mutant", keep="first")

pred_df.to_csv("predictions_deduplicated.csv", index=False)
print("Deduplicated file saved as 'predictions_reinforcement.csv'")

#Model 6: BiLSTM + Attention + MLP + Pathogenicity data + foldX data as both Parameter & Reinforcement (Reward Shaping)

Training

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import ast
from torch.utils.data import Dataset, DataLoader
from scipy.stats import spearmanr


BATCH_SIZE = 64
LR = 1e-4
WEIGHT_DECAY = 1e-3
EPOCHS = 50
HIDDEN_DIM = 512
LSTM_LAYERS = 3
DROPOUT = 0.3

EMBEDDING_FILE = "q1q2q3_train_embeddings_commas.csv"
PATHO_FILE = "AF-P42898-F1-hg38.csv"
DDG_FILE = "mutant_ddG.csv"
BEST_MODEL_PATH = "best_model_combined.pth"

df = pd.read_csv(EMBEDDING_FILE)
df["esm_embedding"] = df["esm_embedding"].apply(ast.literal_eval)

# Attach pathogenicity
patho_df = pd.read_csv(PATHO_FILE)[["protein_variant", "am_pathogenicity"]].drop_duplicates("protein_variant")
df = df.merge(patho_df, left_on="mutant", right_on="protein_variant", how="left")
df["am_pathogenicity"] = df["am_pathogenicity"].fillna(0.0)

# Attach FoldX ddG
ddg_df = pd.read_csv(DDG_FILE)
df = df.merge(ddg_df, on="mutant", how="left")
df["ddG"] = df["ddG"].fillna(0.0)


# ESM embeddings
X_embed = torch.tensor(df["esm_embedding"].tolist(), dtype=torch.float32)

X_mean, X_std = X_embed.mean(dim=0), X_embed.std(dim=0)
X_embed = (X_embed - X_mean) / (X_std + 1e-8)

patho_tensor = torch.tensor(df["am_pathogenicity"].values, dtype=torch.float32).unsqueeze(1)
ddg_tensor = torch.tensor(df["ddG"].values, dtype=torch.float32).unsqueeze(1)

X = torch.cat([X_embed, patho_tensor, ddg_tensor], dim=1)

y = torch.tensor(df["DMS_score"].values, dtype=torch.float32).view(-1, 1)

def ddg_to_reward(ddg_values):
    return torch.sigmoid(-ddg_values)

reward_tensor = ddg_to_reward(ddg_tensor.squeeze(1))


class DMSDataset(Dataset):
    def __init__(self, X, y, rewards):
        self.X = X
        self.y = y
        self.rewards = rewards

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx], self.rewards[idx]

full_dataset = DMSDataset(X, y, reward_tensor)
train_size = int(0.8 * len(full_dataset))
test_size = len(full_dataset) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(full_dataset, [train_size, test_size])

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# BiLSTMRegressor
class BiLSTMRegressor(nn.Module):
    def __init__(self, input_dim, hidden_dim, lstm_layers, dropout=0.3):
        super(BiLSTMRegressor, self).__init__()
        self.lstm = nn.LSTM(
            input_dim,
            hidden_dim,
            num_layers=lstm_layers,
            batch_first=True,
            bidirectional=True,
            dropout=dropout
        )
        self.attn = nn.Linear(hidden_dim * 2, 1)
        self.fc = nn.Sequential(
            nn.Linear(hidden_dim * 2, 512),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 1)
        )
        self._initialize_weights()

    def _initialize_weights(self):
        for name, param in self.lstm.named_parameters():
            if 'weight' in name:
                nn.init.xavier_uniform_(param)
            elif 'bias' in name:
                nn.init.zeros_(param)
        for name, param in self.fc.named_parameters():
            if 'weight' in name:
                nn.init.kaiming_uniform_(param, nonlinearity='relu')
            elif 'bias' in name:
                nn.init.zeros_(param)
        nn.init.xavier_uniform_(self.attn.weight)
        nn.init.zeros_(self.attn.bias)

    def forward(self, x):

        x = x.unsqueeze(1)

        lstm_out, _ = self.lstm(x)
        attn_weights = torch.softmax(self.attn(lstm_out).squeeze(-1), dim=1)
        attn_out = torch.sum(lstm_out * attn_weights.unsqueeze(-1), dim=1)

        return self.fc(attn_out)


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = BiLSTMRegressor(
    input_dim=X.shape[1],
    hidden_dim=HIDDEN_DIM,
    lstm_layers=LSTM_LAYERS,
    dropout=DROPOUT
).to(device)


base_criterion = nn.MSELoss(reduction="none")
optimizer = optim.AdamW(model.parameters(), lr=LR, weight_decay=WEIGHT_DECAY)


# Training Loop (with reward shaping)

def train_model(model, train_loader, test_loader, epochs=EPOCHS):
    best_spearman = -1.0

    for epoch in range(epochs):
        model.train()
        total_loss = 0.0

        for X_batch, y_batch, r_batch in train_loader:
            X_batch = X_batch.to(device)
            y_batch = y_batch.to(device)
            r_batch = r_batch.to(device)

            optimizer.zero_grad()
            y_pred = model(X_batch).squeeze(1)
            base_loss = base_criterion(y_pred, y_batch.squeeze(1))
            shaped_loss = (base_loss * (1.0 - r_batch)).mean()

            shaped_loss.backward()
            optimizer.step()
            total_loss += shaped_loss.item()

        # Evaluate
        model.eval()
        all_preds = []
        all_targets = []
        with torch.no_grad():
            for X_batch, y_batch, _ in test_loader:
                X_batch = X_batch.to(device)
                y_batch = y_batch.to(device)
                preds = model(X_batch).cpu().numpy().flatten()
                targets = y_batch.cpu().numpy().flatten()
                all_preds.extend(preds)
                all_targets.extend(targets)

        spearman_corr, _ = spearmanr(all_preds, all_targets)

        if spearman_corr > best_spearman:
            best_spearman = spearman_corr
            torch.save(model.state_dict(), BEST_MODEL_PATH)
            print(f"Epoch {epoch+1}: New best Spearman = {spearman_corr:.4f}")

        print(f"Epoch {epoch+1}/{epochs} | RL Loss: {total_loss/len(train_loader):.4f} | Spearman: {spearman_corr:.4f}")

    print(f"Training complete! Best Spearman: {best_spearman:.4f}")

train_model(model, train_loader, test_loader)


Testing

In [None]:
import torch
import torch.nn as nn
import pandas as pd
import ast


# Same BiLSTMRegressor
class BiLSTMRegressor(nn.Module):
    def __init__(self, input_dim, hidden_dim, lstm_layers, dropout=0.3):
        super(BiLSTMRegressor, self).__init__()
        self.lstm = nn.LSTM(
            input_dim,
            hidden_dim,
            num_layers=lstm_layers,
            batch_first=True,
            bidirectional=True,
            dropout=dropout
        )
        self.attn = nn.Linear(hidden_dim * 2, 1)
        self.fc = nn.Sequential(
            nn.Linear(hidden_dim * 2, 512),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 1)
        )
        self._initialize_weights()

    def _initialize_weights(self):
        for name, param in self.lstm.named_parameters():
            if 'weight' in name:
                nn.init.xavier_uniform_(param)
            elif 'bias' in name:
                nn.init.zeros_(param)
        for name, param in self.fc.named_parameters():
            if 'weight' in name:
                nn.init.kaiming_uniform_(param, nonlinearity='relu')
            elif 'bias' in name:
                nn.init.zeros_(param)
        nn.init.xavier_uniform_(self.attn.weight)
        nn.init.zeros_(self.attn.bias)

    def forward(self, x):
        x = x.unsqueeze(1)
        lstm_out, _ = self.lstm(x)
        attn_weights = torch.softmax(self.attn(lstm_out).squeeze(-1), dim=1)
        attn_out = torch.sum(lstm_out * attn_weights.unsqueeze(-1), dim=1)
        return self.fc(attn_out)

TEST_EMBEDDING_FILE = "test_embeddings_commas.csv"
PATHO_FILE = "AF-P42898-F1-hg38.csv"
DDG_FILE = "mutant_ddG.csv"
TRAIN_EMBEDDING_FILE = "q1q2q3_train_embeddings_commas.csv"
BEST_MODEL_PATH = "best_model_combined.pth"
PREDICTION_CSV = "predictions_combined.csv"
TOP10_TXT = "top10_mutations_combined.txt"

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

import ast
train_df = pd.read_csv(TRAIN_EMBEDDING_FILE)
train_df["esm_embedding"] = train_df["esm_embedding"].apply(ast.literal_eval)
X_train = torch.tensor(train_df["esm_embedding"].tolist(), dtype=torch.float32)
X_mean, X_std = X_train.mean(dim=0), X_train.std(dim=0)

test_df = pd.read_csv(TEST_EMBEDDING_FILE)
test_df["esm_embedding"] = test_df["esm_embedding"].apply(ast.literal_eval)

# Attach pathogenicity
patho_df = pd.read_csv(PATHO_FILE)[["protein_variant", "am_pathogenicity"]].drop_duplicates("protein_variant")
test_df = test_df.merge(patho_df, left_on="mutant", right_on="protein_variant", how="left")
test_df["am_pathogenicity"] = test_df["am_pathogenicity"].fillna(0.0)

# Attach ddG
ddg_df = pd.read_csv(DDG_FILE)
test_df = test_df.merge(ddg_df, on="mutant", how="left")
test_df["ddG"] = test_df["ddG"].fillna(0.0)

esm_test = torch.tensor(test_df["esm_embedding"].tolist(), dtype=torch.float32)
esm_test = (esm_test - X_mean) / (X_std + 1e-8)
patho_test = torch.tensor(test_df["am_pathogenicity"].values, dtype=torch.float32).unsqueeze(1)
ddg_test = torch.tensor(test_df["ddG"].values, dtype=torch.float32).unsqueeze(1)

X_test = torch.cat([esm_test, patho_test, ddg_test], dim=1)


model = BiLSTMRegressor(
    input_dim=X_test.shape[1],
    hidden_dim=512,
    lstm_layers=3,
    dropout=0.3
).to(device)

model.load_state_dict(torch.load(BEST_MODEL_PATH, map_location=device))
model.eval()

with torch.no_grad():
    preds = model(X_test.to(device)).cpu().numpy().flatten()

test_df["DMS_score_predicted"] = preds
test_df[["mutant", "DMS_score_predicted", "ddG"]].to_csv(PREDICTION_CSV, index=False)
print(f"Predictions saved to '{PREDICTION_CSV}'")

test_df.drop_duplicates(subset="mutant", keep="first", inplace=True)

top10 = test_df.nlargest(10, "DMS_score_predicted")[["mutant", "DMS_score_predicted", "ddG"]]
with open(TOP10_TXT, "w") as f:
    f.write("Top 10 Predicted Mutants (Combined Approach)\n\n")
    f.write(top10.to_string(index=False))
print(f"Top 10 mutations saved to '{TOP10_TXT}'")


#Model 7: BiLSTM + Attention + MLP + Active learning while using Pathogenicity data + foldX data

In [None]:
train_file = "q1q2q3_train_embeddings_commas.csv"
test_file  = "test_embeddings_commas.csv"
ddg_file   = "mutant_ddG_ALL.csv"
patho_file = "immunogenicity.csv"

train_df = pd.read_csv(train_file)
# train_df["esm_embedding"] = train_df["esm_embedding"].apply(ast.literal_eval)

ddg_df = pd.read_csv(ddg_file)
def remove_second_char_if_A(s):
    if len(s) > 2 and s[1] == 'A':
        return s[0] + s[2:]
    else:
        return s
ddg_df["mutant"] = ddg_df["mutant"].apply(remove_second_char_if_A)

patho_df = pd.read_csv(patho_file)
patho_df["protein_variant"] = patho_df["protein_variant"].apply(remove_second_char_if_A)

# Merge into train
train_df = train_df.merge(ddg_df, on="mutant", how="left")
train_df = train_df.merge(patho_df, left_on="mutant", right_on="protein_variant", how="left")
train_df["pathogenicity_score"] = train_df["pathogenicity_score"].fillna(0.0)
print(train_df)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
import pandas as pd
import ast
import numpy as np
from scipy.stats import spearmanr

# BiLSTM Model Definition
class BiLSTMRegressor(nn.Module):
    def __init__(self, input_dim, hidden_dim=256, num_layers=2, dropout=0.2):
        super(BiLSTMRegressor, self).__init__()
        self.lstm = nn.LSTM(
            input_dim,
            hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,
            dropout=dropout
        )
        self.attn = nn.Linear(hidden_dim * 2, 1)
        self.fc = nn.Sequential(
            nn.Linear(hidden_dim * 2, 512),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 1)
        )
        self._init_weights()

    def _init_weights(self):
        for name, param in self.lstm.named_parameters():
            if "weight" in name:
                nn.init.xavier_uniform_(param)
            elif "bias" in name:
                nn.init.zeros_(param)
        for name, param in self.fc.named_parameters():
            if "weight" in name:
                nn.init.kaiming_uniform_(param, nonlinearity="relu")
            elif "bias" in name:
                nn.init.zeros_(param)
        nn.init.xavier_uniform_(self.attn.weight)
        nn.init.zeros_(self.attn.bias)

    def forward(self, x):

        x = x.unsqueeze(1)
        lstm_out, _ = self.lstm(x)
        attn_weights = torch.softmax(self.attn(lstm_out).squeeze(-1), dim=1)
        attn_out = torch.sum(lstm_out * attn_weights.unsqueeze(-1), dim=1)
        return self.fc(attn_out)


class DMSTrainDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

def train_model(model, train_loader, val_loader=None, epochs=20, lr=1e-4, wd=1e-4, device="cpu"):
    criterion = nn.MSELoss()
    optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=wd)
    best_spearman = -1
    model.to(device)

    for epoch in range(1, epochs+1):
        model.train()
        total_loss = 0.0
        for X_batch, y_batch in train_loader:
            X_batch = X_batch.to(device)
            y_batch = y_batch.to(device)

            optimizer.zero_grad()
            preds = model(X_batch).squeeze(1)
            loss = criterion(preds, y_batch)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        avg_loss = total_loss / len(train_loader)

        # Evaluate
        spearman_corr = 0.0
        if val_loader is not None:
            model.eval()
            all_preds, all_targets = [], []
            with torch.no_grad():
                for X_val, y_val in val_loader:
                    X_val = X_val.to(device)
                    y_val = y_val.to(device)
                    val_preds = model(X_val).cpu().numpy().flatten()
                    val_targets = y_val.cpu().numpy().flatten()
                    all_preds.extend(val_preds)
                    all_targets.extend(val_targets)

            spearman_corr, _ = spearmanr(all_preds, all_targets)
            if spearman_corr > best_spearman:
                best_spearman = spearman_corr

        print(f"[Epoch {epoch}/{epochs}] Loss={avg_loss:.4f}, Spearman={spearman_corr:.4f}")

    print(f"Training complete. Best Spearman on val set: {best_spearman:.4f}")
    return model


In [None]:
# Active Learning
def main_active_learning_loop(num_iterations=5):

    train_file = "q1q2q3_train_embeddings_commas.csv"
    test_file  = "test_embeddings_commas.csv"
    ddg_file   = "mutant_ddG_ALL.csv"
    patho_file = "immunogenicity.csv"


    train_df = pd.read_csv(train_file)
    train_df["esm_embedding"] = train_df["esm_embedding"].apply(ast.literal_eval)
    test_df_full = pd.read_csv(test_file)
    test_df_full["esm_embedding"] = test_df_full["esm_embedding"].apply(ast.literal_eval)

    unlabeled_pool_df = test_df_full.copy()

    ddg_df = pd.read_csv(ddg_file)
    def remove_second_char_if_A(s):
        if len(s) > 2 and s[1] == 'A':
            return s[0] + s[2:]
        else:
            return s
    ddg_df["mutant"] = ddg_df["mutant"].apply(remove_second_char_if_A)

    patho_df = pd.read_csv(patho_file)
    patho_df["protein_variant"] = patho_df["protein_variant"].apply(remove_second_char_if_A)

    # Merge into train
    train_df = train_df.merge(ddg_df, on="mutant", how="left")
    train_df = train_df.merge(patho_df, left_on="mutant", right_on="protein_variant", how="left")
    train_df["pathogenicity_score"] = train_df["pathogenicity_score"].fillna(0.0)

    # Merge into unlabeled pool
    unlabeled_pool_df = unlabeled_pool_df.merge(ddg_df, on="mutant", how="left")
    unlabeled_pool_df = unlabeled_pool_df.merge(patho_df, left_on="mutant", right_on="protein_variant", how="left")
    unlabeled_pool_df["pathogenicity_score"] = unlabeled_pool_df["pathogenicity_score"].fillna(0.0)
    print(train_df)

    X_list, y_list = [], []
    for _, row in train_df.iterrows():
        emb = row["esm_embedding"]
        ddg_val   = row["ddG"] if pd.notna(row["ddG"]) else 0.0
        patho_val = row["pathogenicity_score"] if pd.notna(row["pathogenicity_score"]) else 0.0
        full_input = emb + [ddg_val, patho_val]
        X_list.append(full_input)
        y_list.append(row["DMS_score"])

    X_data = torch.tensor(X_list, dtype=torch.float32)
    y_data = torch.tensor(y_list, dtype=torch.float32)

    X_mean = X_data.mean(dim=0)
    X_std  = X_data.std(dim=0) + 1e-8
    X_data = (X_data - X_mean) / X_std

    full_dataset = DMSTrainDataset(X_data, y_data)
    train_size = int(0.8 * len(full_dataset))
    val_size   = len(full_dataset) - train_size
    train_dataset, val_dataset = random_split(full_dataset, [train_size, val_size])

    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
    val_loader   = DataLoader(val_dataset,   batch_size=64, shuffle=False)

    input_dim = X_data.shape[1]
    model = BiLSTMRegressor(input_dim=input_dim, hidden_dim=256, num_layers=2, dropout=0.2)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


    # Active Learning Loop
    for iteration in range(1, num_iterations+1):
        print(f"\n================== AL Iteration {iteration}/{num_iterations} ==================\n")

        model = train_model(model, train_loader, val_loader, epochs=10, lr=1e-4, wd=1e-4, device=device)

        if len(unlabeled_pool_df) == 0:
            print("No more unlabeled data to pick from.")
            break

        X_unlab_list = []
        for _, row in unlabeled_pool_df.iterrows():
            emb = row["esm_embedding"]
            ddg_val   = row["ddG"] if pd.notna(row["ddG"]) else 0.0
            patho_val = row["pathogenicity_score"] if pd.notna(row["pathogenicity_score"]) else 0.0
            full_input = emb + [ddg_val, patho_val]
            X_unlab_list.append(full_input)

        X_unlab_tensor = torch.tensor(X_unlab_list, dtype=torch.float32)
        X_unlab_tensor = (X_unlab_tensor - X_mean) / X_std

        model.eval()
        preds = []
        with torch.no_grad():
            for i in range(len(X_unlab_tensor)):
                x_i = X_unlab_tensor[i].unsqueeze(0).to(device)
                y_hat = model(x_i).item()
                preds.append(y_hat)

        unlabeled_pool_df["DMS_score_predicted"] = preds


        top200 = unlabeled_pool_df.nlargest(200, "DMS_score_predicted") if len(unlabeled_pool_df) >= 200 else unlabeled_pool_df
        top10  = top200.nsmallest(10, "ddG") if len(top200) >= 10 else top200

        print(f"  => Selected {len(top10)} new mutants for labeling (out of {len(top200)} top).")

        true_labels_for_top10 = np.random.uniform(0.0, 1.0, size=len(top10))
        top10["DMS_score"] = true_labels_for_top10

        train_df = pd.concat([train_df, top10], ignore_index=True)

        top10_mutants = set(top10["mutant"].unique())
        unlabeled_pool_df = unlabeled_pool_df[~unlabeled_pool_df["mutant"].isin(top10_mutants)].reset_index(drop=True)

        X_new_list, y_new_list = [], []
        for _, row in train_df.iterrows():
            emb = row["esm_embedding"]
            if isinstance(emb, str):
                emb = ast.literal_eval(emb)
            ddg_val   = row["ddG"] if pd.notna(row["ddG"]) else 0.0
            patho_val = row["pathogenicity_score"] if pd.notna(row["pathogenicity_score"]) else 0.0
            full_input = emb + [ddg_val, patho_val]
            X_new_list.append(full_input)
            y_new_list.append(row["DMS_score"])

        X_new_tensor = torch.tensor(X_new_list, dtype=torch.float32)
        X_new_tensor = (X_new_tensor - X_mean) / X_std
        y_new_tensor = torch.tensor(y_new_list, dtype=torch.float32)

        new_dataset = DMSTrainDataset(X_new_tensor, y_new_tensor)
        train_loader = DataLoader(new_dataset, batch_size=64, shuffle=True)

    print(f"\nAll {num_iterations} active learning iterations complete!\n")

    model.eval()

    X_test_full_list = []
    for _, row in test_df_full.iterrows():
        emb = row["esm_embedding"]
        ddg_val   = row["ddG"] if "ddG" in row and pd.notna(row["ddG"]) else 0.0
        patho_val = row["pathogenicity_score"] if "pathogenicity_score" in row and pd.notna(row["pathogenicity_score"]) else 0.0
        full_input = emb + [ddg_val, patho_val]
        X_test_full_list.append(full_input)

    X_test_full_tensor = torch.tensor(X_test_full_list, dtype=torch.float32)

    test_df_full_merged = test_df_full.copy()
    test_df_full_merged = test_df_full_merged.merge(ddg_df, on="mutant", how="left", suffixes=("", "_ddg"))
    test_df_full_merged = test_df_full_merged.merge(patho_df, left_on="mutant", right_on="protein_variant", how="left", suffixes=("", "_patho"))
    test_df_full_merged["ddG"] = test_df_full_merged["ddG"].fillna(0.0)
    test_df_full_merged["pathogenicity_score"] = test_df_full_merged["pathogenicity_score"].fillna(0.0)


    X_test_full_list = []
    for _, row in test_df_full_merged.iterrows():
        emb = row["esm_embedding"]
        if isinstance(emb, str):
            emb = ast.literal_eval(emb)
        ddg_val   = row["ddG"] if pd.notna(row["ddG"]) else 0.0
        patho_val = row["pathogenicity_score"] if pd.notna(row["pathogenicity_score"]) else 0.0
        full_input = emb + [ddg_val, patho_val]
        X_test_full_list.append(full_input)

    X_test_full_tensor = torch.tensor(X_test_full_list, dtype=torch.float32)
    X_test_full_tensor = (X_test_full_tensor - X_mean) / X_std

    preds_full = []
    with torch.no_grad():
        for i in range(len(X_test_full_tensor)):
            x_i = X_test_full_tensor[i].unsqueeze(0).to(device)
            y_hat = model(x_i).item()
            preds_full.append(y_hat)

    test_df_full_merged["DMS_score_predicted"] = preds_full

    test_df_full_merged[["mutant", "DMS_score_predicted", "ddG", "pathogenicity_score"]].to_csv(
        "final_test_predictions.csv", index=False)
    print("Saved final test predictions (all test mutants) to 'final_test_predictions.csv'")

    final_top10 = test_df_full_merged.nlargest(10, "DMS_score_predicted")
    with open("final_top10_mutations.txt", "w") as f:
        f.write("Top 10 Mutants by Final Predicted DMS Score (Among Entire Test Set)\n\n")
        f.write(final_top10[["mutant", "DMS_score_predicted", "ddG", "pathogenicity_score"]]
                .to_string(index=False))
    print("Saved final top 10 mutants to 'final_top10_mutations.txt'")

def main():
    main_active_learning_loop(num_iterations=5)

if __name__ == "__main__":
    main()


#Testing correlation between ddG and DMS

In [None]:
import pandas as pd
from scipy.stats import spearmanr, pearsonr

subset_df = train_df.dropna(subset=["ddG", "DMS_score"])

ddg_vals = subset_df["ddG"].values
dms_vals = subset_df["DMS_score"].values

# 2) Spearman correlation
spearman_corr, spearman_p = spearmanr(ddg_vals, dms_vals)
print(f"Spearman correlation = {spearman_corr:.4f}, p-value = {spearman_p:.4e}")

# 3) Pearson correlation
pearson_corr, pearson_p = pearsonr(ddg_vals, dms_vals)
print(f"Pearson correlation = {pearson_corr:.4f}, p-value = {pearson_p:.4e}")
