<a href="https://colab.research.google.com/github/YasinEnigma/Scripts/blob/master/EDT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from __future__ import annotations
import math
from typing import TYPE_CHECKING, Callable, Optional

import numpy as np

def linear(depth: int, optimal_depth: int) -> float:
    return 1 - depth / optimal_depth


def tanh(depth: int, optimal_depth: int) -> float:
    return math.tanh(linear(depth, optimal_depth))


def sigmoid(depth: int, optimal_depth: int) -> float:
    return 1 / (1 + math.exp(depth - optimal_depth)) - 0.5


def negative_quadratic(depth: int, optimal_depth: int) -> float:
    return -((depth - optimal_depth) ** 2)


class FitnessEvaluator:
    def __init__(self, a1: float, a2: float, f2_func: Callable[[int, int], float]) -> None:
        self.a1 = a1
        self.a2 = a2
        self.X: Optional[np.ndarray] = None
        self.Y: Optional[np.ndarray] = None
        self.f2_func = f2_func

    def _init(self, X: np.ndarray, Y: np.ndarray) -> None:
        self.X = X
        self.Y = Y

    def accuracy(self, individual: DecisionTree) -> float:
        predictions = np.array(individual.classify_many(self.X), dtype=int)
        return np.sum(predictions == self.Y) / len(self.Y)

    def __call__(self, individual: DecisionTree) -> float:
        f1 = self.accuracy(individual)
        # TODO: update f2 to non-linear function that intersects y=0 at x=optimal_depth
        f2 = self.f2_func(individual.depth, individual.optimal_depth)

        return self.a1 * f1 + self.a2 * f2

In [None]:
from __future__ import annotations

import random
from typing import TYPE_CHECKING, List

class Selector:
    def __init__(self, *args, **kwargs):
        self.do_crossover = True

    def __call__(self, population: List[DecisionTree], fitness: List[float], rounds: int):
        pass


class Tournament(Selector):
    def __init__(self, k):
        super().__init__()
        self.k = k

    def __call__(self, population: List[DecisionTree], fitness: List[float], rounds: int):
        for _ in range(rounds):
            inds = random.sample(range(len(population)),self.k)
            ind = max(inds, key=lambda i: fitness[i])
            p1 = population[ind]

            inds = random.sample(range(len(population)),self.k)
            ind = max(inds, key=lambda i: fitness[i])
            p2 = population[ind]

            yield p1, p2


class Elitism(Selector):
    def __init__(self):
        super().__init__()
        self.do_crossover = False

    def __call__(self, population: List[DecisionTree], fitness: List[float], rounds: int):
        fp = sorted(zip(fitness, population), key=lambda x: x[0], reverse=True)
        for i in range(0, rounds * 2, 2):
            yield fp[i][1], fp[i + 1][1]

In [None]:
import copy
import random
import statistics
from collections import deque
from typing import Dict, List, Optional, Tuple, Union

import numpy as np

def load_features(trainX: np.ndarray, trainY: np.ndarray, feature_names: Optional[List[str]] = None, label_names: Optional[Dict[int, str]] = None,) -> None:
    global features, feature_vals, labels, data_loaded, _feature_names, _label_names

    # features and feature values
    features = list(range(trainX.shape[1]))  #! watch for this
    feature_vals = [list(set(trainX[:, i])) for i in features]

    # unique labels
    labels = list(set(trainY))

    data_loaded = 1

    if feature_names is None or label_names is None:
        return

    # feature and label names
    _feature_names = feature_names
    _label_names = label_names

    data_loaded = 2

In [None]:
class Node:
    __slots__ = "split_val", "feature", "label", "depth"

    def __init__(self, split_val: float = None, feature: int = None, label: int = None) -> None:
        self.split_val = split_val
        self.feature = feature
        self.label = label
        self.depth = 0

    @classmethod
    def leaf(cls, label: int):
        return cls(label=label)

    @classmethod
    def split(cls, feature: int, split_val: float):
        return cls(feature=feature, split_val=split_val)

    def is_leaf(self) -> bool:
        return self.label is not None

    def __str__(self) -> str:
        if self.is_leaf():
            return f"Label: {_label_names[self.label]}"
        return f"Feature: {_feature_names[self.feature]}, Split: {self.split_val}"

In [None]:
class DecisionTree:
    __slots__ = "nodes", "depth", "optimal_depth"

    def __init__(self, optimal_depth: int) -> None:
        # store nodes in list for fast random access
        # lists are 1 indexed

        self.optimal_depth = optimal_depth
        l = 2 ** (optimal_depth + 1)

        self.nodes: List[Optional[Node]] = [None for _ in range(l)]

        # root has depth 0
        root_feature = random.choice(features)
        self.nodes[1] = Node.split(
            root_feature, random.choice(feature_vals[root_feature])
        )
        self.depth = 0

    @classmethod
    def generate_random(cls, optimal_depth: int, split_p: float):
        assert 0 < split_p <= 1
        ret = cls(optimal_depth)

        # generate tree with bfs
        q = deque([1])
        while q:
            cur = q.popleft()

            # loop twice for left and right child
            for _ in range(2):
                if (
                    ret.nodes[cur].depth + 2 <= optimal_depth
                    and random.random() <= split_p
                ):
                    next_pos = ret.add_node(cur, "split")
                    q.append(next_pos)
                else:
                    ret.add_node(cur, "leaf")

        return ret

    def add_node(self, node_ind: int, node_type: str) -> int:
        if self.nodes[node_ind * 2] is None:
            next_pos = node_ind * 2
        else:
            next_pos = node_ind * 2 + 1

        if node_type == "leaf":
            if next_pos % 2 == 1 and self.nodes[next_pos - 1].is_leaf():
                # we don't need to check the case where
                # next_pos % 2 == 0 and self.nodes[next_pos - 1].is_leaf()
                # because the algorithm fills left to right
                # if next_pos is a left child, it is guaranteed that its sibling
                # right child does not exist
                filtered = list(
                    filter(lambda x: x != self.nodes[next_pos - 1].label, labels)
                )
                new_node = Node.leaf((random.choice(filtered)))
            else:
                new_node = Node.leaf(random.choice(labels))

        elif node_type == "split":
            feature = random.choice(features)
            new_node = Node.split(feature, random.choice(feature_vals[feature]))

        self.nodes[next_pos] = new_node
        self.nodes[next_pos].depth = self.nodes[node_ind].depth + 1
        self.depth = max(self.depth, self.nodes[next_pos].depth)
        return next_pos

    def extend_nodes(self) -> None:
        self.nodes.extend([None for _ in range(len(self.nodes))])

    def classify_one(self, X: np.ndarray) -> int:
        cur = 1
        while True:
            node = self.nodes[cur]
            if node.label is not None:
                return node.label

            cur *= 2
            if X[node.feature] > node.split_val:
                cur += 1

    def classify_many(self, X: np.ndarray) -> List[int]:
        return [self.classify_one(x) for x in X]

    def accuracy(self, X: np.ndarray, Y: np.ndarray) -> float:
        predictions = np.array(self.classify_many(X), dtype=int)
        return np.mean(predictions == Y)

    def visualize(self, filename: str) -> None:
        if data_loaded != 2:
            raise Exception("Feature and label names not loaded")

        g = pgv.AGraph(directed=True)
        g.node_attr["shape"] = "box"
        for i, node in enumerate(self.nodes):
            if node is None:
                continue
            if node.is_leaf():
                label = _label_names[node.label]
            else:
                label = f"{_feature_names[node.feature]} ≤ {node.split_val}"
            g.add_node(i, label=label)
        q = deque([1])
        while q:
            cur = q.popleft()
            if self.nodes[cur].is_leaf():
                continue
            g.add_edge(cur, cur * 2)
            g.add_edge(cur, cur * 2 + 1)
            q.append(cur * 2)
            q.append(cur * 2 + 1)
        g.layout(prog="dot")
        if filename.endswith(".png"):
            g.draw(filename)
        elif filename.endswith(".dot"):
            g.write(filename)
        else:
            raise ValueError("Unsupported file type")

In [None]:
def qlog2(x: int) -> int:
    return (x - 1).bit_length() - 1


def crossover_v2(p1: DecisionTree, p2: DecisionTree) -> Tuple[DecisionTree, DecisionTree]:
    def replace(source: DecisionTree, source_ind: int, dest: DecisionTree, dest_ind: int) -> None:
        q = deque([(source_ind, dest_ind)])
        while q:
            si, di = q.popleft()
            if di >= len(dest.nodes):
                dest.extend_nodes()
            dest.nodes[di] = source.nodes[si]
            dest.nodes[di].depth = dest.nodes[di // 2].depth + 1
            dest.depth = max(dest.depth, dest.nodes[di].depth)
            if not source.nodes[si].is_leaf():
                q.append((si * 2, di * 2))
                q.append((si * 2 + 1, di * 2 + 1))

        # del is slightly faster than .pop()
        while dest.nodes[-1] is None:
            del dest.nodes[-1]

        # clean unused nodes
        reachable = [False for i in range(len(dest.nodes))]
        q = deque([dest_ind])
        reachable[dest_ind] = True
        while q:
            cur = q.popleft()
            if not reachable[cur]:
                dest.nodes[cur] = None
            elif not dest.nodes[cur].is_leaf() and cur * 2 < len(dest.nodes):
                reachable[cur * 2] = True
                reachable[cur * 2 + 1] = True
            if cur * 2 < len(dest.nodes):
                q.append(cur * 2)
                q.append(cur * 2 + 1)

        while dest.nodes[-1] is None:
            del dest.nodes[-1]

        dest.depth = qlog2(len(dest.nodes))

    # start from 2 to avoid root node
    p1_inds = [i for i in range(2, len(p1.nodes)) if p1.nodes[i] is not None]
    p2_inds = [i for i in range(2, len(p2.nodes)) if p2.nodes[i] is not None]

    c1 = copy.deepcopy(p1)
    replace(p2, random.choice(p2_inds), c1, random.choice(p1_inds))
    c2 = copy.deepcopy(p2)
    replace(p1, random.choice(p1_inds), c2, random.choice(p2_inds))
    return c1, c2


def mutate(tree: DecisionTree) -> None:
    valid = [i for i in range(len(tree.nodes)) if tree.nodes[i] is not None]
    ind = random.choice(valid)
    if tree.nodes[ind].is_leaf():
        other_ind = ind + 1 if ind % 2 == 0 else ind - 1
        if tree.nodes[other_ind].is_leaf():
            tree.nodes[ind] = Node.leaf(
                random.choice([x for x in labels if x != tree.nodes[other_ind].label])
            )
            return

        tree.nodes[ind] = Node.leaf(random.choice(labels))
        return

    feature = random.choice(features)
    tree.nodes[ind] = Node.split(feature, random.choice(feature_vals[feature]))

In [None]:
class GDTClassifier:
    def __init__(self, population_size: int, split_probability: float, selectors: List[Tuple[Selector, float]], mutation_probability: float, optimal_depth: int, fitness_evaluator: FitnessEvaluator) -> None:
        self.population_size = population_size
        self.split_p = split_probability
        self.mut_p = mutation_probability
        self.optimal_depth = optimal_depth
        self.fitness_eval = fitness_evaluator
        self.population = None

        # make all the weights in selectors sum up to population_size
        half_pop = population_size // 2
        selection_sum = sum(s[1] for s in selectors)
        self.selectors = [
            [s[0], round(s[1] / selection_sum * half_pop)] for s in selectors
        ]

        # |diff| < half_pop
        diff = half_pop - sum(s[1] for s in self.selectors)
        if diff > 0:
            for i in range(diff):
                self.selectors[i][1] += 1
        else:
            for i in range(diff):
                self.selectors[i][1] -= 1

        # assert sum(s[1] for s in self.selectors) == population_size

    def fit(self, X: np.ndarray, Y: np.ndarray, generations: int, valX: np.ndarray = None, valY: np.ndarray = None, get_stats: int = 0, verbose: bool = False, feature_names: List[str] = None, label_names: List[str] = None) -> List[Dict[str, Union[int, float]]]:
        if verbose:
            assert get_stats > 0

        if data_loaded == 0:
            load_features(X, Y, feature_names, label_names)

        n = self.population_size

        if self.population is None:
            # initiate population
            self.population = [
                DecisionTree.generate_random(self.optimal_depth, self.split_p)
                for _ in range(n)
            ]

        train_eval = copy.copy(self.fitness_eval)
        train_eval._init(X, Y)

        val_eval = None
        # if validation data  is exists
        if validation := (valX is not None and valY is not None):
            val_eval = copy.copy(self.fitness_eval)
            val_eval._init(valX, valY)

        # fit history
        history = []

        # main loop
        for gen in range(1, generations + 1):
            fitnesses = [train_eval(tree) for tree in self.population]

            if get_stats != 0 and gen % get_stats == 0:
                history.append(
                    self._statistics(gen, train_eval, validation, val_eval, fitnesses)
                )
                if verbose:
                    self._verbose(
                        generations=generations, validation=validation, **history[-1]
                    )

            new_pop = []

            # selection + crossover
            for selector, rounds in self.selectors:
                for p1, p2 in selector(self.population, fitnesses, rounds):
                    if selector.do_crossover:
                        new_pop.extend(crossover_v2(p1, p2))
                    else:
                        new_pop.extend((p1, p2))

            # mutation
            for i in random.sample(range(n), int(n * self.mut_p)):
                mutate(new_pop[i])

            self.population = new_pop

        # sort by fitness
        fitnesses = [train_eval(tree) for tree in self.population]
        fp = sorted(zip(fitnesses, self.population), key=lambda x: x[0], reverse=True)
        self.population = [fp[i][1] for i in range(n)]

        return history

    def _statistics(self, generation: int, train_eval: FitnessEvaluator, validation: bool, val_eval: FitnessEvaluator, fitnesses: List[float]) -> Dict[str, Union[int, float]]:
        accuracies = tuple(train_eval.accuracy(tree) for tree in self.population)
        tree_gen = tuple(tree.depth for tree in self.population)
        max_depth = max(tree_gen)
        min_depth = min(tree_gen)
        avg_depth = statistics.mean(tree_gen)
        median_depth = statistics.median(tree_gen)
        std_depth = statistics.stdev(tree_gen)

        if validation:
            val_accuracies = tuple(val_eval.accuracy(tree) for tree in self.population)
            avg_val_accuracy = statistics.mean(val_accuracies)

        avg_fitness = statistics.mean(fitnesses)
        avg_accuracy = statistics.mean(accuracies)

        max_fitness = max(fitnesses)
        max_accuracy = max(accuracies)

        return {
            "generation": generation,
            "avg_fitness": avg_fitness,
            "avg_accuracy": avg_accuracy,
            "max_fitness": max_fitness,
            "max_accuracy": max_accuracy,
            "avg_val_accuracy": avg_val_accuracy if validation else None,
            "max_depth": max_depth,
            "min_depth": min_depth,
            "avg_depth": avg_depth,
            "median_depth": median_depth,
            "std_depth": std_depth,
        }

    def _verbose(
        self,
        generation: int,
        generations: int,
        avg_fitness: float,
        avg_accuracy: float,
        max_fitness: float,
        max_accuracy: float,
        validation: bool,
        avg_val_accuracy: float,
        max_depth: int,
        min_depth: int,
        avg_depth: float,
        median_depth: float,
        std_depth: float,
    ) -> None:
        print(f"Generation {generation}/{generations}")

        print("Fitness:")
        print(f"- average: {avg_fitness:.2f}")
        print(f"- max    : {max_fitness:.2f}")

        print("Accuracy:")
        print(f"- average: {avg_accuracy:.2f}")
        print(f"- max    : {max_accuracy:.2f}")
        if validation:
            print(f"- average validation: {avg_val_accuracy:.4f}")

        print("Tree depths:")
        print(f"- max    : {max_depth}")
        print(f"- min    : {min_depth}")
        print(f"- average: {avg_depth:.2f}")
        print(f"- median : {median_depth:.2f}")
        print(f"- std    : {std_depth:.2f}")
        print()

    def predict(self, X: np.ndarray, top_k: float = 1) -> List[int]:
        predictions = [
            tree.classify_many(X)
            # for tree in self.population[:1]
            for tree in self.population[: int(top_k * len(self.population))]
        ]

        return predictions
        # return [statistics.mode(individidual) for individidual in zip(*predictions)]
    def __str__(self) -> str:
      output = f'''
      population_size: {self.population_size}
      split_probability: {self.split_p}
      mutation_probability: {self.mut_p}
      optimal_depth: {self.optimal_depth}
      fitness_evaluator: {self.fitness_eval}
      selector: {self.selectors}'''
      return output

In [None]:
import copy
import random
import statistics
from collections import deque
from typing import Dict, List, Optional, Tuple, Union

import numpy as np

def load_features(trainX: np.ndarray, trainY: np.ndarray, feature_names: Optional[List[str]] = None, label_names: Optional[Dict[int, str]] = None,) -> None:
    global features, feature_vals, labels, data_loaded, _feature_names, _label_names

    # features and feature values
    features = list(range(trainX.shape[1]))  #! watch for this
    feature_vals = [list(set(trainX[:, i])) for i in features]

    # unique labels
    labels = list(set(trainY))

    data_loaded = 1

    if feature_names is None or label_names is None:
        return

    # feature and label names
    _feature_names = feature_names
    _label_names = label_names

    data_loaded = 2

In [None]:
from sklearn.datasets import load_wine
from sklearn.datasets import load_digits
from sklearn.datasets import load_diabetes
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import pandas as pd

data = pd.read_csv('abalone.csv')
data.head()

Y = data['Rings']
X = data.drop('Rings', axis=1)


x_train, x_test, y_train, y_test = train_test_split(X, Y, test_size=0.2, random_state=42, stratify = Y)

scaler = StandardScaler()
scaler.fit(x_train)

x_train = scaler.transform(x_train)
x_test = scaler.transform(x_test)

ValueError: The least populated class in y has only 1 member, which is too few. The minimum number of groups for any class cannot be less than 2.

In [None]:
from sklearn.datasets import load_wine
from sklearn.datasets import load_digits
from sklearn.datasets import load_diabetes
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import LabelEncoder
import pandas as pd

data = pd.read_csv('ecoli.csv')
data.head()


Y = data['class']
X = data.drop('class', axis=1)

le = LabelEncoder()
# X['Sequence Name'] = le.fit_transform(X['Sequence Name'])
Y = le.fit_transform(Y)
X = X.drop('Sequence Name', axis=1)


x_train, x_test, y_train, y_test = train_test_split(X, Y, test_size=0.2, random_state=42, stratify = Y)

scaler = StandardScaler()
scaler.fit(x_train)

x_train = scaler.transform(x_train)
x_test = scaler.transform(x_test)

X

Unnamed: 0,mcg,gvh,lip,chg,aac,alm1,alm2
0,0.49,0.29,0.48,0.5,0.56,0.24,0.35
1,0.07,0.40,0.48,0.5,0.54,0.35,0.44
2,0.56,0.40,0.48,0.5,0.49,0.37,0.46
3,0.59,0.49,0.48,0.5,0.52,0.45,0.36
4,0.23,0.32,0.48,0.5,0.55,0.25,0.35
...,...,...,...,...,...,...,...
331,0.74,0.56,0.48,0.5,0.47,0.68,0.30
332,0.71,0.57,0.48,0.5,0.48,0.35,0.32
333,0.61,0.60,0.48,0.5,0.44,0.39,0.38
334,0.59,0.61,0.48,0.5,0.42,0.42,0.37


In [None]:
Y

array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       3, 3, 2, 2, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4,
       4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 5, 5, 5,
       5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 6, 6, 6, 6, 6, 7, 7,
       7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7,

In [None]:
features = X.columns
# targets = Y.columns

In [None]:
from sklearn.datasets import load_wine
from sklearn.datasets import load_digits
from sklearn.datasets import load_diabetes
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import pandas as pd

data = load_wine()

df = pd.DataFrame(data['data'], columns=data['feature_names'])
features = data['feature_names']
targets = data['target_names']

Y = data['target']
X = df.to_numpy()

x_train, x_test, y_train, y_test = train_test_split(X, Y, test_size=0.2, random_state=42, stratify = Y)

scaler = StandardScaler()
scaler.fit(x_train)

x_train = scaler.transform(x_train)
x_test = scaler.transform(x_test)

# Genetic Decision Tree

In [None]:
data_loaded=0

In [None]:
kwargs = {
    "population_size": 200,
    "split_probability": 0.5,
    # "selectors": [(Tournament(4), 1)],
    "selectors": [(Tournament(4), 0.7), (Elitism(), 0.3)],
    "mutation_probability": 0.1,
    "optimal_depth": 6,
    "fitness_evaluator": FitnessEvaluator(0.8, 0.1, negative_quadratic),
}

In [None]:
clf = GDTClassifier(**kwargs)

In [None]:
print(clf)


      population_size: 200
      split_probability: 0.5
      mutation_probability: 0.1
      optimal_depth: 6
      fitness_evaluator: <__main__.FitnessEvaluator object at 0x781471045930>
      selector: [[<__main__.Tournament object at 0x781471047940>, 70], [<__main__.Elitism object at 0x781471046770>, 30]]


In [None]:
clf.fit(x_train, y_train, 100, x_test,y_test,1,feature_names=features,verbose=1)

Generation 1/100
Fitness:
- average: -0.70
- max    : 0.44
Accuracy:
- average: 0.13
- max    : 0.55
- average validation: 0.1347
Tree depths:
- max    : 6
- min    : 1
- average: 4.06
- median : 5.00
- std    : 2.07

Generation 2/100
Fitness:
- average: -0.01
- max    : 0.45
Accuracy:
- average: 0.20
- max    : 0.56
- average validation: 0.2120
Tree depths:
- max    : 11
- min    : 1
- average: 6.04
- median : 6.00
- std    : 1.30

Generation 3/100
Fitness:
- average: 0.08
- max    : 0.45
Accuracy:
- average: 0.33
- max    : 0.56
- average validation: 0.3376
Tree depths:
- max    : 11
- min    : 1
- average: 6.20
- median : 6.00
- std    : 1.35

Generation 4/100
Fitness:
- average: 0.17
- max    : 0.47
Accuracy:
- average: 0.41
- max    : 0.60
- average validation: 0.4205
Tree depths:
- max    : 10
- min    : 1
- average: 6.05
- median : 6.00
- std    : 1.25

Generation 5/100
Fitness:
- average: 0.16
- max    : 0.47
Accuracy:
- average: 0.45
- max    : 0.60
- average validation: 0.483

[{'generation': 1,
  'avg_fitness': -0.6983134328358209,
  'avg_accuracy': 0.13085820895522388,
  'max_fitness': 0.44179104477611947,
  'max_accuracy': 0.5522388059701493,
  'avg_val_accuracy': 0.13470588235294118,
  'max_depth': 6,
  'min_depth': 1,
  'avg_depth': 4.06,
  'median_depth': 5.0,
  'std_depth': 2.0707098290151373},
 {'generation': 2,
  'avg_fitness': -0.0056865671641791035,
  'avg_accuracy': 0.20414179104477612,
  'max_fitness': 0.45074626865671646,
  'max_accuracy': 0.5634328358208955,
  'avg_val_accuracy': 0.21198529411764705,
  'max_depth': 11,
  'min_depth': 1,
  'avg_depth': 6.04,
  'median_depth': 6.0,
  'std_depth': 1.3026451635462948},
 {'generation': 3,
  'avg_fitness': 0.08245522388059702,
  'avg_accuracy': 0.3349440298507463,
  'max_fitness': 0.45074626865671646,
  'max_accuracy': 0.5634328358208955,
  'avg_val_accuracy': 0.3376470588235294,
  'max_depth': 11,
  'min_depth': 1,
  'avg_depth': 6.195,
  'median_depth': 6.0,
  'std_depth': 1.3513347207994735},
 {'

In [None]:
y_p = clf.population[0].classify_many(x_test)

In [None]:
y_pred = clf.predict(x_test)

In [None]:
from sklearn.metrics import *

In [None]:
accuracy_score(y_test,y_p)

0.8235294117647058

In [None]:
print(f'Accuracy => {accuracy_score(y_test,y_p)}')
print(f'Precision => {precision_score(y_test, y_p, average="micro")}')
print(f'Recall => {recall_score(y_test, y_p, average="micro")}')
print(f'F1 Score => {f1_score(y_test, y_p, average="micro")}')

Accuracy => 0.8235294117647058
Precision => 0.8235294117647058
Recall => 0.8235294117647058
F1 Score => 0.8235294117647058


In [None]:
print(classification_report(y_test,y_p))

              precision    recall  f1-score   support

           0       0.90      0.93      0.92        29
           1       0.68      0.94      0.79        16
           4       0.00      0.00      0.00         7
           5       0.80      1.00      0.89         4
           6       0.00      0.00      0.00         1
           7       0.91      0.91      0.91        11

    accuracy                           0.82        68
   macro avg       0.55      0.63      0.58        68
weighted avg       0.74      0.82      0.78        68



  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


In [None]:
from sklearn.tree import DecisionTreeClassifier

In [None]:
dt_clf = DecisionTreeClassifier()

In [None]:
dt_clf.fit(x_train, y_train)

In [None]:
y_pred = dt_clf.predict(x_test)

In [None]:
from sklearn.metrics import *

In [None]:
print(f'Accuracy => {accuracy_score(y_test,y_pred)}')
print(f'Precision => {precision_score(y_test, y_pred, average="micro")}')
print(f'Recall => {recall_score(y_test, y_pred, average="micro")}')
print(f'F1 Score => {f1_score(y_test, y_pred, average="micro")}')

Accuracy => 0.7794117647058824
Precision => 0.7794117647058824
Recall => 0.7794117647058824
F1 Score => 0.7794117647058824


In [None]:
print(classification_report(y_test,y_pred))

              precision    recall  f1-score   support

           0       0.94      1.00      0.97        29
           1       0.89      0.50      0.64        16
           2       0.00      0.00      0.00         0
           3       0.00      0.00      0.00         0
           4       0.40      0.86      0.55         7
           5       1.00      0.75      0.86         4
           6       0.00      0.00      0.00         1
           7       0.88      0.64      0.74        11

    accuracy                           0.78        68
   macro avg       0.51      0.47      0.47        68
weighted avg       0.85      0.78      0.79        68



  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


# Random Forest Decision Tree

In [None]:
def _create_bootstrap_samples(X, Y,n_base_learner=50) -> tuple:
    """
    Creates bootstrap samples for each base learner
    """
    bootstrap_samples_X = []
    bootstrap_samples_Y = []
    bootstrap_sample_size = None
    for i in range(n_base_learner):

        if not bootstrap_sample_size:
            bootstrap_sample_size = X.shape[0]

        sampled_idx = np.random.choice(X.shape[0], size=bootstrap_sample_size, replace=True)
        bootstrap_samples_X.append(X[sampled_idx])
        bootstrap_samples_Y.append(Y[sampled_idx])

    return bootstrap_samples_X, bootstrap_samples_Y

In [None]:
bootstrap_samples_X, bootstrap_samples_Y = _create_bootstrap_samples(x_train, y_train)

In [None]:
data_loaded=0

In [None]:
kwargs = {
    "population_size": 200,
    "split_probability": 0.5,
    "selectors": [(Tournament(4), 0.8), (Elitism(), 0.2)],
    "mutation_probability": 0.1,
    "optimal_depth": 6,
    "fitness_evaluator": FitnessEvaluator(0.8, 0.1, negative_quadratic),
}

In [None]:
models = []
for i in range(10):
  clf = GDTClassifier(**kwargs)
  clf.fit(x_train, y_train, 100, x_test,y_test,1,feature_names=features,label_names=targets,verbose=1)
  models.append(clf.population[0])

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
- min    : 1
- average: 6.03
- median : 6.00
- std    : 1.45

Generation 68/100
Fitness:
- average: 0.34
- max    : 0.60
Accuracy:
- average: 0.68
- max    : 0.75
- average validation: 0.6729
Tree depths:
- max    : 11
- min    : 1
- average: 6.16
- median : 6.00
- std    : 1.43

Generation 69/100
Fitness:
- average: 0.35
- max    : 0.62
Accuracy:
- average: 0.69
- max    : 0.78
- average validation: 0.6790
Tree depths:
- max    : 11
- min    : 1
- average: 6.07
- median : 6.00
- std    : 1.42

Generation 70/100
Fitness:
- average: 0.27
- max    : 0.62
Accuracy:
- average: 0.65
- max    : 0.78
- average validation: 0.6474
Tree depths:
- max    : 11
- min    : 1
- average: 5.99
- median : 6.00
- std    : 1.59

Generation 71/100
Fitness:
- average: 0.31
- max    : 0.62
Accuracy:
- average: 0.69
- max    : 0.78
- average validation: 0.6788
Tree depths:
- max    : 13
- min    : 1
- average: 6.13
- median : 6.00
- std    : 1.5

In [None]:
def _predict_proba_w_base_learners(X_set: np.array,base_learner_list) -> list:
  """
  Creates list of predictions for all base learners
  """
  pred_prob_list = []
  for base_learner in base_learner_list:
    # pred = base_learner.predict(X_set)
    pred = base_learner.classify_many(X_set)
    # print(pred[0])
    pred_prob_list.append(pred)

  return pred_prob_list

from collections import Counter
def predict_proba(X_set: np.array,base_learner_list) -> list:
  """Returns the predicted probs for a given data set"""

  pred_probs = []
  base_learners_pred_probs = _predict_proba_w_base_learners(X_set,base_learner_list)
  # print(base_learners_pred_probs)
  # Average the predicted probabilities of base learners
  for obs in range(X_set.shape[0]):
      base_learner_probs_for_obs = [a[obs] for a in base_learners_pred_probs]
      # Calculate the average for each index
      # print(base_learner_probs_for_obs)
      # obs_average_pred_probs = np.mean(base_learner_probs_for_obs, axis=0)
      # pred_probs.append(obs_average_pred_probs)
      print(Counter(base_learner_probs_for_obs))
      pred_probs.append(list(Counter(base_learner_probs_for_obs).keys())[0])

  return pred_probs

def predict(X_set: np.array,base_learner_list) -> np.array:
  """Returns the predicted labels for a given data set"""

  pred_probs = predict_proba(X_set,base_learner_list)
  return pred_probs

In [None]:
y_pred = predict(x_test,models)

Counter({0: 10})
Counter({7: 8, 5: 2})
Counter({4: 4, 0: 3, 1: 2, 5: 1})
Counter({0: 10})
Counter({7: 8, 5: 2})
Counter({1: 9, 0: 1})
Counter({1: 6, 4: 4})
Counter({1: 8, 4: 1, 0: 1})
Counter({1: 7, 4: 3})
Counter({1: 6, 4: 4})
Counter({0: 10})
Counter({1: 6, 4: 4})
Counter({0: 10})
Counter({1: 9, 0: 1})
Counter({1: 7, 4: 3})
Counter({7: 10})
Counter({0: 10})
Counter({7: 10})
Counter({6: 3, 5: 3, 0: 2, 7: 1, 1: 1})
Counter({1: 9, 0: 1})
Counter({0: 10})
Counter({0: 10})
Counter({1: 8, 4: 2})
Counter({5: 6, 7: 3, 1: 1})
Counter({0: 10})
Counter({0: 10})
Counter({0: 7, 7: 3})
Counter({1: 8, 4: 2})
Counter({1: 9, 0: 1})
Counter({1: 8, 4: 2})
Counter({0: 10})
Counter({0: 10})
Counter({0: 10})
Counter({1: 5, 4: 5})
Counter({0: 10})
Counter({0: 10})
Counter({0: 10})
Counter({0: 10})
Counter({7: 7, 0: 3})
Counter({7: 9, 1: 1})
Counter({0: 10})
Counter({0: 4, 7: 3, 5: 3})
Counter({1: 9, 0: 1})
Counter({0: 10})
Counter({5: 6, 7: 4})
Counter({7: 8, 5: 2})
Counter({7: 7, 0: 3})
Counter({1: 9, 0: 

In [None]:
y_pred = np.round(y_pred)
y_pred

array([0, 7, 4, 0, 7, 1, 1, 1, 1, 1, 0, 1, 0, 1, 1, 7, 0, 7, 6, 1, 0, 0,
       1, 7, 0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 0, 0, 0, 0, 7, 7, 0, 7, 1, 0,
       7, 7, 7, 1, 0, 7, 1, 1, 1, 7, 0, 7, 1, 0, 7, 0, 0, 0, 0, 7, 0, 1,
       0, 0])

In [None]:
y_test

array([0, 7, 4, 0, 7, 1, 4, 1, 1, 1, 0, 4, 0, 1, 1, 7, 0, 7, 6, 1, 0, 0,
       4, 5, 0, 0, 0, 1, 1, 4, 0, 0, 0, 4, 0, 0, 0, 0, 7, 7, 0, 0, 1, 0,
       5, 7, 7, 1, 0, 5, 1, 1, 1, 7, 0, 7, 1, 0, 7, 0, 0, 1, 0, 5, 0, 4,
       0, 0])

In [None]:
from sklearn.metrics import *

In [None]:
accuracy_score(y_test,y_pred)

0.8235294117647058

In [None]:
print(f'Accuracy => {accuracy_score(y_test,y_pred)}')
print(f'Precision => {precision_score(y_test, y_pred, average="micro")}')
print(f'Recall => {recall_score(y_test, y_pred, average="micro")}')
print(f'F1 Score => {f1_score(y_test, y_pred, average="micro")}')

Accuracy => 0.8235294117647058
Precision => 0.8235294117647058
Recall => 0.8235294117647058
F1 Score => 0.8235294117647058


In [None]:
print(classification_report(y_test,y_pred))

              precision    recall  f1-score   support

           0       0.97      0.97      0.97        29
           1       0.71      0.94      0.81        16
           4       1.00      0.14      0.25         7
           5       0.00      0.00      0.00         4
           6       1.00      1.00      1.00         1
           7       0.69      1.00      0.81        11

    accuracy                           0.82        68
   macro avg       0.73      0.67      0.64        68
weighted avg       0.81      0.82      0.77        68



  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


In [None]:
from sklearn.ensemble import RandomForestClassifier

In [None]:
rf = RandomForestClassifier(n_estimators=10)

In [None]:
rf.fit(x_train, y_train)

In [None]:
y_pred = rf.predict(x_test)

In [None]:
from sklearn.metrics import *

In [None]:
print(f'Accuracy => {accuracy_score(y_test,y_pred)}')
print(f'Precision => {precision_score(y_test, y_pred, average="micro")}')
print(f'Recall => {recall_score(y_test, y_pred, average="micro")}')
print(f'F1 Score => {f1_score(y_test, y_pred, average="micro")}')

Accuracy => 0.8529411764705882
Precision => 0.8529411764705882
Recall => 0.8529411764705882
F1 Score => 0.8529411764705882


In [None]:
print(classification_report(y_test,y_pred))

              precision    recall  f1-score   support

           0       1.00      1.00      1.00        29
           1       0.77      0.62      0.69        16
           4       0.40      0.57      0.47         7
           5       1.00      1.00      1.00         4
           6       0.50      1.00      0.67         1
           7       1.00      0.91      0.95        11

    accuracy                           0.85        68
   macro avg       0.78      0.85      0.80        68
weighted avg       0.88      0.85      0.86        68



# Adaboost Genetic

In [None]:
data_load= 0

In [None]:
def _calculate_amount_of_say(base_learner: DecisionTree, X: np.array, y: np.array) -> float:
        """calculates the amount of say (see SAMME)"""
        preds = base_learner.classify_many(X)
        K = label_count = len(np.unique(y))
        err = 1 - np.sum(preds==y) / len(preds)
        amount_of_say = np.log((1-err)/err) + np.log(K-1)
        return amount_of_say

In [None]:
def _fit_base_learner(X_bootstrapped: np.array, y_bootstrapped: np.array, X_train, y_train) -> DecisionTree:
        """Trains a Decision Tree model with depth 1 and returns the model"""
        kwargs = {
            "population_size": 200,
            "split_probability": 0.5,
            "selectors": [(Tournament(4), 0.8), (Elitism(), 0.2)],
            "mutation_probability": 0.1,
            "optimal_depth": 6,
            "fitness_evaluator": FitnessEvaluator(0.8, 0.1, negative_quadratic),
        }
        base_learner = GDTClassifier(**kwargs)

        base_learner.fit(X_bootstrapped, y_bootstrapped, 100, x_test,y_test,1,feature_names=features,label_names=targets,verbose=1)
        amount_of_say = _calculate_amount_of_say(base_learner.population[0], X_train, y_train)

        return base_learner.population[0], amount_of_say

In [None]:
def _update_dataset(sample_weights: np.array, X_train, y_train) -> tuple:
        """Creates bootstrapped samples w.r.t. sample weights"""
        n_samples = X_train.shape[0]
        bootstrap_indices = np.random.choice(n_samples, size=n_samples, replace=True, p=sample_weights)
        X_bootstrapped = X_train[bootstrap_indices]
        y_bootstrapped = y_train[bootstrap_indices]

        return X_bootstrapped, y_bootstrapped

In [None]:
def _calculate_sample_weights(base_learner: DecisionTree, amount_of_say, X_train, y_train) -> np.array:
        """Calculates sample weights (see SAMME)"""
        preds = base_learner.classify_many(X_train)
        matches = (preds == y_train)
        not_matches = (~matches).astype(int)
        sample_weights = 1/X_train.shape[0] * np.exp(amount_of_say*not_matches)
        # Normalize weights
        sample_weights = sample_weights / np.sum(sample_weights)

        return sample_weights

In [None]:
def train(X_train: np.array, y_train: np.array, n_base_learner=10) -> tuple:
        """
        trains base learners with given feature and label dataset
        """
        X_train = X_train
        y_train = y_train
        X_bootstrapped = X_train
        y_bootstrapped = y_train
        label_count = len(np.unique(y_train))

        base_learner_list = []
        amount_of_says = []
        for i in range(n_base_learner):
            base_learner, amount_of_say = _fit_base_learner(X_bootstrapped, y_bootstrapped, X_train, y_train)
            base_learner_list.append(base_learner)
            amount_of_says.append(amount_of_say)

            sample_weights = _calculate_sample_weights(base_learner, amount_of_say, X_train, y_train)
            X_bootstrapped, y_bootstrapped = _update_dataset(sample_weights, X_train, y_train)
        return base_learner_list, amount_of_says

In [259]:
def _predict_scores_w_base_learners(X: np.array,base_learner_list,amount_of_says,n_base_learner) -> list:
  """
  Creates list of predictions for all base learners
  """
  pred_scores = np.zeros(shape=(n_base_learner, X.shape[0]))
  pred_import = np.zeros(shape=(n_base_learner, X.shape[0]))
  for idx, base_learner in enumerate(base_learner_list):
      pred_probs = np.array(base_learner.classify_many(X))
      pred_scores[idx] = pred_probs
      pred_import[idx] = amount_of_says[idx]

  return pred_scores, pred_import

def predict_proba(X: np.array,base_learner_list,amount_of_says,n_base_learner) -> np.array:
  """Returns the predicted probs for a given data set"""

  pred_probs = []
  base_learners_pred_scores, importance = _predict_scores_w_base_learners(X,base_learner_list,amount_of_says,n_base_learner)
  scores = []

  for i in range(len(base_learners_pred_scores[0])):
    score = {}
    for k in range(Y.max()+1):
      score[k] = 0
    for j in range(len(base_learners_pred_scores)):
      score[base_learners_pred_scores[j][i]] += importance[j][i]
    scores.append(max(score, key=score.get))
  # Take the avg scores and turn them to probabilities
  # avg_base_learners_pred_scores = np.mean(base_learners_pred_scores, axis=0)
  # column_sums = sum(avg_base_learners_pred_scores)
  # # pred_probs = avg_base_learners_pred_scores / column_sums
  # pred_probs = avg_base_learners_pred_scores
  pred_probs = scores

  return pred_probs

def predict(X: np.array,base_learner_list,amount_of_says,n_base_learner=10) -> np.array:
  """Returns the predicted labels for a given data set"""

  pred_probs = predict_proba(X,base_learner_list,amount_of_says,n_base_learner)
  # preds = np.argmax(pred_probs, axis=1)

  return pred_probs

In [None]:
data_loaded=0

In [None]:
base_learner_list, amount_of_says = train(x_train, y_train, n_base_learner=100)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
- min    : 3
- average: 6.17
- median : 6.00
- std    : 0.90

Generation 68/100
Fitness:
- average: 0.45
- max    : 0.59
Accuracy:
- average: 0.68
- max    : 0.74
- average validation: 0.3949
Tree depths:
- max    : 11
- min    : 3
- average: 6.20
- median : 6.00
- std    : 0.97

Generation 69/100
Fitness:
- average: 0.46
- max    : 0.59
Accuracy:
- average: 0.68
- max    : 0.74
- average validation: 0.3898
Tree depths:
- max    : 10
- min    : 4
- average: 6.11
- median : 6.00
- std    : 0.88

Generation 70/100
Fitness:
- average: 0.46
- max    : 0.59
Accuracy:
- average: 0.68
- max    : 0.74
- average validation: 0.3955
Tree depths:
- max    : 10
- min    : 3
- average: 6.19
- median : 6.00
- std    : 0.90

Generation 71/100
Fitness:
- average: 0.43
- max    : 0.60
Accuracy:
- average: 0.69
- max    : 0.75
- average validation: 0.4012
Tree depths:
- max    : 11
- min    : 4
- average: 6.30
- median : 6.00
- std    : 1.0

In [None]:
len(x_test)

68

In [270]:
y_pred = predict(x_test,base_learner_list, amount_of_says,100)

In [271]:
y_pred

[0,
 7,
 1,
 0,
 7,
 1,
 4,
 1,
 4,
 4,
 0,
 4,
 0,
 1,
 1,
 7,
 0,
 7,
 6,
 1,
 0,
 0,
 1,
 5,
 0,
 0,
 0,
 1,
 1,
 1,
 0,
 0,
 0,
 1,
 0,
 0,
 0,
 0,
 0,
 7,
 0,
 0,
 1,
 0,
 5,
 7,
 0,
 1,
 0,
 5,
 4,
 1,
 1,
 1,
 0,
 7,
 1,
 0,
 7,
 0,
 0,
 1,
 0,
 5,
 0,
 4,
 0,
 0]

In [272]:
len(y_pred)

68

In [273]:
from sklearn.metrics import *

In [274]:
accuracy_score(y_test,y_pred)

0.8529411764705882

In [275]:
confusion_matrix(y_pred,y_test)

array([[29,  0,  0,  0,  0,  2],
       [ 0, 13,  4,  0,  0,  1],
       [ 0,  3,  3,  0,  0,  0],
       [ 0,  0,  0,  4,  0,  0],
       [ 0,  0,  0,  0,  1,  0],
       [ 0,  0,  0,  0,  0,  8]])

In [276]:
print(f'Accuracy => {accuracy_score(y_test,y_pred)}')
print(f'Precision => {precision_score(y_test, y_pred, average="micro")}')
print(f'Recall => {recall_score(y_test, y_pred, average="micro")}')
print(f'F1 Score => {f1_score(y_test, y_pred, average="micro")}')

Accuracy => 0.8529411764705882
Precision => 0.8529411764705882
Recall => 0.8529411764705882
F1 Score => 0.8529411764705882


In [277]:
print(classification_report(y_test,y_pred))

              precision    recall  f1-score   support

           0       0.94      1.00      0.97        29
           1       0.72      0.81      0.76        16
           4       0.50      0.43      0.46         7
           5       1.00      1.00      1.00         4
           6       1.00      1.00      1.00         1
           7       1.00      0.73      0.84        11

    accuracy                           0.85        68
   macro avg       0.86      0.83      0.84        68
weighted avg       0.86      0.85      0.85        68



In [266]:
from sklearn.ensemble import AdaBoostClassifier

In [278]:
ad = AdaBoostClassifier(n_estimators=10)

In [279]:
ad.fit(x_train, y_train)

In [280]:
y_pred = ad.predict(x_test)

In [281]:
print(f'Accuracy => {accuracy_score(y_test,y_pred)}')
print(f'Precision => {precision_score(y_test, y_pred, average="micro")}')
print(f'Recall => {recall_score(y_test, y_pred, average="micro")}')
print(f'F1 Score => {f1_score(y_test, y_pred, average="micro")}')

Accuracy => 0.6617647058823529
Precision => 0.6617647058823529
Recall => 0.6617647058823529
F1 Score => 0.6617647058823529


In [282]:
print(classification_report(y_test,y_pred))

              precision    recall  f1-score   support

           0       0.69      1.00      0.82        29
           1       0.64      1.00      0.78        16
           4       0.00      0.00      0.00         7
           5       0.00      0.00      0.00         4
           6       0.00      0.00      0.00         1
           7       0.00      0.00      0.00        11

    accuracy                           0.66        68
   macro avg       0.22      0.33      0.27        68
weighted avg       0.45      0.66      0.53        68



  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
