In [None]:
import torch

from torch import nn
from torch.utils.data import Dataset
import numpy as np

import matplotlib.pyplot as plt
from sklearn.preprocessing import OneHotEncoder
from joblib import Parallel, delayed

import pandas as pd

import os

from scenarios.abstract_scenario import AbstractScenario
from methods.toy_model_selection_method import ToyModelSelectionMethod # change input dimension! (f model(s) for X, g model for Y)

In [None]:
# Read the RPE1 dataset the same way HSIC-X does.
dataset_rpe1 = pd.read_csv("R/sec6.2/dataset_rpe1.csv")

# Keep only rows whose `interventions` value is one of the first nine column
# names or the "non-targeting" label (same selection as HSIC-X).
interv_genes = list(dataset_rpe1.columns[:9])
kept_interventions = interv_genes + ["non-targeting"]
row_mask = dataset_rpe1['interventions'].isin(kept_interventions)
train_data = dataset_rpe1.loc[row_mask].copy()

# Store the intervention label as a categorical column, and add `Ztr` as a
# categorical copy of the column at position 10.
# NOTE(review): position 10 is presumably the `interventions` column — confirm
# against the CSV layout.
train_data['interventions'] = train_data['interventions'].astype('category')
train_data['Ztr'] = train_data.iloc[:, 10].astype('category')

# Every intervention label present in the training rows except "non-targeting".
unique_interventions = [
    label
    for label in train_data['interventions'].unique()
    if label != "non-targeting"
]

# Test data (same file as HSIC-X): skip the header row and keep the first
# nine columns as the test inputs.
test_data_path = 'R/sec6.2/test_single_cell.csv'
test_array = np.genfromtxt(test_data_path, delimiter=',', skip_header=1)
test_data = torch.tensor(test_array, dtype=torch.float32)
Xtest = test_data[:, :9].reshape(-1, 9)

In [None]:
# One-hot encode the `Ztr` column into a dense array.
# NOTE(review): the previous comments here mentioned removing one environment
# and dropping the first category, but no `drop` argument is passed to the
# encoder and no rows are removed in this cell — confirm which is intended.
encoder = OneHotEncoder(sparse_output=False)
Ztr_encoded = encoder.fit_transform(train_data[['Ztr']])

# Features are the first nine columns; the target is the tenth column.
Xtr = train_data.iloc[:, :9].to_numpy()
Ytr = train_data.iloc[:, 9].to_numpy()

# Convert everything to float32 tensors; the target becomes a single column.
X_train = torch.tensor(Xtr, dtype=torch.float32)
Y_train = torch.tensor(Ytr, dtype=torch.float32).reshape(-1, 1)
Z_train = torch.tensor(Ztr_encoded, dtype=torch.float32)

In [None]:
# Define PyTorch Dataset
class MyDataset(Dataset):
    """Dataset that serves aligned ``(X, Z, Y)`` triples by index."""

    def __init__(self, X, Z, Y):
        """Store the three indexable containers without copying them."""
        self.X, self.Z, self.Y = X, Z, Y

    def __len__(self):
        """Return the number of items, taken from the length of ``X``."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return the tuple ``(X[idx], Z[idx], Y[idx])``."""
        return tuple(field[idx] for field in (self.X, self.Z, self.Y))

# Create dataset and split into training/validation sets.
# NOTE: this cell rebinds X_train / Z_train / Y_train to the training portion
# only, so re-run the cell that builds the full tensors before re-running it.
dataset = MyDataset(X_train, Z_train, Y_train)
train_ratio = 0.9
train_size = int(train_ratio * len(dataset))
val_size = len(dataset) - train_size

# Seed the split so the train/validation partition is reproducible.
SPLIT_SEED = 0
split_generator = torch.Generator().manual_seed(SPLIT_SEED)

# The subsets are named *_subset so the `train_data` DataFrame built in the
# data-loading cell is not shadowed by a torch Subset.
train_subset, val_subset = torch.utils.data.random_split(
    dataset, [train_size, val_size], generator=split_generator
)

# Extract separate tensors using the shuffled indices chosen by random_split.
# Slicing the underlying dataset with [:train_size] / [train_size:] ignores
# those indices and silently produces a sequential (non-random) split.
train_idx = torch.as_tensor(train_subset.indices, dtype=torch.long)
val_idx = torch.as_tensor(val_subset.indices, dtype=torch.long)

X_val, Z_val, Y_val = dataset.X[val_idx], dataset.Z[val_idx], dataset.Y[val_idx]
X_train, Z_train, Y_train = dataset.X[train_idx], dataset.Z[train_idx], dataset.Y[train_idx]

In [None]:
# Function to train and predict DeepGMM for a single run
def tr_deepGMM(run_id, total_runs=10, seed=None):
    """Train one DeepGMM model and return its predictions on ``Xtest``.

    Reads the notebook-level tensors ``X_train``, ``Z_train``, ``Y_train``,
    ``X_val``, ``Z_val``, ``Y_val`` and ``Xtest``; all are cast to double
    before being handed to the model.

    Parameters
    ----------
    run_id : int
        Zero-based index of this repetition (printed one-based).
    total_runs : int, optional
        Total number of repetitions, used only in the progress message.
        Defaults to 10, the value that was previously hard-coded.
    seed : int or None, optional
        Seed applied to the Python, NumPy and PyTorch RNGs before training.
        Defaults to ``run_id`` so every repetition is reproducible and uses a
        different seed.

    Returns
    -------
    numpy.ndarray
        Flattened model predictions for ``Xtest``.
    """
    import random  # local import: `random` is not imported at the top of the notebook

    # Seed explicitly per run: when runs execute in separate worker processes
    # they do not inherit RNG state from the notebook kernel, and a freshly
    # started process that never seeds PyTorch falls back to its default seed.
    if seed is None:
        seed = run_id
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)

    print(f"Starting DeepGMM Run {run_id + 1}/{total_runs}...")

    # Initialize and train DeepGMM model
    deepGMM = ToyModelSelectionMethod()
    deepGMM.fit(X_train.double(), Z_train.double(), Y_train.double(),
                X_val.double(), Z_val.double(), Y_val.double(),
                g_dev=None, verbose=True)

    # Predict on test data; .cpu() is a no-op for CPU tensors and keeps the
    # NumPy conversion valid if the model returns a tensor on another device.
    y_hat_deepGMM = deepGMM.predict(Xtest.double()).flatten().detach().cpu().numpy()
    return y_hat_deepGMM

In [None]:
num_repeats = 10

# Queue one delayed tr_deepGMM call per run index, then execute them with one
# joblib worker per run.
run_tasks = [delayed(tr_deepGMM)(run_idx) for run_idx in range(num_repeats)]
results = Parallel(n_jobs=num_repeats)(run_tasks)

In [None]:
# Stack the per-run prediction vectors and transpose so each run is a column.
df_results = pd.DataFrame(results).T
df_results.columns = [f'Run_{i+1}' for i in range(num_repeats)]

output_filename = 'results/deepgmm_singlecell_10runs.csv'
# Create the output directory first so the write cannot fail on a missing
# folder after all the training runs have already completed.
os.makedirs(os.path.dirname(output_filename), exist_ok=True)
df_results.to_csv(output_filename, index=False)