In [4]:
import numpy as np
import pandas as pd

In [5]:
from sklearn.datasets import make_regression

X, y = make_regression(n_samples=1000, n_features=14, n_informative=10, noise=15, random_state=42)
X = pd.DataFrame(X)
y = pd.Series(y)
X.columns = [f'col_{col}' for col in X.columns]

In [6]:
from sklearn.datasets import make_classification

X, y = make_classification(n_samples=1000, n_features=14, n_informative=10, random_state=42)
X = pd.DataFrame(X)
y = pd.Series(y)
X.columns = [f'col_{col}' for col in X.columns]
df = pd.read_csv('./data/data_banknote_authentication.txt', header=None)
df.columns = ['variance', 'skewness', 'curtosis', 'entropy', 'target']
X, y = df.iloc[:,:4], df['target']

In [7]:
class Node:
    def __init__(self, value, left=None, right=None):
        self.value = value
        self.left = left
        self.right = right

    def __str__(self):
        return f"Node: ({self.value}), ({self.right}), ({self.left})"

In [94]:
class MyTreeClf:
    def __init__(self, max_depth=5, min_samples_split=2, max_leafs=20, bins=None, criterion='entropy'):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.max_leafs = max_leafs

        self.leafs_cnt = 0
        self.root = None
        self.bins = bins
        self.splitters = None
        self.criterion = criterion

        self.fi = {}


    def __repr__(self):
        return f"MyTreeClf class: max_depth={self.max_depth}, min_samples_split={self.min_samples_split}, max_leafs={self.max_leafs}"

    def _get_splitters(self, x):
        values = np.array(sorted(x.unique()))
        splitters = np.array([(values[i] + values[i+1])/2 for i in range(len(values)-1)])
        return splitters
    
    def _get_splitters_wbins(self, X, y):
        self.splitters = {}
        if self.bins:
            native_splitters = {col: self._get_splitters(X[col]) for col in X.columns}
            for col, splitters in native_splitters.items():
                if len(splitters) <= self.bins - 1:
                    self.splitters[col] = splitters
                else:
                    hist = np.histogram(X[col], bins=self.bins)[1][1:-1]
                    self.splitters[col] = hist

    def _entropy(self, col_targ):
        p0 = (col_targ.iloc[:, 1] == 0).sum() / (col_targ.shape[0] + 1e-15)
        p1 = col_targ.iloc[:, 1].sum() / (col_targ.shape[0] + 1e-15)
        S = -p0*np.log2(p0+1e-15) - p1*np.log2(p1+1e-15)
        return S

    def _gini(self, col_targ):
        p0 = (col_targ.iloc[:, 1] == 0).sum() / (col_targ.shape[0] + 1e-15)
        p1 = col_targ.iloc[:, 1].sum() / (col_targ.shape[0] + 1e-15)
        G = 1 - p0**2 - p1**2
        return G

    def _get_ig(self, x, y, split):
        col_targ = pd.concat([x, y], axis=1)

        left_sub = col_targ.loc[col_targ[x.name] <= split, :]
        right_sub = col_targ.loc[col_targ[x.name] > split, :]

        if self.criterion == 'gini':
            Gp = self._gini(col_targ)
            Gl, Gr = self._gini(left_sub), self._gini(right_sub)
            IG = Gp - left_sub.shape[0]/(col_targ.shape[0] + 1e-15)*Gl - right_sub.shape[0]/(col_targ.shape[0] + 1e-15)*Gr
        else:
            S0 = self._entropy(col_targ)
            S1, S2 = self._entropy(left_sub), self._entropy(right_sub)
            IG = S0 - left_sub.shape[0]/(col_targ.shape[0] + 1e-15)*S1 - right_sub.shape[0]/(col_targ.shape[0] + 1e-15)*S2
        return IG
        

    def _get_best_split(self, X, y):
        cols = X.columns
        if self.bins is None:
            splitters = {col: self._get_splitters(X[col]) for col in cols}
        else:
            splitters = self.splitters
        best_col = None
        best_split = None
        best_ig = 0
        for col, splits in splitters.items():
            x = X[col]
            igs = np.array([self._get_ig(x, y, split) for split in splits])
            max_idx = igs.argmax()
            max_ig, max_split = igs[max_idx], splits[max_idx]
            if max_ig > best_ig:
                best_col = col
                best_split = max_split
                best_ig = max_ig
        
        return best_col, best_split, best_ig

    def is_leaf(self, data, depth):
        return (all(data.iloc[:, -1] == 1)) or\
               (all(data.iloc[:, -1] == 0)) or\
               (depth >= self.max_depth-1) or\
               (data.shape[0] < self.min_samples_split-1) or\
               (self.leafs_cnt >= self.max_leafs-1)

    def _get_I(self, data):
        if self.criterion == 'gini':
            return self._gini(data)
        return self._entropy(data)

    
    def _fit(self, X, y, depth=0):
        best_col, best_split, best_ig = self._get_best_split(X, y)
        root = Node((best_col, best_split))
        #print(best_col, best_split, best_ig)
        col_targ = pd.concat([X, y], axis=1)


        if (best_col is None):
            value = col_targ.iloc[:, -1].sum() / col_targ.shape[0]
            self.leafs_cnt += 1
            return Node(('leaf', value))

        left_sub = col_targ.loc[col_targ[best_col] <= best_split, :]
        right_sub = col_targ.loc[col_targ[best_col] > best_split, :]

        Np, Nl, Nr = (X.shape[0] + 1e-15), left_sub.shape[0], right_sub.shape[0]
        col_for_I = [list(col_targ.columns).index(best_col), -1]
        Ip, Il, Ir = self._get_I(col_targ.iloc[:, col_for_I]), self._get_I(left_sub.iloc[:, col_for_I]), self._get_I(right_sub.iloc[:, col_for_I])
        FI = Np*(Ip - Nl/Np*Il - Nr/Np*Ir)
        self.fi[best_col] += FI

        if self.is_leaf(left_sub, depth):
            value = left_sub.iloc[:, -1].sum() / (left_sub.shape[0] + 1e-15)
            root.left = Node(('left', value))
            self.leafs_cnt += 1
        else:
            X, y = left_sub.drop(left_sub.columns[-1], axis=1), left_sub.iloc[:, -1]
            root.left = self._fit(X, y, depth+1)

        if self.is_leaf(right_sub, depth):
            value = right_sub.iloc[:, -1].sum() / (right_sub.shape[0] + 1e-15)
            root.right = Node(('right', value))
            self.leafs_cnt += 1
        else:
            X, y = right_sub.drop(right_sub.columns[-1], axis=1), right_sub.iloc[:, -1]
            root.right = self._fit(X, y, depth+1)
        return root

    def fit(self, X, y):
        self._get_splitters_wbins(X, y)
        self.fi = {col: 0 for col in X.columns}
        self.root = self._fit(X, y)
        self.fi = {col: self.fi[col]/X.shape[0] for col in X.columns}

    
    def _predict_proba(self, x, root):
        if (root.right is None and root.left is None):
            return root.value[1]
        
        feat, split = root.value
        if x[feat] <= split:
            return self._predict_proba(x, root.left)
        else:
            return self._predict_proba(x, root.right)

    def predict_proba(self, X):
        y_pred_logits = np.array([self._predict_proba(X.iloc[i, :], self.root) for i in range(X.shape[0])])
        return y_pred_logits

    def predict(self, X):
        y_pred = (self.predict_proba(X) > 0.5).astype(int)
        return y_pred


    def _print_tree(self, root, intend):
        if (root is None):
            return None
        
        feat, split = root.value
        if (root.left is None and root.right is None):
            print('  '*intend, end='')
            print(f"{feat} = {split}")
        else:
            print('  '*intend, end='')
            print(f"{feat} > {split}")

        self._print_tree(root.left, intend+1)
        self._print_tree(root.right, intend+1)

    def print_tree(self):
        self._print_tree(self.root, 0)

    def _sum_leafs(self, root):
        if (root is None):
            return 0
        if (root.left is None and root.right is None):
            return root.value[1]
        return self._sum_leafs(root.left) + self._sum_leafs(root.right)
    
    def sum_leafs(self):
        return self._sum_leafs(self.root)



In [95]:
from sklearn.datasets import make_classification

X, y = make_classification(n_samples=1000, n_features=14, n_informative=10, random_state=42)
X = pd.DataFrame(X)
y = pd.Series(y)
X.columns = [f'col_{col}' for col in X.columns]
df = pd.read_csv('./data/data_banknote_authentication.txt', header=None)
df.columns = ['variance', 'skewness', 'curtosis', 'entropy', 'target']
X, y = df.iloc[:,:4], df['target']

In [96]:
col_targ = pd.concat([X, y], axis=1)
col_targ.loc[:, y.name]

0       0
1       0
2       0
3       0
4       0
       ..
1367    1
1368    1
1369    1
1370    1
1371    1
Name: target, Length: 1372, dtype: int64

In [97]:
list(col_targ.columns).index('variance')

0

In [98]:
my_tree = MyTreeClf(15, 20, 30, 6)
my_tree.fit(X, y)
print(my_tree.leafs_cnt)
print(round(my_tree.sum_leafs(), 6))
my_tree.print_tree()
print(my_tree.fi)

26
12.025427
variance > -0.10864999999999903
  skewness > 8.497483333333335
    variance > -2.4197999999999995
      curtosis > 6.32065
        left = 1.0
        skewness > -0.41075000000000017
          left = 1.0
          right = 0.0
      skewness > 4.043366666666667
        curtosis > 6.32065
          curtosis > 2.4517333333333333
            left = 1.0
            skewness > -0.41075000000000017
              left = 1.0
              right = 0.33333333333333326
          skewness > -4.864866666666666
            entropy > -1.2163999999999993
              left = 0.7999999999999998
              right = 1.0
            skewness > -0.41075000000000017
              left = 0.24999999999999994
              right = 0.0
        right = 0.0
    variance > -4.73095
      left = 0.9999999999999999
      right = 0.0
  variance > 2.2025000000000006
    curtosis > -1.4171833333333335
      skewness > 8.497483333333335
        skewness > 4.043366666666667
          left = 1.0
          ent

In [99]:
y_pred_logits = my_tree.predict_proba(X)
y_pred = my_tree.predict(X)
(y_pred == y).sum() / len(y)

0.9803206997084548

In [38]:
class MyTreeReg:
    def __init__(self, max_depth=5, min_samples_split=2, max_leafs=20):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.max_leafs = max_leafs

        self.leafs_cnt = 0
        self.root = None

    def __repr__(self):
        return f"MyTreeReg class: max_depth={self.max_depth}, min_samples_split={self.min_samples_split}, max_leafs={self.max_leafs}"


    def _get_splitters(self, x):
        values = np.array(sorted(x.unique()))
        splitters = [(values[i] + values[i+1]) / 2 for i in range(len(values)-1)]
        return np.array(splitters)

    def _get_mse(self, col_targ):
        y = col_targ.iloc[:, -1]
        return np.sum((y - y.mean())**2) / (len(y) + 1e-15)

    def _get_ig(self, x, y, split):
        col_targ = pd.concat([x, y], axis=1)

        left_sub = col_targ.loc[col_targ[x.name] <= split, :]
        right_sub = col_targ.loc[col_targ[x.name] > split, :]

        Np, Nl, Nr = col_targ.shape[0], left_sub.shape[0], right_sub.shape[0]
        MSEp, MSEl, MSEr = self._get_mse(col_targ), self._get_mse(left_sub), self._get_mse(right_sub)
        IG = MSEp - (Nl/Np*MSEl + Nr/Np*MSEr)
        return IG

    def _get_best_split(self, X, y):
        splitters = {col: self._get_splitters(X[col]) for col in X.columns}

        best_col = None
        best_split = None
        best_ig = 0
        for col, splits in splitters.items():
            igs = np.array([self._get_ig(X[col], y, split) for split in splits])
            if not len(igs): continue
            max_ig_idx = igs.argmax()
            max_ig = igs[max_ig_idx]
            max_split = splits[max_ig_idx]
            if max_ig > best_ig:
                best_col = col
                best_split = max_split
                best_ig = max_ig
        return best_col, best_split, best_ig

    def is_leaf(self, data, depth):
        return (data.iloc[:, -1].unique().shape[0] <= 1) or\
               (depth >= self.max_depth) or\
               (data.shape[0] <= self.min_samples_split-1) or\
               (self.leafs_cnt >= self.max_leafs-2)
    
    def _fit(self, X, y, depth=0):
        best_col, best_split, best_if = self._get_best_split(X, y)
        root = Node((best_col, best_split))

        col_targ = pd.concat([X, y], axis=1)

        left_sub = col_targ.loc[col_targ[best_col] <= best_split, :]
        right_sub = col_targ.loc[col_targ[best_col] > best_split, :]

        if self.is_leaf(left_sub, depth):
            value = left_sub.iloc[:, -1].mean()
            root.left = Node(('left', value))
            self.leafs_cnt += 1
        else:
            X, y = left_sub.drop(left_sub.columns[-1], axis=1), left_sub.iloc[:, -1] 
            root.left = self._fit(X, y, depth+1)

        if self.is_leaf(right_sub, depth):
            value = right_sub.iloc[:, -1].mean()
            root.right = Node(('right', value))
            self.leafs_cnt += 1
        else:
            X, y = right_sub.drop(right_sub.columns[-1], axis=1), right_sub.iloc[:, -1]
            root.right = self._fit(X, y, depth+1)

        return root
            
    def fit(self, X, y):
        self.root = self._fit(X, y)


    def _predict(self, x, root):
        if root.left is None and root.right is None:
            return root.value[1]
        
        feat, split = root.value
        if x[feat] <= split:
            return self._predict(x, root.left)
        else:
            return self._predict(x, root.right)

    def predict(self, X):
        y_pred = [self._predict(X.iloc[i, :], self.root) for i in range(X.shape[0])]
        return np.array(y_pred)

    
    def _print_tree(self, root, intend):
        if (root is None):
            return None
        
        feat, split = root.value
        if (root.left is None and root.right is None):
            print('  '*intend, end='')
            print(f"{feat} = {split}")
        else:
            print('  '*intend, end='')
            print(f"{feat} > {split}")

        self._print_tree(root.left, intend+1)
        self._print_tree(root.right, intend+1)

    def print_tree(self):
        self._print_tree(self.root, 0)

    def _sum_leafs(self, root):
        if (root is None):
            return 0
        if (root.left is None and root.right is None):
            return root.value[1]
        return self._sum_leafs(root.left) + self._sum_leafs(root.right)
    
    def sum_leafs(self):
        return self._sum_leafs(self.root)



In [39]:
from sklearn.datasets import load_diabetes

data = load_diabetes(as_frame=True)
X, y = data['data'], data['target']

In [41]:
my_tree_reg = MyTreeReg(15, 35, 30)
my_tree_reg.fit(X, y)
print(my_tree_reg.leafs_cnt)
print(my_tree_reg.sum_leafs())
my_tree_reg.print_tree()

27
4352.894212698733
s5 > -0.0037611760063045703
  bmi > 0.0061888847138220964
    s3 > 0.02102781591949656
      s1 > 0.06310082451524143
        sex > 0.003019241116414738
          bp > -0.03493465558747611
            left = 100.42857142857143
            right = 140.33333333333334
          s6 > -0.02800139244688993
            left = 74.22222222222223
            right = 107.57142857142857
        right = 241.5
      sex > 0.003019241116414738
        bmi > -0.06279108801454907
          left = 68.57142857142857
          bp > 0.025315236489885963
            bmi > -0.013211732616032285
              s5 > -0.011400050993453053
                age > -0.04002367026349452
                  left = 114.33333333333333
                  right = 87.23076923076923
                right = 137.66666666666666
              right = 67.7
            right = 142.5
        right = 68.55
    age > -0.07998159322470814
      left = 274.0
      bp > 0.009822407098564287
        left = 135.538461538

In [None]:
from sklearn.datasets import make_regression

X, y = make_regression(n_samples=1000, n_features=14, n_informative=10, noise=15, random_state=42)
X = pd.DataFrame(X)
y = pd.Series(y)
X.columns = [f'col_{col}' for col in X.columns]