# In [1]:
# If this code is running on IBM Cloud Pak, uncomment the following line:
# %%writefile Outliers.py

import os
import sys
from timeit import default_timer as timer
import pandas as pd
import numpy as np
from loguru import logger
from sklearn.ensemble import IsolationForest

class Outliers:
    """Detect numeric outliers with Isolation Forest and replace them with NaN.

    The methods call each other through the class
    (``Outliers.method(self, ...)``) rather than through ``self``, so ``self``
    only has to expose the ``cols_group`` mapping read by ``handle``.
    NOTE(review): presumably ``self`` is a pipeline object owned by the
    caller rather than an ``Outliers`` instance -- confirm against the caller.
    """

    def handle(self, df):
        """Replace detected outliers in the numeric columns of ``df`` with NaN.

        Parameters
        ----------
        df : pandas.DataFrame
            Data to clean. It is modified in place.

        Returns
        -------
        pandas.DataFrame
            The same ``df`` object, with outlier cells set to NaN so that a
            later missing-value phase can treat them.

        Notes
        -----
        The columns to inspect are read from ``self.cols_group["cols_num"]``.
        """
        logger.info('Started handling of outliers...')
        start = timer()

        cols_num = self.cols_group["cols_num"]
        outlier_indexes = Outliers._isolation_forest_detection(self, df, cols_num)

        # change detected outlier values to NAN value for 'MissingValue' phase.
        # The detector reports row *positions*, so they must be applied
        # positionally; label-based ``df.loc`` would touch the wrong rows (or
        # append new ones) whenever the index is not the default 0..n-1 range.
        for feature in cols_num:
            Outliers._set_positions_to_nan(df, feature, outlier_indexes[feature])

        end = timer()
        logger.info('Completed handling of outliers in {} seconds', round(end-start, 6))

        return df

    @staticmethod
    def _set_positions_to_nan(df, feature, positions):
        """Set the rows at integer ``positions`` of column ``feature`` to NaN, in place.

        Parameters
        ----------
        df : pandas.DataFrame
            Frame whose column is updated in place.
        feature : hashable
            Name of the column to update.
        positions : array-like of int
            Zero-based row positions (not index labels) to blank out.
        """
        if len(positions) == 0:
            # Nothing flagged: leave the column (and its dtype) untouched.
            return

        flagged = np.zeros(len(df), dtype=bool)
        flagged[positions] = True

        # ``Series.mask`` replaces the flagged rows with a missing value and
        # upcasts the column when needed (e.g. int -> float), so this does not
        # rely on implicit upcasting during item assignment.
        df[feature] = df[feature].mask(flagged)

    def _isolation_forest_detection(self, df, cols_num, contamination=0.003, random_state=None):
        """Run a separate Isolation Forest on each numeric column.

        Parameters
        ----------
        df : pandas.DataFrame
            Data to inspect. It is not modified.
        cols_num : iterable
            Names of the columns to check.
        contamination : float, optional
            Passed to ``IsolationForest`` (the expected share of outliers).
            Defaults to 0.003.
        random_state : int or None, optional
            Passed to ``IsolationForest``; set it for reproducible results.

        Returns
        -------
        dict
            Key   : feature name.
            Value : numpy array with the zero-based row positions flagged as
            outliers for that feature.
        """
        logger.info('Isolation Forest algorithm started the detection...')
        # Key   : Feature names
        # Value : List of outlier elements' positions for that feature
        IF_predictions = {}

        for feature in cols_num:
            column = df[feature]

            # Fit only on values that are present: missing entries are handled
            # by the later missing-value phase and cannot be outliers here.
            present = np.flatnonzero(column.notna().to_numpy())

            if present.size:
                # reshaping for IsolationForest(): one sample per row, one feature
                feature_shaped = column.to_numpy()[present].reshape(-1, 1)

                # initializing the isolation forest
                isolation_model = IsolationForest(
                    contamination=contamination,
                    random_state=random_state,
                )

                # training the model
                isolation_model.fit(feature_shaped)

                # making predictions; -1 marks an outlier. Map the positions
                # within the filtered sample back to positions within ``df``.
                labels = isolation_model.predict(feature_shaped)
                IF_predictions[feature] = present[labels == -1]
            else:
                # Column has no usable values: nothing to fit, nothing flagged.
                IF_predictions[feature] = present

            logger.debug(f'Outlier Detection: {len(IF_predictions[feature])} value(s) found for feature "{feature}"')

        logger.info('Isolation Forest algorithm finished the detection...')

        return IF_predictions