In [4]:
import numpy as np
from river.drift import ADWIN

class Node:
    def __init__(self, is_leaf=True, prediction=None):
        self.is_leaf = is_leaf
        self.prediction = prediction
        self.split_feature = None
        self.split_value = None
        self.children = {}
        self.class_counts = np.zeros(2)
        self.adwin = ADWIN()
        self.alternate_tree = None  # Alternate tree starts as None

def hoeffding_bound(R, n):
    return np.sqrt((R**2 * np.log(1/0.10)) / (2 * n))

def entropy(labels):
    label_counts = np.bincount(labels, minlength=2)
    probabilities = label_counts / np.sum(label_counts)
    probabilities = probabilities[probabilities > 0]
    return -np.sum(probabilities * np.log2(probabilities))

def information_gain(parent_labels, left_labels, right_labels):
    entropy_before = entropy(parent_labels)
    total_size = len(parent_labels)
    left_size = len(left_labels)
    right_size = len(right_labels)
    weighted_entropy = (left_size / total_size) * entropy(left_labels) + \
                        (right_size / total_size) * entropy(right_labels)
    return entropy_before - weighted_entropy

def best_split(data, labels):
    features = data.shape[1]
    best_split_feature = None
    best_split_value = None
    best_split_information_gain = -np.inf
    
    for feature in range(features):
        values = np.sort(np.unique(data[:, feature]))
        for i in range(len(values) - 1):
            split_value = (values[i] + values[i+1]) / 2
            smaller_values = data[:, feature] <= split_value
            bigger_values = ~smaller_values
            info_gain = information_gain(labels, labels[smaller_values], labels[bigger_values])
            if info_gain > best_split_information_gain:
                best_split_feature = feature
                best_split_value = split_value
                best_split_information_gain = info_gain
                
    return best_split_feature, best_split_value

class HAT:
    def __init__(self):
        self.root = Node(is_leaf=True, prediction=0)
    
    def fit(self, X, y):
        y_adjusted = y - 1
        for xi, yi in zip(X, y_adjusted):
            self._fit_single(xi, yi)
    
    def _fit_single(self, x, y):
        y = y - 1
        node = self.root
        while not node.is_leaf:
            if x[node.split_feature] <= node.split_value:
                node = node.children['left']
            else:
                node = node.children['right']
        
        node.class_counts[y] += 1
        node.prediction = np.argmax(node.class_counts)
        
        # ADWIN update and drift check
        old_prediction = node.prediction
        node.adwin.update(y == old_prediction)
        
        if node.adwin.drift_detected:
            if node.alternate_tree is None:
                node.alternate_tree = Node(is_leaf=True, prediction=np.argmax(node.class_counts))
            self._fit_single(x, y, node.alternate_tree)  # Train alternate tree
        
        # Compare performance if alternate tree exists
        if node.alternate_tree and node.alternate_tree.adwin.estimation < node.adwin.estimation:
            # Replace subtree
            node.is_leaf = node.alternate_tree.is_leaf
            node.split_feature = node.alternate_tree.split_feature
            node.split_value = node.alternate_tree.split_value
            node.children = node.alternate_tree.children
        
        # Attempt to split if the node is still a leaf
        if node.is_leaf and np.sum(node.class_counts) > 25:
            self._attempt_to_split(node, x, y)
    
    def _attempt_to_split(self, node, x, y):
        X_sub = np.array([x])
        y_sub = np.array([y])
        feature, value = best_split(X_sub, y_sub)
        if feature is not None:
            n = np.sum(node.class_counts)
            epsilon = hoeffding_bound(1, n)
            if epsilon < 0.1: 
                node.is_leaf = False
                node.split_feature = feature
                node.split_value = value
                node.children['left'] = Node(is_leaf=True, prediction=np.argmax(node.class_counts))
                node.children['right'] = Node(is_leaf=True, prediction=np.argmax(node.class_counts))
    
    def predict(self, x):
        node = self.root
        while not node.is_leaf:
            if x[node.split_feature] <= node.split_value:
                node = node.children['left']
            else:
                node = node.children['right']
        return node.prediction + 1  # Adjust back to original class labels

# You would then use the HAT class with your data similarly to how you'd use the EFDT.


In [5]:
import numpy as np

# Assuming your file is named 'data.txt' and located in the current directory
file_path = 'Skin_NonSkin 2.txt'

# Load the data
data = np.loadtxt(file_path, delimiter='\t')
np.random.shuffle(data)


# Split the data into features and target variable
X = data[:, :-1]
y = data[:, -1].astype(int) 

current = 0
drift = False

for i in range(len(y)):
    if drift:
        y[i] = 1 if y[i] == 2 else 2
    current += 1
    
    # Changes every 50000
    if current > 50000:
        drift = not drift
        current = 0


In [6]:

# Initialize the Tree
model = HAT()

#Initialize variables
errors_count = 0
error_rates = []

for idx in range(len(y)):
    pred = model.predict(X[idx])

    if pred != y[idx]:
        errors_count += 1
        
    
    # Calculate Error Rate
    
    if idx > 10000:
        error_rate = errors_count / (idx + 1)
        error_rates.append([idx, error_rate])
    
    # Print Every 10000 Iterations
    if idx % 10000 == 0:
        print(f'Instance: {idx}')
    

    model._fit_single(X[idx], y[idx])



Instance: 0
Instance: 10000
Instance: 20000
Instance: 30000
Instance: 40000
Instance: 50000


TypeError: HAT._fit_single() takes 3 positional arguments but 4 were given

In [None]:
import matplotlib.pyplot as plt

# Splitting the list into two lists, idxs and errors
idxs, errors = zip(*error_rates)

# Plotting
plt.figure(figsize=(10, 6))  # Optional: Specifies the figure size
plt.plot(idxs, errors, marker='o', linestyle='-', color='b')  # Marker, linestyle, and color are optional
plt.title('Error Rate Over Time')
plt.xlabel('Index (or Time)')
plt.ylabel('Error Rate')
plt.grid(True)  # Optional: Adds a grid for easier reading
plt.show()