In [1]:
import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
import seaborn as sns

import random
from pprint import pprint

# Descision Tree algorithm

In [2]:
class DescisionTree:
  def __init__(self, min_samples_leaf, min_samples_split, max_depth, criterion, max_features, ml_task):
    self.data = None
    self.X = None
    self.y = None
    self.max_features = max_features
    self.ml_task = ml_task
    self.min_samples_leaf = min_samples_leaf
    self.min_samples_split = min_samples_split
    self.max_depth = max_depth
    self.metric = criterion
    self.feature_importances_ = None
    self.complete_tree = None
    self.n_entries = {}
    self.n_weighted_entries = {}
    self.parent_node = 1  # root node
    self.yes_node = 2     # left node
    self.no_node = 3      # right node
    self.leaf_count = 0
    if ml_task == 'classification': self.classes_and_counts = {};self.leaf_node_class_proba = {}
    else: self.leaf_node_loss = {}

  ''' This method is used to get the collective counts of all classes in target '''
  def get_classes_and_counts(self, data):
    label_column = data[:, -1]
    unique_classes, counts_unique_classes = np.unique(label_column, return_counts=True)
    for i in range(len(unique_classes)):
      self.classes_and_counts[unique_classes[i]] = counts_unique_classes[i]

  ''' This method is used to get the collective probabilities of all classes in target '''
  def get_probability_for_all_classes(self, data):
    label_column = data[:, -1]
    unique_classes_new, counts_unique_classes_new = np.unique(label_column, return_counts=True)
    
    classes_and_counts_new = {}
    for i in list(self.classes_and_counts.keys()):
      if i in list(unique_classes_new):
        classes_and_counts_new[i] = counts_unique_classes_new[list(unique_classes_new).index(i)]
      else:
        classes_and_counts_new[i] = 0
    array = np.array(list(classes_and_counts_new.values())) / sum(classes_and_counts_new.values())

    return [round(i, 5) for i in array]


  ''' This method checks the purity of a target vector '''
  def check_purity(self, data):
      label_column = data[:, -1]
      unique_classes = np.unique(label_column)
      if len(unique_classes) == 1:
          return True
      else:
          return False


  ''' This method performs classification '''
  def create_leaf(self, data, ml_task, current_node):
      self.leaf_count += 1
      label_column = data[:, -1]
      if ml_task == "regression":
          leaf = np.mean(label_column)
          self.leaf_node_loss[current_node] = self.mse(data)
          return str(leaf) + ' Node: '+str(current_node)
      else:
          probabilities = []
          unique_classes, counts_unique_classes = np.unique(label_column, return_counts=True)
          index = counts_unique_classes.argmax()
          leaf = unique_classes[index]
          # probability = counts_unique_classes[index] / sum(counts_unique_classes)
          self.leaf_node_class_proba[current_node] = self.get_probability_for_all_classes(data)
          
          return str(leaf) + ' Node: '+str(current_node)
    
    
  ''' this function generates all possible potential splits for a given training data '''
  def get_potential_splits(self, data, random_subspace):  # randomly selecting certain features
    potential_splits = {}
    _, n_columns = data.shape
    column_indices = list(range(n_columns - 1))    # excluding the last column which is the label
    if random_subspace and random_subspace <= len(column_indices):
        column_indices = random.sample(population=column_indices, k=random_subspace)
    for column_index in column_indices:          
        values = data[:, column_index]
        unique_values = np.unique(values)
        potential_splits[column_index] = unique_values

    return potential_splits
    
    
  ''' This function splits the data into two partitions: Yes and no cases'''
  def split_data(self, data, split_column, split_value):
      split_column_values = data[:, split_column]
      type_of_feature = FEATURE_TYPES[split_column]

      if type_of_feature == "continuous":
          data_below = data[split_column_values <= split_value]
          data_above = data[split_column_values >  split_value]
      else:
          data_below = data[split_column_values == split_value]
          data_above = data[split_column_values != split_value]
      
      return data_below, data_above
    
    
  ''' This method calculates mse loss'''
  def mse(self, data):
      actual_values = data[:, -1]
      if len(actual_values) == 0:   # empty data
          mse = 0
      else:
          prediction = np.mean(actual_values)
          mse = np.mean((actual_values - prediction) **2)
      
      return mse

  ''' This method calculates entropy loss '''
  def entropy(self, data):
      label_column = data[:, -1]
      _, counts = np.unique(label_column, return_counts=True)
      probabilities = counts / counts.sum()
      entropy = sum(probabilities * -np.log2(probabilities))
      
      return entropy


  ''' This method calculates gini impurity'''
  def gini(self, data):
    label_column = data[:, -1]
    _, counts = np.unique(label_column, return_counts=True)
    probabilities = counts / counts.sum()
    gini_index = 0

    for i in probabilities:
      gini_index += i ** 2
    
    return 1 - gini_index


  ''' calculating total/weighed value of the used metric '''
  def calculate_overall_metric(self, data_below, data_above, metric_function):
    n = len(data_below) + len(data_above)
    p_data_below = len(data_below) / n
    p_data_above = len(data_above) / n
    # weighted MSE, RMSE, Gini, and entropy
    overall_metric =  (p_data_below * metric_function(self, data_below) 
                     + p_data_above * metric_function(self, data_above))
    
    return overall_metric


  
  ''' Determining which split is the best by using the metric '''
  def determine_best_split(self, data, potential_splits, ml_task, criterion):
    first_iteration = True
    for column_index in potential_splits:
        for value in potential_splits[column_index]:
            data_below, data_above = self.split_data(data, split_column=column_index, split_value=value)
            
        
            current_overall_metric = self.calculate_overall_metric(data_below, data_above, metric_function=criterion)
            if first_iteration or current_overall_metric <= best_overall_metric:
                first_iteration = False
                best_overall_metric = current_overall_metric
                best_split_column = column_index
                best_split_value = value
    
    return best_split_column, best_split_value


  ''' determining the type of a feature among all features '''
  def determine_type_of_feature(self, df):
    feature_types = []
    n_unique_values_treshold = 10

    for feature in df.columns:
        if feature != "label":
            unique_values = df[feature].unique()
            example_value = unique_values[0]
            if (isinstance(example_value, str)) or (len(unique_values) <= n_unique_values_treshold):
                feature_types.append("categorical")
            else:
                feature_types.append("continuous")
    
    return feature_types
 
    
  ''' THIS IS THE MAIN RECURSIVE ALGORITHM FOR DESCISION TREE'''

  def tree(self, df, ml_task, counter,current_node, min_samples_leaf, min_samples_split,max_depth, criterion, answer, max_features):

    # When the tree starts, the dataframe is converteed to numpy array, the depth of the tree is checked using counter variable and all features types are detected
    if counter == 0:
        global COLUMN_HEADERS, FEATURE_TYPES
        COLUMN_HEADERS = df.columns
        FEATURE_TYPES = self.determine_type_of_feature(df)
        data = df.values
        criterion = getattr(DescisionTree, criterion)
    else:
        data = df      

    # storing the length of data passed into the node
    self.n_entries['Node: '+str(current_node)] = [len(df)]
    # criterion = getattr(DescisionTree, criterion)

    # storing the loss/mse/rmse/gini/entropy in a specific node
    self.n_entries['Node: '+str(current_node)].append(criterion(self, data))
    
    # incrementing yes/left nodes and no/right nodes such that yes will be a even node and no will be a odd node repectively
    if (answer == 'yes answer'):
      self.yes_node += 2
    elif (answer == 'no answer'):
      self.no_node += 2 
      
    # checking if that target of the data passed is either pure, has minimum samples to create a leaf, or the depth of tree has reached its maximum depth
    if (self.check_purity(data)) or (len(data) == min_samples_leaf) or (counter == max_depth):
        leaf = self.create_leaf(data, ml_task, current_node) # creating the leaf
        return leaf 
    # if above requirements to create a leaf are not met, two new nodes will be created recursively respectively.
    else:    
        counter += 1 # when two new nodes are created, the depth of three is also incremented
        
        # if the data is not yet pure, but has not minimum samples to perform the split, a leaf is created
        if (len(data) < min_samples_split):
          leaf = self.create_leaf(data, ml_task, current_node)
          return leaf
        else:
          # getting the all possible splits, determining which split has least loss, and splitting the data into left and right nodes respectively
          potential_splits = self.get_potential_splits(data, max_features)
          split_column, split_value = self.determine_best_split(data, potential_splits, ml_task,criterion)
          data_below, data_above = self.split_data(data, split_column, split_value)
          
          # if the data seperated into left and right nodes, but there is no data, instead of creating a node, a leaft is created
          if len(data_below) == 0 or len(data_above) == 0:
              leaf = self.create_leaf(data, ml_task, current_node)
              return leaf
          
          # finding the type of a selected feature column and its name
          feature_name = COLUMN_HEADERS[split_column]
          type_of_feature = FEATURE_TYPES[split_column]

#           # creating the tree questions
          if type_of_feature == "continuous":
              question = "{} <= {} (Node: {})".format(feature_name, split_value, current_node)
          # feature is categorical
          else:
              question = "{} = {} (Node: {})".format(feature_name, split_value, current_node)

          # instantiate sub-tree
          sub_tree = {question: []}

          # creating left and right nodes recursively
          yes_answer = self.tree(data_below, ml_task, counter, self.yes_node,min_samples_leaf, min_samples_split,max_depth, criterion, 'yes answer', max_features)
          no_answer = self.tree(data_above, ml_task, counter, self.no_node,min_samples_leaf,min_samples_split, max_depth, criterion, 'no answer', max_features)
          
          # if both left and right nodes are same, only taking one value for a leaf node
          if yes_answer == no_answer:
              sub_tree = yes_answer
          else:
            sub_tree[question].append(yes_answer)
            sub_tree[question].append(no_answer)
          
          return sub_tree

    

  ''' making probability of the predictions using this tree '''
  def predict_example_probability(self, example, tree):
    question = list(tree.keys())[0]
    feature_name, comparison_operator, value = question.split(" ")[:3]

    # ask question
    if comparison_operator == "<=":
        if example[feature_name] <= float(value):
            answer = tree[question][0]
        else:
            answer = tree[question][1]
    # feature is categorical
    else:
        if str(example[feature_name]) == value:
            answer = tree[question][0]
        else:
            answer = tree[question][1]
    # base case
    if not isinstance(answer, dict):
        return int(answer.split()[2])
            
    # recursive part
    else:
        residual_tree = answer
        return self.predict_example_probability(example, residual_tree)

    
  ''' fitting our tree with training data '''
  def fit(self, X,y):
    self.X = X.copy()
    self.y = y.copy()
    self.X['label'] = self.y
    self.data = self.X

    if self.ml_task == 'classification':
      self.get_classes_and_counts(self.data.values)
    self.complete_tree = self.tree(self.data, self.ml_task, 0,self.parent_node, self.min_samples_leaf, self.min_samples_split, self.max_depth, self.metric,'parent_node', self.max_features)

    # calculating weighted entries
    for key, value in self.n_entries.items():
      self.n_weighted_entries[key] = [value[0] / len(self.X), value[1]]

    return self.complete_tree

# Gradient Boosting for regression

In [2]:
import warnings
warnings.filterwarnings("ignore")

In [30]:
from tqdm import tqdm

class GradientBoosting:
  def __init__(self,learning_rate = 0.1,subsample = 1,criterion = 'mse', n_estimators=100, ml_task='regression', \
               max_depth=5, max_features= None, min_samples_leaf = 1, min_samples_split = 2):
    self.n_estimators = n_estimators
    self.subsample = subsample
    self.max_features = max_features
    self.ml_task = ml_task
    self.min_samples_leaf = min_samples_leaf
    self.min_samples_split = min_samples_split
    self.max_depth = max_depth
    self.metric = criterion
    self.forest = []
    self.X = None
    self.y = None
    self.each_tree_with_class_probabilities = []
    self.n_classes = None
    self.learning_rate = learning_rate


  ''' Calculating the negative first order derivative of MSE'''
  def negative_first_order_derivative_of_mse(self,y,y_pred):
        return  (y - y_pred)


  ''' Training our model '''
  def fit(self, X, y):
    self.X = X.copy(); self.y = y.copy()
    self.initial_leaf_pred = np.full((X.shape[0]), y.mean())
    
    for i in tqdm(range(self.n_estimators)):
      pseudo_residuals = self.negative_first_order_derivative_of_mse(y, self.initial_leaf_pred)
      self.a = pseudo_residuals
      tree = DescisionTree(min_samples_leaf = self.min_samples_leaf, min_samples_split = self.min_samples_split, max_depth =  self.max_depth, \
                           criterion = self.metric, max_features = self.max_features, ml_task = self.ml_task)
      complete_tree = tree.fit(X, pseudo_residuals)
      self.initial_leaf_pred += self.learning_rate * self.decision_tree_predictions(X, complete_tree)
      self.forest.append(complete_tree)
    
    return 'Training completed'

  ''' Making individual predictions '''
  def predict_example(self, example, tree):
    question = list(tree.keys())[0]
    feature_name, comparison_operator, value = question.split(" ")[:3]

    # ask question
    if comparison_operator == "<=":
        if example[feature_name] <= float(value):
            answer = tree[question][0]
        else:
            answer = tree[question][1]
    # feature is categorical
    else:
        if str(example[feature_name]) == value:
            answer = tree[question][0]
        else:
            answer = tree[question][1]
    # base case
    if not isinstance(answer, dict):
        return float(answer.split()[0])
    
    # recursive part
    else:
        residual_tree = answer
        return self.predict_example(example, residual_tree)



  ''' Making predictions on testing dataframe '''
  def decision_tree_predictions(self, test_df, tree):
      predictions = test_df.apply(self.predict_example, args=(tree,), axis=1)
      return predictions

  
    
  ''' Method for making predictions ''' 
  def predict(self, test_df):
      preds = np.full((test_df.shape[0]), self.y.mean())
      for tree in self.forest:
            preds += self.learning_rate * self.decision_tree_predictions(test_df, tree)
      return preds

In [5]:
df = pd.read_csv('insurance.csv')
df.head()

Unnamed: 0,age,sex,bmi,children,smoker,region,charges
0,19,female,27.9,0,yes,southwest,16884.924
1,18,male,33.77,1,no,southeast,1725.5523
2,28,male,33.0,3,no,southeast,4449.462
3,33,male,22.705,0,no,northwest,21984.47061
4,32,male,28.88,0,no,northwest,3866.8552


In [6]:
from sklearn.preprocessing import LabelEncoder
encoder = LabelEncoder()

In [7]:
df['sex'] = encoder.fit_transform(df['sex'])
df['smoker'] = encoder.fit_transform(df['smoker'])
df['region'] = encoder.fit_transform(df['region'])

In [8]:
X = df[df.columns.to_list()[:-1]]
y = df['charges']
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X,y, random_state = 42)#, test_size = 0.32)

In [31]:
booster = GradientBoosting(n_estimators = 500, max_depth=3, learning_rate = 0.01)
booster.fit(X_train, y_train)

100%|████████████████████████████████████████████████████████████████████████████████| 500/500 [01:13<00:00,  6.84it/s]


'Training completed'

In [32]:
preds = booster.predict(X_train)

In [33]:
from sklearn.metrics import mean_squared_error
print("Training RMSE", np.sqrt(mean_squared_error(y_train, preds)))

Training RMSE 4064.402213347101


In [34]:
print("Testing RMSE", np.sqrt(mean_squared_error(y_test, booster.predict(X_test))))

Testing RMSE 4526.665608674652


In [25]:
from sklearn.metrics import mean_squared_error
print("Training RMSE", np.sqrt(mean_squared_error(y_train, preds)))

Training RMSE 3713.092115610719


In [26]:
print("Testing RMSE", np.sqrt(mean_squared_error(y_test, booster.predict(X_test))))

Testing RMSE 4578.005337462994


# sklearn model

In [27]:
from sklearn.ensemble import GradientBoostingRegressor

In [20]:
booster1 = GradientBoostingRegressor(n_estimators = 500, max_depth=3, learning_rate = 0.01,max_features= None, min_samples_leaf = 1, min_samples_split = 2)
booster1.fit(X_train, y_train)

GradientBoostingRegressor(learning_rate=0.01, n_estimators=500)

In [21]:
print("Training RMSE", np.sqrt(mean_squared_error(y_train, booster1.predict(X_train))))

Training RMSE 4062.3460252481586


In [22]:
print("Testing RMSE", np.sqrt(mean_squared_error(y_test, booster1.predict(X_test))))

Testing RMSE 4509.5479444432785


# Gradient boosting for binary classification

In [2]:
df = pd.read_csv('train.csv')
df["label"] = df.Survived
df = df.drop(["PassengerId", "Survived", "Name", "Ticket", "Cabin"], axis=1)

# handling missing values
median_age = df.Age.median()
mode_embarked = df.Embarked.mode()[0]

df = df.fillna({"Age": median_age, "Embarked": mode_embarked})
df.Sex = df.Sex.replace({'male':0,'female':1})
df.Embarked = df.Embarked.replace({'S':0,'C':1,'Q':2})

In [3]:
X = df[df.columns.to_list()[:-1]]
y = df['label']

from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X,y,test_size = 0.25, random_state = 4221314)

In [4]:
class DescisionTree:
  def __init__(self, min_samples_leaf, min_samples_split, max_depth, criterion, max_features, ml_task, previous_probability_of_residuals):
    self.data = None
    self.X = None
    self.y = None
    self.max_features = max_features
    self.ml_task = ml_task
    self.min_samples_leaf = min_samples_leaf
    self.min_samples_split = min_samples_split
    self.max_depth = max_depth
    self.metric = criterion
    self.feature_importances_ = None
    self.complete_tree = None
    self.n_entries = {}
    self.n_weighted_entries = {}
    self.parent_node = 1  # root node
    self.yes_node = 2     # left node
    self.no_node = 3      # right node
    self.leaf_count = 0
    if ml_task == 'classification': self.classes_and_counts = {};self.leaf_node_class_proba = {}
    else: self.leaf_node_loss = {}
    self.previous_probability_of_residuals = previous_probability_of_residuals

  ''' This method is used to get the collective counts of all classes in target '''
  def get_classes_and_counts(self, data):
    label_column = data[:, -1]
    unique_classes, counts_unique_classes = np.unique(label_column, return_counts=True)
    for i in range(len(unique_classes)):
      self.classes_and_counts[unique_classes[i]] = counts_unique_classes[i]

  ''' This method is used to get the collective probabilities of all classes in target '''
  def get_probability_for_all_classes(self, data):
    label_column = data[:, -1]
    unique_classes_new, counts_unique_classes_new = np.unique(label_column, return_counts=True)
    
    classes_and_counts_new = {}
    for i in list(self.classes_and_counts.keys()):
      if i in list(unique_classes_new):
        classes_and_counts_new[i] = counts_unique_classes_new[list(unique_classes_new).index(i)]
      else:
        classes_and_counts_new[i] = 0
    array = np.array(list(classes_and_counts_new.values())) / sum(classes_and_counts_new.values())

    return [round(i, 5) for i in array]


  ''' This method checks the purity of a target vector '''
  def check_purity(self, data):
      label_column = data[:, -1]
      unique_classes = np.unique(label_column)
      if len(unique_classes) == 1:
          return True
      else:
          return False


  ''' This method performs classification '''
  def create_leaf(self, data, ml_task, current_node):
      self.leaf_count += 1
      label_column = data[:, -1]
      # converting the residual derived from probability to log(odds)   
      numerator = np.sum(label_column)
      denominator = 0#[probability * (1 - probability)]
      for x in label_column:
        prob = self.previous_probability_of_residuals[x]
        denominator += prob * (1 - prob)
        
      leaf = numerator / denominator
      return str(leaf) + ' Node: '+str(current_node)
    
    
  ''' this function generates all possible potential splits for a given training data '''
  def get_potential_splits(self, data, random_subspace):  # randomly selecting certain features
    potential_splits = {}
    _, n_columns = data.shape
    column_indices = list(range(n_columns - 1))    # excluding the last column which is the label
    if random_subspace and random_subspace <= len(column_indices):
        column_indices = random.sample(population=column_indices, k=random_subspace)
    for column_index in column_indices:          
        values = data[:, column_index]
        unique_values = np.unique(values)
        potential_splits[column_index] = unique_values

    return potential_splits
    
    
  ''' This function splits the data into two partitions: Yes and no cases'''
  def split_data(self, data, split_column, split_value):
      split_column_values = data[:, split_column]
      type_of_feature = FEATURE_TYPES[split_column]

      if type_of_feature == "continuous":
          data_below = data[split_column_values <= split_value]
          data_above = data[split_column_values >  split_value]
      else:
          data_below = data[split_column_values == split_value]
          data_above = data[split_column_values != split_value]
      
      return data_below, data_above
    
    
  ''' This method calculates mse loss'''
  def mse(self, data):
      actual_values = data[:, -1]
      if len(actual_values) == 0:   # empty data
          mse = 0
      else:
          prediction = np.mean(actual_values)
          mse = np.mean((actual_values - prediction) **2)
      
      return mse

  ''' This method calculates entropy loss '''
  def entropy(self, data):
      label_column = data[:, -1]
      _, counts = np.unique(label_column, return_counts=True)
      probabilities = counts / counts.sum()
      entropy = sum(probabilities * -np.log2(probabilities))
      
      return entropy


  ''' This method calculates gini impurity'''
  def gini(self, data):
    label_column = data[:, -1]
    _, counts = np.unique(label_column, return_counts=True)
    probabilities = counts / counts.sum()
    gini_index = 0

    for i in probabilities:
      gini_index += i ** 2
    
    return 1 - gini_index


  ''' calculating total/weighed value of the used metric '''
  def calculate_overall_metric(self, data_below, data_above, metric_function):
    n = len(data_below) + len(data_above)
    p_data_below = len(data_below) / n
    p_data_above = len(data_above) / n
    # weighted MSE, RMSE, Gini, and entropy
    overall_metric =  (p_data_below * metric_function(self, data_below) 
                     + p_data_above * metric_function(self, data_above))
    
    return overall_metric


  
  ''' Determining which split is the best by using the metric '''
  def determine_best_split(self, data, potential_splits, ml_task, criterion):
    first_iteration = True
    for column_index in potential_splits:
        for value in potential_splits[column_index]:
            data_below, data_above = self.split_data(data, split_column=column_index, split_value=value)
            
        
            current_overall_metric = self.calculate_overall_metric(data_below, data_above, metric_function=criterion)
            if first_iteration or current_overall_metric <= best_overall_metric:
                first_iteration = False
                best_overall_metric = current_overall_metric
                best_split_column = column_index
                best_split_value = value
    
    return best_split_column, best_split_value


  ''' determining the type of a feature among all features '''
  def determine_type_of_feature(self, df):
    feature_types = []
    n_unique_values_treshold = 10

    for feature in df.columns:
        if feature != "label":
            unique_values = df[feature].unique()
            example_value = unique_values[0]
            if (isinstance(example_value, str)) or (len(unique_values) <= n_unique_values_treshold):
                feature_types.append("categorical")
            else:
                feature_types.append("continuous")
    
    return feature_types
 
    
  ''' THIS IS THE MAIN RECURSIVE ALGORITHM FOR DESCISION TREE'''

  def tree(self, df, ml_task, counter,current_node, min_samples_leaf, min_samples_split,max_depth, criterion, answer, max_features):

    # When the tree starts, the dataframe is converteed to numpy array, the depth of the tree is checked using counter variable and all features types are detected
    if counter == 0:
        global COLUMN_HEADERS, FEATURE_TYPES
        COLUMN_HEADERS = df.columns
        FEATURE_TYPES = self.determine_type_of_feature(df)
        data = df.values
        criterion = getattr(DescisionTree, criterion)
    else:
        data = df      

    # storing the length of data passed into the node
    self.n_entries['Node: '+str(current_node)] = [len(df)]
    # criterion = getattr(DescisionTree, criterion)

    # storing the loss/mse/rmse/gini/entropy in a specific node
    self.n_entries['Node: '+str(current_node)].append(criterion(self, data))
    
    # incrementing yes/left nodes and no/right nodes such that yes will be a even node and no will be a odd node repectively
    if (answer == 'yes answer'):
      self.yes_node += 2
    elif (answer == 'no answer'):
      self.no_node += 2 
      
    # checking if that target of the data passed is either pure, has minimum samples to create a leaf, or the depth of tree has reached its maximum depth
    if (self.check_purity(data)) or (len(data) == min_samples_leaf) or (counter == max_depth):
        leaf = self.create_leaf(data, ml_task, current_node) # creating the leaf
        return leaf 
    # if above requirements to create a leaf are not met, two new nodes will be created recursively respectively.
    else:    
        counter += 1 # when two new nodes are created, the depth of three is also incremented
        
        # if the data is not yet pure, but has not minimum samples to perform the split, a leaf is created
        if (len(data) < min_samples_split):
          leaf = self.create_leaf(data, ml_task, current_node)
          return leaf
        else:
          # getting the all possible splits, determining which split has least loss, and splitting the data into left and right nodes respectively
          potential_splits = self.get_potential_splits(data, max_features)
          split_column, split_value = self.determine_best_split(data, potential_splits, ml_task,criterion)
          data_below, data_above = self.split_data(data, split_column, split_value)
          
          # if the data seperated into left and right nodes, but there is no data, instead of creating a node, a leaft is created
          if len(data_below) == 0 or len(data_above) == 0:
              leaf = self.create_leaf(data, ml_task, current_node)
              return leaf
          
          # finding the type of a selected feature column and its name
          feature_name = COLUMN_HEADERS[split_column]
          type_of_feature = FEATURE_TYPES[split_column]

#           # creating the tree questions
          if type_of_feature == "continuous":
              question = "{} <= {} (Node: {})".format(feature_name, split_value, current_node)
          # feature is categorical
          else:
              question = "{} = {} (Node: {})".format(feature_name, split_value, current_node)

          # instantiate sub-tree
          sub_tree = {question: []}

          # creating left and right nodes recursively
          yes_answer = self.tree(data_below, ml_task, counter, self.yes_node,min_samples_leaf, min_samples_split,max_depth, criterion, 'yes answer', max_features)
          no_answer = self.tree(data_above, ml_task, counter, self.no_node,min_samples_leaf,min_samples_split, max_depth, criterion, 'no answer', max_features)
          
          # if both left and right nodes are same, only taking one value for a leaf node
          if yes_answer == no_answer:
              sub_tree = yes_answer
          else:
            sub_tree[question].append(yes_answer)
            sub_tree[question].append(no_answer)
          
          return sub_tree

    

  ''' making probability of the predictions using this tree '''
  def predict_example_probability(self, example, tree):
    question = list(tree.keys())[0]
    feature_name, comparison_operator, value = question.split(" ")[:3]

    # ask question
    if comparison_operator == "<=":
        if example[feature_name] <= float(value):
            answer = tree[question][0]
        else:
            answer = tree[question][1]
    # feature is categorical
    else:
        if str(example[feature_name]) == value:
            answer = tree[question][0]
        else:
            answer = tree[question][1]
    # base case
    if not isinstance(answer, dict):
        return int(answer.split()[2])
            
    # recursive part
    else:
        residual_tree = answer
        return self.predict_example_probability(example, residual_tree)

    
  ''' fitting our tree with training data '''
  def fit(self, X,y):
    self.X = X.copy()
    self.y = y.copy()
    self.X['label'] = self.y
    self.data = self.X

    if self.ml_task == 'classification':
      self.get_classes_and_counts(self.data.values)
    self.complete_tree = self.tree(self.data, self.ml_task, 0,self.parent_node, self.min_samples_leaf, self.min_samples_split, self.max_depth, self.metric,'parent_node', self.max_features)

    # calculating weighted entries
    for key, value in self.n_entries.items():
      self.n_weighted_entries[key] = [value[0] / len(self.X), value[1]]

    return self.complete_tree

In [5]:
from tqdm import tqdm

class GradientBoosting:
  def __init__(self,learning_rate = 0.1,subsample = 1,criterion = 'mse', n_estimators=100, ml_task='regression', \
               max_depth=5, max_features= None, min_samples_leaf = 1, min_samples_split = 2):
    self.n_estimators = n_estimators
    self.subsample = subsample
    self.max_features = max_features
    self.ml_task = ml_task
    self.min_samples_leaf = min_samples_leaf
    self.min_samples_split = min_samples_split
    self.max_depth = max_depth
    self.metric = criterion
    self.forest = []
    self.X = None
    self.y = None
    self.each_tree_with_class_probabilities = []
    self.n_classes = None
    self.learning_rate = learning_rate
#     self.previous_probability_of_residuals = {}


  def sigmoid(self, x):
    return 1 / (1 + np.exp(-x))

  def negativeDerivitiveLogloss(self, y, log_odds):
    p = self.sigmoid(log_odds)
    return (y - p)

  def log_odds(self, y):
    positive_count = np.count_nonzero(df.label.values == 1)
    negative_count = np.count_nonzero(df.label.values == 0)
    log_odds_value = np.log(positive_count / negative_count)

    return np.full((y.shape[0]), log_odds_value)

  ''' Training our model '''
  def fit(self, X, y):
    self.X = X.copy(); self.y = y.copy()
    self.initial_leaf_pred = self.log_odds(y.values)
    self.previous_probability_of_residuals = []
    self.unique_residuals = []
    
    for i in tqdm(range(self.n_estimators)):
      pseudo_residuals = self.negativeDerivitiveLogloss(y, self.initial_leaf_pred)
      
      previous_probability_of_residuals = {}
      previous_probability = [self.sigmoid(j) for j in self.initial_leaf_pred]
      for key, value in zip(pseudo_residuals, previous_probability):
          previous_probability_of_residuals[key] = value
    
      
      tree = DescisionTree(min_samples_leaf = self.min_samples_leaf, min_samples_split = self.min_samples_split, max_depth =  self.max_depth, \
                           criterion = self.metric, max_features = self.max_features, ml_task = self.ml_task, \
                          previous_probability_of_residuals = previous_probability_of_residuals)
      complete_tree = tree.fit(X, pseudo_residuals)
      self.initial_leaf_pred += self.learning_rate * self.decision_tree_predictions(X, complete_tree)
      self.forest.append(complete_tree)
    
    return 'Training completed'

  ''' Making individual predictions '''
  def predict_example(self, example, tree):
    question = list(tree.keys())[0]
    feature_name, comparison_operator, value = question.split(" ")[:3]

    # ask question
    if comparison_operator == "<=":
        if example[feature_name] <= float(value):
            answer = tree[question][0]
        else:
            answer = tree[question][1]
    # feature is categorical
    else:
        if str(example[feature_name]) == value:
            answer = tree[question][0]
        else:
            answer = tree[question][1]
    # base case
    if not isinstance(answer, dict):
        return float(answer.split()[0])
    
    # recursive part
    else:
        residual_tree = answer
        return self.predict_example(example, residual_tree)



  ''' Making predictions on testing dataframe '''
  def decision_tree_predictions(self, test_df, tree):
      predictions = test_df.apply(self.predict_example, args=(tree,), axis=1)
      return predictions

  def get_clean_output(self, preds):
    return [1 if (i >= 0.5) else 0 for i in preds]
    
  ''' Method for making predictions ''' 
  def predict(self, test_df):
      preds = np.full((test_df.shape[0]), self.log_odds(self.y.values)[0])
      for tree in self.forest:
            preds += self.learning_rate * self.decision_tree_predictions(test_df, tree)
            
      preds = [self.sigmoid(i) for i in preds]
#       return preds
      return self.get_clean_output(preds)

In [6]:
booster1 = GradientBoosting(n_estimators = 5, max_depth=3, learning_rate = 0.1,max_features= None, min_samples_leaf = 1, min_samples_split = 2)
booster1.fit(X_train, y_train)

100%|████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:00<00:00, 11.31it/s]


'Training completed'

In [7]:
from sklearn.metrics import accuracy_score

In [8]:
accuracy_score(y_train, booster1.predict(X_train))

0.8083832335329342

In [9]:
accuracy_score(y_test, booster1.predict(X_test))

0.7757847533632287

# Sklearn classifier

In [10]:
from sklearn.ensemble import GradientBoostingClassifier
# booster1 = GradientBoostingClassifier(n_estimators = 100, max_depth=3, learning_rate = 0.1).fit(X_train, y_train)
booster2 = GradientBoostingClassifier(n_estimators = 5, max_depth=3, learning_rate = 0.1,\
                                      max_features= None, min_samples_leaf = 1, min_samples_split = 2).fit(X_train, y_train)

print(accuracy_score(y_train, booster2.predict(X_train)))
print(accuracy_score(y_test, booster2.predict(X_test)))

0.8188622754491018
0.7757847533632287


# Final check

In [11]:
df = pd.read_csv('pima-indians-diabetes.csv', names = ['a','b','c','d','e','f','g','h','label'])
X = df[df.columns.to_list()[:-1]]
y = df['label']

from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X,y,test_size = 0.25, random_state = 4221314)

In [12]:
booster1 = GradientBoosting(n_estimators = 5, max_depth=3, learning_rate = 0.1,max_features= None, min_samples_leaf = 1, min_samples_split = 2)
booster1.fit(X_train, y_train)
print(accuracy_score(y_train, booster1.predict(X_train)))
print(accuracy_score(y_test, booster1.predict(X_test)))

100%|████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00,  4.32it/s]

0.7309027777777778
0.75





In [13]:
from sklearn.ensemble import GradientBoostingClassifier
# booster1 = GradientBoostingClassifier(n_estimators = 100, max_depth=3, learning_rate = 0.1).fit(X_train, y_train)
booster2 = GradientBoostingClassifier(n_estimators = 5, max_depth=3, learning_rate = 0.1,\
                                      max_features= None, min_samples_leaf = 1, min_samples_split = 2).fit(X_train, y_train)

print(accuracy_score(y_train, booster2.predict(X_train)))
print(accuracy_score(y_test, booster2.predict(X_test)))

0.7309027777777778
0.75
