<a href="https://colab.research.google.com/github/LucasCavalherie/DFDT/blob/main/DFDT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# DFDT (Dynamic Fast Decision Tree)

This code was inspired by the pseudocode presented in the paper:

DFDT: Dynamic Fast Decision Tree for IoT Data Stream Mining on Edge Devices (https://arxiv.org/pdf/2502.14011)


---
### **1. Imports**



In [None]:
# 1. Imports
import math
import random
import csv
import time
import os
from collections import defaultdict

---
### **2. Core Components**
The fundamental building blocks and helper functions required by the main algorithm.



#### **Gini Impurity Calculator**
A helper function that measures the impurity of a set of data points.

* **What it is:** `Gini Impurity` is a metric that ranges from 0 to 1 - 1/k for k classes, so from 0 to 0.5 in the binary case. A value of 0 means every data point in a node belongs to the same `class` (perfectly pure). A value of 0.5 means two classes are evenly mixed (maximum impurity for a binary problem).
* **How it works:** It computes the probability of misclassifying a randomly chosen element if that element were labeled at random according to the label distribution of the subset.
* **Parameters:**
  * `class_distribution`: A dictionary mapping `class` labels to their counts (e.g. `{'rain': 10, 'no_rain': 20}`).
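
As a worked instance of the definition above, using the example distribution `{'rain': 10, 'no_rain': 20}` (the symbols $G$ and $p_k$ are notation introduced here for illustration, not names from the notebook):

$$
G = 1 - \sum_{k} p_k^2, \qquad p_{\text{rain}} = \tfrac{10}{30}, \quad p_{\text{no\_rain}} = \tfrac{20}{30}
$$

$$
G = 1 - \left(\tfrac{1}{3}\right)^2 - \left(\tfrac{2}{3}\right)^2 = 1 - \tfrac{1}{9} - \tfrac{4}{9} = \tfrac{4}{9} \approx 0.444
$$

The result is below 0.5 because the two classes are not evenly mixed.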

In [None]:
# 2. Core Components
def calculate_gini_impurity(class_distribution):
    """Calculates the Gini Impurity for a class distribution."""
    total_count = sum(class_distribution.values())
    if total_count == 0:
        return 0.0

    impurity = 1.0
    for count in class_distribution.values():
        probability = count / total_count
        impurity -= probability ** 2
    return impurity

#### **Numeric Attribute Splitter**
This class is the core learning mechanism for a single numeric attribute. Its job is to find the best possible split point for the data it has observed.

* **What it is:** A dedicated "statistician" for one `feature` (e.g. 'temperature'). It records every value, and the corresponding `class` labels, that it has seen for that `feature`.
* **How it works:** The `get_best_split` method iterates over the sorted unique observed values and tests the midpoint between each pair of adjacent values as a candidate split threshold. For each candidate it computes the `Gini Gain` (the reduction in impurity) and returns the split point with the highest gain.
* **Key Methods:**
  * `update(value, class_label)`: Adds a new data point to the internal store.
  * `get_best_split(parent_impurity)`: Searches for the best split threshold.

In [None]:
class NumericAttributeSplitter:
    """
    This class stores statistics for a single numeric attribute
    and finds the best split point for it.
    """
    def __init__(self):
        self._value_class_observations = defaultdict(list)

    def update(self, value, class_label):
        """Updates the splitter with a new value and its class."""
        self._value_class_observations[value].append(class_label)

    def get_best_split(self, parent_impurity):
        """
        Finds the best split point for this attribute.
        It tests all midpoints between unique values as split candidates.
        """
        best_split_value = None
        best_gain = -1.0

        sorted_unique_values = sorted(self._value_class_observations.keys())

        if len(sorted_unique_values) < 2:
            return None, -1.0

        for i in range(len(sorted_unique_values) - 1):
            split_candidate = (sorted_unique_values[i] + sorted_unique_values[i+1]) / 2

            left_branch_dist = defaultdict(int)
            right_branch_dist = defaultdict(int)

            for value, classes in self._value_class_observations.items():
                for c in classes:
                    if value <= split_candidate:
                        left_branch_dist[c] += 1
                    else:
                        right_branch_dist[c] += 1

            left_count = sum(left_branch_dist.values())
            right_count = sum(right_branch_dist.values())
            total_count = left_count + right_count

            if left_count == 0 or right_count == 0:
                continue

            left_impurity = calculate_gini_impurity(left_branch_dist)
            right_impurity = calculate_gini_impurity(right_branch_dist)

            weighted_children_impurity = (left_count / total_count) * left_impurity + \
                                         (right_count / total_count) * right_impurity

            gain = parent_impurity - weighted_children_impurity

            if gain > best_gain:
                best_gain = gain
                best_split_value = split_candidate

        return best_split_value, best_gain
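
The search above recounts every stored observation for each candidate threshold. As a hedged alternative, the same midpoint search can be done in one sweep over the sorted values by moving class counts from the right branch to the left. This is a sketch, not the notebook's implementation: `gini` and `best_split_single_sweep` are hypothetical names, and the input is assumed to have the same shape as `_value_class_observations` (value mapped to a list of labels).

```python
from collections import Counter


def gini(counts):
    """Gini impurity of a label -> count mapping (zero counts are harmless)."""
    total = sum(counts.values())
    if total == 0:
        return 0.0
    return 1.0 - sum((c / total) ** 2 for c in counts.values())


def best_split_single_sweep(observations):
    """Return (threshold, gain) for the best midpoint split, or (None, -1.0)."""
    values = sorted(observations)
    if len(values) < 2:
        return None, -1.0

    # Start with everything on the right branch.
    right = Counter()
    for labels in observations.values():
        right.update(labels)
    total = sum(right.values())
    parent_impurity = gini(right)

    left = Counter()
    best_value, best_gain = None, -1.0
    for low, high in zip(values, values[1:]):
        # Move the observations at `low` from the right branch to the left.
        moved = Counter(observations[low])
        left.update(moved)
        right.subtract(moved)

        n_left = sum(left.values())
        n_right = total - n_left
        weighted = (n_left / total) * gini(left) + (n_right / total) * gini(right)
        gain = parent_impurity - weighted
        if gain > best_gain:
            best_value, best_gain = (low + high) / 2, gain
    return best_value, best_gain


obs = {1.0: ["a"], 2.0: ["a"], 3.0: ["b"], 4.0: ["b"]}
threshold, gain = best_split_single_sweep(obs)
print(threshold, gain)
```

On this toy input the classes separate cleanly between 2.0 and 3.0, so the midpoint 2.5 removes all of the parent impurity.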

#### **Node Class**
This class defines the structure of each node in the decision tree. A node is either a leaf (which makes predictions) or a split node (which routes instances to its children).

* **What it is:** The fundamental data structure of the tree.
* **Key Attributes:**
  * `is_leaf`: A boolean indicating whether this is a terminal node.
  * `class_distribution`: A dictionary holding the class counts of the instances that ended up in this node.
  * `feature_estimators`: A dictionary in which each key is an attribute index and each value is a `NumericAttributeSplitter` instance.
  * `split_attribute` / `split_value`: If the node is not a leaf, these attributes store the split rule (e.g. "split on attribute 5 where value <= 0.15").
  * DFDT-specific attributes (`n_l`, `n_check_l`, `active`, etc.) that drive the adaptive logic.
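
The split rule held in `split_attribute` / `split_value` amounts to a two-way routing decision. A minimal sketch of that decision, assuming instances are dictionaries keyed by attribute index; `choose_branch` and its `default` parameter are hypothetical names used only for illustration:

```python
def choose_branch(instance, split_attribute, split_value, default=0.0):
    """Return 0 for the left child (value <= split_value), 1 for the right child."""
    # A missing attribute falls back to `default` (an assumption of this sketch).
    value = instance.get(split_attribute, default)
    return 0 if value <= split_value else 1


# The rule "split on attribute 5 where value <= 0.15":
left = choose_branch({5: 0.10}, split_attribute=5, split_value=0.15)
right = choose_branch({5: 0.20}, split_attribute=5, split_value=0.15)
print(left, right)
```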

In [None]:
class Node:
    """Represents a node in the decision tree with learning capabilities."""
    def __init__(self, is_leaf=True, parent=None, depth=0):
        self.is_leaf = is_leaf
        self.parent = parent
        self.depth = depth
        self.children = {}
        self.active = True
        self.id = random.randint(0, 1000000)

        self.class_distribution = defaultdict(int)
        self.feature_estimators = defaultdict(NumericAttributeSplitter)

        self.n_l = 0
        self.n_check_l = 0
        self.n_leaf_l = 0
        self.n_tree_l = 0

        self.split_attribute = None
        self.split_value = None

    def __repr__(self):
        return f"Node(id={self.id}, is_leaf={self.is_leaf}, active={self.active}, n_l={self.n_l}, dist={self.class_distribution})"

    def get_prediction(self):
        """Gets the prediction from a leaf node (the majority class)."""
        if not self.class_distribution:
            return None
        return max(self.class_distribution, key=self.class_distribution.get)

    def update_stats(self, instance_X, instance_y):
        """Updates the leaf's statistics with a new instance."""
        self.class_distribution[instance_y] += 1
        for attr_index, value in instance_X.items():
            self.feature_estimators[attr_index].update(value, instance_y)


---
### **3. Main DFDT Algorithm Class**
This class encapsulates the whole DFDT algorithm, including its state (the tree and the historical statistics) and all of its training and prediction logic.

* **What it is:** The main controller of the decision tree.
* **How it works:**
  1. `__init__` initializes a tree consisting of a single root node and sets the hyperparameters.
  2. `train` orchestrates learning. It iterates over the data stream and performs **test-then-train** evaluation on each instance.
  3. `predict` routes a new instance through the tree to obtain a `class` prediction.
  4. Internal helper methods (prefixed with `_`) handle the more complex logic described in the DFDT paper, such as attempting a split (`_attempt_growth`), checking the split conditions (`_can_split`), performing the split (`_split_leaf`), and adapting the `grace period` (`_adapt_grace_period`).
* **Key Public Methods:**
  * `train(data_stream)`: The main entry point for training the model.
  * `predict(instance_X)`: Returns a prediction for a new, unseen instance.
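
The **test-then-train** (prequential) loop can be shown in isolation from the tree logic. The sketch below is an assumption-laden illustration: `MajorityClassModel` is a hypothetical stand-in learner, not part of DFDT, and `prequential_accuracy` is a name introduced here. Each instance is first used to test the model and only afterwards to train it.

```python
from collections import Counter


class MajorityClassModel:
    """Hypothetical stand-in learner: predicts the most frequent label seen so far."""

    def __init__(self):
        self.counts = Counter()

    def predict(self, x):
        if not self.counts:
            return None  # nothing learned yet
        return self.counts.most_common(1)[0][0]

    def learn(self, x, y):
        self.counts[y] += 1


def prequential_accuracy(model, stream):
    """Test on each instance before training on it; return the fraction correct."""
    correct = 0
    total = 0
    for x, y in stream:
        if model.predict(x) == y:  # 1. test
            correct += 1
        model.learn(x, y)          # 2. train
        total += 1
    return correct / total if total else 0.0


stream = [({0: 1.0}, "a"), ({0: 2.0}, "a"), ({0: 3.0}, "b"), ({0: 4.0}, "a")]
accuracy = prequential_accuracy(MajorityClassModel(), stream)
print(accuracy)
```

Because the first instance is tested before anything has been learned, it always counts as a miss, which is why prequential accuracy starts low on short streams.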

In [None]:
# 3. Main DFDT Algorithm Class
class DFDT:
    """
    An implementation of the Dynamic Fast Decision Tree (DFDT) algorithm
    for streaming data, encapsulated in a single class.
    """
    def __init__(self, delta=0.05, initial_grace_period=100):
        self.delta = delta
        self.initial_grace_period = initial_grace_period
        self.root = Node(is_leaf=True, depth=0)
        self.root.n_min_grace_period = self.initial_grace_period
        self.leaves = {self.root}

        # Historical statistics for adaptive logic
        self.h_stat_values = []
        self.g_stat_values = []
        self.n_stat_values = []
        self.hb_stat_values = []

        # R is the range of the split heuristic; 0.5 is the Gini Impurity range for two classes
        self.R_HEURISTIC_RANGE = 0.5

    def _avg(self, values_list):
        """Calculates the average of a list of numbers."""
        return sum(values_list) / len(values_list) if values_list else 0

    def _std_dev(self, values_list):
        """Calculates the standard deviation of a list of numbers."""
        if not values_list or len(values_list) < 2:
            return 0
        mean = self._avg(values_list)
        variance = sum([(x - mean) ** 2 for x in values_list]) / (len(values_list) - 1)
        return math.sqrt(variance)

    def _route_to_leaf(self, instance_X):
        """Routes an instance through the tree using learned split rules."""
        current_node = self.root
        while not current_node.is_leaf:
            if current_node.split_attribute is None or current_node.split_value is None:
                break

            instance_value = instance_X.get(current_node.split_attribute, 0)
            branch_index = 0 if instance_value <= current_node.split_value else 1

            if branch_index in current_node.children:
                current_node = current_node.children[branch_index]
            else:
                break
        return current_node

    def predict(self, instance_X):
        """Predicts the class for a single instance."""
        leaf = self._route_to_leaf(instance_X)
        return leaf.get_prediction()

    def train(self, data_stream):
        """
        Trains the DFDT model on a data stream using a prequential
        (test-then-train) evaluation method.
        """
        n_total_instances = 0
        correct_predictions = 0

        for i, (instance_X, instance_y) in enumerate(data_stream):
            print(f"\n--- [Instance #{i+1}] ---")

            # 1. TEST (Predict)
            prediction = self.predict(instance_X)
            if prediction is not None:
                print(f"Prediction: {prediction}, Actual: {instance_y}", end="")
                if prediction == instance_y:
                    correct_predictions += 1
                    print(" -> Correct!")
                else:
                    print(" -> Incorrect!")

            current_accuracy = (correct_predictions / (i + 1)) * 100
            print(f"Current Accuracy: {current_accuracy:.2f}%")

            # 2. TRAIN
            leaf = self._route_to_leaf(instance_X)
            leaf.update_stats(instance_X, instance_y)

            n_total_instances += 1
            leaf.n_l += 1

            self._attempt_growth(leaf, n_total_instances)

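        # Guard against an empty stream, which would otherwise divide by zero.
        if n_total_instances == 0:
            print("\n--- Training Complete --- \nNo instances were processed.")
            return 0.0
        final_accuracy = (correct_predictions / n_total_instances) * 100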
        print(f"\n--- Training Complete --- \nFinal Accuracy: {final_accuracy:.2f}%")
        return final_accuracy

    def _attempt_growth(self, leaf, n_total_instances):
        """Contains the core logic for growing the tree at a given leaf."""
        if not leaf.active:
            return

        # Adaptive activation/deactivation logic
        if n_total_instances - leaf.n_tree_l > 0:
            fraction = ((leaf.n_l - leaf.n_leaf_l) * len(self.leaves)) / (n_total_instances - leaf.n_tree_l)
        else:
            fraction = 0

        grow_fast_flag = fraction > 2.0
        if fraction < 0.2:
            leaf.active = False
            print(f"!!! NODE {leaf.id} DEACTIVATED.")
            return

        # Check grace period
        if not hasattr(leaf, 'n_min_grace_period'):
            leaf.n_min_grace_period = self.initial_grace_period

        is_ready_for_check = (leaf.n_l - leaf.n_check_l > leaf.n_min_grace_period)

        if len(leaf.class_distribution) > 1 and is_ready_for_check:
            print(f"==> CHECKING SPLIT for leaf {leaf.id}.")
            epsilon = self._calculate_hoeffding_bound(leaf.n_l)
            g_values = self._calculate_g_values(leaf)

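            if self._can_split(leaf, grow_fast_flag, g_values, epsilon):
                self._split_leaf(leaf)
                # Assumption: n_tree_l is the number of instances the tree had seen
                # when the leaf was created, so the activity fraction above is
                # measured from each child's creation, not from the stream start.
                for child in leaf.children.values():
                    child.n_tree_l = n_total_instances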
            else:
                leaf.n_check_l = leaf.n_l
                self._adapt_grace_period(leaf, g_values, epsilon)

    def _split_leaf(self, leaf):
        """Performs the split of a leaf node."""
        print(f"--- NODE {leaf.id} APPROVED FOR SPLIT! ---")
        leaf.is_leaf = False
        leaf.active = False
        self.leaves.remove(leaf)

        split_attr, split_val = leaf.best_split_info['attribute'], leaf.best_split_info['split_value']
        leaf.split_attribute = split_attr
        leaf.split_value = split_val

        left_child = Node(is_leaf=True, parent=leaf, depth=leaf.depth + 1)
        right_child = Node(is_leaf=True, parent=leaf, depth=leaf.depth + 1)

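        # Only the split attribute can decide which child an observation belongs to,
        # so seed the children's class counts from that attribute's observations.
        # Per-attribute statistics restart empty, because the stored observations
        # do not keep the attribute values of one instance together.
        split_observations = leaf.feature_estimators[split_attr]._value_class_observations
        for val, classes in split_observations.items():
            target_child = left_child if val <= split_val else right_child
            for cls in classes:
                target_child.class_distribution[cls] += 1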

        self.leaves.add(left_child)
        self.leaves.add(right_child)
        leaf.children[0] = left_child
        leaf.children[1] = right_child
        print(f"  -> Node split on attribute '{split_attr}' with value <= {split_val}")

    def _can_split(self, leaf, grow_fast, g_values, epsilon):
        """Implements Algorithm 2: checks the conditions for a split."""
        if not g_values:
            return False

        sorted_g = sorted(g_values.items(), key=lambda item: item[1], reverse=True)
        if not sorted_g:
            return False

        g_best_attr, g_best_val = sorted_g[0]
        g_second_best_val = sorted_g[1][1] if len(sorted_g) > 1 else 0

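        avg_hb_hist = self._avg(self.hb_stat_values) if self.hb_stat_values else float('inf')
        if not ((g_best_val - g_second_best_val >= epsilon) or (epsilon < avg_hb_hist)):
            return False

        impurity = calculate_gini_impurity(leaf.class_distribution)

        if grow_fast:  # Simplified GrowFast logic
            # Record the split rule here as well: _split_leaf reads leaf.best_split_info.
            best_split_val, _ = leaf.feature_estimators[g_best_attr].get_best_split(impurity)
            leaf.best_split_info = {'attribute': g_best_attr, 'split_value': best_split_val}
            return True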

        # Conservative split conditions (C3-C6)
        current_impurities = [calculate_gini_impurity(n.class_distribution) for n in self.leaves if n.active]
        c3 = impurity >= self._avg(current_impurities) - self._std_dev(current_impurities)
        c4 = impurity >= self._avg(self.h_stat_values) - self._std_dev(self.h_stat_values)
        c5 = g_best_val >= self._avg(self.g_stat_values) - self._std_dev(self.g_stat_values)
        c6 = leaf.n_l >= self._avg(self.n_stat_values)

        if c3 and c4 and c5 and c6:
            self._update_historical_stats(impurity, g_best_val, leaf.n_l, epsilon)
            best_split_val, _ = leaf.feature_estimators[g_best_attr].get_best_split(impurity)
            leaf.best_split_info = {'attribute': g_best_attr, 'split_value': best_split_val}
            return True

        return False

    def _adapt_grace_period(self, leaf, g_values, epsilon):
        """Dynamically adjusts the grace period for a leaf if a split fails."""
        sorted_g = sorted(g_values.values(), reverse=True)
        g_best = sorted_g[0] if sorted_g else 0
        g_second_best = sorted_g[1] if len(sorted_g) > 1 else 0
        delta_g = g_best - g_second_best
        avg_hb = self._avg(self.hb_stat_values) if self.hb_stat_values else epsilon

        new_n_min = 0
        if delta_g < epsilon and delta_g > avg_hb and delta_g > 1e-9:
            new_n_min = math.ceil((self.R_HEURISTIC_RANGE**2 * math.log(1 / self.delta)) / (2 * delta_g**2))
        elif delta_g < avg_hb and epsilon > avg_hb and avg_hb > 1e-9:
            new_n_min = math.ceil((self.R_HEURISTIC_RANGE**2 * math.log(1 / self.delta)) / (2 * avg_hb**2))

        if new_n_min > 0:
            leaf.n_min_grace_period = max(leaf.n_min_grace_period, new_n_min)
            print(f"  -> Adjusting grace period for leaf {leaf.id} to {leaf.n_min_grace_period}")

    def _calculate_g_values(self, leaf):
        """Calculates Gini gain for all attributes in a leaf."""
        gains = {}
        parent_impurity = calculate_gini_impurity(leaf.class_distribution)
        for attr_index, splitter in leaf.feature_estimators.items():
            _, gain = splitter.get_best_split(parent_impurity)
            if gain > 0:
                gains[attr_index] = gain
        return gains

    def _calculate_hoeffding_bound(self, n_instances):
        """Returns the Hoeffding bound epsilon for n_instances observations."""
        if n_instances == 0:
            return float('inf')
        return math.sqrt((self.R_HEURISTIC_RANGE**2 * math.log(1 / self.delta)) / (2 * n_instances))

    def _update_historical_stats(self, impurity, g_best, n_l, epsilon):
        self.h_stat_values.append(impurity)
        self.g_stat_values.append(g_best)
        self.n_stat_values.append(n_l)
        self.hb_stat_values.append(epsilon)
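
The grace-period adaptation in `_adapt_grace_period` uses the same expression as `_calculate_hoeffding_bound`, solved for the number of instances instead of for epsilon. A minimal standalone sketch of both directions, assuming the notebook's defaults of a range of 0.5 and `delta=0.05`; `hoeffding_bound` and `instances_needed` are hypothetical names used only here:

```python
import math


def hoeffding_bound(value_range, delta, n):
    """epsilon = sqrt(R^2 * ln(1/delta) / (2n))."""
    return math.sqrt((value_range ** 2 * math.log(1 / delta)) / (2 * n))


def instances_needed(value_range, delta, target_epsilon):
    """Smallest n whose Hoeffding bound is at most target_epsilon."""
    return math.ceil((value_range ** 2 * math.log(1 / delta)) / (2 * target_epsilon ** 2))


eps_200 = hoeffding_bound(0.5, 0.05, 200)
n_for_eps = instances_needed(0.5, 0.05, 0.02)
print(f"epsilon after 200 instances: {eps_200:.4f}")
print(f"instances needed for epsilon <= 0.02: {n_for_eps}")
```

Halving the target epsilon roughly quadruples the required number of instances, which is why a small gap between the two best gains leads to a much longer grace period.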

---
### **4. Data Loading Utility**
This cell contains a generic function for loading a dataset from a local CSV file.

* **What it is:** A reusable data preparation utility.
* **How it works:** It opens a CSV file, treats the first row as the header and skips it, then reads each remaining row. It converts the `features` to `float` and separates them from the `class` label. It returns a list of tuples, each holding an instance dictionary and its label, ready to be used as a data stream.
* **Parameters:**
  * `file_path`: The local path to the CSV file (e.g. `'NOAA_weather.csv'`).
  * `class_index`: The index of the column that holds the `class` label. Defaults to -1 (the last column).
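
To make the returned shape concrete, here is a small sketch that parses an in-memory CSV the same way, with the label in the last column. The column names and values are invented for illustration and do not come from any of the datasets used in the notebook.

```python
import csv
import io

# Hypothetical two-row dataset with a header; the label is the last column.
sample = "temp,humidity,label\n21.5,0.40,rain\n30.1,0.10,no_rain\n"

reader = csv.reader(io.StringIO(sample))
next(reader)  # skip the header row

stream = []
for row in reader:
    *features, label = row
    # Features are keyed by their position after the label column is removed.
    instance = {j: float(value) for j, value in enumerate(features)}
    stream.append((instance, label))

print(stream)
```

Each element is an `(instance_dict, label)` tuple, which is the form that `train` iterates over.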

In [None]:
def load_csv_dataset(file_path, class_index=-1):
    """
    A generic function to load a dataset from a local CSV file.
    :param file_path: Path to the CSV file.
    :param class_index: The index of the column containing the class label.
    :return: A list of (instance_dict, label) tuples.
    """
    print(f"Loading dataset from local file: {file_path}...")
    stream = []
    try:
        with open(file_path, 'r', newline='') as f:
            reader = csv.reader(f)
            try:
                # Attempt to skip header, but don't fail if it doesn't exist
                header = next(reader)
                print(f"Dataset header: {header}")
            except StopIteration:
                print("No header found or file is empty.")
                return []

            for row in reader:
                try:
                    # Adjust class_index for feature list if it's negative
                    if class_index < 0:
                        effective_class_index = len(row) + class_index
                    else:
                        effective_class_index = class_index

                    label = row[effective_class_index]

                    features_list = [float(val) for i, val in enumerate(row) if i != effective_class_index]

                    instance_X_dict = {j: features_list[j] for j in range(len(features_list))}
                    stream.append((instance_X_dict, label))
                except (ValueError, IndexError) as e:
                    print(f"Skipping malformed row: {row} | Error: {e}")
        print(f"Successfully loaded {len(stream)} instances.")
        return stream
    except FileNotFoundError:
        print(f"Error: File not found at '{file_path}'. Please upload it to your environment.")
        return None


---
### **5. Experiment Setup and Execution**
This is the main execution block. Here you configure the experiment by choosing the datasets and setting the model's hyperparameters.

* **How it works:**
  1. **Configuration:** For each dataset you want to test, set its `path` and `class_index` in the `datasets` list.
  2. **Load Data:** `load_csv_dataset` is called to prepare the data stream.
  3. **Initialize Model:** An instance of the `DFDT` class is created with hyperparameters such as `delta` and `initial_grace_period`.
  4. **Train:** The `train` method is called on the model instance to start learning.

In [None]:
def set_results_log(filepath, dataset_name, accuracy, exec_time):
    if accuracy is not None and exec_time is not None:
        print(f"Dataset: {dataset_name}")
        print(f"  -> Final Accuracy: {accuracy:.4f}")
        print(f"  -> Execution Time: {exec_time:.2f} seconds")
        result_line = f"{dataset_name},{exec_time:.2f},{accuracy:.4f}\n"
    else:
        result_line = f"{dataset_name},File Not Found,N/A\n"

    write_header = not os.path.exists(filepath)

    if write_header:
        print(f"Results file '{filepath}' created.")

    with open(filepath, 'a') as f:
        if write_header:
            f.write("Dataset,Execution Time (s),Final Accuracy\n")
        f.write(result_line)


def run_single_experiment(dataset_config):
    file_path = dataset_config['path']
    class_column_index = dataset_config['class_index']

    print(f"\n--- Processing dataset: {dataset_config['name']} ---")

    if not os.path.exists(file_path):
        print(f"WARNING: File '{file_path}' not found. Skipping this dataset.")
        return None, None

    data_stream = load_csv_dataset(file_path, class_column_index)

    if not data_stream:
        print(f"WARNING: Could not load data from '{file_path}'.")
        return None, None

    dfdt_model = DFDT(delta=0.05, initial_grace_period=200)

    start_time = time.time()
    final_accuracy = dfdt_model.train(data_stream)
    end_time = time.time()

    execution_time = end_time - start_time

    return final_accuracy, execution_time

In [None]:
print("DFDT Python Implementation - Generic Runner")
print("=" * 40)

datasets = [
    {'name': 'NOAA', 'path': 'NOAA.csv', 'class_index': 8},
    {'name': 'Keystroke', 'path': 'Keystroke.csv', 'class_index': 10},
    {'name': 'Chess', 'path': 'Chess.csv', 'class_index': 7},
    {'name': 'Luxembourg', 'path': 'Luxembourg.csv', 'class_index': 31},
    {'name': 'Ozone', 'path': 'Ozone.csv', 'class_index': 72},
    {'name': 'SmartMeter', 'path': 'SmartMeter.csv', 'class_index': 96},
    {'name': 'Electricity', 'path': 'Electricity.csv', 'class_index': 8},
    {'name': 'Rialto', 'path': 'Rialto.csv', 'class_index': 27},
    {'name': 'Forest', 'path': 'Forest.csv', 'class_index': 54},
    {'name': 'Posture', 'path': 'Posture.csv', 'class_index': 3},
    # {'name': 'KDDCUP99', 'path': 'KDDCUP99.csv', 'class_index': 38},
    {'name': 'PokerHand', 'path': 'PokerHand.csv', 'class_index': 10},
]
results_filepath = 'dfdt_experiment_results.txt'
if os.path.exists(results_filepath):
    os.remove(results_filepath)

for config in datasets:
    accuracy, exec_time = run_single_experiment(config)
    set_results_log(results_filepath, config['name'], accuracy, exec_time)

print(f"\n{'=' * 40}")
print("Processing complete!")
print(f"All results have been saved to '{results_filepath}'.")

Streaming output was truncated to the last 5000 lines.
Prediction: 0, Actual: 1 -> Incorrect!
Current Accuracy: 50.02%

--- [Instance #827954] ---
Prediction: 0, Actual: 1 -> Incorrect!
Current Accuracy: 50.02%

--- [Instance #827955] ---
Prediction: 0, Actual: 1 -> Incorrect!
Current Accuracy: 50.02%

--- [Instance #827956] ---
Prediction: 0, Actual: 2 -> Incorrect!
Current Accuracy: 50.02%

--- [Instance #827957] ---
Prediction: 0, Actual: 2 -> Incorrect!
Current Accuracy: 50.02%

--- [Instance #827958] ---
Prediction: 0, Actual: 1 -> Incorrect!
Current Accuracy: 50.02%

--- [Instance #827959] ---
Prediction: 0, Actual: 1 -> Incorrect!
Current Accuracy: 50.02%

--- [Instance #827960] ---
Prediction: 0, Actual: 2 -> Incorrect!
Current Accuracy: 50.02%

--- [Instance #827961] ---
Prediction: 0, Actual: 1 -> Incorrect!
Current Accuracy: 50.02%

--- [Instance #827962] ---
Prediction: 0, Actual: 1 -> Incorrect!
Current Accuracy: 50.02%

--- [Instance #827963] ---
Predict