In [14]:
import pandas as pd
import numpy as np

from sklearn.model_selection import train_test_split, PredefinedSplit
from sklearn.preprocessing import StandardScaler
from scipy.stats import spearmanr

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

from feature_engineering.feature_utils import create_features_from_raw_df


In [5]:
csv_sequences = "data/GDPa1_v1.2_sequences.csv" 
csv_properties = "data/GDPa1_v1.2_20250814.csv"

Y = "HIC"
batch_size = 64
epochs = 10
lr = 1e-3
random_seed = 42

device = torch.accelerator.current_accelerator().type if torch.accelerator.is_available() else "cpu"
print("Using device:", device)

torch.manual_seed(random_seed)
np.random.seed(random_seed)

Using device: mps


In [None]:
seq_df = pd.read_csv(csv_sequences)
prop_df = pd.read_csv(csv_properties)

# target dataframe
target_df = prop_df[["antibody_id", Y]]

# merge target and sequences
seq_target = seq_df.merge(target_df, on="antibody_id", how="inner")
sequence_features = create_features_from_raw_df(seq_target)
# sequence_features has same index as df_raw and its own antibody_id column.
# concatenate along columns, drop duplicate antibody_id.
df = pd.concat(
    [seq_target.reset_index(drop=True),
     sequence_features.drop(columns=["antibody_id"]).reset_index(drop=True)],
    axis=1,

)

In [7]:
### amino acid vocab, our way of scaling 
AMINO_ACIDS = "ACDEFGHIKLMNPQRSTVWY"
aa_to_idx = {aa:i for i, aa in enumerate(AMINO_ACIDS)}

def one_hot_encode(seq):
    seq = str(seq).strip().upper()
    encoded = []
    for aa in seq:
        vec = [0]*20
        if aa in aa_to_idx:
            vec[aa_to_idx[aa]] = 1
        encoded.append(vec)
    return encoded

In [19]:
engineered_cols = [
    "vl_turn",
    "vh_molecular_weight",
    "vh_protein_sequence_length",
    "vh_aromaticity",
    "vh_helix",
    "vh_instability",
    "vh_molar_extinction_oxidized",
    "vl_aromaticity",
    "vh_sheet",
    "G_vl_protein_sequence",
    "vh_hydrophobic_count",
    "vh_aromatic_count",
    "vh_gravy",
    "Y_vh_protein_sequence",
]

df = df.dropna(subset=[Y]).reset_index(drop=True)
#split train and test
#train_df, test_df = train_test_split(df, test_size=0.2, random_state=random_seed)
test_fold = 0
train_df, test_df = df.loc[df['hierarchical_cluster_IgG_isotype_stratified_fold']!=test_fold], df.loc[df['hierarchical_cluster_IgG_isotype_stratified_fold']==test_fold]

# set everything to the same datatype
train_df[engineered_cols] = train_df[engineered_cols].astype(float)
test_df[engineered_cols] = test_df[engineered_cols].astype(float)

# normalize all the other features
scaler = StandardScaler()
train_df.loc[:, engineered_cols] = scaler.fit_transform(train_df[engineered_cols])
test_df.loc[:, engineered_cols] = scaler.transform(test_df[engineered_cols])


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  train_df[engineered_cols] = train_df[engineered_cols].astype(float)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  test_df[engineered_cols] = test_df[engineered_cols].astype(float)


In [20]:
class AntibodySeqDataset(Dataset):
    def __init__(self, df, feature_cols=None):
        self.vh = df["vh_protein_sequence"].astype(str).tolist()
        self.vl = df["vl_protein_sequence"].astype(str).tolist()
        self.y = df[Y].astype(float).values
        if feature_cols:
            self.features = df[feature_cols].astype(np.float32).values
        else:
            self.features = None

    # get length
    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        vh_seq = torch.tensor(one_hot_encode(self.vh[idx]), dtype=torch.float32)
        vl_seq = torch.tensor(one_hot_encode(self.vl[idx]), dtype=torch.float32)
        target = torch.tensor(self.y[idx], dtype=torch.float32)

        sample = {"vh_seq": vh_seq, "vl_seq": vl_seq, "target": target}

        if self.features is not None:
            sample["features"] = torch.tensor(self.features[idx], dtype=torch.float32)

        return sample


# can't use built in since sequences are not of the same length
def collate_fn(batch):
    vh_seqs = [b["vh_seq"] for b in batch]
    vl_seqs = [b["vl_seq"] for b in batch]

    batch_out = {
        "vh_seq": nn.utils.rnn.pad_sequence(vh_seqs, batch_first=True, padding_value=0.0),
        "vl_seq": nn.utils.rnn.pad_sequence(vl_seqs, batch_first=True, padding_value=0.0),
        "vh_lengths": torch.tensor([len(s) for s in vh_seqs], dtype=torch.long),
        "vl_lengths": torch.tensor([len(s) for s in vl_seqs], dtype=torch.long),
        "target": torch.stack([b["target"] for b in batch]),
    }

    if "features" in batch[0]:
        batch_out["features"] = torch.stack([b["features"] for b in batch])

    return batch_out


train_dataset = AntibodySeqDataset(train_df, feature_cols=engineered_cols)
test_dataset = AntibodySeqDataset(test_df, feature_cols=engineered_cols)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)


In [21]:
class AntibodyLSTMModel(nn.Module):
    def __init__(self,
                 input_size=20,
                 hidden_size=64,
                 num_layers=1,
                 dropout=0.1,
                 engineered_feat_dim=0):
        super().__init__()

        # one lstm for vh
        self.vh_lstm = nn.LSTM(input_size, hidden_size, num_layers=num_layers,
                               batch_first=True,
                               dropout=dropout if num_layers > 1 else 0.0)

        # two lstm for vl
        self.vl_lstm = nn.LSTM(input_size, hidden_size, num_layers=num_layers,
                               batch_first=True,
                               dropout=dropout if num_layers > 1 else 0.0)

        input_dim = hidden_size * 2 + engineered_feat_dim

        self.mlp = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
        )

    def forward(self, vh_seq, vl_seq, vh_lengths, vl_lengths, features=None):

        vh_emb = vh_seq
        vl_emb = vl_seq
        
        # takes padded tensor and lengths, lets LSTM know not to consier PAD tokens
        vh_packed = nn.utils.rnn.pack_padded_sequence(
            vh_emb, vh_lengths.cpu(), batch_first=True, enforce_sorted=False
        )
        vl_packed = nn.utils.rnn.pack_padded_sequence(
            vl_emb, vl_lengths.cpu(), batch_first=True, enforce_sorted=False
        )

        vh_output, (vh_hidden, vh_cell) = self.vh_lstm(vh_packed)
        vl_output, (vl_hidden, vl_cell) = self.vl_lstm(vl_packed)

        # extract final layer
        vh_repr = vh_hidden[-1]
        vl_repr = vl_hidden[-1]

        combined = torch.cat([vh_repr, vl_repr], dim=1)  # (B, 2H)

        if features is not None:
            combined = torch.cat([combined, features], dim=1)  # (B, 2H + Feats)

        # mlp scalar prediction
        out = self.mlp(combined).squeeze(-1)
        return out


model = AntibodyLSTMModel(engineered_feat_dim=len(engineered_cols)).to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

In [22]:
def run_epoch(loader, train=True):
    if train:
        model.train()
    else:
        model.eval()

    total_loss = 0.0
    all_preds, all_targets = [], []

    for batch in loader:
        vh_seq = batch["vh_seq"].to(device)
        vl_seq = batch["vl_seq"].to(device)
        vh_lengths = batch["vh_lengths"].to(device)
        vl_lengths = batch["vl_lengths"].to(device)
        targets = batch["target"].to(device)

        feats = batch.get("features")
        if feats is not None:
            feats = feats.to(device)

        if train:
            optimizer.zero_grad()

        with torch.set_grad_enabled(train):
            preds = model(vh_seq, vl_seq, vh_lengths, vl_lengths, features=feats)
            loss = criterion(preds, targets)

            if train:
                loss.backward()
                optimizer.step()

        total_loss += loss.item() * targets.size(0)
        all_preds.append(preds.detach().cpu().numpy())
        all_targets.append(targets.detach().cpu().numpy())

    avg_loss = total_loss / len(loader.dataset)
    all_preds = np.concatenate(all_preds)
    all_targets = np.concatenate(all_targets)

    if np.std(all_preds) < 1e-8 or np.std(all_targets) < 1e-8:
        spearman = np.nan
    else:
        rho, _ = spearmanr(all_targets, all_preds)
        spearman = float(rho)

    return avg_loss, spearman


for epoch in range(1, epochs + 1):
    train_loss, train_rho = run_epoch(train_loader, train=True)
    val_loss, val_rho = run_epoch(test_loader, train=False)
    print(
        f"Epoch {epoch:02d} : "
        f"train MSE: {train_loss:.3f} | train ρ: {train_rho:.3f} || "
        f"val MSE: {val_loss:.3f} | val ρ: {val_rho:.3f}"
    )

print("Done")

Epoch 01 : train MSE: 7.966 | train ρ: 0.057 || val MSE: 7.263 | val ρ: 0.131
Epoch 02 : train MSE: 7.408 | train ρ: 0.229 || val MSE: 6.735 | val ρ: 0.162
Epoch 03 : train MSE: 6.822 | train ρ: 0.177 || val MSE: 6.097 | val ρ: 0.152
Epoch 04 : train MSE: 6.092 | train ρ: 0.166 || val MSE: 5.198 | val ρ: 0.122
Epoch 05 : train MSE: 5.023 | train ρ: 0.239 || val MSE: 3.779 | val ρ: 0.102
Epoch 06 : train MSE: 3.320 | train ρ: 0.058 || val MSE: 1.511 | val ρ: 0.116
Epoch 07 : train MSE: 0.905 | train ρ: -0.038 || val MSE: 0.567 | val ρ: 0.139
Epoch 08 : train MSE: 0.891 | train ρ: 0.218 || val MSE: 0.731 | val ρ: 0.139
Epoch 09 : train MSE: 0.370 | train ρ: 0.005 || val MSE: 0.078 | val ρ: 0.149
Epoch 10 : train MSE: 0.212 | train ρ: 0.169 || val MSE: 0.258 | val ρ: 0.188
Done
