In [2]:
import numpy as np
import random as rn

"""Extended Isolation Forest version"""

# Version components. The first three entries form the dotted version string;
# an optional fourth entry is appended as a "-suffix".
version_tag = (2, 0, 2)
__version__ = '.'.join(str(part) for part in version_tag[:3])

if len(version_tag) > 3:
    __version__ = '%s-%s' % (__version__, version_tag[3])

def c_factor(n):
    """Return the path-length normalisation term for a sample of ``n`` points.

    Computes ``2 * (ln(n - 1) + gamma) - 2 * (n - 1) / n`` with ``gamma``
    hard-coded as 0.5772156649. In this file the value is used by ``iForest``
    to normalise the mean path length and by ``PathFactor`` as the depth
    adjustment for leaves that still hold more than one point.

    Parameters
    ----------
    n : int or float
        Number of points. ``n == 1`` evaluates ``log(0)`` and yields ``-inf``.

    Returns
    -------
    float
        The normalisation term.
    """
    euler_gamma = 0.5772156649
    harmonic_part = 2.0 * (np.log(n - 1) + euler_gamma)
    correction = 2.0 * (n - 1.) / (n * 1.0)
    return harmonic_part - correction


class iForest:
    """Ensemble of ``iTree`` objects that scores points by mean path length.

    Parameters
    ----------
    X : numpy.ndarray
        Training data. It is indexed with a list of row indices and its
        ``shape[1]`` is read, so a 2-D array is expected.
    ntrees : int
        Number of trees to build.
    sample_size : int
        Number of rows drawn (without replacement) for each tree.
    limit : int, optional
        Maximum tree depth. Defaults to ``ceil(log2(sample_size))``.
    ExtensionLevel : int, optional
        Extension level forwarded to every tree; must lie in
        ``[0, X.shape[1] - 1]``.

    Raises
    ------
    Exception
        If ``ExtensionLevel`` is outside the allowed range.
    """

    def __init__(self, X, ntrees, sample_size, limit=None, ExtensionLevel=0):
        self.ntrees = ntrees
        self.X = X
        self.nobjs = len(X)
        self.sample = sample_size
        self.Trees = []
        self.limit = limit
        self.exlevel = ExtensionLevel
        # Validate before any tree is built.
        self.CheckExtensionLevel()
        if limit is None:
            default_depth = np.ceil(np.log2(self.sample))
            self.limit = int(default_depth)
        self.c = c_factor(self.sample)
        row_pool = range(self.nobjs)
        for _ in range(self.ntrees):
            # Each tree is grown on its own random subsample of the rows.
            chosen_rows = rn.sample(row_pool, self.sample)
            subsample = X[chosen_rows]
            tree = iTree(subsample, 0, self.limit, exlevel=self.exlevel)
            self.Trees.append(tree)

    def CheckExtensionLevel(self):
        """Raise ``Exception`` if ``self.exlevel`` is not in ``[0, dim - 1]``."""
        dim = self.X.shape[1]
        if self.exlevel > dim-1 or self.exlevel < 0:
            raise Exception(f"Extension level must be between 0 and {dim-1}.")

    def compute_paths(self, X_in=None):
        """Return one anomaly score per row of ``X_in``.

        The score is ``2 ** (-mean_path / c)`` where ``mean_path`` is the
        average ``PathFactor`` path over all trees and ``c`` is
        ``c_factor(sample_size)``. When ``X_in`` is ``None`` the training data
        is scored.
        """
        points = self.X if X_in is None else X_in
        n_points = len(points)
        scores = np.zeros(n_points)
        for row in range(n_points):
            point = points[row]
            depth_sum = sum(
                PathFactor(point, self.Trees[t]).path for t in range(self.ntrees)
            )
            mean_depth = depth_sum / self.ntrees
            scores[row] = 2.0**(-mean_depth / self.c)
        return scores


class Node:
    """Plain record for one node of an isolation tree.

    Attributes
    ----------
    X : the data that reached this node; ``size`` is ``len(X)``.
    n, p : the values stored as the node's split normal and intercept.
    e : depth value supplied by the builder.
    left, right : child nodes (``None`` is passed for leaves).
    ntype : node type tag; the tree builder uses ``'inNode'`` / ``'exNode'``.
    """

    def __init__(self, X, n, p, e, left, right, node_type=''):
        # Data held at this node.
        self.X = X
        self.size = len(X)
        # Depth and split description.
        self.e = e
        self.n, self.p = n, p
        # Tree structure.
        self.left, self.right = left, right
        self.ntype = node_type


class iTree:
    """A single isolation tree built recursively from random hyperplane splits.

    Parameters
    ----------
    X : numpy.ndarray
        Data used to grow the tree; ``X.shape[1]`` is read, so it must be 2-D.
    e : int
        Depth assigned to the root (``iForest`` passes 0).
    l : int
        Depth limit; recursion stops once the current depth reaches it.
    exlevel : int, optional
        Extension level. ``dim - exlevel - 1`` components of each split
        normal are forced to zero.

    Attributes
    ----------
    root : Node
        Root of the finished tree.
    exnodes : int
        Number of external (leaf) nodes created.
    """

    def __init__(self, X, e, l, exlevel=0):
        self.exlevel = exlevel
        self.e = e
        self.X = X
        self.size = len(X)
        self.dim = self.X.shape[1]
        # Index array over the feature dimensions; not referenced elsewhere in
        # the visible code.
        self.Q = np.arange(self.dim, dtype='int')
        self.l = l
        # Most recent split intercept / normal; overwritten by make_tree.
        self.p = None
        self.n = None
        self.exnodes = 0
        self.root = self.make_tree(X, e, l)

    def make_tree(self, X, e, l):
        """Recursively build the subtree for ``X`` at depth ``e``.

        Returns an ``'exNode'`` when the depth limit ``l`` is reached or at
        most one point remains; otherwise draws a random split and returns an
        ``'inNode'`` whose children are built from the two sides of the split.
        """
        # Tracks the depth of the node currently being built.
        self.e = e
        if e >= l or len(X) <= 1:
            self.exnodes += 1
            # Leaves carry whatever self.n / self.p were set by the most
            # recent split; PathFactor never reads them for an 'exNode'.
            return Node(X, self.n, self.p, e, None, None, 'exNode')
        # Per-feature bounding box of the points at this node.
        mins = X.min(axis=0)
        maxs = X.max(axis=0)
        # Coordinates of the normal vector that will be zeroed out.
        idxs = np.random.choice(range(self.dim), self.dim - self.exlevel - 1, replace=False)
        # Random normal vector for the splitting hyperplane.
        self.n = np.random.normal(0, 1, self.dim)
        self.n[idxs] = 0
        # Random intercept point drawn uniformly inside the bounding box.
        self.p = np.random.uniform(mins, maxs)
        # True for points on the negative side of the hyperplane (left child).
        w = (X - self.p).dot(self.n) < 0
        # The positional arguments self.n / self.p are evaluated before the
        # recursive calls below reassign them, so this node keeps its own split.
        return Node(
            X, self.n, self.p, e,
            left=self.make_tree(X[w], e + 1, l),
            right=self.make_tree(X[~w], e + 1, l),
            node_type='inNode'
        )


class PathFactor:
    """Trace one point down a tree and record its (adjusted) path length.

    Parameters
    ----------
    x : numpy.ndarray
        The point to route through the tree.
    itree : object with a ``root`` attribute
        Tree whose nodes expose ``ntype``, ``size``, ``p``, ``n``, ``left``
        and ``right``.

    Attributes
    ----------
    path : number
        Number of internal nodes visited, plus ``c_factor(size)`` when the
        reached leaf holds more than one point.
    path_list : list of str
        ``'L'`` / ``'R'`` decisions taken on the way down.
    """

    def __init__(self, x, itree):
        self.path_list = []
        self.x = x
        self.e = 0
        self.path = self.find_path(itree.root)

    def find_path(self, T):
        """Walk from node ``T`` to a leaf and return the accumulated ``self.e``."""
        node = T
        while node.ntype != 'exNode':
            self.e += 1
            goes_left = (self.x - node.p).dot(node.n) < 0
            if goes_left:
                self.path_list.append('L')
                node = node.left
            else:
                self.path_list.append('R')
                node = node.right
        # A leaf that still holds several points gets the depth adjustment.
        if node.size > 1:
            self.e += c_factor(node.size)
        return self.e


def all_branches(node, current=None, branches=None):
    """Collect the ``'L'``/``'R'`` path to every leaf below ``node``.

    Parameters
    ----------
    node : object
        Tree node exposing ``e`` (depth), ``ntype``, ``left`` and ``right``.
    current : list of str, optional
        Path taken so far; it is truncated to ``node.e`` entries.
    branches : list, optional
        Accumulator the leaf paths are appended to.

    Returns
    -------
    list of list of str
        One path per leaf, left subtree first.
    """
    prefix = [] if current is None else current
    collected = [] if branches is None else branches
    # Keep only the decisions that lead to this node's depth (always a copy).
    prefix = prefix[:node.e]
    if node.ntype != 'inNode':
        collected.append(prefix)
        return collected
    all_branches(node.left, current=prefix + ['L'], branches=collected)
    all_branches(node.right, current=prefix + ['R'], branches=collected)
    return collected

In [3]:
import numpy as np
import random
import pandas as pd


def init_population(pop_size, feature_dim, n_obs):
    """Create the initial GA population of random hyperparameter sets.

    Each chromosome is a dict with:
      * ``'ntrees'``        - integer in [50, 300]
      * ``'sample_size'``   - integer in [int(n_obs * 0.5), n_obs]
      * ``'contamination'`` - float in [0.01, 0.2], rounded to 3 decimals
      * ``'exlevel'``       - integer in [0, feature_dim - 1]

    Returns a list of ``pop_size`` chromosomes.
    """
    def random_chromosome():
        # Draw order matters for reproducibility under a fixed seed.
        return {
            'ntrees': random.randint(50, 300),
            'sample_size': random.randint(int(n_obs*0.5), n_obs),
            'contamination': round(random.uniform(0.01, 0.2), 3),
            'exlevel': random.randint(0, feature_dim-1),
        }

    return [random_chromosome() for _ in range(pop_size)]

def fitness(chromosome, X, labels=None):
    """Score one hyperparameter set by fitting an ``iForest`` on ``X``.

    Parameters
    ----------
    chromosome : dict
        Must provide ``'ntrees'``, ``'sample_size'`` and ``'exlevel'``;
        ``'contamination'`` is also read when ``labels`` is given.
    X : numpy.ndarray
        Data used both to build the forest and to score.
    labels : array-like, optional
        Ground-truth labels passed to ``f1_score``.
        NOTE(review): assumes 1 marks an anomaly (f1_score's default
        positive label) - confirm against the labelled data.

    Returns
    -------
    float
        F1 score of the thresholded predictions when ``labels`` is given,
        otherwise the variance of the anomaly scores.
    """
    forest = iForest(X, ntrees=chromosome['ntrees'],
                     sample_size=chromosome['sample_size'],
                     ExtensionLevel=chromosome['exlevel'])
    scores = forest.compute_paths(X)

    if labels is None:
        # Unsupervised proxy: prefer parameter sets that spread the scores out.
        return np.var(scores)

    from sklearn.metrics import f1_score
    # Higher score means more anomalous, and `contamination` is the expected
    # anomaly fraction, so only the top `contamination` share may be flagged.
    # The cut-off is therefore the (1 - contamination) quantile; using the
    # `contamination` quantile flagged roughly 1 - contamination of the points.
    threshold = np.percentile(scores, 100 * (1 - chromosome['contamination']))
    preds = (scores > threshold).astype(int)
    return f1_score(labels, preds)

def select_parents(population, fitnesses, num_parents):
    """Return the ``num_parents`` fittest chromosomes, worst-to-best.

    Parameters
    ----------
    population : sequence
        Candidate chromosomes.
    fitnesses : sequence of float
        Fitness of each candidate, aligned with ``population``.
    num_parents : int
        How many candidates to keep. Values larger than the population keep
        everyone; zero or negative values keep no one.

    Returns
    -------
    list
        The selected chromosomes (same objects, not copies), ordered by
        ascending fitness.
    """
    # `seq[-0:]` is `seq[0:]`, so without this guard a request for zero
    # parents would return the whole population.
    if num_parents <= 0:
        return []
    idx = np.argsort(fitnesses)[-num_parents:]
    return [population[i] for i in idx]

def crossover(parent1, parent2):
    """Build a child by picking each gene at random from one of two parents.

    The child has exactly the keys of ``parent1``; for every key the value is
    chosen uniformly between ``parent1[key]`` and ``parent2[key]``.
    """
    return {
        gene: random.choice([parent1[gene], parent2[gene]])
        for gene in parent1.keys()
    }

def mutate(chromosome, feature_dim, n_obs, mutation_rate=0.2):
    """With probability ``mutation_rate``, resample one randomly chosen gene.

    The chromosome is modified in place and also returned. Genes are redrawn
    from the same ranges used when the population is initialised; a key that
    is not one of the four known genes is left untouched.
    """
    if not random.random() < mutation_rate:
        return chromosome

    gene = random.choice(list(chromosome.keys()))
    # One resampler per known gene; only the selected one is invoked.
    resamplers = {
        'ntrees': lambda: random.randint(50, 300),
        'sample_size': lambda: random.randint(int(n_obs*0.5), n_obs),
        'contamination': lambda: round(random.uniform(0.01, 0.2), 3),
        'exlevel': lambda: random.randint(0, feature_dim-1),
    }
    resample = resamplers.get(gene)
    if resample is not None:
        chromosome[gene] = resample()
    return chromosome

def ga_eif(X, labels=None, pop_size=20, generations=10):
    """Search Extended Isolation Forest hyperparameters with a genetic algorithm.

    Every generation scores the whole population with ``fitness``, keeps the
    better half as parents (carried over unchanged), and refills the
    population with mutated crossovers of randomly paired parents. Carried
    over parents are re-scored in each generation, so their fitness can differ
    between generations because forest construction is random.

    Parameters
    ----------
    X : numpy.ndarray
        2-D data matrix; ``X.shape`` supplies the observation and feature counts.
    labels : array-like, optional
        Forwarded to ``fitness``; selects the supervised objective when given.
    pop_size : int, optional
        Number of chromosomes per generation (at least 1).
    generations : int, optional
        Number of generations to run.

    Returns
    -------
    dict or None
        Copy of the best chromosome seen, or ``None`` if no generation ran.

    Raises
    ------
    ValueError
        If ``pop_size`` is smaller than 1.
    """
    if pop_size < 1:
        # Fail before any forest is built instead of inside np.argmax.
        raise ValueError("pop_size must be at least 1.")

    n_obs, feature_dim = X.shape
    population = init_population(pop_size, feature_dim, n_obs)

    best_overall = None
    best_fitness_overall = -np.inf
    # Always keep at least one survivor so tiny populations can still breed.
    num_parents = max(1, pop_size // 2)

    for gen in range(generations):
        fitnesses = [fitness(ch, X, labels) for ch in population]
        gen_best_idx = np.argmax(fitnesses)
        gen_best_fitness = fitnesses[gen_best_idx]
        gen_best_chrom = population[gen_best_idx]

        print(f"Gen {gen} best fitness: {gen_best_fitness:.4f}")
        print(f"Gen {gen} best params: {gen_best_chrom}")

        if gen_best_fitness > best_fitness_overall:
            best_fitness_overall = gen_best_fitness
            # Copy so later in-place changes cannot alter the recorded best.
            best_overall = gen_best_chrom.copy()

        parents = select_parents(population, fitnesses, num_parents=num_parents)
        next_pop = parents.copy()

        while len(next_pop) < pop_size:
            if len(parents) >= 2:
                p1, p2 = random.sample(parents, 2)
            else:
                # A single parent cannot be sampled twice without replacement;
                # clone it and rely on mutation for diversity.
                p1 = p2 = parents[0]
            child = crossover(p1, p2)
            child = mutate(child, feature_dim, n_obs)
            next_pop.append(child)

        population = next_pop

    print("\nGA finished.")
    print("Best overall fitness:", best_fitness_overall)
    print("Best overall EIF params:", best_overall)
    return best_overall

# X is the matrix of order-book features; see the data/ folder for more information.
#best_chrom = ga_eif(X, pop_size=20, generations=10)

In [5]:
import pandas as pd
# Load the feature-selection export and keep every column except the first
# one and the last two.
selection_csv = '/content/df_selection.csv'
df = pd.read_csv(selection_csv).iloc[: , 1:-2]
df.head(2)  # not rendered: only a cell's final expression is displayed
n_feature_columns = len(df.columns.to_list())
print(n_feature_columns)

# Persist the trimmed frame (the index is written as the first CSV column).
df.to_csv("/content/df_selected_redo.csv")



29


In [6]:
from sklearn.preprocessing import MinMaxScaler
import pickle
# Fit the scaler on the selected features and keep the scaled matrix as X.
scaler = MinMaxScaler()
feature_values = df.values
X = scaler.fit_transform(feature_values)

# Save the fitted scaler so new data can be transformed the same way.
with open("ga_scaler.pkl", "wb") as scaler_file:
    pickle.dump(scaler, scaler_file)





In [6]:
# Run the GA search without labels, so fitness is the variance of the scores.
# NOTE(review): the later forest-fitting cell hard-codes its parameters rather
# than reading best_params.
best_params = ga_eif(X, pop_size=10, generations=10)

Gen 0 best fitness: 0.0018
Gen 0 best params: {'ntrees': 235, 'sample_size': 1684, 'contamination': 0.11, 'exlevel': 26}
Gen 1 best fitness: 0.0019
Gen 1 best params: {'ntrees': 235, 'sample_size': 1684, 'contamination': 0.11, 'exlevel': 26}
Gen 2 best fitness: 0.0019
Gen 2 best params: {'ntrees': 119, 'sample_size': 1691, 'contamination': 0.062, 'exlevel': 12}
Gen 3 best fitness: 0.0019
Gen 3 best params: {'ntrees': 235, 'sample_size': 1684, 'contamination': 0.11, 'exlevel': 26}
Gen 4 best fitness: 0.0018
Gen 4 best params: {'ntrees': 235, 'sample_size': 1684, 'contamination': 0.11, 'exlevel': 26}
Gen 5 best fitness: 0.0019
Gen 5 best params: {'ntrees': 235, 'sample_size': 1691, 'contamination': 0.071, 'exlevel': 26}
Gen 6 best fitness: 0.0019
Gen 6 best params: {'ntrees': 235, 'sample_size': 1691, 'contamination': 0.062, 'exlevel': 26}
Gen 7 best fitness: 0.0019
Gen 7 best params: {'ntrees': 235, 'sample_size': 1691, 'contamination': 0.062, 'exlevel': 26}
Gen 8 best fitness: 0.0018
G

In [7]:
# Final model. These values match the best parameters printed for
# generations 6 and 7 of the GA run above.
eif_params = {
    'ntrees': 235,
    'sample_size': 1691,
    'ExtensionLevel': 26,
}
forest = iForest(X, **eif_params)
scores_train = forest.compute_paths(X)


In [8]:
import pickle

# Persist the fitted forest. Pickle stores classes by reference, so the
# iForest / iTree / Node definitions must be importable when loading.
with open("eif_model.pkl", "wb") as model_file:
    pickle.dump(forest, model_file)


In [30]:
'''
Compute according to your data.

X_test_scaled = scaler.transform(df_test.values)
scores_test = forest.compute_paths(X_test_scaled)

'''

'\nCompute according to your data.\n\nX_test_scaled = scaler.transform(df_test.values)\nscores_test = forest.compute_paths(X_test_scaled)\n\n'