## Decision Tree Classifier from Scratch

In [2]:
import numpy as np
import pandas as pd

In [3]:
!gdown 1xzLHBaHrghH9AIGBot_0lyiWE2qz7GOU

Downloading...
From: https://drive.google.com/uc?id=1xzLHBaHrghH9AIGBot_0lyiWE2qz7GOU
To: /content/iris.csv
  0% 0.00/2.73k [00:00<?, ?B/s]100% 2.73k/2.73k [00:00<00:00, 2.12MB/s]


In [4]:
col_names = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'type']
data = pd.read_csv('iris.csv', skiprows=1, header=None, names=col_names)
data.head(10)

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width,type
0,5.1,3.5,1.4,0.2,0
1,4.9,3.0,1.4,0.2,0
2,4.7,3.2,1.3,0.2,0
3,4.6,3.1,1.5,0.2,0
4,5.0,3.6,1.4,0.2,0
5,5.4,3.9,1.7,0.4,0
6,4.6,3.4,1.4,0.3,0
7,5.0,3.4,1.5,0.2,0
8,4.4,2.9,1.4,0.2,0
9,4.9,3.1,1.5,0.1,0


In [5]:
class Node():
  def __init__(self, feature_index=None, threshold=None, left=None, right=None, info_gain=None, value=None):
    '''constructor'''

    # for decision node
    self.feature_index = feature_index
    self.threshold = threshold
    self.left = left
    self.right = right
    self.info_gain = info_gain

    # for leaf node
    self.value = value

In [6]:
class DecisionTreeClassifier():
  def __init__(self, min_samples_split=2, max_depth=2):
    '''constructor'''

    # initialize the root of the tree
    self.root = None

    # stopping conditions
    self.min_samples_split = min_samples_split # the minimum num of samples in a leaf to be splited
    self.max_depth = max_depth # the maximum depth of the tree

  def build_tree(self, dataset, curr_depth=0):
    '''recursive function to build the tree'''

    X, Y = dataset[:, :-1], dataset[:, -1]
    num_samples, num_features = np.shape(X)

    # split until stopping conditions are met
    if num_samples >= self.min_samples_split and curr_depth <= self.max_depth:
      # find the best split
      best_split = self.get_best_split(dataset, num_samples, num_features)
      # check if information gain is positive
      if best_split['info_gain'] > 0:
        # recur left
        left_subtree = self.build_tree(best_split['dataset_left'], curr_depth+1)
        # recur right
        right_subtree = self.build_tree(best_split['dataset_right'], curr_depth+1)
        # return decision node
        return Node(best_split['feature_index'], best_split['threshold'], left_subtree, right_subtree, best_split['info_gain'])

    # compute leaf node
    leaf_value = self.calculate_leaf_value(Y)
    # return leaf node
    return Node(value=leaf_value)

  def get_best_split(self, dataset, num_samples, num_features):
    '''function to find the best split'''

    # dictionary to store the best split
    best_split = {}
    max_info_gain = -float('inf')

    # loop over all features
    for feature_index in range(num_features):
      feature_values = dataset[:, feature_index]
      possible_threshold = np.unique(feature_values)
      # loop over all features present in the data
      for threshold in possible_threshold:
        # get current split
        dataset_left, dataset_right = self.split(dataset, feature_index, threshold)
        # check if childs are not null
        if len(dataset_left) > 0 and len(dataset_right) > 0:
          y, left_y, right_y = dataset[:, -1], dataset_left[:, -1], dataset_right[:, -1]
          # compute information gain
          curr_info_gain = self.information_gain(y, left_y, right_y, 'gini')
          # update the best split if needed
          if curr_info_gain > max_info_gain:
            best_split['feature_index'] = feature_index
            best_split['threshold'] = threshold
            best_split['dataset_left'] = dataset_left
            best_split['dataset_right'] = dataset_right
            best_split['info_gain'] = curr_info_gain
            max_info_gain = curr_info_gain

    # return best split
    return best_split

  def split(self, dataset, feature_index, threshold):
    '''function to split data'''
    dataset_left = np.array([row for row in dataset if row[feature_index]<=threshold])
    dataset_right = np.array([row for row in dataset if row[feature_index]>threshold])

    return dataset_left, dataset_right

  def information_gain(self, parent, l_child, r_child, mode='entropy'):
    '''function to compute information gain'''

    weight_l = len(l_child)/len(parent)
    weight_r = len(r_child)/len(parent)

    if mode=='gini':
      gain = self.gini_index(parent) - (weight_l * self.gini_index(l_child) + weight_r * self.gini_index(r_child))
    else:
      gain = self.entropy(parent) - (weight_l * self.entropy(l_child) + weight_r * self.entropy(r_child))

    return gain

  def entropy(self, y):
    '''function to compute entropy'''

    class_labels = np.unique(y)
    entropy = 0

    for cls in class_labels:
      p_cls = len(y[y==cls])/len(y)
      entropy += -p_cls * np.log2(p_cls)

    return entropy

  def gini_index(self, y):
    '''function to compute gini index'''

    class_labels = np.unique(y)
    gini = 0
    for cls in class_labels:
      p_cls = len(y[y==cls])/len(y)
      gini += p_cls ** 2

    return 1 - gini

  def calculate_leaf_value(self, Y):
    '''function to compute leaf node'''

    Y = list(Y)

    return max(Y, key=Y.count)

  def print_tree(self, tree=None, indent=" "):
    '''function to print the tree'''

    if not tree:
      tree = self.root

    if tree.value is not None:
      print(tree.value)
    else:
      print(f'X_{tree.feature_index}<={tree.threshold}? {tree.info_gain}')
      print('%sleft:' % (indent), end='')
      self.print_tree(tree.left, indent+indent)
      print('%sright:' % (indent), end='')
      self.print_tree(tree.right, indent+indent)

  def fit(self, X, Y):
    '''function to train the tree'''

    dataset = np.concatenate((X, Y), axis=1)
    self.root = self.build_tree(dataset)

  def predict(self, X):
    '''function to predict new dataset'''

    predictions = [self.make_prediction(x, self.root) for x in X]

    return predictions

  def make_prediction(self, x, tree):
    '''function to predict a single data point'''

    if tree.value != None:
      return tree.value

    feature_val = x[tree.feature_index]
    if feature_val <= tree.threshold:
      return self.make_prediction(x, tree.left)
    else:
      return self.make_prediction(x, tree.right)

In [7]:
X = data.iloc[:, :-1].values
Y = data.iloc[:, -1].values.reshape(-1, 1)

from sklearn.model_selection import train_test_split
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=.2, random_state=41)

In [8]:
classifier = DecisionTreeClassifier(min_samples_split=3, max_depth=3)
classifier.fit(X_train, Y_train)
classifier.print_tree()

X_2<=1.9? 0.33741385372714494
 left:0.0
 right:X_3<=1.5? 0.427106638180289
  left:X_2<=4.9? 0.05124653739612173
    left:1.0
    right:2.0
  right:X_2<=5.0? 0.019631171921475288
    left:X_1<=2.8? 0.20833333333333334
        left:2.0
        right:1.0
    right:2.0


In [9]:
Y_pred = classifier.predict(X_test)

from sklearn.metrics import accuracy_score
accuracy_score(Y_test, Y_pred)

0.9333333333333333

## Homework

### Question 2

In [10]:
def gini_index(y):
    '''function to compute gini index'''

    class_labels = np.unique(y)
    gini = 0
    for cls in class_labels:
      p_cls = len(y[y==cls])/len(y)
      gini += p_cls ** 2

    return 1 - gini

In [11]:
hihi = pd.DataFrame([[23, 0, 0, 0], [25, 1, 1, 0], [27, 1, 0, 1], [29, 0, 1, 1], [29, 0, 0, 0]], columns=['Age', 'Likes English', 'Likes AI', 'Raise Salary'])

gini_index(np.array(hihi)[:, 1])

0.48

In [12]:
hihi

Unnamed: 0,Age,Likes English,Likes AI,Raise Salary
0,23,0,0,0
1,25,1,1,0
2,27,1,0,1
3,29,0,1,1
4,29,0,0,0


### Question 3

In [13]:
haha = np.array(hihi)

parent = np.array(haha)[:, -1]
haha_l = np.array([row[-1] for row in haha if row[1]==0])
haha_r = np.array([row[-1] for row in haha if row[1]==1])

gini = len(haha_l) / len(parent) * gini_index(haha_l) + len(haha_r) / len(parent) * gini_index(haha_r)
gini

0.4666666666666667

### Question 4

In [14]:
haha = np.array(hihi)

parent = np.array(haha)[:, -1]
haha_l = np.array([row[-1] for row in haha if row[0]<=26])
haha_r = np.array([row[-1] for row in haha if row[0]>26])

gini = len(haha_l) / len(parent) * gini_index(haha_l) + len(haha_r) / len(parent) * gini_index(haha_r)
gini

0.26666666666666666

### Question 5

In [15]:
def entropy(y):
  '''function to compute entropy'''

  class_labels = np.unique(y)
  entropy = 0

  for cls in class_labels:
    p_cls = len(y[y==cls])/len(y)
    entropy += -p_cls * np.log2(p_cls)

  return entropy

In [16]:
entropy(np.array(haha[:, -1]))

0.9709505944546686

### Question 6

In [17]:
haha = np.array(hihi)

parent = np.array(haha)[:, -1]
haha_l = np.array([row[-1] for row in haha if row[1]==0])
haha_r = np.array([row[-1] for row in haha if row[1]==1])

weight_l = len(haha_l) / len(parent)
weight_r = len(haha_r) / len(parent)

gain = 1 - (weight_l * entropy(haha_l) + weight_r * entropy(haha_r))
gain

0.04902249956730631

## Decision Tree Regression from Scratch

### Import tools

In [18]:
import numpy as np
import pandas as pd

### Download data

In [19]:
!gdown 1q5iSqD8Y7Kuoks5T1FxxZW7135zqu9op

Downloading...
From: https://drive.google.com/uc?id=1q5iSqD8Y7Kuoks5T1FxxZW7135zqu9op
To: /content/airfoil_noise_data.csv
  0% 0.00/59.9k [00:00<?, ?B/s]100% 59.9k/59.9k [00:00<00:00, 4.34MB/s]


In [20]:
data = pd.read_csv('airfoil_noise_data.csv')
data.head(5)

Unnamed: 0,x0,x1,x2,x3,x4,y
0,800,0.0,0.3048,71.3,0.002663,126.201
1,1000,0.0,0.3048,71.3,0.002663,125.201
2,1250,0.0,0.3048,71.3,0.002663,125.951
3,1600,0.0,0.3048,71.3,0.002663,127.591
4,2000,0.0,0.3048,71.3,0.002663,127.461


### Node class

In [21]:
class Node():
  def __init__(self, feature_index=None, threshold=None, left=None, right=None, var_red=None, value=None):
    '''constructor'''

    # for decision node
    self.feature_index = feature_index
    self.threshold = threshold
    self.left = left
    self.right = right
    self.var_red = var_red

    # for leaf node
    self.value = value

### Tree class

In [33]:
class DecisionTreeRegressor():
  def __init__(self, min_samples_split=2, max_depth=2):
    '''constructor'''

    # initialize the root of the tree
    self.root = None

    # stopping conditions
    self.min_samples_split = min_samples_split
    self.max_depth = max_depth

  def build_tree(self, dataset, curr_depth=0):
    '''recursive function to build the tree'''

    X, Y = dataset[:, :-1], dataset[:, -1]
    num_samples, num_features = np.shape(X)
    best_split = {}

    # split until stopping conditions are met
    if num_samples >= self.min_samples_split and curr_depth <= self.max_depth:
      # find the best split
      best_split = self.get_best_split(dataset, num_samples, num_features)

      # check if information gain is positive
      if best_split['var_red'] > 0:
        # recur left
        left_subtree = self.build_tree(best_split['dataset_left'], curr_depth+1)

        # recur right
        right_subtree = self.build_tree(best_split['dataset_right'], curr_depth+1)

        # return decision node
        return Node(best_split['feature_index'], best_split['threshold'], left_subtree, right_subtree, best_split['var_red'])

    # compute leaf value
    leaf_value = self.calculate_leaf_value(Y)

    # return leaf node
    return Node(value=leaf_value)

  def get_best_split(self, dataset, num_samples, num_features):
    '''function to find the best split'''

    # dictionary to store the best split
    best_split = {}
    max_var_red = -float('inf')

    # loop over all the features
    for feature_index in range(num_features):
      feature_values = dataset[:, feature_index]
      possible_thresholds = np.unique(feature_values)

      # loop over all the feature values present in the data
      for threshold in possible_thresholds:
        # get current split
        dataset_left, dataset_right = self.split(dataset, feature_index, threshold)

        # check if childs are not null
        if len(dataset_left) > 0 and len(dataset_right) > 0:
          y, left_y, right_y = dataset[:, -1], dataset_left[:, -1], dataset_right[:, -1]

          # compute information gain
          curr_var_red = self.variance_reduction(y, left_y, right_y)

          # update the best split if needed
          if curr_var_red > max_var_red:
            best_split['feature_index'] = feature_index
            best_split['threshold'] = threshold
            best_split['dataset_left'] = dataset_left
            best_split['dataset_right'] = dataset_right
            best_split['var_red'] = curr_var_red
            max_var_red = curr_var_red

    # return the best split
    return best_split

  def split(self, dataset, feature_index, threshold):
    '''function to split the data'''

    dataset_left = np.array([row for row in dataset if row[feature_index]<=threshold])
    dataset_right = np.array([row for row in dataset if row[feature_index]>threshold])

    return dataset_left, dataset_right

  def variance_reduction(self, parent, l_child, r_child):
    '''function to compute variance reduction'''

    weight_l = len(l_child) / len(parent)
    weight_r = len(r_child) / len(parent)

    reduction = np.var(parent) - (weight_l * np.var(l_child) + weight_r * np.var(r_child))

    return reduction

  def calculate_leaf_value(self, Y):
    '''function to compute leaf node'''

    val = np.mean(Y)

    return val

  def print_tree(self, tree=None, indent=" "):
    '''function to print the tree'''

    if not tree:
      tree = self.root

    if tree.value is not None:
      print(tree.value)
    else:
      print(f'X_{tree.feature_index} <= {tree.threshold} ? {tree.var_red}')
      print(f'{indent}left: ', end='')
      self.print_tree(tree.left, indent+indent)
      print(f'{indent}right: ', end='')
      self.print_tree(tree.right, indent+indent)

  def fit(self, X, Y):
    '''function to train the tree'''

    dataset = np.concatenate((X, Y), axis=1)
    self.root = self.build_tree(dataset)

  def make_prediction(self, x, tree):
    '''function to predict a single data point'''

    if tree.value != None:
      return tree.value

    feature_val = x[tree.feature_index]
    if feature_val <= tree.threshold:
      return self.make_prediction(x, tree.left)
    else:
      return self.make_prediction(x, tree.right)

  def predict(self, X):
    '''function to predict a new dataset'''

    predictions = [self.make_prediction(x, self.root) for x in X]

    return predictions

### Train-Test split

In [24]:
X = data.iloc[:, :-1].values
Y = data.iloc[:, -1].values.reshape(-1, 1)

from sklearn.model_selection import train_test_split
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=.2, random_state=41)

### Fit the model

In [34]:
regressor = DecisionTreeRegressor(min_samples_split=3, max_depth=3)
regressor.fit(X_train, Y_train)

regressor.print_tree()

X_0 <= 3150.0 ? 7.132048702017748
 left: X_4 <= 0.0337792 ? 3.590330569067664
  left: X_3 <= 55.5 ? 1.17898999813184
    left: X_4 <= 0.00251435 ? 1.614396721819876
        left: 128.9919833333333
        right: 125.90953579676673
    right: X_1 <= 15.4 ? 2.2342245360792994
        left: 129.39160280373832
        right: 123.80422222222222
  right: X_0 <= 1250.0 ? 9.970884020498868
    left: X_4 <= 0.0483159 ? 6.35527515982486
        left: 124.38024528301887
        right: 118.30039999999998
    right: X_3 <= 39.6 ? 5.036286657241031
        left: 113.58091666666667
        right: 118.07284615384616
 right: X_4 <= 0.00146332 ? 29.08299210506528
  left: X_0 <= 8000.0 ? 11.886497073996964
    left: X_2 <= 0.0508 ? 7.608945827689519
        left: 134.04247500000002
        right: 127.33581818181818
    right: X_4 <= 0.00076193 ? 10.6229193224008
        left: 128.94078571428574
        right: 122.40768750000001
  right: X_4 <= 0.0229028 ? 5.638575922510643
    left: X_0 <= 6300.0 ? 5.985

### Test the model

In [35]:
Y_pred = regressor.predict(X_test)

from sklearn.metrics import mean_squared_error
np.sqrt(mean_squared_error(Y_test, Y_pred))

4.851358097184457

## Homework

### Question 9

In [40]:
def sse(parent, l_child, r_child):
  '''function to compute variance reduction'''

  weight_l = len(l_child) / len(parent)
  weight_r = len(r_child) / len(parent)

  reduction = np.var(l_child)  + np.var(r_child)

  return reduction

In [37]:
hehe = np.array([[23, 0, 0, 200], [25, 1, 1, 400], [27, 1, 0, 300], [29, 0, 1, 500], [29, 0, 0, 400]])
hehe

array([[ 23,   0,   0, 200],
       [ 25,   1,   1, 400],
       [ 27,   1,   0, 300],
       [ 29,   0,   1, 500],
       [ 29,   0,   0, 400]])

In [45]:
hehe_l = np.array([row[-1] for row in hehe if row[0]<=24])
hehe_r = np.array([row[-1] for row in hehe if row[0]>24])

sse(hehe, hehe_l, hehe_r)

5000.0