In [1]:
# Reload edited project modules automatically before each cell is executed,
# so changes to the imported source files are picked up without a kernel restart.
%load_ext autoreload
%autoreload 2

In [2]:
import os
import sys

import os.path as osp
import numpy as np
import pandas as pd

sys.path.append('/home/pocha/projects')
from metstab_pred.src.utils import get_configs_and_model
from metstab_pred.src.config import utils_section, csv_section
# from metstab_pred.src.data import unlog_stability

from metstab_pred.src.shap_analysis.utils import load_shap_files, load_ml_files, Task, Category
# from metstab_pred.src.shap_analysis.preprocessing import get_smiles_true_predicted, get_smiles_correct
# from metstab_pred.src.shap_analysis.preprocessing import get_smiles_stability_value, get_smiles_train_test
from metstab_pred.src.shap_analysis.preprocessing import get_present_features, filter_samples
# from metstab_pred.src.shap_analysis.preprocessing import e, enough
from metstab_pred.src.shap_analysis.analyses import situation_at_threshold

from metstab_pred.src.shap_analysis.test_separation_point import find_optimal_separation_point as kfind, SeparationType
from metstab_pred.src.shap_analysis.categorisation import well_separated
from metstab_pred.src.shap_analysis.well_separated import purity



In [6]:
# Root directory of the experiment outputs; models are read from its "ml"
# subdirectory and SHAP results from its "shap" subdirectory.
directory = '/home/pocha/dane_phd/random_split/'

# Experiment to analyse. Another available run is 'h-kr-c-nb'; switch by
# editing this single assignment (two consecutive assignments would leave the
# first one dead).
exp = 'r-ma-r-svm'
some_model = osp.join(directory, 'ml', exp)
some_shaps = osp.join(directory, 'shap', exp)

# data preparation
data_cfg, repr_cfg, task_cfg, model_cfg, model_pickle = get_configs_and_model(some_model)
x_train, x_test, smiles_train, smiles_test = load_ml_files(some_model)
# The task type is read from the task config and drives how SHAP files are loaded.
task = Task(task_cfg[utils_section]['task'])
shap_cfg, smiles_order, X_full, morgan_repr, true_ys, preds, classes_order, expected_values, shap_values, background_data = load_shap_files(some_shaps, task)
# Same feature matrix as a DataFrame: integer column labels, SMILES as row index.
X_full_df = pd.DataFrame(X_full, columns=list(range(X_full.shape[1])), index=smiles_order)

# filter samples
# NOTE(review): 0.01 is presumably the minimal fraction of training samples in
# which a feature must be present - TODO confirm against get_present_features.
my_feats = get_present_features(x_train, 0.01)
# NOTE(review): `smiles_order` is passed both as the first and as the last
# argument - confirm this is what filter_samples expects.
to_analyse_X, to_analyse_df, to_analyse_shaps, smi_order, mol_indices, feature_order = filter_samples(smiles_order, my_feats, X_full, shap_values, task, smiles_order)

In [9]:
def test(X_full, shap_values, feature_order, task):
    
    for f in feature_order:
        feat_idx = feature_order.index(f)  # indeks cechy o nazwie "f"
        
        if task == Task.REGRESSION:
            separation_result = [well_separated(X_full[:,feat_idx], shap_values[:,feat_idx], task, n_way=2), ]
            classes = [None, ]
        elif task == Task.CLASSIFICATION:
            separation_result = well_separated(X_full[:,feat_idx], shap_values[:,:,feat_idx], task, n_way=2)
            classes = list(range(len(separation_result)))
        else:
            raise ValueError(f"Unknown task: {task}. Known tasks are `regression` and `classification`.")

        reference_result = []
        for c in classes:
            ref = kfind(shap_values, X_full, feature_order, f, task, class_index=c, extras=True)
            reference_result.append(ref)
            
        for my, ref, c in zip(separation_result, reference_result, classes):
            # checking score
            my_score = set([my.score])
            ref_score = set([kk.score/len(X_full) for kk in ref])
            assert my_score == ref_score, f"{my_score} != {ref_score}"
            
            # checking thresholds and their corresponding group tagging
            my_tre_val = dict(zip(my.thresholds, my.values))
            ref_tr_val = dict([(kk.x, (0,1) if kk.type == SeparationType.ZEROES_ON_LEFT else (1,0)) for kk in ref])
            assert my_tre_val == ref_tr_val, f"{my_tre_val} != {ref_tr_val}"

            # checking purities
            for my_t, (my_lp, my_rp) in zip(my.thresholds, my.purities):
                l0, l1, r0, r1 = situation_at_threshold(my_t, f, to_analyse_shaps, to_analyse_X, feature_order, task, class_index=c, print_func=None)

                mines = (my_lp, my_rp)
                reference = (purity(l0, l1), purity(r0, r1))
                assert np.all(np.isclose(mines, reference, equal_nan=True)), f"{mines} != {reference}"
                
    print("Done.")

In [10]:
# Run the consistency check on the filtered feature matrix and SHAP values.
test(to_analyse_X, to_analyse_shaps, feature_order, task)

Done.


In [6]:
## Task.REGRESSION
# Inline, regression-only variant of the consistency check between
# `well_separated` and the reference helpers. It is guarded by the task type
# so that running the whole notebook on a classification experiment skips
# this cell instead of failing.
if task == Task.REGRESSION:
    for f in feature_order:
        # separation_result
        # The unfiltered arrays are addressed by the feature id itself, while the
        # reference helpers receive the filtered arrays together with `feature_order`.
        my = well_separated(X_full[:, f], shap_values[:, f], task, n_way=2)
        k = kfind(to_analyse_shaps, to_analyse_X, feature_order, f, task, class_index=None, extras=True)

        # The reference score is divided by the number of samples before comparing.
        my_score = {my.score}
        k_score = {kk.score / len(X_full) for kk in k}
        assert my_score == k_score, f"{my_score} != {k_score}"

        # Thresholds and the group tagging assigned to each of them.
        my_tre_val = dict(zip(my.thresholds, my.values))
        k_tr_val = {kk.x: (0, 1) if kk.type == SeparationType.ZEROES_ON_LEFT else (1, 0) for kk in k}
        assert my_tre_val == k_tr_val, f"{my_tre_val} != {k_tr_val}"

        # Left/right purities at every threshold.
        for my_t, (my_lp, my_rp) in zip(my.thresholds, my.purities):
            l0, l1, r0, r1 = situation_at_threshold(my_t, f, to_analyse_shaps, to_analyse_X, feature_order, task, class_index=None, print_func=None)

            mines = (my_lp, my_rp)
            reference = (purity(l0, l1), purity(r0, r1))
            # equal_nan=True: a NaN purity on both sides counts as a match.
            assert np.all(np.isclose(mines, reference, equal_nan=True)), f"{mines} != {reference}"

    print("Done")
else:
    print(f"Skipped the regression check: task is {task}.")

Done


In [None]:
## Task.CLASSIFICATION
# Inline, classification-only variant of the consistency check between
# `well_separated` and the reference helpers. It is guarded by the task type
# so that running the whole notebook on a regression experiment skips this
# cell instead of failing on the three-axis indexing of `shap_values`.
if task == Task.CLASSIFICATION:
    for f in feature_order:
        # One separation result per class.
        separation_result = well_separated(X_full[:, f], shap_values[:, :, f], task, n_way=2)

        # Reference results for exactly the classes present in `separation_result`
        # (instead of a fixed list of three class indices).
        k_result = [
            kfind(to_analyse_shaps, to_analyse_X, feature_order, f, task, c, extras=True)
            for c in range(len(separation_result))
        ]

        for c, (my, k) in enumerate(zip(separation_result, k_result)):

            # The reference score is divided by the number of samples before comparing.
            my_score = {my.score}
            k_score = {kk.score / len(X_full) for kk in k}
            assert my_score == k_score, f"{my_score} != {k_score}"

            # Thresholds and the group tagging assigned to each of them.
            my_tre_val = dict(zip(my.thresholds, my.values))
            k_tr_val = {kk.x: (0, 1) if kk.type == SeparationType.ZEROES_ON_LEFT else (1, 0) for kk in k}
            assert my_tre_val == k_tr_val, f"{my_tre_val} != {k_tr_val}"

            # Left/right purities at every threshold.
            for my_t, (my_lp, my_rp) in zip(my.thresholds, my.purities):
                l0, l1, r0, r1 = situation_at_threshold(my_t, f, to_analyse_shaps, to_analyse_X, feature_order, task, class_index=c, print_func=None)

                mines = (my_lp, my_rp)
                reference = (purity(l0, l1), purity(r0, r1))
                # equal_nan=True: a NaN purity on both sides counts as a match.
                assert np.all(np.isclose(mines, reference, equal_nan=True)), f"{mines} != {reference}"

    print("Done")
else:
    print(f"Skipped the classification check: task is {task}.")

In [None]:
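# NOTE: abandoned draft, kept for reference only. It appears to be an earlier
# version of the check performed by `test` above and cannot run as written:
# `feature_values`, `max_correct` and `best_thresholds` are used without ever
# being assigned.
# def test_find_separation_point_well_separated(shap_values, X_full, feature_order, task, class_idx):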
    
    
#     for i in feature_order:
#         SeparationResult = namedtuple('SeparationResult', ['score', 'thresholds', 'values', 'purities'])
        
#         separation_result = well_separated(feature_values, shap_values, task, n_way=2)
#         kmax_correct, kpurity, kbest_thresholds = kfind(shap_values, X_full, feature_order, i, task, class_idx)

#         assert max_correct == kmax_correct, AssertionError(f'{max_correct} != {kmax_correct}')

#         best_thresholds = np.array(sorted(best_thresholds))
#         kbest_thresholds = np.array(sorted(kbest_thresholds))
#         assert len(best_thresholds) == len(kbest_thresholds), AssertionError(f'{best_thresholds} != {kbest_thresholds}')
#         assert set(best_thresholds) == set(kbest_thresholds), AssertionError(f'{best_thresholds} != {kbest_thresholds}')