# Imports

In [1]:
# Import necessary libraries
import pandas as pd
import numpy as np
import math

# Load in Data 

In [2]:
# Read the training data file into a NumPy array and print its dimensions.
# find_decision_rule below treats the last column of such an array as the label.
training_data = np.loadtxt('./data/pa2train.txt')
print(training_data.shape)

(2000, 23)


In [3]:
# Read the validation data file into a NumPy array and print its dimensions.
validation_data = np.loadtxt('./data/pa2validation.txt')
print(validation_data.shape)

(1000, 23)


In [4]:
# Read the test data file into a NumPy array and print its dimensions.
test_data = np.loadtxt('./data/pa2test.txt')
print(test_data.shape)

(1000, 23)


# Define Functions

In [5]:
# Function for calculating the conditional entropy of a threshold split
def _binary_entropy(positives, total):
    """Return the natural-log entropy of a two-class partition.

    ``positives`` is the number of rows whose label equals 1 and ``total`` is
    the partition size. Empty and pure partitions have zero entropy.
    """
    if total == 0 or positives == 0 or positives == total:
        return 0.0
    p_positive = positives / total
    p_negative = (total - positives) / total
    return -(p_negative * math.log(p_negative) + p_positive * math.log(p_positive))


def info_gain(subset, feature, threshold, label):
    """Return the conditional entropy H(label | feature < threshold).

    Despite its name, this function does not return the information gain
    itself: it returns the weighted entropy that remains after the split, so
    a *lower* value means a better split (minimising it is equivalent to
    maximising information gain for a fixed ``subset``).

    Args:
        subset: Iterable of rows (lists or array rows) indexable by column.
        feature: Column index of the feature being thresholded.
        threshold: Rows with ``row[feature] < threshold`` go to the lower
            partition, all others to the upper partition.
        label: Column index of the label. Rows are counted as positive when
            ``row[label] == 1`` and as negative otherwise, in both partitions.

    Returns:
        The conditional entropy in nats as a float; ``0.0`` for an empty
        ``subset``.
    """
    num_samples = len(subset)
    if num_samples == 0:
        # Nothing to split, hence no uncertainty (and no division by zero).
        return 0.0

    # Partition subset into the rows below / at-or-above the threshold
    v1 = [x for x in subset if x[feature] < threshold]
    v2 = [x for x in subset if x[feature] >= threshold]

    # Count positives the same way on both sides of the split
    v1_default = sum(1 for x in v1 if x[label] == 1)
    v2_default = sum(1 for x in v2 if x[label] == 1)

    # Conditional entropy of each partition, H(X|Z=z)
    cond_entropy_no = _binary_entropy(v1_default, len(v1))
    cond_entropy_yes = _binary_entropy(v2_default, len(v2))

    # Overall conditional entropy H(X|Z): partitions weighted by P(Z=z)
    return (cond_entropy_yes*(len(v2)/num_samples) + cond_entropy_no*(len(v1)/num_samples))


In [6]:
# Define Function for Obtaining threshold values
def get_thresholds(feature_vals):
    """Return candidate split thresholds for one feature column.

    The thresholds are the midpoints between consecutive *distinct* values of
    the feature, taken in ascending order, so each threshold separates the
    observed values into two non-empty groups.

    Args:
        feature_vals: Iterable of hashable, orderable numeric values.

    Returns:
        A list of midpoints in ascending order; empty when fewer than two
        distinct values are present.
    """
    features = sorted(set(feature_vals))
    # Pair each distinct value with its successor; the midpoint must come from
    # the sorted unique values, not from the raw column order.
    return [(low + high) / 2 for low, high in zip(features, features[1:])]

In [7]:
def split_threshold(feature, threshold, data):
    """Partition the rows of ``data`` on ``row[feature]`` against ``threshold``.

    Returns a pair of lists: first the rows whose feature value is strictly
    below the threshold, then the rows whose value is at or above it. Row
    order within each list follows the order in ``data``.
    """
    def is_below(row):
        return row[feature] < threshold

    def is_at_or_above(row):
        return row[feature] >= threshold

    below = list(filter(is_below, data))
    at_or_above = list(filter(is_at_or_above, data))
    return below, at_or_above

In [8]:
def find_decision_rule(training_samples):
    """Find the (feature, threshold) split with the lowest conditional entropy.

    Every column except the last is treated as a feature and the last column
    as the label. Each candidate threshold from ``get_thresholds`` is scored
    with ``info_gain`` (which returns conditional entropy, so lower is better;
    minimising it is equivalent to maximising information gain).

    Args:
        training_samples: 2-D NumPy array of rows ``[features..., label]``.

    Returns:
        A tuple ``((feature_index, threshold), conditional_entropy)`` for the
        best split. When several candidates tie, the last one evaluated
        (highest feature index, then last threshold) is returned.

    Raises:
        ValueError: If no feature offers any candidate threshold.
    """
    label_index = training_samples.shape[1] - 1
    best_split = None
    best_entropy = None
    for feature in range(label_index):
        for threshold in get_thresholds(training_samples[:, feature]):
            # Score this feature-threshold pair
            entropy = info_gain(training_samples, feature, threshold, label_index)
            # `<=` (not `<`) so that later candidates win ties
            if best_entropy is None or entropy <= best_entropy:
                best_entropy = entropy
                best_split = (feature, threshold)
    if best_split is None:
        raise ValueError("no candidate split: no feature yields a threshold")
    return best_split, best_entropy

In [10]:
# Test functions
# Dedicated names are used here so this toy example neither overwrites the
# `test_data` array loaded from disk nor rebinds the `info_gain` function to a
# float (which would break every later call to find_decision_rule).
toy_data = np.array([[0,0,1],[1,0,1],[1,1,0],[2,1,0],[2,0,0],[1,2,0],[2,2,0]])

split_rule, cond_entropy = find_decision_rule(toy_data)
print("Optimal Feature and Threshold: ", split_rule,' Conditional Entropy: ', cond_entropy)

Optimal Feature and Threshold:  (1, 0.5)  Conditional Entropy:  0.2727917864120626


# Define Decision Tree Class

In [None]:
# Define Decision Tree
class decisionTree:
    """Threshold-based decision tree grown until its leaves cannot be split.

    Rows are expected as ``[features..., label]`` with the label in the last
    column. Splits are chosen with ``find_decision_rule`` and applied with
    ``split_threshold``.
    """

    # Define Decision Tree Node Class
    class decisionTreeNode:
        """One node of the tree together with the training rows that reach it."""

        # Define constructor
        def __init__(self, data):
            # Empty for leaves; [rows >= threshold, rows < threshold] otherwise
            self.children = []
            # True once the node is known to contain a single label value
            self.pure = False
            # Column index and cut-off of this node's split rule
            self.feature = 0
            self.threshold = 0
            # Kept as an ndarray so `.shape` and column slicing work even when
            # the rows arrive as a plain list (as split_threshold returns them)
            self.data = np.asarray(data)
            # Set on leaves only
            self.predicted_label = None
            self.entropy = float(0.0)

        def isPure(self, label):
            """Return True when all rows share one value in column ``label``."""
            labels = [x[label] for x in self.data]
            # Check if labels for node are pure
            return len(set(labels)) == 1

    # Define Decision Tree Constructor
    def __init__(self, training_data):
        # The node class is nested, so it must be reached through the class
        self.root = self.decisionTreeNode(training_data)
        self.impure_leaf_nodes = [self.root]

    def _queue_or_label(self, node, label_index):
        """Turn a pure node into a labelled leaf, or queue it for splitting."""
        if node.isPure(label_index):
            node.pure = True
            node.predicted_label = node.data[0, label_index]
        else:
            self.impure_leaf_nodes.append(node)

    @staticmethod
    def _majority_label(node, label_index):
        """Return the most frequent label in the node (smallest label on ties)."""
        values, counts = np.unique(node.data[:, label_index], return_counts=True)
        return values[np.argmax(counts)]

    # Function for building decision tree
    def trainDecisionTree(self, training_samples=None):
        """Grow the tree and return its root node.

        Args:
            training_samples: 2-D array of rows ``[features..., label]``. When
                omitted, the data given to the constructor is used.

        Returns:
            The root ``decisionTreeNode`` of the constructed tree.

        Raises:
            ValueError: If the training data is empty or not two-dimensional.
        """
        data = self.root.data if training_samples is None else training_samples
        root = self.decisionTreeNode(data)
        if root.data.ndim != 2 or root.data.shape[0] == 0:
            raise ValueError("training data must be a non-empty 2-D array")

        # Start from a fresh root so repeated calls rebuild the tree cleanly
        self.root = root
        self.impure_leaf_nodes = []
        # The label lives in the last column (index shape[1] - 1)
        label_index = root.data.shape[1] - 1
        self._queue_or_label(root, label_index)

        # Continue Algorithm until all leaf nodes are settled
        while self.impure_leaf_nodes:
            # Pick an impure node V and remove from list
            parent_node = self.impure_leaf_nodes.pop()

            # Rows with identical features but different labels cannot be
            # separated by any threshold: fall back to the majority label.
            features = parent_node.data[:, :label_index]
            if (features == features[0]).all():
                parent_node.predicted_label = self._majority_label(parent_node, label_index)
                continue

            # Find decision rule for parent node
            split_rule, _ = find_decision_rule(parent_node.data)

            # Split the rows that reached THIS node (not the whole training set)
            below, at_or_above = split_threshold(split_rule[0], split_rule[1], parent_node.data)
            if len(below) == 0 or len(at_or_above) == 0:
                # A split that moves no rows would loop forever; stop here.
                parent_node.predicted_label = self._majority_label(parent_node, label_index)
                continue

            parent_node.feature = split_rule[0]
            parent_node.threshold = split_rule[1]

            # Create child nodes; children[0] holds the rows at or above the
            # threshold and children[1] the rows strictly below it.
            right_child_node = self.decisionTreeNode(below)
            left_child_node = self.decisionTreeNode(at_or_above)
            parent_node.children = [left_child_node, right_child_node]

            # Check Purity of Child Nodes
            self._queue_or_label(right_child_node, label_index)
            self._queue_or_label(left_child_node, label_index)

        # Return root of constructed decision tree
        return self.root

    # Define Pruning Algorithm
    def tree_pruning(self):
        """Placeholder: pruning is not implemented yet; always returns 0."""
        return 0