In [1]:
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Iterable, Optional, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy import stats
from sklearn.model_selection import train_test_split

In [2]:
class Node(object):
    """
    A single vertex of a binary tree.

    Stores a split rule (threshold + feature index), references to the left
    and right child vertices, and a public `leaf_value` attribute.
    """

    def __init__(self) -> None:
        """
        Create an empty node: every stored field starts out as None
        """
        self.__split, self.__feature = None, None
        self.__left, self.__right = None, None
        self.leaf_value = None

    def set_params(self, split: float, feature: int) -> None:
        """
        Store the split rule of this node

        Input:
            split   -> threshold the feature is compared against
            feature -> index of the feature the threshold applies to
        """
        self.__feature = feature
        self.__split = split

    def get_params(self) -> Tuple[float,int]:
        """
        Read back the split rule of this node

        Output:
            (split, feature) pair, in that order
        """
        return self.__split, self.__feature

    def set_children(self, left: Node, right: Node) -> None:
        """
        Attach both child nodes to this node

        Inputs:
            left  -> node stored as the left child
            right -> node stored as the right child
        """
        self.__right = right
        self.__left = left

    def get_left_node(self) -> Node:
        """
        Read the left child

        Output:
            node stored as the left child (None if never set)
        """
        return self.__left

    def get_right_node(self) -> Node:
        """
        Read the right child

        Output:
            node stored as the right child (None if never set)
        """
        return self.__right


In [3]:
# Load the example arrays from .npy files; the paths are relative, so they are
# resolved against the kernel's current working directory.
# NOTE(review): presumably X is the feature matrix, treatment the 0/1 treatment
# flag and y the target (names match UpliftTreeRegressor.fit) -- confirm.
X = np.load('example_X.npy')
treatment = np.load('example_treatment.npy')
y = np.load('example_y.npy')

In [7]:
# Number of samples with treatment == 1, and the total number of samples.
np.sum(treatment == 1), len(treatment)

(25197, 50000)

In [None]:
class UpliftTreeRegressor():
    """
    Regression tree that estimates uplift: the difference between the mean
    outcome of treated samples (treatment == 1) and the mean outcome of
    control samples (treatment == 0).

    The tree is grown greedily. At every node the candidate thresholds of each
    feature come from `_return_thresholds`; a candidate is admissible only if
    both children satisfy the minimum-size constraints, and the admissible
    candidate with the largest absolute difference between the children's
    uplift estimates is selected. Samples with feature <= threshold go left.

    NOTE(review): the split criterion (absolute difference of child uplifts)
    and the tie-break (the first best candidate wins) are design choices of
    this implementation -- confirm they match the intended reference.
    """

    def __init__(self,
                 max_depth: Optional[int] = 3,
                 min_samples_leaf: int = 1000,
                 min_samples_leaf_treated: int = 300,
                 min_samples_leaf_control: int = 300,
    ):
        """
        Initializer for an UpliftTreeRegressor instance

        Inputs:
            max_depth                -> maximum depth of the tree (None = unlimited)
            min_samples_leaf         -> minimum number of training samples in a leaf
            min_samples_leaf_treated -> minimum number of training samples with T=1 in a leaf
            min_samples_leaf_control -> minimum number of training samples with T=0 in a leaf
        """
        self.tree                     = None
        self.max_depth                = max_depth
        self.min_samples_leaf         = min_samples_leaf
        self.min_samples_leaf_treated = min_samples_leaf_treated
        self.min_samples_leaf_control = min_samples_leaf_control

    def __grow(self, node: "Node", D: np.ndarray, level: int) -> None:
        """
        Private recursive function to grow the tree during training

        Inputs:
            node  -> tree node to configure (becomes a split node or a leaf)
            D     -> samples reaching the node; columns are
                     [features..., treatment, outcome]
            level -> depth of node in the tree (the root is level 0)
        """
        can_go_deeper = (self.max_depth is None) or (self.max_depth >= (level + 1))
        outcome = D[:, -1]
        # a constant outcome yields zero uplift in every child, so a split cannot help
        outcome_varies = bool(np.any(outcome != outcome[0]))

        best = self._best_split(D) if (can_go_deeper and outcome_varies) else None

        # leaf: depth limit reached, nothing to separate, or no admissible split
        if best is None:
            node.leaf_value = self._leaf_value(D)
            return

        feature, split = best
        node.set_params(split, feature)
        node.set_children(Node(), Node())
        goes_left = D[:, feature] <= split
        self.__grow(node.get_left_node(), D[goes_left], level + 1)
        self.__grow(node.get_right_node(), D[~goes_left], level + 1)

    def _best_split(self, D: np.ndarray) -> Optional[Tuple[int, float]]:
        """
        Protected function to search for the best admissible (feature, split) pair

        Input:
            D -> samples at the node; columns are [features..., treatment, outcome]
        Output:
            (feature, split) maximising the absolute difference between the
            uplift of the left and right child, or None when no candidate
            satisfies the minimum-size constraints
        """
        treat = D[:, -2]
        outcome = D[:, -1]
        treated = treat == 1
        control = treat == 0
        n_all = D.shape[0]
        n_treated = int(treated.sum())
        n_control = int(control.sum())
        # each child needs at least one sample of each group, otherwise its uplift is undefined
        need_treated = max(self.min_samples_leaf_treated, 1)
        need_control = max(self.min_samples_leaf_control, 1)

        best = None
        best_gain = None
        for f in range(D.shape[1] - 2):
            column = D[:, f]
            for s in self._return_thresholds(column):
                goes_left = column <= s
                l_all = int(goes_left.sum())
                l_treated = int((goes_left & treated).sum())
                l_control = int((goes_left & control).sum())
                # right-child counts are the node totals minus the left-child counts
                sizes_ok = (
                    min(l_all, n_all - l_all) >= self.min_samples_leaf
                    and min(l_treated, n_treated - l_treated) >= need_treated
                    and min(l_control, n_control - l_control) >= need_control
                )
                if not sizes_ok:
                    continue
                gain = abs(
                    self._impurity(outcome[goes_left], treat[goes_left])
                    - self._impurity(outcome[~goes_left], treat[~goes_left])
                )
                # strict comparison: on ties the earliest candidate is kept
                if (best_gain is None) or (gain > best_gain):
                    best_gain = gain
                    best = (f, float(s))
        return best

    def fit(self,
            X: np.ndarray,
            treatment: np.ndarray,
            y: np.ndarray
    ) -> None:
        """
        Train the tree

        Inputs:
            X         -> array (n * k) with the features
            treatment -> array (n) with the treatment flag (1 = treated, 0 = control)
            y         -> array (n) with the target variable
        Raises:
            ValueError when X is not 2-D, when the inputs disagree on n, or
            when there is no treated or no control sample
        """
        X = np.asarray(X, dtype=float)
        treatment = np.asarray(treatment, dtype=float).reshape(-1)
        y = np.asarray(y, dtype=float).reshape(-1)

        if X.ndim != 2:
            raise ValueError("X must be a 2-D array of shape (n, k)")
        if not (X.shape[0] == treatment.shape[0] == y.shape[0]):
            raise ValueError("X, treatment and y must contain the same number of samples")
        if not (np.any(treatment == 1) and np.any(treatment == 0)):
            raise ValueError("treatment must contain at least one 1 (treated) and one 0 (control)")

        # single working array so that one boolean mask splits features, flag and outcome together
        D = np.column_stack((X, treatment, y))
        root = Node()
        self.__grow(root, D, 0)
        self.tree = root

    def _impurity(self, X: np.array, t: np.array) -> float:
        """
        Protected function to compute the uplift estimate of a sample
        (the method name is kept for compatibility)

        Inputs:
            X -> 1-D array of values to average (the outcome column when
                 called by the tree)
            t -> 1-D array aligned with X; 1 marks treated, 0 marks control
        Output:
            mean(X[t == 1]) - mean(X[t == 0]); nan when either group is empty
        """
        X = np.asarray(X, dtype=float)
        t = np.asarray(t)
        treated = X[t == 1]
        control = X[t == 0]
        if treated.size == 0 or control.size == 0:
            return float("nan")
        return float(treated.mean() - control.mean())

    def _leaf_value(self, D: np.array) -> float:
        """
        Protected function to compute the value at a leaf node

        Input:
            D -> samples at the leaf; the last column is the outcome and the
                 one before it is the treatment flag
        Output:
            Uplift estimate of D (see `_impurity`)
        """
        return self._impurity(D[:, -1], D[:, -2])

    def predict(self, X: np.ndarray) -> Iterable[float]:
        """
        Predict the uplift for every row of X

        Input:
            X -> array (n * k) with the features
        Output:
            1-D numpy array with n uplift estimates
        Raises:
            RuntimeError when called before `fit`
            ValueError when X is not 2-D
        """
        if self.tree is None:
            raise RuntimeError("UpliftTreeRegressor.predict called before fit")
        X = np.asarray(X, dtype=float)
        if X.ndim != 2:
            raise ValueError("X must be a 2-D array of shape (n, k)")
        predictions = np.empty(X.shape[0], dtype=float)
        self.__route(self.tree, X, np.arange(X.shape[0]), predictions)
        return predictions

    def __route(self, node: "Node", X: np.ndarray, rows: np.ndarray, out: np.ndarray) -> None:
        """
        Private recursive function that writes leaf values into `out`

        Inputs:
            node -> current tree node
            X    -> full feature matrix passed to `predict`
            rows -> integer indices of the rows of X that reached node
            out  -> output array, filled in place at positions `rows`
        """
        if node.leaf_value is not None:
            out[rows] = node.leaf_value
            return
        split, feature = node.get_params()
        # same routing rule as in training: feature <= split goes left
        goes_left = X[rows, feature] <= split
        self.__route(node.get_left_node(), X, rows[goes_left], out)
        self.__route(node.get_right_node(), X, rows[~goes_left], out)

    @staticmethod
    def _check(model_constructor,
               model_params: dict,
               X,
               treatment,
               y,
               X_test,
               pred_right,
               eps: float = 1e-5) -> bool:
        """
        Fit a fresh model and compare its predictions with reference values

        Inputs:
            model_constructor -> callable building the model from keyword arguments
            model_params      -> keyword arguments for model_constructor
            X, treatment, y   -> training data passed to `fit`
            X_test            -> features passed to `predict`
            pred_right        -> reference predictions for X_test
            eps               -> absolute tolerance per prediction
                                 NOTE(review): the default 1e-5 is chosen here;
                                 confirm against the intended tolerance
        Output:
            True when every prediction is within eps of its reference value
        """
        model = model_constructor(**model_params)
        model.fit(X, treatment, y)
        pred = np.array(model.predict(X_test)).reshape(len(X_test))
        # np.all (rather than np.max) also handles an empty X_test
        passed = bool(np.all(np.abs(pred - pred_right) < eps))
        return passed

    @staticmethod
    def _return_thresholds(col_values: np.ndarray) -> np.ndarray:
        """
        Build the candidate thresholds for one feature

        Input:
            col_values -> 1-D array with the feature's values at the current node
        Output:
            Sorted unique threshold options to iterate over when searching for
            the best threshold
        """
        unique_values = np.unique(col_values)
        if len(unique_values) > 10:
            percentiles = np.percentile(col_values, [3, 5, 10, 20, 30, 50, 70, 80, 90, 95, 97])
        else:
            percentiles = np.percentile(unique_values, [10, 50, 90])
        threshold_options = np.unique(percentiles)
        return threshold_options



    