# Decision tree click-through prediction project.

In [1]:
import matplotlib.pyplot as plt
import numpy as np

In [2]:
# First metric for measuring a split: weighted Gini impurity.
def gini_impurity_weighted(labels, class_weight=None):
    """
    Calculate the class-weighted Gini impurity of a set of labels.
    @param labels: array-like of class labels.
    @param class_weight: dict mapping a class label to its weight; labels missing from the dict get weight 1.0. None means every class has weight 1.0.
    @return: float, 1 - sum(p_k ** 2) where p_k is the weighted fraction of class k; 0.0 when the total weight is zero.
    """
    if class_weight is None:
        class_weight = {}

    # Count the occurrences of each label.
    classes, counts = np.unique(labels, return_counts=True)

    # Scale each class count by the weight of that class.
    weights = np.array([class_weight.get(label, 1.0) for label in classes], dtype=float)
    weighted_counts = counts * weights
    total = weighted_counts.sum()

    # When the set is empty (or carries no weight at all), it is also pure.
    if total == 0:
        return 0.0

    fractions = weighted_counts / total
    return 1.0 - np.sum(fractions ** 2)

In [3]:
# Second metric for measuring a split: weighted entropy.
def entropy_weighted(labels, class_weight=None):
    """
    Calculate the class-weighted entropy (in bits) of a set of labels.
    @param labels: array-like of class labels.
    @param class_weight: dict mapping a class label to its weight; labels missing from the dict get weight 1.0. None means every class has weight 1.0.
    @return: float, -sum(p_k * log2(p_k)) where p_k is the weighted fraction of class k; 0.0 when the total weight is zero.
    """
    if class_weight is None:
        class_weight = {}

    # Count the occurrences of each label.
    classes, counts = np.unique(labels, return_counts=True)

    # Scale each class count by the weight of that class.
    weights = np.array([class_weight.get(label, 1.0) for label in classes], dtype=float)
    weighted_counts = counts * weights
    total = weighted_counts.sum()

    # When the set is empty (or carries no weight at all), it is also pure.
    if total == 0:
        return 0.0

    fractions = weighted_counts / total
    # Drop zero fractions (classes with zero weight): log2(0) is -inf and
    # 0 * -inf would turn the whole sum into nan.
    fractions = fractions[fractions > 0]
    return - np.sum(fractions * np.log2(fractions))

In [4]:
# Now we combine the two metrics into one function to calculate the weighted impurity.
# Dispatch table: maps a criterion name to the function that computes that impurity.
criterion_function = {'gini': gini_impurity_weighted, 'entropy': entropy_weighted}

def weighted_impurity(groups, criterion='gini', class_weight=None):
    """
    Calculate weighted impurity of children after a split.
    @param groups: list of children, and a child consists a list of class labels.
    @param criterion: metric to measure the quality of a split, 'gini' for weighted Gini impurity or 'entropy' for weighted information gain.
    @return: float, weighted impurity.
    """
    total_weight = 0.0
    group_weights = []

    for group in groups:
        w = sum(class_weight.get(y, 1.0) for y in group)
        group_weights.append(w)
        total_weight += w

    weighted_sum = 0.0
    for group, w in zip(groups, group_weights):
        if w > 0:
            weighted_sum += (w / total_weight) * criterion_function[criterion](group, class_weight)
    return weighted_sum

In [5]:
def split_node(X, y, index, value):
    """
    Partition a node's samples into two children on a single feature.
    X: feature matrix (NumPy array).
    y: label vector.
    index: index of the feature to split on.
    value: numerical threshold or category.
    Returns the left and right child as a pair, each one a list [X_child, y_child].
    """
    column = X[:, index]
    is_numerical = column.dtype.kind in ('i', 'f')
    # Numerical feature: samples at or above the threshold go right.
    # Categorical feature: samples equal to the category go right.
    goes_right = (column >= value) if is_numerical else (column == value)
    goes_left = ~goes_right
    left_child = [X[goes_left, :], y[goes_left]]
    right_child = [X[goes_right, :], y[goes_right]]
    return left_child, right_child

In [6]:
# Greedy search function.
def get_best_split(X, y, criterion, class_weight):
    """
    Greedily search every feature and every distinct feature value for the
    split with the lowest weighted impurity.
    X: feature matrix (2-D NumPy array).
    y: label vector.
    criterion: name of the impurity measure, 'gini' or 'entropy'.
    class_weight: dict mapping a class label to its weight.
    Returns a dict with the chosen feature 'index', split 'value', the
    resulting 'children' (as returned by split_node) and the node's 'labels'.
    'index', 'value' and 'children' are None when there is no candidate split.
    """
    # Start from +inf rather than 1: weighted entropy can reach or exceed 1,
    # and a fixed bound of 1 would then reject every candidate split.
    best_index, best_value, best_score, children = None, None, float('inf'), None
    for index in range(X.shape[1]):
        # np.unique already returns the distinct values in sorted order.
        for value in np.unique(X[:, index]):
            groups = split_node(X, y, index, value)
            impurity = weighted_impurity([groups[0][1], groups[1][1]], criterion, class_weight)
            # Strict comparison: on ties the first candidate found is kept.
            if impurity < best_score:
                best_index, best_value, best_score, children = index, value, impurity, groups
    return {'index': best_index, 'value': best_value, 'children': children, 'labels': y}

In [7]:
# When a stopping criterion is met, the process stops at a node, and the (weighted) majority label is assigned to this leaf node.
def get_leaf(labels, class_weight=None):
    """
    Return the weighted majority class of a set of labels, consistent with
    the weighting used by weighted_impurity.
    labels: non-empty array-like of class labels.
    class_weight: dict mapping a class label to its weight; labels missing
        from the dict get weight 1.0. None means every class has weight 1.0.
    Returns the label whose count times weight is largest; ties go to the
    smallest label. Raises ValueError when labels is empty.
    """
    if class_weight is None:
        class_weight = {}
    # np.unique (instead of np.bincount) only considers classes that are
    # actually present and does not require non-negative integer labels.
    classes, counts = np.unique(labels, return_counts=True)
    weighted = [
        count * class_weight.get(label, 1.0) for label, count in zip(classes, counts)
    ]
    return classes[np.argmax(weighted)]

In [8]:
# Recursive function:
# 1) it assigns a leaf node if one of two child nodes is empty;
# 2) it assigns a leaf node if the current branch depth exceeds the maximum depth allowed;
# 3) it assigns a leaf node if the node does not contain sufficient samples required for a further split;
# 4) otherwise, it proceeds with a further split with the optimal splitting point.
def split(node, max_depth, min_samples_split, depth, criterion, class_weight):
    """
    Recursively grow the subtree below `node`, modifying `node` in place.
    node: dict representing a tree node, as returned by get_best_split; its
        'children' entry is consumed (deleted) here;
    max_depth: maximum depth of the tree;
    min_samples_split: a child is split further only if it holds more than
        this many samples;
    depth: current depth of the node;
    criterion: name of the impurity measure, 'gini' or 'entropy';
    class_weight: dict mapping a class label to its weight.
    """
    left, right = node['children']
    del node['children']
    if left[1].size == 0 and right[1].size == 0:
        # Both children empty --> the current node becomes a leaf, labelled
        # from the node's own labels.
        node['leaf'] = get_leaf(node['labels'], class_weight)
        return
    if left[1].size == 0:
        # Only the non-empty side is attached to the node.
        node['right'] = get_leaf(right[1], class_weight)
        return
    if right[1].size == 0:
        node['left'] = get_leaf(left[1], class_weight)
        return
    # Check if the current depth exceeds the maximal depth.
    if depth >= max_depth:
        node['left'], node['right'] = get_leaf(left[1], class_weight), get_leaf(right[1], class_weight)
        return
    # Left subtree first, then right; both sides follow the same rules.
    _grow_child(node, 'left', left, max_depth, min_samples_split, depth, criterion, class_weight)
    _grow_child(node, 'right', right, max_depth, min_samples_split, depth, criterion, class_weight)


def _grow_child(node, side, child, max_depth, min_samples_split, depth, criterion, class_weight):
    """Attach `child` ([X, y]) to node[side], either as a leaf label or as a further-split subtree."""
    child_X, child_y = child
    # Check if the child has enough samples.
    if child_y.size <= min_samples_split:
        node[side] = get_leaf(child_y, class_weight)
        return
    # It has enough samples, we further split it.
    result = get_best_split(child_X, child_y, criterion, class_weight)
    result_left, result_right = result['children']
    if result_left[1].size == 0:
        # The best split separates nothing: collapse the child into a leaf.
        node[side] = get_leaf(result_right[1], class_weight)
    elif result_right[1].size == 0:
        node[side] = get_leaf(result_left[1], class_weight)
    else:
        node[side] = result
        split(result, max_depth, min_samples_split, depth + 1, criterion, class_weight)

In [9]:
# CART (classification and regression tree) algorithm entry point.
def train_tree(X_train, y_train, max_depth, min_samples_split, criterion, class_weight):
    """
    Build a decision tree from the training data and return its root node.
    The inputs are converted to NumPy arrays, the root is created from the
    best first split, and the tree is then grown recursively from depth 1.
    """
    features, targets = np.array(X_train), np.array(y_train)
    tree_root = get_best_split(features, targets, criterion, class_weight)
    # split() grows the tree in place, so the root dict is what gets returned.
    split(tree_root, max_depth, min_samples_split, 1, criterion, class_weight)
    return tree_root

In [10]:
# Function that displays the tree.
CONDITION = {'numerical': {'yes': '>=', 'no': '<'}, 'categorical': {'yes': 'is', 'no': 'is not'}}

def visualize_tree(node, depth=0):
    """
    Print the tree rooted at `node`, indenting one space per depth level.
    node: a dict for an internal node (keys 'index', 'value' and optionally
        'left' / 'right'), or anything else for a leaf, which is printed as [label].
    depth: indentation level of this node.
    """
    indent = depth * ' '
    if not isinstance(node, dict):
        # Anything that is not a dict is a leaf.
        print(f"{indent}[{node}]")
        return

    # Internal node. np.asarray lets the split value be a NumPy scalar or a
    # plain Python scalar; a bare `.dtype` lookup would fail on the latter.
    if np.asarray(node['value']).dtype.kind in ('i', 'f'):
        condition = CONDITION['numerical']
    else:
        condition = CONDITION['categorical']
    feature_number = node['index'] + 1  # features are displayed 1-based

    # The "no" branch is the left child, the "yes" branch the right child.
    print('{}|- X{} {} {}'.format(indent, feature_number, condition['no'], node['value']))
    if 'left' in node:
        visualize_tree(node['left'], depth + 1)
    print('{}|- X{} {} {}'.format(indent, feature_number, condition['yes'], node['value']))
    if 'right' in node:
        visualize_tree(node['right'], depth + 1)

In [11]:
# First test: a toy dataset with two categorical features.
X_train = [
    ['tech', 'professional'],
    ['fashion', 'student'],
    ['fashion', 'professional'],
    ['sports', 'student'],
    ['tech', 'student'],
    ['tech', 'retired'],
    ['sports', 'professional'],
]
y_train = [1, 0, 0, 0, 1, 0, 1]
# Class 1 is weighted five times as heavily as class 0.
tree = train_tree(
    X_train,
    y_train,
    max_depth=2,
    min_samples_split=2,
    criterion='gini',
    class_weight={0: 1.0, 1: 5.0},
)
visualize_tree(tree)

|- X1 is not fashion
 |- X2 is not retired
  [1]
 |- X2 is retired
  [0]
|- X1 is fashion
 [0]


In [12]:
# Second test: a toy dataset with two numerical features.
X_train_n = [
    [6, 7],
    [2, 4],
    [7, 2],
    [3, 6],
    [4, 7],
    [5, 2],
    [1, 6],
    [2, 0],
    [6, 3],
    [4, 1],
]
y_train_n = [0, 0, 0, 0, 0, 1, 1, 1, 1, 1]
# Class 1 is weighted five times as heavily as class 0.
tree = train_tree(
    X_train_n,
    y_train_n,
    max_depth=2,
    min_samples_split=2,
    criterion='gini',
    class_weight={0: 1.0, 1: 5.0},
)
visualize_tree(tree)

|- X2 < 7
 |- X1 < 7
  [1]
 |- X1 >= 7
  [0]
|- X2 >= 7
 [0]


In [13]:
# To validate the correctness of the weighted CART implementation,
# the algorithm was tested on small toy datasets.
# The resulting trees clearly show that class weights influence both the choice and the order of splits,
# prioritizing the separation of the minority class.