In [57]:
import pandas as pd
import random
from sklearn import preprocessing
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
import numpy as np

In [77]:
def h(p):
        h = 0 
        for k in range(2):
            h += p[k] * (1-p[k])
        return h
    
    
def p(y, cc=2):
        p = []
        for i in range(cc):
            p.append(0)
            for Y in y:
                if Y == i:
                    p[i] += 1 
                        
            p[i] = p[i] / len(y)       
        
        return p
    

def gini(slice, df):
    """
    Gini impurity
    """
        
    # for i in range(class_count):       
    q_left = df[df[df.columns[0]] < slice]['y']     
    q_right = df[df[df.columns[0]] >= slice]['y']
    
    if len(q_left) == 0 or len(q_right) == 0:
        # not a slice
        return [-1, 1, 1]
    
    p_left = p(q_left)
    h_left = h(p_left)

    p_right = p(q_right)
    h_right = h(p_right)

    q_left = len(q_left) / len(df)
    q_right = len(q_right) / len(df)       
    
    IG = h(p(df['y'])) - q_left * h_left - q_right * h_right
        
    #print(f'GINI || Q_LEFT: {q_left} | Q_RIGHT: {q_right} | P_LEFT: {p_left}')       

    return [IG, h_left, h_right]
    
    
def best_gini(feature, y):
    best = -1
    best_slice = -1
    best_left = 1
    best_right = 1

    # can be no devide opportunity
    if feature.min() != feature.max():
        df = feature.to_frame()
        df['y'] = y

        # 100 попыток найти лучший gini
        for i in range(10):
            slice = random.uniform(feature.min(), feature.max())
            g = gini(slice, df)

            if g[0] > best:
                best = g[0]
                best_slice = slice
                best_left = g[1]
                best_right = g[2]
    
    result = {feature.name: {'gini': best, 'slice': best_slice, 'left': best_left, 'right': best_right}}
    #print(result)
    return result


def node(X, y):  
    # apply to every feature
    features_gini = X.apply(lambda x: best_gini(x, y))
    
    best_feature = {'gini': 0}
    
    df = X
    df['y'] = y
    
    for f in features_gini:
        for key, value in f.items():
            if value['gini'] > best_feature['gini']:
                best_feature['name'] = key
                best_feature['slice'] = value['slice']
                best_feature['gini'] = value['gini']
                best_feature['h_left'] = value['left']
                best_feature['h_right'] = value['right']
                
            # we have the final node!
            if value['gini'] == 0:
                left = df[df[key] < value['slice']]
                dleft = left['y'].iloc[0]
                right = df[df[key] >= value['slice']]
                dright = right['y'].iloc[0]
                print('STOP!!!')
                print(f'Left way is desicion: Class #{dleft}')
                print(f'Right way is desicion: Class #{dright}')
                
                return {'feature': key, 'slice': value['slice'], 'left': dleft, 'right': dright} 
                
    
    # still there is 
    #if best_feature['gini'] > 0 :
    print('Best step feature: ' + str(best_feature['name']) + ' | slice: ' + str(best_feature['slice']) + ' | GINI: '  + str(best_feature['gini']))
    
    #save model
    model = {'feature': best_feature['name'], 'slice': best_feature['slice']} 
    
    # left ветка
    left = df[df[best_feature['name']] < best_feature['slice']]
    if best_feature['h_left'] == 0:
        # decision
        decision = left['y']
        try:
            decision = decision.iloc[0]
        except:
            print(df[best_feature['name']])
            print('Trying to slice ', best_feature['slice'])
            print('WRONG ', decision)
            input()
        print(f'Left way is desicion: Class #{decision}')
        model['left'] = decision
    else:
        model['left'] = node(left.drop('y', axis=1), left['y'])
    
    # right ветка
    right = df[df[best_feature['name']] >= best_feature['slice']]
    if best_feature['h_right'] == 0:
        # decision
        decision = right['y']
        try:
            decision = decision.iloc[0]
        except:
            print(decision)
        print(f'Right way is desicion: Class #{decision}')
        model['right'] = decision
    else:
        model['right'] = node(right.drop('y', axis=1), right['y'])
    
    return model
        
    
def fit(X,y):
    model = node(X,y)
    print('==== Model ready!')
    return model


def predict(model, X):
    """
    And know we will predict class
    """   
    # left 
    if X[model['feature']] < model['slice']:
        # next node
        if type(model['left']) == dict:
            return predict(model['left'], X)
        else:
            return model['left']
    
    # Right
    else:
        if type(model['right']) == dict:
            return predict(model['right'], X)
        else:
            return model['right']
    

# Тестовые данные

In [59]:
# data for small test

df = pd.DataFrame([[5,8,0],[1,3,1],[2,4,0],[3,6,1]])

X = df.drop(2, axis=1)
y = df[2]

In [60]:
# pental data

from sklearn import datasets
iris = datasets.load_iris()

prepare_data = pd.DataFrame(iris.data)
prepare_data['class'] = iris.target

# remove third class
prepare_data = prepare_data[prepare_data['class'] != 2] 

X = prepare_data.drop('class', axis=1)
y = prepare_data['class']

# Данные от такси

In [79]:
# simple prediciton. taxi driver response base on service class
dset = pd.read_csv('taxi_tree.csv')
X = dset.drop(['driver_response', 'close_to_driver', 'close_to_client'], axis = 1)
y = dset['driver_response']

# huge test size, because fit is quite slow without branch limiting 
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.99, random_state=42)

#min_max_scaler = preprocessing.MinMaxScaler()
#X = pd.DataFrame(min_max_scaler.fit_transform(X))

In [None]:
model = fit(X_train,y_train)

In [63]:
#print('Predict ',predict(model, X_test.iloc[18]))
#print('Real class is ', y_test.iloc[18])

#prediction = []

#for i in range(len(X_test)):
#    x = X_test.iloc[i]
#    prediction.append(predict(model, x))

# OUR PREDICTION

In [81]:
def convert_x(_x, X):
    """
    We need dictionary to pass in predict function
    """
    result = {}
    
    col = [x for x in X_test.columns]
    
    for i in range(len(col)):
        result[col[i]] = _x[i]
    
    return result
        
    
pred = X_test.copy()
pred['y'] = pred.apply(lambda x: predict(model, convert_x(x, X)), axis=1)

# Скорим нашу модель

In [88]:
print('Metrics')
print('F1: ', f1_score(y_test, pred['y'], average="macro"))
print('Accuracy score', accuracy_score(y_test, pred['y']))
print('Precision score', precision_score(y_test, pred['y'], average="macro"))
print('Recall score', recall_score(y_test, pred['y'], average="macro"))

Metrics
F1:  0.6715467877388039
Accuracy score 0.671830985915493
Precision score 0.6717975530742233
Recall score 0.6715305991653969


## Неплохая точность с учетом того, что мы брали только 1% выборки для обучения

Точно не скажешь, что модель переучена :)

# Вопросы к преподавателю

Правильно ли я ставлю условия выхода? По Gini = 0, Hleft = 0, hright = 0?