# **Random Forests**
Laboratory 5 of the Machine Learning class by Prof. F. Chiariotti at University of Padova during academic year 2024-2025.

In [1]:
# importing all the necessary python libraries
import numpy as np
import pandas as pd
import random as rnd
from matplotlib import pyplot as plt
from sklearn import linear_model, preprocessing
from sklearn.model_selection import train_test_split

# **Classification of Stayed/Churned Customers**

The Customer Churn table contains information on all 3,758 customers from a Telecommunications company in California in Q2 2022. We want to use this task to predict if a customer will churn (i.e. turn to another company to get a better deal) or not based on three features. It contains three features:
- **Tenure** (in Months) Number of months the customer has stayed with the company
- **Monthly Charge**: The amount charged to the customer monthly
- **Age**: Customer's age
- **Customer Status**:  is 0 if the customer has stayed with the company and 1 if the customer has churned.

In [2]:
np.random.seed(1) # setting the seed

def load_dataset(filename):
    # function that loads the csv file and splits the dataset between input and labels, while also
    # shuffling the data and shifting the label range from [0,1] to [-1,1] (-1 = stayed, +1 = churned)
    data_train = pd.read_csv(filename)
    data_train = data_train.sample(frac=1).reset_index(drop=True) # shuffling
    X = data_train.iloc[:, 0:3].values 
    Y = data_train.iloc[:, 3].values 
    Y = 2*Y-1 #
    return X,Y

# loading the dataset
X, Y = load_dataset('data/telecom_customer_churn_cleaned.csv')

We are going to differentiate (classify) between **class "1" (churned)** and **class "-1" (stayed)**

## Divide the data into training and test sets

In [3]:
# splitting the data into training and test sets
m_training = int(0.75*X.shape[0]) # computing the split
m_test =  X.shape[0] - m_training
X_training =  X[:m_training] # splitting
Y_training =  Y[:m_training]
X_test =   X[m_training:]
Y_test =  Y[m_training:]
print("# of samples in the train set:", X_training.shape[0])
print("# of samples in the test set:", X_test.shape[0])
print("# of churned users in test:", np.sum(Y_test==-1))
print("# of loyal users in test:", np.sum(Y_test==1))

# standardizing the tests by computing a transformation on the training input set
scaler = preprocessing.StandardScaler().fit(X_training)  
np.set_printoptions(suppress=True) # setting to zero floating point numbers < min_float_eps
# and appling the transformation to both training and test sets
X_training =  scaler.transform(X_training)
print ("Mean of the training input data:", X_training.mean(axis=0))
print ("Std of the training input data:",X_training.std(axis=0))
X_test =  scaler.transform(X_test)
print ("Mean of the test input data:", X_test.mean(axis=0))
print ("Std of the test input data:", X_test.std(axis=0))

# of samples in the train set: 2817
# of samples in the test set: 940
# of churned users in test: 479
# of loyal users in test: 461
Mean of the training input data: [-0.  0. -0.]
Std of the training input data: [1. 1. 1.]
Mean of the test input data: [0.0575483  0.05550169 0.0073833 ]
Std of the test input data: [0.98593187 0.97629659 1.00427583]


## 1. **Decision tree**

Now **complete** the class *Tree* and all auxiliary functions. <br>

The input parameters to pass to the *id3_training* function are:
- $X$: the matrix of input features, one row for each sample
- $Y$: the vector of labels for the input features matrix X
- $max\_depth$: the maximum depth of the tree

In [4]:
class Tree:
    # implementing a decision tree with two possible training methods:
    # id3_training or extra_training

    def __init__(self):
        # method that initializes a node with the following default values
        self.idx = -1 # index of the features used for splitting
        self.thresh = 0 # threshold value for splitting
        self.leaf = 0 # value of the leaf node
        self.left = [] # left and right splits obtained after splitting
        self.right = []

    def entropy(left, right):
        # method that computes the entropy for a split characterized by 
        # the two sets (left, right) obtained after the split
        H = 0
        tot_length = len(left) + len(right)
        # calculating, for each side, the probability of obtaining that set, and adding to
        # the Shanno Entropy with a weight proportional to the size of the set
        left_prob = len(np.where(left > 0)[0]) / len(left)
        right_prob = len(np.where(right > 0)[0]) / len(right)
        if (left_prob > 0):
            H -= len(left) * left_prob * np.log2(left_prob) / tot_length
        if (left_prob < 1):
            H -= len(left) * (1 - left_prob) * np.log2(1 - left_prob) / tot_length
        if (right_prob > 0):
            H -= len(right) * right_prob * np.log2(right_prob) / tot_length
        if (right_prob < 1):
            H -= len(right) * (1 - right_prob) * np.log2(1 - right_prob) / tot_length
        return H

    def classify(self, x):
        # method that recursively classifies the input data point x
        if (self.leaf == 0): # if the node is not a leaf
            if (x[self.idx] > self.thresh): # we compare the feature with the threshold
                return self.right.classify(x) # recur on right tree
            else:
                return self.left.classify(x) # recur on left tree
        else:
            return self.leaf # if the node is a leaf, then the leaf value is returned

    def id3_training(self, X, Y, max_depth):
        # method that trains the decision tree based on the ID3 algorithm on the
        # training set (X,Y), given an input maximum depth
        # if the node is a leaf (i.e. all nodes have the same label), the leaf is set
        # to that value and the algorithm is stopped
        if (np.max(Y) - np.min(Y) < 1e-3): 
            self.leaf = np.max(Y) 
            return
        if (max_depth < 1): # if the maximum depth is 0, the node must be a leaf
            if (len(np.where(Y > 0)) > len(Y) / 2):
                self.leaf = 1
            else:
                self.leaf = -1
            return
        best_idx = -1
        best_thresh = -1
        best_entropy = 1e9
        # iterating over the features to find the best split
        for idx in range(X.shape[1]): # for each feature
            values = X[:, idx]
            sorted_ind = np.argsort(values)
            values = np.unique(values[sorted_ind])
            # all possible thresholds are found as midpoints of uniquely sorted values
            thresh = (values[:-1] + values[1:])/2
            for j in range(len(values) - 1): # for each split
                # splitting into left and right set
                left = np.where(X[:, idx] <= thresh[j])[0]
                right = np.where(X[:, idx] > thresh[j])[0]
                if (len(left) == 0 or len(right) == 0):
                    print('error!',best_idx,thresh[j],values)
                H = Tree.entropy(Y[left], Y[right]) # calculating the entropy
                if (H < best_entropy): # and only the minimum value found is kept/saved
                    best_idx = idx
                    best_thresh = thresh[j]
                    best_entropy = H
        if (best_idx == -1): # if no optimal split was found in either directions, all points are identical
            self.leaf = np.sign(np.sum(Y))
            if (self.leaf == 0):
                self.leaf = 1
            return
        # splitting into left and right set based on the best split
        left_samples = np.where(X[:, best_idx] <= best_thresh)[0]
        right_samples = np.where(X[:, best_idx] > best_thresh)[0]
        self.idx = best_idx
        self.thresh = best_thresh
        # creating left and right subtrees and recursively training them with the corresponding data
        self.left = Tree()
        self.right = Tree()
        self.left.id3_training(X[left_samples, :], Y[left_samples], max_depth - 1)
        self.right.id3_training(X[right_samples, :], Y[right_samples], max_depth - 1)

    def extra_training(self, X, Y, max_depth):
        # if the node is a leaf (i.e. all nodes have the same label), the leaf is set
        # to that value and the algorithm is stopped
        if (np.max(Y) - np.min(Y) < 1e-3):
            self.leaf = np.max(Y)
            return
        if (max_depth < 1): # if the maximum depth is 0, the node must be a leaf
            if (len(np.where(Y > 0)) > len(Y) / 2):
                self.leaf = 1
            else:
                self.leaf = -1
            return
        best_idx = -1
        best_thresh = -1
        best_entropy = 1e9
        # iterating over the features to find the best split
        for idx in range(X.shape[1]): # for each feature
            values = X[:, idx]
            sorted_ind = np.argsort(values)
            values = np.unique(values[sorted_ind])
            if (len(values) > 1):
                # selecting a random threshold in that range of values
                thresh = rnd.uniform(np.min(values), np.max(values))
                # splitting into left and right set based on the random split
                left = np.where(X[:, idx] <= thresh)[0]
                right = np.where(X[:, idx] > thresh)[0]
                H = Tree.entropy(left, right) # computing the entropy
                if (H < best_entropy):
                    best_idx = idx
                    best_thresh = thresh
                    best_entropy = H
        if (best_idx == -1):  # if no optimal split was found in either directions, all points are identical
            self.leaf = np.sign(np.sum(Y))
            if (self.leaf == 0):
                self.leaf = 1
            return
        # splitting into left and right set based on the best random split
        left_samples = np.where(X[:, best_idx] <= best_thresh)[0]
        right_samples = np.where(X[:, best_idx] > best_thresh)[0]
        self.idx = best_idx
        self.thresh = best_thresh
        # creating left and right subtrees and recursively training them with the corresponding data
        self.left = Tree()
        self.right = Tree()
        self.left.extra_training(X[left_samples, :], Y[left_samples], max_depth - 1)
        self.right.extra_training(X[right_samples, :], Y[right_samples], max_depth - 1)
        

Now we use the implementation to learn a model from the training data directly. 

In [5]:
single_tree = Tree()
single_tree.id3_training(X_training, Y_training, 12) # maximum depth is set to 12

print('deterministic single tree')
train_loss = 0
for i in range(len(Y_training)):
    predicted = single_tree.classify(X_training[i, :])
    if (Y_training[i] != predicted):
        train_loss += 1 / len(Y_training)
print('training loss: ' + str(train_loss))

test_loss = 0
for i in range(len(Y_test)):
    predicted = single_tree.classify(X_test[i, :])
    if (Y_test[i] != predicted):
        test_loss += 1 / len(Y_test)
print('test loss: ' + str(test_loss))

deterministic single tree
training loss: 0.19524316648917342
test loss: 0.3276595744680835


Because of the big difference between training and test loss, we understand that the single tree is overfitting.

## 2. **Random Forest**

In [6]:
class Forest:
    # implementation of a Random Forest, i.e. a combination of multple decision trees (Tree)
    # to improve predicting performance and reduce overfitting

    def __init__(self, trees, n_features, n_samples, max_depth):
        # method for building the forest
        self.forest = [] # a list of decision trees (see Tree)
        self.features = n_features # number of features to be exhamined for each tree
        self.max_depth = max_depth # maximum depth of each tree
        self.samples = n_samples # number of data samples to train each tree on
        for i in range(trees):
            self.forest.append(Tree())

    def classify(self, x):
        # method for classifying a single data point x based on the forest
        vote = 0
        for tree in self.forest:
            vote += tree.classify(x)
        return np.sign(vote)

    def train(self, X, Y):
        # method for training the forest
        for tree in self.forest: # by training each tree separately
            X_train, Y_train = self.bag(X, Y)
            tree.id3_training(X_train, Y_train, self.max_depth)
        
    def bag(self, X, Y):
        # method for boothstrapping (i.e. sampling with replacement) and
        # feature subsampling
        features = X.shape[1]
        points = X.shape[0]
        bagged = rnd.choices(range(points), k = self.samples)
        X_bagged = X[bagged, :]
        # randomly selecting which features to keep
        selected = rnd.sample(range(features), k = self.features)
        for i in range(features): 
            if i not in selected:
                X_bagged[:, i] = 0
        return X_bagged, Y[bagged]

In [7]:
# creating a forest of 400 trees, with a maximum depth of 12
# since there are not that many features, we keep all 3
# we decide to train each tree on all the training point
# (bagging still adds some randomness)
forest = Forest(400, 3, X_training.shape[0], 12)
forest.train(X_training, Y_training)
test_loss = 0
for i in range(len(Y_test)):
    predicted = forest.classify(X_test[i, :])
    if (Y_test[i] * predicted <= 0):
        test_loss += 1 / len(Y_test)
print('Test loss: ' + str(test_loss))

Test loss: 0.292553191489361


## 3. **Extra Trees**

In [10]:
class ExtraTrees:
    # implementing thr Extremely Randomized Tree classifier, which introduces more
    # bias but also reduces variance

    def __init__(self, trees, n_features, n_samples, max_depth):
        # method for building the forest
        self.forest = [] # a list of decision trees (see Tree)
        self.features = n_features # number of features to be exhamined for each tree
        self.max_depth = max_depth # maximum depth of each tree
        self.samples = n_samples # number of data samples to train each tree on
        for i in range(trees):
            self.forest.append(Tree())

    def classify(self, x):
        # method for classifying a single data point x based on the forest
        vote = 0
        for tree in self.forest:
            vote += tree.classify(x)
        return np.sign(vote)

    def train(self, X, Y):
        # method for training the forest
        for tree in self.forest: # by training each tree separately
            X_train, Y_train = self.bag(X, Y)
            tree.extra_training(X_train, Y_train, self.max_depth)
        
    def bag(self, X, Y):
        # method for boothstrapping (i.e. sampling with replacement) and
        # feature subsampling
        features = X.shape[1]
        points = X.shape[0]
        bagged = rnd.choices(range(points), k = self.samples)
        X_bagged = X[bagged, :]
        # randomly selecting which features to keep
        selected = rnd.sample(range(features), k = self.features)
        for i in range(features):
            if i not in selected:
                X_bagged[:, i] = 0
        return X_bagged, Y[bagged]

In [11]:
# creating a Extra Tree Classifier of 1000 trees, with a maximum depth of 12
# since there are not that many features, we keep all 3
# we decide to train each tree on all the training point
# (bagging still adds some randomness)

extra = ExtraTrees(1000, 3, X_training.shape[0], 12)
extra.train(X_training, Y_training)
test_loss = 0
for i in range(len(Y_test)):
    predicted = extra.classify(X_test[i, :])
    if (Y_test[i] * predicted <= 0):
        test_loss += 1 / len(Y_test)
print('Test loss: ' + str(test_loss))

Test loss: 0.25744680851063845
