In [None]:
!pip install torch
!pip install torch_geometric


In [2]:
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch_geometric.nn import HypergraphConv

In [5]:
#UPLOAD THE MATRICES

#Feacture matrix
fecture= pd.read_csv('/content/Feature_matrix.csv', index_col='ensembl_id')
H_target = pd.read_csv('/content/Incidence_matrix.csv',index_col='ensembl_id')

In [8]:
# Convert Dataframe to tensors
fecture_torch = torch.tensor(fecture.values, dtype=torch.float32)
H_target_torch = torch.tensor(H_target.values, dtype=torch.float32)

In [9]:
#Sparce HYperedge Index
hyperedge_index = H_target_torch.nonzero().t()

In [10]:
print(f"Node features shape: {fecture_torch.shape}")
print(f"Incidence matrix shape: {H_target_torch.shape}")
print(f"Hyperedge index shape: {hyperedge_index.shape}")
print("-" * 20)

Node features shape: torch.Size([60615, 193])
Incidence matrix shape: torch.Size([60615, 2778])
Hyperedge index shape: torch.Size([2, 136292])
--------------------


In [11]:
# --- 2. The Hypergraph Autoencoder Model ---

class HypergraphAutoencoder(nn.Module):
    def __init__(self, in_dim, hidden_dim, latent_dim, num_hyperedges):
        super(HypergraphAutoencoder, self).__init__()

        # --- Encoder ---
        # We use two HypergraphConv layers to learn node embeddings (Z_V)
        self.encoder_layer1 = HypergraphConv(in_dim, hidden_dim)
        self.encoder_layer2 = HypergraphConv(hidden_dim, latent_dim)

        # --- Learnable Hyperedge Embeddings (Z_E) ---
        # The encoder only gives us node embeddings. We need to also
        # learn the embeddings for the hyperedges (Z_E) to use
        # in our inner product decoder. We create a learnable
        # nn.Parameter for this.
        self.hyperedge_embeddings = nn.Parameter(
            torch.Tensor(num_hyperedges, latent_dim)
        )
        # Initialize the weights (good practice)
        nn.init.xavier_uniform_(self.hyperedge_embeddings)

        self.relu = nn.ReLU()

    def encode(self, x, hyperedge_index):
        """ Encodes nodes into the latent space. """
        # Layer 1
        z = self.encoder_layer1(x, hyperedge_index)
        z = self.relu(z)
        # Layer 2 (final node embeddings Z_V)
        z_v = self.encoder_layer2(z, hyperedge_index)
        return z_v

    def decode(self, z_v, z_e):
        """
        Decodes the incidence matrix H using a simple inner product.
        This is the simplest decoder: H_hat = Z_V * Z_E^T
        """
        # z_v shape: (NUM_NODES, LATENT_DIM)
        # z_e shape: (NUM_HYPEREDGES, LATENT_DIM)
        # We want to get (NUM_NODES, NUM_HYPEREDGES)
        # So we multiply z_v by the transpose of z_e.
        h_hat = torch.matmul(z_v, z_e.t())

        # We return the 'logits' (raw scores) and let the loss
        # function apply the sigmoid. This is more numerically stable.
        return h_hat

    def forward(self, x, hyperedge_index):
        # 1. Get node embeddings (Z_V)
        z_v = self.encode(x, hyperedge_index)

        # 2. Get hyperedge embeddings (Z_E)
        z_e = self.hyperedge_embeddings

        # 3. Reconstruct the incidence matrix
        reconstructed_h_logits = self.decode(z_v, z_e)

        return reconstructed_h_logits

In [12]:
# Model Hyperparameters
NODE_FEATURE_DIM = fecture_torch.shape[1]
NUM_HYPEREDGES = H_target_torch.shape[1]
LATENT_DIM = 10
HIDDEN_DIM = 10
LEARNING_RATE = 0.01
EPOCHS = 200

In [13]:
# Instantiate the model
model = HypergraphAutoencoder(
    in_dim=NODE_FEATURE_DIM,
    hidden_dim=HIDDEN_DIM,
    latent_dim=LATENT_DIM,
    num_hyperedges=NUM_HYPEREDGES
)

In [14]:
criterion = nn.BCEWithLogitsLoss()

# Optimizer
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

print(f"Starting training for {EPOCHS} epochs...")

Starting training for 200 epochs...


In [15]:
for epoch in range(EPOCHS):
    # Set model to training mode
    model.train()

    # --- Forward Pass ---
    # Get the reconstructed incidence matrix (logits)
    h_hat_logits = model(fecture_torch, hyperedge_index)

    # --- Calculate Loss ---
    # Compare the reconstructed logits to the *original* target H matrix
    loss = criterion(h_hat_logits, H_target_torch)

    # --- Backward Pass and Optimization ---
    # 1. Clear old gradients
    optimizer.zero_grad()
    # 2. Compute gradients
    loss.backward()
    # 3. Update weights
    optimizer.step()

    if (epoch + 1) % 20 == 0:
        print(f"Epoch [{epoch+1}/{EPOCHS}], Loss: {loss.item():.4f}")

Epoch [20/200], Loss: 0.5316
Epoch [40/200], Loss: 0.3576
Epoch [60/200], Loss: 0.1205
Epoch [80/200], Loss: 0.0354
Epoch [100/200], Loss: 0.0177
Epoch [120/200], Loss: 0.0123
Epoch [140/200], Loss: 0.0099
Epoch [160/200], Loss: 0.0085
Epoch [180/200], Loss: 0.0076
Epoch [200/200], Loss: 0.0069


In [16]:
model.eval()
with torch.no_grad():
    # Get the final node embeddings
    learned_node_embeddings = model.encode(fecture_torch, hyperedge_index)

    # Get the final hyperedge embeddings
    learned_hyperedge_embeddings = model.hyperedge_embeddings

    # Get the final reconstructed H (as probabilities)
    reconstructed_h_prob = torch.sigmoid(model(fecture_torch, hyperedge_index))

In [17]:
#save outputs files
torch.save(learned_node_embeddings, 'learned_gene_embeddings.pt')
torch.save(learned_hyperedge_embeddings, 'learned_pathway_embeddings.pt')

# Downstream Analysis - Clustering of Similar Genes to Most Commonly Mutated Genes in Glioma Cancer

In [18]:
# Load the saved embeddings
embeddings = torch.load('/content/learned_gene_embeddings.pt')

# Convert to numpy if it's a tensor
if isinstance(embeddings, torch.Tensor):
    embeddings_np = embeddings.detach().cpu().numpy()
else:
    embeddings_np = embeddings

In [24]:
Genes_embedding = pd.DataFrame(embeddings_np, index=fecture.index)

# Find closest genes to Oncognes

In [19]:
from scipy.spatial.distance import euclidean

In [21]:
#Create list with Oncogenes
List_of_Oncogenes = pd.read_csv('/content/Oncogen_Glioma.csv')['Ensembl_code'].to_list()

In [22]:
def find_closest_genes_euclidean(gene_id, df_embeddings, n_closest=10):
    """
    Memory-efficient function to find closest genes using Euclidean distance
    """
    if gene_id not in df_embeddings.index:
        print(f"Gene {gene_id} not found!")
        return None

    # Get embedding for query gene
    query_embedding = df_embeddings.loc[gene_id].values

    # Calculate distances one at a time (memory efficient)
    distances = []
    genes = []

    for idx, gene in enumerate(df_embeddings.index):
        if gene != gene_id:  # Skip the query gene itself
            dist = euclidean(query_embedding, df_embeddings.iloc[idx].values)
            distances.append(dist)
            genes.append(gene)

    # Create results dataframe
    results_df = pd.DataFrame({
        'gene': genes,
        'euclidean_distance': distances
    })

    # Sort and get top n
    results_df = results_df.sort_values('euclidean_distance').head(n_closest)

    return results_df.reset_index(drop=True)

In [None]:
results = {}

for gene in List_of_Oncogenes:
    closest = find_closest_genes_euclidean(gene, Genes_embedding, n_closest=10)

    if closest is not None:
        results[gene] = closest
        print(f"{'='*70}")
        print(f"10 Closest genes to {gene} (Euclidean distance):")
        print(f"{'='*70}")
        print(closest.to_string(index=False))

# Save results
all_results = []
for gene, closest_genes in results.items():
    closest_genes['query_gene'] = gene
    all_results.append(closest_genes)

final_df = pd.concat(all_results, ignore_index=True)
final_df = final_df[['query_gene', 'gene', 'euclidean_distance']]
final_df.to_csv('/content/all_closest_genes_euclidean.csv', index=False)