## Imports

In [11]:
import ast
import numpy as np
import os
import pandas as pd
import pickle
import warnings

import matplotlib
matplotlib.use('Agg')  # Use the Agg backend which does not require a windowing system
import matplotlib.pyplot as plt

import wandb

from catboost import CatBoostClassifier
from imblearn.over_sampling import SMOTE, RandomOverSampler
from lightgbm import LGBMClassifier
from pyHSICLasso import HSICLasso
from scipy.signal import find_peaks, savgol_filter
from scipy.sparse import diags, eye
from scipy.sparse.linalg import spsolve
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier, GradientBoostingClassifier
from sklearn.feature_selection import SelectKBest, chi2, SelectFromModel
from sklearn.linear_model import LinearRegression, LogisticRegression, Lasso, ElasticNet
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, balanced_accuracy_score, confusion_matrix, ConfusionMatrixDisplay, make_scorer
from sklearn.model_selection import train_test_split, StratifiedKFold, cross_validate
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from xgboost import XGBClassifier

# Ignore all warnings
warnings.filterwarnings('ignore')

## Preprocessing

In [12]:
def load_data(filepath):
    """Read the CSV file at ``filepath`` and return it as a pandas DataFrame."""
    return pd.read_csv(filepath)

def preprocess_dataset(data, test_size=0.2):
    """Encode the raw frame and split it into train and test partitions.

    Steps:
      1. Drop the ``'Unnamed: 0'`` column when it is present.
      2. One-hot encode ``'histological.type'`` with ``pd.get_dummies``.
      3. Use ``'vital.status'`` as the target and every other column as a feature.
      4. Split with ``train_test_split`` using a fixed ``random_state=42``.

    Parameters
    ----------
    data : pandas.DataFrame
        Raw dataset containing the ``'histological.type'`` and ``'vital.status'`` columns.
    test_size : float or int, default 0.2
        Forwarded to ``train_test_split``.

    Returns
    -------
    tuple
        ``(X_train, X_test, y_train, y_test)``.
    """
    # errors='ignore' keeps this working for CSVs that were saved without an index column.
    encoded = data.drop(columns=['Unnamed: 0'], errors='ignore')
    encoded = pd.get_dummies(encoded, columns=['histological.type'])
    X = encoded.drop(columns=['vital.status'])
    y = encoded['vital.status']
    # Honour the caller's test_size; the previous code always split with 0.2.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=42
    )
    return X_train, X_test, y_train, y_test

def load_and_preprocess_data(filepath, test_size=0.2):
    """Load the CSV at ``filepath`` and return its train/test split.

    Parameters
    ----------
    filepath : str
        Path handed to ``load_data``.
    test_size : float or int, default 0.2
        Passed through to ``preprocess_dataset``.

    Returns
    -------
    tuple
        Whatever ``preprocess_dataset`` returns: ``(X_train, X_test, y_train, y_test)``.
    """
    data = load_data(filepath)
    return preprocess_dataset(data, test_size=test_size)


# Relative path of the dataset CSV; it is resolved against the kernel's working directory.
filepath = 'brca_data_w_subtypes.csv'
# Module-level train/test partitions; the dataset-generation cell further down reads these names.
X_train, X_test, y_train, y_test = load_and_preprocess_data(filepath)

## Class balance and Feature extraction

In [13]:
class HSIC_Lasso_custom():
    """Column selector backed by ``pyHSICLasso.HSICLasso``.

    ``fit`` runs HSIC Lasso and stores the positional indices of the selected columns in
    ``selected_features``; ``transform`` keeps only those columns.
    """

    def __init__(self, num_features=200):
        # Number of features requested from HSIC Lasso (passed as ``num_feat``).
        self.num_features = num_features
        # Underlying HSICLasso instance; created in fit().
        self.hsic_lasso = None
        # Integer positions of the selected columns; None until fit() has run.
        self.selected_features = None

    def fit(self, X, y):
        """Run HSIC Lasso on ``X``/``y`` and remember the selected column positions.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
        y : array-like of shape (n_samples,)

        Returns
        -------
        self
            Returned so calls can be chained; callers that ignore it are unaffected.
        """
        X = np.array(X)
        y = np.array(y)
        # Features are named by their integer position so the selection is positional.
        featname = list(np.arange(X.shape[1]))
        self.hsic_lasso = HSICLasso()
        self.hsic_lasso.input(X, y, featname=featname)
        # NOTE(review): the regression variant is used even though the notebook's target is a
        # class label; HSICLasso also offers a classification variant -- confirm the intent.
        self.hsic_lasso.regression(num_feat=self.num_features, M=5, n_jobs=-1)
        self.selected_features = np.array(self.hsic_lasso.get_index(), dtype=int)
        return self

    def transform(self, X):
        """Return ``X`` restricted to the selected columns.

        A pandas object is sliced with ``.iloc`` (labels are preserved); anything else is
        converted with ``np.asarray`` and sliced positionally, mirroring what ``fit`` accepts.

        Raises
        ------
        RuntimeError
            If called before ``fit``.
        """
        if self.selected_features is None:
            raise RuntimeError("HSIC_Lasso_custom.transform called before fit")
        if hasattr(X, 'iloc'):
            return X.iloc[:, self.selected_features]
        return np.asarray(X)[:, self.selected_features]
    
class Lasso_custom():
    """Column selector that keeps the features with a non-zero ``Lasso`` coefficient.

    ``fit`` trains ``sklearn.linear_model.Lasso`` and stores the positions of the non-zero
    coefficients in ``selected_features``; ``transform`` keeps only those columns.
    """

    def __init__(self, alpha=0.1):
        # Regularisation strength forwarded to Lasso(alpha=...).
        self.alpha = alpha
        # Fitted Lasso model; created in fit().
        self.lasso = None
        # Integer positions of the selected columns; None until fit() has run.
        self.selected_features = None

    def fit(self, X, y):
        """Fit the Lasso model and record which coefficients are non-zero.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
        y : array-like of shape (n_samples,)

        Returns
        -------
        self
            Returned so calls can be chained; callers that ignore it are unaffected.
        """
        X = np.array(X)
        y = np.array(y)
        self.lasso = Lasso(alpha=self.alpha)
        self.lasso.fit(X, y)
        # The selection can be empty when alpha shrinks every coefficient to zero.
        self.selected_features = np.where(self.lasso.coef_ != 0)[0]
        return self

    def transform(self, X):
        """Return ``X`` restricted to the selected columns.

        A pandas object is sliced with ``.iloc`` (labels are preserved); anything else is
        converted with ``np.asarray`` and sliced positionally, mirroring what ``fit`` accepts.

        Raises
        ------
        RuntimeError
            If called before ``fit``.
        """
        if self.selected_features is None:
            raise RuntimeError("Lasso_custom.transform called before fit")
        if hasattr(X, 'iloc'):
            return X.iloc[:, self.selected_features]
        return np.asarray(X)[:, self.selected_features]

In [14]:
## Class balance: SMOTE, RandomOverSampler
## Feature selection: PCA, Lasso, ElasticNet, HSICLasso, SelectKBest, SelectFromLinearRegression, SelectFromRandomForest

def apply_oversampler(oversampler, X_train, y_train, random_state=42):
    """Rebalance the training data with the oversampler named by ``oversampler``.

    Parameters
    ----------
    oversampler : str or None
        ``'SMOTE'`` or ``'RandomOverSampler'``. Any other value leaves the data untouched.
    X_train, y_train
        Training features and labels handed to ``fit_resample``.
    random_state : int or None, default 42
        Seed given to the sampler so repeated runs produce the same resampled data.
        The default matches the seed used for the train/test split in this notebook.

    Returns
    -------
    tuple
        ``(X_resampled, y_resampled)``, or ``(X_train, y_train)`` unchanged when the name is
        not recognised.
    """
    if oversampler == 'SMOTE':
        return SMOTE(random_state=random_state).fit_resample(X_train, y_train)
    if oversampler == 'RandomOverSampler':
        return RandomOverSampler(random_state=random_state).fit_resample(X_train, y_train)
    # Unrecognised names (including None) mean "no oversampling".
    return X_train, y_train

def apply_feature_selection(method, X_train, y_train, X_test, random_state=42):
    """Fit the feature selector / reducer named by ``method`` and apply it to both splits.

    The transformer is fitted on the training data only and then applied to the training
    and the test data.

    Parameters
    ----------
    method : str or None
        One of ``'PCA'``, ``'SelectKBest'``, ``'SelectFromLinearRegression'``,
        ``'SelectFromRandomForest'``, ``'ElasticNet'``, ``'Lasso'``, ``'HSICLasso'``.
        Any other value returns the inputs unchanged.
    X_train, y_train
        Training features and labels used to fit the transformer.
    X_test
        Test features; only transformed, never used for fitting.
    random_state : int or None, default 42
        Seed given to the estimators that accept one here (PCA and the random forest) so
        repeated runs select the same features.

    Returns
    -------
    tuple
        ``(X_train_transformed, X_test_transformed, transformer)``, or
        ``(X_train, X_test, None)`` when ``method`` is not recognised.
    """
    transformer = None

    if method == 'PCA':
        transformer = PCA(n_components=20, random_state=random_state).fit(X_train)
    elif method == 'SelectKBest':
        transformer = SelectKBest(k=2).fit(X_train, y_train)
    elif method == 'SelectFromLinearRegression':
        transformer = SelectFromModel(LinearRegression()).fit(X_train, y_train)
    elif method == 'SelectFromRandomForest':
        forest = RandomForestRegressor(random_state=random_state)
        transformer = SelectFromModel(forest).fit(X_train, y_train)
    elif method == 'ElasticNet':
        transformer = SelectFromModel(ElasticNet()).fit(X_train, y_train)
    elif method == 'Lasso':
        # The return value of fit() is deliberately not used: only the fitted object matters.
        transformer = Lasso_custom()
        transformer.fit(X_train, y_train)
    elif method == 'HSICLasso':
        transformer = HSIC_Lasso_custom()
        transformer.fit(X_train, y_train)

    # Compare against None explicitly: truthiness of an estimator object is not a reliable
    # "was something fitted" signal (objects defining __len__/__bool__ can be falsy).
    if transformer is None:
        return X_train, X_test, None

    X_train_transformed = transformer.transform(X_train)
    X_test_transformed = transformer.transform(X_test)
    return X_train_transformed, X_test_transformed, transformer

def generate_datasets(X_train, y_train, X_test, y_test, oversamplers, feature_selection_methods):
    """Build one dataset variant per (oversampler, feature-selection method) pair.

    Each oversampler is applied once to the training data; every feature-selection method
    is then fitted on that resampled training set and applied to the test set as well.

    Returns
    -------
    dict
        Keyed by ``(oversampler, method)``; each value is a dict with the keys
        ``'X_train'``, ``'y_train'``, ``'X_test'``, ``'y_test'`` and ``'transformer'``.
    """
    variants = {}
    for sampler_name in oversamplers:
        # Resample once per oversampler and reuse the result for every selection method.
        X_balanced, y_balanced = apply_oversampler(sampler_name, X_train, y_train)
        for selector_name in feature_selection_methods:
            selected = apply_feature_selection(selector_name, X_balanced, y_balanced, X_test)
            X_train_sel, X_test_sel, fitted = selected
            variants[(sampler_name, selector_name)] = {
                'X_train': X_train_sel,
                'y_train': y_balanced,
                'X_test': X_test_sel,
                'y_test': y_test,
                'transformer': fitted,
            }
    return variants


# Oversampling strategies to evaluate; each name is matched by apply_oversampler.
oversamplers = ['SMOTE', 'RandomOverSampler']
# Feature-selection methods to evaluate; each name is matched by apply_feature_selection.
feature_selection_methods = ['PCA', 'Lasso', 'ElasticNet', 'HSICLasso', 'SelectKBest', 'SelectFromLinearRegression', 'SelectFromRandomForest']

# Build one dataset per (oversampler, method) pair: 2 oversamplers x 7 methods = 14 entries.
datasets_dict = generate_datasets(X_train, y_train, X_test, y_test, oversamplers, feature_selection_methods)


Block HSIC Lasso B = 20.
M set to 5.
Using Gaussian kernel for the features, Gaussian kernel for the outcomes.
Block HSIC Lasso B = 20.
M set to 5.
Using Gaussian kernel for the features, Gaussian kernel for the outcomes.


In [None]:
# Persist every generated dataset variant, together with its fitted transformer, to
# datasets.pkl in the working directory. Pickle files can execute code when loaded, so
# only load this file back from a trusted location.
with open('datasets.pkl', 'wb') as f:
    pickle.dump(datasets_dict, f)