In [None]:
class XGBoostFromScratch():
    '''A Custom Implementation of XGBoost with Decision Trees'''

    def __init__(self, hyperparams, seed=None):
        # Initialize hyperparameters with default values if not provided
        self.hyperparams = defaultdict(lambda: None, hyperparams)

        # Set subsampling ratio for the training data
        self.sample_fraction = self.hyperparams['subsample'] or 1.0

        # Learning rate, controls how much to shrink the predictions at each boosting round
        self.eta = self.hyperparams['learning_rate'] or 0.3

        # Base prediction score (initial prediction for all examples)
        self.initial_score = self.hyperparams['base_score'] or 0.5

        # Maximum depth of the decision trees (controls model complexity)
        self.depth_limit = self.hyperparams['max_depth'] or 5

        # Initialize a random number generator (for reproducibility and randomness in subsampling)
        self.random_gen = np.random.default_rng(seed=seed)

    def fit(self, X, y, objective_function, num_rounds, verbose=False):
        '''
        Trains the model by building a sequence of decision trees.

        Parameters:
        - X: Features matrix (pandas DataFrame or numpy array).
        - y: Target variable (numpy array or pandas Series).
        - objective_function: A loss function object that provides gradient and curvature (second derivative).
        - num_rounds: Number of boosting rounds (number of trees to build).
        - verbose: If True, prints loss at each round for monitoring.
        '''

        # Initialize predictions with the base score (initial constant value for all data points)
        self.trees = []  # To store all decision trees
        current_predictions = self.initial_score * np.ones(y.shape)  # Initial predictions

        # Training loop for each boosting round (adding one tree per round)
        for round_num in range(num_rounds):
            # Compute gradients (first derivative of the loss function w.r.t. predictions)
            gradients = objective_function.gradient(y, current_predictions)

            # Compute curvatures (second derivative of the loss function w.r.t. predictions)
            curvatures = objective_function.curvature(y, current_predictions)

            # Randomly subsample the data if subsampling is enabled (subsample < 1.0)
            selected_indices = None if self.sample_fraction == 1.0 else \
                self.random_gen.choice(len(y), size=math.floor(self.sample_fraction * len(y)), replace=False)

            # Build a single decision tree using the gradients and curvatures
            tree = DecisionTree(X, gradients, curvatures, self.hyperparams, self.depth_limit, selected_indices)

            # Update the predictions with the output from the new tree (shrinkage by learning rate)
            current_predictions += self.eta * tree.predict(X)

            # Store the tree for future predictions
            self.trees.append(tree)

            # Optionally print the loss for this round to monitor progress
            if verbose:
                print(f'[{round_num}] Training Loss = {objective_function.loss(y, current_predictions)}')

    def predict(self, X):
        '''
        Makes predictions for the input data by summing up the contributions from all trees.

        Parameters:
        - X: Input data for which predictions are to be made.

        Returns:
        - Predicted values for the input data.
        '''

        # Start with the base prediction and add the contributions from all trees
        return self.initial_score + self.eta * np.sum([tree.predict(X) for tree in self.trees], axis=0)


class DecisionTree():
    '''Represents a single decision tree built for a boosting round.'''

    def __init__(self, X, gradients, curvatures, hyperparams, depth_limit, selected_indices=None):
        '''
        Builds a decision tree based on the gradients and curvatures of the loss function.

        Parameters:
        - X: Features matrix (pandas DataFrame or numpy array).
        - gradients: Gradients (first derivative of the loss function w.r.t. predictions).
        - curvatures: Curvatures (second derivative of the loss function w.r.t. predictions).
        - hyperparams: Dictionary of hyperparameters (like max depth, regularization).
        - depth_limit: Maximum depth of the tree (how deep the tree can grow).
        - selected_indices: Optional subsample of indices for training this tree.
        '''

        self.hyperparams = hyperparams
        self.depth_limit = depth_limit
        assert self.depth_limit >= 0, 'Depth must be nonnegative'

        # Hyperparameter settings with fallback values
        self.min_child_weight = hyperparams['min_child_weight'] or 1.0
        self.regularization = hyperparams['reg_lambda'] or 1.0
        self.split_penalty = hyperparams['gamma'] or 0.0
        self.feature_sampling_rate = hyperparams['colsample_bynode'] or 1.0

        # Convert pandas Series to numpy arrays if necessary
        if isinstance(gradients, pd.Series):
            gradients = gradients.values
        if isinstance(curvatures, pd.Series):
            curvatures = curvatures.values

        # If no subsampling, use all data points
        if selected_indices is None:
            selected_indices = np.arange(len(gradients))

        # Store data and necessary statistics for building the tree
        self.X, self.gradients, self.curvatures, self.selected_indices = X, gradients, curvatures, selected_indices
        self.sample_size, self.num_features = len(selected_indices), X.shape[1]

        # Prediction value for this node, calculated as a leaf node initially
        # The formula minimizes the loss function based on gradients and curvatures
        self.prediction_value = -gradients[selected_indices].sum() / (curvatures[selected_indices].sum() + self.regularization)

        # This variable tracks the best split found during the tree construction
        self.best_gain = 0.0

        # If the tree can grow deeper, we evaluate possible splits
        if self.depth_limit > 0:
            self._evaluate_possible_splits()

    def _evaluate_possible_splits(self):
        '''Evaluates potential splits at each feature and chooses the best one based on gain.'''

        # Iterate through all features to find the best split
        for feature_idx in range(self.num_features):
            self._attempt_split(feature_idx)

        # If no good split was found, this node will remain a leaf
        if self.is_leaf_node:
            return

        # Partition the data based on the best split found
        feature_values = self.X.values[self.selected_indices, self.best_split_feature]
        left_idx = np.nonzero(feature_values <= self.split_threshold)[0]
        right_idx = np.nonzero(feature_values > self.split_threshold)[0]

        # Recursively grow the left and right child trees
        self.left_child = DecisionTree(self.X, self.gradients, self.curvatures, self.hyperparams, self.depth_limit - 1, self.selected_indices[left_idx])
        self.right_child = DecisionTree(self.X, self.gradients, self.curvatures, self.hyperparams, self.depth_limit - 1, self.selected_indices[right_idx])

    @property
    def is_leaf_node(self):
        '''Check if the current node is a leaf node (i.e., no further split was found).'''
        return self.best_gain == 0.0

    def _attempt_split(self, feature_idx):
        '''
        Tries to find the best split for a given feature by evaluating the potential gain in loss reduction.

        Parameters:
        - feature_idx: Index of the feature to evaluate.
        '''

        # Sort the data by the selected feature to evaluate potential splits
        feature_values = self.X.values[self.selected_indices, feature_idx]
        gradients, curvatures = self.gradients[self.selected_indices], self.curvatures[self.selected_indices]
        sorted_indices = np.argsort(feature_values)
        sorted_gradients, sorted_curvatures, sorted_feature_values = gradients[sorted_indices], curvatures[sorted_indices], feature_values[sorted_indices]

        # Calculate total gradients and curvatures for the entire node
        total_gradient, total_curvature = gradients.sum(), curvatures.sum()
        left_gradient, left_curvature = 0.0, 0.0  # Cumulative values for left child
        right_gradient, right_curvature = total_gradient, total_curvature  # Cumulative values for right child

        # Iterate through each point and evaluate the gain for splitting at that point
        for i in range(self.sample_size - 1):
            grad_i, curv_i, val_i, next_val = sorted_gradients[i], sorted_curvatures[i], sorted_feature_values[i], sorted_feature_values[i + 1]
            left_gradient += grad_i
            right_gradient -= grad_i
            left_curvature += curv_i
            right_curvature -= curv_i

            # Skip splits that don't satisfy minimum child weight or result in identical splits
            if left_curvature < self.min_child_weight or val_i == next_val:
                continue
            if right_curvature < self.min_child_weight:
                break

            # Calculate the gain from splitting the node at this point
            gain = 0.5 * ((left_gradient**2 / (left_curvature + self.regularization))
                          + (right_gradient**2 / (right_curvature + self.regularization))
                          - (total_gradient**2 / (total_curvature + self.regularization))
                         ) - self.split_penalty / 2

            # If this split provides a better gain, record it as the best split
            if gain > self.best_gain:
                self.best_split_feature = feature_idx
                self.best_gain = gain
                self.split_threshold = (val_i + next_val) / 2

    def predict(self, X):
        '''Makes predictions for the input data by recursively traversing the decision tree.'''
        return np.array([self._predict_single(row) for i, row in X.iterrows()])

    def _predict_single(self, row):
        '''Predicts the value for a single input row by following the tree structure.'''
        if self.is_leaf_node:
            return self.prediction_value
        if row[self.best_split_feature] <= self.split_threshold:
            return self.left_child._predict_single(row)
        else:
            return self.right_child._predict_single(row)
