# Lab 10 -- Decision Trees for Regression

Compared to last week, this is a very simple lab <span style="font-size:20pt;">😃</span> You'll have fun programming!

You will implement the **Classification and Regression Tree (CART)** algorithm from scratch.

The lab is broken down into the following pieces:

* Regression Criterion
* Creating Splits
* Buiding a Tree
* Making a prediction

## Exercise 1 -- Download and load the dataset

We will be using the usual Boston Housing dataset, which is available to download from ECLASS

* Download the file
* Read it and separate the target variable from the features.
* Make a 80/10/10 train/validation/test split

In [145]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

The target variable will be as usual `MEDV`. Use the rest as features.

In [146]:
# Step 1: Load the dataset
data = pd.read_csv('bostonhousing.csv')

# Step 2: Separate the target variable from the features
X = data.drop(columns='medv')
y = data['medv']

# Step 3: Make an 80/10/10 train/validation/test split
# First, split into 80% train and 20% temp (validation + test)
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.2, random_state=42)

# Now split the 20% temp into 50% validation and 50% test (i.e., 10% of the original data each)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)

# Display the shapes of the resulting datasets
print(f"Train set: {X_train.shape}, {y_train.shape}")
print(f"Validation set: {X_val.shape}, {y_val.shape}")
print(f"Test set: {X_test.shape}, {y_test.shape}")

Train set: (404, 13), (404,)
Validation set: (51, 13), (51,)
Test set: (51, 13), (51,)


## Exercise 2 -- Optimization Criterion

For regression, a simple criterion to optimize is to minimize the sum of squared errors for a given region. This is, for all datapoints in a region with size, we minimize:

$$\sum_{i=1}^N(y_i - \hat{y})^2$$

where $N$ is the number of datapoits in the region and $\hat{y}$ is the mean value of the region for the target variable. 

Implement such a function using the description below.

Please, don't use an existing implementation, refer to the [book](https://www.statlearning.com/s/ISLRSeventhPrinting.pdf), and if you need help, ask questions!

In [147]:
def regression_criterion(region):
    """
    Implements the sum of squared error criterion in a region
    
    Parameters
    ----------
    region : ndarray
        Array of shape (N,) containing the values of the target values 
        for N datapoints in the training set.
    
    Returns
    -------
    float
        The sum of squared error
        
    Note
    ----
    The error for an empty region should be infinity (use: float("inf"))
    This avoids creating empty regions
    """
    if len(region) == 0:
        return float("inf")
    mean_value = np.mean(region)
    sum_squared_error = np.sum((region - mean_value) ** 2)
    return sum_squared_error

In [148]:
# Test the code with the provided examples
rng = np.random.default_rng(0)
print(regression_criterion(rng.random(size=40)))  # Test with random values
print(regression_criterion(np.ones(10)))          # Test with ones
print(regression_criterion(np.zeros(10)))         # Test with zeros
print(regression_criterion(np.array([])))         # Test with empty array

3.6200679838629544
0.0
0.0
inf


## Exercise 3 -- Make a split

In [149]:
def split_region(region, feature_index, tau):
    """
    Given a region, splits it based on the feature indicated by
    `feature_index`, the region will be split in two, where
    one side will contain all points with the feature with values 
    lower than `tau`, and the other split will contain the 
    remaining datapoints.
    
    Parameters
    ----------
    region : array of size (n_samples, n_features)
        a partition of the dataset (or the full dataset) to be split
    feature_index : int
        the index of the feature (column of the region array) used to make this partition
    tau : float
        The threshold used to make this partition
        
    Return
    ------
    left_partition : array
        indices of the datapoints in `region` where feature < `tau`
    right_partition : array
        indices of the datapoints in `region` where feature >= `tau` 
    """
    left_partition = region[region[:, feature_index] < tau]
    right_partition = region[region[:, feature_index] >= tau]
    return left_partition, right_partition

# Test the code
# Example dataset (X_train)
X_train = np.array([
    [5, 2, 15],
    [7, 3, 12],
    [4, 4, 10],
    [6, 1, 20],
    [8, 2, 25],
    [9, 3, 30]
])


In [150]:
# Split based on feature at index 2 with threshold 10
l, r = split_region(X_train, 2, 10)
print(l.shape, r.shape)  # Should print the shapes of the left and right partitions
print(l)
print(r)

(0, 3) (6, 3)
[]
[[ 5  2 15]
 [ 7  3 12]
 [ 4  4 10]
 [ 6  1 20]
 [ 8  2 25]
 [ 9  3 30]]


## Exercise 4 -- Find the best split

The strategy is quite simple (as well as inefficient), but it helps to reinforce the concepts.
We are going to use a greedy, exhaustive algorithm to select splits, selecting the `feature_index` and the `tau` that minimizes the Regression Criterion

In [151]:
def get_split(X, y):
    """
    Given a dataset (full or partial), splits it on the feature that minimizes the sum of squared error
    
    Parameters
    ----------
    X : array (n_samples, n_features)
        features 
    y : array (n_samples, )
        labels
    
    Returns
    -------
    decision : dictionary
        keys are:
        * 'feature_index' -> an integer that indicates the feature (column) of `X` on which the data is split
        * 'tau' -> the threshold used to make the split
        * 'left_region' -> array of indices where the `feature_index`th feature of X is lower than `tau`
        * 'right_region' -> indices not in `low_region`
    """
    best_feature_index = None
    best_tau = None
    best_sse = float("inf")
    best_left_indices = None
    best_right_indices = None
    
    n_samples, n_features = X.shape
    
    for feature_index in range(n_features):
        unique_values = np.unique(X[:, feature_index])
        for tau in unique_values:
            left_indices = np.where(X[:, feature_index] < tau)[0]
            right_indices = np.where(X[:, feature_index] >= tau)[0]
            
            if len(left_indices) == 0 or len(right_indices) == 0:
                continue
            
            left_sse = regression_criterion(y[left_indices])
            right_sse = regression_criterion(y[right_indices])
            total_sse = left_sse + right_sse
            
            if total_sse < best_sse:
                best_sse = total_sse
                best_feature_index = feature_index
                best_tau = tau
                best_left_indices = left_indices
                best_right_indices = right_indices
    
    decision = {
        'feature_index': best_feature_index,
        'tau': best_tau,
        'left_region': best_left_indices,
        'right_region': best_right_indices
    }
    
    return decision

# Define the regression criterion function
def regression_criterion(region):
    """
    Implements the sum of squared error criterion in a region
    
    Parameters
    ----------
    region : ndarray
        Array of shape (N,) containing the values of the target values 
        for N datapoints in the training set.
    
    Returns
    -------
    float
        The sum of squared error
        
    Note
    ----
    The error for an empty region should be infinity (use: float("inf"))
    This avoids creating empty regions
    """
    if len(region) == 0:
        return float("inf")
    mean_value = np.mean(region)
    sum_squared_error = np.sum((region - mean_value) ** 2)
    return sum_squared_error

# Test the code with a sample dataset
X_train = np.array([
    [5, 2, 15],
    [7, 3, 12],
    [4, 4, 10],
    [6, 1, 20],
    [8, 2, 25],
    [9, 3, 30]
])
y_train = np.array([1, 2, 1, 2, 3, 3])

split_decision = get_split(X_train, y_train)
print(split_decision)


{'feature_index': 0, 'tau': 6, 'left_region': array([0, 2], dtype=int64), 'right_region': array([1, 3, 4, 5], dtype=int64)}


In [152]:
get_split(X_train[:15, :], y_train[:15])

{'feature_index': 0,
 'tau': 6,
 'left_region': array([0, 2], dtype=int64),
 'right_region': array([1, 3, 4, 5], dtype=int64)}

In [153]:
def recursive_growth(node, max_depth, min_size, depth, X, y):
    """
    Recursively grow the tree by finding the best splits using the get_split function.
    
    Parameters
    ----------
    node : TreeNode
        The current node to be split
    max_depth : int
        The maximum depth of the tree
    min_size : int
        The minimum number of samples required to make a split
    depth : int
        The current depth of the tree
    X : array
        The feature matrix
    y : array
        The target values
    """
    left_indices, right_indices = node['left_region'], node['right_region']
    left_X, right_X = X[left_indices], X[right_indices]
    left_y, right_y = y[left_indices], y[right_indices]

    # Check for stopping conditions
    if len(left_y) == 0 or len(right_y) == 0:
        node['left'] = node['right'] = np.mean(y)
        return
    if depth >= max_depth:
        node['left'] = np.mean(left_y)
        node['right'] = np.mean(right_y)
        return
    if len(left_y) <= min_size:
        node['left'] = np.mean(left_y)
    else:
        left_split = get_split(left_X, left_y)
        node['left'] = left_split
        recursive_growth(left_split, max_depth, min_size, depth + 1, left_X, left_y)
    
    if len(right_y) <= min_size:
        node['right'] = np.mean(right_y)
    else:
        right_split = get_split(right_X, right_y)
        node['right'] = right_split
        recursive_growth(right_split, max_depth, min_size, depth + 1, right_X, right_y)


In [154]:
k = 20
test_root = get_split(X_train[:k, :], y_train[:k])
recursive_growth(test_root, 5, 3, 1, X_train[:k, :], y_train[:k])

## Exercise 5 -- Recursive Splitting

The test above is an example on how to find the root node of our decision tree. The algorithm now is a greedy search until we reach a stop criterion. To find the actual root node of our decision tree, you must provide the whole training set, not just a slice of 15 rows as the test above.

The trivial stopping criterion is to recursively grow the tree until each split contains a single point (perfect node purity). If we go that far, it normally means we are overfitting.

You will implement these criteria to stop the growth:

* A node is a leaf if:
    * It has less than `min_samples` datapoints
    * It is at the `max_depth` level from the root (each split creates a new level)
    * The criterion is `0`



In [155]:
def recursive_growth(node, min_samples, max_depth, current_depth, X, y):
    """
    Recursively grows a decision tree.
    
    Parameters
    ----------
    node : dictionary
        If the node is terminal, it contains only the "value" key, which determines the value to be used as a prediction.
        If the node is not terminal, the dictionary has the structure defined by `get_split`
    min_samples : int
        parameter for stopping criterion if a node has <= min_samples datapoints
    max_depth : int
        parameter for stopping criterion if a node belongs to this depth
    current_depth : int
        current distance from the root
    X : array (n_samples, n_features)
        features (full dataset)
    y : array (n_samples, )
        labels (full dataset)
    
    Notes
    -----
    To create a terminal node, a dictionary is created with a single "value" key, with a value that
    is the mean of the target variable
    
    'left' and 'right' keys are added to non-terminal nodes, which contain (possibly terminal) nodes 
    from higher levels of the tree:
    'left' corresponds to the 'left_region' key, and 'right' to the 'right_region' key
    """
    left_indices, right_indices = node['left_region'], node['right_region']
    left_X, right_X = X[left_indices], X[right_indices]
    left_y, right_y = y[left_indices], y[right_indices]

    # Check for stopping conditions
    if len(left_y) <= min_samples or current_depth >= max_depth or np.var(left_y) == 0:
        node['left'] = {'value': np.mean(left_y)}
    else:
        left_split = get_split(left_X, left_y)
        node['left'] = left_split
        recursive_growth(left_split, min_samples, max_depth, current_depth + 1, left_X, left_y)
    
    if len(right_y) <= min_samples or current_depth >= max_depth or np.var(right_y) == 0:
        node['right'] = {'value': np.mean(right_y)}
    else:
        right_split = get_split(right_X, right_y)
        node['right'] = right_split
        recursive_growth(right_split, min_samples, max_depth, current_depth + 1, right_X, right_y)

# Define the get_split function from the previous steps
def get_split(X, y):
    """
    Given a dataset (full or partial), splits it on the feature that minimizes the sum of squared error
    
    Parameters
    ----------
    X : array (n_samples, n_features)
        features 
    y : array (n_samples, )
        labels
    
    Returns
    -------
    decision : dictionary
        keys are:
        * 'feature_index' -> an integer that indicates the feature (column) of `X` on which the data is split
        * 'tau' -> the threshold used to make the split
        * 'left_region' -> array of indices where the `feature_index`th feature of X is lower than `tau`
        * 'right_region' -> indices not in `low_region`
    """
    best_feature_index = None
    best_tau = None
    best_sse = float("inf")
    best_left_indices = None
    best_right_indices = None
    
    n_samples, n_features = X.shape
    
    for feature_index in range(n_features):
        unique_values = np.unique(X[:, feature_index])
        for tau in unique_values:
            left_indices = np.where(X[:, feature_index] < tau)[0]
            right_indices = np.where(X[:, feature_index] >= tau)[0]
            
            if len(left_indices) == 0 or len(right_indices) == 0:
                continue
            
            left_sse = regression_criterion(y[left_indices])
            right_sse = regression_criterion(y[right_indices])
            total_sse = left_sse + right_sse
            
            if total_sse < best_sse:
                best_sse = total_sse
                best_feature_index = feature_index
                best_tau = tau
                best_left_indices = left_indices
                best_right_indices = right_indices
    
    decision = {
        'feature_index': best_feature_index,
        'tau': best_tau,
        'left_region': best_left_indices,
        'right_region': best_right_indices
    }
    
    return decision

# Define the regression criterion function
def regression_criterion(region):
    """
    Implements the sum of squared error criterion in a region
    
    Parameters
    ----------
    region : ndarray
        Array of shape (N,) containing the values of the target values 
        for N datapoints in the training set.
    
    Returns
    -------
    float
        The sum of squared error
        
    Note
    ----
    The error for an empty region should be infinity (use: float("inf"))
    This avoids creating empty regions
    """
    if len(region) == 0:
        return float("inf")
    mean_value = np.mean(region)
    sum_squared_error = np.sum((region - mean_value) ** 2)
    return sum_squared_error

In [156]:
# Example usage:
min_samples = 3
max_depth = 5
current_depth = 1

# Assume you have your training data in X_train and y_train
# You should provide the full training dataset, not just a slice
X_train = np.array([
    [5, 2, 15],
    [7, 3, 12],
    [4, 4, 10],
    [6, 1, 20],
    [8, 2, 25],
    [9, 3, 30],
    [3, 3, 5],
    [1, 1, 1],
    [2, 2, 2],
    [10, 5, 50],
    [11, 6, 55],
    [12, 7, 60],
    [13, 8, 65],
    [14, 9, 70],
    [15, 10, 75]
])
y_train = np.array([1, 2, 1, 2, 3, 3, 1, 0, 0, 5, 5, 6, 6, 7, 7])

# Get the initial split
root = get_split(X_train, y_train)

# Grow the tree
recursive_growth(root, min_samples, max_depth, current_depth, X_train, y_train)

Below we provide code to visualise the generated tree!

In [157]:
# Print the tree structure
def print_tree(node, depth=0):
    if 'value' in node:
        print('.  ' * depth + f"[{node['value']}]")
    else:
        print('.  ' * depth + f"X_{node['feature_index']} < {node['tau']}")
        print_tree(node['left'], depth + 1)
        print_tree(node['right'], depth + 1)

print_tree(root, depth=0)


X_0 < 10
.  X_0 < 6
.  .  X_0 < 3
.  .  .  [0.0]
.  .  .  [1.0]
.  .  X_0 < 8
.  .  .  [2.0]
.  .  .  [3.0]
.  X_0 < 12
.  .  [5.0]
.  .  X_0 < 14
.  .  .  [6.0]
.  .  .  [7.0]


In [158]:
print_tree(root, depth=0)

X_0 < 10
.  X_0 < 6
.  .  X_0 < 3
.  .  .  [0.0]
.  .  .  [1.0]
.  .  X_0 < 8
.  .  .  [2.0]
.  .  .  [3.0]
.  X_0 < 12
.  .  [5.0]
.  .  X_0 < 14
.  .  .  [6.0]
.  .  .  [7.0]


# Exercise 6 -- Make a Prediction
Use the a node to predict the class of a compatible dataset

In [165]:
def predict_sample(node, sample):
    """
    Makes a prediction based on the decision tree defined by `node`
    
    Parameters
    ----------
    node : dictionary
        A node created by one of the methods above
    sample : array of size (n_features,)
        A sample datapoint
    
    Returns
    -------
    float
        The predicted value for the sample
    """
    if 'value' in node:
        return node['value']
    
    feature_value = sample[node['feature_index']]
    if isinstance(feature_value, (int, float)):
        if feature_value < node['tau']:
            return predict_sample(node['left'], sample)
        else:
            return predict_sample(node['right'], sample)
    else:
        # Handle non-numeric feature values (e.g., categorical)
        if feature_value == node['tau']:
            return predict_sample(node['left'], sample)
        else:
            return predict_sample(node['right'], sample)




def predict(node, X):
    """
    Makes predictions based on the decision tree defined by `node`
    
    Parameters
    ----------
    node : dictionary
        A node created by one of the methods above
    X : array of size (n_samples, n_features)
        n_samples predictions will be made
    
    Returns
    -------
    array
        An array of predicted values for the dataset
    """
    return np.array([predict_sample(node, sample) for sample in X])



Now use the functions defined above to calculate the RMSE of the validation set. 
* Try first with `min_samples=20` and `max_depth=6` (for this values you should get a validation RMSE of ~8.8)

Then, experiment with different values for the stopping criteria.

In [166]:
# Define the min_samples and max_depth
min_samples = 20
max_depth = 6

# Get the initial split for the root node using the full training set
root = get_split(X_train, y_train)

# Grow the tree
recursive_growth(root, min_samples, max_depth, current_depth=1, X=X_train, y=y_train)

# Make predictions on the validation set
y_val_pred = predict(root, X_val)

In [None]:
# Calculate RMSE
rmse = mean_squared_error(y_val, y_val_pred)
print(f'Validation RMSE: {rmse:.2f}')

In [None]:
# Experiment with different values
experiments = [
    (10, 4),
    (20, 6),
    (30, 8)
]

for min_samples, max_depth in experiments:
    root = get_split(X_train, y_train)
    recursive_growth(root, min_samples, max_depth, current_depth=1, X=X_train, y=y_train)
    y_val_pred = predict(root, X_val)
    rmse = mean_squared_error(y_val, y_val_pred)
    print(f'Validation RMSE (min_samples={min_samples}, max_depth={max_depth}): {rmse:.2f}')