In [1]:
import numpy as np

## Implementation
### Splitting

In [None]:
def train_test_split(data, test_size=0.4, random_state=2060):
    """
    Split the data into training and testing sets randomly.

    Parameters:
    - data: 2D numpy array, the entire dataset.
    - test_size: float, the proportion of the data to include in the test split.
    - random_state: int, the seed used by the random number generator.

    Returns:
    - train_data: 2D numpy array, the training set.
    - test_data: 2D numpy array, the testing set.
    """
    if random_state is not None:
        np.random.seed(random_state)
    n_samples = data.shape[0]
    indices = np.random.permutation(n_samples) # shuffling
    test_size = int(n_samples * test_size)
    test_indices = indices[:test_size]
    train_indices = indices[test_size:]
    train_data = data[train_indices]
    test_data = data[test_indices]
    return train_data, test_data

### CART

In [6]:
class Node:
    """
    Represents a node in the decision tree. Each node can be one of the following:
    - Decision node: splits the data.
    - Leaf node: predicts the label.

    Attributes:
    - left: none for leaf nodes.
    - right: none for leaf nodes.
    - label: none for decision nodes.
    - feature: none for leaf nodes.
    - threshold: none for leaf nodes or categorical splits.
    """
    def __init__(self, left=None, right=None, label=None, feature=None, threshold=None):
        self.left = left
        self.right = right
        self.label = label
        self.feature = feature
        self.threshold = threshold

    def is_leaf(self):
        """
        Check if a node is a leaf node.

        Returns:
        - A boolean value indicating whether the node is a leaf node. True if it is a leaf node, False otherwise.
        """
        return self.label is not None

In [None]:
class CART:
    # TODO: visit hw5? set major = 0 if balanced?
    # TODO: if training set is empty, should we take predict all labels in the test set to be the major class?
    """
    A class to implement a CART decision tree.

    Attributes:
    - max_depth: int, the maximum depth of the tree.
    - min_samples_split: int, the minimum number of samples required to split an internal node.
    - tree: Node, the node of the decision tree. It can be None if the tree is not built yet.
    """
    def __init__(self, max_depth=10, min_samples_split=10):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.tree = None

    def fit(self, data):
        """
        Fit the decision tree to the training data.

        Parameters:
        - data: 2D numpy array, the entire training dataset where the last column is the target variable.
            - Note: below all "data" refers to the entire training data, including the target variable.
        """
        self.tree = self._build_tree(data, depth=0)

    def _build_tree(self, data, depth):
        """
        Recursively build the tree.
        This method constructs the tree by splitting the dataset at each node based on the feature and threshold
        that minimize the Gini impurity. It stops the recursion when one of the following conditions is met:
        - All labels in the current node are the same.
        - The maximum depth is reached.
        - The minimum number of samples required to split an internal node is not met.

        Parameters:
        - data: 2D numpy array, the entire training dataset at the current node.
        - depth: int, the current depth of the tree.
        
        Returns:
        - Node: a decision node or leaf node of the tree.
        """
        # If the input training dataset is empty, then guess all labels to be 0
        if len(data) == 0:
            return Node(label=0)
        
        labels = data[:, -1]
        # Stopping conditions
        # All labels are the same
        if len(np.unique(labels)) == 1:
            return Node(label=labels[0])
        # Max depth or minimum split size is reached
        # TODO: check if we need to add the = case?
        if depth >= self.max_depth or len(data) < self.min_samples_split:
            major = np.bincount(labels.astype(int)).argmax()
            return Node(label=major) # return the majority class

        # Find the best split
        best_split = self._find_best_split(data)
        # No valid split
        if not best_split:
            major = np.bincount(labels.astype(int)).argmax()
            return Node(label=major) # return the majority class
        # Remove the splitted categorical feature
        if best_split["type"] == "categorical":
            remaining_data_left = np.delete(best_split["left"], best_split["feature"], axis=1)
            remaining_data_right = np.delete(best_split["right"], best_split["feature"], axis=1)
        else:
            remaining_data_left = best_split["left"]
            remaining_data_right = best_split["right"]
        # Recursion
        left_tree = self._build_tree(remaining_data_left, depth + 1)
        right_tree = self._build_tree(remaining_data_right, depth + 1)
        return Node(
            left=left_tree,
            right=right_tree,
            feature=best_split["feature"],
            threshold=best_split["threshold"]
        )
        
    def _find_best_split(self, data):
        """
        Find the best feature (and threshold) to split the current data.
        This method iterates over all features (excluding the target) in the dataset to find the best split that 
        minimizes the weighted Gini impurity. It handles both continuous and categorical features:
        - For continuous features, it finds the best threshold to split the data.
            - Thresholds are the midpoints between unique values.
        - For categorical features, binary splits are evaluated.

        Parameters:
        - data: 2D numpy array, the entire training dataset at the current node.
        
        Returns:
        - A dictionary containing the best split information. It can be None if no valid split is found.
        """
        best_gini = float("inf") # use positive infinity since we will minimize it
        best_split = None
        n_features = data.shape[1] - 1 # last column is the target
        for feature in range(n_features):
            unique_values = np.unique(data[:, feature])
            sorted_values = np.sort(unique_values)
            if len(unique_values) > 2:
                thresholds = (sorted_values[1:] + sorted_values[:-1]) / 2 # midpoints
                for threshold in thresholds:
                    left, right = self._split_continuous(data, feature, threshold)
                    if len(left) == 0 or len(right) == 0:
                        continue
                    gini = self._gini_for_split(data, left, right)
                    if gini < best_gini:
                        best_gini = gini
                        best_split = {
                            "feature": feature,
                            "threshold": threshold,
                            "left": left,
                            "right": right,
                            "type": "continuous"
                        }
            else:
                left, right = self._split_categorical(data, feature)
                if len(left) == 0 or len(right) == 0:
                    continue
                gini = self._gini_for_split(data, left, right)
                if gini < best_gini:
                    best_gini = gini
                    best_split = {
                        "feature": feature,
                        "threshold": None, # binary split
                        "left": left,
                        "right": right,
                        "type": "categorical"
                    }
        return best_split
    
    # TODO: add a line to handle cases when len(data) = 0 in Gini?

    def _gini_for_node(self, data):
        """
        Calculate the Gini impurity for a node.
        """
        labels = data[:, -1] # the last column
        unique_labels, counts = np.unique(labels, return_counts=True)
        probs = counts / len(data) # two probabilities of being in two classes
        gini = 1 - np.sum(probs ** 2)
        return gini

    def _gini_for_split(self, data, left, right):
        """
        Calculate the weighted Gini impurity for a split.
        """
        total_size = len(data)
        left_size = len(left)
        right_size = len(right)
        gini_left = self._gini_for_node(left)
        gini_right = self._gini_for_node(right)
        gini = (left_size / total_size) * gini_left + (right_size / total_size) * gini_right
        return gini

    def _split_continuous(self, data, feature_index, threshold):
        """
        Split the data based on a continuous feature.

        Parameters:
        - data: 2D numpy array, the entire training dataset at the current node.
        - feature_index: int, index of the feature to split.
        
        Returns:
        - Two subsets of the data.
        """
        left = data[data[:, feature_index] <= threshold]
        right = data[data[:, feature_index] > threshold]
        return left, right

    def _split_categorical(self, data, feature_index):
        """
        Split the data based on a categorical feature.

        Parameters:
        - data: 2D numpy array, the entire training dataset at the current node.
        - feature_index: int, index of the feature to split.

        Returns:
        - Two subsets of the data.
        """
        values = np.unique(data[:, feature_index])
        threshold = np.mean(values)
        left = data[data[:, feature_index] <= threshold]
        right = data[data[:, feature_index] > threshold]
        return left, right

    def _predict_row(self, node, row):
        """
        Predict the label for a single row. Traverse the tree to determine the predicted label.

        Parameters:
        - node: Node, the current node in the tree.
        - row: 1D numpy array, a single data point.

        Returns:
        - The predicted label.
        """
        if node.is_leaf():
            return node.label
        # Categorical split
        if node.threshold is None:
            return self._predict_row(node.left, row) if row[node.feature] == 0 else self._predict_row(node.right, row)
        # Continuous split
        else:  
            return self._predict_row(node.left, row) if row[node.feature] <= node.threshold else self._predict_row(node.right, row)
        
    def predict(self, test_data):
        """
        Predict the labels for the test data. Traverse the tree to determine the predicted labels.

        Parameters:
        - test_data: 2D numpy array, the test dataset.

        Returns:
        - A 1D numpy array containing the predicted labels
        """
        if len(test_data.shape) == 0:
            return np.array([]) # return an empty array
        return np.array([self._predict_row(self.tree, row) for row in test_data])
    
    def loss(self, data):
        """
        Calculate the loss for the data.
        """
        preds = self.predict(data[:, :-1])
        true_labels = data[:, -1]
        return np.sum(preds != true_labels) / len(true_labels)
    
    def accuracy(self, data):
        """
        Calculate the accuracy for the data.
        """
        return 1 - self.loss(data)
    
    def visualize(self):
        """
        Visualize the decision tree.
        """
        if self.tree is None:
            print("Empty tree.")
        else:
            print("--- START PRINT TREE ---")
            self._visualize_tree(self.tree)
            print("--- END PRINT TREE ---")

    def _visualize_tree(self, node, depth=0):
        """
        Recursively visualize the decision tree.
        """
        indent = "  " * depth
        if node.is_leaf():
            print(f"{indent}Predict -> {node.label}")
        else:
            if node.threshold is None:
                print(f"{indent}Split attribute = {node.feature}; categorical")
            else:
                print(f"{indent}Split attribute = {node.feature}; threshold = {node.threshold:.3f}")
            print(f"{indent}Left:")
            self._visualize_tree(node.left, depth + 1)
            print(f"{indent}Right:")
            self._visualize_tree(node.right, depth + 1)

## Unit Tests

In [81]:
# Tests for splitting
def test_splitting():
    # Check if the splitted sizes are correct
    data = np.array([[i] for i in range(101)])
    train_data, test_data = train_test_split(data, test_size=0.3, random_state=42)
    assert train_data.shape[0] == 71, "Train set size is incorrect."
    assert test_data.shape[0] == 30, "Test set size is incorrect."
    # Check if the function works well for empty data
    data = np.array([]).reshape(0, 2)
    train_data, test_data = train_test_split(data, test_size=0.4, random_state=42)
    assert train_data.shape[0] == 0, "Train set should be empty."
    assert test_data.shape[0] == 0, "Test set should be empty."
    print("Splitting tests passed.")

# Tests for Node
def test_Node():
    # Check if the node is a leaf node
    leaf_node = Node(label=1)
    assert leaf_node.is_leaf(), "Leaf node should be a leaf."
    # Check if the node is a decision node
    decision_node = Node(left="LeftNode", right="RightNode", feature=2, threshold=0.5)
    assert not decision_node.is_leaf(), "Decision node should not be a leaf."
    print("Node tests passed.")

# Tests for _find_best_split
def test_find_best_split():
    data = np.array([
        [1.0, 2.5, 0],
        [2.0, 3.5, 1],
        [1.5, 2.0, 0]
    ])
    cart = CART()
    best_split = cart._find_best_split(data)
    # Check a valid split is found and the split is correct
    # Should not split on the target but split on one of the continuous features
    assert best_split is not None, "Best split should not be None."
    assert best_split["type"] == "continuous", "Best split should be continuous."
    assert best_split["feature"] != 2, "Best split should not be on the target."
    data = np.array([
        [1.0, 2.5, 0],
        [1.0, 2.5, 0],
        [1.0, 2.5, 0]
    ])
    cart = CART()
    best_split = cart._find_best_split(data)
    # Check that no split should be found if all features are the same
    assert best_split is None, "Best split should be None."
    print("Find best split tests passed.")

# Tests for Gini impurity calculations
def test_gini():
    data = np.array([
        [1.0, 2.5, 0],
        [2.0, 3.5, 1]
    ])
    cart = CART()
    gini = cart._gini_for_node(data) # Expected = 0.5
    assert abs(gini - 0.5) < 1e-6, "Gini impurity for node is incorrect."
    left = data[:1]
    right = data[1:]
    gini = cart._gini_for_split(data, left, right) # Expected = 0
    assert abs(gini - 0) < 1e-6, "Gini impurity for split is incorrect."
    print("Gini tests passed.")

# Tests for splitting continuous features and categorical features
def test_split_features():
    data = np.array([
        [1.0, 2.5, 0],
        [2.0, 3.5, 1],
        [1.0, 2.0, 0],
        [2.0, 4.5, 1]
    ])
    cart = CART()
    left, right = cart._split_continuous(data, 1, 3.0)
    # Check if the split is correct for a continuous feature
    assert len(left) == 2, "Left split size is incorrect."
    assert len(right) == 2, "Right split size is incorrect."
    left, right = cart._split_categorical(data, 2)
    # Check if the split is correct for a categorical feature
    assert len(left) == 2, "Left split size is incorrect."
    assert len(right) == 2, "Right split size is incorrect."
    print("Split features tests passed.")

# Tests for fit and _build_tree
def test_fit():
    data = np.array([
        [1, 2, 0],
        [3, 4, 1],
        [1, 2, 0],
        [3, 4, 1]
    ])
    cart = CART(max_depth=2, min_samples_split=2)
    cart.fit(data)
    # Check if the tree is built
    assert cart.tree is not None, "Tree should not be None after fitting."
    assert cart.tree.feature is not None, "Tree root should have a splitting feature."
    data = np.empty((0, 3))
    cart.fit(data)
    # Check if the label is 0
    assert cart.tree.label == 0, "Incorrect label for empty data."
    print("Fit tests passed.")

# Tests for predict
def test_predict():
    # Check if a single row prediction is correct
    tree = Node(left=Node(label=1), right=Node(label=0), feature=0, threshold=1.5)
    cart = CART()
    row = np.array([1.0, 2.0])  # Expected: left -> 1
    pred = cart._predict_row(tree, row)
    assert pred == 1, "Prediction for single row is incorrect."
    # Check predictions for a dataset
    cart.tree = tree
    data = np.array([
        [1.0, 2.0],
        [2.0, 3.0]
    ])
    preds = cart.predict(data)
    assert np.array_equal(preds, [1, 0]), "Batch predictions are incorrect."
    print("Predict tests passed.")

# Tests for loss and accuracy
def test_loss_acc():
    tree = Node(left=Node(label=1), right=Node(label=0), feature=0, threshold=1.5)
    cart = CART()
    cart.tree = tree
    data = np.array([
        [1.0, 2.0, 1],
        [2.0, 3.0, 0],
        [0.5, 1.0, 1],
        [3.0, 4.0, 0]
    ])
    # Check if loss = 0 and accuracy = 1
    assert cart.loss(data) == 0.0, "Loss calculation is incorrect."
    assert cart.accuracy(data) == 1.0, "Accuracy calculation is incorrect."
    print("Loss and accuracy tests passed.")

In [82]:
test_splitting()
test_Node()
test_find_best_split()
test_gini()
test_split_features()
test_fit()
test_predict()
test_loss_acc()

Splitting tests passed.
Node tests passed.
Find best split tests passed.
Gini tests passed.
Split features tests passed.
Fit tests passed.
Predict tests passed.
Loss and accuracy tests passed.


## Main

In [83]:
data = np.loadtxt("heart.csv", delimiter=",", skiprows=1)
train_data, test_data = train_test_split(data, test_size=0.4, random_state=2060)

model = CART(max_depth=10, min_samples_split=10)
model.fit(train_data)
train_accuracy = model.accuracy(train_data)
test_accuracy = model.accuracy(test_data)
print(f"Training accuracy: {train_accuracy:.3f}")
print(f"Testing accuracy: {test_accuracy:.3f}")
model.visualize()

Training accuracy: 0.934
Testing accuracy: 0.793
--- START PRINT TREE ---
Split attribute = 11; threshold = 0.500
Left:
  Split attribute = 12; threshold = 2.500
  Left:
    Split attribute = 9; threshold = 2.700
    Left:
      Split attribute = 7; threshold = 92.500
      Left:
        Predict -> 0.0
      Right:
        Split attribute = 3; threshold = 158.000
        Left:
          Split attribute = 9; threshold = 1.700
          Left:
            Predict -> 1.0
          Right:
            Predict -> 1
        Right:
          Predict -> 1
    Right:
      Predict -> 0
  Right:
    Split attribute = 7; threshold = 143.500
    Left:
      Split attribute = 9; threshold = 0.250
      Left:
        Predict -> 1.0
      Right:
        Predict -> 0.0
    Right:
      Split attribute = 2; threshold = 0.500
      Left:
        Predict -> 0
      Right:
        Split attribute = 0; threshold = 39.000
        Left:
          Predict -> 0.0
        Right:
          Split attribute = 0; thr

## Scikit-Learn Verification

Below the CART model using scikit-learning is written by our group members, Yixun Kang and David Ning. The model use a pipeline structure, including the preprocessor and the algorithm. In the preprocessor, we used One-Hot encoder for two categorical features, `sex` and `exange` and then applied `StandardScaler()` to all features. We used K-Fold as the cross validation and 6/2/2 for train/test/validation. In the parameter grid, we tuned for `min_samples_leaf` and `max_leaf_nodes` and keep the `max_depth` and `min_samples_split` the same as in main.

In [86]:
import pandas as pd
import numpy as np
from sklearn.compose import ColumnTransformer 
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.model_selection import KFold, train_test_split, GridSearchCV, ParameterGrid
from sklearn.pipeline import Pipeline, make_pipeline
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score
from sklearn.tree import DecisionTreeClassifier

In [87]:
data = pd.read_csv("heart.csv")
X = data.drop(columns=["target"])
y = data["target"]

In [88]:
# Preprocessor
cat_ftrs = ["sex", "exang"]
num_ftrs = ["age", "trestbps", "chol", "thalach", "oldpeak", "cp", "fbs", "restecg", "slope", "ca", "thal"]

categorical_transformer = Pipeline(steps=[
    ("onehot", OneHotEncoder(sparse_output=False, handle_unknown="ignore")),
    ("scaler", StandardScaler())
])
numerical_transformer = Pipeline(steps=[
    ("scaler", StandardScaler())
])
preprocessor = ColumnTransformer([
    ("cat", categorical_transformer, cat_ftrs),
    ("num", numerical_transformer, num_ftrs)
])

In [89]:
# ML pipeline
def MLpipe_kfold(X, y, random_states, preprocessor, ML_algo, param_grid, n_splits=5):
    test_scores = []
    best_models = []
    for i, random_state in enumerate(random_states):
        X_other, X_test, y_other, y_test = train_test_split(X, y, test_size=0.2, random_state=random_state)
        kf = KFold(n_splits=n_splits, shuffle=True, random_state=random_state)
        pipe = make_pipeline(preprocessor, ML_algo)
        grid = GridSearchCV(pipe, param_grid=param_grid, cv=kf, n_jobs=-1, return_train_score=True, 
                            verbose=True, scoring="accuracy")
        grid.fit(X_other, y_other)
        results = pd.DataFrame(grid.cv_results_)
        best_models.append(grid)
        y_test_pred = best_models[-1].predict(X_test)
        test_accuracy = accuracy_score(y_test, y_test_pred)
        test_scores.append(test_accuracy)
    return test_scores, best_models

In [90]:
random_states = [2060]
ML_algo = DecisionTreeClassifier(random_state=2060, criterion="gini", max_depth=10, min_samples_split=10)
param_grid = {
    "decisiontreeclassifier__min_samples_leaf": [1, 2, 5, 10],
    "decisiontreeclassifier__max_leaf_nodes": [5, 10]
}
test_scores, best_models = MLpipe_kfold(X, y, random_states, preprocessor, ML_algo, param_grid, n_splits=10)
print("Average Testing Accuracy:", np.mean(test_scores))

Fitting 10 folds for each of 8 candidates, totalling 80 fits
Average Testing Accuracy: 0.8360655737704918


The scikit-learn average testing accuracy is 83.6% and the implementation testing accuracy is 79.3%. The scikit-learn model perform slight better than the implementation. Also, in the implemented model, we observed some levels of overfitting. We will consider using cross-validation and pruning method later to optimize the model.