<center><font size = '+3'>Self-Constructed CART</font></center>
<br>
<center><font size = '+2'>an ensemble learning project</font></center>
<center><font size = '+1'>by</font></center>
<center><font size = '+1'>Congjie AN, Jialin DU, Zeli PAN, Yifan WANG</font></center>

## Construction of CART

In this part we implement a CART from scratch ***using only Numpy***

In [128]:
# import required packages

import numpy as np

In [129]:
# define the function to calculate impurity

def impurity(criterion, labels, decimal = None):
  assert criterion in ['gini', 'mse', 'entropy'], f"Error: criterion \'{criterion}\' is not supported."
  labels = np.array(labels)
  if criterion == 'gini': # Gini impurity for classification tasks
    length = float(len(labels)) # pre-define to save runtime
    res =  1.0 - sum([ ( float(len(labels[labels == label])) / length ) ** 2.0 for label in np.unique(labels) ])
  elif criterion == 'mse': # MSE for regression tasks
    res = np.mean( (labels - np.mean(labels)) ** 2.0 )
  else: # Entropy impurity for classification tasks
    res = list()
    length = float(len(labels)) # pre-define to save runtime
    p = np.array([float(len(labels[labels == label])) / length for label in np.unique(labels)])
    res = sum( p * np.log2(p) * (-1) )
  # set the result to targeted decimal
  return res if decimal == None else round(res, decimal)

In [130]:
# test the impurity function

lst1 = [1, 1, 1, 1, 1, 1]
lst2 = [1, 1, 1, 2, 2, 2]

assert impurity('gini', lst1) == 0.0
assert impurity('gini', lst2) == 0.5
assert impurity('entropy', lst1) == 0.0
assert impurity('entropy', lst2) == 1.0
assert impurity('mse', lst1) == 0.0
assert impurity('mse', lst2) == 0.25

del lst1
del lst2

lst3 = [1, 2, 2, 3, 3, 3, 4, 4, 4, 4]
print(impurity('gini', lst3, 2))
print(impurity('entropy', lst3, 2))
print(impurity('mse', lst3, 2))
del lst3

0.7
1.85
1.0


In [131]:
# define the node class

class Node(object):
  def __init__(self, criterion, max_depth, min_samples, min_gain):
    # these are the basic attributes of a node
    self.label = None # the label assigned to the samples in this node
    self.depth = None # the depth of this node
    self.num_samples = None # the number of samples in this node
    self.impurity = None # the impurity of the samples in this node
    # these are the attributes related to the tree
    self.criterion = criterion
    self.max_depth = max_depth # max depth of a tree
    self.min_samples = min_samples # minimum number of samples in a node
    self.min_gain = min_gain # minimum impurity gain from a split
    # these are the attributes related to the splitment of a node
    self.is_leaf = False # whether a node is a leaf
    self.split_feature = None # the feature that used to split
    self.split_threshold = None # the point to split
    self.child_left = None # the child node on the left
    self.left_num_samples = None # the number of sample of the left child
    self.child_right = None # the child node on the right
    self.right_num_samples = None # the number of sample of the right child
    self.split_gain = None # the impurity gain of the split

  # a function to grow the tree (split the node)
  def grow(self, features, targets):
    # set number of samples for current node
    self.num_samples = features.shape[0]
    # calculate the impurity of this node
    self.impurity = impurity(criterion = self.criterion, labels = targets, decimal = 4)
    # set label of samples for current node
    # if all the samples belong to one class, stop growing from this node
    if len(np.unique(targets)) == 1:
      self.label = targets[0]
      self.is_leaf = True
      return
    # else, choose the most common class as node label
    if self.criterion in ['gini', 'entropy']: # classification
      unique_labels, labels_counts = np.unique(targets, return_counts = True)
      self.label = unique_labels[np.argmax(labels_counts)]
      del unique_labels, labels_counts
    else: # regression
      self.label = np.mean(targets)
    # if the depth of the node reaches max_depth, stop growing
    if self.depth == self.max_depth:
      self.is_leaf = True
      return
    
    # Decide using which feature and which threshold to split the node
    for feature_index in range(features.shape[1]):
      feature_values = np.unique(features[:, feature_index])
      feature_values = np.sort(feature_values) # sorted unique values from this feature
      thresholds = (feature_values[:-1] + feature_values[1:]) / 2.0
      for threshold in thresholds:
        # calculate the attributes of the left child node
        targets_left = targets[features[:, feature_index] <= threshold]
        impurity_left = impurity(self.criterion, targets_left, 4)
        ratio_left = float(len(targets_left)) / float(self.num_samples)
        # calculate the attributes of the right child node
        targets_right = targets[features[:, feature_index] > threshold]
        impurity_right = impurity(self.criterion, targets_right, 4)
        ratio_right = float(len(targets_right)) / float(self.num_samples)
        # calculate the impurity gain from this split
        impurity_gain = self.impurity - (ratio_left * impurity_left + ratio_right * impurity_right)
        # save the best way of split
        if (self.split_gain == None) or (impurity_gain > self.split_gain):
          self.split_gain = impurity_gain
          self.split_feature = feature_index
          self.split_threshold = threshold
          self.left_num_samples = len(targets_left)
          self.right_num_samples = len(targets_right)
    # check whether this split meet stop conditions
    if self.min_samples != None: # stop by min_samples
      if (self.left_num_samples < self.min_samples) or (self.right_num_samples < self.min_samples):
        self.is_leaf = True
        return
    if self.min_gain != None: # stop by min_gain
      if self.split_gain < self.min_gain:
        self.is_leaf = True
        return
    # pass the stop conditions, begin splitting
    # split the left node
    features_left = features[features[:, self.split_feature] <= self.split_threshold]
    targets_left = targets[features[:, self.split_feature] <= self.split_threshold]
    self.child_left = Node(criterion = self.criterion, max_depth = self.max_depth, min_samples = self.min_samples, min_gain = self.min_gain)
    self.child_left.depth = self.depth + 1
    self.child_left.grow(features_left, targets_left)
    # split the right node
    features_right = features[features[:, self.split_feature] > self.split_threshold]
    targets_right = targets[features[:, self.split_feature] > self.split_threshold]
    self.child_right = Node(criterion = self.criterion, max_depth = self.max_depth, min_samples = self.min_samples, min_gain = self.min_gain)
    self.child_right.depth = self.depth + 1
    self.child_right.grow(features_right, targets_right)
  
  # a function to make classification for a given instance
  def predict(self, row):
    if self.is_leaf == False:
      if row[self.split_feature] <= self.split_threshold:
        return self.child_left.predict(row)
      else:
        return self.child_right.predict(row)
    else: 
      return self.label

  # a function to print the label, split feature and threshold
  def show(self, condition):
    if self.is_leaf == False:
      print('  ' * self.depth + condition + f"if X[{str(self.split_feature)}] <= {str(self.split_threshold)}")
      self.child_left.show('then ')
      self.child_right.show('else ')
    else:
      if self.criterion == 'mse':
        print('  ' * self.depth + f"Value: {str(self.label)}, Num_samples: {str(self.num_samples)}")
      else:
        print('  ' * self.depth + f"Class: {str(self.label)}, Num_samples: {str(self.num_samples)}")

In [132]:
# define the CART class

# Define a class for the whole CART
class CART(object):
  def __init__(self, criterion = 'gini', max_depth = 5, min_samples = None, min_gain = None, print_begin = False):
    assert criterion in ['gini', 'mse', 'entropy'], f"Error: criterion \'{criterion}\' is not supported."
    self.criterion = criterion # the impurity function used
    self.max_depth = max_depth # the maximum depth of the tree
    self.min_samples = min_samples # the minimum number of samples in a node
    self.min_gain = min_gain # the minimum split impurity gain
    self.root = Node(criterion = self.criterion, max_depth = self.max_depth, min_samples = self.min_samples, min_gain = self.min_gain)
    self.root.depth = 0
    if print_begin == True:
      print("Tree initialized, make sure that your criterion is \'gini\' or \'entropy\' for classification tasks, and \'mse\' for regression tasks.")

  # the function to train the CART
  def fit(self, features, targets):
    self.root.grow(features, targets)

  # the function to make predictions
  def predict(self, features):
    return np.array([self.root.predict(row) for row in features])

  # the function to print the CART
  def show(self):
    return self.root.show('')

## Evaluation

- We evaluate our CART in classification tasks using the Iris, Wine and Breast Cancer Wisconsin (Diagnostic) datasets.
- We evaluate our CART in regression tasks using the datasets we generated.

In [133]:
from sklearn.datasets import load_iris
from sklearn.datasets import load_wine
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn import tree
from sklearn.metrics import f1_score

def classification_evaluation(sklearn_dataset, print_tree = True):
  assert sklearn_dataset in ['iris', 'wine', 'breast_cancer'], f"Error: dataset \'{sklearn_dataset}\' is not supported."
  print(f"Begin evaluation on {sklearn_dataset} dataset.")

  # load the chosen dataset
  if sklearn_dataset == 'iris':
    dataset = load_iris()
  elif sklearn_dataset == 'wine':
    dataset = load_wine()
  else:
    dataset = load_breast_cancer()
  X, y = dataset.data, dataset.target
  X_train, X_test, y_train, y_test = train_test_split(X, y, random_state = 513)

  # predict using sklearn
  cls = tree.DecisionTreeClassifier(criterion = 'entropy', max_depth = 5)
  cls.fit(X_train, y_train)
  pred = cls.predict(X_test)
  f1score = f1_score(y_test, pred, average = 'weighted')
  print(f"\tSklearn Library Tree Prediction f1 score: {f1score}")

  # predict using self-defined CART
  cls = CART(criterion = 'entropy', max_depth = 5)
  cls.fit(X_train, y_train)
  pred = cls.predict(X_test)
  f1score = f1_score(y_test, pred, average = 'weighted')
  print(f"\tCART Prediction f1 score: {f1score}")
  
  # print the CART
  if print_tree == True:
    cls.show()

def regression_evaluation(print_tree = True):
  print("Begin evaluation on regression tasks.")
  # Generate a random dataset
  parameters = np.random.randint(low = 1, high = 6, size = 5)
  c = np.random.randint(low = -50, high = 50)
  X = np.random.randint(low = 1, high = 101, size=(500, 5))
  error = np.random.normal(loc = 0, scale = 1, size = 500) * np.mean(X, axis=1)
  y = X[:, 0] * parameters[0]
  for i in [1, 2, 3, 4]:
    y += X[:, i] * parameters[i]
  y += c
  y = y.astype('float64')
  y += error
  del parameters, c, error
  X_train, X_test, y_train, y_test = train_test_split(X, y, random_state = 513)
  
  # predict using sklearn
  reg = tree.DecisionTreeRegressor(max_depth = 5)
  reg.fit(X_train, y_train)
  pred = reg.predict(X_test)
  mse = np.mean((pred - y_test) ** 2)
  print(f"\tSklearn Library Tree Prediction MSE: {mse}")
  
  # predict using CART
  reg = CART(criterion = 'mse', max_depth = 5)
  reg.fit(X_train, y_train)
  pred = reg.predict(X_test)
  mse = np.mean((pred - y_test) ** 2)
  print(f"\tCart Prediction MSE: {mse}")

  # print the CART
  if print_tree == True:
    reg.show()

In [134]:
classification_evaluation('iris', print_tree = True)

Begin evaluation on iris dataset.
	Sklearn Library Tree Prediction f1 score: 0.9738894018672412
	CART Prediction f1 score: 0.9738894018672412
if X[2] <= 2.45
  Class: 0, Num_samples: 35
  else if X[3] <= 1.65
    then if X[2] <= 4.95
      Class: 1, Num_samples: 34
      else if X[0] <= 6.05
        then if X[1] <= 2.45
          Class: 2, Num_samples: 1
          Class: 1, Num_samples: 1
        Class: 2, Num_samples: 2
    else if X[2] <= 4.85
      then if X[1] <= 3.1
        Class: 2, Num_samples: 3
        Class: 1, Num_samples: 1
      Class: 2, Num_samples: 35


In [135]:
classification_evaluation('wine', print_tree = True)

Begin evaluation on wine dataset.
	Sklearn Library Tree Prediction f1 score: 0.9561438923395444
	CART Prediction f1 score: 0.9555555555555556
if X[6] <= 1.58
  then if X[9] <= 3.825
    Class: 1, Num_samples: 10
    else if X[2] <= 2.06
      Class: 1, Num_samples: 1
      Class: 2, Num_samples: 36
  else if X[12] <= 724.5
    Class: 1, Num_samples: 42
    else if X[0] <= 12.66
      Class: 1, Num_samples: 3
      Class: 0, Num_samples: 41


In [136]:
classification_evaluation('breast_cancer', print_tree = True)

Begin evaluation on breast_cancer dataset.
	Sklearn Library Tree Prediction f1 score: 0.9373317757757117
	CART Prediction f1 score: 0.9373317757757117
if X[23] <= 884.75
  then if X[27] <= 0.13579999999999998
    then if X[13] <= 35.71
      then if X[21] <= 33.269999999999996
        Class: 1, Num_samples: 228
        else if X[20] <= 14.43
          Class: 1, Num_samples: 11
          Class: 0, Num_samples: 3
      else if X[19] <= 0.003566
        then if X[1] <= 16.08
          Class: 1, Num_samples: 3
          Class: 0, Num_samples: 5
        Class: 1, Num_samples: 7
    else if X[1] <= 20.299999999999997
      then if X[13] <= 21.285
        Class: 1, Num_samples: 8
        else if X[12] <= 2.62
          Class: 0, Num_samples: 8
          Class: 1, Num_samples: 6
      Class: 0, Num_samples: 15
  else if X[1] <= 15.015
    then if X[4] <= 0.097625
      Class: 1, Num_samples: 4
      Class: 0, Num_samples: 4
    Class: 0, Num_samples: 124


In [137]:
regression_evaluation()

Begin evaluation on regression tasks.
	Sklearn Library Tree Prediction MSE: 14318.652696196305
	Cart Prediction MSE: 14318.652696196305
if X[3] <= 48.0
  then if X[4] <= 28.5
    then if X[0] <= 28.5
      then if X[2] <= 79.5
        then if X[3] <= 25.0
          Value: 304.2830404124651, Num_samples: 7
          Value: 390.0190771722673, Num_samples: 5
        else if X[4] <= 6.0
          Value: 370.6246415136852, Num_samples: 2
          Value: 516.1115516873718, Num_samples: 2
      else if X[3] <= 29.0
        then if X[3] <= 10.5
          Value: 343.14456747748136, Num_samples: 1
          Value: 484.54385930633106, Num_samples: 12
        else if X[4] <= 17.0
          Value: 533.2030063969042, Num_samples: 7
          Value: 678.4869220609514, Num_samples: 4
    else if X[4] <= 66.5
      then if X[0] <= 55.5
        then if X[3] <= 23.0
          Value: 561.9072286110744, Num_samples: 15
          Value: 663.0161108280506, Num_samples: 18
        else if X[4] <= 43.5
      

## Evolutionary Hyperparameters Optimization

In the task of using Decision Tree for classification or regression, parameter adjustment is very important. Appropriate parameters can improve model performance while avoiding overfitting. However, the traditional grid search method is inefficient. So, while writing CART, we created algorithms that use evolutionary algorithms to find optimal hyperparameters.

In [145]:
import random as rd
from tqdm import tqdm
import matplotlib.pyplot as plt

# define a function to get the performance of a parameters combination
def perform(X_train, X_test, y_train, y_test, criterion, max_depth, min_samples, min_gain):
  model = CART(criterion, max_depth, min_samples, min_gain)
  model.fit(X_train, y_train)
  pred = model.predict(X_test)
  if criterion == 'mse':
    return (-1) * np.mean((pred - y_test) ** 2)
  else:
    return f1_score(y_test, pred, average = 'weighted')

# main function
def evo_optimize(X_train, X_test, y_train, y_test, task, max_depth_list, min_samples_list, min_gain_active = False, min_gain_range = (0.01, 0.05), epochs = 10, plot = False):
  # task can be 'regression' or 'classification'
  # max_depth_list is the possible values for max_depth
  # min_samples_list is the possible values for min_samples
  # min_gain_active decides whether we will optimize min_gain
  # min_gain_range is the range for possible values for min_gain
  # epoch is the number of rounds of evolution

  # initialization
  print("Initializing......")
  if task == 'regression':
    criterion = ['mse'] * 10
  else:
    criterion = rd.choices(['gini', 'entropy'], k = 10)
  max_depth = rd.choices(max_depth_list, k = 10)
  min_samples = rd.choices(min_samples_list, k = 10)
  if min_gain_active == False:
    min_gain = [None] * 10
  else:
    min_gain = [rd.uniform(min_gain_range[0], min_gain_range[1]) for _ in range(10)]
  # generate 10 initial samples
  env = list()
  for i in range(10):
    env.append([criterion[i], max_depth[i], min_samples[i], min_gain[i]])
  del criterion, max_depth, min_samples, min_gain
  # get the performances
  performances = [perform(X_train, X_test, y_train, y_test, row[0], row[1], row[2], row[3]) for row in env]
  # sort both env and performances
  order = [i for i, _ in sorted(enumerate(performances), key = lambda x:x[1], reverse = True)]
  env = [env[i] for i in order]
  performances = [performances[i] for i in order]
  # loop for pre-set generations
  history = [performances[0]]
  print("Initialization finished, begin evolving......")
  for epoch in tqdm(range(1, epochs + 1)):
    env = env[0:4] # Eliminate the last six
    performances = performances[0:4]
    # Reproduce
    new_env = list()
    for i,j in [(0,1), (0,2), (0,3), (1,2), (1,3), (2,3)]:
      child = list()
      for k in range(4):
        prob = rd.random()
        if prob < 0.35:
          child.append(env[i][k])
        elif prob < 0.7:
          child.append(env[j][k])
        else: # mutation probability is 30%
          if (k == 0) and (task == 'regression'):
            child.append('mse')
          elif (k == 0) and (task == 'classification'):
            child.append(rd.choice(['gini', 'entropy']))
          elif k == 1:
            child.append(rd.choice(max_depth_list))
          elif k == 2:
            child.append(rd.choice(min_samples_list))
          elif (k == 3) and (min_gain_active == False):
            child.append(None)
          else:
            child.append(rd.uniform(min_gain_range[0], min_gain_range[1]))
      new_env.append(child)
    # generate new performances, merge and sort
    new_performances = [perform(X_train, X_test, y_train, y_test, row[0], row[1], row[2], row[3]) for row in new_env]
    env.extend(new_env)
    performances.extend(new_performances)
    order = [i for i, value in sorted(enumerate(performances), key = lambda x:x[1], reverse = True)]
    env = [env[i] for i in order]
    performances = [performances[i] for i in order]
    history.append(performances[0])
  # print the results
  print(f"The optimal hyperparameter choice for this {task} task is:")
  print(f"criterion: {env[0][0]}")
  print(f"max_depth: {env[0][1]}")
  print(f"min_samples: {env[0][2]}")
  print(f"min_gain: {env[0][3]}")
  if task == 'classification':
    print(f"The final f1 score is {performances[0]}.")
    if plot == True:
      plt.plot(history)
      plt.xlabel('generation')
      plt.ylabel('f1 score')
      plt.title('f1 score history')
      plt.show()
  else:
    history = list(np.array(history) * (-1))
    print(f"The final MSE is {(-1) * performances[0]}.")
    if plot == True:
      plt.plot(history)
      plt.xlabel('generation')
      plt.ylabel('MSE')
      plt.title('mse history')
      plt.show()

## Evaluation

We evaluate this algorithm on the same tasks.

In [146]:
dataset = load_iris()
X, y = dataset.data, dataset.target
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state = 513)

task = 'classification'
max_depth_list = range(3, 13)
min_samples_list = range(1, 6)
min_gain_active = False

evo_optimize(X_train, X_test, y_train, y_test, task, max_depth_list, min_samples_list, min_gain_active = False, min_gain_range = (0.01, 0.05), epochs = 10)

Initializing......
Initialization finished, begin evolving......


100%|██████████| 10/10 [00:01<00:00,  6.39it/s]

The optimal hyperparameter choice for this classification task is:
criterion: gini
max_depth: 6
min_samples: 3
min_gain: None
The final f1 score is 0.9738894018672412.





In [147]:
cls = CART(criterion = 'gini', max_depth = 6, min_samples = 3, min_gain = None)
cls.fit(X_train, y_train)
cls.show()

if X[2] <= 2.45
  Class: 0, Num_samples: 35
  else if X[3] <= 1.65
    then if X[2] <= 4.95
      Class: 1, Num_samples: 34
      Class: 2, Num_samples: 4
    else if X[2] <= 4.85
      Class: 2, Num_samples: 4
      Class: 2, Num_samples: 35


In [148]:
dataset = load_wine()
X, y = dataset.data, dataset.target
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state = 513)

task = 'classification'
max_depth_list = range(3, 13)
min_samples_list = range(1, 6)
min_gain_active = False

evo_optimize(X_train, X_test, y_train, y_test, task, max_depth_list, min_samples_list, min_gain_active = False, min_gain_range = (0.01, 0.05), epochs = 10)

Initializing......
Initialization finished, begin evolving......


100%|██████████| 10/10 [00:14<00:00,  1.47s/it]

The optimal hyperparameter choice for this classification task is:
criterion: entropy
max_depth: 10
min_samples: 5
min_gain: None
The final f1 score is 0.9555555555555556.





In [149]:
cls = CART(criterion = 'entropy', max_depth = 10, min_samples = 5, min_gain = None)
cls.fit(X_train, y_train)
cls.show()

if X[6] <= 1.58
  then if X[9] <= 3.825
    Class: 1, Num_samples: 10
    Class: 2, Num_samples: 37
  else if X[12] <= 724.5
    Class: 1, Num_samples: 42
    Class: 0, Num_samples: 44


In [151]:
parameters = np.random.randint(low = 1, high = 6, size = 5)
c = np.random.randint(low = -50, high = 50)
X = np.random.randint(low = 1, high = 101, size=(500, 5))
error = np.random.normal(loc = 0, scale = 1, size = 500) * np.mean(X, axis=1)
y = X[:, 0] * parameters[0]
for i in [1, 2, 3, 4]:
  y += X[:, i] * parameters[i]
y += c
y = y.astype('float64')
y += error
del parameters, c, error
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state = 513)

task = 'regression'
max_depth_list = range(3, 13)
min_samples_list = range(1, 6)
min_gain_active = False

evo_optimize(X_train, X_test, y_train, y_test, task, max_depth_list, min_samples_list, min_gain_active = False, min_gain_range = (0.01, 0.05), epochs = 10)

Initializing......
Initialization finished, begin evolving......


100%|██████████| 10/10 [00:35<00:00,  3.56s/it]

The optimal hyperparameter choice for this regression task is:
criterion: mse
max_depth: 8
min_samples: 3
min_gain: None
The final MSE is 15044.028244783318.





In [152]:
reg = CART(criterion = 'mse', max_depth = 8, min_samples = 3, min_gain = None)
reg.fit(X_train, y_train)
reg.show()

if X[3] <= 50.5
  then if X[1] <= 47.0
    then if X[3] <= 21.5
      then if X[4] <= 64.0
        then if X[1] <= 34.5
          Value: 365.8835642749943, Num_samples: 14
          Value: 453.65896395141596, Num_samples: 8
        else if X[1] <= 15.5
          Value: 462.84406176807073, Num_samples: 7
          Value: 586.2087346723567, Num_samples: 4
      else if X[4] <= 75.5
        then if X[1] <= 26.5
          then if X[4] <= 13.0
            Value: 390.1760172012003, Num_samples: 3
            Value: 496.28587182530094, Num_samples: 12
          else if X[4] <= 23.0
            then if X[4] <= 16.0
              Value: 553.1898741140201, Num_samples: 5
              Value: 474.949339540143, Num_samples: 4
            else if X[0] <= 53.5
              Value: 577.5577817112726, Num_samples: 4
              Value: 695.8546096490811, Num_samples: 4
        else if X[1] <= 31.5
          then if X[3] <= 33.0
            Value: 564.3653627595679, Num_samples: 4
            Value: 6