In [1]:
import warnings
warnings.filterwarnings('ignore')

# from sklearnex import patch_sklearn
# patch_sklearn()

In [2]:
import os
import numpy as np
import pandas as pd
import plotly.express as px
import copy

import sklearn
print(sklearn.__version__)
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, classification_report, accuracy_score, f1_score, roc_auc_score,  roc_curve, auc, precision_recall_curve
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler, QuantileTransformer, KBinsDiscretizer, Normalizer, PowerTransformer, SplineTransformer, MaxAbsScaler
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.feature_selection import SelectFromModel, SelectKBest, chi2, VarianceThreshold
from sklearn.decomposition import PCA
from sklearn.utils.class_weight import compute_class_weight
from sklearn.linear_model import LogisticRegression, PassiveAggressiveClassifier, Perceptron, RidgeClassifier, SGDClassifier, SGDOneClassSVM
from sklearn.pipeline import Pipeline

from xgboost import XGBRegressor, plot_importance
from imblearn.over_sampling import SMOTE, ADASYN,BorderlineSMOTE, KMeansSMOTE, SMOTEN, SMOTENC, SVMSMOTE

from mlxtend.plotting import plot_learning_curves

import matplotlib.pyplot as plt

1.0.2


In [3]:
class WorkingSet:
    """Load a lesion CSV and split it into training and test feature/label sets.

    Rows are routed by the text found in the ``Inf_Train_test`` column: rows
    whose tag contains "train" or "valid" (case-insensitive) form the training
    set, rows whose tag contains "test" form the test set.  The label is read
    from ``Target_Lesion_ClinSig``.

    Attributes:
        imported_dataframe: full frame read from the CSV.
        training_df: train rows followed by validation rows, all columns.
        X_train, y_train: training features and labels.
        X_test, y_test: test features and labels; ``X_test`` has exactly the
            columns of ``X_train``.
        target_name: last column name that contains "target" (any case).
    """

    # Label column and split-tag column used throughout the class.
    TARGET_COLUMN = 'Target_Lesion_ClinSig'
    SPLIT_COLUMN = 'Inf_Train_test'

    def __init__(self, csv_file_name:str):
        self.imported_dataframe = pd.read_csv(csv_file_name)
        self.training_df = None
        self.X_train = None
        self.y_train = None
        self.X_test = None
        self.y_test = None
        self.target_name = ''
        # Train first: the test set reuses the columns kept for training.
        self.generate_train_set()
        self.generate_test_set()

    def _rows_tagged(self, tag: str) -> pd.DataFrame:
        """Return the imported rows whose split tag contains ``tag`` (case-insensitive)."""
        split_tags = self.imported_dataframe[self.SPLIT_COLUMN]
        return self.imported_dataframe[split_tags.str.contains(tag, case=False)]

    def generate_train_set(self):
        """Build ``training_df``, ``X_train`` and ``y_train`` from train + valid rows."""
        self.training_df = pd.concat([self._rows_tagged('train'), self._rows_tagged('valid')], axis=0)

        print(f'Are thre any Nan = {self.training_df.isnull().values.any()}, Number of Nan = {self.training_df.isnull().sum().sum()}')

        # Record (and echo) every column whose name mentions "target"; the last one wins.
        for key in self.training_df.keys():
            if 'target' in key.lower():
                self.target_name = key
                print(self.target_name)

        features = self.training_df.drop([self.TARGET_COLUMN, self.SPLIT_COLUMN], axis=1)
        features = self.drop_all_zero_columns(features)
        self.X_train = self.drop_columns_std_larger(features)

        self.y_train = self.training_df[self.TARGET_COLUMN]

    def generate_test_set(self):
        """Build ``X_test`` and ``y_test`` using the feature columns kept for training.

        The column filters are decided on the training rows only.  Filtering
        the test rows independently could drop a different set of columns
        (e.g. a feature that happens to be all zero in the test rows), leaving
        ``X_test`` incompatible with a model fitted on ``X_train``.
        """
        if self.X_train is None:
            self.generate_train_set()
        df_test = self._rows_tagged('test')
        self.X_test = df_test.loc[:, self.X_train.columns]
        self.y_test = df_test[self.TARGET_COLUMN]

    @staticmethod
    def drop_all_zero_columns(a_dataframe: pd.DataFrame) -> pd.DataFrame:
        """Return the frame without the columns whose values are all equal to 0."""
        return a_dataframe.loc[:, a_dataframe.ne(0).any()]

    @staticmethod
    def drop_columns_std_larger(a_dataframe: pd.DataFrame, max_std: float = 10000) -> pd.DataFrame:
        """Return the frame keeping only columns whose standard deviation is below ``max_std``."""
        return a_dataframe.loc[:, a_dataframe.std() < max_std]

In [4]:
def apply_variance_threshold(x_set: pd.DataFrame, threshold: float = 0.2) -> np.ndarray:
    """Remove low-variance features from ``x_set``.

    A new ``VarianceThreshold`` selector is fitted on ``x_set`` itself, so the
    set of retained columns depends on the data passed in.  Calling this
    separately on two different sets may keep different columns.

    Args:
        x_set: feature matrix to filter.
        threshold: variance threshold handed to ``VarianceThreshold``
            (default 0.2, the value previously hard-coded).

    Returns:
        The transformed feature matrix produced by the selector.
    """
    return VarianceThreshold(threshold=threshold).fit_transform(x_set)

In [7]:
class DataVariance:
    """Variance-filter the features of a working set and oversample the training data.

    Attributes:
        all_set: object exposing ``X_train``, ``y_train`` and ``X_test``.
        variance_flag: when True the oversampler is fed the variance-filtered
            training matrix, otherwise the raw training values.
        selector: the ``VarianceThreshold`` fitted on the training features.
        X_train_trans, X_test_trans: variance-filtered train / test matrices,
            both produced by ``selector`` so they share the same columns.
        X_train, y_train: oversampled training features and labels.
    """

    def __init__(self, all_set, variance_flag: bool = False):
        self.all_set = all_set
        self.variance_flag = variance_flag
        self.selector = None
        self.X_train_trans = None
        self.X_train = None
        self.y_train = None
        self.X_test_trans = None
        self.initialize()

    def initialize(self):
        """Run the variance filter, then the oversampling step."""
        self.make_variance()
        self.resample_dataset()

    def make_variance(self):
        """Fit the variance filter on the training features and apply it to train and test.

        Fitting a second selector on the test set (as the previous version did)
        can keep a different subset of columns, so the test matrix would no
        longer line up with the features the model was trained on.
        """
        self.selector = VarianceThreshold(threshold=0.2)
        self.X_train_trans = self.selector.fit_transform(self.all_set.X_train)
        # Requires all_set.X_test to carry the same columns as all_set.X_train.
        self.X_test_trans = self.selector.transform(self.all_set.X_test)

    def resample_dataset(self):
        """Oversample the minority class of the training data with ADASYN.

        NOTE(review): with ``variance_flag=False`` the resampled ``X_train`` is
        built from the raw features, so ``X_test_trans`` does not match it;
        evaluate against ``all_set.X_test`` in that case.
        """
        oversampler = ADASYN(random_state=2022, sampling_strategy='minority', n_jobs=4)
        labels = self.all_set.y_train.values.ravel()
        if self.variance_flag:
            source = self.X_train_trans
        else:
            source = self.all_set.X_train.values
        self.X_train, self.y_train = oversampler.fit_resample(source, labels)

In [8]:
class RocCurve:
    """Plot train and test ROC curves for a fitted binary classifier.

    Constructing the object immediately computes the curves and draws them.

    Args:
        model: fitted estimator exposing ``predict`` and, ideally,
            ``predict_proba`` or ``decision_function``.
        all_set: working set providing ``X_train``, ``y_train``, ``X_test``
            and ``y_test``.
        variance: optional ``DataVariance`` whose resampled training data
            (and matching test matrix) should be scored instead of the raw
            working-set matrices.
    """

    def __init__(self, model, all_set: WorkingSet, variance: DataVariance = None):
        self.model = model
        self.all_set = all_set
        self.variance = variance
        self.initialize()

    def initialize(self):
        """Select the train/test matrices, score them and plot both ROC curves."""
        y_test = self.all_set.y_test.values.ravel()
        if self.variance is not None:
            x_train, y_train = self.variance.X_train, self.variance.y_train
            # DataVariance resamples the filtered matrix only when variance_flag is set,
            # so pick the test matrix that lives in the same feature space.
            if getattr(self.variance, 'variance_flag', True):
                x_test = self.variance.X_test_trans
            else:
                x_test = self.all_set.X_test.values
        else:
            x_train = self.all_set.X_train.values
            y_train = self.all_set.y_train.values.ravel()
            x_test = self.all_set.X_test.values

        fpr_lr_train, tpr_lr_train, roc_auc_lr_train = self.generate_score(x_train, y_train)
        fpr_lr_test, tpr_lr_test, roc_auc_lr_test = self.generate_score(x_test, y_test)

        self.plot_roc_curve(fpr_lr_train, tpr_lr_train, roc_auc_lr_train, fpr_lr_test, tpr_lr_test, roc_auc_lr_test)

    def generate_score(self, x_set, y_set):
        """Return ``(fpr, tpr, auc)`` for the model on ``x_set`` / ``y_set``.

        Continuous scores are preferred because hard class labels collapse the
        ROC curve to a single operating point; ``predict`` is only a fallback.
        """
        if hasattr(self.model, 'predict_proba'):
            # Column 1 is the probability of the second entry of classes_.
            y_scores = self.model.predict_proba(x_set)[:, 1]
        elif hasattr(self.model, 'decision_function'):
            y_scores = self.model.decision_function(x_set)
        else:
            y_scores = self.model.predict(x_set)

        fpr_lr, tpr_lr, _ = roc_curve(y_set, y_scores)
        roc_auc_lr = auc(fpr_lr, tpr_lr)

        return fpr_lr, tpr_lr, roc_auc_lr

    @staticmethod
    def plot_roc_curve(fpr_lr_train, tpr_lr_train, roc_auc_lr_train, fpr_lr_test, tpr_lr_test, roc_auc_lr_test):
        """Draw the train (left) and test (right) ROC curves with a chance diagonal."""
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 7))
        panels = (
            (ax1, 'Train', fpr_lr_train, tpr_lr_train, roc_auc_lr_train),
            (ax2, 'Test', fpr_lr_test, tpr_lr_test, roc_auc_lr_test),
        )
        for axis, split_name, fpr, tpr, area in panels:
            axis.set_title(f'ROC curve {split_name}', fontsize=16)
            axis.plot(fpr, tpr, lw=3, label=f'LogRegr ROC curve (area = {area:0.2f})')
            axis.set_xlabel('False Positive Rate', fontsize=16)
            axis.set_ylabel('True Positive Rate', fontsize=16)
            axis.legend(loc='lower right', fontsize=13)
            # Diagonal = performance of a random classifier.
            axis.plot([0, 1], [0, 1], color='navy', lw=3, linestyle='--')

In [None]:
# pc1_loadings = loadings.sort_values(by='PC1', ascending=False)[['PC1']]
# pc1_loadings = pc1_loadings.reset_index()
# pc1_loadings.columns = ['Attribute', 'CorrelationWithPC1']

# plt.bar(x=pc1_loadings['Attribute'], height=pc1_loadings['CorrelationWithPC1'], color='#087E8B')
# plt.title('PCA loading scores (first principal component)', size=20)
# plt.xticks(rotation='vertical')
# plt.show()

In [9]:
# Capture the kernel's current working directory and echo it as the cell output.
path = os.getcwd()
path

'/home/fabio/Documents/tabular_lesion'

In [10]:
def plot_class_balance(all_set: WorkingSet, renderer: str = "colab"):
    """Show a bar chart with the number of training rows per target class.

    Args:
        all_set: working set providing ``training_df`` and ``target_name``.
        renderer: plotly renderer name passed to ``fig.show`` (default
            ``"colab"``, the value previously hard-coded).
    """
    counts = all_set.training_df[all_set.target_name].value_counts()
    # Build the two columns explicitly: the column names produced by
    # value_counts().reset_index() differ between pandas versions.
    df_Clinically_Sig = pd.DataFrame({'Clinically_Sig': counts.index, 'Count': counts.values})

    # A string-valued colour column gives one discrete colour per class; the
    # actual colours come from color_discrete_sequence.
    fig = px.bar(
        df_Clinically_Sig,
        x='Clinically_Sig',
        y='Count',
        color=df_Clinically_Sig['Clinically_Sig'].astype(str),
        color_discrete_sequence=['blue', 'red'],
        text='Count',
        title='Class Balance',
    )
    fig.update_layout(showlegend=False)
    fig.show(renderer=renderer)

In [13]:
# Load the CSV from the relative "data" folder; the constructor also builds the train and test splits.
all_set = WorkingSet(os.path.join('data', 'lesion_df_balanced_Target_Lesion_ClinSig.csv'))

Are thre any Nan = False, Number of Nan = 0
Target_Lesion_ClinSig


In [14]:
# Bar chart of the label counts in the training rows.
plot_class_balance(all_set)

In [None]:
# Sanity check: features and labels must have the same number of rows in each split.
assert all_set.X_train.shape[0] == all_set.y_train.shape[0]
assert all_set.X_test.shape[0] == all_set.y_test.shape[0]

In [15]:
# variance_flag=True makes DataVariance oversample the variance-filtered training matrix
# (False would oversample the raw training values instead).
variance_flag = True
data = DataVariance(all_set, variance_flag)

In [17]:
# "balanced" class weights computed on the oversampled training labels.
# NOTE(review): class_weights is not passed to any estimator in the visible cells.
class_weights = compute_class_weight(class_weight = "balanced", classes = np.unique(data.y_train), y = data.y_train)
print(class_weights)

[0.99945339 1.00054721]


In [18]:
from sklearn.pipeline import FeatureUnion
# (name, transformer) pairs combined by the feature union; every transformer
# receives the same input and their outputs are concatenated column-wise.
transforms = [
    ('maxbbs', MaxAbsScaler()),
    ('mms', MinMaxScaler()),
    ('ss', StandardScaler()),
    ('rs', RobustScaler()),
    ('qt', QuantileTransformer(n_quantiles=100, output_distribution='normal')),
    ('norm', Normalizer()),
    ('pt', PowerTransformer()),
    ('st', SplineTransformer()),
    # Disabled option:
    # ('kbd', KBinsDiscretizer(n_bins=10, encode='ordinal', strategy='uniform')),
]
# Create the feature union.
feature_transform = FeatureUnion(transforms)

In [19]:
# Elastic-net logistic regression; l1_ratio and C are left to the grid below.
LR = LogisticRegression(random_state=2022, 
                        max_iter=100000, 
                        penalty='elasticnet', 
                        solver='saga', 
                        n_jobs=6, 
                        warm_start = True,
                        multi_class = 'auto',
                        tol=1e-4
                       )
# Keys are prefixed with "classifier__" so they address the pipeline step of that name.
LRparam_grid = {
    'classifier__l1_ratio': [0.2, 0.225, 0.25],
    'classifier__C': [0.0001, 0.0005, 0.001, 0.005, 0.01]
    }

In [20]:
# define the pipeline
steps = list()
steps.append(('scaler', feature_transform))
steps.append(('classifier', LR))
pipeline = Pipeline(steps=steps)
LR_search = GridSearchCV(pipeline, param_grid=LRparam_grid, refit = True, verbose = 1, cv=10, n_jobs=4)

In [None]:
# Run the grid search on the oversampled training data.
LR_search.fit(data.X_train, data.y_train)

Fitting 10 folds for each of 15 candidates, totalling 150 fits


In [None]:
# Report the winning hyper-parameters and their mean cross-validated score.
# NOTE(review): best_params_ is printed twice (raw and as "Config").
print(LR_search.best_params_)
# summarize
print('Mean Accuracy: %.3f' % LR_search.best_score_)
print('Config: %s' % LR_search.best_params_)

In [None]:
# Pass the DataVariance object: RocCurve scores its resampled training data and
# its transformed test matrix, which are what the model was fitted on.
RocCurve(LR_search.best_estimator_, all_set, data)

In [None]:
def plot_confusion_matrix_report(model, all_set: WorkingSet, X_test_trans: np.array = None):
    """Plot the test confusion matrix and print the classification report.

    Args:
        model: fitted classifier exposing ``predict`` and ``classes_``.
        all_set: working set providing ``X_test`` and ``y_test``.
        X_test_trans: optional pre-transformed test matrix; when omitted the
            raw ``all_set.X_test`` values are used.
    """
    # Identity check: the previous `X_test_trans.all() != None` raised on None
    # and was always true for an array.
    if X_test_trans is not None:
        predictions = model.predict(X_test_trans)
    else:
        predictions = model.predict(all_set.X_test.values)

    y_true = all_set.y_test.values.ravel()
    cm = confusion_matrix(y_true, predictions, labels = model.classes_)
    disp = ConfusionMatrixDisplay(confusion_matrix = cm, display_labels = model.classes_)
    disp.plot()
    target_names = ['class 0', 'class 1']
    print(classification_report(y_true, predictions, target_names = target_names))

In [None]:
# The transformed test matrix lives on the DataVariance object; a bare
# `X_test_trans` name is never defined in the notebook.
plot_confusion_matrix_report(LR_search.best_estimator_, all_set, data.X_test_trans)

In [None]:
def plot_features_importance(model):
    """Bar-plot the first row of coefficients of the pipeline's second step.

    Args:
        model: a pipeline-like object whose ``steps[1]`` estimator exposes ``coef_``.
    """
    _, estimator = model.steps[1]
    importance = estimator.coef_[0]
    plt.figure(figsize=(15,7))
    # One bar per coefficient, positioned by its index.
    positions = list(range(len(importance)))
    plt.bar(positions, importance)

In [None]:
# Coefficients of the refitted best pipeline's classifier step.
plot_features_importance(LR_search.best_estimator_)

In [None]:
import shap
shap.initjs()

# Explain the fitted pipeline returned by the grid search; the bare `pipeline`
# object is only a template and has not been fitted itself.
fitted_pipeline = LR_search.best_estimator_

# Apply the pipeline's preprocessing step (named 'scaler') to the training data.
observations = fitted_pipeline['scaler'].transform(data.X_train)

# The classifier is a linear model, so use the linear explainer
# (TreeExplainer only supports tree-based models).
explainer = shap.LinearExplainer(fitted_pipeline['classifier'], observations)

# SHAP values are computed in the transformed feature space.
shap_values = explainer.shap_values(observations)

# Plot the feature importance; the feature matrix must match shap_values' columns.
shap.summary_plot(shap_values, observations, plot_type="bar")

Ridge Classifier

In [None]:
ridge_class = RidgeClassifier(alpha=0.01, copy_X=True, fit_intercept=True, max_iter=10000, random_state=2022, solver='auto', tol=0.001)
# Only `alpha` is searched: RidgeClassifier has no `C` parameter, so the former
# 'classifier__C' entry (which was also missing a separating comma) would make
# the grid search reject the grid.
RC_param_grid = {
    'classifier__alpha': [0.001, 0.01, 0.1],
    }

In [None]:
# define the pipeline
steps = list()
steps.append(('scaler', feature_transform))
steps.append(('classifier', ridge_class))
pipeline = Pipeline(steps=steps)
ridge_class_search = GridSearchCV(pipeline, param_grid=RC_param_grid, refit = True, verbose = 1, cv=10, n_jobs=4)
ridge_class_search.fit(X_train, y_train)

In [None]:
from sklearn.preprocessing import QuantileTransformer, KBinsDiscretizer
from sklearn.decomposition import TruncatedSVD
from sklearn.pipeline import FeatureUnion, Pipeline

In [None]:
from sklearn.feature_selection import RFE
# Define the feature selection.  RFE ranks features through the estimator's
# coef_ / feature_importances_, which a Pipeline does not expose, so hand it the
# classifier step of the best pipeline rather than the pipeline itself.
rfe = RFE(estimator=LR_search.best_estimator_.named_steps['classifier'], n_features_to_select=70)

In [None]:
# define the pipeline
steps = list()
steps.append(('fu', fu))
steps.append(('rfe', rfe))
steps.append(('m', LR_search.best_estimator_))
pipeline = Pipeline(steps=steps)

In [None]:
from sklearn.model_selection import RepeatedStratifiedKFold
# define the cross-validation procedure: 5 stratified folds repeated 3 times, seeded for repeatability
cv = RepeatedStratifiedKFold(n_splits=5, n_repeats=3, random_state=2022)

In [None]:
from sklearn.model_selection import cross_val_score
# Evaluate the pipeline with the repeated stratified CV defined above, using the
# training data held by `data` (bare X_train / y_train names are never defined).
# NOTE(review): data.X_train is already oversampled, so synthetic samples can
# end up in the validation folds and inflate the score.
scores = cross_val_score(pipeline, data.X_train, data.y_train, scoring='accuracy', cv=cv, n_jobs=4, verbose=1)

In [None]:
# report performance
print('Accuracy: %.3f (%.3f)' % (np.mean(scores), np.std(scores)))

In [None]:
# Bare tuple of the imported linear-model classes; evaluating it only echoes them as the cell output.
LogisticRegression, PassiveAggressiveClassifier, Perceptron, RidgeClassifier, SGDClassifier, SGDOneClassSVM

In [None]:


# Fit a PCA with default settings and plot the cumulative explained-variance ratio.
# NOTE(review): `scaled_X_train` is not defined in any visible cell — confirm where it comes from.
pca = PCA().fit(scaled_X_train)

plt.plot(pca.explained_variance_ratio_.cumsum(), lw=3, color='#087E8B')
plt.title('Cumulative explained variance by number of principal components', size=20)
plt.show()

In [None]:
# Loadings table: components scaled by the square root of their explained variance,
# one row per original feature and one column per principal component.
# NOTE(review): `df_X_train` is not defined in any visible cell; the column labels
# also assume there are as many components as feature columns — confirm.
loadings = pd.DataFrame(
    data=pca.components_.T * np.sqrt(pca.explained_variance_), 
    columns=[f'PC{i}' for i in range(1, len(df_X_train.columns) + 1)],
    index=df_X_train.columns
)
# Slicing to 10 rows and then calling head() shows the first 5 rows.
loadings[0:10].head()

In [None]:
# Rank the features by their loading on the first principal component (descending).
pc1_loadings = loadings.sort_values(by='PC1', ascending=False)[['PC1']]
pc1_loadings = pc1_loadings.reset_index()
pc1_loadings.columns = ['Attribute', 'CorrelationWithPC1']

# Bar chart of the PC1 loadings, one bar per feature.
plt.bar(x=pc1_loadings['Attribute'], height=pc1_loadings['CorrelationWithPC1'], color='#087E8B')
plt.title('PCA loading scores (first principal component)', size=20)
plt.xticks(rotation='vertical')
plt.show()

In [None]:
# Import the necessary libraries first


In [None]:
# Chi-squared univariate selection keeping the 4 best features; abs() is applied
# so the scores are computed on non-negative values.
# NOTE(review): `scaled_X_train` and `y_train` are not defined in any visible cell.
test = SelectKBest(score_func=chi2, k=4)
fit = test.fit(abs(scaled_X_train), y_train)

In [None]:
# Summarize scores
np.set_printoptions(precision=3)
print(fit.scores_)

In [None]:
# Reduce the (absolute-valued) training matrix to the selected features.
features = fit.transform(abs(scaled_X_train))
# Summarize selected features: first 5 rows
print(features[0:5,:])

In [None]:
from sklearn.feature_selection import RFE

In [None]:
# Recursive feature elimination down to 75 features, removing one feature per step.
# NOTE(review): `LR_best` is not defined in any visible cell; this also rebinds the earlier `rfe`.
rfe = RFE(LR_best, n_features_to_select=75, step=1)

In [None]:
# Run the elimination; note that `fit` rebinds the name used for the SelectKBest result above.
fit = rfe.fit(scaled_X_train, y_train)

In [None]:
# Report how many features were kept, the boolean keep-mask and the per-feature ranking.
print("Num Features: %s" % (fit.n_features_))
print("Selected Features: %s" % (fit.support_))
print("Feature Ranking: %s" % (fit.ranking_))

In [None]:
# First things first


In [None]:
# Re-create the ridge classifier with the same settings as in the earlier "Ridge Classifier" cell.
ridge_class = RidgeClassifier(alpha=0.01, copy_X=True, fit_intercept=True, max_iter=10000, random_state=2022, solver='auto', tol=0.001)

In [None]:
# Fit the classifier created in the previous cell and bind the fitted estimator
# to `ridge`, the name the following cells read (fit() returns the estimator).
# NOTE(review): `scaled_X_train` and `y_train` are not defined in any visible cell.
ridge = ridge_class.fit(scaled_X_train, y_train)

In [None]:
def pretty_print_coefs(coefs, names = None, sort = False):
    """Format coefficients as a readable linear expression.

    Args:
        coefs: one-dimensional sequence of numeric coefficients.
        names: optional sequence of feature names (list, numpy array, pandas
            Index, ...); defaults to ``X0, X1, ...``.
        sort: when True, terms are ordered by decreasing absolute coefficient.

    Returns:
        A string such as ``"0.5 * X0 + -2.0 * X1"`` with coefficients rounded
        to 3 decimals.
    """
    # `is None` instead of `== None`: comparing an array-like with `==` is
    # element-wise and cannot be used as a condition.
    if names is None:
        names = ["X%s" % x for x in range(len(coefs))]
    lst = zip(coefs, names)
    if sort:
        lst = sorted(lst,  key = lambda x:-np.abs(x[0]))
    return " + ".join("%s * %s" % (round(coef, 3), name)
                                   for coef, name in lst)

In [None]:
# print ("Ridge model:", pretty_print_coefs(ridge.coef_))

In [None]:
# for idx, coeff in enumerate(ridge.coef_):
#     if coeff > 0.05:
#         print(idx, coeff)

In [None]:
# Spread, mean and median of the fitted ridge coefficients, echoed as a tuple.
np.std(ridge.coef_), np.mean(ridge.coef_), np.median(ridge.coef_)

In [None]:
# Predict the test labels with the ridge model.
# NOTE(review): `autoscaler` and `df_X_test` are not defined in any visible cell.
# NOTE(review): fit_transform re-fits the scaler on the test data; presumably
# `transform` with a scaler fitted on the training data was intended — confirm.
y_pred = ridge.predict(autoscaler.fit_transform(df_X_test.values))

In [None]:
# Echo the predicted labels as the cell output.
y_pred

In [None]:
# Confusion matrix of the ridge predictions, labelled with the model's classes.
# NOTE(review): `df_y_test` is not defined in any visible cell.
cm = confusion_matrix(df_y_test.values.ravel(), y_pred, labels=ridge.classes_)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=ridge.classes_)
disp.plot()

In [None]:
# Classification report for the ridge predictions.
target_names = ['class 0', 'class 1']
print(classification_report(df_y_test.values.ravel(), y_pred, target_names=target_names))

In [None]:
# Accuracy and F1 of the ridge predictions, computed against df_y_test
# (the variables are named val_* although the labels come from the test frame).
val_acc = accuracy_score(df_y_test.values.ravel(), y_pred)
val_f1 = f1_score(df_y_test.values.ravel(), y_pred)
print(f"Acc: {val_acc} | F1: {val_f1}")

In [None]:
# df_concat = pd.concat([df_X_train, df_y_train], axis=1)
# df_corr = df_concat.corr()

In [None]:
threshold = 0.90

# Drop every column that has a correlation >= threshold with any column that
# precedes it in df_corr; only the strictly-upper triangle (i < j) is inspected.
above = np.triu(df_corr.to_numpy() >= threshold, k=1)
# Keep-mask: True for columns never flagged by an earlier column.
columns = ~above.any(axis=0)
selected_columns = df_concat.columns[columns]
df_training = df_concat[selected_columns]

In [None]:
# Compare shapes after and before the correlation filter.
df_training.shape, df_concat.shape

In [None]:
# Columns removed by the correlation filter, and how many there are.
lst_diff = df_concat.columns.difference(df_training.columns)
len(lst_diff)

In [None]:
# Split the filtered frame back into features and label.
# NOTE(review): this assumes the label column survived the correlation filter — confirm.
corr_X = df_training.drop('Target_Lesion_ClinSig', axis=1)
corr_y = df_training['Target_Lesion_ClinSig']

In [None]:
# Fit the model on the scaled, correlation-filtered features.
# NOTE(review): `model` and `autoscaler` are not defined in any visible cell.
model.fit(autoscaler.fit_transform(corr_X.values), corr_y.values.ravel())
# get importance: first row of the fitted coefficients
importance = model.coef_[0]

In [None]:
# plot feature importance
pyplot.bar([x for x in range(len(importance))], importance)
pyplot.show()

In [None]:
# Remove from the test features the columns dropped by the correlation filter.
# NOTE(review): if lst_diff contains a column absent from df_X_test, drop() will raise — confirm.
df_X_test_corr = df_X_test.drop(lst_diff, axis=1)

# NOTE(review): fit_transform re-fits the scaler on the test data; presumably
# `transform` with the scaler fitted on the training data was intended — confirm.
predictions = model.predict(autoscaler.fit_transform(df_X_test_corr.values))
cm = confusion_matrix(df_y_test.values.ravel(), predictions, labels=model.classes_)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=model.classes_)
disp.plot()

In [None]:
# Accuracy and F1 of the model trained on the correlation-filtered features.
val_acc = accuracy_score(df_y_test.values.ravel(), predictions)
val_f1 = f1_score(df_y_test.values.ravel(), predictions)
print(f"Acc: {val_acc} | F1: {val_f1}")

In [None]:
# Relative importance: absolute coefficients rescaled so the largest equals 100.
feature_importance = abs(model.coef_[0])
feature_importance = 100.0 * (feature_importance / feature_importance.max())
# Ascending order of importance and the bar positions (offset by 0.5).
sorted_idx = np.argsort(feature_importance)
pos = np.arange(sorted_idx.shape[0]) + .5

In [None]:
# Horizontal bar chart of the relative feature importance, least important at
# the bottom.  The notebook imports matplotlib.pyplot as `plt`; the name
# `pyplot` is never bound.
featfig = plt.figure(figsize=(12, 15))
featax = featfig.add_subplot(1, 1, 1)
featax.barh(pos, feature_importance[sorted_idx], align='center')
featax.set_yticks(pos)
# Label each bar with the matching feature name, in the same sorted order.
featax.set_yticklabels(np.array(corr_X.columns)[sorted_idx], fontsize=8)
featax.set_xlabel('Relative Feature Importance')

In [None]:
# import importlib
# import imblearn
# from imblearn import over_sampling as os
# for class_name in os.__all__[:6]:
#     module = importlib.import_module('imblearn.over_sampling')
#     class_ = getattr(module, class_name)
#     sampling = class_
#     print(sampling())
# os.__all__