## Advanced Data Analytics - Algorithms and Machine Learning
## 31005
### Harrison Cole
### 12962712

### Section 1 - Imports

In [596]:
import abc
import numpy as np
import pandas as pd
import random
import traceback

from typing import Callable, Optional, Tuple
from sklearn.datasets import load_iris as dataset
from sklearn.metrics import accuracy_score


### Section 2 - Utility Function Definitions

In [597]:
def require(value: Optional[any], field: str):
    if value is None:
        raise ValueError(f'Missing required value: "{field}".')
    return value


def default(value: Optional[any], otherwise: any) -> any:
    return otherwise if value is None else value


def is_categorical(column):
    return pd.api.types.is_categorical_dtype(column)


def value_counts(y, normalise: bool = True):
    values, counts = np.unique(y, return_counts=True)
    if normalise:
        return values, counts / np.sum(counts)
    return values, counts

def majority_class_index(data, attribute):
    return np.argmax(np.unique(data[attribute], return_counts=True)[1])


### Section 3 - Datastructures, Interfaces and Implementations

In [598]:
class SplitCriterionFunction(metaclass=abc.ABCMeta):

    @abc.abstractmethod
    def compute(self, frequencies) -> float:
        pass


class Entropy(SplitCriterionFunction):

    def compute(self, frequencies, eps=1e-9) -> float:
        return -(frequencies * np.log2(frequencies + eps)).sum()


class GiniIndex(SplitCriterionFunction):

    def compute(self, frequencies) -> float:
        return 1 - np.sum(np.square(frequencies))

In [599]:
class TransformFunction(metaclass=abc.ABCMeta):

    @abc.abstractmethod
    def transform(self, value: any) -> any:
        pass


class IdentityTransformFunction(TransformFunction):

    def transform(self, value: any) -> any:
        return value


class LookupTransformFunction(TransformFunction):

    def __init__(self, transformer: Callable[[any], any]):
        self.__transformer = require(transformer, 'transformer')

    def transform(self, value: any) -> any:
        return self.__transformer(value)


In [600]:
class Pivot:

    def __init__(self, predicate: Callable[[any], bool], info: Tuple[any, any, str, str]):
        self.__predicate = require(predicate, 'predicate')
        self.__info = require(info, 'info')

    @property
    def predicate(self) -> Callable[[any], bool]:
        return self.__predicate

    def attribute(self) -> any:
        return self.__info[0]

    def subject(self) -> any:
        return self.__info[1]

    def true_condition(self) -> str:
        return self.__info[2]

    def false_condition(self) -> str:
        return self.__info[3]

    def split(self, value: any) -> bool:
        return self.predicate(value)

    # TODO: rename method as this is only used for splits on continuous attributes...
    @staticmethod
    def continuous(attribute, probe) -> 'Pivot':
        def predicate(value: any) -> bool:
            return value[attribute] <= probe
        return Pivot(predicate=predicate, info=(attribute, probe, '<=', '>'))

    def __str__(self, condition: bool = True) -> str:
        operator: str = self.true_condition() if condition else self.false_condition()
        return f'x[{self.attribute()}] {operator.ljust(2)} {self.subject()}'


class PivotCandidate:

    def __init__(self, feature: any, gain: float, probe: float):
        self.__feature = feature
        self.__gain = gain
        self.__probe = probe

    def feature(self) -> any:
        return require(self.__feature, 'feature')

    def gain(self) -> float:
        return require(self.__gain, 'gain')

    def probe(self) -> float:
        return require(self.__probe, 'probe')

    def update(self, feature: int, gain: float, probe: float) -> bool:
        if gain < self.gain():
            return False
        self.__feature = feature
        self.__gain = gain
        self.__probe = probe
        return True

    @staticmethod
    def initial() -> 'PivotCandidate':
        return PivotCandidate(0, 0, 0.5)

    def __str__(self):
        return f'feature: {self.__feature}, gain: {self.__gain}, probe: {self.__probe}'


In [601]:
class Node(metaclass=abc.ABCMeta):

    @abc.abstractmethod
    def eval(self, element: any) -> any:
        raise NotImplementedError('Node#eval')

    def prune(self) -> 'Node':
        return self

    @staticmethod
    def terminate(value: any) -> 'Node':
        return TerminalNode(value=value)

    @staticmethod
    def branch(pivot: 'Pivot', lower: Optional['Node'] = None, upper: Optional['Node'] = None) -> 'Node':
        return BranchNode(pivot=pivot, lower=lower, upper=upper)

    @staticmethod
    def lookup(mapping: dict[any, 'Node'], feature: any) -> 'Node':
        return LookupNode(mapping=mapping, feature=feature)


class TerminalNode(Node):

    def __init__(self, value: any):
        self.__value = require(value, 'value')

    def eval(self, element: any) -> any:
        return self.value

    @property
    def value(self) -> any:
        return require(self.__value, 'value')


class BranchNode(Node):

    __lower: 'Node'
    __upper: 'Node'

    def __init__(self, pivot: 'Pivot', lower: 'Node', upper: 'Node'):
        self.__pivot = require(pivot, 'pivot')
        self.__lower = require(lower, 'lower')
        self.__upper = require(upper, 'upper')

    def eval(self, element: any) -> any:
        branch: Node = self.lower if self.pivot.split(element) else self.upper
        return branch.eval(element)

    def prune(self) -> 'Node':
        if isinstance(self.lower, TerminalNode) and isinstance(self.upper, TerminalNode) and self.lower.value == self.upper.value:
            return self.lower
        return self

    @property
    def pivot(self) -> 'Pivot':
        return self.__pivot

    @property
    def lower(self) -> 'Node':
        return self.__lower

    @property
    def upper(self) -> 'Node':
        return self.__upper


class LookupNode(Node):

    __mapping: dict[any, 'Node']
    __feature: any

    def __init__(self, mapping: dict[any, 'Node'], feature: any):
        self.__mapping = require(mapping, 'mapping')
        self.__feature = require(feature, 'feature')

    def eval(self, element: any) -> any:
        value = element[self.feature]
        lookup: Node = require(self.mapping[value], 'lookup')
        return lookup.eval(element)

    def prune(self) -> 'Node':
        size = len(self.mapping)
        if size == 1:
            return next(iter(self.mapping.values()))
        return self

    @property
    def mapping(self) -> dict[any, 'Node']:
        return self.__mapping

    @property
    def feature(self) -> any:
        return self.__feature


In [602]:
class DecisionTreeBuilder(metaclass=abc.ABCMeta):

    __entropy: 'SplitCriterionFunction' = Entropy()
    __gini: 'SplitCriterionFunction' = GiniIndex()

    @abc.abstractmethod
    def build(self, x, y) -> 'Node':
        raise NotImplementedError('DecisionTreeBuilder#build')

    # @staticmethod
    # def compute_impurity(samples, criterion: str = 'entropy') -> float:
    #     _, probabilities = value_counts(samples, normalise=True)
    #     fns = {'entropy': Entropy, 'gini': GiniIndex}
    #     fn: SplitCriterionFunction = require(fns.get(criterion, None), criterion)()
    #     return fn.compute(probabilities)

    def entropy(self, attributes) -> float:
        _, probabilities = value_counts(attributes, normalise=True)
        return self.__entropy.compute(frequencies=probabilities)

    def information_gain_categorical(self, data, target_attribute, feature_attribute):
        total_entropy = self.entropy(data[target_attribute])
        feature_entropy = self.entropy(data[feature_attribute])
        return total_entropy - feature_entropy

    def information_gain_continuous(self, data, target_attribute, feature_attribute, probe):
        size, target, feature = len(data), data[target_attribute], data[feature_attribute]
        total_entropy = self.entropy(target)
        lte, gt = feature <= probe, feature > probe

        lower_entropy = self.entropy(target[lte]) * (np.count_nonzero(lte) / size)
        upper_entropy = self.entropy(target[gt]) * (np.count_nonzero(gt) / size)

        return total_entropy - (lower_entropy + upper_entropy)

    @staticmethod
    def factory(implementation: str, **kwargs) -> 'DecisionTreeBuilder':
        factories = {
            'ID3': ID3DecisionTreeBuilder
        }
        constructor = require(factories.get(implementation, None), implementation)
        return constructor(**kwargs)

    @staticmethod
    def default() -> 'DecisionTreeBuilder':
        return DecisionTreeBuilder.factory('ID3')


class ID3DecisionTreeBuilder(DecisionTreeBuilder):

    # TODO: handle continuous attributes
    # TODO: output classes in original format...
    # TODO: mean value
    def build(self, x, y) -> 'Node':
        data = x.copy()
        data[y.name] = y
        return self._build(original=data, subset=data, features=x.columns, target=y.name)

    def _build(self, original, subset, features, target, parent_class=None) -> 'Node':
        """
        ID3 Algorithm as per: https://en.wikipedia.org/wiki/ID3_algorithm#Algorithm
        """
        classes = np.unique(subset[target])
        choices = len(classes)

        # base case #1
        # Every element of the subset belongs to the same class.
        if choices <= 1:
            return Node.terminate(classes[0])

        # base case #2
        # There are no examples in the subset, which happens when no example in the parent set was found to match a
        # specific value of the selected attribute
        if len(subset) <= 0:
            return Node.terminate(parent_class)

        majority_class = classes[majority_class_index(data=subset, attribute=target)]

        # base case #3
        # There are no more attributes to be selected, but the examples still do not belong to the same class.
        if len(features) <= 0:
            return Node.terminate(majority_class)

        gains = np.asarray([self.information_gain_categorical(data=subset, target_attribute=target, feature_attribute=feature) for feature in features])

        best_feature = features[np.argmax(gains)]
        attribute = subset[best_feature]

        remaining_features = [feature for feature in features if feature != best_feature]

        subtree: Node

        # noinspection PyBroadException
        try:
            subtree = self._build_continuous(original=original, subset=subset, features=remaining_features, target=target, parent_class=majority_class, best_feature=best_feature, attribute=attribute)
        except:
            subtree = self._build_categorical(original=original, subset=subset, features=remaining_features, target=target, parent_class=majority_class, best_feature=best_feature, attribute=attribute)

        while (pruned := subtree.prune()) != subtree:
            subtree = pruned

        return subtree

    def _build_continuous(self, original, subset, features, target, parent_class, best_feature, attribute) -> 'Node':
        probes = self.create_probe_values(attribute.min(), attribute.max())
        candidate: PivotCandidate = PivotCandidate.initial()
        for probe in probes:
            gain = self.information_gain_continuous(data=subset, target_attribute=target, feature_attribute=best_feature, probe=probe)
            candidate.update(feature=best_feature, gain=gain, probe=probe)

        def build_subtree(indices) -> 'Node':
            return self._build(original=original, subset=subset[indices], features=features, target=target, parent_class=parent_class)

        pivot: Pivot = Pivot.continuous(candidate.feature(), candidate.probe())
        lower = build_subtree(subset[candidate.feature()] <= candidate.probe())
        upper = build_subtree(subset[candidate.feature()] > candidate.probe())

        return Node.branch(pivot=pivot, lower=lower, upper=upper)

    def _build_categorical(self, original, subset, features, target, parent_class, best_feature, attribute) -> 'Node':
        values = np.unique(attribute)
        mapping: dict[any, Node] = {}

        for value in values:
            data = subset.where(subset[best_feature] == value).dropna()
            subtree = self._build(original=original, subset=data, features=features, target=target, parent_class=parent_class)
            mapping[value] = subtree

        return Node.lookup(mapping=mapping, feature=best_feature)

    # def _continuous(self) -> 'Node':
    #     candidate: PivotCandidate = PivotCandidate.initial()
    #     attributes = x.shape[1]
    #     for index in range(attributes):
    #         # TODO: handle continuous and categorical attributes
    #         attribute = x[:, index]  # array of all values at that index
    #
    #         print(f'attribute => {attribute}')
    #         # print(f'build')
    #         # print(f'index: {index}, attribute: {attribute}')
    #         probes = ID3DecisionTreeBuilder.create_probe_values(attribute.min(), attribute.max())
    #         # print(f'probes: {probes}')
    #         for probe in probes:
    #             gain = ID3DecisionTreeBuilder.compute_information_gain(y, attribute, probe)
    #             # gain = np.random.random()
    #
    #             # gain = 0.0  # compute_gain(samples, attribute, target)
    #             # gain = self.purity(attribute, probe, x, y)  # TODO: compute information gain
    #             # gain = self.measure_progress(y, attribute, probe)
    #             # gain = self.purity(attribute, )
    #             candidate.update(feature=index, gain=gain, probe=probe)
    #
    #     # TODO: sanity check candidate or build pivot from candidate
    #     pivot: Pivot = Pivot.continuous(candidate.feature(), candidate.probe())
    #     # TODO: use or apply pivot data-structure and make more efficient...
    #     idx_lower = x[:, candidate.feature()] <= candidate.probe()
    #     idx_upper = x[:, candidate.feature()] > candidate.probe()
    #
    #     def build_index(indices) -> Node:
    #         return self.build(x[indices], y[indices])
    #
    #     return Node.branch(pivot, build_index(idx_lower), build_index(idx_upper))
    #     pass

    # def _categorical(self) -> 'Node':
        # if is_categorical(subset[best_feature]):
        #     print(f'{best_feature} IS CATEGORICAL')
        # else:
        #     print(f'{best_feature} IS NOT CATEGORICAL')
        #
        # # print(f'INFORMATION GAIN\n{information_gain}\n {best_feature_index} - {best_feature}')
        #
        # available_features = [feature for feature in features if feature != best_feature]
        # values = np.unique(subset[best_feature])
        #
        # mapping: dict[any, Node] = {}
        #
        # for value in values:
        #     data = subset.where(subset[best_feature] == value).dropna()
        #     subtree = self._build(original=original, subset=data, features=available_features, target=target, parent_class=majority_class)
        #     mapping[value] = subtree
        #
        # return Node.lookup(mapping=mapping, feature=best_feature)
        # pass

    # # TODO: handle categorical attributes...
    # def build(self, x, y) -> 'Node':
    #     classes = np.unique(y)
    #     choices = len(classes)
    #
    #     if choices <= 0:  # edge-case: no choices
    #         default_value = '<todo:default-value>'  # TODO: get default value
    #         return Node.terminate(default_value)
    #
    #     if choices == 1:  # edge-case: one clear choice
    #         return Node.terminate(classes[0])
    #
    #     candidate: PivotCandidate = PivotCandidate.initial()
    #     attributes = x.shape[1]
    #     for index in range(attributes):
    #         # TODO: handle continuous and categorical attributes
    #         attribute = x[:, index]  # array of all values at that index
    #
    #         print(f'attribute => {attribute}')
    #         # print(f'build')
    #         # print(f'index: {index}, attribute: {attribute}')
    #         probes = ID3DecisionTreeBuilder.create_probe_values(attribute.min(), attribute.max())
    #         # print(f'probes: {probes}')
    #         for probe in probes:
    #             gain = ID3DecisionTreeBuilder.compute_information_gain(y, attribute, probe)
    #             # gain = np.random.random()
    #
    #             # gain = 0.0  # compute_gain(samples, attribute, target)
    #             # gain = self.purity(attribute, probe, x, y)  # TODO: compute information gain
    #             # gain = self.measure_progress(y, attribute, probe)
    #             # gain = self.purity(attribute, )
    #             candidate.update(feature=index, gain=gain, probe=probe)
    #
    #     # TODO: sanity check candidate or build pivot from candidate
    #     pivot: Pivot = Pivot.continuous(candidate.feature(), candidate.probe())
    #     # TODO: use or apply pivot data-structure and make more efficient...
    #     idx_lower = x[:, candidate.feature()] <= candidate.probe()
    #     idx_upper = x[:, candidate.feature()] > candidate.probe()
    #
    #     def build_index(indices) -> Node:
    #         return self.build(x[indices], y[indices])
    #
    #     return Node.branch(pivot, build_index(idx_lower), build_index(idx_upper))
    #
    # @staticmethod
    # def measure_progress(y, attribute, target, criterion: str = 'entropy'):
    #     size = len(y)
    #     lte, gt = attribute <= target, attribute > target
    #     total_e = DecisionTreeBuilder.compute_impurity(y, criterion=criterion)
    #     lower_e = DecisionTreeBuilder.compute_impurity(y[lte], criterion=criterion)
    #     upper_e = DecisionTreeBuilder.compute_impurity(y[gt], criterion=criterion)
    #     lower_w = np.count_nonzero(lte) / size
    #     upper_w = np.count_nonzero(gt) / size
    #
    #     return total_e - (lower_w * lower_e + upper_w * upper_e)
    #
    # @staticmethod
    # def compute_information_gain(samples, attribute, target) -> float:
    #     return ID3DecisionTreeBuilder.measure_progress(samples, attribute, target)
    #     # classes, frequencies = value_counts(samples, normalise=True)
    #     # total: float = DecisionTreeBuilder.compute_impurity(samples=target)
    #     # cumulative: float = 0
    #     # print(f'compute_information_gain')
    #     # print(f'total: {total}')
    #     # print(f'classes: {classes}')
    #     # print(f'frequencies: {frequencies}')
    #     # print(f'samples: {samples}')
    #     # print(f'attribute: {attribute}')
    #     # print(f'target: {target}')
    #     # print()
    #     # print()
    #     # for (value, frequency) in zip(classes, frequencies):
    #     #     print(f'class: {value}, frequency: {frequency}')
    #     #     indices = attribute[attribute <= target]
    #     #     # indices = [0]
    #     #     # indices = attributes[]
    #     #     # indices = samples[attribute] == value
    #     #     # indices = samples[attribute == value]
    #     #     print(f'indices {indices}')
    #     #     # print(f'indices: {indices}')
    #     #     contribution = DecisionTreeBuilder.compute_impurity(target[indices])
    #     #     cumulative += frequency * contribution
    #     # return total - cumulative
    #

    def create_probe_values(self, minima, maxima):
        return [v * minima + (1.0 - v) * maxima for v in [0.75, 0.5, 0.25]]  # TODO: expand values


In [603]:
class Model(metaclass=abc.ABCMeta):

    @abc.abstractmethod
    def compile(self, *args, **kwargs):
        raise NotImplementedError('Model#compile')

    @abc.abstractmethod
    def fit(self, x, y, *args, **kwargs):
        raise NotImplementedError('Model#fit')

    @abc.abstractmethod
    def predict(self, x, *args, **kwargs):
        raise NotImplementedError('Model#predict')


class DecisionTree(Model):

    # TODO: default value (most common class..?)
    __builder: 'DecisionTreeBuilder' = DecisionTreeBuilder.default()
    __root: Optional['Node'] = None

    def compile(self, *args, **kwargs):
        previous: DecisionTreeBuilder = self.__builder
        try:
            builder = DecisionTreeBuilder.factory(kwargs['implementation'], **kwargs)
        except (KeyError, ValueError):
            builder = previous
        self.__builder = builder

    def fit(self, x, y, *args, **kwargs):
        self.__root = self.builder.build(x, y)

    def predict(self, x, *args, **kwargs):
        tree: Node = self.root
        samples = x.to_dict(orient='records')
        return np.asarray([tree.eval(sample) for sample in samples])

    @property
    def builder(self) -> 'DecisionTreeBuilder':
        return require(self.__builder, 'builder')

    @property
    def root(self) -> 'Node':
        return require(self.__root, 'root')


### Section 4 - Implementation

In [604]:
def debug_tree(node, depth: int = 0, size: int = 1):
    padding = '|' + ('---' * depth) + ' '

    def p(o):
        print(f'{str(depth).ljust(2)} {padding} {o}')

    if isinstance(node, TerminalNode):
        p(f'class: {node.value}')
    elif isinstance(node, BranchNode):
        p(f'lower pivot: {node.pivot.__str__(condition=True)}')
        debug_tree(node.lower, depth=depth+size, size=size)
        p(f'upper pivot: {node.pivot.__str__(condition=False)}')
        debug_tree(node.upper, depth=depth+size, size=size)
    elif isinstance(node, LookupNode):
        for (k, v) in node.mapping.items():
            p(f'lookup: {node.feature} {k}')
            debug_tree(v, depth=depth+size, size=size)
    else:
        raise ValueError(f'Unexpected node: {node}')


model: DecisionTree = DecisionTree()
# [xxx, yyy] = dataset(return_X_y=True, as_frame=True)

ddd = {
    'wind_direction': ['N', 'S', 'E', 'W'],
    'tide': ['Low', 'High'],
    'swell_forecasting': ['small', 'medium', 'large'],
    'good_waves': ['Yes', 'No'],
}

# create an empty dataframe
df = pd.DataFrame(columns=ddd.keys())

np.random.seed(42)
for i in range(150):
    df.loc[i, 'wind_direction'] = str(np.random.choice(ddd['wind_direction'], 1)[0])
    df.loc[i, 'tide'] = str(np.random.choice(ddd['tide'], 1)[0])
    df.loc[i, 'swell_forecasting'] = str(np.random.choice(ddd['swell_forecasting'], 1)[0])
    df.loc[i, 'good_waves'] = str(np.random.choice(ddd['good_waves'], 1)[0])
    df.loc[i, 'temp'] = int(np.random.random() * 26) + 1
    df.loc[i, 'hello'] = 'world'

xxx = df.drop('good_waves', 1)
yyy = df['good_waves']

print(f'Data\n{xxx}\n')
print(f'Targets\n{yyy}\n')

model.fit(xxx, yyy)

debug_tree(model.root, 1, 1)

predictions = model.predict(xxx)

accuracy = accuracy_score(yyy, predictions)

print(f'Predictions\n{predictions}')
print(f'Actual\n{yyy}')
print(f'Accuracy: {accuracy}')

Data
    wind_direction  tide swell_forecasting  temp  hello
0                E  High             small  20.0  world
1                N   Low             large   5.0  world
2                E   Low             small   4.0  world
3                E  High             small  19.0  world
4                S  High            medium  26.0  world
..             ...   ...               ...   ...    ...
145              S  High             small  15.0  world
146              E   Low            medium  26.0  world
147              N  High             large  26.0  world
148              S   Low             small  19.0  world
149              E   Low             small  16.0  world

[150 rows x 5 columns]

Targets
0      Yes
1       No
2       No
3       No
4       No
      ... 
145     No
146    Yes
147    Yes
148    Yes
149     No
Name: good_waves, Length: 150, dtype: object

TERMINAL
[['N' 'Yes']
 ['S' 'Yes']
 ['W' 'No']]
OTHERS
[['E' <__main__.BranchNode object at 0x12f6c4970>]]

TERMINAL
[['E' 

  xxx = df.drop('good_waves', 1)
