<a href="https://colab.research.google.com/github/MariiaZimokha/notebooks/blob/main/decision_tree.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Implementation

In [2]:
import numpy as np
import pandas as pd
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

def get_data(test_size=0.33, random_state=42):
  """Load the Iris dataset and split it into train and test subsets.

  Args:
    test_size: Forwarded to ``train_test_split``. The default (0.33) is the
      value that used to be hard-coded here.
    random_state: Seed forwarded to ``train_test_split``. The default (42) is
      the value that used to be hard-coded here.

  Returns:
    The tuple ``(X_train, X_test, y_train, y_test)``.
  """
  # The loaded dataset object exposes the feature matrix as ``.data`` and the
  # class labels as ``.target``.
  iris = load_iris()
  features = iris.data
  labels = iris.target

  X_train, X_test, y_train, y_test = train_test_split(
      features, labels, test_size=test_size, random_state=random_state)

  return X_train, X_test, y_train, y_test

In [3]:
# X / y hold the training split; X_test / y_test hold the held-out split
# (get_data returns them in the order train-X, test-X, train-y, test-y).
X, X_test, y, y_test = get_data()

In [20]:
class Node:
  """One node of the tree built by DecisionTreeCl.

  An internal node carries a split rule (``feature`` index and threshold
  ``value``) plus ``left`` / ``right`` children; a leaf carries a ``label``.
  Everything except ``value`` starts out as ``None``.
  """

  def __init__(self, value=None):
    # Threshold the split compares against.
    self.value = value
    # Children are attached later, while the tree is being built.
    self.left = self.right = None
    # Split column index and leaf class, both unset until the builder fills them.
    self.feature = self.label = None

In [25]:
class DecisionTreeCl:
  """Decision tree classifier that chooses splits by weighted Gini impurity.

  Every internal node tests ``row[feature] <= value``: rows that pass go to the
  left child, the rest go to the right child. A leaf stores the most frequent
  class among the training rows that reached it.
  """

  def __init__(self, max_depth=3) -> None:
    """Create an untrained tree.

    Args:
      max_depth: Maximum number of splits on any root-to-leaf path. ``None``
        removes the limit; the tree then grows until every leaf is pure or
        cannot be split any further.
    """
    self.root = Node()
    self.max_depth = max_depth

  def fit(self, X, y):
    """Build the tree from training data.

    Args:
      X: 2-D array-like of features, one row per sample.
      y: 1-D array-like of class labels, one per row of ``X``.

    Raises:
      ValueError: If ``y`` is empty.
    """
    X = np.asarray(X)
    y = np.asarray(y)
    if len(y) == 0:
      raise ValueError("Cannot fit a decision tree on an empty dataset.")

    # Always start from a fresh root: reusing the previous one would keep a
    # stale ``label`` around and make predict() stop at the root after a refit.
    self.root = self._build_tree(X, y, 0, Node())

  def _build_tree(self, X, y, depth, node):
    """Recursively fill ``node`` (and its descendants) and return it."""
    depth_exhausted = self.max_depth is not None and self.max_depth <= depth
    if depth_exhausted or len(np.unique(y)) == 1:
      node.label = self._majority_label(y)
      return node

    best_column, best_value = self._get_split(X, y)
    if best_column is None:
      # No threshold separates the rows (e.g. identical feature rows with
      # different labels), so this node has to become a leaf.
      node.label = self._majority_label(y)
      return node

    left_X, left_y, right_X, right_y = self._split_df(X, y, best_value, best_column)

    node.label = None
    node.value = best_value
    node.feature = best_column
    node.left = self._build_tree(left_X, left_y, depth + 1, Node())
    node.right = self._build_tree(right_X, right_y, depth + 1, Node())

    return node

  def _majority_label(self, y):
    """Return the most frequent label in ``y`` (the smallest one on ties)."""
    unique_labels, counts = np.unique(y, return_counts=True)
    return unique_labels[np.argmax(counts)]

  def _get_split(self, X, y):
    """Find the (column, threshold) pair with the lowest weighted Gini impurity.

    Returns:
      ``(best_column, best_value)``, or ``(None, None)`` when no threshold
      puts at least one row on each side.
    """
    best_column = None
    best_value = None
    best_gini = float('inf')
    n_samples = len(y)

    # For each column in the data (i.e. for each feature) ...
    for column_index in range(X.shape[1]):
      # ... try every unique value of that column as a threshold.
      for value in np.unique(X[:, column_index]):
        _, left_y, _, right_y = self._split_df(X, y, value, column_index)

        # A split that leaves one side empty separates nothing and would
        # create an empty child node, so it is never a candidate.
        if len(left_y) == 0 or len(right_y) == 0:
          continue

        # Weighted Gini impurity of the split:
        # gini_left * len_left / total + gini_right * len_right / total
        gini_impurity = (self._gini(left_y) * len(left_y) / n_samples
                         + self._gini(right_y) * len(right_y) / n_samples)

        # The smaller the impurity the better; the first candidate wins ties.
        if gini_impurity < best_gini:
          best_gini = gini_impurity
          best_column = column_index
          best_value = value

    return best_column, best_value

  def _gini(self, y):
    """Return the Gini impurity of ``y``: 1 - sum(P(class)^2)."""
    _, counts = np.unique(y, return_counts=True)
    probabilities = counts / len(y)
    return 1 - np.sum(probabilities**2)

  def _split_df(self, X, y, threshold, cln_index):
    """Partition rows by ``X[:, cln_index] <= threshold``.

    Returns:
      ``(left_X, left_y, right_X, right_y)`` where the left part holds the
      rows whose value is <= ``threshold`` and the right part the rows whose
      value is > ``threshold``.
    """
    left_split_mask = X[:, cln_index] <= threshold
    right_split_mask = X[:, cln_index] > threshold

    return X[left_split_mask], y[left_split_mask], X[right_split_mask], y[right_split_mask]

  def _predict_traverse(self, row, node):
    """Walk from ``node`` down to a leaf for one sample and return its label."""
    while node.label is None:
      if row[node.feature] <= node.value:
        node = node.left
      else:
        node = node.right
    return node.label

  def predict(self, X):
    """Return a NumPy array with one predicted label per row of ``X``."""
    return np.array([self._predict_traverse(row, self.root) for row in X])


In [26]:
# Train the hand-written classifier on the training split; the positional
# argument 4 is max_depth.
model_cl = DecisionTreeCl(4)
model_cl.fit(X, y)

In [27]:
# NOTE(review): the two commented-out lines below look like an earlier manual
# hold-out slice; X_test / y_test are now produced by get_data() — confirm
# they can be deleted.
# X_test = X_train.to_numpy()[100:]
# y_test = y_train.to_numpy()[100:]
# Predict a label for every row of the held-out split.
preds = model_cl.predict(X_test)

In [28]:
preds

array([1, 0, 2, 1, 1, 0, 1, 2, 1, 1, 2, 0, 0, 0, 0, 1, 2, 1, 1, 2, 0, 2,
       0, 2, 2, 2, 2, 2, 0, 0, 0, 0, 1, 0, 0, 2, 1, 0, 0, 0, 2, 1, 1, 0,
       0, 1, 1, 2, 1, 2])

In [155]:
y_test

array([1, 0, 2, 1, 1, 0, 1, 2, 1, 1, 2, 0, 0, 0, 0, 1, 2, 1, 1, 2, 0, 2,
       0, 2, 2, 2, 2, 2, 0, 0, 0, 0, 1, 0, 0, 2, 1, 0, 0, 0, 2, 1, 1, 0,
       0, 1, 2, 2, 1, 2])

In [29]:
# Accuracy: fraction of held-out samples whose predicted label equals y_test.
np.mean(preds == y_test)


0.98

### ChatGPT generation

In [10]:
class DecisionTree:
    """Decision tree classifier stored as nested dicts and built with plain loops.

    Internal nodes are dicts with the keys ``'feature_index'``, ``'threshold'``,
    ``'left'`` and ``'right'``; leaves are dicts with the single key ``'class'``.
    A sample goes left when ``sample[feature_index] <= threshold``.
    """

    def __init__(self, max_depth=None):
        """Create an untrained tree; ``max_depth=None`` means no depth limit."""
        self.max_depth = max_depth
        self.tree = {}

    def fit(self, X, y):
        """Build the tree from samples ``X`` and their labels ``y``.

        Raises:
            ValueError: If ``X`` contains no samples.
        """
        if len(X) == 0:
            raise ValueError("Cannot fit a decision tree on an empty dataset.")
        self.tree = self._build_tree(X, y, depth=0)

    def predict(self, X):
        """Return a list with one predicted class per sample in ``X``."""
        return [self._traverse_tree(sample, self.tree) for sample in X]

    def _build_tree(self, X, y, depth):
        """Recursively build and return the (sub)tree for the given samples."""
        # Base cases: depth limit reached or the node is already pure.
        if self._should_stop(X, y, depth):
            return self._leaf_node(y)

        best_split = self._find_best_split(X, y)
        if best_split is None:
            # No threshold separates the samples (e.g. identical rows with
            # different labels). Without this guard the recursion would never
            # terminate when max_depth is None.
            return self._leaf_node(y)

        return {
            'feature_index': best_split['feature_index'],
            'threshold': best_split['threshold'],
            'left': self._build_tree(best_split['left_X'], best_split['left_y'], depth + 1),
            'right': self._build_tree(best_split['right_X'], best_split['right_y'], depth + 1),
        }

    def _should_stop(self, X, y, depth):
        """Return True when the depth limit is reached or ``y`` has one class."""
        if self.max_depth is not None and depth >= self.max_depth:
            return True
        return len(set(y)) == 1

    def _class_counts(self, y):
        """Return a dict mapping each label in ``y`` to its number of occurrences."""
        counts = {}
        for label in y:
            counts[label] = counts.get(label, 0) + 1
        return counts

    def _leaf_node(self, y):
        """Return a leaf dict holding the majority class of ``y``.

        On ties the label that appears first in ``y`` wins.
        """
        counts = self._class_counts(y)
        return {'class': max(counts, key=counts.get)}

    def _find_best_split(self, X, y):
        """Return the split with the lowest weighted Gini impurity.

        Returns:
            A dict with the keys ``'feature_index'``, ``'threshold'``,
            ``'left_X'``, ``'left_y'``, ``'right_X'`` and ``'right_y'``, or
            ``None`` when no threshold puts at least one sample on each side.
        """
        best_gini = float('inf')
        best_split = None

        for feature_index in range(len(X[0])):
            thresholds = {sample[feature_index] for sample in X}

            for threshold in thresholds:
                left_X, left_y, right_X, right_y = self._split_dataset(X, y, feature_index, threshold)

                # A split with an empty side separates nothing and would
                # produce an empty child node.
                if not left_y or not right_y:
                    continue

                gini = self._calculate_gini(left_y, right_y)
                if gini < best_gini:
                    best_gini = gini
                    best_split = {
                        'feature_index': feature_index,
                        'threshold': threshold,
                        'left_X': left_X,
                        'left_y': left_y,
                        'right_X': right_X,
                        'right_y': right_y,
                    }

        return best_split

    def _split_dataset(self, X, y, feature_index, threshold):
        """Partition samples into lists by ``sample[feature_index] <= threshold``.

        Returns:
            ``(left_X, left_y, right_X, right_y)`` as Python lists.
        """
        left_X, left_y, right_X, right_y = [], [], [], []

        for i, sample in enumerate(X):
            if sample[feature_index] <= threshold:
                left_X.append(sample)
                left_y.append(y[i])
            else:
                right_X.append(sample)
                right_y.append(y[i])

        return left_X, left_y, right_X, right_y

    def _calculate_gini(self, left_y, right_y):
        """Return the size-weighted Gini impurity of a two-way split."""
        total_samples = len(left_y) + len(right_y)
        gini_left = self._calculate_gini_impurity(left_y)
        gini_right = self._calculate_gini_impurity(right_y)
        return (len(left_y) / total_samples) * gini_left + (len(right_y) / total_samples) * gini_right

    def _calculate_gini_impurity(self, y):
        """Return the Gini impurity of ``y``: 1 - sum(P(class)^2)."""
        gini_impurity = 1.0
        for count in self._class_counts(y).values():
            probability = count / len(y)
            gini_impurity -= probability ** 2
        return gini_impurity

    def _traverse_tree(self, sample, node):
        """Follow the split rules from ``node`` to a leaf and return its class."""
        if 'class' in node:
            return node['class']

        if sample[node['feature_index']] <= node['threshold']:
            return self._traverse_tree(sample, node['left'])
        return self._traverse_tree(sample, node['right'])


In [11]:
# Fit the dict-based tree (depth limit 3) on the training split.
tree = DecisionTree(max_depth=3)
tree.fit(X, y)

# Make predictions for the held-out split; predict() returns a plain Python list
predictions = tree.predict(X_test)
print(predictions) 

[1, 0, 2, 1, 1, 0, 1, 2, 1, 1, 2, 0, 0, 0, 0, 1, 2, 1, 1, 2, 0, 2, 0, 2, 2, 2, 2, 2, 0, 0, 0, 0, 1, 0, 0, 2, 1, 0, 0, 0, 2, 1, 1, 0, 0, 1, 1, 2, 1, 2]


In [12]:
y_test

array([1, 0, 2, 1, 1, 0, 1, 2, 1, 1, 2, 0, 0, 0, 0, 1, 2, 1, 1, 2, 0, 2,
       0, 2, 2, 2, 2, 2, 0, 0, 0, 0, 1, 0, 0, 2, 1, 0, 0, 0, 2, 1, 1, 0,
       0, 1, 2, 2, 1, 2])

### ChatGPT (recursion)

In [1]:
import numpy as np

class DecisionTree:
    """Decision tree classifier built from ``Node`` objects using NumPy masks.

    Internal nodes hold ``feature_index`` and ``threshold``; a sample goes left
    when ``sample[feature_index] <= threshold``. Leaves hold ``class_label``.
    """

    def __init__(self, max_depth=None):
        """Create an untrained tree; ``max_depth=None`` means no depth limit."""
        self.max_depth = max_depth

    def fit(self, X, y):
        """Build the tree from a 2-D feature array ``X`` and 1-D labels ``y``.

        Raises:
            ValueError: If ``y`` is empty.
        """
        X = np.asarray(X)
        y = np.asarray(y)
        if len(y) == 0:
            raise ValueError("Cannot fit a decision tree on an empty dataset.")
        self.tree = self._build_tree(X, y, depth=0)

    def predict(self, X):
        """Return a list with one predicted class per sample in ``X``."""
        return [self._traverse_tree(sample, self.tree) for sample in X]

    def _build_tree(self, X, y, depth, node=None):
        """Recursively build the (sub)tree for the given samples and return its root.

        ``node`` is used as the internal node when a split is made; a new
        ``Node`` is created when it is ``None``. Leaves are always new nodes.
        """
        if self._should_stop(X, y, depth):
            return Node(class_label=self._get_majority_class(y))

        best_split = self._find_best_split(X, y)
        if not best_split:
            # No threshold separates the samples (e.g. identical rows with
            # different labels). Without this guard the recursion would never
            # terminate when max_depth is None.
            return Node(class_label=self._get_majority_class(y))

        if node is None:
            node = Node()

        node.feature_index = best_split['feature_index']
        node.threshold = best_split['threshold']
        # Reuse the partitions computed while searching for the split instead
        # of splitting the data a second time.
        node.left = self._build_tree(best_split['left_X'], best_split['left_y'], depth + 1)
        node.right = self._build_tree(best_split['right_X'], best_split['right_y'], depth + 1)

        return node

    def _should_stop(self, X, y, depth):
        """Return True when the depth limit is reached or ``y`` has one class."""
        if self.max_depth is not None and depth >= self.max_depth:
            return True
        return len(np.unique(y)) == 1

    def _get_majority_class(self, y):
        """Return the most frequent label in ``y`` (the smallest one on ties)."""
        unique_classes, class_counts = np.unique(y, return_counts=True)
        return unique_classes[np.argmax(class_counts)]

    def _find_best_split(self, X, y):
        """Return the split with the lowest weighted Gini impurity.

        Returns:
            A dict with the keys ``'feature_index'``, ``'threshold'``,
            ``'left_X'``, ``'left_y'``, ``'right_X'`` and ``'right_y'``, or an
            empty dict when no threshold puts at least one sample on each side.
        """
        best_gini = float('inf')
        best_split = {}

        for feature_index in range(X.shape[1]):
            for threshold in np.unique(X[:, feature_index]):
                left_X, left_y, right_X, right_y = self._split_dataset(X, y, feature_index, threshold)

                # A split with an empty side separates nothing and would
                # produce an empty child node.
                if len(left_y) == 0 or len(right_y) == 0:
                    continue

                gini = self._calculate_gini(left_y, right_y)
                if gini < best_gini:
                    best_gini = gini
                    best_split = {
                        'feature_index': feature_index,
                        'threshold': threshold,
                        'left_X': left_X,
                        'left_y': left_y,
                        'right_X': right_X,
                        'right_y': right_y,
                    }

        return best_split

    def _split_dataset(self, X, y, feature_index, threshold):
        """Partition rows by ``X[:, feature_index] <= threshold``.

        Returns:
            ``(left_X, left_y, right_X, right_y)``.
        """
        left_mask = X[:, feature_index] <= threshold
        right_mask = X[:, feature_index] > threshold
        return X[left_mask], y[left_mask], X[right_mask], y[right_mask]

    def _calculate_gini(self, left_y, right_y):
        """Return the size-weighted Gini impurity of a two-way split."""
        total_samples = len(left_y) + len(right_y)
        gini_left = self._calculate_gini_impurity(left_y)
        gini_right = self._calculate_gini_impurity(right_y)
        return (len(left_y) / total_samples) * gini_left + (len(right_y) / total_samples) * gini_right

    def _calculate_gini_impurity(self, y):
        """Return the Gini impurity of ``y``: 1 - sum(P(class)^2)."""
        _, class_counts = np.unique(y, return_counts=True)
        class_probabilities = class_counts / len(y)
        return 1.0 - np.sum(class_probabilities**2)

    def _traverse_tree(self, sample, node):
        """Follow the split rules from ``node`` to a leaf and return its class."""
        if node.class_label is not None:
            return node.class_label

        if sample[node.feature_index] <= node.threshold:
            return self._traverse_tree(sample, node.left)
        return self._traverse_tree(sample, node.right)


class Node:
    """One node of the tree built by DecisionTree.

    An internal node carries the split rule (``feature_index``, ``threshold``)
    and its ``left`` / ``right`` children; a leaf carries ``class_label``.
    Every field defaults to ``None``.
    """

    def __init__(self, feature_index=None, threshold=None, left=None, right=None, class_label=None):
        # Split rule: column to test and the value it is compared against.
        self.feature_index, self.threshold = feature_index, threshold
        # Subtrees for the two outcomes of the comparison.
        self.left, self.right = left, right
        # Predicted class; stays None on internal nodes.
        self.class_label = class_label


In [5]:
# Fit the Node-based tree (depth limit 3) on the training split.
tree = DecisionTree(max_depth=3)
tree.fit(X, y)

# Make predictions for the held-out split and print them above the true
# labels so the two arrays can be compared by eye
predictions = tree.predict(X_test)
print(np.array(predictions))
print(y_test)

[1 0 2 1 1 0 1 2 1 1 2 0 0 0 0 1 2 1 1 2 0 2 0 2 2 2 2 2 0 0 0 0 1 0 0 2 1
 0 0 0 2 1 1 0 0 1 1 2 1 2]
[1 0 2 1 1 0 1 2 1 1 2 0 0 0 0 1 2 1 1 2 0 2 0 2 2 2 2 2 0 0 0 0 1 0 0 2 1
 0 0 0 2 1 1 0 0 1 2 2 1 2]
