In [None]:
dict = {
    "xs": np.array([[1,2],[2,4],[3,6],[4,8],[5,10]]),
    "y":  np.array([1000,2000,3000,4000,5000]),
}

def debug(*args, **kwargs):
    return
    print(*args, **kwargs)
                    
def split(dict, feature_idx, val):    
    # Create a mask based on the feature at the given index
    mask = dict['xs'][:, feature_idx] >= val
    
    # Use the mask to split xs and y into two groups
    grp1 = {'xs': dict['xs'][~mask], 'y': dict['y'][~mask]}
    grp2 = {'xs': dict['xs'][mask], 'y': dict['y'][mask]}

    return grp1, grp2

def loss(dict):
    avg_y = np.mean(dict['y'])
    abs_values = np.abs(dict['y'] - avg_y)
    return np.sum(abs_values)
    
def find_best_value(dict, feature_idx, min_leaf):
    debug('Find best value: ', feature_idx)
    best_val = -1
    best_val_loss = float('inf')

    all_candidates = dict['xs'][:, feature_idx]
    candidates = np.unique(all_candidates)

    debug('Find best value candidates: ', candidates)

    for val in candidates:
        grp1, grp2 = split(dict, feature_idx, val)

        if len(grp1['xs']) < min_leaf or len(grp2['xs']) < min_leaf:
            continue

        loss_grp1 = loss(grp1)
        loss_grp2 = loss(grp2)
        total_loss = loss_grp1 + loss_grp2

        if best_val_loss > total_loss:
            best_val = val
            best_val_loss = total_loss

        debug('Find best value, evaluated candidate: ', val, val, total_loss, grp1, grp2)

    debug('Find best value, best val: ', best_val)
    debug('Find best value, best val loss: ', best_val_loss)

    return best_val, best_val_loss
    
def find_best_split(dict, min_leaf):
    debug('Find best split')
    
    best_feature_idx = None
    best_feature_val = None
    best_feature_idx_loss = float('inf')
    
    for feature_idx in range(len(dict['xs'][0])):
        best_val, best_val_loss = find_best_value(dict, feature_idx, min_leaf)
        debug('Feature best value: ', feature_idx, best_val, best_val_loss)
        if best_feature_idx_loss > best_val_loss:
            best_feature_idx = feature_idx
            best_feature_val = best_val
            best_feature_idx_loss = best_val_loss
            
    debug('Find best feature, evaluated candidate: ', best_feature_idx, best_feature_val, best_feature_idx_loss)
    
    return best_feature_idx, best_feature_val
        
class Node:
    def __init__(self, feature_idx, val):
        self.feature_idx = feature_idx
        self.val = val
        self.avg = None
        self.leaf = False
        self.child1 = None
        self.child2 = None
        
    def to_s(self):
        if self.leaf:
            return "Leaf - avg " + str(self.avg)
        
        return "Node - Feature_idx: " + str(self.feature_idx) + " - Val: " + str(self.val)
            
        
class CustomDecisionTreeRegressor:
    def __init__(self, dict, min_leaf=1):
        self.dict = dict
        self.min_leaf = min_leaf
        self.tree = None
        
    def fit(self):
        self.tree = self.build_dfs(self.dict)
    
    def build_dfs(self, dict):
        debug('Build next node ', dict)
        
        start_time = time.time()
        best_feature_idx, best_feature_val = find_best_split(dict, self.min_leaf)
        end_time = time.time()
        execution_time = end_time - start_time
        print("Time: ", execution_time)
        

        if(best_feature_idx == None):
            debug('Next node leaf', best_feature_idx, best_feature_val)
            node = Node(None, None)
            node.leaf = True
            node.avg = np.mean(dict['y'])
            return node
        
        debug('Next node node', best_feature_idx, best_feature_val)

        grp1, grp2 = split(dict, best_feature_idx, best_feature_val)
        node = Node(best_feature_idx, best_feature_val)
        node.child1 = self.build_dfs(grp1)
        node.child2 = self.build_dfs(grp2)
        
        return node;
        
    def predict(self, xs):
        return [self.predict_one(x) for x in xs]
    
    def predict_one(self, xs):
        return self.find_next_node(self.tree,xs)

    def find_next_node(self, node, xs):
        if(node.leaf):
            return node.avg
        
        if(xs[node.feature_idx] >= node.val):
            return self.find_next_node(node.child2, xs)
        else:
            return self.find_next_node(node.child1, xs)
            
        
regressor = CustomDecisionTreeRegressor(dict)
regressor.fit()
regressor.predict(np.array([[2,5], [1,5]]))

Time:  0.0010700225830078125
Time:  0.00023937225341796875
Time:  4.9114227294921875e-05
Time:  5.507469177246094e-05
Time:  0.0004661083221435547
Time:  3.266334533691406e-05
Time:  9.846687316894531e-05
Time:  4.601478576660156e-05
Time:  2.5272369384765625e-05


[2000.0, 1000.0]

In [111]:
# Identify columns that contain any strings
string_columns = df.columns[df.applymap(lambda x: isinstance(x, str)).any()]

# Create a copy of the DataFrame
df_custom = df.copy()

# Create xs_cols excluding the dep_var and any columns in string_columns
xs_cols = [col for col in df_custom.columns if col != dep_var and col not in string_columns]

# Cast the xs_cols to float
df_custom[xs_cols] = df_custom[xs_cols].astype(float)

# Convert to NumPy arrays
numpy_xs = df_custom.loc[:, xs_cols].to_numpy()
numpy_y = df[dep_var].to_numpy()

In [114]:
data = { "xs": numpy_xs, "y": numpy_y }
regressor = CustomDecisionTreeRegressor(data)
regressor.fit()

KeyboardInterrupt: 