In [None]:
import pandas as pd
import numpy as np
import optuna
import matplotlib.pyplot as plt
import networkx as nx
import itertools
from sklearn.decomposition import PCA
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.model_selection import train_test_split, StratifiedKFold, cross_val_score
from sklearn.metrics import confusion_matrix, accuracy_score, roc_auc_score, roc_curve
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.pipeline import Pipeline
from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler
from imblearn.under_sampling import RandomUnderSampler
from imblearn.over_sampling import RandomOverSampler
from catboost import CatBoostClassifier
import os
import joblib

In [None]:
# Load the stage 2 sample table; the bare `.shape` on the last line is shown as the cell output.
stage_2_samples = pd.read_csv('../data/cancer/stage_2_prostate_cancer_samples.csv')
stage_2_samples.shape

In [None]:
# Load the stage 4 sample table; the bare `.shape` on the last line is shown as the cell output.
stage_4_samples = pd.read_csv('../data/cancer/stage_4_prostate_cancer_samples.csv')
stage_4_samples.shape

In [None]:
# Stack the two stage tables row-wise; ignore_index=True gives the result a fresh 0..n-1 row index.
combined_dataset = pd.concat([stage_2_samples, stage_4_samples], ignore_index=True)

In [None]:
# Verify and clean the Stage column, then derive the binary target ID_REF.
print("Unique values in Stage column:", combined_dataset['Stage'].unique())
combined_dataset['Stage'] = combined_dataset['Stage'].str.strip()

# Map each expected label explicitly. The earlier
# `np.where(Stage == 'Stage: 2', 0, 1)` labelled *anything* that was not
# stage 2 (typos, other stages, missing values) as class 1 without warning.
stage_to_label = {'Stage: 2': 0, 'Stage: 4': 1}
stage_labels = combined_dataset['Stage'].map(stage_to_label)
if stage_labels.isna().any():
    unexpected_stages = combined_dataset.loc[stage_labels.isna(), 'Stage'].unique()
    raise ValueError(f"Unexpected values in Stage column: {unexpected_stages!r}")
combined_dataset['ID_REF'] = stage_labels.astype(int)

# Print class distribution to ensure both classes are present
print("Class distribution in ID_REF column:")
print(combined_dataset['ID_REF'].value_counts())


In [None]:
# Define process_data function
def process_data(data, under_sample_factor=None, over_sample_factor=None):
    """Turn the combined sample table into standardized train/test arrays.

    Parameters
    ----------
    data : pandas.DataFrame
        Must contain the metadata columns listed in ``columns_to_drop`` and
        the ``ID_REF`` target. Every remaining column is cast to float and
        used as a feature.
    under_sample_factor, over_sample_factor : float, optional
        ``sampling_strategy`` for ``RandomUnderSampler`` /
        ``RandomOverSampler``. A value is only applied when it is a float in
        ``(0, 1]``; anything else (including ``None``) disables that step.

    Returns
    -------
    tuple
        ``(x_train, x_test, y_train, y_test, feature_names)`` where
        ``feature_names[i]`` names column ``i`` of ``x_train`` / ``x_test``.
    """
    def _is_valid_factor(factor):
        # Same acceptance rule as before: a float strictly above 0 and at most 1.
        return factor is not None and isinstance(factor, float) and 0 < factor <= 1

    columns_to_drop = ['Sample_ID', 'Sex', 'Age', 'Stage', 'Disease']
    data = data.drop(columns=columns_to_drop)

    # Take the names from the very frame that becomes `x`, so they stay aligned
    # wherever ID_REF sits (`data.columns[1:]` assumed it was the first column).
    feature_frame = data.drop(columns=["ID_REF"])
    feature_names = feature_frame.columns
    x = np.array(feature_frame).astype('float')
    y = np.array(data["ID_REF"]).astype('int')

    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=0, stratify=y)

    # Resample the training split only: resampling before the split lets
    # duplicated rows land on both sides and distorts the held-out distribution.
    if _is_valid_factor(under_sample_factor):
        under_sampler = RandomUnderSampler(sampling_strategy=under_sample_factor, random_state=0)
        x_train, y_train = under_sampler.fit_resample(x_train, y_train)

    if _is_valid_factor(over_sample_factor):
        over_sampler = RandomOverSampler(sampling_strategy=over_sample_factor, random_state=0)
        x_train, y_train = over_sampler.fit_resample(x_train, y_train)

    # Fit the scaler on training data only, then apply it to the test split.
    scaler = StandardScaler()
    x_train = scaler.fit_transform(x_train)
    x_test = scaler.transform(x_test)

    return x_train, x_test, y_train, y_test, feature_names

In [None]:
# Define parameters
feature_selection_num = 500  # k passed to SelectKBest in every pipeline in this notebook
feature_importance_num = 20  # number of top features requested from get_top_features
pca_components = 100  # Number of principal components for PCA

# Process data
# Both sampling factors keep their None defaults here, so no resampling is applied.
x_train, x_test, y_train, y_test, feature_names = process_data(combined_dataset)

In [None]:
# Define the objective function for Optuna - SVM
def svm_objective(trial):
    """Optuna objective for the SelectKBest -> PCA -> SVC pipeline.

    Samples ``C`` (log-uniform in [1e-3, 1e3]) and ``kernel`` from ``trial``
    and scores the pipeline with 5-fold stratified cross-validation.
    Reads ``feature_selection_num``, ``pca_components``, ``x_train`` and
    ``y_train`` from the notebook's global scope.

    Returns
    -------
    float
        Mean cross-validated accuracy (the study maximizes this value).
    """
    k = feature_selection_num

    # suggest_float(..., log=True) is the supported replacement for the
    # deprecated suggest_loguniform and samples the same log-uniform range.
    C = trial.suggest_float('C', 1e-3, 1e3, log=True)
    kernel = trial.suggest_categorical('kernel', ['linear', 'poly', 'rbf', 'sigmoid'])

    pipe = Pipeline([
        ('skb', SelectKBest(f_classif, k=k)),
        ('pca', PCA(n_components=pca_components)),
        ('estimator', SVC(C=C, kernel=kernel, random_state=0))
    ])

    cv = StratifiedKFold(n_splits=5)
    scores = cross_val_score(pipe, x_train, y_train, cv=cv, scoring='accuracy')
    return scores.mean()

In [None]:
def rf_objective(trial):
    """Optuna objective for the SelectKBest -> PCA -> RandomForest pipeline.

    Samples ``n_estimators``, ``max_depth``, ``max_features`` and
    ``criterion`` from ``trial`` and returns the mean accuracy over a 5-fold
    stratified cross-validation on the global ``x_train`` / ``y_train``.
    """
    n_selected = feature_selection_num

    # Dict literals evaluate in order, so the suggestions are drawn in the
    # same sequence as separate assignments would draw them.
    forest_kwargs = {
        'n_estimators': trial.suggest_int('n_estimators', 100, 1000),
        'max_depth': trial.suggest_int('max_depth', 2, 32),
        'max_features': trial.suggest_categorical('max_features', ['sqrt', 'log2']),
        'criterion': trial.suggest_categorical('criterion', ['gini', 'entropy']),
    }

    steps = [
        ('skb', SelectKBest(f_classif, k=n_selected)),
        ('pca', PCA(n_components=pca_components)),
        ('estimator', RandomForestClassifier(random_state=0, **forest_kwargs)),
    ]

    fold_scores = cross_val_score(
        Pipeline(steps),
        x_train,
        y_train,
        cv=StratifiedKFold(n_splits=5),
        scoring='accuracy',
    )
    return fold_scores.mean()

In [None]:
def catboost_objective(trial):
    """Optuna objective for the SelectKBest -> PCA -> CatBoost pipeline.

    Samples ``learning_rate`` (log-uniform in [1e-3, 1.0]), ``depth`` and
    ``iterations`` from ``trial`` and scores the pipeline with 5-fold
    stratified cross-validation. Reads ``feature_selection_num``,
    ``pca_components``, ``x_train`` and ``y_train`` from the notebook's
    global scope.

    Returns
    -------
    float
        Mean cross-validated accuracy (the study maximizes this value).
    """
    k = feature_selection_num

    # suggest_float(..., log=True) is the supported replacement for the
    # deprecated suggest_loguniform and samples the same log-uniform range.
    learning_rate = trial.suggest_float('learning_rate', 1e-3, 1.0, log=True)
    depth = trial.suggest_int('depth', 2, 10)
    iterations = trial.suggest_int('iterations', 100, 1000)

    pipe = Pipeline([
        ('skb', SelectKBest(f_classif, k=k)),
        ('pca', PCA(n_components=pca_components)),
        ('estimator', CatBoostClassifier(learning_rate=learning_rate, depth=depth, iterations=iterations, verbose=0, random_state=0))
    ])

    cv = StratifiedKFold(n_splits=5)
    scores = cross_val_score(pipe, x_train, y_train, cv=cv, scoring='accuracy')
    return scores.mean()

In [None]:
# Function to save study
def save_study(study, filename):
    """Persist ``study`` to ``filename`` using joblib; returns nothing."""
    joblib.dump(value=study, filename=filename)

# Function to load study
def load_study(filename):
    """Return the object previously stored at ``filename`` with joblib."""
    # NOTE(review): joblib.load unpickles the file, which can execute arbitrary
    # code; only point this at study files produced by this notebook.
    restored_study = joblib.load(filename)
    return restored_study

In [None]:
# Optimize hyperparameters using Optuna
# Cache paths (relative to the working directory) for the three studies: the
# optimization cells load a study from here when the file exists and otherwise
# run the search and save the result to the same path.
svm_study_filename = 'svm_study_s2_s4.pkl'
rf_study_filename = 'rf_study_s2_s4.pkl'
catboost_study_filename = 'catboost_study_s2_s4.pkl'

### Find hyperparameters if not trained already

In [None]:
# SVM Hyperparameter Optimization
# Reuse the cached study when its file exists; otherwise run a 50-trial search
# and cache it. The sampler is seeded so its suggestions do not depend on an
# unseeded random state.
if os.path.exists(svm_study_filename):
    svm_study = load_study(svm_study_filename)
else:
    svm_study = optuna.create_study(direction='maximize', sampler=optuna.samplers.TPESampler(seed=0))
    svm_study.optimize(svm_objective, n_trials=50)
    save_study(svm_study, svm_study_filename)

In [None]:
# RF Hyperparameter Optimization
# Reuse the cached study when its file exists; otherwise run a 50-trial search
# and cache it. The sampler is seeded so its suggestions do not depend on an
# unseeded random state.
if os.path.exists(rf_study_filename):
    rf_study = load_study(rf_study_filename)
else:
    rf_study = optuna.create_study(direction='maximize', sampler=optuna.samplers.TPESampler(seed=0))
    rf_study.optimize(rf_objective, n_trials=50)
    save_study(rf_study, rf_study_filename)

In [None]:
# CatBoost Hyperparameter Optimization
# Reuse the cached study when its file exists; otherwise run a 50-trial search
# and cache it. The sampler is seeded so its suggestions do not depend on an
# unseeded random state.
if os.path.exists(catboost_study_filename):
    catboost_study = load_study(catboost_study_filename)
else:
    catboost_study = optuna.create_study(direction='maximize', sampler=optuna.samplers.TPESampler(seed=0))
    catboost_study.optimize(catboost_objective, n_trials=50)
    save_study(catboost_study, catboost_study_filename)

### Print the best trial for each study

For SVM, Random Forest, and CatBoost.

In [None]:
# Report the best SVM trial: its objective value, then one line per tuned hyperparameter.
print("Best SVM trial:")
svm_trial = svm_study.best_trial
print(f"  Value:  {svm_trial.value}")
print("  Params: ")
for key, value in svm_trial.params.items():
    print("    {}: {}".format(key, value))

In [None]:
# Report the best Random Forest trial: its objective value, then one line per tuned hyperparameter.
print("Best Random Forest trial:")
rf_trial = rf_study.best_trial
print(f"  Value:  {rf_trial.value}")
print("  Params: ")
for key, value in rf_trial.params.items():
    print("    {}: {}".format(key, value))

In [None]:
# Report the best CatBoost trial: its objective value, then one line per tuned hyperparameter.
print("Best CatBoost trial:")
catboost_trial = catboost_study.best_trial
print(f"  Value:  {catboost_trial.value}")
print("  Params: ")
for key, value in catboost_trial.params.items():
    print("    {}: {}".format(key, value))

## Train and evaluate the models with the best hyperparameters

In [None]:
# Train and evaluate the models with the best hyperparameters
def train_and_evaluate(pipe, x_train, y_train, x_test, y_test):
    """Fit ``pipe`` on the training split and print its test-set metrics.

    ``pipe`` is fitted in place. The test accuracy and the confusion matrix
    are printed; nothing is returned.
    """
    pipe.fit(x_train, y_train)
    predictions = pipe.predict(x_test)

    test_accuracy = accuracy_score(y_test, predictions)
    print(f'Testing accuracy {test_accuracy}')

    matrix = confusion_matrix(y_test, predictions)
    print(f'Confusion matrix: \n{matrix}')

### SVM

In [None]:
# Rebuild the SVM pipeline with the best trial's C and kernel, fit it on the
# training split and report its test-set metrics.
best_svm_params = svm_trial.params
svm_pipe = Pipeline(steps=[
    ('skb', SelectKBest(score_func=f_classif, k=feature_selection_num)),
    ('pca', PCA(n_components=pca_components)),
    (
        'estimator',
        SVC(
            C=best_svm_params['C'],
            kernel=best_svm_params['kernel'],
            random_state=0,
        ),
    ),
])
train_and_evaluate(svm_pipe, x_train, y_train, x_test, y_test)

### Random Forest

In [None]:
# Rebuild the Random Forest pipeline with the best trial's hyperparameters,
# fit it on the training split and report its test-set metrics.
best_rf_params = rf_trial.params
rf_pipe = Pipeline(steps=[
    ('skb', SelectKBest(score_func=f_classif, k=feature_selection_num)),
    ('pca', PCA(n_components=pca_components)),
    (
        'estimator',
        RandomForestClassifier(
            n_estimators=best_rf_params['n_estimators'],
            max_depth=best_rf_params['max_depth'],
            max_features=best_rf_params['max_features'],
            criterion=best_rf_params['criterion'],
            random_state=0,
        ),
    ),
])
train_and_evaluate(rf_pipe, x_train, y_train, x_test, y_test)

### CatBoost

In [None]:
# Rebuild the CatBoost pipeline with the best trial's hyperparameters,
# fit it on the training split and report its test-set metrics.
best_catboost_params = catboost_trial.params
catboost_pipe = Pipeline(steps=[
    ('skb', SelectKBest(score_func=f_classif, k=feature_selection_num)),
    ('pca', PCA(n_components=pca_components)),
    (
        'estimator',
        CatBoostClassifier(
            learning_rate=best_catboost_params['learning_rate'],
            depth=best_catboost_params['depth'],
            iterations=best_catboost_params['iterations'],
            verbose=0,
            random_state=0,
        ),
    ),
])
train_and_evaluate(catboost_pipe, x_train, y_train, x_test, y_test)

In [None]:
# Rank original input features for a fitted skb -> (pca) -> estimator pipeline
def get_top_features(pipe, feature_names, top_feature_num):
    """Return the ``top_feature_num`` highest-scoring original features.

    The estimator is trained on PCA components, so its ``coef_`` /
    ``feature_importances_`` have one entry per component, not per input
    feature. The scores are therefore projected back onto the features kept
    by SelectKBest and then mapped to ``feature_names`` through the
    selector's support indices.

    Parameters
    ----------
    pipe : fitted Pipeline
        Needs the steps ``'skb'`` and ``'estimator'``; a ``'pca'`` step is
        used when present.
    feature_names : sequence
        Names of the columns the pipeline was fitted on, in column order.
    top_feature_num : int
        Number of (name, score) pairs to return.

    Returns
    -------
    list of (name, score)
        Sorted by decreasing absolute score.

    Raises
    ------
    ValueError
        For an SVC with a non-linear kernel, or an unsupported estimator type.
    """
    steps = pipe.named_steps
    estimator = steps['estimator']

    if isinstance(estimator, SVC):
        if estimator.kernel != 'linear':
            raise ValueError("Feature importance is not available for non-linear SVM kernels.")
        estimator_scores = np.asarray(estimator.coef_[0], dtype=float)
        signed_scores = True
    elif isinstance(estimator, (RandomForestClassifier, CatBoostClassifier)):
        estimator_scores = np.asarray(estimator.feature_importances_, dtype=float)
        signed_scores = False
    else:
        # Previously this fell through and failed with an UnboundLocalError.
        raise ValueError(f"Unsupported estimator type: {type(estimator).__name__}")

    if 'pca' in steps:
        # components_ has one row per component and one column per selected feature.
        loadings = steps['pca'].components_
        if signed_scores:
            # Linear model: w . (P (x - mean)) == (w P) . x + const, so this is
            # the exact coefficient per selected feature (for non-whitened PCA).
            selected_scores = estimator_scores @ loadings
        else:
            # Heuristic for tree importances: spread each component's importance
            # over the features in proportion to their absolute loading.
            selected_scores = estimator_scores @ np.abs(loadings)
    else:
        selected_scores = estimator_scores

    # Positions, in the original column order, of the features SelectKBest kept.
    selected_columns = steps['skb'].get_support(indices=True)
    ranking = np.argsort(np.abs(selected_scores))[::-1][:top_feature_num]
    top_features = [(feature_names[selected_columns[i]], selected_scores[i]) for i in ranking]
    return top_features

In [None]:
# Get top features for each model
# get_top_features raises ValueError when the tuned SVC kernel is not linear,
# so that case is reported here instead of stopping the notebook.
try:
    svm_top_features = get_top_features(svm_pipe, feature_names, feature_importance_num)
    print("Top SVM features:", svm_top_features)
except ValueError as e:
    print("SVM feature extraction error:", e)

# Both tree-based estimators are read through feature_importances_, so no guard is used here.
rf_top_features = get_top_features(rf_pipe, feature_names, feature_importance_num)
catboost_top_features = get_top_features(catboost_pipe, feature_names, feature_importance_num)

print("Top Random Forest features:", rf_top_features)
print("Top CatBoost features:", catboost_top_features)

In [None]:
# Compile and print the list of top features
def compile_top_features_list(rf_features, cb_features):
    """Join the feature names of both rankings into one newline-separated string.

    Each input is a sequence of entries whose first item is the feature name.
    Random Forest names come first, then CatBoost names; a name that appears
    in both rankings is listed twice.
    """
    collected_names = []
    for entry in rf_features + cb_features:
        collected_names.append(entry[0])
    return '\n'.join(collected_names)

# One feature name per line: the Random Forest ranking first, then the CatBoost ranking.
top_features_list = compile_top_features_list(rf_top_features, catboost_top_features)
print(top_features_list)