✓ Decision tree (without stop-split-early condition)
It includes the following components:
1) Calculate information gain; (10 marks)
2) split the data via finding the best feature based on a); (10 marks)
3) create branches recursively based on b) until every leaf only contains a single category; (10 marks)


In [1]:
import random
import csv
import numpy as np


#Define a function to randomly choose 10000 rows form 'data.csv'
def select_rows(input_file, output_file):
    with open(input_file, "r") as f:
        reader = csv.reader(f)
        rows = [row for row in reader]
    first_row = [rows[0]]
    random_rows = random.sample(rows[1:], 9999)
    result = first_row + random_rows
    with open(output_file, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerows(result)
# define a node class for the Decision Tree
class Node:
    def __init__(self, data, labels):
        self.data = data
        self.labels = labels
        self.left = None   # left child of the node
        self.right = None   # right child of the node
        self.feature_idx = None # index of the features for splitting
        self.threshold = None #threshold for splitting
        self.label = None # predicted label for the node


class DecisionTree:
    def __init__(self, min_samples_split=2):
        self.min_samples_split = min_samples_split # Define minimum number of samples required to split a node
        self.root = None # root of the tree

    def calculate_entropy(self, labels):
        # calculate entropy of a list of labels
        _, counts = np.unique(labels, return_counts=True)
        probabilities = counts / counts.sum()
        entropy = sum(probabilities * -np.log2(probabilities))
        return entropy

    def calculate_info_gain(self, parent, left_child, right_child):
        # Calculate the information gain of a split
        weighted_child_entropy = sum(
            [len(child_labels) / len(parent) * self.calculate_entropy(child_labels) for child_labels in
             [left_child, right_child]])
        parent_entropy = self.calculate_entropy(parent)
        info_gain = parent_entropy - weighted_child_entropy
        return info_gain

    def find_best_split(self, data, labels):
        # find the best split of a node
        n_features = len(data[0])
        best_gain = -1
        best_feature_idx = None
        best_threshold = None

        for feature_idx in range(n_features):
            # generate possible split thresholds for the feature
            thresholds = sorted(list(set([row[feature_idx] for row in data])))
            for threshold in thresholds:
                # partition the data and labels based on the split threshold
                left_idxs = [i for i, row in enumerate(data) if row[feature_idx] < threshold]
                right_idxs = [i for i, row in enumerate(data) if row[feature_idx] >= threshold]

                if len(left_idxs) == 0 or len(right_idxs) == 0:
                    continue
                gain = self.calculate_info_gain(labels, [labels[i] for i in left_idxs], [labels[i] for i in right_idxs])
                # update the best information
                if gain > best_gain:
                    best_gain = gain
                    best_feature_idx = feature_idx
                    best_threshold = threshold

        return best_feature_idx, best_threshold

    def build_tree(self, data, labels):
        n_samples = len(data)

        # Base: only 1 sample or all samples belong to the same class
        if len(set(labels)) == 1 or n_samples < self.min_samples_split:
            return Node(data, labels)

        # Recursive: find the best split and create sub-trees
        best_feature_idx, best_threshold = self.find_best_split(data, labels)
        left_idxs = [idx for idx in range(n_samples) if data[idx][best_feature_idx] <= best_threshold]
        right_idxs = [idx for idx in range(n_samples) if data[idx][best_feature_idx] > best_threshold]

        # Ensure both left and right child contain at least one sample
        if len(left_idxs) > 0 and len(right_idxs) > 0:
            left_data = [data[idx] for idx in left_idxs]
            left_labels = [labels[idx] for idx in left_idxs]
            right_data = [data[idx] for idx in right_idxs]
            right_labels = [labels[idx] for idx in right_idxs]

            node = Node(data, labels)
            node.feature_idx = best_feature_idx
            node.threshold = best_threshold
            node.left = self.build_tree(left_data, left_labels)
            node.right = self.build_tree(right_data, right_labels)

            return node

    def traverse_tree(self, x, node):
        if node.label is not None:
            return node.label
        if x[node.feature_idx] <= node.threshold:
            return self.traverse_tree

    def fit(self, X, y):
        self.root = self.build_tree(X, y)

    def predict(self, data):
        predictions = [] #initialize an empty list to store the predictions
         # iterate over each sample in the input data startting from root node.
        for sample in data:
            node = self.root
            while node.left and node.right:
                # if the feature value for the current node is less than or equal to the threshold, follow the left child
                if sample[node.feature_idx] <= node.threshold:
                    node = node.left
                # Otherwise follow right child
                else:
                    node = node.right
            #append the most common label for the node to the predictions list
            predictions.append(max(set(node.labels), key=node.labels.count))
        return predictions


def load_data(filename):
    X = []  # features
    y = []  # target labels

    with open(filename, 'r') as f:
        reader = csv.reader(f)
        next(reader)  # skip header row
        for row in reader:
            X.append([float(x) for x in row[:-1]])
            y.append(int(row[-1]))

    return X, y


def compute_accuracy(predictions, labels):
    # check the length to see if there is mistake
    assert len(predictions) == len(labels)
    #returns 1 if the prediction matches the actual label, and 0 otherwise.
    n_correct = sum([1 if p == l else 0 for p, l in zip(predictions, labels)])
    accuracy = n_correct / len(predictions)
    return accuracy

def k_fold_cross_validation(data, labels, k, model):
    fold_size = len(data) // k
    indices = list(range(len(data)))
    # shuffle the data randomly and divide them into k groups
    random.shuffle(indices)
    accuracies = []
    for i in range(k):
        start = i * fold_size
        end = (i + 1) * fold_size
        # split all data into training and validation set
        validation_indices = indices[start:end]
        training_indices = indices[:start] + indices[end:]
        training_data = [data[idx] for idx in training_indices]
        training_labels = [labels[idx] for idx in training_indices]
        validation_data = [data[idx] for idx in validation_indices]
        validation_labels = [labels[idx] for idx in validation_indices]
        model.fit(training_data, training_labels) # train the model on the training data
        predictions = model.predict(validation_data)
        accuracy = compute_accuracy(predictions, validation_labels)
        accuracies.append(accuracy)
    mean_accuracy = sum(accuracies) / k # sum all the accuracy and devide by K
    return mean_accuracy




select_rows('D:\dts205tc\data.csv', 'data1.csv')
data, labels = load_data('data1.csv')
tree = DecisionTree()
mean_accuracy = k_fold_cross_validation(data, labels, k=5, model=tree)
print(f"Mean accuracy of DecisionTree: {mean_accuracy}")


Mean accuracy of DecisionTree: 0.8210105052526263


The original dataset was too large for our devices to run the codes, more than 30 minutes were required. So we extract 10000 rows randomly from the original dataset to optimize the running time.The rest of the tasks will also perform on this extracted dataset 'data1.csv'

✓ Random Forest
4) Bagging. Perform 100 Bootstrapping on the data, generate different decision trees based on task3), and perform majority-voting on their prediction results. (10 marks)

In [8]:
import random
from DecisionTree import DecisionTree, load_data,compute_accuracy;

# Load data
X, y = load_data('D:\dts205tc\data1.csv') 

# Initialize a list to store the trees
trees = []

# Perform 100 bootstrapping
for i in range(100):
    # Create a bootstrap sample
    sample_indices = [random.randint(0, len(X) - 1) for _ in range(len(X))]
    X_sample = [X[j] for j in sample_indices]
    y_sample = [y[j] for j in sample_indices]

    # Initialize and fit a decision tree
    tree = DecisionTree()
    tree.fit(X_sample, y_sample)

    # Append the tree to the list
    trees.append(tree)


def k_fold_CV(data, labels, k, models):
    fold_size = len(data) // k
    indices = list(range(len(data)))
    random.shuffle(indices)
    accuracies = []
    for i in range(k):
        start = i * fold_size
        end = (i + 1) * fold_size
        validation_indices = indices[start:end]
        training_indices = indices[:start] + indices[end:]
        training_data = [data[idx] for idx in training_indices]
        training_labels = [labels[idx] for idx in training_indices]
        validation_data = [data[idx] for idx in validation_indices]
        validation_labels = [labels[idx] for idx in validation_indices]
        predictions = []
        for sample in validation_data:
            tree_predictions = [tree.predict([sample])[0] for tree in models]
            prediction = max(set(tree_predictions), key=tree_predictions.count)
            predictions.append(prediction)
        accuracy = compute_accuracy(predictions, validation_labels)
        accuracies.append(accuracy)
    mean_accuracy = sum(accuracies) / k
    return mean_accuracy


# Set the number of folds
k = 5

# Calculate the mean accuracy over k folds
mean_accuracy = k_fold_CV(X, y, k, trees)

# Print the mean accuracy
print("Mean accuracy of Bagging Tree:", mean_accuracy)

# Perform majority voting on the predictions of all trees
predictions = []
for sample in X:
    tree_predictions = [tree.predict([sample])[0] for tree in trees]
    prediction = max(set(tree_predictions), key=tree_predictions.count)
    predictions.append(prediction)

# Calculate overall accuracy
overall_accuracy = sum([1 for i in range(len(predictions)) if predictions[i] == y[i]]) / len(predictions)
print("Overall Accuracy:", overall_accuracy)

Mean accuracy of Bagging Tree: 0.8497248624312157
Overall Accuracy: 0.8497849784978497


5)When each node of the decision tree is split, 3 features are randomly selected in the way of
non-replacement sampling, and task 2) is performed accordingly. (10 marks)

In [7]:
import random                                                                                  
from DecisionTree import DecisionTree, load_data, compute_accuracy, k_fold_cross_validation                                                                                                                                               
X, y = load_data('D:\dts205tc\data1.csv')                                                                                                   
# Initialize a list to store the trees                                                         
trees = []                                                                                     
                                                                                               
# Perform 100 bootstrapping                                                                    
for i in range(100):                                                                           
    # Create a bootstrap sample                                                                
    sample_indices = [random.randint(0, len(X) - 1) for _ in range(len(X))]                    
    X_sample = [X[j] for j in sample_indices]                                                  
    y_sample = [y[j] for j in sample_indices]                                                  
                                                                                               
    # Initialize and fit a decision tree                                                       
    tree = DecisionTree()                                                                      
                                                                                               
    # Randomly select 3 features without replacement                                           
    feature_indices = random.sample(range(len(X[0])), 3)                                       
    X_sample = [[x[i] for i in feature_indices] for x in X_sample]                             
                                                                                               
    tree.fit(X_sample, y_sample)                                                               
                                                                                               
    # Append the tree to the list                                                              
    trees.append(tree)                                                                         
                                                                                               
                                                                                               
def predict(X):                                                                                
    # Make predictions for each tree                                                           
    predictions = [tree.predict(X[:, feature_indices]) for tree in trees]                      
                                                                                               
    # Take the majority vote as the final prediction                                           
    y_pred = []                                                                                
    for i in range(len(X)):                                                                    
        counts = {label: 0 for label in set(y)}                                                
        for prediction in predictions:                                                         
            counts[prediction[i]] += 1                                                         
        y_pred.append(max(counts, key=counts.get))                                             
    return y_pred                                                                              
                                                                                               
                                                                                                                                                  
tree = DecisionTree()                                                                          
mean_accuracy = k_fold_cross_validation(X,y,k=5, model=tree)                         
print(f"Mean accuracy of RandomForest: {mean_accuracy}")                                       
                                                                                               

Mean accuracy of RandomForest: 0.8261130565282642


In [9]:
# Single-layer Decision Tree
import random
import csv
import numpy as np

class Node:
    def __init__(self, data, labels):
        self.data = data
        self.labels = labels
        self.left = None
        self.right = None
        self.feature_idx = None
        self.threshold = None
        self.label = None


class DecisionTreeSingle:
    def __init__(self, min_samples_split=2):
        self.min_samples_split = min_samples_split
        self.root = None

    def calculate_entropy(self, labels):
        _, counts = np.unique(labels, return_counts=True)
        probabilities = counts / counts.sum()
        entropy = sum(probabilities * -np.log2(probabilities))
        return entropy

    def calculate_info_gain(self, parent, left_child, right_child):
        weighted_child_entropy = sum(
            [len(child_labels) / len(parent) * self.calculate_entropy(child_labels) for child_labels in
             [left_child, right_child]])
        parent_entropy = self.calculate_entropy(parent)
        info_gain = parent_entropy - weighted_child_entropy
        return info_gain

    def find_best_split(self, data, labels):
        n_features = len(data[0])
        best_gain = -1
        best_feature_idx = None
        best_threshold = None

        for feature_idx in range(n_features):
            feature_values = [data[i][feature_idx] for i in range(len(data))]
            unique_values = set(feature_values)

            for threshold in unique_values:
                left_labels = [labels[i] for i in range(len(labels)) if data[i][feature_idx] <= threshold]
                right_labels = [labels[i] for i in range(len(labels)) if data[i][feature_idx] > threshold]

                if len(left_labels) == 0 or len(right_labels) == 0:
                    continue

                gain = self.calculate_info_gain(labels, left_labels, right_labels)

                if gain > best_gain:
                    best_gain = gain
                    best_feature_idx = feature_idx
                    best_threshold = threshold

        return best_feature_idx, best_threshold

    def build_tree(self, data, labels):
        n_samples = len(data)

        # Base case: only 1 sample or all samples belong to the same class
        if len(set(labels)) == 1 or n_samples < self.min_samples_split:
            return Node(data, labels)

        # Recursive case: find the best split and create sub-trees
        best_feature_idx, best_threshold = self.find_best_split(data, labels)
        left_idxs = [idx for idx in range(n_samples) if data[idx][best_feature_idx] <= best_threshold]
        right_idxs = [idx for idx in range(n_samples) if data[idx][best_feature_idx] > best_threshold]

        # Ensure both left and right child contain at least one sample
        if len(left_idxs) > 0 and len(right_idxs) > 0:
            left_data = [data[idx] for idx in left_idxs]
            left_labels = [labels[idx] for idx in left_idxs]
            right_data = [data[idx] for idx in right_idxs]
            right_labels = [labels[idx] for idx in right_idxs]

            node = Node(data, labels)
            node.feature_idx = best_feature_idx
            node.threshold = best_threshold
            node.left = self.build_tree(left_data, left_labels)
            node.right = self.build_tree(right_data, right_labels)

            return node

    def traverse_tree(self, x, node):
        if node.label is not None:
            return node.label
        if x[node.feature_idx] <= node.threshold:
            return self.traverse_tree

    def fit(self, X, y):
        self.root = self.build_tree(X, y)

    def predict(self, data):
        predictions = []
        for sample in data:
            node = self.root
            while node.left and node.right:
                if sample[node.feature_idx] <= node.threshold:
                    node = node.left
                else:
                    node = node.right
            predictions.append(max(set(node.labels), key=node.labels.count))
        return predictions


def load_data(filename):
    X = []  # features
    y = []  # target labels

    with open(filename, 'r') as f:
        reader = csv.reader(f)
        next(reader)  # skip header row
        for row in reader:
            X.append([float(x) for x in row[:-1]])
            y.append(int(row[-1]))

    return X, y


def k_fold_cross_validation(data, labels, k, model):
    fold_size = len(data) // k
    indices = list(range(len(data)))
    random.shuffle(indices)
    accuracies = []
    for i in range(k):
        start = i * fold_size
        end = (i + 1) * fold_size
        validation_indices = indices[start:end]
        training_indices = indices[:start] + indices[end:]
        training_data = [data[idx] for idx in training_indices]
        training_labels = [labels[idx] for idx in training_indices]
        validation_data = [data[idx] for idx in validation_indices]
        validation_labels = [labels[idx] for idx in validation_indices]
        model.fit(training_data, training_labels)
        predictions = model.predict(validation_data)
        accuracy = compute_accuracy(predictions, validation_labels)
        accuracies.append(accuracy)
    mean_accuracy = sum(accuracies) / k
    return mean_accuracy


def compute_accuracy(predictions, labels):
    assert len(predictions) == len(labels)
    n_correct = sum([1 if p == l else 0 for p, l in zip(predictions, labels)])
    accuracy = n_correct / len(predictions)
    return accuracy


data, labels = load_data('D:\dts205tc\data.csv')
tree = DecisionTreeSingle(min_samples_split=2)
mean_accuracy = k_fold_cross_validation(data, labels, k=5, model=tree)
print(f"Mean accuracy: {mean_accuracy}")

Mean accuracy: 0.7426999999999999


6) Based on task 2) single-layer decision tree, 3) decision tree, 4) bagging tree, 5) random forest,
perform 5-fold CV, and compare their average prediction Accuracy on the validation set. (10 marks)


In [10]:
import pandas as pd
form={"Accuracy":[ 0.7426999999999999,0.8210105052526263,0.8497248624312157,0.8261130565282642]}
df=pd.DataFrame(form,index=['Single-layer Decision Tree','Decision Tree','Bagging Tree','Random Forest'] )
df

Unnamed: 0,Accuracy
Single-layer Decision Tree,0.7427
Decision Tree,0.821011
Bagging Tree,0.849725
Random Forest,0.826113


Note: The current device information:
processor : Intel(R) Core(TM) i7-10710U CPU @ 1.10GHz   1.61 GHz,Ram 
Ram: 16 GB
So it is hard for the device to run the original dataset. I tried the original dataset with the DecisionTree model, which takes more than 20 minutes to get the result, so dataset was modified as data1.csv which contains 10000 rows of the original dataset.
Although accuracy was optimized by randomly shuffling the data, it may still be affected by the modification.