#Combinatorics and Machine Learning for Gene Fusion
MGE and MML code for experiments

Prepocessing of the data to train the GCN for fusion graph classification.
Each fingerprints is converted a de Bruijn graph and after convertend in pytorch data

In [None]:
import pandas as pd
import numpy as np
import re
import networkx as nx
import torch
from torch_geometric.data import Data
import matplotlib.pyplot as plt

# Function to read and process the file
def process_file(filename, label):
    with open(filename, 'r') as file:
        data = file.read()
    entries = data.split("', '")
    entries = [entry.strip(" '") for entry in entries]
    df = pd.DataFrame(entries, columns=["Entry"])
    df['Label'] = label
    return df

# Function to extract numeric data from entries
def extract_numbers(df):
    def extract_numeric_part(entry):
        match = re.search(r'\|([\d\s|]+)', entry)
        if match:
            return ' '.join(match.group(1).strip().split('|')).strip()
        return ""
    numeric_data = df['Entry'].apply(extract_numeric_part)
    numeric_data = numeric_data.apply(lambda x: list(map(int, x.split())) if x else [])
    return pd.DataFrame({'Numbers': numeric_data, 'Label': df['Label']})

# Function to generate kmers from a sequence
def get_kmer(sequence, k=4):
    if len(sequence) < k:
        return []  # Return an empty list if the sequence is too short
    kmers = []
    sequence = ''.join(map(str, sequence))  # Convert sequence list to string
    for i in range(len(sequence) - k + 1):
        kmers.append(sequence[i:i + k])
    return kmers

# Function to generate De Bruijn graph edges from kmers
def get_debruijn_edges(kmers):
    edges = set()
    for k1 in kmers:
        for k2 in kmers:
            if k1 != k2 and k1[1:] == k2[:-1]:
                edges.add((k1, k2))
    return edges

# Function to create a NetworkX graph from a sequence
def sequence_to_nx_graph(sequence, k=4):
    kmers = get_kmer(sequence, k)
    if not kmers:
        return nx.DiGraph()  # Return an empty graph if no k-mers were generated
    edges = get_debruijn_edges(kmers)
    G = nx.DiGraph()
    for kmer in kmers:
        G.add_node(kmer, kmer_feature=kmer)
    G.add_edges_from(edges)
    return G

def convert_nx_to_torch_geo(G, label):
    kmer_list = list(G.nodes())
    if not kmer_list:
        return None  # Return None or handle appropriately for graphs with no nodes

    # Assuming each k-mer is a string of numbers, and we convert each character to an integer
    max_kmer_len = max(len(kmer) for kmer in kmer_list) if kmer_list else 0
    x = torch.zeros((len(kmer_list), max_kmer_len), dtype=torch.float)
    for i, kmer in enumerate(kmer_list):
        int_kmer = [int(char) for char in kmer]  # Convert each character to an integer
        x[i, :len(int_kmer)] = torch.tensor(int_kmer, dtype=torch.float)

    # Prepare edge indices
    if len(G.edges()) == 0:
        print(f"Warning: Graph with label {label} has no edges.")
        return None  # Skip graphs with no edges

    kmer_index = {kmer: idx for idx, kmer in enumerate(kmer_list)}
    edge_index = torch.tensor([[kmer_index[u], kmer_index[v]] for u, v in G.edges()], dtype=torch.long).t().contiguous()

    # Prepare label
    y = torch.tensor([label], dtype=torch.long)

    # Create PyTorch Geometric Data object
    data = Data(x=x, edge_index=edge_index, y=y, num_nodes=len(G.nodes()))
    return data



# Pre-process checks to filter out invalid sequences
def filter_valid_rows(df, k):
    valid_rows = []
    for index, row in df.iterrows():
        sequence = row['Numbers']
        if len(sequence) >= k:
            valid_rows.append(row)
        else:
            print(f"Skipping row {index} due to insufficient sequence length.", sequence)
    return pd.DataFrame(valid_rows)


# Load and process data
filename_fuse = '/content/fingerprint_fuse.txt'
filename_no_fuse = '/content/fingerprint_no_fuse.txt'  # Change this to the actual filename
df_no_fuse = process_file(filename_no_fuse, 0)
df_fuse = process_file(filename_fuse, 1)
df_no_fuse = extract_numbers(df_no_fuse)
df_fuse = extract_numbers(df_fuse)

print(df_no_fuse.head())
print(df_fuse.head())


# Filter valid rows based on k-mer size
k = 4
df_no_fuse = filter_valid_rows(df_no_fuse, k)
df_fuse = filter_valid_rows(df_fuse, k)

# Extract numbers and convert each row to a graph, then to a PyTorch Geometric Data object
data_list_no_fuse = []
data_list_fuse = []

for index, row in df_no_fuse.iterrows():
    sequence = row['Numbers']
    graph = sequence_to_nx_graph(sequence, k)
    data_object = convert_nx_to_torch_geo(graph, label=row['Label'])
    if data_object is not None:
        data_list_no_fuse.append(data_object)

for index, row in df_fuse.iterrows():
    sequence = row['Numbers']
    graph = sequence_to_nx_graph(sequence, k)
    data_object = convert_nx_to_torch_geo(graph, label=row['Label'])
    if data_object is not None:
        data_list_fuse.append(data_object)

# Combine the two lists
all_data = data_list_no_fuse + data_list_fuse

# Verify all graphs have nodes and edges
filtered_data_list = [data for data in all_data if data is not None and data.x.size(0) > 0 and data.edge_index.size(1) > 0]
print(f"Number of valid graphs after filtering: {len(filtered_data_list)}")


GCN definition

In [None]:
# Define the GCN model
class GCN(torch.nn.Module):
    def __init__(self, in_channels, hidden_channels, out_channels):
        super(GCN, self).__init__()
        self.conv1 = GCNConv(in_channels, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, hidden_channels)
        self.fc = torch.nn.Linear(hidden_channels, out_channels)

    def forward(self, data):
        x, edge_index, batch = data.x, data.edge_index, data.batch
        x = self.conv1(x, edge_index)
        x = F.relu(x)
        x = self.conv2(x, edge_index)
        x = global_mean_pool(x, batch)  # Aggregates node embeddings into graph embeddings
        x = self.fc(x)
        return F.log_softmax(x, dim=1)

# Set the parameters
in_channels = 4       # Each node has 4 features
hidden_channels = 16  # Number of hidden units
out_channels = 2      # Binary classification (2 classes)

# Instantiate the model
model = GCN(in_channels, hidden_channels, out_channels)


Training and testing the model on a selected number of balanced datasets based on minority class balancing.

The minority class determines how many elements from the majority class need to be randomly selected to create a balanced dataset.

In [None]:
# Install necessary packages
!pip install torch-geometric torch-scatter torch-sparse torch-cluster torch-spline-conv torch
!pip install imbalanced-learn

import torch
from torch_geometric.data import Data, DataLoader
from torch_geometric.nn import GCNConv, GATConv, global_mean_pool
import torch.nn.functional as F
import torch.nn as nn
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
import random
import pandas as pd
import numpy as np

# Separate the minority and dominant class samples
minority_class = [data for data in filtered_data_list if data.y.item() == 0]
dominant_class = [data for data in filtered_data_list if data.y.item() == 1]

# Define the number of balanced datasets you want to create
num_datasets = 5

# Initialize a list to store the balanced datasets
balanced_datasets = []

# Seed for reproducibility
random.seed(42)

# Create multiple balanced datasets
for _ in range(num_datasets):
    # Randomly sample from the dominant class
    sampled_dominant_class = random.sample(dominant_class, len(minority_class))

    # Combine the minority class with the sampled dominant class
    balanced_dataset = minority_class + sampled_dominant_class

    # Shuffle the combined dataset
    random.shuffle(balanced_dataset)

    # Add the balanced dataset to the list
    balanced_datasets.append(balanced_dataset)

class GCN(nn.Module):
    def __init__(self, in_channels, hidden_channels, out_channels):
        super(GCN, self).__init__()
        self.conv1 = GCNConv(in_channels, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, hidden_channels)
        self.fc = nn.Linear(hidden_channels, out_channels)

    def forward(self, data):
        x, edge_index, batch = data.x, data.edge_index, data.batch
        x = self.conv1(x, edge_index)
        x = F.relu(x)
        x = self.conv2(x, edge_index)
        x = global_mean_pool(x, batch)
        x = self.fc(x)
        return F.log_softmax(x, dim=1)

# Instantiate the model
in_channels = 4  # Example input feature size, adjust as necessary
hidden_channels = 16
out_channels = 2  # For binary classification
model = GCN(in_channels=in_channels, hidden_channels=hidden_channels, out_channels=out_channels)

# Define the optimizer and loss criterion
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)  # Adjusted learning rate
criterion = torch.nn.CrossEntropyLoss()

# Define the learning rate scheduler
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=5)

# Early stopping class
class EarlyStopping:
    def __init__(self, patience=5, min_delta=0):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.best_score = None
        self.early_stop = False

    def __call__(self, val_loss):
        if self.best_score is None:
            self.best_score = val_loss
        elif val_loss > self.best_score + self.min_delta:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = val_loss
            self.counter = 0

# Initialize performance tracking
train_accuracies = []
val_accuracies = []
test_accuracies = []
roc_aucs = []

# Process each balanced dataset
for i, selected_dataset in enumerate(balanced_datasets):
    print(f"\nProcessing Balanced Dataset {i+1}/{num_datasets}")

    # Perform a train-val-test split on the selected balanced dataset
    train_data, temp_data = train_test_split(
        selected_dataset,
        test_size=0.4,  # 40% goes to temp set which will be split into val and test
        random_state=42,
        stratify=[data.y.item() for data in selected_dataset]
    )

    val_data, test_data = train_test_split(
        temp_data,
        test_size=0.5,  # Split temp set equally for val and test
        random_state=42,
        stratify=[data.y.item() for data in temp_data]
    )

    # Create DataLoaders for training, validation, and testing
    batch_size = 16
    train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False)

    # Training loop with detailed reporting
    def train():
        model.train()
        total_loss = 0
        num_batches = 0
        correct = 0
        total_samples = 0

        for data in train_loader:
            if data.y.size(0) == 0:
                print("Empty batch detected")
                continue  # Skip processing if the batch is empty

            optimizer.zero_grad()
            out = model(data)
            loss = criterion(out, data.y)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            num_batches += 1
            total_samples += data.y.size(0)
            correct += (out.argmax(dim=1) == data.y).sum().item()

        avg_loss = total_loss / num_batches if num_batches > 0 else float('inf')
        accuracy = correct / total_samples if total_samples > 0 else 0
        print(f"Training - Avg Loss: {avg_loss:.4f}, Accuracy: {accuracy:.4f}, Total Batches: {num_batches}, Total Samples: {total_samples}")
        return avg_loss, accuracy

    # Testing loop with ROC AUC calculation
    def test(loader, phase='Testing'):
        model.eval()
        correct = 0
        total = 0
        all_predictions = []
        all_labels = []
        all_probabilities = []

        with torch.no_grad():
            for data in loader:
                if data.y.size(0) == 0:
                    print("Empty batch detected")
                    continue  # Skip processing if the batch is empty

                out = model(data)
                prob = F.softmax(out, dim=1)[:, 1]  # Probability of class 1
                pred = out.argmax(dim=1)

                correct += (pred == data.y).sum().item()
                total += data.y.size(0)

                all_predictions.extend(pred.tolist())
                all_labels.extend(data.y.tolist())
                all_probabilities.extend(prob.tolist())

        accuracy = correct / total if total > 0 else 0
        roc_auc = roc_auc_score(all_labels, all_probabilities) if len(set(all_labels)) > 1 else float('nan')  # Handle single class case
        print(f"{phase} - Total Correct: {correct}, Total Samples: {total}, Accuracy: {accuracy:.4f}, ROC AUC: {roc_auc:.4f}")

        # Detailed prediction report
        class_0_count = all_predictions.count(0)
        class_1_count = all_predictions.count(1)
        print(f"Prediction Distribution - Class 0: {class_0_count}, Class 1: {class_1_count}")

        return accuracy, roc_auc

    # Train and evaluate the model with detailed reporting
    early_stopping = EarlyStopping(patience=5)

    for epoch in range(100):
        print(f"Epoch {epoch+1}/{100}")
        loss, train_acc = train()
        val_acc, _ = test(val_loader, phase='Validation')
        scheduler.step(loss)  # Adjust learning rate based on validation loss
        print(f"Epoch {epoch+1} Summary: Loss: {loss:.4f}, Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}\n")

        # Check early stopping
        early_stopping(val_acc)
        if early_stopping.early_stop:
            print("Early stopping")
            break

    # Evaluate on the test set after training
    test_acc, roc_auc = test(test_loader, phase='Test')

    # Record the performance for each dataset
    train_accuracies.append(train_acc)
    val_accuracies.append(val_acc)
    test_accuracies.append(test_acc)
    roc_aucs.append(roc_auc)

# Calculate and print the mean performance across all datasets
mean_train_accuracy = np.mean(train_accuracies)
mean_val_accuracy = np.mean(val_accuracies)
mean_test_accuracy = np.mean(test_accuracies)
mean_roc_auc = np.nanmean(roc_aucs)  # Use nanmean to ignore NaNs from single class scenarios

print("\nOverall Performance Across All Datasets")
print(f"Mean Training Accuracy: {mean_train_accuracy:.4f}")
print(f"Mean Validation Accuracy: {mean_val_accuracy:.4f}")
print(f"Mean Test Accuracy: {mean_test_accuracy:.4f}")
print(f"Mean ROC AUC: {mean_roc_auc:.4f}")



Training and testing using scattefold techniques on a dataset

In [None]:
import torch
from torch_geometric.loader import DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, f1_score, precision_score, recall_score, confusion_matrix
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv, global_mean_pool

# Define your GCN model
class GCN(nn.Module):
    def __init__(self, in_channels, hidden_channels, out_channels):
        super(GCN, self).__init__()
        self.conv1 = GCNConv(in_channels, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, hidden_channels)
        self.fc = nn.Linear(hidden_channels, out_channels)

    def forward(self, data):
        x, edge_index, batch = data.x, data.edge_index, data.batch
        x = self.conv1(x, edge_index)
        x = F.relu(x)
        x = self.conv2(x, edge_index)
        x = global_mean_pool(x, batch)
        x = self.fc(x)
        return F.log_softmax(x, dim=1)

# Instantiate the model
in_channels = 4  # Adjust based on your dataset
hidden_channels = 16
out_channels = 2  # For binary classification
model = GCN(in_channels=in_channels, hidden_channels=hidden_channels, out_channels=out_channels)

# Define the optimizer and loss criterion
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
criterion = torch.nn.CrossEntropyLoss()

# Split the perfectly balanced dataset into train, validation, and test sets
train_data, temp_data = train_test_split(
    filtered_data_list,
    test_size=0.4,  # 40% goes to temp set which will be split into val and test
    random_state=42,
    stratify=[data.y.item() for data in filtered_data_list]
)

val_data, test_data = train_test_split(
    temp_data,
    test_size=0.5,  # Split temp set equally for val and test
    random_state=42,
    stratify=[data.y.item() for data in temp_data]
)

# Create DataLoaders for training, validation, and testing
batch_size = 32
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False)

# Early stopping class
class EarlyStopping:
    def __init__(self, patience=10, min_delta=0.01):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.best_score = None
        self.early_stop = False

    def __call__(self, val_score):
        if self.best_score is None:
            self.best_score = val_score
        elif val_score < self.best_score + self.min_delta:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = val_score
            self.counter = 0

# Training function
def train():
    model.train()
    total_loss = 0
    num_batches = 0
    correct = 0
    total_samples = 0

    for data in train_loader:
        if data.y.size(0) == 0:
            print("Empty batch detected")
            continue

        optimizer.zero_grad()
        out = model(data)
        loss = criterion(out, data.y)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        num_batches += 1
        total_samples += data.y.size(0)
        correct += (out.argmax(dim=1) == data.y).sum().item()

    avg_loss = total_loss / num_batches if num_batches > 0 else float('inf')
    accuracy = correct / total_samples if total_samples > 0 else 0
    print(f"Training - Avg Loss: {avg_loss:.4f}, Accuracy: {accuracy:.4f}, Total Batches: {num_batches}, Total Samples: {total_samples}")
    return avg_loss, accuracy

# Testing function with additional metrics calculation
def test(loader, phase='Testing'):
    model.eval()
    correct = 0
    total = 0
    all_predictions = []
    all_labels = []
    all_probabilities = []

    with torch.no_grad():
        for data in loader:
            if data.y.size(0) == 0:
                print("Empty batch detected")
                continue

            out = model(data)
            prob = F.softmax(out, dim=1)[:, 1]  # Probability of class 1
            pred = out.argmax(dim=1)

            correct += (pred == data.y).sum().item()
            total += data.y.size(0)

            all_predictions.extend(pred.tolist())
            all_labels.extend(data.y.tolist())
            all_probabilities.extend(prob.tolist())

    accuracy = correct / total if total > 0 else 0
    roc_auc = roc_auc_score(all_labels, all_probabilities) if len(set(all_labels)) > 1 else float('nan')
    f1 = f1_score(all_labels, all_predictions, average='binary')
    precision = precision_score(all_labels, all_predictions, average='binary')
    recall = recall_score(all_labels, all_predictions, average='binary')
    cm = confusion_matrix(all_labels, all_predictions)

    print(f"{phase} - Total Correct: {correct}, Total Samples: {total}, Accuracy: {accuracy:.4f}, ROC AUC: {roc_auc:.4f}")
    print(f"{phase} - F1 Score: {f1:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}")
    print(f"{phase} - Confusion Matrix:\n{cm}")

    return accuracy, roc_auc, f1, precision, recall, cm

# Track performances
performance_history = {
    "epoch": [],
    "train_loss": [],
    "train_accuracy": [],
    "val_accuracy": [],
    "val_roc_auc": [],
    "val_f1_score": [],
    "val_precision": [],
    "val_recall": [],
}

# Train and evaluate the model with early stopping
early_stopping = EarlyStopping(patience=10, min_delta=0.01)

for epoch in range(100):
    print(f"Epoch {epoch+1}/{100}")
    loss, train_acc = train()
    val_acc, val_roc_auc, val_f1, val_precision, val_recall, _ = test(val_loader, phase='Validation')
    print(f"Epoch {epoch+1} Summary: Loss: {loss:.4f}, Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}, Val ROC AUC: {val_roc_auc:.4f}\n")

    # Record performance metrics
    performance_history["epoch"].append(epoch + 1)
    performance_history["train_loss"].append(loss)
    performance_history["train_accuracy"].append(train_acc)
    performance_history["val_accuracy"].append(val_acc)
    performance_history["val_roc_auc"].append(val_roc_auc)
    performance_history["val_f1_score"].append(val_f1)
    performance_history["val_precision"].append(val_precision)
    performance_history["val_recall"].append(val_recall)

    # Check early stopping
    early_stopping(val_roc_auc)
    if early_stopping.early_stop:
        print("Early stopping triggered.")
        break

# Evaluate on the test set after training
test_acc, roc_auc, f1, precision, recall, cm = test(test_loader, phase='Test')

# Save the final test performance to the history
performance_history["test_accuracy"] = test_acc
performance_history["test_roc_auc"] = roc_auc
performance_history["test_f1_score"] = f1
performance_history["test_precision"] = precision
performance_history["test_recall"] = recall
performance_history["confusion_matrix"] = cm

# Save performance metrics to a file
with open('./performance_metrics.txt', 'w') as f:
    f.write("Epoch,Train Loss,Train Accuracy,Val Accuracy,Val ROC AUC,Val F1 Score,Val Precision,Val Recall\n")
    for i in range(len(performance_history["epoch"])):
        f.write(f"{performance_history['epoch'][i]},"
                f"{performance_history['train_loss'][i]:.4f},"
                f"{performance_history['train_accuracy'][i]:.4f},"
                f"{performance_history['val_accuracy'][i]:.4f},"
                f"{performance_history['val_roc_auc'][i]:.4f},"
                f"{performance_history['val_f1_score'][i]:.4f},"
                f"{performance_history['val_precision'][i]:.4f},"
                f"{performance_history['val_recall'][i]:.4f}\n")
    f.write("\nFinal Test Performance\n")
    f.write(f"Test Accuracy: {performance_history['test_accuracy']:.4f}\n")
    f.write(f"Test ROC AUC: {performance_history['test_roc_auc']:.4f}\n")
    f.write(f"Test F1 Score: {performance_history['test_f1_score']:.4f}\n")
    f.write(f"Test Precision: {performance_history['test_precision']:.4f}\n")
    f.write(f"Test Recall: {performance_history['test_recall']:.4f}\n")
    f.write(f"Confusion Matrix:\n{performance_history['confusion_matrix']}\n")

print("\nFinal Performance")
print(f"Test Accuracy: {test_acc:.4f}")
print(f"Test ROC AUC: {roc_auc:.4f}")
print(f"Test F1 Score: {f1:.4f}")
print(f"Test Precision: {precision:.4f}")
print(f"Test Recall: {recall:.4f}")
print(f"Confusion Matrix:\n{cm}")


#Hypergraph experiment

Construction of the De Bruij Hypergraph from a pool of fingerprints and convertion to a pytoch Data element.

In [None]:
import pandas as pd
import numpy as np
import re
import networkx as nx
import torch
from torch_geometric.data import Data, DataLoader
import gc  # Import garbage collection module
import matplotlib.pyplot as plt
from collections import defaultdict

# Function to read and process the file
def process_file(filename, label):
    with open(filename, 'r') as file:
        data = file.read()
    entries = data.split("', '")
    entries = [entry.strip(" '") for entry in entries]
    df = pd.DataFrame(entries, columns=["Entry"])
    df['Label'] = label
    return df

# Function to extract numeric data from entries
def extract_numbers(df):
    def extract_numeric_part(entry):
        match = re.search(r'\|([\d\s|]+)', entry)
        if match:
            return ' '.join(match.group(1).strip().split('|')).strip()
        return ""
    numeric_data = df['Entry'].apply(extract_numeric_part)
    numeric_data = numeric_data.apply(lambda x: list(map(int, x.split())) if x else [])
    return pd.DataFrame({'Numbers': numeric_data, 'Label': df['Label']})

# Function to create a De Bruijn hypergraph from fingerprints
def create_de_bruijn_hypergraph(fingerprint, k=4):
    G = nx.Graph()
    nodes = []
    k_fingers = []

    # Generate k-fingers from the fingerprint
    for i in range(len(fingerprint) - k + 1):
        k_finger = tuple(fingerprint[i:i + k])
        if k_finger not in nodes:
            nodes.append(k_finger)
        k_fingers.append(k_finger)

    # Add nodes to the graph
    node_indices = {k_finger: idx for idx, k_finger in enumerate(nodes)}
    for k_finger in nodes:
        G.add_node(node_indices[k_finger], label=k_finger)

    # Form hyperedges based on overlaps
    for i in range(len(k_fingers) - 1):
        if k_fingers[i][1:] == k_fingers[i + 1][:-1]:
            G.add_edge(node_indices[k_fingers[i]], node_indices[k_fingers[i + 1]])

    return G, node_indices

# Convert k-mer tuples to numerical features
def kmer_to_features(kmer, max_kmer_length):
    # Convert k-mer into a fixed-length numerical vector
    return torch.tensor([float(x) for x in kmer], dtype=torch.float)

# Convert NetworkX graph to PyG Data
def convert_hypergraph_to_data(G, nodes, label):
    # Create edge index
    edge_index_list = list(G.edges)

    if len(edge_index_list) == 0:
        print(f"Warning: Hypergraph with label {label} has no edges.")
        edge_index = torch.empty((2, 0), dtype=torch.long)  # Empty edge index
    else:
        edge_index = torch.tensor(edge_index_list).t().contiguous()

    # Create node features from k-mers
    x = torch.stack([kmer_to_features(G.nodes[i]['label'], len(G.nodes[i]['label'])) for i in range(len(nodes))])

    y = torch.tensor([label], dtype=torch.long)
    data = Data(x=x, edge_index=edge_index, y=y, num_nodes=len(G.nodes))
    return data

# Pre-process checks to filter out invalid sequences
def filter_valid_rows(df, k):
    valid_rows = []
    for index, row in df.iterrows():
        sequence = row['Numbers']
        if len(sequence) >= k:
            valid_rows.append(row)
        else:
            print(f"Skipping row {index} due to insufficient sequence length.", sequence)
    return pd.DataFrame(valid_rows)

# Load and process data
filename_fuse = '/content/fingerprint_fuse.txt'
filename_no_fuse = '/content/fingerprint_no_fuse.txt'  # Change this to the actual filename
df_no_fuse = process_file(filename_no_fuse, 0)
df_fuse = process_file(filename_fuse, 1)
df_no_fuse = extract_numbers(df_no_fuse)
df_fuse = extract_numbers(df_fuse)

print(df_no_fuse.head())
print(df_fuse.head())

# Filter valid rows based on k-mer size
k = 4  # Adjusted k according to the example
df_no_fuse = filter_valid_rows(df_no_fuse, k)
df_fuse = filter_valid_rows(df_fuse, k)

# Convert each row to a hypergraph, then to a PyTorch Geometric Data object
data_list_no_fuse = []
data_list_fuse = []

for index, row in df_no_fuse.iterrows():
    fingerprint = row['Numbers']  # Use the sequence directly as fingerprints
    graph, nodes = create_de_bruijn_hypergraph(fingerprint, k)
    data_object = convert_hypergraph_to_data(graph, nodes, label=row['Label'])
    if data_object is not None:
        data_list_no_fuse.append(data_object)

for index, row in df_fuse.iterrows():
    fingerprint = row['Numbers']  # Use the sequence directly as fingerprints
    graph, nodes = create_de_bruijn_hypergraph(fingerprint, k)
    data_object = convert_hypergraph_to_data(graph, nodes, label=row['Label'])
    if data_object is not None:
        data_list_fuse.append(data_object)

# Combine the two lists
all_data = data_list_no_fuse + data_list_fuse

# Verify all graphs have nodes and edges
filtered_data_list = [data for data in all_data if data is not None and data.x.size(0) > 0 and (data.edge_index.size(1) > 0 or data.num_nodes == 1)]
print(f"Number of valid hypergraphs after filtering: {len(filtered_data_list)}")

# Print a summary of the transformed graphs
for i, data in enumerate(filtered_data_list[:5]):  # Print only the first 5 for brevity
    print(f"Graph {i+1}:")
    print(f"  Number of nodes: {data.num_nodes}")
    print(f"  Number of edges: {data.edge_index.size(1)}")
    print(f"  Label: {data.y.item()}")
    print("  Node features (first 5 nodes):")
    print(data.x[:5])
    print("  Edge index (first 5 edges):")
    print(data.edge_index[:, :5])
    print("\n")


Definition, Training, and Testing of a HyperGCN for Classifying the De Bruijn Hypergraph Previously Generated

In this process, we define, train, and test a Hypergraph Convolutional Network (HyperGCN) to classify nodes or hyperedges in a De Bruijn hypergraph that was previously generated.

In [None]:
import pandas as pd
import numpy as np
import re
import networkx as nx
import torch
from torch_geometric.loader import Data, DataLoader
from torch_geometric.nn import HypergraphConv, global_mean_pool
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, roc_auc_score, confusion_matrix
import torch.nn.functional as F

# Split the data into training, validation, and testing sets
train_data, temp_data = train_test_split(
    filtered_data_list, test_size=0.4, random_state=42, stratify=[data.y.item() for data in filtered_data_list]
)
val_data, test_data = train_test_split(
    temp_data, test_size=0.5, random_state=42, stratify=[data.y.item() for data in temp_data]
)


# Define the HGCN Model
class HGCN(torch.nn.Module):
    def __init__(self, in_channels, hidden_channels, out_channels):
        super(HGCN, self).__init__()
        self.conv1 = HypergraphConv(in_channels, hidden_channels)
        self.conv2 = HypergraphConv(hidden_channels, hidden_channels)
        self.fc = torch.nn.Linear(hidden_channels, out_channels)

    def forward(self, data):
        x, edge_index, batch = data.x, data.edge_index, data.batch
        x = F.relu(self.conv1(x, edge_index))
        x = self.conv2(x, edge_index)
        x = F.relu(x)
        x = global_mean_pool(x, batch)  # Pool the node embeddings to obtain graph embeddings
        x = self.fc(x)
        return F.log_softmax(x, dim=1)

# Early Stopping Class
class EarlyStopping:
    def __init__(self, patience=10, min_delta=0.01):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.best_score = None
        self.early_stop = False

    def __call__(self, val_score):
        if self.best_score is None:
            self.best_score = val_score
        elif val_score < self.best_score + self.min_delta:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = val_score
            self.counter = 0

# Create DataLoaders for training, validation, and testing
batch_size = 32
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False)

# Initialize the model, optimizer, and loss function
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Set in_channels to k to match k-mer size
model = HGCN(in_channels=k, hidden_channels=16, out_channels=2).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01, weight_decay=5e-4)

# Training function
def train():
    model.train()
    total_loss = 0
    for data in train_loader:
        data = data.to(device)
        optimizer.zero_grad()
        out = model(data)
        loss = F.nll_loss(out, data.y)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(train_loader)

# Evaluation function
def evaluate(loader):
    model.eval()
    correct = 0
    predictions = []
    labels = []
    probabilities = []
    with torch.no_grad():
        for data in loader:
            data = data.to(device)
            out = model(data)
            prob = F.softmax(out, dim=1)[:, 1]  # Probability of the positive class
            pred = out.argmax(dim=1)
            predictions.extend(pred.cpu().numpy())
            labels.extend(data.y.cpu().numpy())
            probabilities.extend(prob.cpu().numpy())
            correct += pred.eq(data.y).sum().item()
    return correct / len(loader.dataset), predictions, labels, probabilities

# Main training loop with early stopping
num_epochs = 100
early_stopping = EarlyStopping(patience=10, min_delta=0.01)

for epoch in range(1, num_epochs + 1):
    train_loss = train()
    train_acc, _, _, _ = evaluate(train_loader)
    val_acc, _, _, _ = evaluate(val_loader)
    print(f'Epoch: {epoch:03d}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}')

    # Check early stopping condition
    early_stopping(val_acc)
    if early_stopping.early_stop:
        print("Early stopping triggered.")
        break

# Evaluate on the test set
test_acc, predictions, labels, probabilities = evaluate(test_loader)

# Calculate performance metrics
f1 = f1_score(labels, predictions, average='binary')
precision = precision_score(labels, predictions, average='binary')
recall = recall_score(labels, predictions, average='binary')
roc_auc = roc_auc_score(labels, probabilities)
conf_matrix = confusion_matrix(labels, predictions)

# Save and print results
performance_metrics = {
    'Accuracy': test_acc,
    'F1 Score': f1,
    'Precision': precision,
    'Recall': recall,
    'ROC AUC': roc_auc,
    'Confusion Matrix': conf_matrix
}

# Print the results
print("\nFinal Test Performance Metrics:")
for metric, value in performance_metrics.items():
    if metric != 'Confusion Matrix':
        print(f"{metric}: {value:.4f}")
    else:
        print(f"{metric}:\n{value}")


  # Optionally, save the metrics to a file
with open('./performance_metrics_hyper.txt', 'w') as f:
    for metric, value in performance_metrics.items():
        if metric != 'Confusion Matrix':
            f.write(f"{metric}: {value:.4f}\n")
        else:
            f.write(f"{metric}:\n{value}\n")