# Imports

In [1]:
import warnings
# Silence every warning category for the rest of the session; this also hides
# deprecation notices emitted by the libraries used in the cells below.
warnings.simplefilter('ignore')

In [2]:
!pip install openml



In [3]:
import math
import openml
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
from sklearn.ensemble import IsolationForest
import numpy as np

# Implementation

In [4]:
class Node:
    """One node of a binary isolation tree.

    Attributes:
        value: payload attached to the node (the model in this notebook stores
            the subset of samples that reached the node).
        left: child node for samples on the "<= split_value" side, or None.
        right: child node for samples on the "> split_value" side, or None.
        is_leaf: True when the node terminates a branch.
        split_value: threshold used to route samples at this node, or None.
        split_attribut: index of the attribute the threshold applies to, or None.
    """

    def __init__(
        self,
        value,
        left=None,
        right=None,
        is_leaf=False,
        split_value=None,
        split_attribut=None,
    ):
        # Payload and leaf flag.
        self.value, self.is_leaf = value, is_leaf
        # Children (both None until the tree builder attaches them).
        self.left, self.right = left, right
        # Description of the split performed at this node.
        self.split_value, self.split_attribut = split_value, split_attribut

In [5]:
from typing import List

In [20]:
class CustomIForestModel:
  """Isolation Forest anomaly detector built from randomly split binary trees.

  Each tree is grown on a random sub-sample of the training data by repeatedly
  picking a random attribute and a random split value. Instances that end up
  isolated after few splits receive an anomaly score close to 1; instances that
  need many splits receive a score well below 0.5.

  The trees are made of ``Node`` objects (the class defined earlier in this
  notebook) and all randomness comes from NumPy's global ``np.random`` state.
  """

  def __init__(self, number_of_trees: int, sample_size: int = 256, contamination: float = 0.1):
    """Store the hyper-parameters; no tree is built until ``fit`` is called.

    Args:
      number_of_trees: how many isolation trees the forest contains.
      sample_size: number of training rows drawn (without replacement) per tree.
      contamination: expected fraction of anomalies, used by ``predict`` to
        place the decision threshold.
    """
    self.number_of_trees = number_of_trees
    self.sample_size = sample_size
    self.forest: "List[Node]" = None
    self.contamination = contamination
    self.threshold = None
    # Sub-sample size actually used by the last fit; it is smaller than
    # sample_size when the training set has fewer rows than sample_size.
    self._fitted_sample_size = None

  def fit(self, set):
    """Build the forest from a 2-D training array and return the list of tree roots.

    Args:
      set: array-like of shape (n_samples, n_features). (The parameter name
        shadows the builtin; it is kept for backward compatibility.)

    Raises:
      ValueError: if the input is not a non-empty 2-D array or if
        ``sample_size`` is smaller than 1.
    """
    training_set = np.asarray(set)
    if training_set.ndim != 2 or training_set.shape[0] == 0 or training_set.shape[1] == 0:
      raise ValueError("fit expects a non-empty 2-D array of shape (n_samples, n_features)")
    if self.sample_size < 1:
      raise ValueError("sample_size must be at least 1")
    self.forest = self.__iForest(training_set)
    return self.forest

  def __generate_threshold(self, scores):
    """Set ``self.threshold`` to the (1 - contamination) percentile of ``scores``."""
    self.threshold = np.percentile(scores, (1 - self.contamination) * 100)

  # Training stage
  @staticmethod
  def __random_split(sample_set):
    """Split ``sample_set`` in two on a random attribute and a random value.

    Returns:
      ``(left_partition, right_partition, split_value, split_attribute)`` with
      both partitions non-empty, or ``None`` when every attribute is constant
      (or not comparable, e.g. contains NaN) so that no valid split exists.
    """
    lows = sample_set.min(axis=0)
    highs = sample_set.max(axis=0)
    # Only attributes with two distinct values can separate the samples. Picking
    # directly among them replaces the former unbounded "retry" recursion, which
    # never terminated when all rows were identical.
    splittable = np.flatnonzero(highs > lows)
    if splittable.size == 0:
      return None
    random_attribut = np.random.choice(splittable)
    column = sample_set[:, random_attribut]
    low, high = lows[random_attribut], highs[random_attribut]
    while True:
      random_split_value = np.random.uniform(low, high)
      goes_left = column <= random_split_value
      # The minimum always goes left; retry only in the rare case where rounding
      # put the split value on the maximum and left the right side empty.
      if not goes_left.all():
        break
    return sample_set[goes_left], sample_set[~goes_left], random_split_value, random_attribut

  @staticmethod
  def __iTree(sample_set, current_height: int, height_limit):
    """Recursively grow one isolation tree and return its root ``Node``.

    A branch stops when the height limit is reached, when at most one sample
    is left, or when the remaining samples cannot be separated.
    """
    if current_height >= height_limit or sample_set.shape[0] <= 1:
      return Node(sample_set, is_leaf=True)
    split = CustomIForestModel.__random_split(sample_set)
    if split is None:
      # Indistinguishable samples: treat them as an unresolved leaf.
      return Node(sample_set, is_leaf=True)
    left_partition, right_partition, random_split_value, random_attribut = split
    node = Node(sample_set, is_leaf=False, split_value=random_split_value, split_attribut=random_attribut)
    node.left = CustomIForestModel.__iTree(left_partition, current_height + 1, height_limit)
    node.right = CustomIForestModel.__iTree(right_partition, current_height + 1, height_limit)
    return node

  def __iForest(self, training_set) -> "List[Node]":
    """Grow ``number_of_trees`` trees, each on its own random sub-sample."""
    n_rows = training_set.shape[0]
    # Sampling without replacement cannot draw more rows than are available.
    subsample_size = min(int(self.sample_size), n_rows)
    self._fitted_sample_size = subsample_size
    # Height limit = ceil(log2(sub-sample size)), the average height of a tree
    # over that many samples; log2 is exact for powers of two.
    height_limit = math.ceil(math.log2(subsample_size)) if subsample_size > 1 else 0
    forest = []
    for _ in range(self.number_of_trees):
      rows = np.random.choice(n_rows, size=subsample_size, replace=False)
      forest.append(CustomIForestModel.__iTree(training_set[rows, :], 0, height_limit))
    return forest

  # Evaluation stage
  @staticmethod
  def __cost(size: int):
    """Average path length c(size) of an unsuccessful binary-search-tree lookup.

    c(n) = 2*H(n-1) - 2*(n-1)/n for n > 2 (H approximated by ln + Euler's
    constant), c(2) = 1 and c(n) = 0 for n <= 1.
    """
    if size <= 1:
      return 0.0
    if size == 2:
      return 1.0
    return 2.0 * (math.log(size - 1) + np.euler_gamma) - 2.0 * (size - 1) / size

  @staticmethod
  def __path_length(instance, tree: "Node", current_path_length):
    """Number of edges ``instance`` traverses in ``tree``, plus c(leaf size).

    The c(leaf size) term estimates the depth of the sub-tree that was not
    grown below the leaf.
    """
    node = tree
    depth = current_path_length
    while not node.is_leaf:
      if instance[node.split_attribut] <= node.split_value:
        node = node.left
      else:
        node = node.right
      depth += 1
    return depth + CustomIForestModel.__cost(node.value.shape[0])

  @staticmethod
  def __anomalie_score(estimated_path, set_size: int):
    """Anomaly score 2 ** (-estimated_path / c(set_size))."""
    normaliser = CustomIForestModel.__cost(set_size)
    if normaliser == 0:
      # Degenerate forest grown on a single sample: avoid dividing by zero.
      normaliser = 1.0
    return math.pow(2, -(estimated_path / normaliser))

  def score_samples(self, test_set):
    """Return one anomaly score per row of ``test_set`` (higher = more anomalous).

    Args:
      test_set: array-like of shape (n_samples, n_features).

    Returns:
      1-D ``np.ndarray`` of scores in (0, 1].

    Raises:
      RuntimeError: if the model has not been fitted.
    """
    if not self.forest:
      raise RuntimeError("CustomIForestModel is not fitted yet; call fit() first")
    test_set = np.asarray(test_set)
    # Path lengths are normalised by c(sub-sample size used to grow the trees),
    # not by the number of trees.
    subsample_size = getattr(self, "_fitted_sample_size", None) or self.sample_size
    forest_size = len(self.forest)
    scores = []
    for instance in test_set:
      total = sum(CustomIForestModel.__path_length(instance, tree, 0) for tree in self.forest)
      scores.append(CustomIForestModel.__anomalie_score(total / forest_size, subsample_size))
    return np.array(scores)

  def predict(self, test_set):
    """Label each row of ``test_set`` as 0 (normal) or 1 (anomaly).

    NOTE: the threshold is recomputed on every call as the (1 - contamination)
    percentile of the scores of ``test_set`` itself, so roughly a
    ``contamination`` fraction of each batch is flagged regardless of content.
    """
    scores = self.score_samples(test_set)
    if scores.size == 0:
      return np.array([], dtype=int)
    self.__generate_threshold(scores)
    return np.where(scores <= self.threshold, 0, 1)


# Testing new implementation

In [7]:
# Download the Mulcross dataset (OpenML id 40897) as NumPy arrays and wrap it
# in a DataFrame, with the target stored in a "class" column, for inspection.
dataset = openml.datasets.get_dataset(40897)
X, y, categorical_indicator, attribute_names = dataset.get_data(
    target=dataset.default_target_attribute,
    dataset_format="array",
)
mulcross = pd.DataFrame(X, columns=attribute_names).assign(**{"class": y})
mulcross.head()

Unnamed: 0,V1,V2,V3,V4,class
0,-0.20395,0.363011,1.013766,0.187131,0
1,-0.761118,2.436424,0.681846,0.654366,0
2,-0.209979,1.131098,-0.28218,-0.20221,0
3,0.836812,0.650342,-0.4269,-0.305281,0
4,0.454204,1.560128,-0.204841,0.219233,0


In [8]:
# Number of rows per value of the "class" label column.
ds = x = mulcross["class"].value_counts()
ds

0    235930
1     26214
Name: class, dtype: int64

In [9]:
# Hold out 10% of the rows for evaluation; the fixed random_state makes the split repeatable.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.1, random_state=42
)
print("shape of X :", X.shape)
print("shape of X_train:", X_train.shape)
print("shape of X_test:", X_test.shape)

shape of X : (262144, 4)
shape of X_train: (235929, 4)
shape of X_test: (26215, 4)


In [10]:
# The forest is grown with NumPy's global random generator; seed it so that the
# trees, and every score and metric derived from them, are reproducible.
np.random.seed(42)
model = CustomIForestModel(number_of_trees=100)
forest = model.fit(X_train)

In [11]:
# Continuous anomaly scores for the held-out set, followed by hard 0/1 labels.
# predict() scores X_test again internally and derives its threshold from those scores.
scores = model.score_samples(X_test)
predictions = model.predict(X_test)

In [17]:
# Tally how many test instances were assigned to each predicted label.
data = pd.DataFrame(predictions)
data[0].value_counts()

0    23593
1     2622
Name: 0, dtype: int64

In [18]:
# ROC AUC computed from the continuous anomaly scores returned by score_samples().

from sklearn.metrics import roc_auc_score
my_auc=roc_auc_score(y_test,scores)
my_auc

0.9652994957026633

In [19]:
# ROC AUC computed from the hard 0/1 labels returned by predict(); this
# overwrites the score-based value stored in my_auc by the previous cell.

from sklearn.metrics import roc_auc_score
my_auc=roc_auc_score(y_test,predictions)
my_auc

0.7963897872382136