In [1]:
import pandas as pd
import numpy as np
import sklearn
import matplotlib as plt
import seaborn as sb
import graphviz
import os
import math
import pickle

In [2]:
# All paths are resolved relative to the notebook's working directory.
PWD = os.getcwd()
# 'output' receives generated artefacts, 'input' holds the datasets.
OUTPUT_DIR, DATASET_DIR = (os.path.join(PWD, sub_dir) for sub_dir in ('output', 'input'))
# Full path of the Titanic dataset inside the input directory.
DATASET = os.path.join(DATASET_DIR, 'titanic.csv')

In [3]:
# Load titanic.csv into a DataFrame, using the 'PassengerId' column as the row index
titanic = pd.read_csv(filepath_or_buffer=DATASET, index_col='PassengerId')

def split_train_test(data, SPLIT_SIZE=0.8, column_label_name='Survived', random_state=None):
    """
    Randomly split a dataset into a training part and a test part.

    The label column is NOT separated from the features: both returned
    frames keep every column of `data` (including the label column).

    Input:
        data: dataset (pd.DataFrame) to be split
        SPLIT_SIZE: fraction of the rows that goes to the training set, default is 0.8
        column_label_name: name of the label column. Currently unused; kept only so
            that existing callers passing it keep working.
        random_state: optional integer seed. When given, the shuffle is reproducible;
            when None (default) numpy's global random state is used, as before.
    Output:
        X_train: training rows (all columns, label included)
        X_test: test rows (all columns, label included)
    """
    # A private RandomState makes the split reproducible without touching the
    # global numpy seed; fall back to the global generator when no seed is given.
    rng = np.random.RandomState(random_state) if random_state is not None else np.random
    indices = rng.permutation(len(data))
    train_size = int(len(data) * SPLIT_SIZE)
    X_train = data.iloc[indices[:train_size]]
    X_test = data.iloc[indices[train_size:]]
    return X_train, X_test

In [4]:
# Convert the continuous features into categorical features by thresholding
# Fill 'NaN' values in 'Age' with the median.
# Assign the result back instead of using a chained `inplace=True` fill, which
# newer pandas versions deprecate (it may operate on a temporary copy).
age_median = titanic['Age'].median()
titanic['Age'] = titanic['Age'].fillna(age_median)
num_of_nulls_age = titanic['Age'].isnull().sum()
print(f'Number of NaN values in \'Age\' column {num_of_nulls_age}')

# Fill 'NaN' values in 'Embarked' with the mode.
# Only the 'Embarked' column is filled: a frame-wide fillna would also write the
# embarkation code into every other column that has missing values (e.g. 'Cabin').
embarked_mode = titanic['Embarked'].mode()[0]
titanic['Embarked'] = titanic['Embarked'].fillna(embarked_mode)
num_of_nulls_embarked = titanic['Embarked'].isnull().sum()
print(f'Number of NaN values in \'Embarked\' column {num_of_nulls_embarked}')

# Age --> (0, 12] child, (12, 18] teen, (18, 64] adults, (64, 100] elderly
titanic['Age'] = pd.cut(titanic['Age'], [0, 12.0, 18.0, 64.0, 100.0], labels=['child', 'teen', 'adults', 'elderly'])

# Fare --> (-0.5, 10] poor, (10, 40] middle class, (40, 600] upper class
titanic['Fare'] = pd.cut(titanic['Fare'], [-0.5, 10.0, 40.0, 600.0], labels=['poor', 'middle class', 'upper class'])

# 'NaN' values in the 'Cabin' column are deliberately left untouched here.

Number of NaN values in 'Age' column 0
Number of NaN values in 'Embarked' column 0


In [5]:
# Remove the 'Name', 'Ticket' and 'Cabin' columns; they are not used as features
titanic = titanic.drop(columns=['Name', 'Ticket', 'Cabin'])

# Shuffle and split the data set into a train and a test set
X_train, X_test = split_train_test(titanic)
for split_name, split_frame in (('training', X_train), ('test', X_test)):
    print(f'The shape of {split_name} data: {split_frame.shape}')

The shape of training data: (712, 8)
The shape of test data: (179, 8)


In [6]:
# Build decision trees up to a maximum depth; the user can specify the maximum depth via max_depth
class ID3:
    """
    ID3 decision tree for a binary target encoded as 1 (positive, 'Survived')
    and 0 (negative, 'Death').

    Internal nodes carry the attribute they split on and one outgoing edge per
    attribute value seen during fitting; leaves carry the label 'Survived' or
    'Death' in their `attribute` field and have no edges.
    """

    def __init__(self, max_depth=None):
        """
        Constructs an ID3 Decision Tree
        Input:
            max_depth: Specified max depth of the tree (None = unlimited, 0 = single leaf)
        Output:
            null
        Raises:
            ValueError: if max_depth is negative
        Initializes:
            self.max_depth: Max tree depth of Decision Tree object
            self.root: Root Node of the fitted tree (None until fit is called)
            self.true_pos: True positive metric
            self.true_neg: True negative metric
            self.false_pos: False positive metric
            self.false_neg: False negative metric
            self.accuracy, self.precision, self.recall: Metrics computed by predict
            self.conf_matrix: Confusion matrix computed by predict
        """
        if max_depth is not None and max_depth < 0:
            raise ValueError('max_depth can\'t be negative')
        self.max_depth = max_depth
        self.root = None
        self._reset_metrics()

    def _reset_metrics(self):
        """Sets every evaluation counter and derived metric back to its initial value."""
        self.true_pos = 0
        self.true_neg = 0
        self.false_pos = 0
        self.false_neg = 0
        self.accuracy = 0.0
        self.precision = 0.0
        self.recall = 0.0
        self.conf_matrix = []

    def fit(self, X, target_attribute='Survived'):
        """
        Fits the ID3 Decision Tree to the data provided (X)
        Input:
            X: The dataset (pd.DataFrame) to fit on; the target column must use 1 / 0 labels
            target_attribute: The attribute whose value is to be predicted by the tree
        Output:
            Root tree Node object (also stored in self.root)
        Raises:
            TypeError: if X is not a pd.DataFrame
            ValueError: if target_attribute is not a column of X, or if X holds no
                row whose target value is 1 or 0
        """
        # Check the type first: reading X.columns on a non-DataFrame would fail
        # with an unrelated error before the intended message could be raised.
        if not isinstance(X, pd.DataFrame):
            raise TypeError('X is not of type \'pd.DataFrame\'')
        if target_attribute not in list(X.columns.values):
            raise ValueError('Provided target_attribute does not exist in the dataset X')
        labels = X[target_attribute]
        if not ((labels == 1) | (labels == 0)).any():
            raise ValueError('X contains no examples whose target_attribute value is 1 or 0')

        # Every column except the class label is a candidate split attribute
        attributes = list(X.columns.values)
        attributes.remove(target_attribute)

        # The root is built exactly like any other subtree, starting at depth 0
        self.root = self._fit_helper(X, target_attribute, attributes, 0)
        return self.root

    def _fit_helper(self, X, target_attribute, attributes, cur_depth):
        """
        Recursively builds the subtree for the (possibly partitioned) dataset X.
        Input:
            X: Dataset whether previously partitioned or not
            target_attribute: The attribute whose value is to be predicted by the tree
            attributes: A list of all the attributes still available for splitting
            cur_depth: Depth of the node being built (the tree root has depth 0)
        Output:
            Node object: a leaf labelled 'Survived' / 'Death', or an internal node
            with one edge per value of the chosen attribute
        """
        num_pos = int((X[target_attribute] == 1).sum())
        num_neg = int((X[target_attribute] == 0).sum())
        # Most common label in X; ties (including "no labelled rows") go to 'Survived'
        majority_label = 'Death' if num_neg > num_pos else 'Survived'

        # Stop with a leaf when the depth limit is reached, when the examples are
        # pure (the majority label is then the only label) or when no attribute
        # is left to split on.
        if (cur_depth == self.max_depth or num_pos == 0 or num_neg == 0
                or len(attributes) == 0):
            return Node(majority_label)

        _, split_attribute = self._get_max_information_gain(X, target_attribute, attributes)
        node = Node(split_attribute, list(X[split_attribute].unique()))
        # Copy before removing: the caller's list is shared by sibling subtrees
        remaining_attributes = attributes.copy()
        remaining_attributes.remove(split_attribute)

        for value in node.values:
            # X with all instances where split_attribute == value
            subset = X.loc[X[split_attribute] == value]
            if len(subset) == 0:
                # Happens for values that never compare equal to themselves (NaN):
                # attach a majority-label leaf and keep processing the other values.
                child = Node(majority_label)
            else:
                child = self._fit_helper(subset, target_attribute, remaining_attributes, cur_depth + 1)
            node.insert_edge(value, child)
        return node

    @staticmethod
    def _entropy(num_pos, num_neg):
        """Binary entropy (in bits) of a sample with the given class counts; 0.0 for an empty sample."""
        total = num_pos + num_neg
        if total == 0:
            return 0.0
        entropy = 0.0
        for count in (num_pos, num_neg):
            # By convention 0 * log2(0) contributes nothing to the entropy
            if count > 0:
                proportion = count / total
                entropy -= proportion * math.log2(proportion)
        return entropy

    def _get_max_information_gain(self, X, target_attribute, attributes):
        """
        Returns max information gain
        Input:
            X: Dataset whether previously partitioned or not
            target_attribute: Name of the 1 / 0 label column
            attributes: A list of the candidate attributes for the dataset provided
        Output:
            max_gain: The max information gain for the dataset (-inf if attributes is empty)
            partition_attribute: The attribute to split on (None if attributes is empty);
                the first attribute wins when several share the max gain
        Raises:
            ValueError: if X holds no row whose target value is 1 or 0
        """
        max_gain = float("-inf")
        partition_attribute = None
        is_pos = X[target_attribute] == 1
        is_neg = X[target_attribute] == 0
        pos_examples = int(is_pos.sum())
        neg_examples = int(is_neg.sum())
        examples = pos_examples + neg_examples
        if examples == 0:
            raise ValueError('X contains no examples whose target_attribute value is 1 or 0')
        whole_entropy = self._entropy(pos_examples, neg_examples)

        for attribute in attributes:
            # Gain = entropy of X minus the weighted entropy of each value's partition
            gain = whole_entropy
            column = X[attribute]
            for value in column.unique():
                has_value = column == value
                pos_val_examples = int((is_pos & has_value).sum())
                neg_val_examples = int((is_neg & has_value).sum())
                value_examples = pos_val_examples + neg_val_examples
                if value_examples == 0:
                    # No labelled rows for this value (e.g. NaN): nothing to subtract
                    continue
                value_ratio = value_examples / examples
                gain -= value_ratio * self._entropy(pos_val_examples, neg_val_examples)
            if gain > max_gain:
                max_gain = gain
                partition_attribute = attribute
        return max_gain, partition_attribute

    def predict(self, X, target_attribute="Survived"):
        """
        Classifies every row of X with the fitted tree and computes the metrics.

        All counters are reset at the start, so the metrics always describe the
        most recent call only. A row whose attribute value has no matching edge
        in the tree cannot be classified; it is counted as a misclassification of
        its actual class (false negative if its label is 1, false positive
        otherwise) so that each confusion-matrix row only holds instances of one
        actual class.
        Input:
            X: The dataset to test this ID3 on (must contain the label column)
            target_attribute: The name of the label column used to measure predictions
        Output:
            self.accuracy: Computes the accuracy after predictions have been done and returns it
        Raises:
            AttributeError: if the tree has not been fitted yet
        Initialize:
            self.true_pos / self.true_neg: Correctly classified positive / negative examples
            self.false_pos / self.false_neg: Incorrectly classified negative / positive examples
            self.conf_matrix: [[true_neg, false_pos], [false_neg, true_pos]]
            self.precision, self.recall: 0.0 when their denominator is zero
        """
        if self.root is None:
            raise AttributeError('This ID3 instance has no root; call fit before predict')
        self._reset_metrics()

        for _, row in X.iterrows():
            instance_label = row[target_attribute]
            node = self.root
            reached_leaf = True
            # Walk down the tree until a leaf (a node without edges) is reached
            while node.edges:
                attrib_value = row[node.attribute]
                if attrib_value not in node.edges:
                    # Attribute value does not exist in this decision tree path
                    reached_leaf = False
                    break
                node = node.edges[attrib_value]

            if not reached_leaf:
                if instance_label == 1:
                    self.false_neg += 1
                else:
                    self.false_pos += 1
                continue

            if node.attribute == 'Survived' and instance_label == 1:
                self.true_pos += 1
            elif node.attribute == 'Death' and instance_label == 0:
                self.true_neg += 1
            elif node.attribute == 'Survived' and instance_label == 0:
                self.false_pos += 1
            elif node.attribute == 'Death' and instance_label == 1:
                self.false_neg += 1

        self.conf_matrix = [[self.true_neg, self.false_pos], [self.false_neg, self.true_pos]]
        total = self.true_pos + self.true_neg + self.false_pos + self.false_neg
        predicted_pos = self.true_pos + self.false_pos
        actual_pos = self.true_pos + self.false_neg
        # Guard every ratio: an empty X or a one-sided result must not raise
        self.accuracy = (self.true_pos + self.true_neg) / total if total else 0.0
        self.precision = self.true_pos / predicted_pos if predicted_pos else 0.0
        self.recall = self.true_pos / actual_pos if actual_pos else 0.0
        return self.accuracy

    def export_graphviz(self, SAVE_DIR=None, filename='id3.sv'):
        """
        Saves the visualization of this ID3's tree structure and renders the image
        Input:
            SAVE_DIR: Directory to render into. When None (default) the notebook's
                OUTPUT_DIR is looked up at call time.
            filename: Name of the rendered file inside SAVE_DIR
        Output:
            null (the graph is kept in self.digraph)
        Raises:
            AttributeError: if the tree has not been fitted yet
        """
        if self.root is None:
            raise AttributeError('This ID3 instance has no root; call fit before export_graphviz')
        if SAVE_DIR is None:
            SAVE_DIR = OUTPUT_DIR
        self.digraph = graphviz.Digraph()
        # id() gives every Node object a distinct graph identifier, so leaves that
        # share a label are still drawn as separate nodes.
        self.digraph.node(str(id(self.root)), str(self.root.attribute))
        self._export_graphviz_helper(self.root)
        self.digraph.render(os.path.join(SAVE_DIR, filename), view=True)

    def _export_graphviz_helper(self, root):
        """
        Adds the subtree below root to self.digraph
        Input:
            root: Node object (already present in the graph) whose children are added
        Output:
            null
        """
        # A leaf has no edges, so the loop body never runs and recursion stops
        for edge_label, child in root.edges.items():
            self.digraph.node(str(id(child)), str(child.attribute))
            self.digraph.edge(str(id(root)), str(id(child)), label=str(edge_label))
            self._export_graphviz_helper(child)

    def __str__(self):
        """Short description of the tree; __str__ must return a string, never None."""
        state = 'unfitted' if self.root is None else f'root={self.root.attribute}'
        return f'ID3(max_depth={self.max_depth}, {state})'
    
class Node:
    def __init__(self, attribute=None, values=None, attribute_value=None):
        """
        Builds a single node of the decision tree
        Input:
            attribute: Attribute this node splits on (for a leaf: the class label)
            values: List of the unique values of that attribute
            attribute_value: Accepted but not stored or used by this class
        Output:
            null
        Initializes:
            self.attribute: The attribute (or leaf label) held by this Node
            self.values: The values associated with this Node's attribute
            self.edges: Dict mapping an attribute value to the child Node reached through it
        """
        self.edges = dict()
        self.values = values
        self.attribute = attribute

    def insert_edge(self, attribute_value, node):
        """Connects `node` as the child reached via `attribute_value`, replacing any existing edge for that value."""
        self.edges.update({attribute_value: node})

In [7]:
# Small data: fit a tree on the play-tennis dataset from the input directory
play_tennis_path = os.path.join(DATASET_DIR, 'play-tennis.csv')
play_tennis_data = pd.read_csv(play_tennis_path)
play_tennis = ID3()
play_tennis_root = play_tennis.fit(play_tennis_data, target_attribute='play-tennis')

In [8]:
# Fit an unrestricted-depth ID3 tree on the X_train dataset
id3 = ID3(max_depth=None)
root = id3.fit(X_train, target_attribute='Survived')

In [9]:
# Output the learned tree as both object code and as a visual interpretable model
# Object code
# Make sure the output directory exists so the dump works on a fresh checkout
os.makedirs(OUTPUT_DIR, exist_ok=True)
model_path = os.path.join(OUTPUT_DIR, 'id3.sav')
# Context managers close (and therefore flush) the file before it is read back
with open(model_path, 'wb') as model_file:
    pickle.dump(id3, model_file)
# NOTE: pickle.load can execute arbitrary code; only load files written by this notebook
with open(model_path, 'rb') as model_file:
    loaded_model = pickle.load(model_file)

In [10]:
# Graphviz: render both fitted trees into the output directory
play_tennis.export_graphviz(SAVE_DIR=OUTPUT_DIR, filename='play-tennis.sv')
id3.export_graphviz()

In [11]:
# Evaluate the constructed tree on the test set, then report the confusion-matrix counts, accuracy, precision and recall.
id3_accuracy = id3.predict(X_test)
id3_report = (
    ('true positive', id3.true_pos),
    ('true negative', id3.true_neg),
    ('false positive', id3.false_pos),
    ('false negative', id3.false_neg),
    ('accuracy', id3.accuracy),
    ('precision', id3.precision),
    ('recall', id3.recall),
)
for metric_name, metric_value in id3_report:
    print(f'ID3\'s {metric_name}: {metric_value}')
print(f'ID3\'s confusion matrix; {id3.conf_matrix}')

ID3's true positive: 44
ID3's true negative: 94
ID3's false positive: 25
ID3's false negative: 16
ID3's accuracy: 0.770949720670391
ID3's precision: 0.6376811594202898
ID3's recall: 0.7333333333333333
ID3's confusion matrix; [[94, 25], [16, 44]]


In [12]:
# Compare the hand-written tree's metrics with Scikit-Learn's standard decision tree implementation.
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import confusion_matrix

# Work on copies so the frames used by the ID3 tree stay untouched
sklearn_X_train = X_train.copy()
sklearn_y_train = sklearn_X_train.pop('Survived')
sklearn_X_test = X_test.copy()
sklearn_y_test = sklearn_X_test.pop('Survived')

# Preprocess for sklearn Decision Tree: it needs numeric features, so every
# categorical column is mapped to integer codes (same codes for train and test).
sklearn_encodings = {
    'Sex': {'female': 0, 'male': 1},
    'Age': {'child': 0, 'teen': 1, 'adults': 2, 'elderly': 3},
    'Fare': {'poor': 0, 'middle class': 1, 'upper class': 2},
    'Embarked': {'S': 0, 'C': 1, 'Q': 2},
}
for sklearn_frame in (sklearn_X_train, sklearn_X_test):
    for column_name, encoding in sklearn_encodings.items():
        sklearn_frame[column_name] = sklearn_frame[column_name].map(encoding).astype(int)

# `criterion` must be passed by keyword: current scikit-learn releases reject
# positional constructor arguments.
dec_clf = DecisionTreeClassifier(criterion='entropy')
dec_clf.fit(sklearn_X_train, sklearn_y_train)
predictions = dec_clf.predict(sklearn_X_test)
sklearn_conf_matrix = confusion_matrix(sklearn_y_test, predictions)
print(f'sklearn\'s confusion matrix: {sklearn_conf_matrix}')
# Accuracy = correctly classified (matrix diagonal) / all classified instances
print(f'sklearn\'s accuracy: {sklearn_conf_matrix.trace() / sklearn_conf_matrix.sum()}')

sklearn's confusion matrix: [[93 18]
 [21 47]]
sklearn's accuracy: 0.7821229050279329
