In [3]:
import torch
import pandas as pd
from torch_geometric.data import Data
from sklearn.model_selection import train_test_split
import torch.nn.functional as F

# Check if GPU is available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')  # Automatically choose CPU or GPU

# Load gene expression matrix (genes as columns, samples as rows)
# Assuming the E. coli expression data file is named 'DREAM5_ecoli_data.tsv'
expression_df = pd.read_csv("DREAM5_ecoli_data.tsv", sep="\t", header=0)
gene_ids = expression_df.columns.tolist()  # Get gene IDs from column names

# Convert expression matrix to node features tensor
node_features = torch.tensor(expression_df.values.T, dtype=torch.float).to(device)
print(f"Node features shape: {node_features.shape}")  # Check shape

# Load mutual information data (edges) with columns: 'Gene1', 'Gene2', 'MI'
# Assuming MI data for E. coli is in 'DREAM5_MI_ecoli_3col.csv'
mi_df = pd.read_csv("MI_ecoli_3col.csv")
mi_threshold = 0  # Set an MI threshold to filter strong interactions
mi_df = mi_df[mi_df['Mutual_Information'] > mi_threshold]

# Convert gene names to indices based on gene_ids list for compatibility with PyTorch Geometric
gene_to_idx = {gene: idx for idx, gene in enumerate(gene_ids)}

# Filter only valid gene pairs based on gene expression matrix columns
mi_df = mi_df[mi_df['Gene1'].isin(gene_to_idx) & mi_df['Gene2'].isin(gene_to_idx)]
mi_df['Gene1_idx'] = mi_df['Gene1'].map(gene_to_idx)
mi_df['Gene2_idx'] = mi_df['Gene2'].map(gene_to_idx)

# Convert to edge index tensor and edge weight tensor
edge_index = torch.tensor(mi_df[['Gene1_idx', 'Gene2_idx']].values.T, dtype=torch.long).to(device)
edge_weight = torch.tensor(mi_df['Mutual_Information'].values, dtype=torch.float).to(device)
print(f"Edge index shape: {edge_index.shape}")  # Check shape

# Load the ground truth interaction matrix (square matrix of 1s and 0s)
# Assuming interaction data for E. coli is in 'interaction_ecoli.csv'
interaction_matrix = pd.read_csv("interaction_ecoli.csv", header=0).values  # Convert to numpy array

# Function to replace any string labels with 0
def handle_non_binary_labels(labels):
    for idx, label in enumerate(labels):
        if isinstance(label, str):  # Check if the label is a string
            print(f"Replacing string label at index {idx}: {label}")  # Print the problematic label
            labels[idx] = 0  # Replace string label with 0
    return labels

# Split edges and weights into train, validation, and test sets, each being 5% of total
num_edges = edge_index.shape[1]
split_size = int(0.05 * num_edges)  # 5% of total edges

# Randomly shuffle and select indices for train, val, and test sets
edge_indices = list(range(num_edges))
train_indices, temp_indices = train_test_split(edge_indices, train_size=split_size, random_state=42)
val_indices, test_indices = train_test_split(temp_indices, train_size=split_size, random_state=42)

# Subset edge_index and edge_weight for each split
edge_index_train = edge_index[:, train_indices]
edge_weight_train = edge_weight[train_indices]
edge_index_val = edge_index[:, val_indices]
edge_weight_val = edge_weight[val_indices]
edge_index_test = edge_index[:, test_indices]
edge_weight_test = edge_weight[test_indices]

# Sanity check for edge splits
print(f"Training edge indices: {edge_index_train.shape}")
print(f"Validation edge indices: {edge_index_val.shape}")
print(f"Test edge indices: {edge_index_test.shape}")

# Match ground truth labels for the corresponding splits
train_labels = []
val_labels = []
test_labels = []

# Split ground truth for each edge subset
for edge in edge_index_train.T:
    i, j = edge.cpu().numpy()
    train_labels.append(interaction_matrix[i, j])  # Append the corresponding label

for edge in edge_index_val.T:
    i, j = edge.cpu().numpy()
    val_labels.append(interaction_matrix[i, j])  # Append the corresponding label

for edge in edge_index_test.T:
    i, j = edge.cpu().numpy()
    test_labels.append(interaction_matrix[i, j])  # Append the corresponding label

# Handle non-binary labels if needed
train_labels = handle_non_binary_labels(train_labels)
val_labels = handle_non_binary_labels(val_labels)
test_labels = handle_non_binary_labels(test_labels)

# Convert lists to tensors
train_labels = torch.tensor(train_labels, dtype=torch.float).to(device)
val_labels = torch.tensor(val_labels, dtype=torch.float).to(device)
test_labels = torch.tensor(test_labels, dtype=torch.float).to(device)

# Prepare PyTorch Geometric Data objects
train_data = Data(x=node_features, edge_index=edge_index_train, edge_attr=edge_weight_train, y=train_labels)
val_data = Data(x=node_features, edge_index=edge_index_val, edge_attr=edge_weight_val, y=val_labels)
test_data = Data(x=node_features, edge_index=edge_index_test, edge_attr=edge_weight_test, y=test_labels)

# Verify that the data is ready for training
print(f"Training data: {train_data}")
print(f"Validation data: {val_data}")
print(f"Test data: {test_data}")

# Print the shape for each dataset to verify correctness
print(f"Training node features shape: {train_data.x.shape}")
print(f"Training edge index shape: {train_data.edge_index.shape}")
print(f"Training edge weight shape: {train_data.edge_attr.shape}")
print(f"Training labels shape: {train_data.y.shape}")

print(f"Validation node features shape: {val_data.x.shape}")
print(f"Validation edge index shape: {val_data.edge_index.shape}")
print(f"Validation edge weight shape: {val_data.edge_attr.shape}")
print(f"Validation labels shape: {val_data.y.shape}")

print(f"Test node features shape: {test_data.x.shape}")
print(f"Test edge index shape: {test_data.edge_index.shape}")
print(f"Test edge weight shape: {test_data.edge_attr.shape}")
print(f"Test labels shape: {test_data.y.shape}")


Node features shape: torch.Size([4511, 805])
Edge index shape: torch.Size([2, 19388736])
Training edge indices: torch.Size([2, 969436])
Validation edge indices: torch.Size([2, 969436])
Test edge indices: torch.Size([2, 17449864])
Replacing string label at index 3448: G561
Replacing string label at index 5191: G3633
Replacing string label at index 12936: G2039
Replacing string label at index 13365: G477
Replacing string label at index 18346: G362
Replacing string label at index 20135: G153
Replacing string label at index 25069: G3638
Replacing string label at index 31523: G969
Replacing string label at index 31957: G2088
Replacing string label at index 34882: G1942
Replacing string label at index 35802: G1286
Replacing string label at index 37929: G4328
Replacing string label at index 58849: G2845
Replacing string label at index 60574: G2452
Replacing string label at index 61243: G586
Replacing string label at index 65901: G2432
Replacing string label at index 74808: G2644
Replacing str

In [None]:
from torch_geometric.data import DataLoader
# Prepare the DataLoader for PyTorch Geometric Data objects (for train, validation, test)
train_loader = DataLoader([train_data], batch_size=1, shuffle=True)
val_loader = DataLoader([val_data], batch_size=1, shuffle=False)
test_loader = DataLoader([test_data], batch_size=1, shuffle=False)

In [None]:
from torch_geometric.nn import GCNConv, GATConv, global_mean_pool, GraphConv
import torch
import torch.nn as nn
import torch.optim as optim
from torch_geometric.data import DataLoader
import matplotlib.pyplot as plt

# Define the Enhanced Graph Autoencoder with regularization and attention-based enhancements
class EnhancedGraphAutoencoder(nn.Module):
    def __init__(self, input_dim, hidden_dim):
        super(EnhancedGraphAutoencoder, self).__init__()

        # Encoder: Combination of GCN and GAT layers with skip connections
        self.encoder_gcn1 = GCNConv(input_dim, hidden_dim * 2, add_self_loops=True, normalize=True)
        self.encoder_gat = GATConv(hidden_dim * 2, hidden_dim, heads=4, concat=False, dropout=0.3)
        self.encoder_gat2 = GATConv(hidden_dim, hidden_dim, heads=4, concat=False, dropout=0.3)

        # Latent space: A non-linear transformation of the combined embeddings
        self.latent_transform = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim * 2),
            nn.ReLU(),
            nn.Linear(hidden_dim * 2, hidden_dim)
        )

        # Decoder: Multi-layer perceptron with ReLU activations and dropout
        self.decoder = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim * 2),
            nn.ReLU(),
            nn.Dropout(0.5),  # Apply dropout
            nn.Linear(hidden_dim * 2, input_dim)
        )

        # Residual connection (identity mapping)
        self.residual = nn.Linear(input_dim, input_dim)

        # Regularization (Laplacian smoothing penalty)
        self.laplacian_penalty = 1e-3

    def forward(self, data):
        x, edge_index = data.x, data.edge_index

        # Propagation and transformation in the encoder
        x_gcn = self.encoder_gcn1(x, edge_index)
        x_gcn = torch.relu(x_gcn)
        x_gat = self.encoder_gat(x_gcn, edge_index)
        x_gat2 = self.encoder_gat2(x_gat, edge_index)

        # Latent space transformation
        latent = self.latent_transform(x_gat2)

        # Decoding and residual mapping
        decoded = self.decoder(latent)
        decoded_residual = self.residual(data.x)

        return decoded + decoded_residual, latent  # Return decoded matrix and latent embeddings

    def compute_laplacian_loss(self, x, edge_index):
        # Compute smoothness penalty using Laplacian
        row, col = edge_index
        diff = x[row] - x[col]
        laplacian_loss = self.laplacian_penalty * torch.sum(torch.norm(diff, dim=1) ** 2)
        return laplacian_loss

# Initialize the model, optimizer, and loss function
input_dim = node_features.shape[1]  # Number of input features (genes)
hidden_dim = 256  # Latent space dimension

model = EnhancedGraphAutoencoder(input_dim=input_dim, hidden_dim=hidden_dim).to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)
criterion = nn.MSELoss()

# Training loop with Laplacian loss and advanced regularization
def train_autoencoder(train_loader, val_loader, model, optimizer, criterion, num_epochs=200):
    train_losses = []
    val_losses = []
    
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        for data in train_loader:
            data = data.to(device)
            optimizer.zero_grad()
            
            # Forward pass
            output, embeddings = model(data)
            loss = criterion(output, data.x)  # Reconstruction loss
            
            # Add Laplacian regularization
            laplacian_loss = model.compute_laplacian_loss(embeddings, data.edge_index)
            total_loss = loss + laplacian_loss
            
            total_loss.backward()
            optimizer.step()
            running_loss += total_loss.item()

        # Log training loss
        train_losses.append(running_loss / len(train_loader))
        
        # Validation loss
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for data in val_loader:
                data = data.to(device)
                output, _ = model(data)
                loss = criterion(output, data.x)
                val_loss += loss.item()
        val_losses.append(val_loss / len(val_loader))
        
        print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {train_losses[-1]:.4f}, Val Loss: {val_losses[-1]:.4f}')
    
    return train_losses, val_losses

# Train the enhanced model
train_losses, val_losses = train_autoencoder(train_loader, val_loader, model, optimizer, criterion)

# Plot Training and Validation Loss
plt.figure()
plt.plot(range(1, len(train_losses) + 1), train_losses, label='Training Loss')
plt.plot(range(1, len(val_losses) + 1), val_losses, label='Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Training and Validation Loss')
plt.legend()
plt.show()


In [None]:
from sklearn.metrics import roc_auc_score, average_precision_score, matthews_corrcoef
import numpy as np
import torch

# Function to calculate AUPR, AUROC, and MCC
def calculate_metrics(val_data, model, device):
    model.eval()
    
    y_true = []  # Ground truth labels
    y_pred = []  # Predicted labels
    
    with torch.no_grad():
        for data in val_data:
            data = data.to(device)
            
            # Forward pass
            output, _ = model(data)
            
            # Extract ground truth labels for this batch
            ground_truth = data.y.cpu().numpy().flatten()  # Flatten if needed (for multi-dimensional labels)
            
            # Get predictions (assuming output is a continuous value for each sample)
            predictions = output.cpu().numpy().flatten()  # Flatten to match label shape
            
            y_true.extend(ground_truth)
            y_pred.extend(predictions)
    
    # Convert lists to numpy arrays
    y_true = np.array(y_true)
    y_pred = np.array(y_pred)
    
    # Ensure predictions and ground truth have the same number of samples
    assert len(y_true) == len(y_pred), f"Mismatch in number of samples: {len(y_true)} vs {len(y_pred)}"
    
    # Compute AUROC (Area Under the Receiver Operating Characteristic Curve)
    auroc = roc_auc_score(y_true, y_pred)
    
    # Compute AUPR (Area Under the Precision-Recall Curve)
    aupr = average_precision_score(y_true, y_pred)
    
    # Compute MCC (Matthews Correlation Coefficient)
    # Threshold predictions at 0.5 to convert to binary (if binary classification)
    mcc = matthews_corrcoef(y_true, (y_pred > 0.5).astype(int))  # Apply threshold for classification
    
    return auroc, aupr, mcc

# Example usage: Calculate metrics for validation data
auroc, aupr, mcc = calculate_metrics(val_loader, model, device)

print(f"AUROC: {auroc:.4f}")
print(f"AUPR: {aupr:.4f}")
print(f"MCC: {mcc:.4f}")
