In [None]:
# Wildcard import kept first on purpose: every explicit import below then takes
# precedence over any same-named symbol that my_functions happens to re-export.
from my_functions import *

from pathlib import Path

import numpy as np
import torch
import torch_geometric
from sklearn.metrics import precision_score, roc_curve, auc
from sklearn.model_selection import StratifiedKFold
from torch.optim import Adam
from torch_geometric.loader import DataLoader

from Classifier import HypergraphNet

# Environment report: state whether CUDA is usable; library versions and the
# device name are printed only when a GPU is available.
gpu_available = torch.cuda.is_available()
print("PyTorch GPU disponibile:", gpu_available)
if gpu_available:
    torch_version = torch.__version__
    print("Torch version:", torch_version)
    device_name = torch.cuda.get_device_name(0)  # name of CUDA device index 0
    print("GPU utilizzata:", device_name)
    pyg_version = torch_geometric.__version__
    print("Torch-geometric version:", pyg_version)

Import dell'ipergrafo costruito per la Network 1: in silico

In [None]:
# Locate the folder holding the hypergraph JSON and the expression data.
# `__file__` exists only when this code runs as a script/module; inside a
# Jupyter kernel the name is undefined and raises NameError, so fall back to
# the current working directory.
# NOTE(review): assumes the kernel was started in the notebook's own folder
# (the usual Jupyter default) — confirm for your setup.
try:
    script_dir = Path(__file__).parent
except NameError:
    script_dir = Path.cwd()

json_filePC = script_dir / "Net1_PC.json"
input_file = script_dir / "Dati" / "training data" / "Network 1 - in silico" / "net1_expression_data.tsv"

# read_expression_data (from my_functions) yields two values, unpacked here as
# the expression data and the gene names.
expression_data, gene_names = read_expression_data(input_file)

Creazione dei sotto-ipergrafi

In [None]:
# Threshold value forwarded to create_subgraph_data.
# NOTE(review): it is passed as a string, not a float — presumably that is the
# form my_functions expects; confirm against create_subgraph_data.
threshold = "0.8"
dataset_positiviPC = create_subgraph_data(json_filePC, expression_data, gene_names, threshold)

print(f'Il numero di sotto-grafi PC positivi è: {len(dataset_positiviPC)}\n')
# Preview samples 20..24. The upper bound is clamped to the dataset size so the
# preview cannot raise IndexError when fewer than 25 samples were produced.
for x in range(20, min(25, len(dataset_positiviPC))):
    print(dataset_positiviPC[x])

In [None]:
# Load the hyperedges at the same threshold and request as many negative
# samples as there are positive ones (num_neg_samples=len(dataset_positiviPC)).
# NOTE(review): unique_genes is not used by any cell visible here.
hyperedges, unique_genes = load_hypergraph(json_filePC, threshold)
dataset_negativiPC = generate_negative_subgraphs(hyperedges, expression_data, gene_names, num_neg_samples=len(dataset_positiviPC))

print(f'Il numero di sotto-grafi negativi è: {len(dataset_negativiPC)}\n')

# Preview the first five samples. The bound is clamped to the dataset size so
# the preview cannot raise IndexError when fewer than five were generated.
for x in range(0, min(5, len(dataset_negativiPC))):
    print(dataset_negativiPC[x])

K-FOLD CROSS VALIDATION

In [None]:
def train_with_early_stopping(model, train_loader, optimizer, criterion, epochs=100, patience=30):
    """Train ``model`` and stop early once the mean training loss stops improving.

    Each epoch iterates over ``train_loader``, calling
    ``model(data.x, data.edge_index, data.batch)`` on every batch and
    optimising ``criterion(output, data.y.float())``. The per-epoch mean of the
    batch losses is printed and compared with the best value seen so far;
    training stops when it has failed to improve for ``patience`` consecutive
    epochs.

    Note that the monitored quantity is the *training* loss (no validation set
    is involved) and that the model is left with its last-epoch weights.

    Args:
        model: callable module accepting ``(x, edge_index, batch)``.
        train_loader: sized iterable of batches exposing ``x``, ``edge_index``,
            ``batch`` and ``y``.
        optimizer: optimizer bound to ``model``'s parameters.
        criterion: loss callable taking ``(output, target)`` and returning a
            scalar tensor.
        epochs: maximum number of epochs.
        patience: number of consecutive non-improving epochs tolerated.

    Returns:
        list[float]: mean training loss of every completed epoch.

    Raises:
        ValueError: if ``train_loader`` contains no batches.
    """
    num_batches = len(train_loader)
    if num_batches == 0:
        # Without this guard the per-epoch average below divides by zero.
        raise ValueError("train_loader is empty: cannot train without batches")

    best_loss = float('inf')
    counter = 0
    history = []

    for epoch in range(epochs):
        model.train()
        total_loss = 0.0

        for data in train_loader:
            optimizer.zero_grad()
            out = model(data.x, data.edge_index, data.batch)
            target = data.y.float()
            # Align the target with the output when they hold the same number
            # of elements but differ in shape (e.g. output [B, 1] vs labels
            # [B]). Losses such as BCELoss reject mismatched shapes and others
            # would silently broadcast.
            if target.shape != out.shape and target.numel() == out.numel():
                target = target.reshape(out.shape)
            loss = criterion(out, target)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        avg_loss = total_loss / num_batches
        history.append(avg_loss)
        print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}")

        # Early stopping bookkeeping
        if avg_loss < best_loss:
            best_loss = avg_loss
            counter = 0  # reset whenever the loss improves
        else:
            counter += 1

        if counter >= patience:
            print("🔴 stop")
            break

    return history

In [None]:
# --- Cross-validation and model hyper-parameters ---
k_folds = 5                     # number of folds used by the stratified K-fold split
batch_size = 32
hidden_channels = 64
out_channels = 1                # single output unit: binary classification
criterion = torch.nn.BCELoss()  # binary cross-entropy loss

# Fold splitter: shuffled, with a fixed random_state so the split is repeatable.
skf = StratifiedKFold(n_splits=k_folds, shuffle=True, random_state=42)

# Full dataset: the positive samples followed by the negative ones.
datasetPC = dataset_positiviPC + dataset_negativiPC
# One scalar label per sample, used by the splitter for stratification.
labels = np.array([sample.y.item() for sample in datasetPC])

In [None]:
# Per-fold scores, one entry appended per fold.
metrics = {"accuracy": [], "precision": [], "auc": []}

# NOTE(review): no torch seed is set in the cells visible here, so model
# initialisation and DataLoader shuffling may differ between runs — consider
# calling torch.manual_seed(...) if reproducible fold metrics are needed.
for fold, (train_idx, test_idx) in enumerate(skf.split(datasetPC, labels)):
    print(f"Fold {fold+1}/{k_folds}")

    # Build this fold's train/test subsets from the split indices
    train_data = [datasetPC[i] for i in train_idx]
    test_data = [datasetPC[i] for i in test_idx]

    train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False)

    # A fresh model and optimizer per fold, so nothing learned on one fold
    # carries over to the next
    in_channels = train_data[0].x.shape[1]  # number of features per node
    model = HypergraphNet(in_channels, hidden_channels, out_channels)
    optimizer = Adam(model.parameters(), lr=0.0005, weight_decay=5e-4)

    # Training
    train_with_early_stopping(model, train_loader, optimizer, criterion, epochs=100, patience=30)

    # Evaluation on the held-out fold
    model.eval()
    all_preds = []
    all_labels = []
    all_probs = []

    with torch.no_grad():
        for data in test_loader:
            out = model(data.x, data.edge_index, data.batch)
            # reshape(-1) instead of squeeze(): squeeze() turns a batch holding
            # a single sample into a 0-d array, which list.extend() cannot
            # iterate (TypeError) whenever the last batch has size 1.
            probs = out.reshape(-1).cpu().numpy()  # predicted probabilities
            preds = (probs > 0.5).astype(int)  # binary predictions at the 0.5 cut-off

            all_probs.extend(probs)
            all_preds.extend(preds)
            # Flatten the labels as well, so each entry is a scalar even when
            # y carries a trailing singleton dimension.
            all_labels.extend(data.y.reshape(-1).cpu().numpy())

    # Metrics for this fold
    accuracy = sum(p == l for p, l in zip(all_preds, all_labels)) / len(all_labels)
    precision = precision_score(all_labels, all_preds)
    fpr, tpr, _ = roc_curve(all_labels, all_probs)
    roc_auc = auc(fpr, tpr)

    print(f"Fold {fold+1} - Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, AUC: {roc_auc:.4f}")

    metrics["accuracy"].append(accuracy)
    metrics["precision"].append(precision)
    metrics["auc"].append(roc_auc)

In [None]:
# Final report: mean ± standard deviation of each metric across the folds.
print("\n=== Risultati Finali ===")
for title, key in (("Accuracy", "accuracy"), ("Precision", "precision"), ("AUC-ROC", "auc")):
    fold_scores = metrics[key]
    print(f"{title}: {np.mean(fold_scores):.4f} ± {np.std(fold_scores):.4f}")