## Isolation Forest implementation from scratch

Isolation forest is a useful technique for Anamoly detection. 

The goal of this project is to implement the original Isolation Forest algorithm by Fei Tony Liu, Kai Ming Ting, and Zhi-Hua Zhou. There are two general approaches to anomaly detection: 

1. Model what normal looks like and then look for nonnormal observations
2. Focus on the anomalies, which are few and different. This is the interesting and relatively-new approach taken by the authors of isolation forests. 

The idea is to isolate the anomalies based on the assumption that anomalies are few and different hence they are more suspectible to isolation during random splitting as compared to normal data points. The implementation involves creating a large number of binary trees on random subsets of data split on random attributes and random values. The anomalies will on average have shorter path lengths from root. 

Reference: https://www.researchgate.net/publication/224384174


In [1]:
# Load packages
import pandas as pd
import numpy as np
import math
from lolviz import *
from sklearn.metrics import confusion_matrix

### class definitions

In [2]:
#Internal node
class inNode:
    def __init__(self,left,right,split_att,split_val):
        self.left=left
        self.right=right
        self.split_att=split_att
        self.split_val=split_val

In [3]:
#External node
class exNode:
    def __init__(self,size):
        self.size=size

In [4]:
def sameCheck(X):
    '''
    check whether all data in a numpy array is same
    '''
    if isinstance(X, pd.DataFrame):
        X = X.values
    return np.all(X==X[0,:])

## Creating isolation tree

In [56]:
class IsolationTree:
    def __init__(self, height_limit):
        self.height_limit=height_limit

    def fit(self, X:np.ndarray, improved=False):
        """
        Given a 2D matrix of observations, create an isolation tree. Set field
        self.root to the root of that tree and return it.

        If you are working on an improved algorithm, check parameter "improved"
        and switch to your new functionality else fall back on your original code.
        """
        self.n_nodes=0
        if improved:
            self.root=self.build_faster_itree(X,0,self.height_limit)
        else:
            self.root=self.build_itree(X,0,self.height_limit)
        
        return self.root   
    
    
    def build_itree(self,X,e,l):
        #Create and return external node if current ht exceeds max ht or no more observations
      
        if e>=l or len(X)<=1:
            self.n_nodes+=1
            return exNode(len(X))

        Q=X.shape[1]
        q=np.random.randint(0,Q)
        
        #Create and return external node if all observations have same value for that attribute
        att_min, att_max=X[:,q].min(),X[:,q].max()
        if att_min==att_max:
            self.n_nodes+=1
            return exNode(len(X))
        
        p=np.random.uniform(X[:,q].min(),X[:,q].max())
        #create a mask for splitting data on attribute value
        mask=X[:,q]<p
        Xl=X[mask]
        Xr=X[~mask]
        
        leftTree=self.build_itree(Xl,e+1,l)
        rightTree=self.build_itree(Xr,e+1,l)
        self.n_nodes+=1
        return inNode(leftTree,rightTree,q,p)
    
    
    def build_faster_itree(self, X, e, l):

        # Create and return external node if current ht exceeds max ht or no more observations
        if e >= l or len(X) <= 1:
            self.n_nodes += 1
            return exNode(len(X))

        best_split = {}
        Q = X.shape[1]
        smallest = len(X)
        size = 3

        # pick three random columns and three random split points for each column. Find the best split.
        q = np.random.choice(Q, size, replace=False)

        if sameCheck(X[:,q]):
            self.n_nodes += 1
            return exNode(len(X))

        for qi in q:
            att_min, att_max = X[:, qi].min(), X[:, qi].max()
            if att_min == att_max: continue

            p = np.random.uniform(att_min, att_max, size)
            for pi in p:
                mask = X[:, qi] < pi
                Xl = X[mask]
                Xr = X[~mask]
                left_len = len(Xl)
                right_len = len(Xr)
                small = min(left_len, right_len)
                if small < smallest:
                    smallest = small
                    best_split['left'] = Xl
                    best_split['right'] = Xr
                    best_split['split_att'] = qi
                    best_split['split_val'] = pi


        leftTree = self.build_itree(best_split['left'], e + 1, l)
        rightTree = self.build_itree(best_split['right'], e + 1, l)
        self.n_nodes += 1
        return inNode(leftTree, rightTree, best_split['split_att'], best_split['split_val'])
    
  

                                    

In [6]:
# helper functions
def H(i):
    return np.log(i) + 0.5772156649

def c(size):
    if size==2: return 1
    if size>2: return 2.0*(np.log(size-1)+ 0.5772156649)-(2.0*(size-1.0)/size*1.0)
    else: return 0


## Creating isolation forest and calculating average path length

In [7]:
class IsolationTreeEnsemble:
    def __init__(self, sample_size=256, n_trees=10):
        self.sample_size=sample_size
        self.n_trees=n_trees
        

    def fit(self, X:np.ndarray, improved=False):
        """
        Given a 2D matrix of observations, create an ensemble of IsolationTree
        objects and store them in a list: self.trees.  Convert DataFrames to
        ndarray objects.
        """
        if isinstance(X, pd.DataFrame):
            X = X.values
         
        self.trees=[]
        height_limit=np.ceil(np.log2(self.sample_size))
        
        #create t trees
        for i in range(self.n_trees):
            
            itree=IsolationTree(height_limit)
            X_sample=X[np.random.choice(X.shape[0],size=self.sample_size,replace=False)]
            itree.root=itree.fit(X_sample,improved)
            self.trees.append(itree)

        return self

    def path_length(self, X:np.ndarray) -> np.ndarray:
        """
        Given a 2D matrix of observations, X, compute the average path length
        for each observation in X.  Compute the path length for x_i using every
        tree in self.trees then compute the average for each x_i.  Return an
        ndarray of shape (len(X),1).
        """
        if isinstance(X, pd.DataFrame):
            X = X.values
            
        path_len= np.zeros((len(X),1),dtype=float)
        #iterate over each observation
        for i in range(len(X)):
            path_len_tree=[]
            #iterate over each tree to find average path length of that observation
            for itree in self.trees:
                #with recursion
                #path_len_tree.append(self.recursive_path_len(X[i],itree.root,0))
                #without recursion (while loop)
                path_len_tree.append(self.get_path_len(X[i],itree.root,0))
            average_path_len=np.mean(np.array(path_len_tree))

            path_len[i]=average_path_len
        return path_len
            
    
    def recursive_path_len(self,xi,node,e):
        if isinstance(node,exNode):
            return e+c(node.size)
        q=node.split_att
        if xi[q]<node.split_val : return self.recursive_path_len(xi,node.left,e+1)
        else: return self.recursive_path_len(xi,node.right,e+1)
        
    def get_path_len(self,xi,node,e):
        #stop when you reach external node
        while isinstance(node,inNode):
            if xi[node.split_att] < node.split_val:
                node=node.left
                e+=1
            else: 
                node=node.right
                e+=1
        return e+c(node.size)
            
 
  
        

## Scoring observations based on path length

In [None]:
  def anomaly_score(self, X:np.ndarray) -> np.ndarray:
        """
        Given a 2D matrix of observations, X, compute the anomaly score
        for each x_i observation, returning an ndarray of them.
        """
        if isinstance(X, pd.DataFrame):
            X = X.values
        
        path_len=self.path_length(X)
        power= -1*path_len/c(self.sample_size)
        anomaly_result=np.power(2,power)
        return anomaly_result
        
        
    def predict_from_anomaly_scores(self, scores:np.ndarray, threshold:float) -> np.ndarray:
        """
        Given an array of scores and a score threshold, return an array of
        the predictions: 1 for any score >= the threshold and 0 otherwise.
        """
        predictions=scores >= threshold
        predictions=predictions.astype(int)
        return predictions
        

    def predict(self, X:np.ndarray, threshold:float) -> np.ndarray:
        "A shorthand for calling anomaly_score() and predict_from_anomaly_scores()."
        anomaly_result=self.anomaly_score(X)
        predictions=self.predict_from_anomaly_scores(anomaly_result)
        return predictions
        

### Set a threshold for true positive rate.
If the threshold is too high then we have low TPR and low FPR. On the other hand if the threshold is very low then we have a high TPR but also high FPR. Our objective is to have high TPR with moderately low FPR.

In [8]:
def find_TPR_threshold(y, scores, desired_TPR):
    """
    Start at score threshold 1.0 and work down until we hit desired TPR.
    Step by 0.01 score increments. For each threshold, compute the TPR
    and FPR to see if we've reached to the desired TPR. If so, return the
    score threshold and FPR.
    """
    threshold=1.01 #since first time we will deduct 0.01
    current_TPR=0.0
    
    while current_TPR < desired_TPR and threshold>0:
        #lower threshold and recalculate 
        threshold-=0.01
        #comparing scores with threshold to get hard predictions
        predictions = scores>=threshold
        predictions = predictions.astype(int)
        #calculate TPR and FPR
        confusion = confusion_matrix(y, predictions)
        TN, FP, FN, TP = confusion.flat
        current_TPR = TP / (TP + FN)
        current_FPR = FP / (FP + TN)
        
    return threshold,current_FPR
    

This is the implementation. To run the notebook choose your own dataset and extract X: features and y: labels (if available). For example see below

Choose hyperparameters
1. Sample size (256 works well)
2. Number of trees (try in range of 300-1000)
3. Desired threshold for hard predictions

In [None]:
it = IsolationTreeEnsemble(sample_size=sample_size, n_trees=n_trees)
it.fit(X, improved=improved)
scores = it.anomaly_score(X)
# if true labels are available 
threshold, FPR = find_TPR_threshold(y, scores, desired_TPR)
# if true labels are not available
y_pred = it.predict_from_anomaly_scores(scores, threshold=threshold)