In [1]:
# Install required packages separately for better debugging
!pip install -q tape_proteins
!pip install -q Bio
!pip install -q ablang2

# Clone the repository
!git clone https://github.com/TAI-Medical-Lab/MVSF-AB.git

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/68.9 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m68.9/68.9 kB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/3.3 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m3.3/3.3 MB[0m [31m116.5 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.3/3.3 MB[0m [31m57.3 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/139.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.6/139.6 kB[0m [31m10.1 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/297.8 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [2]:
import os
import pandas as pd
import numpy as np
from tqdm.auto import tqdm
from glob import glob
import math
import gc
import ctypes
import random
import re
from tape import ProteinBertModel, TAPETokenizer
from sklearn.model_selection import train_test_split
from Bio import SeqIO
import ablang2
import tensorflow as tf
import torch
tqdm.pandas()
device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [3]:
import torch
import ablang2
from tape import ProteinBertModel, TAPETokenizer

# Reference: https://github.com/TobiasHeOl/AbLang2?tab=readme-ov-file#ablang2-usecases
def initialize_ablang_encoder(device):
    """
    Initializes the AbLang2 model for antibody encoding.

    Parameters:
        device (str): Device to run the model ('cpu' or 'cuda').

    Returns:
        function: A function to compute embeddings for antibody sequences.
    """
    # Load the pre-trained AbLang2 model
    ablang_model = ablang2.pretrained(model_to_use='ablang2-paired', random_init=False, ncpu=1, device=device)

    def encode_antibody_sequences(heavy_chain_seq, light_chain_seq):
        """
        Computes the AbLang2 embedding for a given antibody sequence.

        Parameters:
            heavy_chain_seq (str): Heavy chain sequence of the antibody.
            light_chain_seq (str): Light chain sequence of the antibody.

        Returns:
            numpy.ndarray: Antibody embedding vector.
        """
        # Combine VH and VL sequences with '|' separator
        combined_sequence = [f"{heavy_chain_seq}|{light_chain_seq}"]

        # Tokenize the sequence
        tokenized_sequence = ablang_model.tokenizer(combined_sequence, pad=True, w_extra_tkns=False, device=device)

        # Generate the embedding
        with torch.no_grad():
            embedding = ablang_model.AbRep(tokenized_sequence).last_hidden_states.mean(axis=1).detach().cpu().numpy()[0]

        return embedding

    return encode_antibody_sequences


# Reference: https://github.com/songlab-cal/tape?tab=readme-ov-file#examples
def initialize_tape_encoder(model_path='bert-base', device='cpu'):
    """
    Initializes the TAPE model for protein sequence encoding.

    Parameters:
        model_path (str): Path or name of the pre-trained TAPE model.
        device (str): Device to run the model ('cpu' or 'cuda').

    Returns:
        function: A function to compute protein embeddings.
    """
    # Load the pre-trained TAPE model
    tape_model = ProteinBertModel.from_pretrained(model_path).to(device)
    tape_tokenizer = TAPETokenizer(vocab='unirep')

    def encode_protein_sequence(protein_sequence):
        """
        Computes the TAPE embedding for a given protein sequence.

        Parameters:
            protein_sequence (str): Protein sequence to encode.

        Returns:
            numpy.ndarray: Protein embedding vector.
        """
        # Tokenize the sequence
        token_ids = torch.tensor([tape_tokenizer.encode(protein_sequence)]).to(device)

        # Generate the embedding
        with torch.no_grad():
            embedding_output = tape_model(token_ids)

        return embedding_output[1].detach().cpu().numpy()[0]

    return encode_protein_sequence


In [4]:
def convert_fasta_to_csv(fasta_file, csv_file):
    """
    Converts a FASTA file to a CSV file.

    Parameters:
        fasta_file (str): Path to the input FASTA file.
        csv_file (str): Path to the output CSV file.
    """
    # Parse FASTA and store sequences
    records = []
    for record in SeqIO.parse(fasta_file, "fasta"):
        records.append([record.id, str(record.seq)])  # Extract ID and sequence

    # Convert to DataFrame
    df = pd.DataFrame(records, columns=["ID", "Sequence"])

    # Save as CSV
    df.to_csv(csv_file, index=False)
    print(f"✅ Converted {fasta_file} to {csv_file}")

def generate_unique_id(df):
    """
    Generates a unique ID for each row in the DataFrame.

    Parameters:
        df (pd.DataFrame): Input DataFrame containing light, heavy, and antigen columns.

    Returns:
        pd.Series: Unique IDs for each row.
    """
    return (df.index.astype(str) + "_" + df.light + "_" + df.heavy + "_" + df['antigen'])


In [5]:
# Regular expression pattern to retain only valid amino acid residues
valid_amino_acid_pattern = r'[^ARNDCEQGHILKMFPSTWYV]'

# Convert FASTA files to CSV format
convert_fasta_to_csv("MVSF-AB/datasets/seq_natural.fasta", "MVSF-AB/datasets/seq_natural.csv")
convert_fasta_to_csv("MVSF-AB/datasets/seq.fasta", "MVSF-AB/datasets/seq.csv")

✅ Converted MVSF-AB/datasets/seq_natural.fasta to MVSF-AB/datasets/seq_natural.csv
✅ Converted MVSF-AB/datasets/seq.fasta to MVSF-AB/datasets/seq.csv


In [6]:
# Load datasets into DataFrames
natural_sequence_df = pd.read_csv('MVSF-AB/datasets/seq_natural.csv')  # Natural antibody sequences
antibody_antigen_pairs_df = pd.read_csv('MVSF-AB/datasets/pairs_sabdab.csv')  # Antibody-antigen interaction pairs
benchmark_pairs_df = pd.read_csv('MVSF-AB/datasets/pairs_benchmark.csv')  # Benchmark dataset for testing


In [7]:
tape_emb = initialize_tape_encoder(device = device)
ablang_emb = initialize_ablang_encoder(device = device)


100%|██████████| 567/567 [00:00<00:00, 304932.73B/s]
100%|██████████| 370264230/370264230 [00:38<00:00, 9529486.23B/s] 


Downloading model ...


In [8]:
seq_nat_map = dict(natural_sequence_df.values)
if not os.path.exists('df.parquet'):
    antibody_antigen_pairs_df['VL_seq'] = antibody_antigen_pairs_df['light'].apply(lambda x: re.sub(valid_amino_acid_pattern, '', seq_nat_map[x]))
    antibody_antigen_pairs_df['VH_seq'] = antibody_antigen_pairs_df['heavy'].apply(lambda x: re.sub(valid_amino_acid_pattern, '', seq_nat_map[x]))
    antibody_antigen_pairs_df['Antigen'] = antibody_antigen_pairs_df['antigen'].apply(lambda x: re.sub(valid_amino_acid_pattern, '', seq_nat_map[x]))
    antibody_antigen_pairs_df['id'] = generate_unique_id(antibody_antigen_pairs_df)

    antibody_antigen_pairs_df['ag_emb'] = antibody_antigen_pairs_df['Antigen'].progress_apply(tape_emb)
    antibody_antigen_pairs_df['ab_emb'] = antibody_antigen_pairs_df.progress_apply(lambda x:ablang_emb(x['VH_seq'],x['VL_seq']),axis = 1)

    antibody_antigen_pairs_df.to_parquet("antibody_antigen_pairs_df.parquet",index = False)
else:
    antibody_antigen_pairs_df = pd.read_parquet('antibody_antigen_pairs_df.parquet')
if not os.path.exists('test_df.parquet'):
    benchmark_pairs_df['VL_seq'] = benchmark_pairs_df['light'].apply(lambda x: re.sub(valid_amino_acid_pattern, '', seq_nat_map[x]))
    benchmark_pairs_df['VH_seq'] = benchmark_pairs_df['heavy'].apply(lambda x: re.sub(valid_amino_acid_pattern, '', seq_nat_map[x]))
    benchmark_pairs_df['Antigen'] = benchmark_pairs_df['antigen'].apply(lambda x: re.sub(valid_amino_acid_pattern, '', seq_nat_map[x]))
    benchmark_pairs_df['id'] = generate_unique_id(benchmark_pairs_df)
    benchmark_pairs_df['ag_emb'] = benchmark_pairs_df['Antigen'].progress_apply(tape_emb)
    benchmark_pairs_df['ab_emb'] = benchmark_pairs_df.progress_apply(lambda x:ablang_emb(x['VH_seq'],x['VL_seq']),axis = 1)
    benchmark_pairs_df.to_parquet("benchmark_pairs_df.parquet",index = False)
else:
    benchmark_pairs_df = pd.read_parquet('benchmark_pairs_df.parquet')

train_df,val_df = train_test_split(antibody_antigen_pairs_df,test_size=0.1,random_state=42)
print('Train shape:',train_df.shape)
print('Valid shape:',val_df.shape)
print('Test shape:',benchmark_pairs_df.shape)

  0%|          | 0/578 [00:00<?, ?it/s]

  token_ids = torch.tensor([tape_tokenizer.encode(protein_sequence)]).to(device)


  0%|          | 0/578 [00:00<?, ?it/s]

  0%|          | 0/38 [00:00<?, ?it/s]

  0%|          | 0/38 [00:00<?, ?it/s]

Train shape: (520, 10)
Valid shape: (58, 10)
Test shape: (38, 10)


In [9]:
# Reference: https://github.com/TAI-Medical-Lab/MVSF-AB/blob/98f435bddb51fd15fcee1a49ff8a6b5dadf9aa5b/src/utils.py#L45

def normalize_tensor(train_tensor):
    """
    Normalizes the input tensor to a range of [0, 1].

    Parameters:
        train_tensor (numpy.ndarray): Input tensor to normalize.

    Returns:
        tuple: Normalized tensor, minimum value, and maximum value.
    """
    train_tensor = -train_tensor
    min_val = np.min(train_tensor)
    max_val = np.max(train_tensor)
    print('min,max:', min_val, max_val)
    normalized_tensor = (train_tensor - min_val) / (max_val - min_val)
    return normalized_tensor, min_val, max_val

In [10]:
antibody_antigen_pairs_df['delta_g'].max()

-5.04

In [11]:
train_df['delta_g'], min_val, max_val = normalize_tensor(train_df['delta_g'])
val_df['delta_g'] = ((-val_df['delta_g'] - min_val) / (max_val - min_val)).values.reshape(-1)
benchmark_pairs_df['delta_g'] =  (-benchmark_pairs_df['delta_g'])

min,max: 5.04 16.05654049


In [12]:
train_df['delta_g'].min(),train_df['delta_g'].max(),val_df['delta_g'].min(),val_df['delta_g'].max()

(0.0, 1.0, 0.11410283665194425, 0.9926211481659066)

In [13]:
def create_dataset(df, batch=32, shuffle=False):
    # Convert the DataFrame columns to lists and create a TensorFlow dataset
    ds = tf.data.Dataset.from_tensor_slices(
        (df['ab_emb'].tolist(), df['ag_emb'].tolist(), df['delta_g'].tolist())
    )

    # Map the data (separating features and target)
    ds = ds.map(lambda x, y, z: ((x, y), z))

    # Shuffle the dataset if needed
    if shuffle:
        ds = ds.shuffle(buffer_size=len(df))

    # Apply batching
    if batch:
        ds = ds.batch(batch)

    return ds


In [14]:
train_ds = create_dataset(train_df,batch = 32,shuffle = True)
val_ds = create_dataset(val_df,batch = 32,shuffle = False)
test_ds = create_dataset(benchmark_pairs_df,batch = 32,shuffle = False)

In [15]:
for (x1,x2),y in train_ds:
    break

In [16]:
import keras
from keras.models import Model
from keras.layers import Input, Dense, Dropout, Concatenate
from keras.optimizers import Adam
import tensorflow as tf

class AntibodyAffinityModel(Model):
    def __init__(self, num_layers=3, dropout_rate=0.5):
        super(AntibodyAffinityModel, self).__init__()
        self.num_layers = num_layers
        self.dropout_rate = dropout_rate

        # Define input layers
        self.ab_input = Input(shape=(480,), name="Antibody_Input")
        self.ag_input = Input(shape=(768,), name="Antigen_Input")

        # Build antibody branch with unique names
        x_ab = self.build_dense_layers(self.ab_input, prefix="Ab")

        # Build antigen branch with unique names
        x_ag = self.build_dense_layers(self.ag_input, prefix="Ag")

        # Concatenate branches
        x = Concatenate(name="Concatenation")([x_ab, x_ag])

        # Fully connected layers
        x = Dense(256, activation='relu', name="FC_256")(x)
        x = Dropout(self.dropout_rate, name="Dropout_256")(x)

        x = Dense(128, activation='relu', name="FC_128")(x)
        x = Dropout(self.dropout_rate, name="Dropout_128")(x)

        x = Dense(64, activation='relu', name="FC_64")(x)
        x = Dropout(self.dropout_rate, name="Dropout_64")(x)

        # Output layer
        output = Dense(1, activation='linear', name="Output_Layer")(x)

        # Create model
        self.model = Model(inputs=[self.ab_input, self.ag_input], outputs=output)

    def build_dense_layers(self, input_layer, prefix=""):
        """ Helper function to create multiple dense layers with unique names """
        x = input_layer
        for i in range(self.num_layers):
            x = Dense(64, activation='relu', name=f"{prefix}_Dense_64_Layer_{i+1}")(x)
            x = Dropout(self.dropout_rate, name=f"{prefix}_Dropout_Layer_{i+1}")(x)
        return x

    def compile_and_train(self, train_data, val_data, learning_rate=0.0001, epochs=100):
        """ Compiles and trains the model """
        self.model.compile(loss='mse', metrics=['mae'], optimizer=Adam(learning_rate=learning_rate))
        self.model.fit(train_data, validation_data=val_data, epochs=epochs)

    def predict_affinity(self, test_data):
        """ Makes predictions on test data """
        return self.model.predict(test_data)

# Clearing any previous Keras sessions
keras.backend.clear_session()

# Initialize and train the model
with tf.device('cpu'):
    affinity_model = AntibodyAffinityModel()
    affinity_model.compile_and_train(train_ds, val_ds)

# Making predictions
with tf.device('cpu'):
    predictions = affinity_model.predict_affinity(test_ds)


Epoch 1/100
[1m17/17[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 155ms/step - loss: 0.5874 - mae: 0.5982 - val_loss: 0.2280 - val_mae: 0.4446
Epoch 2/100
[1m17/17[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - loss: 0.4274 - mae: 0.4988 - val_loss: 0.2298 - val_mae: 0.4463
Epoch 3/100
[1m17/17[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - loss: 0.2667 - mae: 0.4080 - val_loss: 0.2245 - val_mae: 0.4409
Epoch 4/100
[1m17/17[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - loss: 0.1987 - mae: 0.3538 - val_loss: 0.2084 - val_mae: 0.4225
Epoch 5/100
[1m17/17[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - loss: 0.1887 - mae: 0.3453 - val_loss: 0.1997 - val_mae: 0.4119
Epoch 6/100
[1m17/17[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - loss: 0.1724 - mae: 0.3379 - val_loss: 0.1995 - val_mae: 0.4119
Epoch 7/100
[1m17/17[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - loss:

In [17]:
from sklearn.metrics import mean_absolute_error,mean_squared_error,r2_score,root_mean_squared_error
from scipy import stats

In [18]:
#ref: https://github.com/TAI-Medical-Lab/MVSF-AB/blob/98f435bddb51fd15fcee1a49ff8a6b5dadf9aa5b/train.py#L166
# inverse transform
y_pred = ((predictions * (max_val - min_val)) + min_val).reshape(-1)
y_true = benchmark_pairs_df['delta_g'].values.reshape(-1)

In [19]:
print('pcc', stats.pearsonr(y_true, y_pred)[1])
print('rmse',root_mean_squared_error(y_true,y_pred))
print('mse',mean_squared_error(y_true,y_pred))
print('mae',mean_absolute_error(y_true,y_pred))


pcc 0.816944268451567
rmse 2.4809605378525856
mse 6.1551651903817906
mae 2.1318666428013855
