### Import Modules

In [None]:
import os
import numpy as np
import pandas as pd
from functools import reduce
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from tpot import TPOTClassifier
import shap

### Read Training Data

In [None]:
# Folder that holds the training CSV files
train_data_path = 'train'

# One CSV per modality, stored as <modality>.csv inside the training folder
predictors_paths = {
    name: os.path.join(train_data_path, f'{name}.csv')
    for name in ('GM', 'WM', 'DMN', 'FA', 'MD')
}
additional_variables_path = os.path.join(train_data_path, 'Subjects.csv')

# Predictors
# Only the modalities listed here are used; names without a known CSV path are skipped
modalities = ['GM', 'DMN', 'FA']
selected_modalities = {key: predictors_paths[key] for key in modalities if key in predictors_paths}
def read_and_rename(modality, path):
    """Load a CSV file and tag its columns with the modality name.

    Every column except 'ID' is renamed to "<column>_<modality>"; 'ID' is
    left untouched.

    Args:
        modality: Suffix appended to the column names.
        path: Location of the CSV file passed to ``pd.read_csv``.

    Returns:
        The loaded DataFrame with renamed columns.
    """
    def tag(column):
        # 'ID' must keep its name so it stays recognisable as the identifier column
        return column if column == 'ID' else f"{column}_{modality}"

    return pd.read_csv(path).rename(columns=tag)
# Load each selected modality table and join them one after another on 'ID'
# (DataFrame.merge uses pandas' default join type, i.e. an inner join)
dfs = [read_and_rename(name, csv_path) for name, csv_path in selected_modalities.items()]
predictors_df = reduce(lambda merged, table: merged.merge(table, on='ID'), dfs)

# Response and confounding variables
# Only the variables that actually exist as columns in Subjects.csv are kept
additional_variables = ['Sex', 'Age']
df = pd.read_csv(additional_variables_path)
selected_variables = list(filter(lambda variable: variable in df.columns, additional_variables))
additional_variables_df = df[['ID', *selected_variables]]

# Merge predictors with response and confounding variables on 'ID'
df = additional_variables_df.merge(predictors_df, on='ID')

# Prepare X and y: 'Sex' is the response, every column other than 'ID' and 'Sex' is a predictor
y = df['Sex']
X = df.drop(columns=['ID', 'Sex'])

# Split into training and test datasets (20% held out, fixed seed for reproducibility)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
print(f'Sample size for training: {len(X_train)}')
print(f'Sample size for test: {len(X_test)}')

### AutoML: TPOT (Tree-based Pipeline Optimization Tool)
$ \text{Total pipelines evaluated} = \text{population size} + (\text{generations} \times \text{offspring size}) $

In [None]:
# TPOT documentation: http://epistasislab.github.io/tpot
# Search budget:
#   generations     - number of iterations to run the pipeline optimization process
#   population_size - number of individuals to retain in the genetic programming (GP)
#                     population every generation
#   offspring_size  - number of offspring to produce in each GP generation; not set here,
#                     so by default offspring_size = population_size
# Total pipelines evaluated = 50 + (5 x 50) = 300
tpot = TPOTClassifier(
    scorers=['accuracy'],
    population_size=50,
    generations=5,
    max_time_mins=None,
    random_state=42,
    verbose=2,
)

### Train and Test Model

In [None]:
# Run the TPOT search on the training split
tpot.fit(X_train, y_train)

# Score the fitted model on the held-out split
predictions = tpot.predict(X_test)
acc = accuracy_score(y_true=y_test, y_pred=predictions)
print(f"Accuracy by TPOT: {acc:.3f}")

# Show the steps of the pipeline TPOT ended up with
best_pipeline = tpot.fitted_pipeline_
print("Best pipeline steps:")
for step in best_pipeline.steps:
    print(step)

### Feature Importances

In [None]:
# Split the fitted pipeline into its transformation steps and its final estimator
preprocessor = best_pipeline[:-1]
classifier = best_pipeline.steps[-1][1]
X_train_transformed = preprocessor.transform(X_train)

# Features
# Prefer the names reported by the preprocessing steps; when they cannot provide
# them, reuse the original column names if the column count is unchanged, and
# otherwise fall back to generic positional names.
try:
    feature_names = preprocessor.get_feature_names_out()
except Exception:
    n_transformed_features = X_train_transformed.shape[1]
    if isinstance(X_train, pd.DataFrame) and n_transformed_features == len(X_train.columns):
        feature_names = X_train.columns
    else:
        feature_names = [f"feature_{i}" for i in range(n_transformed_features)]

# Importances
if hasattr(classifier, 'feature_importances_'):
    print(f"{classifier.__class__.__name__} has feature_importances_ attribute.")
    feature_importances = classifier.feature_importances_
elif hasattr(classifier, 'coef_'):
    print(f"{classifier.__class__.__name__} has coef_ attribute.")
    feature_importances = classifier.coef_
    if feature_importances.ndim > 1:
        # Only the first row of a 2-D coefficient matrix is reported
        feature_importances = feature_importances[0]
else:
    print(f"{classifier.__class__.__name__} does not have feature_importances_ or coef_ attribute.")
    feature_importances = None

# Print features and their importances
if feature_importances is not None:
    # Rank by magnitude: a coefficient can be strongly negative and still be
    # among the most influential features. The printed value keeps its sign.
    sorted_features_and_importances = sorted(
        zip(feature_names, feature_importances),
        key=lambda pair: abs(pair[1]),
        reverse=True,
    )
    top_features_and_importances = sorted_features_and_importances[:5]
    print("Top features' importances by TPOT:")
    for rank, (feature, importance) in enumerate(top_features_and_importances, start=1):
        print(f"{rank}. {feature}: {importance:.3f}")

### SHAP (SHapley Additive exPlanations)

In [None]:
def _kernel_shap_values(model, X_transformed, n_samples_background):
    """Explain `model` with shap.KernelExplainer on a sampled background set.

    Uses ``model.predict_proba`` when the model has it, otherwise
    ``model.predict``. Returns ``(explainer, shap_values)``.
    """
    X_background = shap.sample(X_transformed, n_samples_background)
    if hasattr(model, 'predict_proba'):
        predict_function = model.predict_proba
    else:
        predict_function = model.predict
    explainer = shap.KernelExplainer(predict_function, X_background)
    return explainer, explainer.shap_values(X_transformed)


def get_shap_values(preprocessor, model, X_test, n_samples_background=100):
    """Compute SHAP values for `model` on the preprocessed `X_test`.

    Explainer selection:
      * model has ``feature_importances_`` -> ``shap.TreeExplainer``
      * model has ``coef_``                -> ``shap.LinearExplainer`` (with the
        transformed data as background)
      * otherwise, or when the specialised explainer raises -> ``shap.KernelExplainer``

    Args:
        preprocessor: Fitted object whose ``transform`` is applied to `X_test`.
        model: Fitted estimator to explain.
        X_test: Data to explain, before preprocessing.
        n_samples_background: Number of rows sampled as KernelExplainer background.

    Returns:
        Tuple ``(explainer, shap_values, X_test_transformed)``.

    Raises:
        Whatever KernelExplainer raises; it is the last resort, so its errors
        are not retried or swallowed.
    """
    X_test_transformed = preprocessor.transform(X_test)

    explainer = None
    shap_values = None
    try:
        if hasattr(model, 'feature_importances_'):
            explainer = shap.TreeExplainer(model)
        elif hasattr(model, 'coef_'):
            explainer = shap.LinearExplainer(model, X_test_transformed)
        if explainer is not None:
            shap_values = explainer.shap_values(X_test_transformed)
    except Exception as e:
        print(f"First explainer failed: {e}. Falling back to KernelExplainer.")
        # Discard any half-built state so the fallback below takes over
        explainer = None
        shap_values = None

    if explainer is None:
        explainer, shap_values = _kernel_shap_values(model, X_test_transformed, n_samples_background)
    return explainer, shap_values, X_test_transformed

# Compute SHAP values
explainer, shap_values, X_test_transformed = get_shap_values(preprocessor, classifier, X_test)
shap.summary_plot(shap_values, X_test_transformed, feature_names=feature_names, max_display=5)

# Correlate between original and transformed features
class_idx = 1
# The layout of the SHAP output depends on the explainer and the SHAP version:
#   - list with one (samples, features) array per class
#   - 3-D array (samples, features, classes)
#   - 2-D array (samples, features) when there is a single output
# Reduce all of them to one (samples, features) array for the class of interest.
if isinstance(shap_values, list):
    class_shap_values = np.asarray(shap_values[min(class_idx, len(shap_values) - 1)])
else:
    shap_array = np.asarray(shap_values)
    if shap_array.ndim == 3:
        class_shap_values = shap_array[:, :, min(class_idx, shap_array.shape[2] - 1)]
    else:
        class_shap_values = shap_array
mean_abs_shap = np.abs(class_shap_values).mean(axis=0)
feature_importance_indices = np.argsort(-mean_abs_shap)
if hasattr(X_test, 'columns'):
    original_features = X_test.columns
    # For each of the 5 transformed features with the largest mean |SHAP|,
    # list the 3 original columns it correlates with most strongly
    for idx in feature_importance_indices[:5]:
        feature_values = X_test_transformed[:, idx]
        correlations = {
            col: np.corrcoef(feature_values, X_test[col])[0, 1]
            for col in original_features
        }
        # A constant column yields a NaN correlation; rank it last instead of
        # letting NaN comparisons scramble the ordering
        sorted_correlations = sorted(
            correlations.items(),
            key=lambda item: 0.0 if np.isnan(item[1]) else abs(item[1]),
            reverse=True,
        )
        print(f"Original features most correlated with transformed feature {idx}:")
        for feat, corr in sorted_correlations[:3]:
            print(f"  - {feat}: {corr:.4f}")

### Inference

In [None]:
# Folder that holds the external (test) CSV files
test_data_path = 'test'

# One CSV per modality, stored as <modality>.csv inside the test folder
predictors_paths = {
    name: os.path.join(test_data_path, f'{name}.csv')
    for name in ('GM', 'WM', 'DMN', 'FA', 'MD')
}
additional_variables_path = os.path.join(test_data_path, 'Subjects.csv')

# Predictors
# Same modality selection as for training, now pointing at the test folder
selected_modalities = {key: predictors_paths[key] for key in modalities if key in predictors_paths}
dfs = [read_and_rename(name, csv_path) for name, csv_path in selected_modalities.items()]
predictors_df = reduce(lambda merged, table: merged.merge(table, on='ID'), dfs)

# Confounding variables
# Only the variables that actually exist as columns in Subjects.csv are kept
df = pd.read_csv(additional_variables_path)
selected_variables = [name for name in additional_variables if name in df.columns]
additional_variables_df = df[['ID', *selected_variables]]

# Merge predictors with confounding variables on 'ID'
df = additional_variables_df.merge(predictors_df, on='ID')

# Apply trained model
# 'Sex' is the response the model predicts. If the external Subjects.csv also
# carries it, it must not be passed in as a predictor; errors='ignore' keeps
# this working when the column is absent.
X_ext = df.drop(columns=['ID', 'Sex'], errors='ignore')
# Use exactly the training columns, in training order; a missing column
# raises a KeyError naming it instead of failing later inside the pipeline
X_ext = X_ext[X_train.columns]
X_ext_transformed = preprocessor.transform(X_ext)
predictions_ext = classifier.predict(X_ext_transformed)

# Save predictions
# np.savetxt's default float format cannot write string class labels, so
# non-numeric predictions are written as plain text instead
predictions_array = np.asarray(predictions_ext)
predictions_fmt = '%.18e' if predictions_array.dtype.kind in 'biuf' else '%s'
np.savetxt(os.path.join(test_data_path, "Predictions.txt"), predictions_array, fmt=predictions_fmt)

# Compute SHAP values
# Reuses the explainer built on the training-time model for the external data
shap_values = explainer.shap_values(X_ext_transformed)
shap.summary_plot(shap_values, X_ext_transformed, feature_names=feature_names, max_display=5)

# Correlate between original and transformed features
class_idx = 1
# The layout of the SHAP output depends on the explainer and the SHAP version:
#   - list with one (samples, features) array per class
#   - 3-D array (samples, features, classes)
#   - 2-D array (samples, features) when there is a single output
# Reduce all of them to one (samples, features) array for the class of interest.
if isinstance(shap_values, list):
    class_shap_values = np.asarray(shap_values[min(class_idx, len(shap_values) - 1)])
else:
    shap_array = np.asarray(shap_values)
    if shap_array.ndim == 3:
        class_shap_values = shap_array[:, :, min(class_idx, shap_array.shape[2] - 1)]
    else:
        class_shap_values = shap_array
mean_abs_shap = np.abs(class_shap_values).mean(axis=0)
feature_importance_indices = np.argsort(-mean_abs_shap)
if hasattr(X_ext, 'columns'):
    original_features = X_ext.columns
    # For each of the 5 transformed features with the largest mean |SHAP|,
    # list the 3 original columns it correlates with most strongly
    for idx in feature_importance_indices[:5]:
        feature_values = X_ext_transformed[:, idx]
        correlations = {
            col: np.corrcoef(feature_values, X_ext[col])[0, 1]
            for col in original_features
        }
        # A constant column yields a NaN correlation; rank it last instead of
        # letting NaN comparisons scramble the ordering
        sorted_correlations = sorted(
            correlations.items(),
            key=lambda item: 0.0 if np.isnan(item[1]) else abs(item[1]),
            reverse=True,
        )
        print(f"Original features most correlated with transformed feature {idx}:")
        for feat, corr in sorted_correlations[:3]:
            print(f"  - {feat}: {corr:.4f}")