In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import SelectKBest, f_regression
import numpy as np
from transformers import EsmModel, EsmTokenizer
from sklearn.random_projection import SparseRandomProjection
from sklearn.preprocessing import StandardScaler

In [None]:
# !pip install scikit-learn
# !pip install torch
# !pip install transformers

In [4]:
# Step 1: Load Dataset
data_path = './cleaned_dataset.csv' 
data = pd.read_csv(data_path)

In [5]:
# ----------------- Step 1: Feature Extraction -----------------
# Protein Embeddings using ESM
tokenizer = EsmTokenizer.from_pretrained("facebook/esm2_t6_8M_UR50D")
model = EsmModel.from_pretrained("facebook/esm2_t6_8M_UR50D")
embedding_dim = model.config.hidden_size
def get_protein_embedding(uniprot_id):
    """Compute embeddings for protein sequences using ESM model."""
    inputs = tokenizer(uniprot_id, return_tensors="pt", add_special_tokens=True)
    with torch.no_grad():
        outputs = model(**inputs)
    # Mean pooling over sequence tokens
    embedding = outputs.last_hidden_state.mean(dim=1).squeeze().numpy()
    return embedding

# Generate embeddings for proteins
protein_embeddings = {}
for uniprot_id in data['UniProt_ID'].unique():
    try:
        protein_embeddings[uniprot_id] = get_protein_embedding(uniprot_id)
    except Exception as e:
        print(f"Error for {uniprot_id}: {e}")

Some weights of EsmModel were not initialized from the model checkpoint at facebook/esm2_t6_8M_UR50D and are newly initialized: ['esm.pooler.dense.bias', 'esm.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [6]:
# ----------------- Step 2: Placeholder Chemical Features -----------------
# Generate random embeddings for chemicals
def generate_random_projections(cids, embedding_dim=embedding_dim):
    """Generate random embeddings using SparseRandomProjection."""
    random_projector = SparseRandomProjection(n_components=embedding_dim, random_state=42)
    cid_indices = {cid: idx for idx, cid in enumerate(cids)}
    random_matrix = np.random.rand(len(cids), embedding_dim)
    random_embeddings = random_projector.fit_transform(random_matrix)
    return {cid: random_embeddings[cid_indices[cid]] for cid in cids}

# Filter out rows where kiba_score is NaN
valid_data = data.dropna(subset=['kiba_score'])
unique_cids = valid_data['pubchem_cid'].dropna().unique()

chemical_embeddings = generate_random_projections(unique_cids)

In [7]:
# Normalize embeddings
protein_embeddings = {k: StandardScaler().fit_transform(v.reshape(1, -1)).flatten() 
                      for k, v in protein_embeddings.items()}
chemical_embeddings = {k: StandardScaler().fit_transform(v.reshape(1, -1)).flatten() 
                       for k, v in chemical_embeddings.items()}

In [8]:
class AttentionBlock(nn.Module):
    def __init__(self, input_dim):
        super(AttentionBlock, self).__init__()
        self.attention = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 1)
        )
        self.softmax = nn.Softmax(dim=1)  # Apply softmax over the sequence dimension

    def forward(self, x):
        # Compute attention weights
        attention_weights = self.softmax(self.attention(x))
        # Apply weights and maintain embedding dimension
        weighted_output = x * attention_weights
        return weighted_output.sum(dim=1)  # Collapse sequence dimension, retain batch size and embedding dim

In [9]:
class ProteinChemicalDataset(Dataset):
    def __init__(self, data, protein_embeddings, chemical_embeddings):
        self.data = data
        self.protein_embeddings = protein_embeddings
        self.chemical_embeddings = chemical_embeddings

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        row = self.data.iloc[idx]
        protein_emb = self.protein_embeddings[row['UniProt_ID']]  # Shape: (embedding_dim,)
        chemical_emb = self.chemical_embeddings[row['pubchem_cid']]  # Shape: (embedding_dim,)
        target = np.log1p(row['kiba_score'])
        weight = 0.5 if row['kiba_score_estimated'] else 1.0

        return (torch.tensor(protein_emb, dtype=torch.float32).unsqueeze(0),  # Add batch dimension
                torch.tensor(chemical_emb, dtype=torch.float32).unsqueeze(0),  # Add batch dimension
                torch.tensor(target, dtype=torch.float32),
                torch.tensor(weight, dtype=torch.float32))

In [10]:
# Split data
train_data, test_data = train_test_split(data, test_size=0.2, random_state=42)
train_dataset = ProteinChemicalDataset(train_data, protein_embeddings, chemical_embeddings)
test_dataset = ProteinChemicalDataset(test_data, protein_embeddings, chemical_embeddings)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32)

In [11]:
# ----------------- Step 5: Deep Learning Model -----------------
class BindingPredictor(nn.Module):
    def __init__(self, embedding_dim):
        super(BindingPredictor, self).__init__()
        self.protein_attention = AttentionBlock(embedding_dim)
        self.chemical_attention = AttentionBlock(embedding_dim)
        self.fc = nn.Sequential(
            nn.Linear(embedding_dim * 2, 128),
            nn.ReLU(),
            nn.Linear(128, 1)#,
            # nn.Sigmoid()
        )

    def forward(self, protein_emb, chemical_emb):
        protein_feat = self.protein_attention(protein_emb)  # Shape: (batch_size, embedding_dim)
        chemical_feat = self.chemical_attention(chemical_emb)  # Shape: (batch_size, embedding_dim)
        combined = torch.cat((protein_feat, chemical_feat), dim=1)  # Concatenate along embedding dimension
        return self.fc(combined)

In [12]:
# Define the Model
class KibaModel(nn.Module):
    def __init__(self, embedding_dim):
        super(KibaModel, self).__init__()
        self.protein_attention = AttentionBlock(embedding_dim)
        self.chemical_attention = AttentionBlock(embedding_dim)
        self.fc = nn.Sequential(
            nn.Linear(embedding_dim * 2, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Dropout(0.4),
            nn.Linear(256, 128),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(128, 64),
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.Linear(64, 1)
        )
    def forward(self, protein_emb, chemical_emb):
        protein_feat = self.protein_attention(protein_emb)  # Shape: (batch_size, embedding_dim)
        chemical_feat = self.chemical_attention(chemical_emb)  # Shape: (batch_size, embedding_dim)
        combined = torch.cat((protein_feat, chemical_feat), dim=1)  # Concatenate along embedding dimension
        return self.fc(combined)

In [13]:
# Custom weight initialization
def initialize_weights(m):
    if isinstance(m, nn.Linear):
        nn.init.kaiming_uniform_(m.weight, nonlinearity='relu')  # He initialization for ReLU activation
        if m.bias is not None:
            nn.init.zeros_(m.bias)

In [14]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


In [15]:
# Initialize model
model = KibaModel(embedding_dim)
model = model.to(device)
# Apply custom weight initialization
model.apply(initialize_weights)
# Loss and optimizer
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [16]:
# Gradient clipping function
def clip_gradient(optimizer, grad_clip):
    for group in optimizer.param_groups:
        for param in group['params']:
            if param.grad is not None:
                param.grad.data.clamp_(-grad_clip, grad_clip)

In [17]:
def train_model(model, train_loader, criterion, optimizer, device, grad_clip=None, num_epochs=10):
    model.train()
    for epoch in range(num_epochs):
        total_loss = 0.0
        total_original_loss = 0.0  # For loss in the original scale
        for protein_emb, chemical_emb, target, weights in train_loader:
            protein_emb, chemical_emb, target, weights = (
                protein_emb.to(device),
                chemical_emb.to(device),
                target.to(device),
                weights.to(device),
            )
            optimizer.zero_grad()
            outputs = model(protein_emb, chemical_emb).squeeze()
            
            # Debug predictions
            # print(f"Epoch {epoch+1} - Output Range: Min: {outputs.min().item()}, Max: {outputs.max().item()}")
            
            loss = (criterion(outputs, target) * weights).mean()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)  # Gradient clipping
            optimizer.step()
            total_loss += loss.item()
            
            # Compute loss in original scale
            original_outputs = torch.expm1(outputs)
            original_targets = torch.expm1(target)
            original_loss = torch.mean((original_outputs - original_targets) ** 2).item()
            total_original_loss += original_loss
        
        

        print(f"Epoch {epoch+1}/{num_epochs}, Loss (log-transformed): {total_loss/len(train_loader):.4f}, "
              f"Loss (original scale): {total_original_loss/len(train_loader):.4f}")

In [18]:
# Train and Evaluate
train_model(model, train_loader, criterion, optimizer, device, grad_clip=None, num_epochs=10)

Epoch 1/10, Loss (log-transformed): 5.8052, Loss (original scale): 2156122541339.7639
Epoch 2/10, Loss (log-transformed): 4.8638, Loss (original scale): 2156120931561.1660


KeyboardInterrupt: 

In [None]:
# Step 6: Evaluate the Model
def evaluate_model(model, loader, device, criterion):
    model.eval()
    total_loss = 0.0
    all_targets = []
    all_predictions = []
    
    with torch.no_grad():
        for protein_emb, chemical_emb, target, _ in loader:
            protein_emb, chemical_emb, target = (
                protein_emb.to(device),
                chemical_emb.to(device),
                target.to(device),
            )
            
            predictions = model(protein_emb, chemical_emb)  # Forward pass
            loss = criterion(predictions.squeeze(), target)  # Compute loss in transformed space
            total_loss += loss.item()
            
            # Collect inverse-transformed predictions and original targets for metrics
            predictions = predictions.squeeze().cpu().numpy()
            predictions = np.expm1(predictions)  # Apply inverse transformation
            all_predictions.extend(predictions)
            all_targets.extend(target.cpu().numpy())
    
    # Compute metrics (e.g., MAE, RMSE) on the original scale
    mae = np.mean(np.abs(np.array(all_predictions) - np.array(all_targets)))
    rmse = np.sqrt(np.mean((np.array(all_predictions) - np.array(all_targets)) ** 2))
    
    print(f"Validation Loss (log-transformed space): {total_loss / len(loader):.4f}")
    print(f"MAE (original scale): {mae:.4f}")
    print(f"RMSE (original scale): {rmse:.4f}")

In [None]:
# Save the model
torch.save(model.state_dict(), 'regression_model_w_attention.pth')

In [None]:
evaluate_model(model, test_loader, device)