In [13]:
"""
train_model.ipynb

Purpose
-------
    Implement and train a manual Random Forest classifier from scratch 
    to predict diabetes risk levels based on health and lifestyle data.

Description
-----------
    This notebook includes:
    1. Node, Decision Tree, and Random Forest class implementations 
       using Gini impurity.
    2. Utility methods for tree construction, bootstrap sampling, 
       feature selection, and majority voting.
    3. Loading and splitting the diabetes dataset into training and test sets.
    4. Training the Random Forest model and evaluating accuracy on both sets.
    5. Saving the trained model as 'model.pkl' for backend integration.

Usage
-----
    Run the notebook or script to train and save the model. 
    The saved 'model.pkl' is later used by the backend 
    to predict diabetes risk for new user inputs.

Dependencies
------------
    - numpy
    - pandas
    - dill
    - collections
    - typing
    - pathlib

Author
------
    Waseem Alyazidi.

Date
----
    2025-09-08.
"""


"\ntrain_model.ipynb\n\nPurpose\n-------\n    Implement and train a manual Random Forest classifier from scratch \n    to predict diabetes risk levels based on health and lifestyle data.\n\nDescription\n-----------\n    This notebook includes:\n    1. Node, Decision Tree, and Random Forest class implementations \n       using Gini impurity.\n    2. Utility methods for tree construction, bootstrap sampling, \n       feature selection, and majority voting.\n    3. Loading and splitting the diabetes dataset into training and test sets.\n    4. Training the Random Forest model and evaluating accuracy on both sets.\n    5. Saving the trained model as 'model.pkl' for backend integration.\n\nUsage\n-----\n    Run the notebook or script to train and save the model. \n    The saved 'model.pkl' is later used by the backend \n    to predict diabetes risk for new user inputs.\n\nDependencies\n------------\n    - numpy\n    - pandas\n    - dill\n    - collections\n    - typing\n    - pathlib\n\nAut

In [14]:
# Required libraries
import numpy as np
from collections import Counter
from typing import Tuple, Any, Optional, List

In [15]:
# Node
class Node:
    """
        One node of a binary decision tree.

        The node stores a split description (feature_index, threshold), links to
        its two children, and a label (value). A node counts as a leaf whenever
        its value is set.
    """
    def __init__(self, feature_index: Optional[int], threshold: Optional[float], left_child: Optional["Node"],
                 right_child: Optional["Node"], value: Optional[int]) -> None:
        """
            Store the split description, the child links, and the label.

            Parameters:
                feature_index (Optional[int]): Column index used by the split test.
                threshold (Optional[float]): Cut-off value used by the split test.
                left_child (Optional[Node]): Left subtree.
                right_child (Optional[Node]): Right subtree.
                value (Optional[int]): Label held by the node; None when the node is not a leaf.
        """
        # Split description
        self.feature_index = feature_index
        self.threshold = threshold

        # Links to the subtrees
        self.left_child = left_child
        self.right_child = right_child

        # Label (a label of 0 is still a valid leaf value)
        self.value = value

    def is_leaf(self) -> bool:
        """
            Tell whether this node is a leaf.

            Returns:
                bool: True when a label is stored in `value`, False otherwise.
        """
        has_no_label: bool = self.value is None
        return not has_no_label

In [16]:
# Manual Decision Tree
class ManualDecisionTree:
    """
        Classification tree grown with greedy binary splits that minimise the
        weighted Gini impurity of the two resulting partitions.

        Parameters:
            max_depth (int): Depth at which a node is forced to become a leaf.
            min_samples_split (int): A node holding fewer samples than this becomes a leaf.
            max_features (Optional[int]): Number of features drawn at random (without
                replacement) for every split search. None means all features are examined.
            random_state (Optional[int]): Seed of the generator used for the feature draw.
    """
    def __init__(self, max_depth: int = 10, min_samples_split: int = 2,
                 max_features: Optional[int] = None, random_state: Optional[int] = None) -> None:
        self.max_depth: int = max_depth
        self.min_samples_split: int = min_samples_split
        self.max_features: Optional[int] = max_features
        self.random_state: Optional[int] = random_state
        self._rng: np.random.Generator = np.random.default_rng(random_state) # reproducibility
        self.root: Optional["Node"] = None
        # Number of feature columns seen by the last successful fit (None until then)
        self._n_features_in: Optional[int] = None


    # Fit data to the model
    def fit(self, X: np.ndarray, y: np.ndarray) -> None:
        """
            Fit training data to the model.

            Calling fit() again with the same data and the same random_state
            rebuilds the same tree, because the random stream is restarted here.

            Parameters:
                X (np.ndarray): Samples matrix with shape (n_samples, n_features).
                y (np.ndarray): Target vector with shape (n_samples,).

            Raises:
                ValueError: If X or y is empty, or if number of samples mismatch.
                TypeError: If X contains non-numeric features.
        """
        X = np.asarray(X)
        y = np.asarray(y)

        if X.ndim == 1:
            X = X.reshape(-1, 1) # A 1D X is read as one feature column
        if y.ndim >= 2:
            y = y.ravel() # Reshape y to 1D matrix

        if X.size == 0 or y.size == 0:
            raise ValueError(f"X or y is empty matrix. Got X: {X.size}, y: {y.size}.")
        if X.shape[0] != y.shape[0]:
            raise ValueError(f"X and y must have the same number of samples. Got X: {X.shape[0]}, y: {y.shape[0]}\n")

        if not np.issubdtype(X.dtype, np.number):
            raise TypeError("All features must be numeric.\n")

        # Restart the random stream so the feature draws do not depend on
        # how many times (or on what data) fit() was called before.
        self._rng = np.random.default_rng(self.random_state)

        # Publish the new tree only once it has been built completely.
        new_root = self._build_tree(X, y, depth=0)
        self.root = new_root
        self._n_features_in = X.shape[1]


    # Build the tree recursively
    def _build_tree(self, X: np.ndarray, y: np.ndarray, depth: int) -> "Node":
        """
            Recursively build the decision tree.

            Parameters:
                X (np.ndarray): Samples matrix (n_samples, n_features).
                y (np.ndarray): Target vector (n_samples,).
                depth (int): Current depth of the tree.

            Returns:
                Node: A Node object, either a leaf node or internal node with left/right children.
        """
        # Stop condition (leaf): depth limit reached, pure node, or too few samples
        if depth >= self.max_depth or np.unique(y).size == 1 or y.size < self.min_samples_split:
            return Node(feature_index=None, threshold=None, left_child=None,
                        right_child=None, value=self._most_common(y))

        feature_index, threshold, X_left, y_left, X_right, y_right = self._best_split(X, y)

        if feature_index is None: # Fallback. No split found
            return Node(feature_index=None, threshold=None, left_child=None,
                        right_child=None, value=self._most_common(y))

        # Left is grown before right; the order matters for the random feature draws
        left_subtree = self._build_tree(X_left, y_left, depth + 1)
        right_subtree = self._build_tree(X_right, y_right, depth + 1)

        return Node(
            feature_index=feature_index,
            threshold=threshold,
            left_child=left_subtree,
            right_child=right_subtree,
            value=None
        )


    # Find the best split
    def _best_split(self, X: np.ndarray, y: np.ndarray) -> Tuple[
        Optional[int],
        Optional[float],
        Optional[np.ndarray],
        Optional[np.ndarray],
        Optional[np.ndarray],
        Optional[np.ndarray]
    ]:
        """
            Find the best split in the dataset by minimizing Gini impurity.

            Candidate thresholds are the midpoints between consecutive distinct
            values of a feature; samples strictly below the threshold go left.
            Among equally good candidates the first one examined is kept.

            Parameters:
                X (np.ndarray): Feature matrix of shape (n_samples, n_features).
                y (np.ndarray): Target vector of shape (n_samples,).

            Returns:
                Tuple containing:
                    - feature_index (int or None): Index of the best feature to split on.
                    - threshold (float or None): Threshold value for the split.
                    - X_left (np.ndarray or None): Subset of features for left split.
                    - y_left (np.ndarray or None): Subset of targets for left split.
                    - X_right (np.ndarray or None): Subset of features for right split.
                    - y_right (np.ndarray or None): Subset of targets for right split.
                All six entries are None when no valid split exists.
        """
        best_gini: float = float("inf")
        best_feature: Optional[int] = None
        best_threshold: Optional[float] = None
        best_left_mask: Optional[np.ndarray] = None
        n_features: int = X.shape[1]

        # Feature subset (if max_features is set)
        if self.max_features is None:
            feature_indices: np.ndarray = np.arange(n_features)
        else:
            max_features: int = min(self.max_features, n_features)
            feature_indices = self._rng.choice(n_features, size=max_features, replace=False)

        # Loop over each feature
        for feature_index in feature_indices:
            column: np.ndarray = X[:, feature_index]
            unique_values: np.ndarray = np.unique(column)
            if unique_values.size <= 1:
                continue

            # Fixed-width integer columns (e.g. int8/uint8) would wrap around when two
            # neighbouring values are added, so the midpoints are computed in float64.
            if np.issubdtype(unique_values.dtype, np.integer):
                unique_values = unique_values.astype(np.float64)
            thresholds: np.ndarray = (unique_values[:-1] + unique_values[1:]) / 2

            # Loop over thresholds
            for threshold in thresholds:
                left_mask: np.ndarray = column < threshold
                n_left: int = int(np.count_nonzero(left_mask))
                if n_left == 0 or n_left == y.size: # One side would be empty
                    continue

                # Split labels
                left_labels, right_labels = y[left_mask], y[~left_mask]

                # Calculate gini
                gini_left: float = self._gini(left_labels)
                gini_right: float = self._gini(right_labels)
                weighted_gini: float = ((left_labels.size * gini_left) + (right_labels.size * gini_right)) / y.size

                # Update best split (only the mask is kept; the data is sliced once at the end)
                if weighted_gini < best_gini:
                    best_gini = weighted_gini
                    best_feature = int(feature_index)
                    best_threshold = float(threshold)
                    best_left_mask = left_mask

        if best_left_mask is None:
            return (None, None, None, None, None, None)

        best_right_mask: np.ndarray = ~best_left_mask
        return (best_feature, best_threshold,
                X[best_left_mask], y[best_left_mask],
                X[best_right_mask], y[best_right_mask])


    # Calculate gini
    def _gini(self, y: np.ndarray) -> float:
        """
            Calculate the Gini impurity, 1 - sum(p_k ** 2), of a label vector.

            Parameters:
                y (np.ndarray): Targets matrix with shape (n_samples,).

            Returns:
                float: The gini (0.0 for a pure vector).
        """
        _, counts = np.unique(y, return_counts=True)
        prob: np.ndarray = counts / counts.sum()
        return 1 - np.sum(prob ** 2)


    # Most common label
    def _most_common(self, y: np.ndarray) -> int:
        """
            Find the most common label in y matrix.

            Ties are resolved in favour of the label that appears first in y
            (the behaviour of collections.Counter.most_common).

            Parameters:
                y (np.ndarray): Targets matrix with shape (n_samples,).

            Returns:
                int: The label.
        """
        return Counter(y).most_common(1)[0][0]


    # Prediction
    def predict(self, X_new: np.ndarray) -> np.ndarray:
        """
            Predict on the new given data.

            Parameters:
                X_new (np.ndarray): New samples matrix to predict on, with shape (n_samples, n_features).
                    A 1D input is read as ONE sample with n_features values.

            Returns:
                np.ndarray: A matrix with predictions, one label per sample.

            Raises:
                ValueError: If X_new is empty, the model is not trained, or the number of
                    features differs from the number seen during fit().
                TypeError: If X_new contains non-numeric features.
        """
        X_new = np.asarray(X_new)
        if X_new.ndim == 1:
            X_new = X_new.reshape(1, -1)
        if X_new.size == 0:
            raise ValueError("The given X matrix is empty.\n")
        if not np.issubdtype(X_new.dtype, np.number):
            raise TypeError("All features must be numeric.\n")

        if self.root is None:
            raise ValueError("The model has not been trained! Please call 'fit()' first.\n")

        # getattr keeps trees that were pickled before this attribute existed usable;
        # for those the per-sample check in _traverse_tree still applies.
        expected_features: Optional[int] = getattr(self, "_n_features_in", None)
        if expected_features is not None and (X_new.ndim < 2 or X_new.shape[1] != expected_features):
            got: Any = X_new.shape[1] if X_new.ndim >= 2 else "no feature axis"
            raise ValueError(
                f"X_new must have {expected_features} features (as seen in fit). Got: {got}.\n"
            )

        return np.array([self._traverse_tree(x, self.root) for x in X_new])

    # Traverse tree
    def _traverse_tree(self, X_new: np.ndarray, node: "Node") -> int:
        """
            Walk down the tree from `node` until a leaf node is reached.

            Parameters:
                X_new (np.ndarray): A single sample (1D, n_features). A 2D array holding
                    exactly one row is accepted as well.
                node (Node): Node to start the walk from.

            Returns:
                int: The predicted label.

            Raises:
                ValueError: If more than one sample is given, or if the sample is too short
                    to contain the feature an internal node splits on.
        """
        sample: np.ndarray = np.asarray(X_new)
        if sample.ndim > 1:
            if sample.shape[0] == 1:
                sample = sample[0]
            else:
                raise ValueError("Expected single sample (1D) in _traverse_tree.")

        current = node
        while not current.is_leaf():
            # A non-leaf node without children or without a split feature cannot be produced
            # by fit(); the original fallback (hand back the node's value) is kept for it.
            if current.left_child is None or current.right_child is None:
                return current.value  # fallback
            if current.feature_index is None:
                return current.value

            # Previously this case silently produced None as a "label".
            if current.feature_index >= sample.size:
                raise ValueError(
                    f"Sample has {sample.size} features, but the tree splits on feature index "
                    f"{current.feature_index}.\n"
                )

            # Same rule as in training: strictly below the threshold goes left
            if sample[current.feature_index] < current.threshold:
                current = current.left_child
            else:
                current = current.right_child

        return current.value
        

In [17]:
# Manual Random Forest
class ManualRandomForest:
    """
        Ensemble of ManualDecisionTree classifiers, each trained on a bootstrap
        sample of the data and combined by majority vote.

        Parameters:
            n_trees (int): Number of trees to train (must be at least 1).
            max_depth (int): Passed to every tree.
            min_samples_split (int): Passed to every tree.
            max_features (Optional[int]): Passed to every tree.
            random_state (Optional[int]): Seed of the generator that draws the bootstrap
                samples and the per-tree seeds.
    """
    def __init__(self, n_trees: int = 100, max_depth: int = 10,
                 min_samples_split: int = 2, max_features: Optional[int] = None,
                 random_state: Optional[int] = None) -> None:
        self.n_trees: int = n_trees
        self.max_depth: int = max_depth
        self.min_samples_split: int = min_samples_split
        self.max_features: Optional[int] = max_features
        self.random_state: Optional[int] = random_state
        self._rng: np.random.Generator = np.random.default_rng(random_state) # reproducibility
        self.trees: List["ManualDecisionTree"] = []

    # Fit data to the model
    def fit(self, X: np.ndarray, y: np.ndarray) -> None:
        """
            Fit training data to the model.

            Calling fit() again with the same data and the same random_state
            rebuilds the same forest, because the random stream is restarted here.
            The previous trees are replaced only if the whole fit succeeds.

            Parameters:
                X (np.ndarray): Samples matrix with shape (n_samples, n_features).
                y (np.ndarray): Target vector with shape (n_samples,).

            Raises:
                ValueError: If X or y is empty, if number of samples mismatch,
                    or if n_trees is smaller than 1.
                TypeError: If X contains non-numeric features.
        """
        X = np.asarray(X)
        y = np.asarray(y)

        if X.ndim == 1:
            X = X.reshape(-1, 1) # A 1D X is read as one feature column
        if y.ndim >= 2:
            y = y.ravel() # Reshape y to 1D matrix

        if X.size == 0 or y.size == 0:
            raise ValueError(f"X or y is empty matrix. Got X: {X.size}, y: {y.size}.")
        if X.shape[0] != y.shape[0]:
            raise ValueError(f"X and y must have the same number of samples. Got X: {X.shape[0]}, y: {y.shape[0]}\n")

        if not np.issubdtype(X.dtype, np.number):
            raise TypeError("All features must be numeric.\n")

        # An empty forest cannot predict; fail here instead of at predict() time.
        if self.n_trees < 1:
            raise ValueError(f"n_trees must be at least 1. Got: {self.n_trees}.\n")

        # Restart the random stream so repeated fits are reproducible.
        self._rng = np.random.default_rng(self.random_state)

        fitted_trees: List["ManualDecisionTree"] = []
        for _ in range(self.n_trees):
            # Order matters for reproducibility: bootstrap draw first, then the tree seed.
            X_sample, y_sample = self._bootstrap_samples(X, y)

            tree_seed: int = int(self._rng.integers(0, 1_000_000_000))
            tree = ManualDecisionTree(
                max_depth=self.max_depth,
                min_samples_split=self.min_samples_split,
                max_features=self.max_features,
                random_state=tree_seed
            )
            tree.fit(X_sample, y_sample)
            fitted_trees.append(tree)

        self.trees = fitted_trees


    # Generate bootstrap samples (sampling with replacement)
    def _bootstrap_samples(self, X: np.ndarray, y: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """
            Generate bootstrap samples (sampling with replacement).

            Parameters:
                X (np.ndarray): Feature matrix (n_samples, n_features).
                y (np.ndarray): Target vector (n_samples,).

            Returns:
                Tuple[np.ndarray, np.ndarray]: Bootstrapped X and y samples, each with
                n_samples rows; row i of X and entry i of y come from the same original sample.
        """
        n_samples: int = X.shape[0]

        # Randomly select indices with replacement (bootstrap sampling)
        indices: np.ndarray = self._rng.choice(n_samples, size=n_samples, replace=True)

        # The same indices are applied to X and y so that rows and labels stay aligned
        return X[indices], y[indices]

    # Collect trees predictions
    def _collect_trees_predictions(self, X_new: np.ndarray) -> np.ndarray:
        """
            Collect predictions from all decision trees.

            Parameters:
                X_new (np.ndarray): New data samples (n_samples, n_features).

            Returns:
                np.ndarray: Predictions of shape (n_trees, n_samples).
        """
        return np.array([tree.predict(X_new) for tree in self.trees])


    # Determine the final result by majority vote
    def _majority_vote(self, predictions: np.ndarray) -> np.ndarray:
        """
            Aggregate predictions from trees using majority vote.

            Ties are resolved in favour of the label voted by the earliest tree
            (the behaviour of collections.Counter.most_common).

            Parameters:
                predictions (np.ndarray): Predictions from all trees (n_trees, n_samples).

            Returns:
                np.ndarray: Final predictions (n_samples,).
        """
        votes_per_sample: np.ndarray = predictions.T # Convert shape to (n_samples, n_trees)
        final_predictions: List[int] = [
            Counter(sample_votes).most_common(1)[0][0] for sample_votes in votes_per_sample
        ]
        return np.array(final_predictions)


    # Predictions
    def predict(self, X_new: np.ndarray) -> np.ndarray:
        """
            Predict target values for new samples.

            Input validation (empty input, non-numeric features) is performed by
            the individual trees.

            Parameters:
                X_new (np.ndarray): New data samples (n_samples, n_features).

            Returns:
                np.ndarray: Predicted labels (n_samples,).

            Raises:
                ValueError: If the forest has not been trained.
        """
        if not self.trees:
            raise ValueError("The model has not been trained! Call 'fit()' first.\n")

        # Convert once here so that every tree receives an ndarray
        # instead of repeating the conversion per tree.
        X_new = np.asarray(X_new)
        predictions: np.ndarray = self._collect_trees_predictions(X_new)
        return self._majority_vote(predictions)

In [18]:
import sys
from pathlib import Path

# Add the project root folder to sys.path
project_root = Path("..").resolve()
sys.path.append(str(project_root))

import pandas as pd
import numpy as np
import dill
from backend.utils import load_csv_data, train_test_split

def main() -> None:
    """
        Train the forest on the diabetes data, save it, and print its accuracy.

        Steps, in order:
            1. Load '../data/diabetes.csv' and split it 80/20 on the 'risk_level' target.
            2. Fit a ManualRandomForest with fixed hyper-parameters and seed 42.
            3. Serialize the fitted model to '../backend/model.pkl' with dill.
            4. Print train accuracy, test accuracy, and their plain average.

        Both paths are relative to the current working directory.
    """
    # Load and split the data
    dataset: pd.DataFrame = load_csv_data(source_path=r"../data/diabetes.csv")
    X_train, y_train, X_test, y_test = train_test_split(
        dataset, test_size=0.2, target_col="risk_level", random_state=42
    )

    # Train the model
    forest_params = dict(
        n_trees=100,
        max_depth=9,
        min_samples_split=2,
        max_features=2,
        random_state=42,
    )
    forest: ManualRandomForest = ManualRandomForest(**forest_params)
    forest.fit(X_train, y_train)

    # Save the trained model (done before the evaluation below).
    with open(r"../backend/model.pkl", mode="wb") as model_file:
        dill.dump(forest, model_file)

    # Accuracy = correctly predicted samples / number of samples.
    # NOTE(review): relies on the label containers returned by train_test_split
    # exposing `.size` and supporting element-wise `==` — confirm in backend.utils.
    scores = []
    for features, labels in ((X_train, y_train), (X_test, y_test)):
        predicted: np.ndarray = forest.predict(features)
        scores.append(np.sum(predicted == labels) / labels.size)
    train_acc, test_acc = scores

    # Display the results
    print(f"Train Accuracy: {train_acc:.4f}.")
    print(f"Test Accuracy: {test_acc:.4f}.\n")
    print(f"Average Accuracy: {(train_acc+test_acc)/2.0:.2f}.")


# Entry point: run the training/evaluation pipeline only when this code is
# executed directly (module name is "__main__"), not when it is imported.
if __name__ == "__main__":
    main()

Train Accuracy: 0.9984.
Test Accuracy: 0.8170.

Average Accuracy: 0.91.
