In [None]:
def _ovr_predict_proba(model, best_params, X_train, y_train_bin, X_test):
  """
    Objective
    ---------
    Fit a one-vs-rest wrapper around a fresh ``model`` instance and return
    its ``predict_proba`` output for ``X_test``.

    Three parameter sets are tried in order, falling back to the next one
    when constructing/fitting/predicting raises:
      1. best_params + class_weight='balanced' + num_classes=1
      2. best_params + class_weight='balanced'
      3. best_params alone
    Any 'class_weight' / 'class_weights' entry in ``best_params`` is dropped
    first. The error of the last attempt is propagated to the caller.
  """
  from sklearn.multiclass import OneVsRestClassifier

  base_params = {
    name: value for name, value in best_params.items()
    if name not in ('class_weight', 'class_weights')
  }
  candidates = [
    {**base_params, 'class_weight': 'balanced', 'num_classes': 1},
    {**base_params, 'class_weight': 'balanced'},
    base_params,
  ]
  last_attempt = len(candidates) - 1
  for attempt, params in enumerate(candidates):
    try:
      classifier = OneVsRestClassifier(model(**params))
      return classifier.fit(X_train, y_train_bin).predict_proba(X_test)
    except Exception:
      # Not every estimator accepts the extra keywords; only give up once
      # the plain tuned parameters fail as well.
      if attempt == last_attempt:
        raise


def nested_cross_val(df,model,space, n_iter,scoring,search_cv='random',num_classes=4,outer=10,inner=3,seed=42):
  """
    Objective
    ---------
    Run nested cross-validation: an outer stratified split for evaluation
    and, inside every outer training fold, a hyper-parameter search that
    uses an inner stratified split.
    Returns a one-row dataframe with the collected validation results.


    Parameters
    ----------
    df : dataframe
      Input data. Must contain a 'classification' column (the target);
      every other column is used as a feature.

    model : machine learning model class
      Estimator class (not an instance); it is instantiated with ``model()``
      for the search and ``model(**params)`` for the one-vs-rest scores.

    space : dictionary of lists
      Search space for hyper parameter optimization.
      eg) {
            'num_leaves': [300,400,500],
            'max_bin': [175,255,510],
            'num_iterations': [700,800,900],
            'learning_rate':[0.05,0.1,0.15],
            'boosting_type': ['gbdt', 'dart'],
            'max_depth': [-1, 5, 10]
          }

    n_iter : number of parameter settings sampled
      Only used by the random search.
      n_iter trades off runtime vs quality of the solution.

    scoring : string
      score method passed to the search for evaluating candidates

    search_cv : string, default='random'
      hyper-parameter optimization approach. 'random' selects
      RandomizedSearchCV; any other value selects GridSearchCV.

    num_classes : int, default=4
      number of classes for classification. Labels are binarized against
      0 .. num_classes-1 for the one-vs-rest scores.
      eg) 4, 6, 8

    outer : int, default=10
      number of outer folds

    inner : int, default=3
      number of inner folds

    seed : int, default=42
      seed for reproducibility (fold shuffling and random search)


    Returns
    ----------
    Dataframe with a single row and the columns 'outer_scores',
    'outer_estimates', 'outer_configurations', 'error_indices',
    'ground_truth', 'predicted_labels', 'predicted_probas' (each holding a
    list) and 'outer_average' (mean balanced accuracy over the outer folds).
    """
  from sklearn.model_selection import GridSearchCV, RandomizedSearchCV, StratifiedKFold
  from sklearn.metrics import balanced_accuracy_score
  from sklearn.preprocessing import label_binarize
  import numpy as np
  import pandas as pd

  from tqdm import tqdm

  class_labels = list(range(num_classes))

  outer_scores = []
  outer_estimates = []
  outer_configurations = []
  error_indices = []

  ground_truth = []
  predicted_labels = []
  predicted_probas = []

  X, y = df.drop(columns=['classification']), df['classification']

  # random_state only has an effect (and is only accepted by recent
  # scikit-learn releases) when shuffle=True.
  cv_outer = StratifiedKFold(n_splits=outer, shuffle=True, random_state=seed)
  for train_ix, test_ix in tqdm(cv_outer.split(X, y), total=outer):
    # Training part and held-out part of this outer fold
    X_train, y_train = X.iloc[train_ix], y.iloc[train_ix]
    X_test, y_test = X.iloc[test_ix], y.iloc[test_ix]

    cv_inner = StratifiedKFold(n_splits=inner, shuffle=True, random_state=seed)

    # define search
    if search_cv == 'random':
      search = RandomizedSearchCV(
        estimator=model(),
        param_distributions=space,
        n_iter=n_iter,
        cv=cv_inner,
        random_state=seed,
        scoring=scoring
      )
    else:
      search = GridSearchCV(
        model(),
        space,
        cv=cv_inner,
        scoring=scoring
      )
    result = search.fit(X_train, y_train)

    # best_estimator_ is the best configuration refit on the whole outer
    # training fold; evaluate it on the held-out fold.
    best_model = result.best_estimator_
    y_pred = best_model.predict(X_test).flatten()
    acc = balanced_accuracy_score(y_test, y_pred)

    # ROC inputs: one-vs-rest scores against binarized training labels
    roc_y_train = label_binarize(y_train, classes=class_labels)
    y_score = _ovr_predict_proba(model, result.best_params_, X_train, roc_y_train, X_test)
    predicted_probas.extend(list(row) for row in y_score)

    # gather misclassified objects (by their index label in df)
    y_test = np.asarray(y_test)
    error_indices.extend(X_test.index[y_test != y_pred])
    ground_truth.extend(y_test)
    predicted_labels.extend(y_pred)

    # store the result
    outer_scores.append(acc)
    outer_estimates.append(result.best_score_)
    outer_configurations.append(result.best_params_)
    # report progress
    print(f'>acc={acc:.2f}, est={result.best_score_:.3f}, cfg={result.best_params_}')

  # Every row is tested exactly once, so a repeated index label means the
  # index of df is not unique.
  assert len(error_indices) == len(set(error_indices)), \
    'duplicate index labels among misclassified rows'

  outer_average = np.mean(outer_scores)
  final_data = pd.DataFrame({
    'outer_scores': [outer_scores],
    'outer_estimates': [outer_estimates],
    'outer_configurations': [outer_configurations],
    'error_indices': [error_indices],
    'ground_truth': [ground_truth],
    'predicted_labels': [predicted_labels],
    'predicted_probas': [predicted_probas]
  })
  final_data['outer_average'] = outer_average

  # summarize the estimated performance of the model
  print(f"Balanced Accuracy: {outer_average:.3f} ({np.std(outer_scores):.3f})")

  return final_data

In [None]:
def base_performance(df,model,outer,seed,tuned_acc):
  """
    Objective
    ---------
    Assess performance increase from hyper-parameter optimization by
    cross-validating an untuned model and comparing its mean balanced
    accuracy with the tuned one. Results are printed; nothing is returned.


    Parameters
    ----------
    df : dataframe
      Input data. Must contain a 'classification' column (the target);
      every other column is used as a feature.

    model : machine learning model
      Model instance to cross validate; it is refit on every fold via
      ``model.fit``.

    outer : int
      number of folds

    seed : int
      seed for reproducibility of the fold shuffling

    tuned_acc : float
      tuned model performance (balanced accuracy) to compare against

  """
  from sklearn.model_selection import StratifiedKFold
  from sklearn.metrics import balanced_accuracy_score
  from numpy import mean

  X, y = df.drop(columns=['classification']), df['classification']

  # random_state only has an effect (and is only accepted by recent
  # scikit-learn releases) when shuffle=True.
  cv_outer = StratifiedKFold(n_splits=outer, shuffle=True, random_state=seed)

  outer_results = []
  for train_ix, test_ix in cv_outer.split(X, y):
    # Training part and held-out part of this fold
    X_train, y_train = X.iloc[train_ix], y.iloc[train_ix]
    X_test, y_test = X.iloc[test_ix], y.iloc[test_ix]

    # fit with the model's current (untuned) parameters and score the
    # held-out part
    fitted = model.fit(X_train, y_train)
    yhat = fitted.predict(X_test)
    acc = balanced_accuracy_score(y_test, yhat)

    outer_results.append(acc)
    # report progress
    print(f'>acc={acc:.2f}')

  # summarize the estimated performance of the model
  base_acc = mean(outer_results)
  print(f'Tuned Accuracy: {tuned_acc:.3f}')
  print(f'Base Accuracy: {base_acc:.3f}')

  print(f"Improvement of: {(100 * (tuned_acc - base_acc) / base_acc):.3f}%")

In [None]:
def roc_curve(y_true,predicted_probas,n_classes=4):
  """
    Objective
    ---------
    Plot one-vs-rest ROC curves for every class together with the
    macro-averaged ROC curve.


    Parameters
    ----------
    y_true : list
      Ground truth labels. They are binarized against 0 .. n_classes-1.

    predicted_probas : list
      Confidence scores of model, one sequence of n_classes scores per
      sample.

    n_classes : int, default=4
      number of classes for classification.
      eg) 4, 6, 8

  """
  # Aliased so the scikit-learn function is not confused with this
  # function, which carries the same name.
  from sklearn.metrics import roc_curve as sk_roc_curve, auc
  from itertools import cycle
  from sklearn.preprocessing import label_binarize
  import numpy as np

  import matplotlib.pyplot as plt
  # Programmatic form of the notebook magic `%matplotlib inline`; skipped
  # when not running under IPython, where get_ipython is undefined.
  try:
    get_ipython().run_line_magic('matplotlib', 'inline')
  except NameError:
    pass

  y_true = label_binarize(y_true, classes=list(range(n_classes)))
  predicted_probas = np.array([list(val) for val in predicted_probas])

  # Per-class ROC curve and area under it
  fpr = dict()
  tpr = dict()
  roc_auc = dict()
  for i in range(n_classes):
      fpr[i], tpr[i], _ = sk_roc_curve(y_true[:, i], predicted_probas[:, i])
      roc_auc[i] = auc(fpr[i], tpr[i])
  colors = cycle(['blue', 'red', 'green', 'yellow'])

  # First aggregate all false positive rates
  all_fpr = np.unique(np.concatenate([fpr[i] for i in range(n_classes)]))

  # Then interpolate all ROC curves at these points
  mean_tpr = np.zeros_like(all_fpr)
  for i in range(n_classes):
      mean_tpr += np.interp(all_fpr, fpr[i], tpr[i])

  # Finally average it and compute AUC
  mean_tpr /= n_classes

  fpr["macro"] = all_fpr
  tpr["macro"] = mean_tpr
  roc_auc["macro"] = auc(fpr["macro"], tpr["macro"])

  plt.plot(fpr["macro"], tpr["macro"],
           label='macro-average ROC curve (area = {0:0.2f})'
                 ''.format(roc_auc["macro"]),
           linewidth=2)

  for i, color in zip(range(n_classes), colors):
      plt.plot(fpr[i], tpr[i], color=color, lw=1,
               label='ROC curve of class {0} (area = {1:0.2f})'
               ''.format(i, roc_auc[i]))

  # Chance-level diagonal
  plt.plot([0, 1], [0, 1], 'k--', lw=1)
  plt.xlim([-0.05, 1.0])
  plt.ylim([0.0, 1.05])
  plt.xlabel('False Positive Rate')
  plt.ylabel('True Positive Rate')
  plt.title('Receiver operating characteristic for multi-class data')
  plt.legend(loc="lower right")
  plt.show()

In [None]:
def most_confused(y_true,y_pred,normalize=True):
  """
    Objective
    ---------
    Display confusion matrix


    Parameters
    ----------
    y_true : list
      Ground truth labels

    y_pred : list
      Predicted classes

    normalize : boolean, default=True
      Display normalized confusion matrix

  """
  import matplotlib.pyplot as plt
  import seaborn as sns
  # Programmatic form of the notebook magic `%matplotlib inline`; skipped
  # when not running under IPython, where get_ipython is undefined.
  try:
    get_ipython().run_line_magic('matplotlib', 'inline')
  except NameError:
    pass
  sns.set()
  import scikitplot as skplt
  import numpy as np

  y_true = list(y_true)
  y_pred = list(y_pred)
  skplt.metrics.plot_confusion_matrix(
          y_true,
          y_pred,
         title="Confusion matrix", normalize=normalize)

  #FIX DISPLAY OF RESULTS
  # Pin both axes to the number of distinct true labels, with the y axis
  # running top-down.
  n_labels = len(np.unique(y_true))
  plt.xlim(-0.5, n_labels-0.5)
  plt.ylim(n_labels-0.5, -0.5)
  plt.show()

In [None]:
def weightsArr(values):
  """
    Objective
    ---------
    Build a list of class weights, where the weight of class i is one minus
    the fraction of entries equal to i.


    Parameters
    ----------
    values : labels exposing ``.values.tolist()`` and ``.unique()``
      presumably a pandas Series of integer labels 0 .. k-1 (k = number of
      distinct labels); only those integers are looked up.


    Returns
    ----------
    List with one weight per integer in range(number of distinct labels).
  """
  labels = values.values.tolist()
  total = len(labels)
  distinct = len(values.unique().tolist())
  return [1 - labels.count(label) / total for label in range(distinct)]

In [None]:
def weightsDict(values):
  """
    Objective
    ---------
    Build a dictionary of class weights, mapping class i to one minus the
    fraction of entries equal to i.


    Parameters
    ----------
    values : labels exposing ``.values.tolist()`` and ``.unique()``
      presumably a pandas Series of integer labels 0 .. k-1 (k = number of
      distinct labels); only those integers are looked up.


    Returns
    ----------
    Dictionary keyed by every integer in range(number of distinct labels).
  """
  observed = values.values.tolist()
  sample_count = len(observed)
  class_count = len(values.unique().tolist())
  return {
    class_id: 1 - observed.count(class_id) / sample_count
    for class_id in range(class_count)
  }