In [None]:
%%capture
%matplotlib inline
!pip install optuna

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import os, sys, string, time, math
import optuna
from   datetime import date

import warnings
warnings.filterwarnings('ignore')

from sklearn.preprocessing  import LabelEncoder, MinMaxScaler, StandardScaler
from sklearn.impute         import SimpleImputer
from imblearn.pipeline      import Pipeline, make_pipeline
from scipy.stats            import norm, wilcoxon
from statistics             import mean

from sklearn.ensemble     import StackingClassifier
from sklearn.linear_model import LogisticRegression, SGDClassifier, Perceptron
from sklearn.svm          import SVC, LinearSVC
from sklearn.naive_bayes  import MultinomialNB, GaussianNB, BernoulliNB
from sklearn.tree         import DecisionTreeClassifier, ExtraTreeClassifier
from sklearn.neighbors    import KNeighborsClassifier
from sklearn.neural_network           import MLPClassifier
from sklearn.gaussian_process         import GaussianProcessClassifier
from sklearn.gaussian_process.kernels import RBF, Matern
from sklearn.discriminant_analysis    import LinearDiscriminantAnalysis

from sklearn.metrics           import make_scorer, accuracy_score, f1_score, roc_curve, roc_auc_score, auc, brier_score_loss, confusion_matrix, matthews_corrcoef
from sklearn.model_selection   import cross_validate, cross_val_score, StratifiedKFold, RepeatedStratifiedKFold, GridSearchCV, RandomizedSearchCV
from sklearn.feature_selection import SelectKBest, chi2, mutual_info_classif

from sklearn.utils._testing import ignore_warnings
from sklearn.exceptions import ConvergenceWarning
with warnings.catch_warnings():
  warnings.filterwarnings("ignore", category=ConvergenceWarning)

In [None]:
# CSV files (one per code smell) that the main loop reads, in processing order.
smells = [
    '[J] God Class.csv',
    '[J] Data Class.csv',
    '[J] Long Method.csv',
    '[J] Long Parameter List.csv',
    '[J] Feature Envy.csv',
    '[J] Switch Statements.csv',
    '[P] Large Class.csv',
    '[P] Long Method.csv',
]

fnames = smells

# Base learners, keyed by the short name that is written to the result tables.
models = dict(
    KNN = KNeighborsClassifier(),
    DT  = DecisionTreeClassifier(),
    LR  = LogisticRegression(solver='liblinear',max_iter=500),
    SVM = SVC(probability=True),
    MLP = MLPClassifier(max_iter=500),
    NB  = BernoulliNB(),
)

# DetDF: one row per CV fold; ResDF: one row per classifier/dataset (same columns).
DetDF = pd.DataFrame (columns = ['Classifier', 'Accuracy', 'Precision', 'Recall', 'F1-score',
                                 'Brier', 'AUC', 'MCC', 'Time', 'Dataset'])
ResDF = DetDF.copy()
# StaDF: pairwise statistical comparisons; EnsDF: base learners of each ensemble.
StaDF = pd.DataFrame (columns = ['Classifier_1', 'Classifier_2', 'Test', 'Stat', 'Sig-level', 'p-value',
                                 'Null Hypo', 'Win', 'Lost', 'Effect Size', 'Effect Type', 'Dataset'])
EnsDF = pd.DataFrame (columns = ['Ensemble', 'Base', 'Dataset'])

# **Hyperparameter Tuning**

In [None]:
# Optuna trials per study, and the worker count passed to Optuna and scikit-learn.
n_trials, n_jobs = 10, -1
# Fold count used for cross-validation during tuning and inside the stacks.
cv = 5
# Forwarded as `passthrough` to every StackingClassifier built in this notebook.
passthrough = False

@ignore_warnings(category=ConvergenceWarning)
def tune_stack (model):
  """Tune the logistic-regression meta-learner of a stacking ensemble.

  The base learners are taken from ``model.estimators`` so that the tuned
  stack contains exactly the estimators the caller selected. (Previously the
  argument was ignored and the stack was always rebuilt from every model,
  which made forward/backward selection evaluate the same ensemble each time.)

  Parameters
  ----------
  model : StackingClassifier
      Untuned stack whose ``estimators`` list defines the base learners.

  Returns
  -------
  StackingClassifier
      A new, unfitted stack over the same base learners whose final
      LogisticRegression uses the best `penalty` / `C` found by Optuna.
  """
  estimators = list(model.estimators)

  def build_stack (**final_params):
    # Same construction for the trial candidates and for the returned model.
    return StackingClassifier (estimators=estimators,
                               final_estimator=LogisticRegression(solver='liblinear',**final_params),
                               cv=cv, passthrough=passthrough, n_jobs=n_jobs)

  def objective (trial):
    penalty = trial.suggest_categorical('penalty', ['l1', 'l2'])
    # C must be strictly positive for LogisticRegression.
    C       = trial.suggest_float('C', 0.001, 100)
    candidate = build_stack (penalty=penalty, C=C)
    return cross_val_score(create_pipeline('',candidate), X, y, scoring=mcc_scoring, cv=cv, n_jobs=n_jobs).mean()

  study = optuna.create_study(direction='maximize')
  study.optimize(objective, n_trials=n_trials,n_jobs=n_jobs)
  return build_stack (**study.best_params)

def tune_models ():
  """Replace the global `models` dict with freshly tuned base learners.

  Ensemble entries ('FSE' and every 'DSE*' key) are dropped: they are rebuilt
  for each dataset, and keeping a stack from a previous dataset would let it
  be treated as a base learner of the next stack.
  """
  global models

  models = {key: tune_model(key, estimator)
            for key, estimator in models.items()
            if 'DSE' not in key and 'FSE' not in key}

def tune_model(name,model):
  """Run an Optuna study for the named base learner and return a tuned estimator.

  `name` selects the objective function and the estimator class; the returned
  estimator is new and unfitted, built from `study.best_params`. Names without
  a registered objective get `model` back unchanged.
  """
  study = optuna.create_study(direction='maximize')

  # name -> (objective to maximise, factory turning best params into an estimator)
  registry = {
      'DT'  : (objective_DT,  lambda best: DecisionTreeClassifier(**best)),
      'LR'  : (objective_LR,  lambda best: LogisticRegression(solver='liblinear',max_iter=1000,**best)),
      'KNN' : (objective_KNN, lambda best: KNeighborsClassifier(**best)),
      'SVM' : (objective_SVM, lambda best: SVC(probability=True,**best)),
      'MLP' : (objective_MLP, lambda best: MLPClassifier(max_iter=1000,**best)),
      'NB'  : (objective_NB,  lambda best: BernoulliNB(**best)),
  }

  if name not in registry:
    return model

  objective, build = registry[name]
  study.optimize(objective, n_trials=n_trials,n_jobs=n_jobs)
  return build(study.best_params)

In [None]:
def objective_DT(trial):
  """Optuna objective for the decision tree: mean cross-validated MCC on the global X, y."""
  params = {
      'max_depth'    : trial.suggest_int('max_depth', 2, 12),
      'splitter'     : trial.suggest_categorical('splitter', ['best', 'random']),
      'max_features' : trial.suggest_categorical('max_features', [None, 'sqrt', 'log2']),
  }
  candidate = DecisionTreeClassifier(**params)

  fold_scores = cross_val_score(create_pipeline('',candidate), X, y, scoring=mcc_scoring, cv=cv, n_jobs=n_jobs)
  return fold_scores.mean()

def objective_LR(trial):
  """Optuna objective for logistic regression: mean cross-validated MCC on the global X, y.

  Samples the penalty type and the inverse regularisation strength `C`.
  """
  penalty = trial.suggest_categorical('penalty', ['l1', 'l2'])
  # LogisticRegression requires C > 0, so the range starts at 0.001 (the same
  # floor objective_SVM uses) instead of 0.
  C       = trial.suggest_float('C', 0.001, 100)

  model = LogisticRegression(
        solver='liblinear',
        penalty=penalty,
        C=C,
        max_iter=1000
  )

  score = cross_val_score(create_pipeline('',model), X, y, scoring=mcc_scoring, cv=cv, n_jobs=n_jobs).mean()
  return score

def objective_KNN(trial):
  """Optuna objective for k-nearest neighbours: mean cross-validated MCC on the global X, y."""
  params = {
      'weights'     : trial.suggest_categorical('weights', ['uniform', 'distance']),
      'metric'      : trial.suggest_categorical('metric', ['euclidean', 'manhattan', 'minkowski']),
      'n_neighbors' : trial.suggest_int('n_neighbors', 1, 20),
  }
  candidate = KNeighborsClassifier(**params)

  fold_scores = cross_val_score(create_pipeline('',candidate), X, y, scoring=mcc_scoring, cv=cv, n_jobs=n_jobs)
  return fold_scores.mean()

def objective_SVM(trial):
  """Optuna objective for the SVM: mean cross-validated MCC on the global X, y."""
  params = {
      'kernel' : trial.suggest_categorical('kernel', ['rbf', 'linear', 'poly', 'sigmoid']),
      'gamma'  : trial.suggest_categorical('gamma', ['scale', 'auto']),
      'C'      : trial.suggest_float('C', 0.001, 100),
  }
  candidate = SVC(probability=True, **params)

  fold_scores = cross_val_score(create_pipeline('',candidate), X, y, scoring=mcc_scoring, cv=cv, n_jobs=n_jobs)
  return fold_scores.mean()

@ignore_warnings(category=ConvergenceWarning)
def objective_MLP(trial):
  """Optuna objective for the MLP: mean cross-validated MCC on the global X, y."""
  params = {
      'activation'    : trial.suggest_categorical('activation', ['relu', 'identity', 'logistic','tanh']),
      'solver'        : trial.suggest_categorical('solver', ['adam', 'lbfgs', 'sgd']),
      'alpha'         : trial.suggest_float('alpha', 0.0001, 100),
      'learning_rate' : trial.suggest_categorical('learning_rate', ['constant', 'invscaling', 'adaptive']),
  }
  candidate = MLPClassifier(max_iter=1000, **params)

  fold_scores = cross_val_score(create_pipeline('',candidate), X, y, scoring=mcc_scoring, cv=cv, n_jobs=n_jobs)
  return fold_scores.mean()

def objective_NB(trial):
  """Optuna objective for Bernoulli naive Bayes: mean cross-validated MCC on the global X, y."""
  smoothing = trial.suggest_float('alpha', 0.0001, 100)
  candidate = BernoulliNB(alpha=smoothing)

  fold_scores = cross_val_score(create_pipeline('',candidate), X, y, scoring=mcc_scoring, cv=cv, n_jobs=n_jobs)
  return fold_scores.mean()

def objective_STACK(trial):
  """Optuna objective for the stacking meta-learner: mean cross-validated MCC on the global X, y.

  The sampled `penalty` and `C` are applied to the final LogisticRegression.
  (They used to be sampled but never used, so every trial scored the same
  default model and the reported best parameters were arbitrary.)
  """
  penalty = trial.suggest_categorical('penalty', ['l1', 'l2'])
  # LogisticRegression requires C > 0, so the range does not start at 0.
  C       = trial.suggest_float('C', 0.001, 100)

  model = StackingClassifier (estimators=get_estimators_pipelined(),
                             final_estimator=LogisticRegression(solver='liblinear', penalty=penalty, C=C),
                             cv=cv, passthrough=passthrough, n_jobs=n_jobs)

  score = cross_val_score(create_pipeline('',model), X, y, scoring=mcc_scoring, cv=cv, n_jobs=n_jobs).mean()
  return score

# **Model Building**

In [None]:
def scoring_MCC (y_true, y_pred, **kwargs):
  """Matthews correlation coefficient of the predictions; extra keyword arguments are ignored."""
  truth, predicted = np.array(y_true), np.array(y_pred)
  return matthews_corrcoef(truth, predicted)

# Scorer object passed as `scoring=` to the cross-validation helpers; higher is better.
mcc_scoring = make_scorer(scoring_MCC, greater_is_better=True)

In [None]:
def create_pipeline(name,model):
  """Wrap `model` in a mean-imputation + min-max-scaling pipeline.

  Models whose name contains 'DSE' are returned untouched. For every other
  name the result is a Pipeline with steps 'impute', 'scale' and a final
  step named `name`.
  """
  if 'DSE' in name:
    return model

  return Pipeline(steps=[
      ('impute', SimpleImputer(missing_values=np.nan, strategy='mean')),
      ('scale',  MinMaxScaler()),
      (name,     model),
  ])

# **Stacking**

In [None]:
def save_stack_info (ensemble,estimators) :
  """Record which base learners make up an ensemble for the current dataset.

  Appends one row to the global `EnsDF` with the ensemble name, the
  comma-joined names of `estimators` (a list of ``(name, estimator)`` pairs)
  and the global `fname`.
  """
  global EnsDF

  # DataFrame.append was removed in pandas 2.0; build a one-row frame and concatenate.
  row = pd.DataFrame([{'Ensemble' : ensemble,
                       'Base'     : ','.join([n for n,_ in estimators]),
                       'Dataset'  : fname}],
                     columns=EnsDF.columns)
  EnsDF = row if EnsDF.empty else pd.concat([EnsDF, row], ignore_index=True)

In [None]:
def create_stacking (estimators):
  """Build a stack over `estimators` with a liblinear logistic-regression
  meta-learner, pass it to tune_stack and return tune_stack's result."""
  untuned = StackingClassifier (final_estimator=LogisticRegression(solver='liblinear'),
                               estimators=estimators,
                               passthrough=passthrough, cv=cv, n_jobs=n_jobs)
  return tune_stack (untuned)

In [None]:
def get_estimators_pipelined ():
  """Return the base learners as ``(name, pipeline)`` pairs for stacking.

  Every entry of the global `models` dict is wrapped with create_pipeline,
  except the ensembles themselves: 'FSE' and all 'DSE*' keys are skipped so
  that a previously built stack is never used as a base learner of another
  stack.
  """
  return [(key, create_pipeline(key, estimator))
          for key, estimator in models.items()
          if 'DSE' not in key and 'FSE' not in key]

In [None]:
@ignore_warnings(category=ConvergenceWarning)
def evaluate_stacking (name,model):
  """Mean cross-validated MCC of `model` on the global X, y (`name` is not used)."""
  fold_scores = cross_val_score(model, X, y, scoring=mcc_scoring, cv=cv,n_jobs=n_jobs)
  return fold_scores.mean()

In [None]:
def full_stacking ():
  """Stack all pipelined base learners, store the result as models['FSE'] and log its members."""
  base_learners = get_estimators_pipelined ()
  models['FSE'] = create_stacking (base_learners)
  save_stack_info ('FSE',base_learners)

  print('Full Stacking = \n',EnsDF)

In [None]:
def forward_stacking ():
  """Greedy forward selection of base learners.

  Repeatedly moves the estimator picked by greedy_search_estimator from the
  candidate pool to the selection until the pool is empty or no pick is
  returned, then stores the resulting stack as models['DSE-GS'].
  """
  pool = get_estimators_pipelined ()
  chosen = []

  while pool:
    pick = greedy_search_estimator (chosen,pool)
    if pick is None:
      break
    chosen.append (pick)
    pool.remove (pick)

  models['DSE-GS'] = create_stacking (chosen)
  save_stack_info ('DSE-GS',chosen)

  print('Forward Stacking Done = \n',EnsDF)

def greedy_search_estimator (selected_estimators,candidate_estimators):
  """Pick the candidate whose addition improves the stack's MCC the most.

  Parameters
  ----------
  selected_estimators : list of (name, estimator)
      Base learners already in the ensemble; not modified.
  candidate_estimators : list of (name, estimator)
      Base learners that may still be added.

  Returns
  -------
  tuple or None
      The best candidate, or None when no candidate beats the score of the
      current selection.
  """
  candidate = None

  if selected_estimators:
    baseline = create_stacking (selected_estimators)
    score = evaluate_stacking('DSE-GS',baseline)
  else:
    # MCC lies in [-1, 1]; with nothing selected yet any candidate must be
    # accepted, otherwise the caller would end up with an empty ensemble.
    score = float('-inf')

  for estimator in candidate_estimators:
    # Work on a fresh list so the caller's selection is never mutated.
    trial_set = list(selected_estimators) + [estimator]
    new_score = evaluate_stacking('DSE-GS',create_stacking (trial_set))

    if new_score > score:
      score = new_score
      candidate = estimator

  return candidate

In [None]:
def backward_stacking ():
  """Backward elimination of base learners.

  Starting from all pipelined base learners, repeatedly drops the estimator
  returned by backward_elimination_estimator while at least two remain and a
  removal is proposed, then stores the resulting stack as models['DSE-BE'].
  """
  kept = get_estimators_pipelined ()
  dropped = []

  while len(kept) >= 2:
    victim = backward_elimination_estimator (kept,dropped)
    if victim is None:
      break
    dropped.append (victim)
    kept.remove (victim)

  models['DSE-BE'] = create_stacking (kept)
  save_stack_info ('DSE-BE',kept)

  print('Backward Stacking Done = \n',EnsDF)

def backward_elimination_estimator (full_estimators,removed_estimators):
  """Return the estimator whose removal gives the best MCC, or None.

  Nothing is removed once only two estimators are left. A removal is accepted
  when the reduced stack scores at least as well as the best score seen so
  far, starting from the score of the unreduced stack. `removed_estimators`
  is not used.
  """
  if len(full_estimators) == 2:
    return None

  best_score = evaluate_stacking('DSE-BE',create_stacking (full_estimators))
  to_remove = None

  for position, estimator in enumerate(full_estimators):
    # Stack built from every estimator except the one at `position`.
    reduced = full_estimators[:position] + full_estimators[position+1:]
    reduced_score = evaluate_stacking('DSE-BE',create_stacking (reduced))

    if reduced_score >= best_score:
      best_score, to_remove = reduced_score, estimator

  return to_remove

# **Main Execution**

In [None]:
# Metrics gathered for every classifier; cross_validate returns them as 'test_<key>'.
scoring = {'Accuracy'  : 'accuracy',
           'Precision' : 'precision',
           'Recall'    : 'recall',
           'F1-score'  : 'f1',
           'Brier'  : 'neg_brier_score',
           'AUC'    : 'roc_auc',
           'MCC'    : mcc_scoring}

# Metrics that are stored as percentages in the result tables.
percent_metrics = ('Accuracy', 'Precision', 'Recall', 'F1-score')

# A seeded splitter yields the same 10x10 folds for every classifier, which the
# paired Wilcoxon test in the analysis section relies on (fold i of one
# classifier is compared with fold i of another).
folds = RepeatedStratifiedKFold(n_splits=10, n_repeats=10, random_state=42)

for fname in fnames:

  dataset = pd.read_csv (fname)
  # drop_duplicates returns a new frame; the result has to be kept.
  dataset = dataset.drop_duplicates().reset_index(drop=True)
  fname = fname.replace ('.csv','')
  # Columns holding a single distinct value carry no information.
  nunique  = dataset.apply(pd.Series.nunique)
  colsDrop = nunique[nunique == 1].index
  dataset  = dataset.drop(colsDrop, axis=1)

  # Last column is the label; X and y are read as globals by the tuning helpers.
  X = dataset.iloc[:, 0:-1]
  y = dataset.iloc[:,-1]
  y = LabelEncoder().fit_transform(y)

  tune_models()
  full_stacking()
  forward_stacking()
  backward_stacking()

  for name in models:

    model = models[name]
    pipe_model = create_pipeline(name,model)
    scores = cross_validate(pipe_model, X, y, scoring=scoring, cv=folds,n_jobs=-1)

    # Per-fold values keyed by result-table column.
    per_fold = {'Accuracy'  : scores ['test_Accuracy'],
                'Precision' : scores ['test_Precision'],
                'Recall'    : scores ['test_Recall'],
                'F1-score'  : scores ['test_F1-score'],
                'Brier'     : np.abs( scores ['test_Brier'] ),   # scorer returns the negated loss
                'AUC'       : scores ['test_AUC'],
                'MCC'       : scores ['test_MCC'],
                'Time'      : scores ['score_time']}
    factor = {metric: (100 if metric in percent_metrics else 1) for metric in per_fold}

    # One summary row per classifier (means over all folds).
    summary = {'Classifier' : name, 'Dataset' : fname}
    summary.update({metric: np.round(values.mean() * factor[metric],2)
                    for metric, values in per_fold.items()})

    # One detail row per fold, built in a single frame instead of row-by-row appends.
    detail = {'Classifier' : name, 'Dataset' : fname}
    detail.update({metric: np.round(values * factor[metric],2)
                   for metric, values in per_fold.items()})

    # DataFrame.append was removed in pandas 2.0; concatenate whole frames instead.
    summary_frame = pd.DataFrame([summary], columns=ResDF.columns)
    detail_frame  = pd.DataFrame(detail, columns=DetDF.columns)
    ResDF = summary_frame if ResDF.empty else pd.concat([ResDF, summary_frame], ignore_index=True)
    DetDF = detail_frame  if DetDF.empty else pd.concat([DetDF, detail_frame],  ignore_index=True)

    print(name)
    print(ResDF)
    print(DetDF)

# **Results analysis**

In [None]:
sns.set(style='ticks',palette='Set3')

# Marker style for the outlier points of every box plot.
flierprops = dict(markerfacecolor='0.75', markersize=5, linestyle='none')

# One F1-score box plot per dataset, saved next to the notebook and shown inline.
for fname in fnames:
  fname = fname.replace ('.csv','')
  fig, ax = plt.subplots(figsize=(10, 10))
  subset = DetDF[DetDF['Dataset'] == fname]
  sns.boxplot (x='Classifier',y='F1-score', data=subset, flierprops=flierprops, ax=ax)
  ax.set_xticklabels(ax.get_xticklabels(),rotation=30)
  ax.set_ylabel ('F1-score', size=12)
  ax.set_xlabel ('Classifier', size=12)
  ax.set_title (fname, fontweight='bold',size=12)
  fig.savefig( '_' + fname + '_boxplot.png')
  plt.show()

In [None]:
names = DetDF['Classifier'].unique().tolist()

# Bonferroni correction over all pairwise classifier comparisons.
# max(..., 1) avoids a division by zero when fewer than two classifiers exist.
n_comparisons = len(names)*(len(names)-1)/2
alpha = 0.05
alpha = alpha / max(n_comparisons, 1)

stat_rows = []

for fname in fnames:
  fname = fname.replace ('.csv','')
  statDet = DetDF[DetDF['Dataset'] == fname]
  for i, name in enumerate(names):

    nestedNames = names[i+1:]

    for nested in nestedNames:

      # Per-fold F1-scores of both classifiers on this dataset.
      model_1 =  statDet.loc[statDet['Classifier'] == name  , 'F1-score']
      model_2 =  statDet.loc[statDet['Classifier'] == nested, 'F1-score']

      m1_score = model_1.mean()
      m2_score = model_2.mean()

      win, lost, effect_s, effect_t = '','','',''

      test = 'Wilcoxon'
      if m1_score == m2_score : stat, p = 1, 1
      else:
        # Plain float arrays: values are paired by position, not by index label.
        stat, p = wilcoxon (model_1.to_numpy(dtype=float), model_2.to_numpy(dtype=float))

      if p > alpha: decision = 'Accept'
      else:
        decision = 'Reject'
        if   m1_score > m2_score : win, lost = name, nested
        else                     : win, lost = nested, name

        # Effect size r = |Z| / sqrt(N). `stat` is the rank-sum statistic W,
        # not Z, so Z is recovered from the two-sided p-value.
        z_score  = norm.isf (p / 2)
        effect_s = abs(z_score) / math.sqrt (len(model_1))
        if   effect_s < 0.3  : effect_t = 'small'
        elif effect_s < 0.5  : effect_t = 'moderate'
        else                 : effect_t = 'large'

      stat_rows.append({'Classifier_1' : name,
                        'Classifier_2' : nested,
                        'Test'         : test,
                        'Stat'         : stat,
                        'Sig-level'    : alpha,
                        'p-value'      : p,
                        'Null Hypo'    : decision,
                        'Win'          : win,
                        'Lost'         : lost,
                        'Effect Size'  : effect_s,
                        'Effect Type'  : effect_t,
                        'Dataset'    : fname})

# DataFrame.append was removed in pandas 2.0; add all rows with one concat.
if stat_rows:
  stat_frame = pd.DataFrame(stat_rows, columns=StaDF.columns)
  StaDF = stat_frame if StaDF.empty else pd.concat([StaDF, stat_frame], ignore_index=True)

print(StaDF)

In [None]:
# Write every result table to its own workbook. `header` takes a bool or a list
# of column aliases; the former string value only worked because it is truthy,
# so the column names are now requested explicitly with True.
StaDF.to_excel ('_StatisticalResults.xlsx', header=True)
ResDF.to_excel ('_SummaryResults.xlsx', header=True)
DetDF.to_excel ('_DetailedResults.xlsx', header=True)
EnsDF.to_excel ('_StackEstimators.xlsx', header=True)