In [1]:
import numpy as np
import pandas as pd
import sklearn
from sklearn.preprocessing import LabelEncoder

In [2]:
iris_dataset = pd.read_csv('/content/IRIS.csv')
iris_dataset['species'] = LabelEncoder().fit_transform(iris_dataset['species'])
iris_dataset

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width,species
0,5.1,3.5,1.4,0.2,0
1,4.9,3.0,1.4,0.2,0
2,4.7,3.2,1.3,0.2,0
3,4.6,3.1,1.5,0.2,0
4,5.0,3.6,1.4,0.2,0
...,...,...,...,...,...
145,6.7,3.0,5.2,2.3,2
146,6.3,2.5,5.0,1.9,2
147,6.5,3.0,5.2,2.0,2
148,6.2,3.4,5.4,2.3,2


In [3]:
class Node():
  def __init__(self, feature_index = None, threshold = None, left = None, right = None, info_gain = None, value = None):
    # for decision nodes (internal nodes)
    self.feature_index = feature_index
    self.threshold = threshold
    self.left = left
    self.right = right
    self.info_gain = info_gain

    # for leaf node
    self.value = value

In [36]:
class DecisionTreeClassifierFromScratch():
  def __init__(self, min_samples_split = 2, max_depth = 2, max_features_per_split = 3, criterion = 'gini'):
    self.root = None
    self.max_features_per_split = max_features_per_split
    self.criterion = criterion

    # set the stopping conditions
    self.min_samples_split = min_samples_split
    self.max_depth = max_depth
    
  def gini_index(self, y):
    class_labels = np.unique(y)
    gini = 0
    for cls in class_labels:
      p_cls = len(y[y == cls])/len(y)
      gini += (p_cls ** 2)
    return 1 - gini

  def entropy(self, y):
    class_labels = np.unique(y)
    entropy = 0 
    for cls in class_labels:
      p_cls = len(y[y == cls])/len(y)
      entropy += -p_cls * np.log2(p_cls)
    return entropy

  def information_gain(self, parent, l_child, r_child, mode):
    ## parent: the list of ground truth labels for all the points in the region of the parent node
    ## l_child : the list of ground truth labels for all the points in the region l_child
    weight_l = len(l_child)/len(parent)
    weight_r = len(r_child)/len(parent)

    if mode == 'gini':
      gain = self.gini_index(parent) - ((weight_l * self.gini_index(l_child)) + (weight_r * self.gini_index(r_child)))
    else:
      gain = self.entropy(parent) - ((weight_l * self.entropy(l_child)) + (weight_r * self.entropy(r_child)))
    return gain

  def split(self, dataset, feature_index, threshold):
    dataset_left = np.array([row for row in dataset if row[feature_index] <= threshold])
    dataset_right = np.array([row for row in dataset if row[feature_index] > threshold])
    return dataset_left, dataset_right

  def get_best_split(self, dataset, num_samples, num_features, max_features_per_split, mode):
    # define a dictionary to store the best split
    best_split = {}
    max_info_gain = -float("inf")

    # loop over a random sample of the features
    random_features_indices = np.random.choice(num_features, size = max_features_per_split, replace = False)
    for feature_index in random_features_indices:
      feature_values = dataset[ :, feature_index]
      possible_thresholds = np.unique(feature_values)

      # loop over all possible split values (thresholds) for the current feature
      for threshold in possible_thresholds:
        dataset_left, dataset_right = self.split(dataset, feature_index, threshold)
        # making sure that the current split has left and right datasets 
        if len(dataset_left) > 0 and len(dataset_right) > 0:
          y_parent, y_left_child, y_right_child = dataset[:, -1], dataset_left[:, -1], dataset_right[:, -1]
          curr_info_gain = self.information_gain(y_parent, y_left_child, y_right_child, mode = mode)
          if curr_info_gain > max_info_gain:
            best_split['feature_index'] = feature_index
            best_split['threshold'] = threshold
            best_split['dataset_left'] = dataset_left
            best_split['dataset_right'] = dataset_right
            best_split['info_gain'] = curr_info_gain
            max_info_gain = curr_info_gain
    return best_split

  def calculate_leaf_value(self, Y):
    Y = list(Y)
    return max(Y, key = Y.count)

  def build_tree(self, dataset, curr_depth = 0):
    # recursive function to build the tree
    X, Y = dataset[:, :-1], dataset[:, -1]
    num_samples, num_features = np.shape(X)
    # we will split until the stopping coditions are met
    if num_samples >= self.min_samples_split and curr_depth <= self.max_depth:
      # getting the best split; it returns a dictionary
      best_split = self.get_best_split(dataset, num_samples, num_features, self.max_features_per_split, self.criterion)
      # check if the information gain after the best split is positive
      if best_split['info_gain'] > 0:
        left_subtree = self.build_tree(best_split['dataset_left'], curr_depth + 1)
        right_subtree = self.build_tree(best_split['dataset_right'], curr_depth + 1)
        return Node(feature_index = best_split['feature_index'], threshold = best_split['threshold'], 
                    left = left_subtree, right = right_subtree, info_gain = best_split['info_gain'], value = None)
        
    # compute the leaf node only if we are out of the stopping condition
    leaf_value = self.calculate_leaf_value(Y) 
    # return leaf node
    return Node(value = leaf_value)

  def fit(self, X, Y):
    dataset = np.concatenate((X, Y), axis = 1)
    self.root = self.build_tree(dataset)

  def make_prediction(self, x, tree):
    if tree.value != None: 
      return tree.value
    feature_val = x[tree.feature_index]
    if feature_val <= tree.threshold:
      return self.make_prediction(x, tree.left)
    else:
      return self.make_prediction(x, tree.right)

  def predict(self, X):
    predictions = [self.make_prediction(x, self.root) for x in X]
    return predictions

  def print_tree(self, tree=None, indent=" "):
        ''' function to print the tree '''
        
        if not tree:
            tree = self.root

        if tree.value is not None:
            print(tree.value)

        else:
            print("X_"+str(tree.feature_index), "<=", tree.threshold, "?", tree.info_gain)
            print("%sleft:" % (indent), end="")
            self.print_tree(tree.left, indent + indent)
            print("%sright:" % (indent), end="")
            self.print_tree(tree.right, indent + indent)


In [5]:
X = iris_dataset.iloc[:, :-1].values
Y = iris_dataset.iloc[:, -1].values.reshape(-1,1)
from sklearn.model_selection import train_test_split
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=.2, random_state=41)

In [42]:
classifier = DecisionTreeClassifierFromScratch(min_samples_split = 3, max_depth = 3, max_features_per_split = 2, criterion= 'gini')
classifier.fit(X_train, Y_train)

In [43]:
Y_pred = classifier.predict(X_test)
from sklearn.metrics import accuracy_score
accuracy_score(Y_test, Y_pred)

0.9333333333333333

In [44]:
classifier.print_tree()

X_2 <= 1.9 ? 0.33741385372714494
 left:0.0
 right:X_3 <= 1.5 ? 0.427106638180289
  left:X_1 <= 2.6 ? 0.0023743569449938967
    left:X_2 <= 4.9 ? 0.13265306122448983
        left:1.0
        right:2.0
    right:1.0
  right:X_3 <= 1.7 ? 0.012784787668159228
    left:X_2 <= 4.5 ? 0.1111111111111111
        left:2.0
        right:1.0
    right:X_2 <= 4.8 ? 0.016158818097876115
        left:2.0
        right:2.0


In [46]:
from sklearn.tree import DecisionTreeClassifier
clf = DecisionTreeClassifier(criterion = 'gini',splitter = 'best', max_depth = 3, min_samples_split = 3, max_features = 2)
clf.fit(X_train, Y_train)
DT_sklearn_preds = clf.predict(X_test)

In [47]:
accuracy_score(Y_test, DT_sklearn_preds)

0.9

### Random Forest

In [48]:
def bootstrap_sample(X, y, bag_size = 100):
  tot_num_of_observations = X.shape[0]
  random_indices = np.random.choice(tot_num_of_observations, size = bag_size, replace = True)
  return X[random_indices], y[random_indices]

from collections import Counter

def most_common_label(y):
  counter = Counter(y)
  most_common = counter.most_common(1)[0][0]
  return most_common

In [50]:
class RandomForest:
  def __init__(self, min_samples_split = 2, max_depth = 2, max_features_per_split = 3, criterion = 'gini', n_trees = 20, sample_size_per_tree = 100):
    self.min_samples_split = min_samples_split
    self.max_depth = max_depth
    self.max_features_per_split = max_features_per_split
    self.criterion = criterion
    self.n_trees = n_trees
    self.sample_size_per_tree = sample_size_per_tree
    # Defining a list of trees
    self.trees = []

  def fit(self, X, y):
    self.trees = []
    for _ in range(self.n_trees):
      tree = DecisionTreeClassifierFromScratch(min_samples_split = self.min_samples_split, max_depth = self.max_depth, 
                                    max_features_per_split = self.max_features_per_split, criterion = self.criterion)
      X_sample, y_sample = bootstrap_sample(X, y, bag_size = self.sample_size_per_tree)
      tree.fit(X_sample, y_sample)
      self.trees.append(tree)

  def predict(self, X):
    tree_preds = np.array([tree.predict(X) for tree in self.trees])
    ## if you have 4 data points for 3 trees, the output will be as follows:
    ## [ T1,  T2,  T3]
    ## [1111 0000 1111]
    ## however, we would like the output to be the predictions for each data-point using the 3 trees
    ## [ p1, p2, p3, p4]
    ## [101 101 101 101]
    tree_preds = np.swapaxes(tree_preds, 0, 1)
    y_pred = [most_common_label(tree_pred) for tree_pred in tree_preds]
    return np.array(y_pred)

In [54]:
RF_classifier = RandomForest(min_samples_split = 3, max_depth = 3, max_features_per_split = 2, criterion= 'gini', n_trees = 50, sample_size_per_tree = 50)

In [55]:
RF_classifier.fit(X_train, Y_train)

In [56]:
Y_preds_RF = RF_classifier.predict(X_test)
accuracy_score(Y_test, Y_preds_RF)

0.8666666666666667