In [1]:
import pandas as pd
import numpy as np
from collections import Counter

In [92]:
df = pd.read_csv('https://raw.githubusercontent.com/gsprint23/cpts215/master/progassignments/files/titanic.txt')
df.head()

Unnamed: 0,class,age,sex,survived
0,1,1,1,1
1,1,1,1,1
2,1,1,1,1
3,1,1,1,1
4,1,1,1,1


In [97]:
cols = ['class','age','sex']
df['survived'] = df['survived'].apply(lambda x: False if x==0 else True)
X = df.loc[:,cols].values
y = df['survived'].ravel()
y[50:60]

array([ True,  True,  True,  True,  True,  True,  True, False, False,
       False])

In [123]:
class ID3():
    
    def __init__(self,col_names):
        '''
        col_names: column names of attributes
        '''
        self.col_names = col_names
    
    
    def transform_structure(self,X,y=None):
        '''
        This will change the data type from X,y arrays to a list of dictonaries format
        '''
        lis_dic = []
        for i in range(len(X)):
            dic = {}
            for j in range(len(X[i])):
                dic[self.col_names[j]] = X[i][j]
            if y is not None:
                lis_dic.append((dic,y[i]))
            else:
                lis_dic.append(dic)
        return lis_dic


    def compute_entropy(self,class_probabilities):
        '''
        class_probabilities is a list of class probabilities
        '''
        terms = [-pi * np.log2(pi) for pi in class_probabilities if pi] # ignore zero probabilities
        H = np.sum(terms)
        return H


    def compute_class_probabilities(self,instance_labels):
        '''
        instance_labels is a list of each examples' class label
        '''
        num_examples = len(instance_labels)
        counts = list(Counter(instance_labels).values())
        probabilities = np.array(counts) / num_examples
        return probabilities


    def compute_subset_entropy(self,subset):
        '''
        subset is a list of instances as two-item tuples (attributes, label)
        '''
        labels = [label for _, label in subset]
        probabilities = self.compute_class_probabilities(labels)
        entropy = self.compute_entropy(probabilities)
        return entropy


    def compute_partition_entropy(self,subsets):
        '''
        subsets is a list of class label lists
        '''
        num_examples = np.sum([len(s) for s in subsets])
        entropies = [(len(s) / num_examples) * self.compute_subset_entropy(s) for s in subsets]
        partition_entropy = np.sum(entropies)
        return partition_entropy


    def partition_by(self,inputs, attribute):
        '''
        inputs is a list of tuple pairs: (attribute_dict, label)
        attribute is the proposed attribute to partition by
        returns a dictionary of attribute value: input subsets pairs
        '''
        subsets = {}
        for example in inputs:
            attribute_value = example[0][attribute]
            if attribute_value in subsets:
                subsets[attribute_value].append(example)
            else: # add this attribute_value to the dict
                subsets[attribute_value] = [example]
        return subsets

    
    def partition_entropy_by(self,inputs, attribute):
        '''
        compute the partition
        compute the entropy of the partition
        '''
        subsets = self.partition_by(inputs, attribute)
        entropies = self.compute_partition_entropy(subsets.values())
        return entropies


    def find_min_entropy_partition(self,inputs, attributes=None):
        '''

        '''
        if attributes is None:
            attributes = list(inputs[0][0].keys())
        partition_entropies = []
        for attribute in attributes:
            partition_entropy = self.partition_entropy_by(inputs, attribute)
            # print(attribute, partition_entropy)
            partition_entropies.append(partition_entropy)
        min_index = np.argmin(partition_entropies)
        return attributes[min_index]

    
    def build_tree(self,inputs, split_candidates=None):
        '''
        implements the ID3 algorithm to build a decision tree
        '''
        if split_candidates is None:
            # this is the first pass
            split_candidates = list(inputs[0][0].keys())

        num_examples = len(inputs)
        # count Trues and Falses in the examples
        num_trues = len([label for attributes, label in inputs if label == True])
        num_falses = num_examples - num_trues

        # part (1) in the ID3 algorithm -> all same class label
        if num_trues == 0: # no trues, this is a False leaf node
            return False
        if num_falses == 0: # no falses, this is a True leaf node
            return True

        # part (2) in the ID3 algorithm -> list of attributes is empty -> leaf node with majority class label
        if not split_candidates: 
            return num_trues >= num_falses

        # part (3) in ID3 algorithm -> split on best attribute
        split_attribute = self.find_min_entropy_partition(inputs, split_candidates)
        partitions = self.partition_by(inputs, split_attribute)
        new_split_candidates = split_candidates[:]
        new_split_candidates.remove(split_attribute)

        # recursively build the subtrees
        subtrees = {}
        for attribute_value, subset in partitions.items():
            subtrees[attribute_value] = self.build_tree(subset, new_split_candidates)

        # missing (or unexpected) attribute value
        subtrees[None] = num_trues > num_falses

        return (split_attribute, subtrees)

    
    def fit(self,X,y):
        self.tree = self.build_tree(self.transform_structure(X,y))
        
        
    def classify(self,tree,new_example):
        '''
        classify new_example using decision tree
        '''
        # leaf node, return value
        if tree in [True, False]:
            return tree

        # decision node, unpack the attribute to split on and subtrees
        attribute, subtree_dict = tree

        subtree_key = new_example.get(attribute) # get return None if attribute not in new_example dict
        if subtree_key not in subtree_dict:
            subtree_key = None # use None subtree if no subtree for key

        subtree = subtree_dict[subtree_key]
        label = self.classify(subtree, new_example)
        return label
    
  
    def predict(self,X):
        '''
        Predict on the training instances
        '''
        X = self.transform_structure(X)
        return [(self.classify(self.tree,i)) for i in X]
    
    
    def get_params(self, deep = False):
        return {'col_names':self.col_names}

In [129]:
from sklearn.model_selection import cross_val_score,cross_validate
cross_validate(ID3(cols),X,y,cv=5,scoring=['accuracy','precision','recall','f1'])

{'fit_time': array([0.06531596, 0.04199862, 0.0233717 , 0.02322888, 0.02225065]),
 'score_time': array([0.01384068, 0.00596166, 0.00660777, 0.00558376, 0.00540304]),
 'test_accuracy': array([0.8185941 , 0.775     , 0.66818182, 0.575     , 0.99318182]),
 'test_precision': array([0.97241379, 0.87735849, 0.42213115, 0.        , 0.86956522]),
 'test_recall': array([0.64976959, 0.51955307, 0.9537037 , 0.        , 1.        ]),
 'test_f1': array([0.77900552, 0.65263158, 0.58522727, 0.        , 0.93023256])}

In [3]:
def change_format(df,label='survived'):
    df_dict = df.to_dict('records')
    updated_format = []  
    for row in df_dict:
        label_val = row.pop(label)
        if label_val == 1:
            label_val = True
        else:
            label_val = False
            
        updated_row = (row,label_val)
        updated_format.append(updated_row)
    return updated_format

In [None]:
class ID3():
    
    def __init__(self,col_names):
        '''
        col_names: column names of attributes
        '''
        self.col_names = col_names
    
    
    def transform_structure(self,X,y=None):
        '''
        This will change the data type from X,y arrays to a list of dictonaries format
        '''
        lis_dic = []
        for i in range(len(X)):
            dic = {}
            for j in range(len(X[i])):
                dic[self.col_names[j]] = X[i][j]
            if y is not None:
                lis_dic.append((dic,y[i]))
            else:
                lis_dic.append(dic)
        return lis_dic


    def compute_entropy(self,class_probabilities):
        '''
        class_probabilities is a list of class probabilities
        '''
        terms = [-pi * np.log2(pi) for pi in class_probabilities if pi] # ignore zero probabilities
        H = np.sum(terms)
        return H


    def compute_class_probabilities(self,instance_labels):
        '''
        instance_labels is a list of each examples' class label
        '''
        num_examples = len(instance_labels)
        counts = list(Counter(instance_labels).values())
        probabilities = np.array(counts) / num_examples
        return probabilities


    def compute_subset_entropy(self,subset):
        '''
        subset is a list of instances as two-item tuples (attributes, label)
        '''
        labels = [label for _, label in subset]
        probabilities = self.compute_class_probabilities(labels)
        entropy = self.compute_entropy(probabilities)
        return entropy


    def compute_partition_entropy(self,subsets):
        '''
        subsets is a list of class label lists
        '''
        num_examples = np.sum([len(s) for s in subsets])
        entropies = [(len(s) / num_examples) * self.compute_subset_entropy(s) for s in subsets]
        partition_entropy = np.sum(entropies)
        return partition_entropy


    def partition_by(self,inputs, attribute):
        '''
        inputs is a list of tuple pairs: (attribute_dict, label)
        attribute is the proposed attribute to partition by
        returns a dictionary of attribute value: input subsets pairs
        '''
        subsets = {}
        for example in inputs:
            attribute_value = example[0][attribute]
            if attribute_value in subsets:
                subsets[attribute_value].append(example)
            else: # add this attribute_value to the dict
                subsets[attribute_value] = [example]
        return subsets

    
    def partition_entropy_by(self,inputs, attribute):
        '''
        compute the partition
        compute the entropy of the partition
        '''
        subsets = self.partition_by(inputs, attribute)
        entropies = self.compute_partition_entropy(subsets.values())
        return entropies


    def find_min_entropy_partition(self,inputs, attributes=None):
        '''

        '''
        if attributes is None:
            attributes = list(inputs[0][0].keys())
        partition_entropies = []
        for attribute in attributes:
            partition_entropy = self.partition_entropy_by(inputs, attribute)
            # print(attribute, partition_entropy)
            partition_entropies.append(partition_entropy)
        min_index = np.argmin(partition_entropies)
        return attributes[min_index]

    
    def build_tree(self,inputs, split_candidates=None):
        '''
        implements the ID3 algorithm to build a decision tree
        '''
        if split_candidates is None:
            # this is the first pass
            split_candidates = list(inputs[0][0].keys())

        num_examples = len(inputs)
        # count Trues and Falses in the examples
        num_trues = len([label for attributes, label in inputs if label == True])
        num_falses = num_examples - num_trues

        # part (1) in the ID3 algorithm -> all same class label
        if num_trues == 0: # no trues, this is a False leaf node
            return False
        if num_falses == 0: # no falses, this is a True leaf node
            return True

        # part (2) in the ID3 algorithm -> list of attributes is empty -> leaf node with majority class label
        if not split_candidates: 
            return num_trues >= num_falses

        # part (3) in ID3 algorithm -> split on best attribute
        split_attribute = self.find_min_entropy_partition(inputs, split_candidates)
        partitions = self.partition_by(inputs, split_attribute)
        new_split_candidates = split_candidates[:]
        new_split_candidates.remove(split_attribute)

        # recursively build the subtrees
        subtrees = {}
        for attribute_value, subset in partitions.items():
            subtrees[attribute_value] = self.build_tree(subset, new_split_candidates)

        # missing (or unexpected) attribute value
        subtrees[None] = num_trues > num_falses

        return (split_attribute, subtrees)


    def classify(self,tree, new_example):
        '''
        classify new_example using decision tree
        '''
        # leaf node, return value
        if tree in [True, False]:
            return tree

        # decision node, unpack the attribute to split on and subtrees
        attribute, subtree_dict = tree

        subtree_key = new_example.get(attribute) # get return None if attribute not in new_example dict
        if subtree_key not in subtree_dict:
            subtree_key = None # use None subtree if no subtree for key

        subtree = subtree_dict[subtree_key]
        label = self.classify(subtree, new_example)
        return label

def predict(X):
    '''
    Predict on the training instances
    '''
    instances = [id3.transform_structure(x) for x in X]
    return [self.classify(tree,instance) for instance in instances]

In [42]:
id3 = ID3(col_names=cols)
X = np.array([[1,2,3],[4,5,6]])
y=[True,False]

id3.fit(X,y)

single_X_test_element = id3.transform_structure(X)[1]
print(id3.classify(id3.tree,single_X_test_element))

id3.predict(X)

False


[True, False]

In [43]:
from sklearn.model_selection import cross_val_score,cross_validate
cross_validate(id3,X,y,cv=2,scoring=['accuracy','precision','recall','f1'])

{'fit_time': array([0.00019526, 0.00015473]),
 'score_time': array([0.00764656, 0.00361347]),
 'test_accuracy': array([0., 0.]),
 'test_precision': array([0., 0.]),
 'test_recall': array([0., 0.]),
 'test_f1': array([0., 0.])}

0        True
1        True
2        True
3        True
4        True
        ...  
2196     True
2197     True
2198    False
2199    False
2200    False
Name: survived, Length: 2201, dtype: bool

In [None]:
! git init
! git add -A
! git commit -m "start of e"