# Assignment - David

Write a method called wrpr_classification which takes three arguments:

1. A range of features (e.g. 4-8)
2. A boolean option selecting a forward or backward step
3. A scoring methodology whose default is 'accuracy'.
 
This method should implement the sequential feature selection approach for classification (hint: the mlxtend library) using a random forest model. It is expected to find the subset of features, with a size inside the given range, that optimizes the provided scoring methodology while stepping either forward or backward. Finally, it should print the metric dictionary.

http://rasbt.github.io/mlxtend/user_guide/feature_selection/SequentialFeatureSelector/#example-1-a-simple-sequential-forward-selection-example

http://rasbt.github.io/mlxtend/user_guide/feature_selection/SequentialFeatureSelector/#example-9-selecting-the-best-feature-combination-in-a-k-range

https://github.com/rasbt/mlxtend/blob/master/mlxtend/feature_selection/sequential_feature_selector.py

 ## Data Importation
 

In [0]:
import pandas as pd

# Load the pickled training inputs from the parent directory.
# NOTE(review): read_pickle deserializes arbitrary pickle data; only load trusted files.
X_train = pd.read_pickle("../x_train.pkl")
y_train = pd.read_pickle("../y_train.pkl")
# NOTE(review): y_train_time is not referenced by the visible cells; presumably used elsewhere - confirm.
y_train_time = pd.read_pickle("../y_train_time.pkl")

# Selected feature set from Filter Techniques (18 column names)
feature_set_six = ['v_Vel', 'lateral_current_lane', 'longit_pos_vehicle1', 'longit_pos_vehicle2', 'longit_pos_vehicle3', 'lat_pos_vehicle1', 'lat_pos_vehicle2', 'iTTC_ref3', 'v_Vel_follow1', 'v_Vel_preced2', 'longit_pos_preced1', 'longit_pos_follow1', 'longit_pos_preced2', 'longit_pos_follow2', 'iTTC_preced1', 'iTTC_follow1', 'iTTC_preced2', 'iTTC_follow2']

# Keep only the selected columns (assumes X_train is a pandas DataFrame - TODO confirm).
X_train_fs = X_train[feature_set_six]

In [0]:
def get_best_feature_dict_metrics(sfs):
    """Look up the metric entry that belongs to the selector's chosen subset.

      Parameters
      ----------
      sfs : SequentialFeatureSelector object
          A fitted selector exposing ``get_metric_dict()`` and
          ``k_feature_names_``.

      Returns
      -------
      dict or None
          The first entry of the metric dictionary whose ``'feature_names'``
          equals ``sfs.k_feature_names_``; ``None`` when no entry matches.
    """
    all_metrics = sfs.get_metric_dict()

    # Scan the entries in dictionary order and stop at the first match.
    return next(
        (
            subset_metrics
            for subset_metrics in all_metrics.values()
            if subset_metrics['feature_names'] == sfs.k_feature_names_
        ),
        None,
    )

In [0]:
def pp_metric_dict(dic):
    """Print every entry of a feature set's metric dictionary, one per line.

      Parameters
      ----------
      dic : dict
          Metric dictionary of a single feature set, as extracted from the
          SFS metric dict. Each entry is printed as ``name: value``.
    """
    for metric_name in dic:
        metric_value = dic[metric_name]
        print(f'{metric_name}: {metric_value}')

In [0]:
from mlxtend.feature_selection import SequentialFeatureSelector as SFS
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_validate
from sklearn.metrics import make_scorer, accuracy_score, precision_score, recall_score, roc_auc_score, f1_score
import numpy as np

def pp_SFS_results(step_forward, results):
    """Print a banner and the five summary metrics of one selection run.

      Parameters
      ----------
      step_forward : bool
          Direction of the selection that produced ``results``.
          Truthy prints an "SFFS" banner, falsy prints an "SBFS" banner.

      results : dict
          Must provide the keys 'range', 'accuracy', 'precision', 'recall',
          'f1_score' and 'roc_auc'.
    """
    direction_tag = "SFFS" if step_forward else "SBFS"
    print(f"==================== {direction_tag}, range: {results['range']} ====================")

    # (printed caption, key in `results`), in output order.
    report_layout = (
        ("Accuracy", "accuracy"),
        ("Precision", "precision"),
        ("Recall", "recall"),
        ("F1 Score", "f1_score"),
        ("ROC AUC", "roc_auc"),
    )
    for caption, metric_key in report_layout:
        print(f"{caption}: {results[metric_key]}")

def wrpr_classification(X, y, feature_range, step_forward, scoring='accuracy'):
    """Run sequential feature selection with a random forest and report the result.

        The selector searches for the best feature subset whose size lies within
        ``feature_range``, stepping forward or backward, and optimizing ``scoring``.
        The selected subset is then re-evaluated with a 10-fold cross-validation
        over five metrics. The summary metrics and the selector's own metric
        dictionary entry for the chosen subset are printed.

      Parameters
      ----------
      X : pandas DataFrame, shape = [n_samples, n_features]
          Training data. The selected columns are retrieved by name, so a
          DataFrame is required.

      y : {array-like, pandas Series}, shape = [n_samples]
          Target values.

      feature_range : tuple
          Range of subset sizes to search, shaped (min, max).

      step_forward : bool
          Direction of the selection (True: Forward, False: Backward).

      scoring : str or callable, default 'accuracy'
          Scoring methodology optimized by the sequential feature selector.
          Passed unchanged to the selector's ``scoring`` argument.

      Returns
      -------
      dict
          Mean cross-validated 'accuracy', 'precision', 'recall', 'f1_score'
          and 'roc_auc' of the selected subset, plus 'range' (the given
          ``feature_range``) and 'feature_names' (list of selected columns).
    """

    # Metrics reported for the selected subset (independent of `scoring`,
    # which only drives the selection itself).
    scoring_classifier = {
        'accuracy': make_scorer(accuracy_score),
        'precision': make_scorer(precision_score, average='macro'),
        'recall': make_scorer(recall_score, average='macro'),
        # Built-in one-vs-rest ROC AUC scorer based on predicted probabilities.
        # Used instead of make_scorer(..., needs_proba=True) because that
        # argument is deprecated/removed in newer scikit-learn releases.
        'roc_auc': 'roc_auc_ovr',
        'f1_score': make_scorer(f1_score, average='macro'),
    }

    # Sequential feature selection driven by the requested scoring.
    selection_clf = RandomForestClassifier(n_jobs=-1)
    sfs = SFS(selection_clf, k_features=feature_range, forward=step_forward,
              scoring=scoring, n_jobs=-1)
    sfs = sfs.fit(X, y)

    selected_features = list(sfs.k_feature_names_)

    # Fresh estimator for the evaluation; parallelism comes from cross_validate.
    evaluation_clf = RandomForestClassifier()

    # np.ravel accepts a Series, a single-column DataFrame or a plain array.
    cv_results = cross_validate(evaluation_clf, X[selected_features], np.ravel(y),
                                cv=10, scoring=scoring_classifier, n_jobs=-1)

    # Average each metric over the folds.
    results = {
        'accuracy': np.mean(cv_results['test_accuracy']),
        'precision': np.mean(cv_results['test_precision']),
        'recall': np.mean(cv_results['test_recall']),
        'f1_score': np.mean(cv_results['test_f1_score']),
        'roc_auc': np.mean(cv_results['test_roc_auc']),
        'range': feature_range,
        'feature_names': selected_features
    }

    # Summary of the cross-validated metrics.
    pp_SFS_results(step_forward, results)

    # Selector's own metric dictionary entry for the chosen subset.
    dict_metrics = get_best_feature_dict_metrics(sfs)
    pp_metric_dict(dict_metrics)

    print('\n')

    return results

In [0]:
def is_new_best(current_best, results):
    """Decide whether ``results`` beats ``current_best`` overall.

      The per-metric differences (new minus current) for accuracy, precision,
      recall, f1 score and roc auc are added up; the new results win when that
      total is strictly positive.

      Parameters
      ----------
      current_best : dict
          Metrics of the best feature set found so far.

      results : dict
          Metrics of the candidate feature set.

      Returns
      -------
      bool
          True when the summed difference is greater than zero, False otherwise
          (including a tie).
    """
    compared_metrics = ('accuracy', 'precision', 'recall', 'f1_score', 'roc_auc')

    # Accumulate left to right so the floating-point result does not depend
    # on a different summation strategy.
    net_gain = 0
    for metric in compared_metrics:
        net_gain += results[metric] - current_best[metric]

    return net_gain > 0

In [0]:
from random import seed
from random import randint

def range_creation(num_ranges, num_features):
    """Create random (min, max) ranges for a given number of features.

        Each range satisfies 1 <= min < max <= num_features and
        max - min > 5, i.e. the bounds are at least 6 apart.

      Parameters
      ----------
      num_ranges : int
          Number of ranges you wish to generate

      num_features : int
          Number of features present in your data set

      Returns
      -------
      list
          a list of (min, max) tuples containing the ranges.

      Raises
      ------
      ValueError
          If at least one range is requested but ``num_features`` is too
          small (below 7) for any range to satisfy the required gap.
    """
    # A candidate is accepted only when upper - lower > required_gap.
    required_gap = 5

    # With lower >= 1 and upper <= num_features the gap is at most
    # num_features - 1; without this guard the sampling loop never ends.
    if num_ranges > 0 and num_features - 1 <= required_gap:
        raise ValueError(
            f"num_features must be at least {required_gap + 2} to build a range "
            f"whose bounds differ by more than {required_gap}; got {num_features}"
        )

    ranges = []

    for _ in range(num_ranges):
        # Rejection sampling: redraw until the bounds are far enough apart.
        while True:
            # Draw the upper bound first, then the lower bound below it.
            upper = randint(1, num_features)
            lower = randint(1, upper)

            if upper - lower > required_gap:
                break

        ranges.append((lower, upper))

    return ranges


def pp_best_fs_SFS(best_results):
    """Print the best feature set found for each selection direction.

      Parameters
      ----------
      best_results : dict
          Must contain the keys 'SFFS' and 'SBFS'. Each maps to a dict with
          'range', 'feature_names', 'accuracy', 'precision', 'recall',
          'f1_score' and 'roc_auc'.
    """
    rule = "============================================================================"

    for sequential_type in ('SFFS', 'SBFS'):
        # Three-line header for this direction.
        print(rule)
        print(f'==================== Best {sequential_type} Feature Set ====================')
        print(rule)

        entry = best_results[sequential_type]
        print(f"Range: {entry['range']}")
        print(f"Feature names: {entry['feature_names']}\n")

        print(f"Accuracy: {entry['accuracy']}")
        print(f"Precision: {entry['precision']}")
        print(f"Recall: {entry['recall']}")
        print(f"F1 Score: {entry['f1_score']}")
        print(f"ROC AUC: {entry['roc_auc']}\n\n")



def find_best_fs_SFS_classifier(X_train, y_train, num_ranges):
    """Search random feature ranges for the best forward and backward selections.

        For each generated range, the selection is run forward and then
        backward with ``wrpr_classification`` (which prints its own results).
        The best outcome per direction, as judged by ``is_new_best``, is kept
        and printed at the end.

      Parameters
      ----------
      X_train : pandas Dataframe, shape = [n_samples, n_features]
          Training data; its ``columns`` give the number of features.

      y_train : {array-like, pandas Series}, shape = [n_samples]
          Target values.

      num_ranges : int
          Number of random ranges to generate and evaluate.

      Returns
      -------
      dict
          Keys 'SFFS' and 'SBFS', each holding the metrics, range and feature
          names of the best run for that direction.
    """
    # (label in the result dictionary, step_forward flag), in execution order.
    directions = (('SFFS', True), ('SBFS', False))

    def _empty_scoreboard():
        # All-zero starting point that any positive result will replace.
        return {
            'accuracy': 0,
            'precision': 0,
            'recall': 0,
            'f1_score': 0,
            'roc_auc': 0,
            'range': (0,0),
            'feature_names': []
        }

    best_results = {label: _empty_scoreboard() for label, _ in directions}

    separator = "============================================================="

    for curr_range in range_creation(num_ranges, len(X_train.columns)):
        print(separator)
        print(f"Range: {curr_range}")
        print(separator)

        # Run both directions first, then compare them with the current bests.
        candidates = {
            label: wrpr_classification(X_train, y_train, curr_range, forward)
            for label, forward in directions
        }

        for label, candidate in candidates.items():
            if is_new_best(best_results[label], candidate):
                best_results[label] = candidate

    pp_best_fs_SFS(best_results)

    return best_results

In [0]:
# Run the full search on the filtered feature set using 10 randomly generated ranges.
best_fs = find_best_fs_SFS_classifier(X_train_fs, y_train, 10)

Range: (12, 18)
Accuracy: 0.9798368298368297
Precision: 0.9802733256662701
Recall: 0.9798368298368298
F1 Score: 0.9798088119228113
ROC AUC: 0.9956878494465908
feature_idx: (0, 1, 2, 5, 6, 7, 8, 9, 10, 11, 13, 14, 16)
cv_scores: [0.97727273 0.98135198 0.98426573 0.97494172 0.98076923]
avg_score: 0.9797202797202796
feature_names: ('v_Vel', 'lateral_current_lane', 'longit_pos_vehicle1', 'lat_pos_vehicle1', 'lat_pos_vehicle2', 'iTTC_ref3', 'v_Vel_follow1', 'v_Vel_preced2', 'longit_pos_preced1', 'longit_pos_follow1', 'longit_pos_follow2', 'iTTC_preced1', 'iTTC_preced2')
ci_bound: 0.0041944225772268424
std_dev: 0.003263403263403259
std_err: 0.0016317016317016295


Accuracy: 0.9785547785547786
Precision: 0.9789868780886923
Recall: 0.9785547785547786
F1 Score: 0.9785269130711776
ROC AUC: 0.995985235626844
feature_idx: (1, 2, 3, 5, 6, 7, 8, 9, 11, 12, 13, 15, 16, 17)
cv_scores: [0.97319347 0.98193473 0.98484848 0.97785548 0.98310023]
avg_score: 0.98018648018648
feature_names: ('lateral_current_

## Tests

### Feature Set selections test
This test checks whether repeatedly running the Sequential Feature Selection over the same range outputs the same feature set each time. It does not: the selected feature set varies from run to run.

In [0]:
# Single forward selection over subset sizes 4 to 8, scored by accuracy.
# No random_state is passed to the forest.
rf_clf = RandomForestClassifier(n_jobs=-1)

sfs = SFS(rf_clf, k_features=(4,8), forward=True, scoring='accuracy')
sfs = sfs.fit(X_train_fs, y_train)

In [0]:
# Repeat the same forward selection ten times and record the selected
# feature names of each run, to see whether the result is stable.
feature_sets = []

for i in range(10):
    # New forest each iteration; no random_state is passed.
    rf_clf = RandomForestClassifier(n_jobs=-1)

    # Search subset sizes 1 to 18, scored by accuracy.
    sfs = SFS(rf_clf, k_features=(1,18), forward=True, scoring='accuracy')
    sfs = sfs.fit(X_train_fs, y_train)

    feature_sets.append(list(sfs.k_feature_names_))

# Print how many features each run selected.
for i in feature_sets:
    print(len(i))

11
11
15
6
10
14
10
5
9
11
